Skip to content

Commit

Permalink
Add Parallel Runner
Browse files Browse the repository at this point in the history
  • Loading branch information
QuangTung97 committed Apr 2, 2024
1 parent 49df920 commit 74133e7
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 3 deletions.
2 changes: 1 addition & 1 deletion curator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (f *clientFactoryImpl) Start(runner SessionRunner) {
auth := []byte(f.username + ":" + f.password)
acl := zk.DigestACL(zk.PermAll, f.username, f.password)

// channel will be closed when add auth completed (not need for the callback to be finished)
// channel will be closed when add auth completed (not need for the add auth callback to be finished)
addAuthDone := make(chan struct{})

zkClient, err := zk.NewClient(f.servers, 12*time.Second,
Expand Down
29 changes: 29 additions & 0 deletions curator/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package curator

type parallelRunnerImpl struct {
runners []SessionRunner
}

func NewParallelRunner(runners ...SessionRunner) SessionRunner {
return &parallelRunnerImpl{
runners: runners,
}
}

func (r *parallelRunnerImpl) Begin(client Client) {
for _, runner := range r.runners {
runner.Begin(client)
}
}

func (r *parallelRunnerImpl) Retry() {
for _, runner := range r.runners {
runner.Retry()
}
}

func (r *parallelRunnerImpl) End() {
for _, runner := range r.runners {
runner.End()
}
}
59 changes: 59 additions & 0 deletions curator/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package curator

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestParallelRunner(t *testing.T) {
t.Run("normal", func(t *testing.T) {
var steps []string
r := NewParallelRunner(
New(func(sess *Session) {
steps = append(steps, "init01")
sess.Run(func(client Client) {
sess.AddRetry(func(sess *Session) {
steps = append(steps, "retry01")
})
})
}),
New(func(sess *Session) {
steps = append(steps, "init02")
}),
)

assert.Equal(t, 0, len(steps))

r.Begin(nil)
assert.Equal(t, []string{
"init01",
"init02",
}, steps)

r.Retry()
assert.Equal(t, []string{
"init01",
"init02",
"retry01",
}, steps)

r.End()
assert.Equal(t, []string{
"init01",
"init02",
"retry01",
}, steps)

// Session Begin Again
r.Begin(nil)
assert.Equal(t, []string{
"init01",
"init02",
"retry01",
"init01",
"init02",
}, steps)
})

}
3 changes: 1 addition & 2 deletions todolist
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
*) Test Retry When Leader Expired (when Error = Session Expired)
*) Add CI Pipeline for Integration Tests
*) Batching Read & Write to TCP
*) Test Zookeeper with Clusters and IPTables
*) Stress Tests with Race Detector
*) Add Multi-Ops Transactions

0 comments on commit 74133e7

Please sign in to comment.