Skip to content

Commit

Permalink
Watch Test,TestSources and TestTriggers from alternate namespaces
Browse files Browse the repository at this point in the history
  • Loading branch information
Cedric McKinnie committed Jun 29, 2023
1 parent e90ef19 commit c780607
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 23 deletions.
1 change: 1 addition & 0 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ func main() {
triggers.WithHostnameIdentifier(),
triggers.WithTestkubeNamespace(cfg.TestkubeNamespace),
triggers.WithWatcherNamespaces(cfg.TestkubeWatcherNamespaces),
triggers.WithWatchAllTestkubeResources(cfg.TestkubeWatchAll),
)
log.DefaultLogger.Info("starting trigger service")
triggerService.Run(ctx)
Expand Down
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Config struct {
TestkubeCloudWorkerCount int `envconfig:"TESTKUBE_CLOUD_WORKER_COUNT" default:"50"`
TestkubeCloudLogStreamWorkerCount int `envconfig:"TESTKUBE_CLOUD_LOG_STREAM_WORKER_COUNT" default:"25"`
TestkubeWatcherNamespaces string `envconfig:"TESTKUBE_WATCHER_NAMESPACES" default:""`
TestkubeWatchAll bool `envconfig:"TESTKUBE_WATCH_ALL" default:"false"`
GraphqlPort string `envconfig:"TESTKUBE_GRAPHQL_PORT" default:"8070"`
TestkubeRegistry string `envconfig:"TESTKUBE_REGISTRY" default:""`
TestkubePodStartTimeout time.Duration `envconfig:"TESTKUBE_POD_START_TIMEOUT" default:"30m"`
Expand Down
9 changes: 8 additions & 1 deletion pkg/triggers/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type Service struct {
executorsClient executorsclientv1.Interface
testkubeNamespace string
watcherNamespaces []string
watchTestkubeCrAllNamespaces bool
}

type Option func(*Service)
Expand Down Expand Up @@ -115,7 +116,7 @@ func NewService(
opt(s)
}

s.informers = newK8sInformers(clientset, testKubeClientset, s.testkubeNamespace, s.watcherNamespaces)
s.informers = newK8sInformers(clientset, testKubeClientset, s.testkubeNamespace, s.watcherNamespaces, s.watchTestkubeCrAllNamespaces)

return s
}
Expand Down Expand Up @@ -182,6 +183,12 @@ func WithWatcherNamespaces(namespaces string) Option {
}
}

func WithWatchAllTestkubeResources(watchAllTestkubeResources bool) Option {
return func(s *Service) {
s.watchTestkubeCrAllNamespaces = watchAllTestkubeResources
}
}

