Skip to content

Commit

Permalink
Fix Fake Watch when Session Expired & Remove Run()
Browse files Browse the repository at this point in the history
  • Loading branch information
QuangTung97 committed Apr 10, 2024
1 parent 2b1b607 commit fc3f4d4
Show file tree
Hide file tree
Showing 8 changed files with 447 additions and 607 deletions.
106 changes: 50 additions & 56 deletions concurrency/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,31 +45,29 @@ func (e *Lock) Start(sess *curator.Session, next func(sess *curator.Session)) {
}

func (e *Lock) initFunc(sess *curator.Session) {
sess.Run(func(client curator.Client) {
client.Children(e.parent, func(resp zk.ChildrenResponse, err error) {
if err != nil {
if errors.Is(err, zk.ErrConnectionClosed) {
sess.AddRetry(e.initFunc)
return
}
if errors.Is(err, zk.ErrNoNode) {
log.Panicf("ZNode '%s' does NOT exist", e.parent)
}
panic(err)
}

var prevNode string
status := e.computeLockStatus(resp, &prevNode)
if status == lockStatusNeedCreate {
e.createEphemeral(sess)
sess.GetClient().Children(e.parent, func(resp zk.ChildrenResponse, err error) {
if err != nil {
if errors.Is(err, zk.ErrConnectionClosed) {
sess.AddRetry(e.initFunc)
return
}
if status == lockStatusBlocked {
e.watchPreviousNode(sess, prevNode)
return
if errors.Is(err, zk.ErrNoNode) {
log.Panicf("ZNode '%s' does NOT exist", e.parent)
}
e.onGranted(sess)
})
panic(err)
}

var prevNode string
status := e.computeLockStatus(resp, &prevNode)
if status == lockStatusNeedCreate {
e.createEphemeral(sess)
return
}
if status == lockStatusBlocked {
e.watchPreviousNode(sess, prevNode)
return
}
e.onGranted(sess)
})
}

Expand Down Expand Up @@ -133,43 +131,39 @@ func stringCmp(a, b string) int {
}

func (e *Lock) createEphemeral(sess *curator.Session) {
sess.Run(func(client curator.Client) {
p := e.parent + "/node:" + e.nodeID + "-"
client.Create(p, nil, zk.FlagEphemeral|zk.FlagSequence,
func(resp zk.CreateResponse, err error) {
if err != nil {
if errors.Is(err, zk.ErrConnectionClosed) {
sess.AddRetry(e.initFunc)
return
}
panic(err)
p := e.parent + "/node:" + e.nodeID + "-"
sess.GetClient().Create(p, nil, zk.FlagEphemeral|zk.FlagSequence,
func(resp zk.CreateResponse, err error) {
if err != nil {
if errors.Is(err, zk.ErrConnectionClosed) {
sess.AddRetry(e.initFunc)
return
}
e.initFunc(sess)
},
)
})
panic(err)
}
e.initFunc(sess)
},
)
}

func (e *Lock) watchPreviousNode(sess *curator.Session, prevNode string) {
sess.Run(func(client curator.Client) {
client.GetW(prevNode, func(resp zk.GetResponse, err error) {
if err == nil {
return
}
if errors.Is(err, zk.ErrNoNode) {
e.initFunc(sess)
return
}
if errors.Is(err, zk.ErrConnectionClosed) {
sess.AddRetry(e.initFunc)
return
}
panic(err)
}, func(ev zk.Event) {
if ev.Type == zk.EventNodeDeleted {
e.initFunc(sess)
return
}
})
sess.GetClient().GetW(prevNode, func(resp zk.GetResponse, err error) {
if err == nil {
return
}
if errors.Is(err, zk.ErrNoNode) {
e.initFunc(sess)
return
}
if errors.Is(err, zk.ErrConnectionClosed) {
sess.AddRetry(e.initFunc)
return
}
panic(err)
}, func(ev zk.Event) {
if ev.Type == zk.EventNodeDeleted {
e.initFunc(sess)
return
}
})
}
47 changes: 22 additions & 25 deletions concurrency/lock_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,24 @@ func newSimpleCounter(client curator.FakeClientID) *simpleCounter {
}

func (l *simpleCounter) isLeader(sess *curator.Session) {
sess.Run(func(client curator.Client) {
client.Get("/counter", func(resp zk.GetResponse, err error) {
if err != nil {
if errors.Is(err, zk.ErrNoNode) {
l.increase(sess, 1, 0)
return
}
if errors.Is(err, zk.ErrConnectionClosed) {
sess.AddRetry(l.isLeader)
return
}
panic(err)
sess.GetClient().Get("/counter", func(resp zk.GetResponse, err error) {
if err != nil {
if errors.Is(err, zk.ErrNoNode) {
l.increase(sess, 1, 0)
return
}

num, err := strconv.ParseInt(string(resp.Data), 10, 64)
if err != nil {
panic(err)
if errors.Is(err, zk.ErrConnectionClosed) {
sess.AddRetry(l.isLeader)
return
}
l.increase(sess, int(num)+1, resp.Stat.Version)
})
panic(err)
}

num, err := strconv.ParseInt(string(resp.Data), 10, 64)
if err != nil {
panic(err)
}
l.increase(sess, int(num)+1, resp.Stat.Version)
})
}

Expand Down Expand Up @@ -73,11 +71,10 @@ func (l *simpleCounter) createCounterResp(sess *curator.Session) func(_ zk.Creat
}

func (l *simpleCounter) increase(sess *curator.Session, nextVal int, version int32) {
sess.Run(func(client curator.Client) {
if nextVal > 1 {
client.Set("/counter", numToBytes(nextVal), version, l.setCounterResp(sess))
} else {
client.Create("/counter", numToBytes(nextVal), 0, l.createCounterResp(sess))
}
})
client := sess.GetClient()
if nextVal > 1 {
client.Set("/counter", numToBytes(nextVal), version, l.setCounterResp(sess))
} else {
client.Create("/counter", numToBytes(nextVal), 0, l.createCounterResp(sess))
}
}
10 changes: 4 additions & 6 deletions concurrency/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ func initStore(parent string) *curator.FakeZookeeper {

c := curator.NewFakeClientFactory(store, initClient)
c.Start(curator.New(func(sess *curator.Session) {
sess.Run(func(client curator.Client) {
client.Create(parent, nil, 0, func(resp zk.CreateResponse, err error) {
if err != nil {
panic(err)
}
})
sess.GetClient().Create(parent, nil, 0, func(resp zk.CreateResponse, err error) {
if err != nil {
panic(err)
}
})
}))

Expand Down
26 changes: 3 additions & 23 deletions curator/curator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,29 +77,9 @@ func (c *Curator) End() {
c.sess = nil
}

type nullClient struct {
valid bool
client Client
}

func (s *Session) getClient() nullClient {
if s.state.sess != s {
return nullClient{}
}
return nullClient{
valid: true,
client: s.state.client,
}
}

// Run allows to access to the Client object for accessing zookeeper.
// The callback fn function is only be called when the session is still active.
func (s *Session) Run(fn func(client Client)) {
sessClient := s.getClient()
if !sessClient.valid {
return
}
fn(sessClient.client)
// GetClient returns Client
func (s *Session) GetClient() Client {
return s.state.client
}

// AddRetry add a callback function that will be called after connection is re-established.
Expand Down
31 changes: 4 additions & 27 deletions curator/curator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ func TestCurator(t *testing.T) {
steps := make([]string, 0)
c := New(func(sess *Session) {
steps = append(steps, "start")
sess.Run(func(client Client) {
sess.AddRetry(func(sess *Session) {
steps = append(steps, "retry")
})
sess.AddRetry(func(sess *Session) {
steps = append(steps, "retry")
})
})

Expand All @@ -41,10 +39,8 @@ func TestCurator(t *testing.T) {
steps := make([]string, 0)
c := New(func(sess *Session) {
steps = append(steps, "start")
sess.Run(func(client Client) {
sess.AddRetry(func(sess *Session) {
steps = append(steps, "retry")
})
sess.AddRetry(func(sess *Session) {
steps = append(steps, "retry")
})
})

Expand All @@ -56,25 +52,6 @@ func TestCurator(t *testing.T) {

assert.Equal(t, []string{"start", "start", "retry"}, steps)
})

t.Run("callback after end", func(t *testing.T) {
steps := make([]string, 0)
var callback func()
c := New(func(sess *Session) {
steps = append(steps, "start")
callback = func() {
sess.Run(func(client Client) {
steps = append(steps, "run-callback")
})
}
})

c.Begin(nil)
c.End()
callback()

assert.Equal(t, []string{"start"}, steps)
})
}

func TestCurator_Chain(t *testing.T) {
Expand Down

0 comments on commit fc3f4d4

Please sign in to comment.