forked from kyma-project/kyma
/
main.go
136 lines (113 loc) · 5.01 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"
scCs "github.com/kubernetes-incubator/service-catalog/pkg/client/clientset_generated/clientset"
catalogInformers "github.com/kubernetes-incubator/service-catalog/pkg/client/informers_generated/externalversions"
"github.com/kubernetes-incubator/service-catalog/pkg/svcat/service-catalog"
"github.com/kyma-project/kyma/components/remote-environment-broker/internal/access"
"github.com/kyma-project/kyma/components/remote-environment-broker/internal/broker"
"github.com/kyma-project/kyma/components/remote-environment-broker/internal/config"
"github.com/kyma-project/kyma/components/remote-environment-broker/internal/labeler"
"github.com/kyma-project/kyma/components/remote-environment-broker/internal/storage"
"github.com/kyma-project/kyma/components/remote-environment-broker/internal/storage/populator"
"github.com/kyma-project/kyma/components/remote-environment-broker/internal/syncer"
"github.com/kyma-project/kyma/components/remote-environment-broker/pkg/client/clientset/versioned"
"github.com/kyma-project/kyma/components/remote-environment-broker/pkg/client/informers/externalversions"
"github.com/kyma-project/kyma/components/remote-environment-broker/platform/logger"
"github.com/sirupsen/logrus"
"k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
// informerResyncPeriod defines how often an informer executes the relist action. Setting it to zero disables resync.
// BEWARE: too short a period will increase the CPU load.
const informerResyncPeriod = 30 * time.Minute
// main wires the remote-environment-broker together and runs it.
//
// Startup order: load configuration, build the storage factory, create the Kubernetes,
// ServiceCatalog and RemoteEnvironment clients with their informers, populate the instance
// storage, construct the controllers and the broker server, start the informers and wait
// for their caches, launch the controllers, and finally run the broker server.
// Any error during setup, or an error returned by the server, is passed to fatalOnError.
func main() {
	verboseFlag := flag.Bool("verbose", false, "specify if log verbosely loading configuration")
	flag.Parse()

	cfg, err := config.Load(*verboseFlag)
	fatalOnError(err)

	log := logger.New(&cfg.Logger)

	// Storage factory: source of the instance, operation and remote-environment stores used below.
	storageCfg := storage.ConfigList(cfg.Storage)
	storageFactory, err := storage.NewFactory(&storageCfg)
	fatalOnError(err)

	// The same in-cluster REST configuration is shared by every clientset created here.
	restCfg, err := restclient.InClusterConfig()
	fatalOnError(err)

	// Core Kubernetes client and namespace informer.
	kubeClient, err := kubernetes.NewForConfig(restCfg)
	fatalOnError(err)
	namespaceInformer := v1.NewNamespaceInformer(kubeClient, informerResyncPeriod, cache.Indexers{})

	// ServiceCatalog client, SDK wrapper and informers.
	catalogClient, err := scCs.NewForConfig(restCfg)
	fatalOnError(err)
	catalogSDK := &servicecatalog.SDK{ServiceCatalogClient: catalogClient}
	catalogInformerFactory := catalogInformers.NewSharedInformerFactory(catalogClient, informerResyncPeriod)
	catalogV1beta1 := catalogInformerFactory.Servicecatalog().V1beta1()

	// Fill the instance storage before anything else is started; the attempt is bounded to one minute.
	instPopulator := populator.NewInstances(catalogClient, storageFactory.Instance(), cfg.BrokerName)
	populateCtx, cancelPopulate := context.WithTimeout(context.Background(), time.Minute)
	defer cancelPopulate()
	log.Info("Instance storage population...")
	fatalOnError(instPopulator.Do(populateCtx))
	log.Info("Instance storage populated")

	// RemoteEnvironment client and informers.
	reClientset, err := versioned.NewForConfig(restCfg)
	fatalOnError(err)
	reInformerFactory := externalversions.NewSharedInformerFactory(reClientset, informerResyncPeriod)
	reV1alpha1 := reInformerFactory.Remoteenvironment().V1alpha1()

	// Internal services and controllers.
	relister := syncer.NewRelistRequester(catalogSDK, cfg.BrokerName, cfg.BrokerRelistDurationWindow, log)
	instanceFacade := broker.NewServiceInstanceFacade(catalogV1beta1.ServiceInstances().Informer())
	checker := access.New(
		storageFactory.RemoteEnvironment(),
		reClientset.RemoteenvironmentV1alpha1(),
		storageFactory.Instance(),
	)
	syncController := syncer.New(
		reV1alpha1.RemoteEnvironments(),
		storageFactory.RemoteEnvironment(),
		storageFactory.RemoteEnvironment(),
		relister,
		log,
	)
	labelController := labeler.New(
		reV1alpha1.EnvironmentMappings().Informer(),
		namespaceInformer,
		kubeClient.CoreV1().Namespaces(),
		storageFactory.RemoteEnvironment(),
		log,
	)

	// Broker server.
	brokerSrv := broker.New(
		storageFactory.RemoteEnvironment(),
		storageFactory.Instance(),
		storageFactory.InstanceOperation(),
		checker,
		reClientset.RemoteenvironmentV1alpha1(),
		instanceFacade,
		log,
	)

	// Graceful shutdown: stop is closed and runCtx cancelled when an interrupt/termination signal arrives.
	runCtx, cancelRun := context.WithCancel(context.Background())
	defer cancelRun()
	stop := make(chan struct{})
	cancelOnInterrupt(runCtx, stop, cancelRun)

	// Start all informers.
	catalogInformerFactory.Start(stop)
	reInformerFactory.Start(stop)
	go namespaceInformer.Run(stop)

	// Wait until the informer caches are synced (or stop is closed) before starting the consumers.
	catalogInformerFactory.WaitForCacheSync(stop)
	reInformerFactory.WaitForCacheSync(stop)
	cache.WaitForCacheSync(stop, namespaceInformer.HasSynced)

	// Background controllers and services share the stop channel.
	go syncController.Run(stop)
	go labelController.Run(stop)
	go relister.Run(stop)

	// Run the broker server in the foreground; a returned error is fatal.
	listenAddr := fmt.Sprintf(":%d", cfg.Port)
	fatalOnError(brokerSrv.Run(runCtx, listenAddr))
}
// fatalOnError does nothing for a nil err; otherwise it passes the error's message to logrus.Fatal.
func fatalOnError(err error) {
	if err == nil {
		return
	}
	logrus.Fatal(err.Error())
}
// cancelOnInterrupt starts a goroutine that closes ch once, on whichever comes first:
// ctx being done, or an os.Interrupt / SIGTERM signal being received.
// When a signal is the trigger, cancel is additionally called after ch has been closed.
// The function itself returns immediately.
func cancelOnInterrupt(ctx context.Context, ch chan<- struct{}, cancel context.CancelFunc) {
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

	go func() {
		gotSignal := false
		select {
		case <-ctx.Done():
			// Context finished on its own; only the channel needs closing.
		case <-sigCh:
			gotSignal = true
		}

		close(ch)
		if gotSignal {
			cancel()
		}
	}()
}