From 8ba6fc7cbe3b63fcf4182be178d6f3652bddead7 Mon Sep 17 00:00:00 2001 From: HuangTing-Yao Date: Thu, 26 Nov 2020 20:36:05 +0800 Subject: [PATCH 1/4] [YUNIKORN-469]Implement recycling service in PlaceholderManager that cleans up orphan placeholders --- pkg/cache/placeholder_manager.go | 19 +++++++++++++++++++ pkg/cache/placeholder_manager_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/pkg/cache/placeholder_manager.go b/pkg/cache/placeholder_manager.go index 80c4c0579..b9c2b4b8c 100644 --- a/pkg/cache/placeholder_manager.go +++ b/pkg/cache/placeholder_manager.go @@ -20,6 +20,7 @@ package cache import ( "sync" + "time" "go.uber.org/zap" v1 "k8s.io/api/core/v1" @@ -97,3 +98,21 @@ func (mgr *PlaceholderManager) setMockedClients(mockedClients *client.Clients) { defer mgr.Unlock() mgr.clients = mockedClients } + +func (mgr *PlaceholderManager) cleanOrphanPlaceholders() { + for taskID, pod := range mgr.orphanPod { + err := mgr.clients.KubeClient.Delete(pod) + if err == nil { + delete(mgr.orphanPod, taskID) + } + } +} + +func (mgr *PlaceholderManager) Start() { + go func() { + for { + mgr.cleanOrphanPlaceholders() + time.Sleep(5 * time.Second) + } + }() +} diff --git a/pkg/cache/placeholder_manager_test.go b/pkg/cache/placeholder_manager_test.go index b43bb7e17..e54d32eda 100644 --- a/pkg/cache/placeholder_manager_test.go +++ b/pkg/cache/placeholder_manager_test.go @@ -173,3 +173,27 @@ func TestCleanUp(t *testing.T) { assert.Equal(t, exist, false) assert.Equal(t, len(placeholderMgr.orphanPod), 0) } + +func TestCleanOrphanPlaceholders(t *testing.T) { + mockedAPIProvider := client.NewMockedAPIProvider() + placeholderMgr := &PlaceholderManager{ + clients: mockedAPIProvider.GetAPIs(), + orphanPod: make(map[string]*v1.Pod), + RWMutex: sync.RWMutex{}, + } + pod1 := &v1.Pod{ + TypeMeta: apis.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: apis.ObjectMeta{ + Name: "pod-01", + UID: "UID-01", + }, + } + placeholderMgr.orphanPod["task01"] = pod1 + assert.Equal(t, len(placeholderMgr.orphanPod), 1) + + placeholderMgr.cleanOrphanPlaceholders() + assert.Equal(t, len(placeholderMgr.orphanPod), 0) +} From e419006a4fb03a2e538b8362c7df55292f8ce031 Mon Sep 17 00:00:00 2001 From: HuangTing-Yao Date: Fri, 27 Nov 2020 18:27:10 +0800 Subject: [PATCH 2/4] rerun travis --- pkg/cache/placeholder_manager.go | 1 + pkg/cache/placeholder_manager_test.go | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cache/placeholder_manager.go b/pkg/cache/placeholder_manager.go index b9c2b4b8c..7e6364ec3 100644 --- a/pkg/cache/placeholder_manager.go +++ b/pkg/cache/placeholder_manager.go @@ -111,6 +111,7 @@ func (mgr *PlaceholderManager) cleanOrphanPlaceholders() { func (mgr *PlaceholderManager) Start() { go func() { for { + // clean orphan placeholders every 5 seconds mgr.cleanOrphanPlaceholders() time.Sleep(5 * time.Second) } diff --git a/pkg/cache/placeholder_manager_test.go b/pkg/cache/placeholder_manager_test.go index e54d32eda..6c96fc56d 100644 --- a/pkg/cache/placeholder_manager_test.go +++ b/pkg/cache/placeholder_manager_test.go @@ -193,7 +193,6 @@ func TestCleanOrphanPlaceholders(t *testing.T) { } placeholderMgr.orphanPod["task01"] = pod1 assert.Equal(t, len(placeholderMgr.orphanPod), 1) - placeholderMgr.cleanOrphanPlaceholders() assert.Equal(t, len(placeholderMgr.orphanPod), 0) } From ba6b5e5b70b80a77658f018b680e411f41ec3231 Mon Sep 17 00:00:00 2001 From: HuangTing-Yao Date: Tue, 1 Dec 2020 15:42:03 +0800 Subject: [PATCH 3/4] add log, lock and stop chan --- pkg/cache/placeholder_manager.go | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/pkg/cache/placeholder_manager.go b/pkg/cache/placeholder_manager.go index 7e6364ec3..024d1e896 100644 --- a/pkg/cache/placeholder_manager.go +++ b/pkg/cache/placeholder_manager.go @@ -34,6 +34,7 @@ import ( type PlaceholderManager struct { clients *client.Clients orphanPod map[string]*v1.Pod + stopChan chan struct{} sync.RWMutex } @@ -100,20 +101,40 @@ func (mgr *PlaceholderManager) setMockedClients(mockedClients *client.Clients) { } func (mgr *PlaceholderManager) cleanOrphanPlaceholders() { + mgr.Lock() + defer mgr.Unlock() for taskID, pod := range mgr.orphanPod { + log.Logger().Debug("start to clean up orphan pod", + zap.String("taskID", taskID), + zap.String("podName", pod.Name)) err := mgr.clients.KubeClient.Delete(pod) - if err == nil { + if err != nil { + log.Logger().Warn("failed to clean up orphan pod", zap.Error(err)) + } else { delete(mgr.orphanPod, taskID) } } } func (mgr *PlaceholderManager) Start() { + log.Logger().Info("starting the Placeholder Manager") + mgr.stopChan = make(chan struct{}) go func() { for { - // clean orphan placeholders every 5 seconds - mgr.cleanOrphanPlaceholders() - time.Sleep(5 * time.Second) + select { + case <-mgr.stopChan: + log.Logger().Info("PlaceholderManager has been stopped") + return + default: + // clean orphan placeholders every 5 seconds + mgr.cleanOrphanPlaceholders() + time.Sleep(5 * time.Second) + } } }() } + +func (mgr *PlaceholderManager) Stop() { + log.Logger().Info("stopping the Placeholder Manager") + mgr.stopChan <- struct{}{} +} From a854cee1a4d7e9ff76bb818e7723897b0343d8c9 Mon Sep 17 00:00:00 2001 From: HuangTing-Yao Date: Thu, 3 Dec 2020 13:48:02 +0800 Subject: [PATCH 4/4] add StartStop UT --- pkg/cache/placeholder_manager.go | 22 ++++++++++++++++++++++ pkg/cache/placeholder_manager_test.go | 19 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/pkg/cache/placeholder_manager.go b/pkg/cache/placeholder_manager.go index 024d1e896..2fe768363 100644 --- a/pkg/cache/placeholder_manager.go +++ b/pkg/cache/placeholder_manager.go @@ -20,6 +20,7 @@ package cache import ( "sync" + "sync/atomic" "time" "go.uber.org/zap" @@ -35,6 +36,7 @@ type PlaceholderManager struct { clients *client.Clients orphanPod map[string]*v1.Pod stopChan chan struct{} + running atomic.Value sync.RWMutex } @@ -119,14 +121,21 @@ func (mgr *PlaceholderManager) cleanOrphanPlaceholders() { func (mgr *PlaceholderManager) Start() { log.Logger().Info("starting the Placeholder Manager") mgr.stopChan = make(chan struct{}) + if mgr.isRunning() { + log.Logger().Info("The placeholder manager has been started") + return + } + mgr.setRunning(true) go func() { for { select { case <-mgr.stopChan: log.Logger().Info("PlaceholderManager has been stopped") + mgr.setRunning(false) return default: // clean orphan placeholders every 5 seconds + log.Logger().Info("clean up orphan pod") mgr.cleanOrphanPlaceholders() time.Sleep(5 * time.Second) } @@ -135,6 +144,19 @@ func (mgr *PlaceholderManager) Start() { } func (mgr *PlaceholderManager) Stop() { + if !mgr.isRunning() { + log.Logger().Info("The placeholder manager has been stopped") + return + } log.Logger().Info("stopping the Placeholder Manager") mgr.stopChan <- struct{}{} + time.Sleep(3 * time.Second) +} + +func (mgr *PlaceholderManager) isRunning() bool { + return mgr.running.Load().(bool) +} + +func (mgr *PlaceholderManager) setRunning(flag bool) { + mgr.running.Store(flag) } diff --git a/pkg/cache/placeholder_manager_test.go b/pkg/cache/placeholder_manager_test.go index 6c96fc56d..93b9cbf0b 100644 --- a/pkg/cache/placeholder_manager_test.go +++ b/pkg/cache/placeholder_manager_test.go @@ -21,6 +21,7 @@ package cache import ( "fmt" "sync" + "sync/atomic" "testing" "gotest.tools/assert" @@ -196,3 +197,21 @@ func TestCleanOrphanPlaceholders(t *testing.T) { placeholderMgr.cleanOrphanPlaceholders() assert.Equal(t, len(placeholderMgr.orphanPod), 0) } + +func TestPlaceholderManagerStartStop(t *testing.T) { + mockedAPIProvider := client.NewMockedAPIProvider() + placeholderMgr := &PlaceholderManager{ + clients: mockedAPIProvider.GetAPIs(), + orphanPod: make(map[string]*v1.Pod), + running: atomic.Value{}, + RWMutex: sync.RWMutex{}, + } + placeholderMgr.setRunning(false) + // start clean up goroutine + placeholderMgr.Start() + assert.Equal(t, placeholderMgr.isRunning(), true) + + placeholderMgr.Stop() + // check orphan pod map is empty + assert.Equal(t, placeholderMgr.isRunning(), false) +}