/
fsm-connector.go
141 lines (121 loc) · 5.52 KB
/
fsm-connector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Package main implements the main entrypoint for fsm-connector and utility routines to
// bootstrap the various internal components of fsm-connector.
package main
import (
"context"
"net/http"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/clientcmd"
gwapi "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
gwscheme "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned/scheme"
"github.com/flomesh-io/fsm/pkg/configurator"
"github.com/flomesh-io/fsm/pkg/connector"
"github.com/flomesh-io/fsm/pkg/connector/cli"
"github.com/flomesh-io/fsm/pkg/constants"
"github.com/flomesh-io/fsm/pkg/errcode"
configClientset "github.com/flomesh-io/fsm/pkg/gen/client/config/clientset/versioned"
connectorClientset "github.com/flomesh-io/fsm/pkg/gen/client/connector/clientset/versioned"
connectorscheme "github.com/flomesh-io/fsm/pkg/gen/client/connector/clientset/versioned/scheme"
machineClientset "github.com/flomesh-io/fsm/pkg/gen/client/machine/clientset/versioned"
machinescheme "github.com/flomesh-io/fsm/pkg/gen/client/machine/clientset/versioned/scheme"
"github.com/flomesh-io/fsm/pkg/health"
"github.com/flomesh-io/fsm/pkg/httpserver"
"github.com/flomesh-io/fsm/pkg/k8s"
"github.com/flomesh-io/fsm/pkg/k8s/events"
"github.com/flomesh-io/fsm/pkg/k8s/informers"
"github.com/flomesh-io/fsm/pkg/logger"
"github.com/flomesh-io/fsm/pkg/messaging"
_ "github.com/flomesh-io/fsm/pkg/sidecar/providers/pipy/driver"
"github.com/flomesh-io/fsm/pkg/signals"
"github.com/flomesh-io/fsm/pkg/version"
)
// log is the package-level logger, created by logger.New with the name
// "fsm-connector".
var log = logger.New("fsm-connector")

