Skip to content

Commit

Permalink
Convert Session Moved Error and Closing Error to Connection Error (#6)
Browse files Browse the repository at this point in the history
* Convert Session Moved Error and Closing Error to Connection Error

* Remove clientWatchEvent
  • Loading branch information
QuangTung97 committed Apr 5, 2024
1 parent 7317a0b commit 171f843
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 42 deletions.
46 changes: 14 additions & 32 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type Client struct {
conn NetworkConn

creds []authCreds
watchers map[watchPathType][]func(ev clientWatchEvent)
watchers map[watchPathType][]func(ev Event)
// =================================

wg sync.WaitGroup
Expand Down Expand Up @@ -176,15 +176,9 @@ type handleEvent struct {
req clientRequest
}

type clientWatchEvent struct {
Type EventType
State State
Path string // For non-session events, the path of the watched node.
}

type clientWatchRequest struct {
pathType watchPathType
callback func(ev clientWatchEvent)
callback func(ev Event)
}

// NetworkConn for connections when connecting to zookeeper.
Expand Down Expand Up @@ -233,7 +227,7 @@ func newClientInternal(servers []string, sessionTimeout time.Duration, options .

recvMap: map[int32]clientRequest{},

watchers: map[watchPathType][]func(ev clientWatchEvent){},
watchers: map[watchPathType][]func(ev Event){},

pingSignalChan: make(chan struct{}, 10),
pingCloseChan: make(chan struct{}),
Expand Down Expand Up @@ -596,7 +590,7 @@ func (c *Client) authenticate(conn NetworkConn) error {
c.appendHandleQueueGlobalEvent(c.sessExpiredCallback)
}

c.watchers = map[watchPathType][]func(ev clientWatchEvent){}
c.watchers = map[watchPathType][]func(ev Event){}

c.mut.Unlock()

Expand Down Expand Up @@ -1009,7 +1003,7 @@ func (c *Client) handleNormalResponse(res responseHeader, buf []byte, blen int)
}

output := connNormal()
if res.Err == errSessionExpired {
if res.Err == errSessionExpired || res.Err == errSessionMoved || res.Err == errClosing {
err = ErrConnectionClosed
output = connError(err)
}
Expand All @@ -1027,7 +1021,7 @@ func (c *Client) handleWatchEvent(buf []byte, blen int, res responseHeader) conn
log.Panicf("Unable to decode watch event, err: %v", err)
}

ev := clientWatchEvent{
ev := Event{
Type: watchResp.Type,
State: watchResp.State,
Path: watchResp.Path,
Expand All @@ -1037,7 +1031,7 @@ func (c *Client) handleWatchEvent(buf []byte, blen int, res responseHeader) conn
defer c.mut.Unlock()

watchTypes := computeWatchTypes(watchResp.Type)
var callbacks []func(ev clientWatchEvent)
var callbacks []func(ev Event)
for _, wType := range watchTypes {
wpt := watchPathType{path: ev.Path, wType: wType}
callbacks = append(callbacks, c.watchers[wpt]...)
Expand All @@ -1049,7 +1043,7 @@ func (c *Client) handleWatchEvent(buf []byte, blen int, res responseHeader) conn
opcode: opWatcherEvent,
response: &ev,
callback: func(res any, zxid int64, err error) {
ev := res.(*clientWatchEvent)
ev := res.(*Event)
for _, cb := range callbacks {
cb(*ev)
}
Expand Down Expand Up @@ -1184,12 +1178,8 @@ func (c *Client) Children(
path: path,
wType: watchTypeChild,
},
callback: func(ev clientWatchEvent) {
opts.watchCallback(Event{
Type: ev.Type,
State: ev.State,
Path: ev.Path,
})
callback: func(ev Event) {
opts.watchCallback(ev)
},
}
}
Expand Down Expand Up @@ -1273,12 +1263,8 @@ func (c *Client) Get(
path: path,
wType: watchTypeData,
},
callback: func(ev clientWatchEvent) {
opts.watchCallback(Event{
Type: ev.Type,
State: ev.State,
Path: ev.Path,
})
callback: func(ev Event) {
opts.watchCallback(ev)
},
}
}
Expand Down Expand Up @@ -1396,12 +1382,8 @@ func (c *Client) Exists(
path: path,
wType: watchTypeExist,
},
callback: func(ev clientWatchEvent) {
opts.watchCallback(Event{
Type: ev.Type,
State: ev.State,
Path: ev.Path,
})
callback: func(ev Event) {
opts.watchCallback(ev)
},
}
}
Expand Down
72 changes: 70 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ func TestClient_RecvData(t *testing.T) {
path: "/workers-resp",
wType: watchTypeData,
},
callback: func(ev clientWatchEvent) {},
callback: func(ev Event) {},
},
)

Expand All @@ -820,7 +820,7 @@ func TestClient_RecvData(t *testing.T) {
req: clientRequest{
xid: -1,
opcode: opWatcherEvent,
response: &clientWatchEvent{
response: &Event{
Type: EventNodeDataChanged,
State: StateHasSession,
Path: "/workers-resp",
Expand Down Expand Up @@ -866,6 +866,74 @@ func TestClient_RecvData(t *testing.T) {
assert.Equal(t, ErrConnectionClosed, queue[0].err)
})

t.Run("receive session moved error", func(t *testing.T) {
c := newClientTest(t)
c.doAuthenticate()

c.client.enqueueRequest(
opGetData, &getDataRequest{}, &getDataResponse{},
nil,
)
c.client.getFromSendQueue()

buf := make([]byte, 2048)
n1, _ := encodePacket(buf[4:], &responseHeader{
Xid: 1,
Zxid: 73,
Err: errSessionMoved,
})
binary.BigEndian.PutUint32(buf[:4], uint32(n1))

c.conn.readBuf.Write(buf[:4+n1])

output := c.client.readSingleData(c.conn)
assert.Equal(t, connIOOutput{
closed: false,
broken: true,
err: ErrConnectionClosed,
}, output)

// Check Handle Queue
queue := c.client.handleQueue

assert.Equal(t, 1, len(queue))
assert.Equal(t, ErrConnectionClosed, queue[0].err)
})

t.Run("receive zookeeper is closing error", func(t *testing.T) {
c := newClientTest(t)
c.doAuthenticate()

c.client.enqueueRequest(
opGetData, &getDataRequest{}, &getDataResponse{},
nil,
)
c.client.getFromSendQueue()

buf := make([]byte, 2048)
n1, _ := encodePacket(buf[4:], &responseHeader{
Xid: 1,
Zxid: 73,
Err: errClosing,
})
binary.BigEndian.PutUint32(buf[:4], uint32(n1))

c.conn.readBuf.Write(buf[:4+n1])

output := c.client.readSingleData(c.conn)
assert.Equal(t, connIOOutput{
closed: false,
broken: true,
err: ErrConnectionClosed,
}, output)

// Check Handle Queue
queue := c.client.handleQueue

assert.Equal(t, 1, len(queue))
assert.Equal(t, ErrConnectionClosed, queue[0].err)
})

t.Run("read header len error", func(t *testing.T) {
c := newClientTest(t)

Expand Down
15 changes: 9 additions & 6 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
)

//revive:disable:exported

const (
protocolVersion = 0
// DefaultPort is the default port listened by server.
Expand Down Expand Up @@ -40,10 +38,13 @@ const (
)

const (
// EventNodeCreated represents a node is created.
EventNodeCreated EventType = 1
EventNodeDeleted EventType = 2
EventNodeDataChanged EventType = 3
// EventNodeCreated represents a znode is created.
EventNodeCreated EventType = 1
// EventNodeDeleted represents a znode is deleted
EventNodeDeleted EventType = 2
// EventNodeDataChanged represents a znode data is changed by Set
EventNodeDataChanged EventType = 3
// EventNodeChildrenChanged when the children of a znode created or deleted
EventNodeChildrenChanged EventType = 4
)

Expand All @@ -56,6 +57,8 @@ var (
}
)

//revive:disable:exported

const (
// StateUnknown means the session state is unknown.
StateUnknown State = -1
Expand Down
2 changes: 0 additions & 2 deletions todolist
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
*) Add Lint
*) Convert errSessionMoved to Connection Error
*) Batching Read & Write to TCP (Need to do or not?)
*) Stress Tests with Race Detector
*) Add Multi-Ops Transactions

0 comments on commit 171f843

Please sign in to comment.