Skip to content

Commit

Permalink
Merge dfec68d into 608fdcc
Browse files Browse the repository at this point in the history
  • Loading branch information
QuangTung97 committed Apr 4, 2024
2 parents 608fdcc + dfec68d commit 4ec5e7e
Show file tree
Hide file tree
Showing 24 changed files with 298 additions and 46 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ jobs:
- uses: actions/setup-go@v2
with:
go-version: "1.21.9"
- name: Install Tools
run: make install-tools
- name: Lint
run: make lint
- name: Test
Expand Down
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
.PHONY: test lint build coverage
.PHONY: test lint build coverage install-tools

test:
go test -race -p 1 -count=1 -tags=integration -covermode=atomic -coverprofile=coverage.out ./...

lint:
echo "Do nothing"
$(foreach f,$(shell go fmt ./...),@echo "Forgot to format file: ${f}"; exit 1;)
go vet ./...
revive -config revive.toml -formatter friendly ./...

build:
go build -o bin/lock test-examples/lock/main.go

coverage:
go tool cover -func coverage.out | grep ^total

install-tools:
go install github.com/mgechev/revive
31 changes: 30 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"
)

// NewClient ...
func NewClient(servers []string, sessionTimeout time.Duration, options ...Option) (*Client, error) {
c, err := newClientInternal(servers, sessionTimeout, options...)
if err != nil {
Expand Down Expand Up @@ -106,36 +107,43 @@ func (c *Client) getRecvTimeout() time.Duration {
// Option ...
type Option func(c *Client)

// WithSessionEstablishedCallback ...
func WithSessionEstablishedCallback(callback func(c *Client)) Option {
return func(c *Client) {
c.sessEstablishedCallback = callback
}
}

// WithSessionExpiredCallback ...
func WithSessionExpiredCallback(callback func(c *Client)) Option {
return func(c *Client) {
c.sessExpiredCallback = callback
}
}

// WithReconnectingCallback is the callback when a new connection is re-established after disconnect
// and session is not expired yet
func WithReconnectingCallback(callback func(c *Client)) Option {
return func(c *Client) {
c.reconnectingCallback = callback
}
}

// WithDialRetryDuration ...
func WithDialRetryDuration(d time.Duration) Option {
return func(c *Client) {
c.dialRetryDuration = d
}
}

// WithServerSelector ...
func WithServerSelector(selector ServerSelector) Option {
return func(c *Client) {
c.selector = selector
}
}

// WithDialTimeoutFunc ...
func WithDialTimeoutFunc(
dialFunc func(addr string, timeout time.Duration) (NetworkConn, error),
) Option {
Expand All @@ -144,6 +152,7 @@ func WithDialTimeoutFunc(
}
}

// WithLogger ...
func WithLogger(l Logger) Option {
return func(c *Client) {
c.logger = l
Expand Down Expand Up @@ -178,6 +187,7 @@ type clientWatchRequest struct {
callback func(ev clientWatchEvent)
}

// NetworkConn for connections when connecting to zookeeper.
type NetworkConn interface {
io.Reader
io.Writer
Expand Down Expand Up @@ -756,7 +766,6 @@ func (c *Client) enqueueAlreadyLocked(
callback func(resp any, zxid int64, err error),
watch clientWatchRequest, setAuth bool,
) {

xid := pingRequestXid
if opCode != opPing {
xid = c.nextXid()
Expand Down Expand Up @@ -1069,6 +1078,7 @@ func (c *Client) Close() {
c.logger.Infof("Shutdown completed")
}

// CreateResponse is the response of Create
type CreateResponse struct {
Zxid int64
Path string
Expand Down Expand Up @@ -1109,6 +1119,7 @@ func (c *Client) Create(
)
}

// ChildrenResponse is the response of Children
type ChildrenResponse struct {
Zxid int64
Children []string
Expand All @@ -1119,8 +1130,10 @@ type childrenOpts struct {
watchCallback func(ev Event)
}

// ChildrenOption ...
type ChildrenOption func(opts *childrenOpts)

// WithChildrenWatch set the watch for Children
func WithChildrenWatch(callback func(ev Event)) ChildrenOption {
return func(opts *childrenOpts) {
if callback == nil {
Expand All @@ -1131,6 +1144,7 @@ func WithChildrenWatch(callback func(ev Event)) ChildrenOption {
}
}

// Children ...
func (c *Client) Children(
path string,
callback func(resp ChildrenResponse, err error),
Expand Down Expand Up @@ -1192,6 +1206,7 @@ func (c *Client) Children(
)
}

// GetResponse is the response of Get
type GetResponse struct {
Zxid int64
Data []byte
Expand All @@ -1203,8 +1218,10 @@ type getOpts struct {
watchCallback func(ev Event)
}

// GetOption ...
type GetOption func(opts *getOpts)

// WithGetWatch set the watch for Get
func WithGetWatch(callback func(ev Event)) GetOption {
return func(opts *getOpts) {
if callback == nil {
Expand All @@ -1215,6 +1232,7 @@ func WithGetWatch(callback func(ev Event)) GetOption {
}
}

// Get data of znode
func (c *Client) Get(
path string,
callback func(resp GetResponse, err error),
Expand Down Expand Up @@ -1277,11 +1295,13 @@ func (c *Client) Get(
)
}

// SetResponse is the response of Set
type SetResponse struct {
Zxid int64
Stat Stat
}

// Set update data of znode
func (c *Client) Set(
path string, data []byte, version int32,
callback func(resp SetResponse, err error),
Expand Down Expand Up @@ -1311,6 +1331,7 @@ func (c *Client) Set(
)
}

// ExistsResponse is response for Exist request
type ExistsResponse struct {
Zxid int64
Stat Stat
Expand All @@ -1321,8 +1342,10 @@ type existsOpts struct {
watchCallback func(ev Event)
}

// ExistsOption ...
type ExistsOption func(opts *existsOpts)

// WithExistsWatch set the watch for Exists
func WithExistsWatch(callback func(ev Event)) ExistsOption {
return func(opts *existsOpts) {
if callback == nil {
Expand All @@ -1333,6 +1356,7 @@ func WithExistsWatch(callback func(ev Event)) ExistsOption {
}
}

// Exists check the existence of a znode
func (c *Client) Exists(
path string,
callback func(resp ExistsResponse, err error),
Expand Down Expand Up @@ -1394,10 +1418,12 @@ func (c *Client) Exists(
)
}

// DeleteResponse is the response of Delete
type DeleteResponse struct {
Zxid int64
}

// Delete znode with compare and set using version number
func (c *Client) Delete(
path string, version int32,
callback func(resp DeleteResponse, err error),
Expand Down Expand Up @@ -1431,6 +1457,7 @@ func (c *Client) Delete(
)
}

// AddAuthResponse is the response of AddAuth request
type AddAuthResponse struct {
Zxid int64
}
Expand Down Expand Up @@ -1463,6 +1490,7 @@ func (c *Client) AddAuth(
)
}

// SetACLResponse is response of SetACL request
type SetACLResponse struct {
Zxid int64
Stat Stat
Expand Down Expand Up @@ -1506,6 +1534,7 @@ func (c *Client) SetACL(
)
}

// GetACLResponse is response of GetACL request
type GetACLResponse struct {
Zxid int64
ACL []ACL
Expand Down
2 changes: 2 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (c *clientTest) addToWatchMap() {
}
}

//revive:disable-next-line:cognitive-complexity
func TestClient_Authenticate(t *testing.T) {
t.Run("check init state", func(t *testing.T) {
c, err := newClientInternal([]string{"server01"}, 6*time.Second)
Expand Down Expand Up @@ -1004,6 +1005,7 @@ func TestClient_Exists_With_Add_Watch_With_Error(t *testing.T) {
assert.Equal(t, nil, existErr)
}

//revive:disable-next-line:cognitive-complexity
func TestClient_DoConnect(t *testing.T) {
t.Run("authenticate error", func(t *testing.T) {
var dialAddr string
Expand Down
7 changes: 7 additions & 0 deletions concurrency/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@ import (
"github.com/QuangTung97/zk/curator"
)

// Lock is for distributed lock.
// But this distributed lock should only be used when accessing Zookeeper.
// It is UNSAFE to access external resources when obtaining distributed locks in general.
type Lock struct {
parent string
nodeID string
onGranted func(sess *curator.Session)
}

// NewLock needs a path to a parent znode containing locking nodes.
// nodeID should be at least have length of 16 characters and should be an uuid
func NewLock(parent string, nodeID string) *Lock {
e := &Lock{
nodeID: nodeID,
Expand All @@ -32,6 +37,8 @@ const (
lockStatusGranted
)

// Start should be used with curator.NewChain to chaining with actions
// that happen before locking start and after it becomes a leader
func (e *Lock) Start(sess *curator.Session, next func(sess *curator.Session)) {
e.onGranted = next
e.initFunc(sess)
Expand Down
8 changes: 6 additions & 2 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
)

//revive:disable:exported

const (
protocolVersion = 0
// DefaultPort is the default port listened by server.
Expand Down Expand Up @@ -120,8 +122,10 @@ var (
ErrClosing = errors.New("zk: zookeeper is closing")
ErrNothing = errors.New("zk: no server responses to process")
ErrSessionMoved = errors.New("zk: session moved to another server, so operation is ignored")
ErrReconfigDisabled = errors.New("attempts to perform a reconfiguration operation when reconfiguration feature is disabled")
ErrBadArguments = errors.New("invalid arguments")
ErrReconfigDisabled = errors.New(
"attempts to perform a reconfiguration operation when reconfiguration feature is disabled",
)
ErrBadArguments = errors.New("invalid arguments")

errCodeToError = map[ErrCode]error{
0: nil,
Expand Down
4 changes: 4 additions & 0 deletions curator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"github.com/QuangTung97/zk"
)

// ClientFactory for creating Client
type ClientFactory interface {
Start(runner SessionRunner)
Close()
}

// Client is a simpler interface for zookeeper client, mostly for mocking & faking for testing purpose
type Client interface {
Get(path string, callback func(resp zk.GetResponse, err error))
GetW(path string,
Expand Down Expand Up @@ -40,6 +42,7 @@ type clientImpl struct {
acl []zk.ACL
}

// NewClientFactory creates a ClientFactory
func NewClientFactory(servers []string, username string, password string) ClientFactory {
return &clientFactoryImpl{
servers: servers,
Expand Down Expand Up @@ -95,6 +98,7 @@ func (f *clientFactoryImpl) Close() {
}
}

// NewClient creates a Client
func NewClient(zkClient *zk.Client, acl []zk.ACL) Client {
return &clientImpl{
zkClient: zkClient,
Expand Down
Loading

0 comments on commit 4ec5e7e

Please sign in to comment.