func (s *Service) Run(ctx context.Context) {
leaseChan := make(chan bool)

Expand Down
51 changes: 34 additions & 17 deletions pkg/triggers/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,12 @@ type k8sInformers struct {
clusterEventInformers []coreinformerv1.EventInformer
configMapInformers []coreinformerv1.ConfigMapInformer

testTriggerInformer testkubeinformerv1.TestTriggerInformer
testSuiteInformer testkubeinformerv2.TestSuiteInformer
testInformer testkubeinformerv3.TestInformer
testTriggerInformers []testkubeinformerv1.TestTriggerInformer
testSuiteInformers []testkubeinformerv2.TestSuiteInformer
testInformers []testkubeinformerv3.TestInformer
}

func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned.Interface,
testkubeNamespace string, watcherNamespaces []string) *k8sInformers {
func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned.Interface, testkubeNamespace string, watcherNamespaces []string, watchTestkubeCrAllNamespaces bool) *k8sInformers {
var k8sInformers k8sInformers
if len(watcherNamespaces) == 0 {
watcherNamespaces = append(watcherNamespaces, v1.NamespaceAll)
Expand All @@ -65,11 +64,17 @@ func newK8sInformers(clientset kubernetes.Interface, testKubeClientset versioned
k8sInformers.configMapInformers = append(k8sInformers.configMapInformers, f.Core().V1().ConfigMaps())
}

testkubeInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(
testKubeClientset, 0, externalversions.WithNamespace(testkubeNamespace))
k8sInformers.testTriggerInformer = testkubeInformerFactory.Tests().V1().TestTriggers()
k8sInformers.testSuiteInformer = testkubeInformerFactory.Tests().V2().TestSuites()
k8sInformers.testInformer = testkubeInformerFactory.Tests().V3().Tests()
var testkubeInformerFactory externalversions.SharedInformerFactory
if watchTestkubeCrAllNamespaces == true {
testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions(
testKubeClientset, 0)
} else {
testkubeInformerFactory = externalversions.NewSharedInformerFactoryWithOptions(
testKubeClientset, 0, externalversions.WithNamespace(testkubeNamespace))
}
k8sInformers.testTriggerInformers = append(k8sInformers.testTriggerInformers, testkubeInformerFactory.Tests().V1().TestTriggers())
k8sInformers.testSuiteInformers = append(k8sInformers.testSuiteInformers, testkubeInformerFactory.Tests().V2().TestSuites())
k8sInformers.testInformers = append(k8sInformers.testInformers, testkubeInformerFactory.Tests().V3().Tests())

return &k8sInformers
}
Expand Down Expand Up @@ -97,7 +102,7 @@ func (s *Service) runWatcher(ctx context.Context, leaseChan chan bool) {
} else {
if !running {
s.logger.Infof("trigger service: instance %s in cluster %s acquired lease", s.identifier, s.clusterID)
s.informers = newK8sInformers(s.clientset, s.testKubeClientset, s.testkubeNamespace, s.watcherNamespaces)
s.informers = newK8sInformers(s.clientset, s.testKubeClientset, s.testkubeNamespace, s.watcherNamespaces, s.watchTestkubeCrAllNamespaces)
stopChan = make(chan struct{})
s.runInformers(ctx, stopChan)
running = true
Expand Down Expand Up @@ -145,9 +150,15 @@ func (s *Service) runInformers(ctx context.Context, stop <-chan struct{}) {
s.informers.configMapInformers[i].Informer().AddEventHandler(s.configMapEventHandler(ctx))
}

s.informers.testTriggerInformer.Informer().AddEventHandler(s.testTriggerEventHandler())
s.informers.testSuiteInformer.Informer().AddEventHandler(s.testSuiteEventHandler())
s.informers.testInformer.Informer().AddEventHandler(s.testEventHandler())
for i := range s.informers.testTriggerInformers {
s.informers.testTriggerInformers[i].Informer().AddEventHandler(s.testTriggerEventHandler())
}
for i := range s.informers.testSuiteInformers {
s.informers.testSuiteInformers[i].Informer().AddEventHandler(s.testSuiteEventHandler())
}
for i := range s.informers.testInformers {
s.informers.testInformers[i].Informer().AddEventHandler(s.testEventHandler())
}

s.logger.Debugf("trigger service: starting pod informers")
for i := range s.informers.podInformers {
Expand Down Expand Up @@ -190,11 +201,17 @@ func (s *Service) runInformers(ctx context.Context, stop <-chan struct{}) {
}

s.logger.Debugf("trigger service: starting test trigger informer")
go s.informers.testTriggerInformer.Informer().Run(stop)
for i := range s.informers.testTriggerInformers {
go s.informers.testTriggerInformers[i].Informer().Run(stop)
}
s.logger.Debugf("trigger service: starting test suite informer")
go s.informers.testSuiteInformer.Informer().Run(stop)
for i := range s.informers.testSuiteInformers {
go s.informers.testSuiteInformers[i].Informer().Run(stop)
}
s.logger.Debugf("trigger service: starting test informer")
go s.informers.testInformer.Informer().Run(stop)
for i := range s.informers.testInformers {
go s.informers.testInformers[i].Informer().Run(stop)
}
}

func (s *Service) podEventHandler(ctx context.Context) cache.ResourceEventHandlerFuncs {
Expand Down
78 changes: 73 additions & 5 deletions pkg/triggers/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestService_runWatcher_lease(t *testing.T) {
clientset: clientset,
testKubeClientset: testKubeClientset,
logger: log.DefaultLogger,
informers: newK8sInformers(clientset, testKubeClientset, "", []string{}),
informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false),
}

leaseChan := make(chan bool)
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestService_runWatcher_lease(t *testing.T) {
clientset: clientset,
testKubeClientset: testKubeClientset,
logger: log.DefaultLogger,
informers: newK8sInformers(clientset, testKubeClientset, "", []string{}),
informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false),
}

leaseChan := make(chan bool)
Expand Down Expand Up @@ -129,6 +129,74 @@ func TestService_runWatcher_lease(t *testing.T) {
time.Sleep(100 * time.Millisecond)
assert.True(t, match, "pod created event should match the test trigger condition")
})

t.Run("create a test trigger in an alternate namespace for pod created and match event on pod creation", func(t *testing.T) {
t.Parallel()

clientset := fake.NewSimpleClientset()
testKubeClientset := faketestkube.NewSimpleClientset()

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

testNamespace := "testkube"
testAlternateNamespace := "testkube-alternate"

match := false
testExecutorF := func(ctx context.Context, trigger *testtriggersv1.TestTrigger) error {
assert.NotEqual(t, testNamespace, trigger.Namespace)
assert.Equal(t, "test-trigger-3", trigger.Name)
match = true
return nil
}
s := &Service{
executor: testExecutorF,
identifier: "testkube-api",
clusterID: "testkube",
triggerStatus: make(map[statusKey]*triggerStatus),
clientset: clientset,
testKubeClientset: testKubeClientset,
logger: log.DefaultLogger,
watchTestkubeCrAllNamespaces: true,
informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, true),
}

leaseChan := make(chan bool)
go func() { time.Sleep(50 * time.Millisecond); leaseChan <- true }()
go s.runWatcher(ctx, leaseChan)

time.Sleep(50 * time.Millisecond)
leaseChan <- true
time.Sleep(50 * time.Millisecond)

testTrigger := testtriggersv1.TestTrigger{
ObjectMeta: metav1.ObjectMeta{Namespace: testAlternateNamespace, Name: "test-trigger-3"},
Spec: testtriggersv1.TestTriggerSpec{
Resource: "pod",
ResourceSelector: testtriggersv1.TestTriggerSelector{Namespace: testNamespace, Name: "test-pod"},
Event: "created",
Action: "run",
Execution: "test",
TestSelector: testtriggersv1.TestTriggerSelector{Namespace: testAlternateNamespace, Name: "some-test"},
},
}
createdTestTrigger, err := testKubeClientset.TestsV1().TestTriggers(testAlternateNamespace).Create(ctx, &testTrigger, metav1.CreateOptions{})
assert.NotNil(t, createdTestTrigger)
assert.NoError(t, err)

time.Sleep(100 * time.Millisecond)

assert.Len(t, s.triggerStatus, 1)
key := newStatusKey(testAlternateNamespace, "test-trigger-3")
assert.Contains(t, s.triggerStatus, key)

testPod := corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: testNamespace, Name: "test-pod"}}
_, err = clientset.CoreV1().Pods(testNamespace).Create(ctx, &testPod, metav1.CreateOptions{})
assert.NoError(t, err)

