diff --git a/pkg/cache/placeholder_manager.go b/pkg/cache/placeholder_manager.go index 80c4c0579..2fe768363 100644 --- a/pkg/cache/placeholder_manager.go +++ b/pkg/cache/placeholder_manager.go @@ -20,6 +20,8 @@ package cache import ( "sync" + "sync/atomic" + "time" "go.uber.org/zap" v1 "k8s.io/api/core/v1" @@ -33,6 +35,8 @@ import ( type PlaceholderManager struct { clients *client.Clients orphanPod map[string]*v1.Pod + stopChan chan struct{} + running atomic.Value sync.RWMutex } @@ -97,3 +101,62 @@ func (mgr *PlaceholderManager) setMockedClients(mockedClients *client.Clients) { defer mgr.Unlock() mgr.clients = mockedClients } + +func (mgr *PlaceholderManager) cleanOrphanPlaceholders() { + mgr.Lock() + defer mgr.Unlock() + for taskID, pod := range mgr.orphanPod { + log.Logger().Debug("start to clean up orphan pod", + zap.String("taskID", taskID), + zap.String("podName", pod.Name)) + err := mgr.clients.KubeClient.Delete(pod) + if err != nil { + log.Logger().Warn("failed to clean up orphan pod", zap.Error(err)) + } else { + delete(mgr.orphanPod, taskID) + } + } +} + +func (mgr *PlaceholderManager) Start() { + log.Logger().Info("starting the Placeholder Manager") + mgr.stopChan = make(chan struct{}) + if mgr.isRunning() { + log.Logger().Info("The placeholder manager has been started") + return + } + mgr.setRunning(true) + go func() { + for { + select { + case <-mgr.stopChan: + log.Logger().Info("PlaceholderManager has been stopped") + mgr.setRunning(false) + return + default: + // clean orphan placeholders every 5 seconds + log.Logger().Info("clean up orphan pod") + mgr.cleanOrphanPlaceholders() + time.Sleep(5 * time.Second) + } + } + }() +} + +func (mgr *PlaceholderManager) Stop() { + if !mgr.isRunning() { + log.Logger().Info("The placeholder manager has been stopped") + return + } + log.Logger().Info("stopping the Placeholder Manager") + mgr.stopChan <- struct{}{} + time.Sleep(3 * time.Second) +} + +func (mgr *PlaceholderManager) isRunning() bool { + return mgr.running.Load().(bool) +} + +func (mgr *PlaceholderManager) setRunning(flag bool) { + mgr.running.Store(flag) +} diff --git a/pkg/cache/placeholder_manager_test.go b/pkg/cache/placeholder_manager_test.go index b43bb7e17..93b9cbf0b 100644 --- a/pkg/cache/placeholder_manager_test.go +++ b/pkg/cache/placeholder_manager_test.go @@ -21,6 +21,7 @@ package cache import ( "fmt" "sync" + "sync/atomic" "testing" "gotest.tools/assert" @@ -173,3 +174,44 @@ func TestCleanUp(t *testing.T) { assert.Equal(t, exist, false) assert.Equal(t, len(placeholderMgr.orphanPod), 0) } + +func TestCleanOrphanPlaceholders(t *testing.T) { + mockedAPIProvider := client.NewMockedAPIProvider() + placeholderMgr := &PlaceholderManager{ + clients: mockedAPIProvider.GetAPIs(), + orphanPod: make(map[string]*v1.Pod), + RWMutex: sync.RWMutex{}, + } + pod1 := &v1.Pod{ + TypeMeta: apis.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: apis.ObjectMeta{ + Name: "pod-01", + UID: "UID-01", + }, + } + placeholderMgr.orphanPod["task01"] = pod1 + assert.Equal(t, len(placeholderMgr.orphanPod), 1) + placeholderMgr.cleanOrphanPlaceholders() + assert.Equal(t, len(placeholderMgr.orphanPod), 0) +} + +func TestPlaceholderManagerStartStop(t *testing.T) { + mockedAPIProvider := client.NewMockedAPIProvider() + placeholderMgr := &PlaceholderManager{ + clients: mockedAPIProvider.GetAPIs(), + orphanPod: make(map[string]*v1.Pod), + running: atomic.Value{}, + RWMutex: sync.RWMutex{}, + } + placeholderMgr.setRunning(false) + // start clean up goroutine + placeholderMgr.Start() + assert.Equal(t, placeholderMgr.isRunning(), true) + + placeholderMgr.Stop() + // check orphan pod map is empty + assert.Equal(t, placeholderMgr.isRunning(), false) +}