From 74133e7b8ecf6b772480e86734af27e7ccefe23b Mon Sep 17 00:00:00 2001 From: "tung.tq" Date: Tue, 2 Apr 2024 14:56:23 +0700 Subject: [PATCH] Add Parallel Runner --- curator/client.go | 2 +- curator/util.go | 29 ++++++++++++++++++++++ curator/util_test.go | 59 ++++++++++++++++++++++++++++++++++++++++++++ todolist | 3 +-- 4 files changed, 90 insertions(+), 3 deletions(-) create mode 100644 curator/util.go create mode 100644 curator/util_test.go diff --git a/curator/client.go b/curator/client.go index c5158fe..c7f94af 100644 --- a/curator/client.go +++ b/curator/client.go @@ -65,7 +65,7 @@ func (f *clientFactoryImpl) Start(runner SessionRunner) { auth := []byte(f.username + ":" + f.password) acl := zk.DigestACL(zk.PermAll, f.username, f.password) - // channel will be closed when add auth completed (not need for the callback to be finished) + // channel will be closed when add auth completed (not need for the add auth callback to be finished) addAuthDone := make(chan struct{}) zkClient, err := zk.NewClient(f.servers, 12*time.Second, diff --git a/curator/util.go b/curator/util.go new file mode 100644 index 0000000..89320ac --- /dev/null +++ b/curator/util.go @@ -0,0 +1,29 @@ +package curator + +type parallelRunnerImpl struct { + runners []SessionRunner +} + +func NewParallelRunner(runners ...SessionRunner) SessionRunner { + return ¶llelRunnerImpl{ + runners: runners, + } +} + +func (r *parallelRunnerImpl) Begin(client Client) { + for _, runner := range r.runners { + runner.Begin(client) + } +} + +func (r *parallelRunnerImpl) Retry() { + for _, runner := range r.runners { + runner.Retry() + } +} + +func (r *parallelRunnerImpl) End() { + for _, runner := range r.runners { + runner.End() + } +} diff --git a/curator/util_test.go b/curator/util_test.go new file mode 100644 index 0000000..19083ed --- /dev/null +++ b/curator/util_test.go @@ -0,0 +1,59 @@ +package curator + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParallelRunner(t *testing.T) { + t.Run("normal", func(t *testing.T) { + var steps []string + r := NewParallelRunner( + New(func(sess *Session) { + steps = append(steps, "init01") + sess.Run(func(client Client) { + sess.AddRetry(func(sess *Session) { + steps = append(steps, "retry01") + }) + }) + }), + New(func(sess *Session) { + steps = append(steps, "init02") + }), + ) + + assert.Equal(t, 0, len(steps)) + + r.Begin(nil) + assert.Equal(t, []string{ + "init01", + "init02", + }, steps) + + r.Retry() + assert.Equal(t, []string{ + "init01", + "init02", + "retry01", + }, steps) + + r.End() + assert.Equal(t, []string{ + "init01", + "init02", + "retry01", + }, steps) + + // Session Begin Again + r.Begin(nil) + assert.Equal(t, []string{ + "init01", + "init02", + "retry01", + "init01", + "init02", + }, steps) + }) + +} diff --git a/todolist b/todolist index 5b588d3..22ab28b 100644 --- a/todolist +++ b/todolist @@ -1,5 +1,4 @@ -*) Test Retry When Leader Expired (when Error = Session Expired) +*) Add CI Pipeline for Integration Tests *) Batching Read & Write to TCP -*) Test Zookeeper with Clusters and IPTables *) Stress Tests with Race Detector *) Add Multi-Ops Transactions