diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 64a5995..6a95014 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -17,6 +17,8 @@ jobs: - uses: actions/setup-go@v2 with: go-version: "1.21.9" + - name: Install Tools + run: make install-tools - name: Lint run: make lint - name: Test diff --git a/Makefile b/Makefile index c66f9a4..3329d02 100644 --- a/Makefile +++ b/Makefile @@ -1,13 +1,18 @@ -.PHONY: test lint build coverage +.PHONY: test lint build coverage install-tools test: go test -race -p 1 -count=1 -tags=integration -covermode=atomic -coverprofile=coverage.out ./... lint: - echo "Do nothing" + $(foreach f,$(shell go fmt ./...),@echo "Forgot to format file: ${f}"; exit 1;) + go vet ./... + revive -config revive.toml -formatter friendly ./... build: go build -o bin/lock test-examples/lock/main.go coverage: go tool cover -func coverage.out | grep ^total + +install-tools: + go install github.com/mgechev/revive diff --git a/client.go b/client.go index 35094b8..1e3b4fa 100644 --- a/client.go +++ b/client.go @@ -12,6 +12,7 @@ import ( "time" ) +// NewClient ... func NewClient(servers []string, sessionTimeout time.Duration, options ...Option) (*Client, error) { c, err := newClientInternal(servers, sessionTimeout, options...) if err != nil { @@ -106,36 +107,43 @@ func (c *Client) getRecvTimeout() time.Duration { // Option ... type Option func(c *Client) +// WithSessionEstablishedCallback ... func WithSessionEstablishedCallback(callback func(c *Client)) Option { return func(c *Client) { c.sessEstablishedCallback = callback } } +// WithSessionExpiredCallback ... func WithSessionExpiredCallback(callback func(c *Client)) Option { return func(c *Client) { c.sessExpiredCallback = callback } } +// WithReconnectingCallback is the callback when a new connection is re-established after disconnect +// and session is not expired yet func WithReconnectingCallback(callback func(c *Client)) Option { return func(c *Client) { c.reconnectingCallback = callback } } +// WithDialRetryDuration ... func WithDialRetryDuration(d time.Duration) Option { return func(c *Client) { c.dialRetryDuration = d } } +// WithServerSelector ... func WithServerSelector(selector ServerSelector) Option { return func(c *Client) { c.selector = selector } } +// WithDialTimeoutFunc ... func WithDialTimeoutFunc( dialFunc func(addr string, timeout time.Duration) (NetworkConn, error), ) Option { @@ -144,6 +152,7 @@ func WithDialTimeoutFunc( } } +// WithLogger ... func WithLogger(l Logger) Option { return func(c *Client) { c.logger = l @@ -178,6 +187,7 @@ type clientWatchRequest struct { callback func(ev clientWatchEvent) } +// NetworkConn for connections when connecting to zookeeper. type NetworkConn interface { io.Reader io.Writer @@ -756,7 +766,6 @@ func (c *Client) enqueueAlreadyLocked( callback func(resp any, zxid int64, err error), watch clientWatchRequest, setAuth bool, ) { - xid := pingRequestXid if opCode != opPing { xid = c.nextXid() @@ -1069,6 +1078,7 @@ func (c *Client) Close() { c.logger.Infof("Shutdown completed") } +// CreateResponse is the response of Create type CreateResponse struct { Zxid int64 Path string @@ -1109,6 +1119,7 @@ func (c *Client) Create( ) } +// ChildrenResponse is the response of Children type ChildrenResponse struct { Zxid int64 Children []string @@ -1119,8 +1130,10 @@ type childrenOpts struct { watchCallback func(ev Event) } +// ChildrenOption ... type ChildrenOption func(opts *childrenOpts) +// WithChildrenWatch set the watch for Children func WithChildrenWatch(callback func(ev Event)) ChildrenOption { return func(opts *childrenOpts) { if callback == nil { @@ -1131,6 +1144,7 @@ func WithChildrenWatch(callback func(ev Event)) ChildrenOption { } } +// Children ... func (c *Client) Children( path string, callback func(resp ChildrenResponse, err error), @@ -1192,6 +1206,7 @@ func (c *Client) Children( ) } +// GetResponse is the response of Get type GetResponse struct { Zxid int64 Data []byte @@ -1203,8 +1218,10 @@ type getOpts struct { watchCallback func(ev Event) } +// GetOption ... type GetOption func(opts *getOpts) +// WithGetWatch set the watch for Get func WithGetWatch(callback func(ev Event)) GetOption { return func(opts *getOpts) { if callback == nil { @@ -1215,6 +1232,7 @@ func WithGetWatch(callback func(ev Event)) GetOption { } } +// Get data of znode func (c *Client) Get( path string, callback func(resp GetResponse, err error), @@ -1277,11 +1295,13 @@ func (c *Client) Get( ) } +// SetResponse is the response of Set type SetResponse struct { Zxid int64 Stat Stat } +// Set update data of znode func (c *Client) Set( path string, data []byte, version int32, callback func(resp SetResponse, err error), @@ -1311,6 +1331,7 @@ func (c *Client) Set( ) } +// ExistsResponse is response for Exist request type ExistsResponse struct { Zxid int64 Stat Stat @@ -1321,8 +1342,10 @@ type existsOpts struct { watchCallback func(ev Event) } +// ExistsOption ... type ExistsOption func(opts *existsOpts) +// WithExistsWatch set the watch for Exists func WithExistsWatch(callback func(ev Event)) ExistsOption { return func(opts *existsOpts) { if callback == nil { @@ -1333,6 +1356,7 @@ func WithExistsWatch(callback func(ev Event)) ExistsOption { } } +// Exists check the existence of a znode func (c *Client) Exists( path string, callback func(resp ExistsResponse, err error), @@ -1394,10 +1418,12 @@ func (c *Client) Exists( ) } +// DeleteResponse is the response of Delete type DeleteResponse struct { Zxid int64 } +// Delete znode with compare and set using version number func (c *Client) Delete( path string, version int32, callback func(resp DeleteResponse, err error), @@ -1431,6 +1457,7 @@ func (c *Client) Delete( ) } +// AddAuthResponse is the response of AddAuth request type AddAuthResponse struct { Zxid int64 } @@ -1463,6 +1490,7 @@ func (c *Client) AddAuth( ) } +// SetACLResponse is response of SetACL request type SetACLResponse struct { Zxid int64 Stat Stat @@ -1506,6 +1534,7 @@ func (c *Client) SetACL( ) } +// GetACLResponse is response of GetACL request type GetACLResponse struct { Zxid int64 ACL []ACL diff --git a/client_test.go b/client_test.go index 94b4d36..c056d01 100644 --- a/client_test.go +++ b/client_test.go @@ -119,6 +119,7 @@ func (c *clientTest) addToWatchMap() { } } +//revive:disable-next-line:cognitive-complexity func TestClient_Authenticate(t *testing.T) { t.Run("check init state", func(t *testing.T) { c, err := newClientInternal([]string{"server01"}, 6*time.Second) @@ -1004,6 +1005,7 @@ func TestClient_Exists_With_Add_Watch_With_Error(t *testing.T) { assert.Equal(t, nil, existErr) } +//revive:disable-next-line:cognitive-complexity func TestClient_DoConnect(t *testing.T) { t.Run("authenticate error", func(t *testing.T) { var dialAddr string diff --git a/concurrency/lock.go b/concurrency/lock.go index abee28e..168f1ac 100644 --- a/concurrency/lock.go +++ b/concurrency/lock.go @@ -10,12 +10,17 @@ import ( "github.com/QuangTung97/zk/curator" ) +// Lock is for distributed lock. +// But this distributed lock should only be used when accessing Zookeeper. +// It is UNSAFE to access external resources when obtaining distributed locks in general. type Lock struct { parent string nodeID string onGranted func(sess *curator.Session) } +// NewLock needs a path to a parent znode containing locking nodes. +// nodeID should be at least have length of 16 characters and should be an uuid func NewLock(parent string, nodeID string) *Lock { e := &Lock{ nodeID: nodeID, @@ -32,6 +37,8 @@ const ( lockStatusGranted ) +// Start should be used with curator.NewChain to chaining with actions +// that happen before locking start and after it becomes a leader func (e *Lock) Start(sess *curator.Session, next func(sess *curator.Session)) { e.onGranted = next e.initFunc(sess) diff --git a/constants.go b/constants.go index 39ec774..eccac7d 100644 --- a/constants.go +++ b/constants.go @@ -5,6 +5,8 @@ import ( "fmt" ) +//revive:disable:exported + const ( protocolVersion = 0 // DefaultPort is the default port listened by server. @@ -120,8 +122,10 @@ var ( ErrClosing = errors.New("zk: zookeeper is closing") ErrNothing = errors.New("zk: no server responses to process") ErrSessionMoved = errors.New("zk: session moved to another server, so operation is ignored") - ErrReconfigDisabled = errors.New("attempts to perform a reconfiguration operation when reconfiguration feature is disabled") - ErrBadArguments = errors.New("invalid arguments") + ErrReconfigDisabled = errors.New( + "attempts to perform a reconfiguration operation when reconfiguration feature is disabled", + ) + ErrBadArguments = errors.New("invalid arguments") errCodeToError = map[ErrCode]error{ 0: nil, diff --git a/curator/client.go b/curator/client.go index c7f94af..911542d 100644 --- a/curator/client.go +++ b/curator/client.go @@ -6,11 +6,13 @@ import ( "github.com/QuangTung97/zk" ) +// ClientFactory for creating Client type ClientFactory interface { Start(runner SessionRunner) Close() } +// Client is a simpler interface for zookeeper client, mostly for mocking & faking for testing purpose type Client interface { Get(path string, callback func(resp zk.GetResponse, err error)) GetW(path string, @@ -40,6 +42,7 @@ type clientImpl struct { acl []zk.ACL } +// NewClientFactory creates a ClientFactory func NewClientFactory(servers []string, username string, password string) ClientFactory { return &clientFactoryImpl{ servers: servers, @@ -95,6 +98,7 @@ func (f *clientFactoryImpl) Close() { } } +// NewClient creates a Client func NewClient(zkClient *zk.Client, acl []zk.ACL) Client { return &clientImpl{ zkClient: zkClient, diff --git a/curator/curator.go b/curator/curator.go index a0e891a..5d3a29e 100644 --- a/curator/curator.go +++ b/curator/curator.go @@ -1,5 +1,7 @@ package curator +// Curator is used for maintaining Session object. +// when new zookeeper session is established, old Session will be invalided and can NOT be used anymore type Curator struct { initFunc func(sess *Session) @@ -7,17 +9,20 @@ type Curator struct { sess *Session } +// SessionRunner is an interface that be implemented by Curator type SessionRunner interface { Begin(client Client) Retry() End() } +// Session represents a zookeeper session type Session struct { retryFuncs []func(sess *Session) state *Curator } +// New creates a Curator with simple init function when session started func New( initFunc func(sess *Session), ) *Curator { @@ -26,8 +31,11 @@ func New( } } +// SessionCallback ... type SessionCallback func(sess *Session, next func(sess *Session)) +// NewChain creates a chain of callbacks when next callback is called only after the previous callback allows. +// For example when doing locking, ONLY after the lock is granted the next callback could allow to run. func NewChain( initFuncList ...SessionCallback, ) *Curator { @@ -44,6 +52,7 @@ func NewChain( } } +// Begin callback when new session is established func (c *Curator) Begin(client Client) { c.client = client c.sess = &Session{ @@ -52,6 +61,7 @@ func (c *Curator) Begin(client Client) { c.initFunc(c.sess) } +// Retry callback when new connection is established after disconnecting func (c *Curator) Retry() { if c.sess == nil { return @@ -62,6 +72,7 @@ func (c *Curator) Retry() { c.sess.retryFuncs = nil } +// End callback when current session is expired func (c *Curator) End() { c.sess = nil } @@ -81,6 +92,8 @@ func (s *Session) getClient() nullClient { } } +// Run allows to access to the Client object for accessing zookeeper. +// The callback fn function is only be called when the session is still active. func (s *Session) Run(fn func(client Client)) { sessClient := s.getClient() if !sessClient.valid { @@ -89,6 +102,7 @@ func (s *Session) Run(fn func(client Client)) { fn(sessClient.client) } +// AddRetry add a callback function that will be called after connection is re-established. func (s *Session) AddRetry(callback func(sess *Session)) { s.retryFuncs = append(s.retryFuncs, callback) } diff --git a/curator/fake_client.go b/curator/fake_client.go index f135983..c289af9 100644 --- a/curator/fake_client.go +++ b/curator/fake_client.go @@ -9,13 +9,16 @@ import ( "github.com/QuangTung97/zk" ) +// FakeSessionState ... type FakeSessionState struct { SessionID int64 HasSession bool } +// FakeClientID ... type FakeClientID string +// ZNode ... type ZNode struct { Name string Data []byte @@ -32,6 +35,7 @@ type ZNode struct { DataWatches []func(ev zk.Event) } +// FakeZookeeper ... type FakeZookeeper struct { States map[FakeClientID]*FakeSessionState Sessions map[FakeClientID]SessionRunner @@ -45,6 +49,7 @@ type FakeZookeeper struct { Zxid int64 } +// NewFakeZookeeper ... func NewFakeZookeeper() *FakeZookeeper { return &FakeZookeeper{ States: map[FakeClientID]*FakeSessionState{}, @@ -65,6 +70,7 @@ type fakeClientFactory struct { clientID FakeClientID } +// NewFakeClientFactory ... func NewFakeClientFactory(store *FakeZookeeper, clientID FakeClientID) ClientFactory { store.States[clientID] = &FakeSessionState{ HasSession: false, @@ -84,9 +90,11 @@ func (c *fakeClientFactory) Start(runner SessionRunner) { } } +// Close ... func (c *fakeClientFactory) Close() { } +// Begin ... func (s *FakeZookeeper) Begin(clientID FakeClientID) { state := s.States[clientID] if state.HasSession { @@ -121,6 +129,7 @@ func (s *FakeZookeeper) runAllCallbacksWithConnectionError(clientID FakeClientID } } +// SessionExpired ... func (s *FakeZookeeper) SessionExpired(clientID FakeClientID) { state := s.States[clientID] if !state.HasSession { @@ -206,6 +215,7 @@ func getActionWithType[T any](s *FakeZookeeper, clientID FakeClientID, methodNam return val } +// PrintPendingCalls ... func (s *FakeZookeeper) PrintPendingCalls() { for client, actions := range s.Pending { if len(actions) == 0 { @@ -223,6 +233,7 @@ func (s *FakeZookeeper) PrintPendingCalls() { } } +// PrintData ... func (s *FakeZookeeper) PrintData() { s.printDataRecur(s.Root, "") } @@ -247,6 +258,7 @@ func (s *FakeZookeeper) printDataRecur(node *ZNode, space string) { } } +// PendingCalls ... func (s *FakeZookeeper) PendingCalls(clientID FakeClientID) []string { values := make([]string, 0) for _, input := range s.Pending[clientID] { @@ -278,6 +290,7 @@ func (s *FakeZookeeper) PendingCalls(clientID FakeClientID) []string { return values } +// CreateCall ... func (s *FakeZookeeper) CreateCall(clientID FakeClientID) CreateInput { return getActionWithType[CreateInput](s, clientID, "Create") } @@ -288,10 +301,12 @@ func (s *FakeZookeeper) popFirst(clientID FakeClientID) { s.Pending[clientID] = actions } +// CreateApply ... func (s *FakeZookeeper) CreateApply(clientID FakeClientID) { s.createApplyWithErr(clientID, nil) } +// CreateApplyError ... func (s *FakeZookeeper) CreateApplyError(clientID FakeClientID) { s.createApplyWithErr(clientID, zk.ErrConnectionClosed) } @@ -354,11 +369,13 @@ func (s *FakeZookeeper) notifyChildrenWatches(parent *ZNode, path string) { parent.ChildrenWatches = nil } +// ConnError ... func (s *FakeZookeeper) ConnError(clientID FakeClientID) { s.runAllCallbacksWithConnectionError(clientID) s.appendActions(clientID, RetryInput{}) } +// ChildrenApply ... func (s *FakeZookeeper) ChildrenApply(clientID FakeClientID) { input := getActionWithType[ChildrenInput](s, clientID, "Children") @@ -381,6 +398,7 @@ func (s *FakeZookeeper) ChildrenApply(clientID FakeClientID) { }, nil) } +// GetApply ... func (s *FakeZookeeper) GetApply(clientID FakeClientID) { input := getActionWithType[GetInput](s, clientID, "Get") @@ -401,6 +419,7 @@ func (s *FakeZookeeper) GetApply(clientID FakeClientID) { }, nil) } +// SetApply ... func (s *FakeZookeeper) SetApply(clientID FakeClientID) { input := getActionWithType[SetInput](s, clientID, "Set") @@ -429,6 +448,7 @@ func (s *FakeZookeeper) SetApply(clientID FakeClientID) { }, nil) } +// DeleteApply ... func (s *FakeZookeeper) DeleteApply(clientID FakeClientID) { input := getActionWithType[DeleteInput](s, clientID, "Delete") node := s.findNode(input.Path) @@ -475,6 +495,7 @@ func (s *FakeZookeeper) notifyDataWatches(node *ZNode, path string, eventType zk node.DataWatches = nil } +// Retry ... func (s *FakeZookeeper) Retry(clientID FakeClientID) { getActionWithType[RetryInput](s, clientID, "Retry") diff --git a/curator/fake_structs.go b/curator/fake_structs.go index f987a74..e4143db 100644 --- a/curator/fake_structs.go +++ b/curator/fake_structs.go @@ -4,6 +4,7 @@ import ( "github.com/QuangTung97/zk" ) +// ChildrenInput ... type ChildrenInput struct { Path string Callback func(resp zk.ChildrenResponse, err error) @@ -11,6 +12,7 @@ type ChildrenInput struct { Watcher func(ev zk.Event) } +// CreateInput ... type CreateInput struct { Path string Data []byte @@ -18,6 +20,7 @@ type CreateInput struct { Callback func(resp zk.CreateResponse, err error) } +// GetInput ... type GetInput struct { Path string Callback func(resp zk.GetResponse, err error) @@ -25,6 +28,7 @@ type GetInput struct { Watcher func(ev zk.Event) } +// SetInput ... type SetInput struct { Path string Data []byte @@ -32,11 +36,13 @@ type SetInput struct { Callback func(resp zk.SetResponse, err error) } +// DeleteInput ... type DeleteInput struct { Path string Version int32 Callback func(resp zk.DeleteResponse, err error) } +// RetryInput ... type RetryInput struct { } diff --git a/curator/util.go b/curator/util.go index 89320ac..64f9dd8 100644 --- a/curator/util.go +++ b/curator/util.go @@ -4,6 +4,7 @@ type parallelRunnerImpl struct { runners []SessionRunner } +// NewParallelRunner creates a SessionRunner that calls methods of all input runners func NewParallelRunner(runners ...SessionRunner) SessionRunner { return ¶llelRunnerImpl{ runners: runners, diff --git a/curator/util_test.go b/curator/util_test.go index 19083ed..2711046 100644 --- a/curator/util_test.go +++ b/curator/util_test.go @@ -55,5 +55,4 @@ func TestParallelRunner(t *testing.T) { "init02", }, steps) }) - } diff --git a/go.mod b/go.mod index 1135e73..5974932 100644 --- a/go.mod +++ b/go.mod @@ -2,10 +2,28 @@ module github.com/QuangTung97/zk go 1.21.2 -require github.com/stretchr/testify v1.9.0 +require ( + github.com/mgechev/revive v1.3.7 + github.com/stretchr/testify v1.9.0 +) require ( + github.com/BurntSushi/toml v1.3.2 // indirect + github.com/chavacava/garif v0.1.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/fatih/color v1.16.0 // indirect + github.com/fatih/structtag v1.2.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-runewidth v0.0.9 // indirect + github.com/mgechev/dots v0.0.0-20210922191527-e955255bf517 // indirect + github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/olekukonko/tablewriter v0.0.5 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/spf13/afero v1.11.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect + golang.org/x/tools v0.17.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 60ce688..e1e5e38 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,53 @@ +github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= +github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/chavacava/garif v0.1.0 h1:2JHa3hbYf5D9dsgseMKAmc/MZ109otzgNFk5s87H9Pc= +github.com/chavacava/garif v0.1.0/go.mod h1:XMyYCkEL58DF0oyW4qDjjnPWONs2HBqYKI+UIPD+Gww= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= +github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= +github.com/fatih/structtag v1.2.0 h1:/OdNE99OxoI/PqaW/SuSK9uxxT3f/tcSZgon/ssNSx4= +github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/mgechev/dots v0.0.0-20210922191527-e955255bf517 h1:zpIH83+oKzcpryru8ceC6BxnoG8TBrhgAvRg8obzup0= +github.com/mgechev/dots v0.0.0-20210922191527-e955255bf517/go.mod h1:KQ7+USdGKfpPjXk4Ga+5XxQM4Lm4e3gAogrreFAYpOg= +github.com/mgechev/revive v1.3.7 h1:502QY0vQGe9KtYJ9FpxMz9rL+Fc/P13CI5POL4uHCcE= +github.com/mgechev/revive v1.3.7/go.mod h1:RJ16jUbF0OWC3co/+XTxmFNgEpUPwnnA0BRllX2aDNA= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= +github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= +golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/logger.go b/logger.go index d617807..fa937c9 100644 --- a/logger.go +++ b/logger.go @@ -4,6 +4,7 @@ import ( "log" ) +// Logger ... type Logger interface { Infof(format string, args ...any) Warnf(format string, args ...any) diff --git a/network.go b/network.go index e988543..04f4862 100644 --- a/network.go +++ b/network.go @@ -9,6 +9,7 @@ type tcpConnImpl struct { conn net.Conn } +// NewTCPConn ... func NewTCPConn(conn net.Conn) NetworkConn { return &tcpConnImpl{ conn: conn, diff --git a/revive.toml b/revive.toml new file mode 100644 index 0000000..d9a20f8 --- /dev/null +++ b/revive.toml @@ -0,0 +1,54 @@ +severity = "error" +confidence = 0.8 + +# Sets the error code for failures with severity "error" +errorCode = 2 +# Sets the error code for failures with severity "warning" +warningCode = 1 + +# Enable all available rules +enableAllRules = true + +# Disabled rules +[rule.file-header] +Disabled = true +[rule.max-public-structs] +Disabled = true +[rule.function-length] +Disabled = true +[rule.add-constant] +Disabled = true +[rule.deep-exit] +Disabled = true +[rule.unused-receiver] +Disabled = true +[rule.banned-characters] +Disabled = true +[rule.package-comments] +Disabled = true +[rule.get-return] +Disabled = true +[rule.unchecked-type-assertion] +Disabled = true +[rule.var-naming] +Disabled = true +[rule.flag-parameter] +Disabled = true +[rule.unused-parameter] +Disabled = true + +# Rule tuning +[rule.argument-limit] +Arguments = [6] +[rule.cyclomatic] +Arguments = [14] +[rule.cognitive-complexity] +Arguments = [20] +[rule.function-result-limit] +Arguments = [4] +[rule.unhandled-error] +Arguments = ["fmt.Printf", "fmt.Println"] +[rule.line-length-limit] +Arguments = [120] +[rule.defer] +Arguments = [["loop", "method-call", "recover", "return"]] diff --git a/selector.go b/selector.go index 5ed5ace..6aa367e 100644 --- a/selector.go +++ b/selector.go @@ -5,17 +5,23 @@ import ( "sync" ) +// SelectNextOutput ... type SelectNextOutput struct { Server string - RetryStart bool + RetryStart bool // equal to true if the client should sleep after retry to the next server } +// ServerSelector for selecting servers for connecting type ServerSelector interface { + // Init the selector Init(servers []string) + // Next choose the next server for connecting Next() SelectNextOutput + // NotifyConnected notify the selector for retrying the same address later after successfully connected NotifyConnected() } +// ServerListSelector ... type ServerListSelector struct { mut sync.Mutex servers []string @@ -25,12 +31,14 @@ type ServerListSelector struct { notified bool } +// NewServerListSelector ... func NewServerListSelector(seed int64) ServerSelector { return &ServerListSelector{ rand: rand.New(rand.NewSource(seed)), } } +// Init ... func (s *ServerListSelector) Init(servers []string) { s.mut.Lock() defer s.mut.Unlock() @@ -41,6 +49,7 @@ func (s *ServerListSelector) Init(servers []string) { s.numNextCalls = 0 } +// Next ... func (s *ServerListSelector) Next() SelectNextOutput { s.mut.Lock() defer s.mut.Unlock() @@ -62,6 +71,7 @@ func (s *ServerListSelector) Next() SelectNextOutput { } } +// NotifyConnected ... func (s *ServerListSelector) NotifyConnected() { s.mut.Lock() defer s.mut.Unlock() diff --git a/structs.go b/structs.go index 2178994..f9da0cb 100644 --- a/structs.go +++ b/structs.go @@ -10,30 +10,39 @@ import ( "time" ) +//revive:disable:exported var ( ErrUnhandledFieldType = errors.New("zk: unhandled field type") ErrPtrExpected = errors.New("zk: encode/decode expect a non-nil pointer to struct") ErrShortBuffer = errors.New("zk: buffer too small") ) +//revive:enable:exported + +// ACL ... type ACL struct { Perms int32 Scheme string ID string } +// Stat znode stat info type Stat struct { - Czxid int64 // The zxid of the change that caused this znode to be created. - Mzxid int64 // The zxid of the change that last modified this znode. - Ctime int64 // The time in milliseconds from epoch when this znode was created. - Mtime int64 // The time in milliseconds from epoch when this znode was last modified. - Version int32 // The number of changes to the data of this znode. - Cversion int32 // The number of changes to the children of this znode. - Aversion int32 // The number of changes to the ACL of this znode. - EphemeralOwner int64 // The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero. - DataLength int32 // The length of the data field of this znode. - NumChildren int32 // The number of children of this znode. - Pzxid int64 // last modified children + Czxid int64 // The zxid of the change that caused this znode to be created. + Mzxid int64 // The zxid of the change that last modified this znode. + Ctime int64 // The time in milliseconds from epoch when this znode was created. + Mtime int64 // The time in milliseconds from epoch when this znode was last modified. + Version int32 // The number of changes to the data of this znode. + Cversion int32 // The number of changes to the children of this znode. + Aversion int32 // The number of changes to the ACL of this znode. + + // The session id of the owner of this znode if the znode is an ephemeral node. + // If it is not an ephemeral node, it will be zero. + EphemeralOwner int64 + + DataLength int32 // The length of the data field of this znode. + NumChildren int32 // The number of children of this znode. + Pzxid int64 // last modified children } // ServerClient is the information for a single Zookeeper client and its session. @@ -114,6 +123,7 @@ type pathRequest struct { Path string } +// PathVersionRequest ... type PathVersionRequest struct { Path string Version int32 @@ -132,8 +142,7 @@ type statResponse struct { Stat Stat } -// - +// CheckVersionRequest ... type CheckVersionRequest PathVersionRequest type closeRequest struct{} type closeResponse struct{} @@ -153,6 +162,7 @@ type connectResponse struct { Passwd []byte } +// CreateRequest ... type CreateRequest struct { Path string Data []byte @@ -160,8 +170,10 @@ type CreateRequest struct { Flags int32 } +// CreateContainerRequest ... type CreateContainerRequest CreateRequest +// CreateTTLRequest ... type CreateTTLRequest struct { Path string Data []byte @@ -171,6 +183,8 @@ type CreateTTLRequest struct { } type createResponse pathResponse + +// DeleteRequest ... type DeleteRequest PathVersionRequest type deleteResponse struct{} @@ -214,6 +228,7 @@ type setAclRequest struct { type setAclResponse statResponse +// SetDataRequest ... type SetDataRequest struct { Path string Data []byte @@ -239,7 +254,7 @@ type setAuthResponse struct{} type multiRequestOp struct { Header multiHeader - Op interface{} + Op any } type multiRequest struct { Ops []multiRequestOp @@ -380,7 +395,7 @@ type encoder interface { Encode(buf []byte) (int, error) } -func decodePacket(buf []byte, st interface{}) (n int, err error) { +func decodePacket(buf []byte, st any) (n int, err error) { defer func() { if r := recover(); r != nil { panicErr, ok := r.(error) @@ -406,6 +421,7 @@ func decodePacket(buf []byte, st interface{}) (n int, err error) { return decodePacketValue(buf, v) } +//revive:disable-next-line:cyclomatic,cognitive-complexity func decodePacketValue(buf []byte, v reflect.Value) (int, error) { rv := v kind := v.Kind() @@ -479,7 +495,7 @@ func decodePacketValue(buf []byte, v reflect.Value) (int, error) { return n, nil } -func encodePacket(buf []byte, st interface{}) (n int, err error) { +func encodePacket(buf []byte, st any) (n int, err error) { defer func() { if r := recover(); r != nil { panicErr, ok := r.(error) @@ -505,6 +521,7 @@ func encodePacket(buf []byte, st interface{}) (n int, err error) { return encodePacketValue(buf, v) } +//revive:disable-next-line:cyclomatic,cognitive-complexity func encodePacketValue(buf []byte, v reflect.Value) (int, error) { rv := v for v.Kind() == reflect.Ptr || v.Kind() == reflect.Interface { @@ -577,7 +594,8 @@ func encodePacketValue(buf []byte, v reflect.Value) (int, error) { return n, nil } -func requestStructForOp(op int32) interface{} { +//revive:disable-next-line:cyclomatic +func requestStructForOp(op int32) any { switch op { case opClose: return &closeRequest{} diff --git a/structs_test.go b/structs_test.go index ef4af2a..adb5419 100644 --- a/structs_test.go +++ b/structs_test.go @@ -19,7 +19,14 @@ func TestEncodeDecodePacket(t *testing.T) { encodeDecodeTest(t, &pathWatchRequest{"path", false}) encodeDecodeTest(t, &CheckVersionRequest{"/", -1}) encodeDecodeTest(t, &reconfigRequest{nil, nil, nil, -1}) - encodeDecodeTest(t, &multiRequest{Ops: []multiRequestOp{{multiHeader{opCheck, false, -1}, &CheckVersionRequest{"/", -1}}}}) + encodeDecodeTest(t, &multiRequest{ + Ops: []multiRequestOp{ + { + Header: multiHeader{Type: opCheck, Done: false, Err: -1}, + Op: &CheckVersionRequest{Path: "/", Version: -1}, + }, + }, + }) } func TestRequestStructForOp(t *testing.T) { @@ -32,7 +39,7 @@ func TestRequestStructForOp(t *testing.T) { } } -func encodeDecodeTest(t *testing.T, r interface{}) { +func encodeDecodeTest(t *testing.T, r any) { buf := make([]byte, 1024) n, err := encodePacket(buf, r) if err != nil { diff --git a/todolist b/todolist index 2f8879a..b20f792 100644 --- a/todolist +++ b/todolist @@ -1,4 +1,5 @@ *) Add Lint +*) Convert errSessionMoved to Connection Error *) Batching Read & Write to TCP (Need to do or not?) *) Stress Tests with Race Detector *) Add Multi-Ops Transactions diff --git a/tools.go b/tools.go new file mode 100644 index 0000000..a3379f0 --- /dev/null +++ b/tools.go @@ -0,0 +1,7 @@ +//go:build tools + +package tools + +import ( + _ "github.com/mgechev/revive" +) diff --git a/util.go b/util.go index ede38ec..0514af9 100644 --- a/util.go +++ b/util.go @@ -24,6 +24,7 @@ func WorldACL(perms int32) []ACL { return []ACL{{perms, "world", "anyone"}} } +// DigestACL for username password authorization. func DigestACL(perms int32, user, password string) []ACL { userPass := []byte(fmt.Sprintf("%s:%s", user, password)) h := sha1.New() @@ -66,12 +67,9 @@ func stringShuffleRand(s []string, r *rand.Rand) { } // ValidatePath will make sure a path is valid before sending the request +// +//revive:disable-next-line:cyclomatic,cognitive-complexity func ValidatePath(path string, isSequential bool) error { - return validatePath(path, isSequential) -} - -// validatePath will make sure a path is valid before sending the request -func validatePath(path string, isSequential bool) error { if path == "" { return ErrInvalidPath } diff --git a/util_test.go b/util_test.go index 462bce1..58fd69c 100644 --- a/util_test.go +++ b/util_test.go @@ -114,7 +114,7 @@ func TestValidatePath(t *testing.T) { } for _, tc := range tt { - err := validatePath(tc.path, tc.seq) + err := ValidatePath(tc.path, tc.seq) if (err != nil) == tc.valid { t.Errorf("failed to validate path %q", tc.path) } @@ -122,17 +122,17 @@ func TestValidatePath(t *testing.T) { } func TestValidatePath2(t *testing.T) { - assert.Equal(t, ErrInvalidPath, validatePath("home", false)) - assert.Equal(t, nil, validatePath("/", false)) - assert.Equal(t, ErrInvalidPath, validatePath("", false)) - assert.Equal(t, ErrInvalidPath, validatePath("/config/", false)) - assert.Equal(t, ErrInvalidPath, validatePath("/\x00", false)) - assert.Equal(t, ErrInvalidPath, validatePath("//hello", false)) - assert.Equal(t, ErrInvalidPath, validatePath("/\x01", false)) - assert.Equal(t, ErrInvalidPath, validatePath("/hello/./config", false)) - assert.Equal(t, nil, validatePath("/hello/.config", false)) - assert.Equal(t, ErrInvalidPath, validatePath("/hello/config/.", false)) - assert.Equal(t, ErrInvalidPath, validatePath("/hello/../config", false)) - assert.Equal(t, nil, validatePath("/hello/..config", false)) - assert.Equal(t, nil, validatePath("/data/", true)) + assert.Equal(t, ErrInvalidPath, ValidatePath("home", false)) + assert.Equal(t, nil, ValidatePath("/", false)) + assert.Equal(t, ErrInvalidPath, ValidatePath("", false)) + assert.Equal(t, ErrInvalidPath, ValidatePath("/config/", false)) + assert.Equal(t, ErrInvalidPath, ValidatePath("/\x00", false)) + assert.Equal(t, ErrInvalidPath, ValidatePath("//hello", false)) + assert.Equal(t, ErrInvalidPath, ValidatePath("/\x01", false)) + assert.Equal(t, ErrInvalidPath, ValidatePath("/hello/./config", false)) + assert.Equal(t, nil, ValidatePath("/hello/.config", false)) + assert.Equal(t, ErrInvalidPath, ValidatePath("/hello/config/.", false)) + assert.Equal(t, ErrInvalidPath, ValidatePath("/hello/../config", false)) + assert.Equal(t, nil, ValidatePath("/hello/..config", false)) + assert.Equal(t, nil, ValidatePath("/data/", true)) }