// scheme is the package-level runtime.Scheme. init registers the client-go,
// Gateway API, machine and connector API types into it.
var scheme = runtime.NewScheme()
func init() {
_ = clientgoscheme.AddToScheme(scheme)
_ = gwscheme.AddToScheme(scheme)
_ = machinescheme.AddToScheme(scheme)
_ = connectorscheme.AddToScheme(scheme)
}
// main is the fsm-connector entrypoint. In order, it:
//
//  1. parses and validates the command line flags and applies the log level;
//  2. builds the Kubernetes REST config and the kube, machine, Gateway API,
//     connector and config clientsets from it;
//  3. looks up the fsm-connector pod and initializes the generic event
//     recorder with it;
//  4. creates the message broker, the informer collection and the
//     configurator;
//  5. creates the connect controller, hands it the cluster-set settings from
//     the MeshConfig and starts its background goroutines;
//  6. starts the HTTP server exposing the version and health endpoints;
//  7. blocks until the stop channel returned by signals.RegisterExitHandlers
//     is signalled.
//
// Setup failures are reported at fatal level (or through the event recorder's
// FatalEvent for the informer collection).
func main() {
	log.Info().Msgf("Starting fsm-connector %s; %s; %s", version.Version, version.GitCommit, version.BuildDate)

	if err := cli.ParseFlags(); err != nil {
		log.Fatal().Err(err).Msg("Error parsing cmd line arguments")
	}

	// This ensures CLI parameters (and dependent values) are correct.
	if err := cli.ValidateCLIParams(); err != nil {
		log.Fatal().Err(err).Msg("Error validating CLI parameters")
	}

	if err := logger.SetLogLevel(cli.Verbosity()); err != nil {
		log.Fatal().Err(err).Msg("Error setting log level")
	}

	// Initialize kube config and clients.
	kubeConfig, err := clientcmd.BuildConfigFromFlags("", cli.Cfg.KubeConfigFile)
	if err != nil {
		log.Fatal().Err(err).Msgf("Error creating kube config (kubeconfig=%s)", cli.Cfg.KubeConfigFile)
	}
	kubeClient := kubernetes.NewForConfigOrDie(kubeConfig)
	machineClient := machineClientset.NewForConfigOrDie(kubeConfig)
	gatewayClient := gwapi.NewForConfigOrDie(kubeConfig)
	connectorClient := connectorClientset.NewForConfigOrDie(kubeConfig)

	// Initialize the generic Kubernetes event recorder and associate it with the fsm-connector pod resource.
	connectorPod, err := cli.GetConnectorPod(kubeClient)
	if err != nil {
		// The error itself is attached to the error-level entry (together with
		// its error code); the fatal entry that follows carries only the message.
		log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrFetchingConnectorPod)).
			Msg("Error retrieving fsm-connector pod")
		log.Fatal().Msg("Error fetching fsm-connector pod")
	}
	eventRecorder := events.GenericEventRecorder()
	if err = eventRecorder.Initialize(connectorPod, kubeClient, cli.Cfg.FsmNamespace); err != nil {
		// Attach the error so the cause of the failure is not lost.
		log.Fatal().Err(err).Msg("Error initializing generic event recorder")
	}

	k8s.SetTrustDomain(cli.Cfg.TrustDomain)

	// cancel is handed to the exit handlers; stop is the channel the
	// long-running components below are given and that main finally waits on.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stop := signals.RegisterExitHandlers(cancel)

	msgBroker := messaging.NewBroker(stop)

	configClient := configClientset.NewForConfigOrDie(kubeConfig)
	informerCollection, err := informers.NewInformerCollection(cli.Cfg.MeshName, stop,
		informers.WithKubeClient(kubeClient),
		informers.WithConfigClient(configClient, cli.Cfg.FsmMeshConfigName, cli.Cfg.FsmNamespace),
		informers.WithMachineClient(machineClient),
		informers.WithConnectorClient(connectorClient),
	)
	if err != nil {
		events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating informer collection")
	}

	cfg := configurator.NewConfigurator(informerCollection, cli.Cfg.FsmNamespace, cli.Cfg.FsmMeshConfigName, msgBroker)

	connectController := cli.NewConnectController(
		cli.Cfg.SdrProvider, cli.Cfg.SdrConnector,
		ctx, kubeConfig, kubeClient, configClient,
		connectorClient, machineClient, gatewayClient,
		informerCollection, msgBroker)

	// Read the MeshConfig once so the Gateway API flag and the cluster-set
	// settings are taken from the same value.
	meshConfig := cfg.GetMeshConfig()
	connector.GatewayAPIEnabled = meshConfig.Spec.GatewayAPI.Enabled
	clusterSet := meshConfig.Spec.ClusterSet
	connectController.SetClusterSet(clusterSet.Name, clusterSet.Group, clusterSet.Zone, clusterSet.Region)

	go connectController.BroadcastListener(stop)
	go connectController.CacheCleaner(stop)

	version.SetMetric()

	/*
	 * Initialize fsm-connector's HTTP server
	 */
	httpServer := httpserver.NewHTTPServer(constants.FSMHTTPServerPort)
	// Version
	httpServer.AddHandler(constants.VersionPath, version.GetVersionHandler())
	// Health checks
	httpServer.AddHandler(constants.WebhookHealthPath, http.HandlerFunc(health.SimpleHandler))
	// Start HTTP server
	err = httpServer.Start()
	if err != nil {
		log.Fatal().Err(err).Msg("Failed to start FSM metrics/probes HTTP server")
	}

	// Watch for MeshConfig updates in the background.
	// NOTE(review): this was previously described as "the global log level
	// watcher"; confirm what WatchMeshConfigUpdated actually reacts to.
	go connector.WatchMeshConfigUpdated(connectController, msgBroker, stop)

	<-stop
	log.Info().Msgf("Stopping fsm-connector %s; %s; %s", version.Version, version.GitCommit, version.BuildDate)
}