Skip to content

Commit

Permalink
Convert Session Moved Error and Closing Error to Connection Error
Browse files Browse the repository at this point in the history
  • Loading branch information
QuangTung97 committed Apr 5, 2024
1 parent 7317a0b commit f924c43
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 3 deletions.
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ func (c *Client) handleNormalResponse(res responseHeader, buf []byte, blen int)
}

output := connNormal()
if res.Err == errSessionExpired {
if res.Err == errSessionExpired || res.Err == errSessionMoved || res.Err == errClosing {
err = ErrConnectionClosed
output = connError(err)
}
Expand Down
68 changes: 68 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,74 @@ func TestClient_RecvData(t *testing.T) {
assert.Equal(t, ErrConnectionClosed, queue[0].err)
})

t.Run("receive session moved error", func(t *testing.T) {
c := newClientTest(t)
c.doAuthenticate()

c.client.enqueueRequest(
opGetData, &getDataRequest{}, &getDataResponse{},
nil,
)
c.client.getFromSendQueue()

buf := make([]byte, 2048)
n1, _ := encodePacket(buf[4:], &responseHeader{
Xid: 1,
Zxid: 73,
Err: errSessionMoved,
})
binary.BigEndian.PutUint32(buf[:4], uint32(n1))

c.conn.readBuf.Write(buf[:4+n1])

output := c.client.readSingleData(c.conn)
assert.Equal(t, connIOOutput{
closed: false,
broken: true,
err: ErrConnectionClosed,
}, output)

// Check Handle Queue
queue := c.client.handleQueue

assert.Equal(t, 1, len(queue))
assert.Equal(t, ErrConnectionClosed, queue[0].err)
})

t.Run("receive zookeeper is closing error", func(t *testing.T) {
c := newClientTest(t)
c.doAuthenticate()

c.client.enqueueRequest(
opGetData, &getDataRequest{}, &getDataResponse{},
nil,
)
c.client.getFromSendQueue()

buf := make([]byte, 2048)
n1, _ := encodePacket(buf[4:], &responseHeader{
Xid: 1,
Zxid: 73,
Err: errClosing,
})
binary.BigEndian.PutUint32(buf[:4], uint32(n1))

c.conn.readBuf.Write(buf[:4+n1])

output := c.client.readSingleData(c.conn)
assert.Equal(t, connIOOutput{
closed: false,
broken: true,
err: ErrConnectionClosed,
}, output)

// Check Handle Queue
queue := c.client.handleQueue

assert.Equal(t, 1, len(queue))
assert.Equal(t, ErrConnectionClosed, queue[0].err)
})

t.Run("read header len error", func(t *testing.T) {
c := newClientTest(t)

Expand Down
2 changes: 0 additions & 2 deletions todolist
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
*) Add Lint
*) Convert errSessionMoved to Connection Error
*) Batching Read & Write to TCP (Need to do or not?)
*) Stress Tests with Race Detector
*) Add Multi-Ops Transactions

0 comments on commit f924c43

Please sign in to comment.