time.Sleep(100 * time.Millisecond)
assert.True(t, match, "pod created event should match the test trigger condition")
})
}

func TestService_runWatcher_noLease(t *testing.T) {
Expand All @@ -150,7 +218,7 @@ func TestService_runWatcher_noLease(t *testing.T) {
clientset: clientset,
testKubeClientset: testKubeClientset,
logger: log.DefaultLogger,
informers: newK8sInformers(clientset, testKubeClientset, "", []string{}),
informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false),
}

leaseChan := make(chan bool)
Expand Down Expand Up @@ -190,7 +258,7 @@ func TestService_runWatcher_noLease(t *testing.T) {
clientset: clientset,
testKubeClientset: testKubeClientset,
logger: log.DefaultLogger,
informers: newK8sInformers(clientset, testKubeClientset, "", []string{}),
informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false),
}

leaseChan := make(chan bool)
Expand Down Expand Up @@ -232,7 +300,7 @@ func TestService_runWatcher_noLease(t *testing.T) {
clientset: clientset,
testKubeClientset: testKubeClientset,
logger: log.DefaultLogger,
informers: newK8sInformers(clientset, testKubeClientset, "", []string{}),
informers: newK8sInformers(clientset, testKubeClientset, "", []string{}, false),
}

leaseChan := make(chan bool)
Expand Down

0 comments on commit c780607

Please sign in to comment.