From 171f843ce92d6dcadf48d8e3cf90fa4fc0783b71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Quang=20T=C3=B9ng?= Date: Fri, 5 Apr 2024 09:18:14 +0700 Subject: [PATCH] Convert Session Moved Error and Closing Error to Connection Error (#6) * Convert Session Moved Error and Closing Error to Connection Error * Remove clientWatchEvent --- client.go | 46 ++++++++++---------------------- client_test.go | 72 ++++++++++++++++++++++++++++++++++++++++++++++++-- constants.go | 15 ++++++----- todolist | 2 -- 4 files changed, 93 insertions(+), 42 deletions(-) diff --git a/client.go b/client.go index 1e3b4fa..1a2f584 100644 --- a/client.go +++ b/client.go @@ -91,7 +91,7 @@ type Client struct { conn NetworkConn creds []authCreds - watchers map[watchPathType][]func(ev clientWatchEvent) + watchers map[watchPathType][]func(ev Event) // ================================= wg sync.WaitGroup @@ -176,15 +176,9 @@ type handleEvent struct { req clientRequest } -type clientWatchEvent struct { - Type EventType - State State - Path string // For non-session events, the path of the watched node. -} - type clientWatchRequest struct { pathType watchPathType - callback func(ev clientWatchEvent) + callback func(ev Event) } // NetworkConn for connections when connecting to zookeeper. @@ -233,7 +227,7 @@ func newClientInternal(servers []string, sessionTimeout time.Duration, options . recvMap: map[int32]clientRequest{}, - watchers: map[watchPathType][]func(ev clientWatchEvent){}, + watchers: map[watchPathType][]func(ev Event){}, pingSignalChan: make(chan struct{}, 10), pingCloseChan: make(chan struct{}), @@ -596,7 +590,7 @@ func (c *Client) authenticate(conn NetworkConn) error { c.appendHandleQueueGlobalEvent(c.sessExpiredCallback) } - c.watchers = map[watchPathType][]func(ev clientWatchEvent){} + c.watchers = map[watchPathType][]func(ev Event){} c.mut.Unlock() @@ -1009,7 +1003,7 @@ func (c *Client) handleNormalResponse(res responseHeader, buf []byte, blen int) } output := connNormal() - if res.Err == errSessionExpired { + if res.Err == errSessionExpired || res.Err == errSessionMoved || res.Err == errClosing { err = ErrConnectionClosed output = connError(err) } @@ -1027,7 +1021,7 @@ func (c *Client) handleWatchEvent(buf []byte, blen int, res responseHeader) conn log.Panicf("Unable to decode watch event, err: %v", err) } - ev := clientWatchEvent{ + ev := Event{ Type: watchResp.Type, State: watchResp.State, Path: watchResp.Path, @@ -1037,7 +1031,7 @@ func (c *Client) handleWatchEvent(buf []byte, blen int, res responseHeader) conn defer c.mut.Unlock() watchTypes := computeWatchTypes(watchResp.Type) - var callbacks []func(ev clientWatchEvent) + var callbacks []func(ev Event) for _, wType := range watchTypes { wpt := watchPathType{path: ev.Path, wType: wType} callbacks = append(callbacks, c.watchers[wpt]...) @@ -1049,7 +1043,7 @@ func (c *Client) handleWatchEvent(buf []byte, blen int, res responseHeader) conn opcode: opWatcherEvent, response: &ev, callback: func(res any, zxid int64, err error) { - ev := res.(*clientWatchEvent) + ev := res.(*Event) for _, cb := range callbacks { cb(*ev) } @@ -1184,12 +1178,8 @@ func (c *Client) Children( path: path, wType: watchTypeChild, }, - callback: func(ev clientWatchEvent) { - opts.watchCallback(Event{ - Type: ev.Type, - State: ev.State, - Path: ev.Path, - }) + callback: func(ev Event) { + opts.watchCallback(ev) }, } } @@ -1273,12 +1263,8 @@ func (c *Client) Get( path: path, wType: watchTypeData, }, - callback: func(ev clientWatchEvent) { - opts.watchCallback(Event{ - Type: ev.Type, - State: ev.State, - Path: ev.Path, - }) + callback: func(ev Event) { + opts.watchCallback(ev) }, } } @@ -1396,12 +1382,8 @@ func (c *Client) Exists( path: path, wType: watchTypeExist, }, - callback: func(ev clientWatchEvent) { - opts.watchCallback(Event{ - Type: ev.Type, - State: ev.State, - Path: ev.Path, - }) + callback: func(ev Event) { + opts.watchCallback(ev) }, } } diff --git a/client_test.go b/client_test.go index c056d01..de61406 100644 --- a/client_test.go +++ b/client_test.go @@ -803,7 +803,7 @@ func TestClient_RecvData(t *testing.T) { path: "/workers-resp", wType: watchTypeData, }, - callback: func(ev clientWatchEvent) {}, + callback: func(ev Event) {}, }, ) @@ -820,7 +820,7 @@ func TestClient_RecvData(t *testing.T) { req: clientRequest{ xid: -1, opcode: opWatcherEvent, - response: &clientWatchEvent{ + response: &Event{ Type: EventNodeDataChanged, State: StateHasSession, Path: "/workers-resp", @@ -866,6 +866,74 @@ func TestClient_RecvData(t *testing.T) { assert.Equal(t, ErrConnectionClosed, queue[0].err) }) + t.Run("receive session moved error", func(t *testing.T) { + c := newClientTest(t) + c.doAuthenticate() + + c.client.enqueueRequest( + opGetData, &getDataRequest{}, &getDataResponse{}, + nil, + ) + c.client.getFromSendQueue() + + buf := make([]byte, 2048) + n1, _ := encodePacket(buf[4:], &responseHeader{ + Xid: 1, + Zxid: 73, + Err: errSessionMoved, + }) + binary.BigEndian.PutUint32(buf[:4], uint32(n1)) + + c.conn.readBuf.Write(buf[:4+n1]) + + output := c.client.readSingleData(c.conn) + assert.Equal(t, connIOOutput{ + closed: false, + broken: true, + err: ErrConnectionClosed, + }, output) + + // Check Handle Queue + queue := c.client.handleQueue + + assert.Equal(t, 1, len(queue)) + assert.Equal(t, ErrConnectionClosed, queue[0].err) + }) + + t.Run("receive zookeeper is closing error", func(t *testing.T) { + c := newClientTest(t) + c.doAuthenticate() + + c.client.enqueueRequest( + opGetData, &getDataRequest{}, &getDataResponse{}, + nil, + ) + c.client.getFromSendQueue() + + buf := make([]byte, 2048) + n1, _ := encodePacket(buf[4:], &responseHeader{ + Xid: 1, + Zxid: 73, + Err: errClosing, + }) + binary.BigEndian.PutUint32(buf[:4], uint32(n1)) + + c.conn.readBuf.Write(buf[:4+n1]) + + output := c.client.readSingleData(c.conn) + assert.Equal(t, connIOOutput{ + closed: false, + broken: true, + err: ErrConnectionClosed, + }, output) + + // Check Handle Queue + queue := c.client.handleQueue + + assert.Equal(t, 1, len(queue)) + assert.Equal(t, ErrConnectionClosed, queue[0].err) + }) + t.Run("read header len error", func(t *testing.T) { c := newClientTest(t) diff --git a/constants.go b/constants.go index eccac7d..8de4816 100644 --- a/constants.go +++ b/constants.go @@ -5,8 +5,6 @@ import ( "fmt" ) -//revive:disable:exported - const ( protocolVersion = 0 // DefaultPort is the default port listened by server. @@ -40,10 +38,13 @@ const ( ) const ( - // EventNodeCreated represents a node is created. - EventNodeCreated EventType = 1 - EventNodeDeleted EventType = 2 - EventNodeDataChanged EventType = 3 + // EventNodeCreated represents a znode is created. + EventNodeCreated EventType = 1 + // EventNodeDeleted represents a znode is deleted + EventNodeDeleted EventType = 2 + // EventNodeDataChanged represents a znode data is changed by Set + EventNodeDataChanged EventType = 3 + // EventNodeChildrenChanged when the children of a znode created or deleted EventNodeChildrenChanged EventType = 4 ) @@ -56,6 +57,8 @@ var ( } ) +//revive:disable:exported + const ( // StateUnknown means the session state is unknown. StateUnknown State = -1 diff --git a/todolist b/todolist index b20f792..15927ce 100644 --- a/todolist +++ b/todolist @@ -1,5 +1,3 @@ -*) Add Lint -*) Convert errSessionMoved to Connection Error *) Batching Read & Write to TCP (Need to do or not?) *) Stress Tests with Race Detector *) Add Multi-Ops Transactions