/
agent.go
143 lines (115 loc) · 4.19 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package statusspoke
import (
"context"
"flag"
"fmt"
"os"
"github.com/go-logr/zapr"
"go.elastic.co/ecszap"
"go.uber.org/zap"
"github.com/go-logr/logr"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
cappv1alpha1 "github.com/dana-team/container-app-operator/api/v1alpha1"
"open-cluster-management.io/addon-framework/pkg/lease"
addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
)
var scheme = runtime.NewScheme()
// init initializes the global scheme with core and custom resource definitions.
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(addonv1alpha1.Install(scheme))
utilruntime.Must(cappv1alpha1.AddToScheme(scheme))
}
// NewAgentCommand creates a new Cobra command to start the agent for a specific component.
func NewAgentCommand(addonName string, logger logr.Logger) *cobra.Command {
o := NewAgentOptions(addonName, logger)
ctx := context.TODO()
cmd := &cobra.Command{
Use: "agent",
Short: fmt.Sprintf("Start the %s's agent", addonName),
RunE: func(cmd *cobra.Command, args []string) error {
return o.runControllerManager(ctx)
},
}
o.AddFlags(cmd)
cmd.FParseErrWhitelist.UnknownFlags = true
return cmd
}
// AgentOptions defines the flags for workload agent.
type AgentOptions struct {
Log logr.Logger
HubKubeconfigFile string
SpokeClusterName string
AddonName string
AddonNamespace string
}
// NewAgentOptions returns new instance of AgentOptions.
func NewAgentOptions(addonName string, logger logr.Logger) *AgentOptions {
return &AgentOptions{AddonName: addonName, Log: logger}
}
// AddFlags adds the hub kubeconfig location and spoke cluster name as flags.
func (o *AgentOptions) AddFlags(cmd *cobra.Command) {
flags := cmd.Flags()
// This command only supports reading from config
flags.StringVar(&o.HubKubeconfigFile, "hub-kubeconfig", o.HubKubeconfigFile, "Location of kubeconfig file to connect to hub cluster.")
flags.StringVar(&o.SpokeClusterName, "cluster-name", o.SpokeClusterName, "Name of spoke cluster.")
}
// runControllersManger setups and runs the spoke controller.
func (o *AgentOptions) runControllerManager(ctx context.Context) error {
log := o.Log.WithName("controller-manager-setup")
flag.Parse()
encoderConfig := ecszap.NewDefaultEncoderConfig()
core := ecszap.NewCore(encoderConfig, os.Stdout, zap.DebugLevel)
logger := zap.New(core, zap.AddCaller())
ctrl.SetLogger(zapr.NewLogger(logger))
spokeConfig := ctrl.GetConfigOrDie()
mgr, err := ctrl.NewManager(spokeConfig, ctrl.Options{
Scheme: scheme,
LeaderElection: false,
})
if err != nil {
log.Error(err, "unable to start manager")
return fmt.Errorf("unable to create manager, err: %w", err)
}
// build kubeinformerfactory of hub cluster
hubConfig, err := clientcmd.BuildConfigFromFlags("" /* leave masterurl as empty */, o.HubKubeconfigFile)
if err != nil {
return fmt.Errorf("failed to create hubConfig from flag, err: %w", err)
}
hubClient, err := client.New(hubConfig, client.Options{Scheme: scheme})
if err != nil {
return fmt.Errorf("failed to create hubClient, err: %w", err)
}
spokeKubeClient, err := client.New(spokeConfig, client.Options{Scheme: scheme})
if err != nil {
return fmt.Errorf("failed to create spoke client, err: %w", err)
}
leaseClient, err := kubernetes.NewForConfig(spokeConfig)
if err != nil {
return fmt.Errorf("failed to create lease client, err: %w", err)
}
// create a lease updater
leaseUpdater := lease.NewLeaseUpdater(
leaseClient,
o.AddonName,
"open-cluster-management-agent-addon",
)
go leaseUpdater.Start(ctx)
log.Info("starting manager")
helloSpokeController := &CappSyncReconciler{
spokeClient: spokeKubeClient,
hubClient: hubClient,
Scheme: mgr.GetScheme(),
}
if err = helloSpokeController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("unable to create capp-status agent controller: %s, err: %w", "capp-status-agent", err)
}
return mgr.Start(ctrl.SetupSignalHandler())
}