From d0a6fb277b81f57024df514384226e1c4dbbe932 Mon Sep 17 00:00:00 2001 From: adibrastegarnia Date: Tue, 4 Feb 2020 11:19:34 -0800 Subject: [PATCH 1/8] Add log client implementation --- go.mod | 11 +- go.sum | 10 + pkg/client/log/log.go | 617 +++++++++++++++++++++++++++++++++++++ pkg/client/log/log_test.go | 189 ++++++++++++ pkg/client/log/options.go | 145 +++++++++ pkg/client/log/session.go | 84 +++++ 6 files changed, 1055 insertions(+), 1 deletion(-) create mode 100644 pkg/client/log/log.go create mode 100644 pkg/client/log/log_test.go create mode 100644 pkg/client/log/options.go create mode 100644 pkg/client/log/session.go diff --git a/go.mod b/go.mod index 5a254e0..cb4efe2 100644 --- a/go.mod +++ b/go.mod @@ -4,10 +4,19 @@ require ( github.com/atomix/api v0.0.0-20200206211058-f075fb5b6d1b github.com/atomix/go-framework v0.0.0-20200207202010-51e205d726d2 github.com/atomix/go-local v0.0.0-20200207202057-4a81cbdd3325 + github.com/atomix/go-framework v0.0.0-20200206221034-8c7583e55420 + github.com/atomix/go-local v0.0.0-20200206221051-b1b85e86b0b7 + github.com/atomix/api v0.0.0-20200202100958-13b24edbe32d + github.com/atomix/go-framework v0.0.0-20200202102454-440bc2678f1c + github.com/atomix/go-local v0.0.0-20200202105028-743d224c66eb github.com/cenkalti/backoff v2.2.1+incompatible github.com/gogo/protobuf v1.3.1 github.com/golang/protobuf v1.3.2 github.com/google/uuid v1.1.1 github.com/stretchr/testify v1.4.0 - google.golang.org/grpc v1.23.1 + google.golang.org/grpc v1.27.0 ) + +replace github.com/atomix/api => ../api + +replace github.com/atomix/go-framework => ../go-framework diff --git a/go.sum b/go.sum index 622f24d..db7bbb3 100644 --- a/go.sum +++ b/go.sum @@ -92,7 +92,10 @@ github.com/atomix/atomix-go-node v0.0.0-20200114212450-178a2dc70336 h1:zYRXcjieg github.com/atomix/atomix-go-node v0.0.0-20200114212450-178a2dc70336/go.mod h1:DIYsaWqOiBkyE+vUgHFMM3+vCq07RUskEWN4W5cEtyE= github.com/atomix/go-client v0.0.0-20200124004211-e5e19cd4730d/go.mod h1:KBBiViOYhnvSh/U0fIYiuJ8j+k63eyRWZl42kwdseFI= github.com/atomix/go-client v0.0.0-20200203180003-61799b5ca7c2/go.mod h1:VWAEeWdocSRL1cqMs3zZ32kuIzMAbheoV02wsEVYwhw= +<<<<<<< HEAD github.com/atomix/go-client v0.0.0-20200206051325-cdc03bd1c8bc/go.mod h1:8Gdux/UtiBQK5nmzN9jtWXuH16T6JPNsAxUA2wY4xVk= +======= +>>>>>>> Add log client implementation github.com/atomix/go-framework v0.0.0-20200123235029-e29fc7d6e104/go.mod h1:Dn7tjt5LIRA/qr5afQZDh9hdtvK82uQpMrADYIlVtfQ= github.com/atomix/go-framework v0.0.0-20200124003840-f24758b13aa2 h1:4a6UlvCmvIWf+L9UIcHkR5jxtWIwr1A2PP/xcnulzIs= github.com/atomix/go-framework v0.0.0-20200124003840-f24758b13aa2/go.mod h1:vo5K/v+rc5mohoZIw9vbyj+Y/EGGaEdF6XVkEvM9CSM= @@ -120,11 +123,14 @@ github.com/atomix/go-local v0.0.0-20200207202057-4a81cbdd3325 h1:TAnk36LvpuXDYp9 github.com/atomix/go-local v0.0.0-20200207202057-4a81cbdd3325/go.mod h1:n2xWQV3vAxEHcod1K82zOHlx/+iW9gbuu/zYzo5y060= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= @@ -174,6 +180,7 @@ github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -287,8 +294,11 @@ google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ij google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.22.1 h1:/7cs52RnTJmD43s3uxzlq2U7nqVTd/37viQwMrMNlOM= google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.23.1 h1:q4XQuHFC6I28BKZpo6IYyb3mNO+l7lSOxRuYTCiDfXk= google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/client/log/log.go b/pkg/client/log/log.go new file mode 100644 index 0000000..4ea5bcb --- /dev/null +++ b/pkg/client/log/log.go @@ -0,0 +1,617 @@ +// Copyright 2019-present Open Networking Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/atomix/api/proto/atomix/headers" + api "github.com/atomix/api/proto/atomix/log" + "github.com/atomix/go-client/pkg/client/primitive" + "github.com/atomix/go-client/pkg/client/session" + "github.com/atomix/go-client/pkg/client/util" + "google.golang.org/grpc" +) + +// Type is the log type +const Type primitive.Type = "Log" + +// Index is the index of an entry +type Index uint64 + +// Version is the version of an entry +type Version uint64 + +// Client provides an API for creating IndexedMaps +type Client interface { + // GetLog gets the log instance of the given name + GetLog(ctx context.Context, name string, opts ...session.Option) (Log, error) +} + +// Log is a distributed log +type Log interface { + primitive.Primitive + + // Appends appends the given value to the end of the log + Append(ctx context.Context, value []byte) (*Entry, error) + + // Get gets the value of the given index + Get(ctx context.Context, index int64, opts ...GetOption) (*Entry, error) + + // FirstIndex gets the first index in the log + FirstIndex(ctx context.Context) (Index, error) + + // LastIndex gets the last index in the log + LastIndex(ctx context.Context) (Index, error) + + // PrevIndex gets the index before the given index + PrevIndex(ctx context.Context, index Index) (Index, error) + + // NextIndex gets the index after the given index + NextIndex(ctx context.Context, index Index) (Index, error) + + // FirstEntry gets the first entry in the log + FirstEntry(ctx context.Context) (*Entry, error) + + // LastEntry gets the last entry in the log + LastEntry(ctx context.Context) (*Entry, error) + + // PrevEntry gets the entry before the given index + PrevEntry(ctx context.Context, index Index) (*Entry, error) + + // NextEntry gets the entry after the given index + NextEntry(ctx context.Context, index Index) (*Entry, error) + + // Remove removes an entry from the log + Remove(ctx context.Context, index int64, opts ...RemoveOption) (*Entry, error) + + // Len returns the number of entries in the log + Len(ctx context.Context) (int, error) + + // Clear removes all entries from the log + Clear(ctx context.Context) error + + // Watch watches the log for changes + // This is a non-blocking method. If the method returns without error, log events will be pushed onto + // the given channel in the order in which they occur. + Watch(ctx context.Context, ch chan<- *Event, opts ...WatchOption) error +} + +// Entry is an indexed key/value pair +type Entry struct { + // Index is the unique, monotonically increasing, globally unique index of the entry. The index is static + // for the lifetime of a key. + Index Index + + // Version is the unique, monotonically increasing version number for the key/value pair. The version is + // suitable for use in optimistic locking. + Version Version + + // Value is the value of the pair + Value []byte + + // Timestamp + Timestamp time.Time +} + +func (kv Entry) String() string { + return fmt.Sprintf("index: %d\nvalue: %s\nversion: %d", kv.Index, string(kv.Value), kv.Version) +} + +// EventType is the type of a log event +type EventType string + +const ( + // EventNone indicates the event is not a change event + EventNone EventType = "" + + // EventAppended indicates an entry was appended to the log + EventAppended EventType = "appended" + + // EventRemoved indicates an entry was removed from the log + EventRemoved EventType = "removed" +) + +// Event is a log change event +type Event struct { + // Type indicates the change event type + Type EventType + + // Entry is the event entry + Entry *Entry +} + +// New creates a new log primitive +func New(ctx context.Context, name primitive.Name, partitions []primitive.Partition, opts ...session.Option) (Log, error) { + i, err := util.GetPartitionIndex(name.Name, len(partitions)) + if err != nil { + return nil, err + } + return newLog(ctx, name, partitions[i], opts...) +} + +// newLog creates a new Log for the given partition +func newLog(ctx context.Context, name primitive.Name, partition primitive.Partition, opts ...session.Option) (*log, error) { + sess, err := session.New(ctx, name, partition, &sessionHandler{}, opts...) + if err != nil { + return nil, err + } + return &log{ + name: name, + session: sess, + }, nil +} + +// log is the default single-partition implementation of Log +type log struct { + name primitive.Name + session *session.Session +} + +func (l *log) Name() primitive.Name { + return l.name +} + +func (l *log) Append(ctx context.Context, value []byte) (*Entry, error) { + r, err := l.session.DoCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.AppendRequest{ + Header: header, + Value: value, + Version: -1, + } + response, err := client.Append(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return nil, err + } + + response := r.(*api.AppendResponse) + if response.Status == api.ResponseStatus_OK { + return &Entry{ + Index: Index(response.Index), + Value: value, + Version: Version(response.Header.Index), + }, nil + } else if response.Status == api.ResponseStatus_PRECONDITION_FAILED { + return nil, errors.New("write condition failed") + } else if response.Status == api.ResponseStatus_WRITE_LOCK { + return nil, errors.New("write lock failed") + } else { + return &Entry{ + Index: Index(response.Index), + Value: value, + Version: Version(response.PreviousVersion), + Timestamp: response.Timestamp, + }, nil + } +} + +func (l *log) Get(ctx context.Context, index int64, opts ...GetOption) (*Entry, error) { + r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.GetRequest{ + Header: header, + Index: index, + } + for i := range opts { + opts[i].beforeGet(request) + } + response, err := client.Get(ctx, request) + if err != nil { + return nil, nil, err + } + for i := range opts { + opts[i].afterGet(response) + } + return response.Header, response, nil + }) + if err != nil { + return nil, err + } + + response := r.(*api.GetResponse) + if response.Version != 0 { + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + Version: Version(response.Version), + Timestamp: response.Timestamp, + }, nil + } + return nil, nil +} + +func (l *log) GetIndex(ctx context.Context, index Index, opts ...GetOption) (*Entry, error) { + r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.GetRequest{ + Header: header, + Index: int64(index), + } + for i := range opts { + opts[i].beforeGet(request) + } + response, err := client.Get(ctx, request) + if err != nil { + return nil, nil, err + } + for i := range opts { + opts[i].afterGet(response) + } + return response.Header, response, nil + }) + if err != nil { + return nil, err + } + + response := r.(*api.GetResponse) + if response.Version != 0 { + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + Version: Version(response.Version), + Timestamp: response.Timestamp, + }, nil + } + return nil, nil +} + +func (l *log) FirstIndex(ctx context.Context) (Index, error) { + r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.FirstEntryRequest{ + Header: header, + } + response, err := client.FirstEntry(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return 0, err + } + + response := r.(*api.FirstEntryResponse) + if response.Version != 0 { + return Index(response.Index), nil + } + return 0, nil +} + +func (l *log) LastIndex(ctx context.Context) (Index, error) { + r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.LastEntryRequest{ + Header: header, + } + response, err := client.LastEntry(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return 0, err + } + + response := r.(*api.LastEntryResponse) + if response.Version != 0 { + return Index(response.Index), nil + } + return 0, nil +} + +func (l *log) PrevIndex(ctx context.Context, index Index) (Index, error) { + r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.PrevEntryRequest{ + Header: header, + Index: int64(index), + } + response, err := client.PrevEntry(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return 0, err + } + + response := r.(*api.PrevEntryResponse) + if response.Version != 0 { + return Index(response.Index), nil + } + return 0, nil +} + +func (l *log) NextIndex(ctx context.Context, index Index) (Index, error) { + r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.NextEntryRequest{ + Header: header, + Index: int64(index), + } + response, err := client.NextEntry(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return 0, err + } + + response := r.(*api.NextEntryResponse) + if response.Version != 0 { + return Index(response.Index), nil + } + return 0, nil +} + +func (l *log) FirstEntry(ctx context.Context) (*Entry, error) { + r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.FirstEntryRequest{ + Header: header, + } + response, err := client.FirstEntry(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return nil, err + } + + response := r.(*api.FirstEntryResponse) + if response.Version != 0 { + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + Version: Version(response.Version), + Timestamp: response.Timestamp, + }, nil + } + return nil, err +} + +func (l *log) LastEntry(ctx context.Context) (*Entry, error) { + r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.LastEntryRequest{ + Header: header, + } + response, err := client.LastEntry(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return nil, err + } + + response := r.(*api.LastEntryResponse) + if response.Version != 0 { + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + Version: Version(response.Version), + Timestamp: response.Timestamp, + }, nil + } + return nil, err +} + +func (l *log) PrevEntry(ctx context.Context, index Index) (*Entry, error) { + r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.PrevEntryRequest{ + Header: header, + Index: int64(index), + } + response, err := client.PrevEntry(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return nil, err + } + + response := r.(*api.PrevEntryResponse) + if response.Version != 0 { + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + Version: Version(response.Version), + }, nil + } + return nil, err +} + +func (l *log) NextEntry(ctx context.Context, index Index) (*Entry, error) { + r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.NextEntryRequest{ + Header: header, + Index: int64(index), + } + response, err := client.NextEntry(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return nil, err + } + + response := r.(*api.NextEntryResponse) + if response.Version != 0 { + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + Version: Version(response.Version), + Timestamp: response.Timestamp, + }, nil + } + return nil, err +} + +func (l *log) Remove(ctx context.Context, index int64, opts ...RemoveOption) (*Entry, error) { + r, err := l.session.DoCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.RemoveRequest{ + Header: header, + Index: index, + } + for i := range opts { + opts[i].beforeRemove(request) + } + response, err := client.Remove(ctx, request) + if err != nil { + return nil, nil, err + } + for i := range opts { + opts[i].afterRemove(response) + } + return response.Header, response, nil + }) + if err != nil { + return nil, err + } + + response := r.(*api.RemoveResponse) + if response.Status == api.ResponseStatus_OK { + return &Entry{ + Index: Index(response.Index), + Value: response.PreviousValue, + Version: Version(response.PreviousVersion), + }, nil + } else if response.Status == api.ResponseStatus_PRECONDITION_FAILED { + return nil, errors.New("write condition failed") + } else if response.Status == api.ResponseStatus_WRITE_LOCK { + return nil, errors.New("write lock failed") + } else { + return nil, nil + } +} + +func (l *log) Len(ctx context.Context) (int, error) { + response, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.SizeRequest{ + Header: header, + } + response, err := client.Size(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return 0, err + } + return int(response.(*api.SizeResponse).Size_), nil +} + +func (l *log) Clear(ctx context.Context) error { + _, err := l.session.DoCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.ClearRequest{ + Header: header, + } + response, err := client.Clear(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + return err +} + +func (l *log) Watch(ctx context.Context, ch chan<- *Event, opts ...WatchOption) error { + stream, err := l.session.DoCommandStream(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.EventRequest{ + Header: header, + } + for _, opt := range opts { + opt.beforeWatch(request) + } + return client.Events(ctx, request) + }, func(responses interface{}) (*headers.ResponseHeader, interface{}, error) { + response, err := responses.(api.LogService_EventsClient).Recv() + if err != nil { + return nil, nil, err + } + for _, opt := range opts { + opt.afterWatch(response) + } + return response.Header, response, nil + }) + if err != nil { + return err + } + + go func() { + defer close(ch) + for event := range stream { + response := event.(*api.EventResponse) + + // If this is a normal event (not a handshake response), write the event to the watch channel + var t EventType + switch response.Type { + case api.EventResponse_NONE: + t = EventNone + case api.EventResponse_APPENDED: + t = EventAppended + case api.EventResponse_REMOVED: + t = EventRemoved + } + ch <- &Event{ + Type: t, + Entry: &Entry{ + Index: Index(response.Index), + Value: response.Value, + Version: Version(response.Version), + Timestamp: response.Timestamp, + }, + } + } + }() + return nil +} + +func (l *log) Close() error { + return l.session.Close() +} + +func (l *log) Delete() error { + return l.session.Delete() +} diff --git a/pkg/client/log/log_test.go b/pkg/client/log/log_test.go new file mode 100644 index 0000000..d832156 --- /dev/null +++ b/pkg/client/log/log_test.go @@ -0,0 +1,189 @@ +// Copyright 2019-present Open Networking Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "context" + "testing" + "time" + + "github.com/atomix/go-client/pkg/client/primitive" + "github.com/atomix/go-client/pkg/client/session" + "github.com/atomix/go-client/pkg/client/test" + "github.com/stretchr/testify/assert" +) + +func TestLogOperations(t *testing.T) { + conns, partitions := test.StartTestPartitions(3) + + // Creates a new log primitive + name := primitive.NewName("default", "test", "default", "test") + _log, err := New(context.TODO(), name, conns, session.WithTimeout(5*time.Second)) + assert.NoError(t, err) + + // Gets the log entry at index 0 + kv, err := _log.Get(context.Background(), 0) + assert.NoError(t, err) + assert.Nil(t, kv) + + // Checks the size of log primitive + size, err := _log.Len(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 0, size) + + // Appends an entry to the log + kv, err = _log.Append(context.Background(), []byte("bar")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "bar", string(kv.Value)) + + // Appends an entry to the log + kv, err = _log.Append(context.Background(), []byte("baz")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "baz", string(kv.Value)) + + // Gets the first entry in the log + kv, err = _log.FirstEntry(context.Background()) + assert.NoError(t, err) + assert.Equal(t, "bar", string(kv.Value)) + + // Gets the last entry in the log + kv, err = _log.LastEntry(context.Background()) + assert.NoError(t, err) + assert.Equal(t, "baz", string(kv.Value)) + + // Gets the next entry of the given index in the log + kv, err = _log.NextEntry(context.Background(), 1) + assert.NoError(t, err) + assert.Equal(t, "baz", string(kv.Value)) + + // Gets the previous entry of the given index in the log + kv, err = _log.PrevEntry(context.Background(), 2) + assert.NoError(t, err) + assert.Equal(t, "bar", string(kv.Value)) + + // Gets the log entry at index 1 + kv, err = _log.Get(context.Background(), 1) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "bar", string(kv.Value)) + + // Gets the size of the log primitive + size, err = _log.Len(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 2, size) + + // Removes the entry at index 1 from the log + kv, err = _log.Remove(context.Background(), 1) + assert.NoError(t, err) + assert.Equal(t, "bar", string(kv.Value)) + + // Removes the entry at index 2 from the log + kv, err = _log.Remove(context.Background(), 2) + assert.NoError(t, err) + assert.Equal(t, "baz", string(kv.Value)) + + // Checks the length of the log primitive + size, err = _log.Len(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 0, size) + + err = _log.Clear(context.Background()) + assert.NoError(t, err) + + test.StopTestPartitions(partitions) +} + +func TestLogStreams(t *testing.T) { + conns, partitions := test.StartTestPartitions(3) + + name := primitive.NewName("default", "test", "default", "test") + _log, err := New(context.TODO(), name, conns, session.WithTimeout(5*time.Second)) + assert.NoError(t, err) + + kv, err := _log.Append(context.Background(), []byte("item1")) + assert.NoError(t, err) + assert.NotNil(t, kv) + + c := make(chan *Event) + latch := make(chan struct{}) + go func() { + e := <-c + assert.Equal(t, "item2", string(e.Entry.Value)) + e = <-c + assert.Equal(t, "item3", string(e.Entry.Value)) + e = <-c + assert.Equal(t, "item4", string(e.Entry.Value)) + e = <-c + assert.Equal(t, "item5", string(e.Entry.Value)) + latch <- struct{}{} + }() + + err = _log.Watch(context.Background(), c) + assert.NoError(t, err) + + kv, err = _log.Append(context.Background(), []byte("item2")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "item2", string(kv.Value)) + + kv, err = _log.Append(context.Background(), []byte("item3")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "item3", string(kv.Value)) + + kv, err = _log.Append(context.Background(), []byte("item4")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "item4", string(kv.Value)) + + kv, err = _log.Append(context.Background(), []byte("item5")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "item5", string(kv.Value)) + + <-latch + err = _log.Close() + assert.NoError(t, err) + + log1, err := New(context.TODO(), name, conns, session.WithTimeout(5*time.Second)) + assert.NoError(t, err) + + log2, err := New(context.TODO(), name, conns, session.WithTimeout(5*time.Second)) + assert.NoError(t, err) + + size, err := log1.Len(context.TODO()) + assert.NoError(t, err) + assert.Equal(t, 5, size) + + err = log1.Close() + assert.NoError(t, err) + + err = log1.Delete() + assert.NoError(t, err) + + err = log2.Delete() + assert.NoError(t, err) + + _log, err = New(context.TODO(), name, conns, session.WithTimeout(5*time.Second)) + assert.NoError(t, err) + + size, err = _log.Len(context.TODO()) + assert.NoError(t, err) + assert.Equal(t, 0, size) + + test.StopTestPartitions(partitions) +} diff --git a/pkg/client/log/options.go b/pkg/client/log/options.go new file mode 100644 index 0000000..9dc8519 --- /dev/null +++ b/pkg/client/log/options.go @@ -0,0 +1,145 @@ +// Copyright 2019-present Open Networking Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + api "github.com/atomix/api/proto/atomix/log" +) + +// SetOption is an option for the Append method +type SetOption interface { + beforePut(request *api.AppendRequest) + afterPut(response *api.AppendResponse) +} + +// RemoveOption is an option for the Remove method +type RemoveOption interface { + beforeRemove(request *api.RemoveRequest) + afterRemove(response *api.RemoveResponse) +} + +// IfVersion sets the required version for optimistic concurrency control +func IfVersion(version Version) VersionOption { + return VersionOption{version: version} +} + +// VersionOption is an implementation of SetOption and RemoveOption to specify the version for concurrency control +type VersionOption struct { + SetOption + RemoveOption + version Version +} + +func (o VersionOption) beforePut(request *api.AppendRequest) { + request.Version = int64(o.version) +} + +func (o VersionOption) afterPut(response *api.AppendResponse) { + +} + +func (o VersionOption) beforeRemove(request *api.RemoveRequest) { + request.Version = int64(o.version) +} + +func (o VersionOption) afterRemove(response *api.RemoveResponse) { + +} + +// IfNotSet sets the value if the entry is not yet set +func IfNotSet() SetOption { + return &NotSetOption{} +} + +// NotSetOption is a SetOption that sets the value only if it's not already set +type NotSetOption struct { +} + +func (o NotSetOption) beforePut(request *api.AppendRequest) { + request.Version = -1 +} + +func (o NotSetOption) afterPut(response *api.AppendResponse) { + +} + +// GetOption is an option for the Get method +type GetOption interface { + beforeGet(request *api.GetRequest) + afterGet(response *api.GetResponse) +} + +// WithDefault sets the default value to return if the key is not present +func WithDefault(def []byte) GetOption { + return defaultOption{def: def} +} + +type defaultOption struct { + def []byte +} + +func (o defaultOption) beforeGet(request *api.GetRequest) { +} + +func (o defaultOption) afterGet(response *api.GetResponse) { + if response.Version == 0 { + response.Value = o.def + } +} + +// WatchOption is an option for the Watch method +type WatchOption interface { + beforeWatch(request *api.EventRequest) + afterWatch(response *api.EventResponse) +} + +// WithReplay returns a watch option that enables replay of watch events +func WithReplay() WatchOption { + return replayOption{} +} + +type replayOption struct{} + +func (o replayOption) beforeWatch(request *api.EventRequest) { + request.Replay = true +} + +func (o replayOption) afterWatch(response *api.EventResponse) { + +} + +type filterOption struct { + filter Filter +} + +func (o filterOption) beforeWatch(request *api.EventRequest) { + if o.filter.Index > 0 { + request.Index = int64(o.filter.Index) + } +} + +func (o filterOption) afterWatch(response *api.EventResponse) { +} + +// WithFilter returns a watch option that filters the watch events +func WithFilter(filter Filter) WatchOption { + return filterOption{filter: filter} +} + +// Filter is a watch filter configuration +type Filter struct { + Key string + Index Index +} diff --git a/pkg/client/log/session.go b/pkg/client/log/session.go new file mode 100644 index 0000000..11e72ac --- /dev/null +++ b/pkg/client/log/session.go @@ -0,0 +1,84 @@ +// Copyright 2019-present Open Networking Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "context" + + "github.com/atomix/api/proto/atomix/headers" + api "github.com/atomix/api/proto/atomix/log" + "github.com/atomix/go-client/pkg/client/session" + "google.golang.org/grpc" +) + +type sessionHandler struct{} + +func (m *sessionHandler) Create(ctx context.Context, s *session.Session) error { + return s.DoCreate(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + request := &api.CreateRequest{ + Header: header, + Timeout: &s.Timeout, + } + client := api.NewLogServiceClient(conn) + response, err := client.Create(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) +} + +func (m *sessionHandler) KeepAlive(ctx context.Context, s *session.Session) error { + return s.DoKeepAlive(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + request := &api.KeepAliveRequest{ + Header: header, + } + client := api.NewLogServiceClient(conn) + response, err := client.KeepAlive(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) +} + +func (m *sessionHandler) Close(ctx context.Context, s *session.Session) error { + return s.DoClose(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + request := &api.CloseRequest{ + Header: header, + } + client := api.NewLogServiceClient(conn) + response, err := client.Close(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) +} + +func (m *sessionHandler) Delete(ctx context.Context, s *session.Session) error { + return s.DoClose(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + request := &api.CloseRequest{ + Header: header, + Delete: true, + } + client := api.NewLogServiceClient(conn) + response, err := client.Close(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) +} From 82fb0b700a2a669a0b14d591a0db853235241bb2 Mon Sep 17 00:00:00 2001 From: adibrastegarnia Date: Tue, 4 Feb 2020 11:28:06 -0800 Subject: [PATCH 2/8] Update test cases --- pkg/client/log/log_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/client/log/log_test.go b/pkg/client/log/log_test.go index d832156..a2bfc44 100644 --- a/pkg/client/log/log_test.go +++ b/pkg/client/log/log_test.go @@ -60,11 +60,21 @@ func TestLogOperations(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "bar", string(kv.Value)) + // Gets the first index + firstIndex, err := _log.FirstIndex(context.Background()) + assert.NoError(t, err) + assert.Equal(t, uint64(0x1), uint64(firstIndex)) + // Gets the last entry in the log kv, err = _log.LastEntry(context.Background()) assert.NoError(t, err) assert.Equal(t, "baz", string(kv.Value)) + // Gets the last index + lastIndex, err := _log.LastIndex(context.Background()) + assert.NoError(t, err) + assert.Equal(t, uint64(0x2), uint64(lastIndex)) + // Gets the next entry of the given index in the log kv, err = _log.NextEntry(context.Background(), 1) assert.NoError(t, err) From 7d4710329296093a07cf91f01aca8585023b6d5b Mon Sep 17 00:00:00 2001 From: adibrastegarnia Date: Tue, 4 Feb 2020 12:26:17 -0800 Subject: [PATCH 3/8] Remove version and change len to size --- pkg/client/log/log.go | 144 ++++++++++++++----------------------- pkg/client/log/log_test.go | 14 ++-- pkg/client/log/options.go | 34 +-------- 3 files changed, 62 insertions(+), 130 deletions(-) diff --git a/pkg/client/log/log.go b/pkg/client/log/log.go index 4ea5bcb..1d76746 100644 --- a/pkg/client/log/log.go +++ b/pkg/client/log/log.go @@ -34,9 +34,6 @@ const Type primitive.Type = "Log" // Index is the index of an entry type Index uint64 -// Version is the version of an entry -type Version uint64 - // Client provides an API for creating IndexedMaps type Client interface { // GetLog gets the log instance of the given name @@ -80,8 +77,8 @@ type Log interface { // Remove removes an entry from the log Remove(ctx context.Context, index int64, opts ...RemoveOption) (*Entry, error) - // Len returns the number of entries in the log - Len(ctx context.Context) (int, error) + // Size returns the number of entries in the log + Size(ctx context.Context) (int, error) // Clear removes all entries from the log Clear(ctx context.Context) error @@ -98,10 +95,6 @@ type Entry struct { // for the lifetime of a key. Index Index - // Version is the unique, monotonically increasing version number for the key/value pair. The version is - // suitable for use in optimistic locking. - Version Version - // Value is the value of the pair Value []byte @@ -110,7 +103,7 @@ type Entry struct { } func (kv Entry) String() string { - return fmt.Sprintf("index: %d\nvalue: %s\nversion: %d", kv.Index, string(kv.Value), kv.Version) + return fmt.Sprintf("index: %d\nvalue: %s\n", kv.Index, string(kv.Value)) } // EventType is the type of a log event @@ -171,9 +164,8 @@ func (l *log) Append(ctx context.Context, value []byte) (*Entry, error) { r, err := l.session.DoCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { client := api.NewLogServiceClient(conn) request := &api.AppendRequest{ - Header: header, - Value: value, - Version: -1, + Header: header, + Value: value, } response, err := client.Append(ctx, request) if err != nil { @@ -188,9 +180,8 @@ func (l *log) Append(ctx context.Context, value []byte) (*Entry, error) { response := r.(*api.AppendResponse) if response.Status == api.ResponseStatus_OK { return &Entry{ - Index: Index(response.Index), - Value: value, - Version: Version(response.Header.Index), + Index: Index(response.Index), + Value: value, }, nil } else if response.Status == api.ResponseStatus_PRECONDITION_FAILED { return nil, errors.New("write condition failed") @@ -200,7 +191,6 @@ func (l *log) Append(ctx context.Context, value []byte) (*Entry, error) { return &Entry{ Index: Index(response.Index), Value: value, - Version: Version(response.PreviousVersion), Timestamp: response.Timestamp, }, nil } @@ -230,15 +220,13 @@ func (l *log) Get(ctx context.Context, index int64, opts ...GetOption) (*Entry, } response := r.(*api.GetResponse) - if response.Version != 0 { - return &Entry{ - Index: Index(response.Index), - Value: response.Value, - Version: Version(response.Version), - Timestamp: response.Timestamp, - }, nil - } - return nil, nil + + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + Timestamp: response.Timestamp, + }, nil + } func (l *log) GetIndex(ctx context.Context, index Index, opts ...GetOption) (*Entry, error) { @@ -265,15 +253,12 @@ func (l *log) GetIndex(ctx context.Context, index Index, opts ...GetOption) (*En } response := r.(*api.GetResponse) - if response.Version != 0 { - return &Entry{ - Index: Index(response.Index), - Value: response.Value, - Version: Version(response.Version), - Timestamp: response.Timestamp, - }, nil - } - return nil, nil + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + Timestamp: response.Timestamp, + }, nil + } func (l *log) FirstIndex(ctx context.Context) (Index, error) { @@ -293,10 +278,7 @@ func (l *log) FirstIndex(ctx context.Context) (Index, error) { } response := r.(*api.FirstEntryResponse) - if response.Version != 0 { - return Index(response.Index), nil - } - return 0, nil + return Index(response.Index), nil } func (l *log) LastIndex(ctx context.Context) (Index, error) { @@ -316,10 +298,8 @@ func (l *log) LastIndex(ctx context.Context) (Index, error) { } response := r.(*api.LastEntryResponse) - if response.Version != 0 { - return Index(response.Index), nil - } - return 0, nil + return Index(response.Index), nil + } func (l *log) PrevIndex(ctx context.Context, index Index) (Index, error) { @@ -340,10 +320,8 @@ func (l *log) PrevIndex(ctx context.Context, index Index) (Index, error) { } response := r.(*api.PrevEntryResponse) - if response.Version != 0 { - return Index(response.Index), nil - } - return 0, nil + return Index(response.Index), nil + } func (l *log) NextIndex(ctx context.Context, index Index) (Index, error) { @@ -364,10 +342,7 @@ func (l *log) NextIndex(ctx context.Context, index Index) (Index, error) { } response := r.(*api.NextEntryResponse) - if response.Version != 0 { - return Index(response.Index), nil - } - return 0, nil + return Index(response.Index), nil } func (l *log) FirstEntry(ctx context.Context) (*Entry, error) { @@ -387,15 +362,13 @@ func (l *log) FirstEntry(ctx context.Context) (*Entry, error) { } response := r.(*api.FirstEntryResponse) - if response.Version != 0 { - return &Entry{ - Index: Index(response.Index), - Value: response.Value, - Version: Version(response.Version), - Timestamp: response.Timestamp, - }, nil - } - return nil, err + + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + Timestamp: response.Timestamp, + }, nil + } func (l *log) LastEntry(ctx context.Context) (*Entry, error) { @@ -415,15 +388,13 @@ func (l *log) LastEntry(ctx context.Context) (*Entry, error) { } response := r.(*api.LastEntryResponse) - if response.Version != 0 { - return &Entry{ - Index: Index(response.Index), - Value: response.Value, - Version: Version(response.Version), - Timestamp: response.Timestamp, - }, nil - } - return nil, err + + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + Timestamp: response.Timestamp, + }, nil + } func (l *log) PrevEntry(ctx context.Context, index Index) (*Entry, error) { @@ -444,14 +415,11 @@ func (l *log) PrevEntry(ctx context.Context, index Index) (*Entry, error) { } response := r.(*api.PrevEntryResponse) - if response.Version != 0 { - return &Entry{ - Index: Index(response.Index), - Value: response.Value, - Version: Version(response.Version), - }, nil - } - return nil, err + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + }, nil + } func (l *log) NextEntry(ctx context.Context, index Index) (*Entry, error) { @@ -472,15 +440,11 @@ func (l *log) NextEntry(ctx context.Context, index Index) (*Entry, error) { } response := r.(*api.NextEntryResponse) - if response.Version != 0 { - return &Entry{ - Index: Index(response.Index), - Value: response.Value, - Version: Version(response.Version), - Timestamp: response.Timestamp, - }, nil - } - return nil, err + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + Timestamp: response.Timestamp, + }, nil } func (l *log) Remove(ctx context.Context, index int64, opts ...RemoveOption) (*Entry, error) { @@ -509,9 +473,8 @@ func (l *log) Remove(ctx context.Context, index int64, opts ...RemoveOption) (*E response := r.(*api.RemoveResponse) if response.Status == api.ResponseStatus_OK { return &Entry{ - Index: Index(response.Index), - Value: response.PreviousValue, - Version: Version(response.PreviousVersion), + Index: Index(response.Index), + Value: response.PreviousValue, }, nil } else if response.Status == api.ResponseStatus_PRECONDITION_FAILED { return nil, errors.New("write condition failed") @@ -522,7 +485,7 @@ func (l *log) Remove(ctx context.Context, index int64, opts ...RemoveOption) (*E } } -func (l *log) Len(ctx context.Context) (int, error) { +func (l *log) Size(ctx context.Context) (int, error) { response, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { client := api.NewLogServiceClient(conn) request := &api.SizeRequest{ @@ -599,7 +562,6 @@ func (l *log) Watch(ctx context.Context, ch chan<- *Event, opts ...WatchOption) Entry: &Entry{ Index: Index(response.Index), Value: response.Value, - Version: Version(response.Version), Timestamp: response.Timestamp, }, } diff --git a/pkg/client/log/log_test.go b/pkg/client/log/log_test.go index a2bfc44..598ec3e 100644 --- a/pkg/client/log/log_test.go +++ b/pkg/client/log/log_test.go @@ -36,10 +36,10 @@ func TestLogOperations(t *testing.T) { // Gets the log entry at index 0 kv, err := _log.Get(context.Background(), 0) assert.NoError(t, err) - assert.Nil(t, kv) + assert.NotNil(t, kv) // Checks the size of log primitive - size, err := _log.Len(context.Background()) + size, err := _log.Size(context.Background()) assert.NoError(t, err) assert.Equal(t, 0, size) @@ -92,7 +92,7 @@ func TestLogOperations(t *testing.T) { assert.Equal(t, "bar", string(kv.Value)) // Gets the size of the log primitive - size, err = _log.Len(context.Background()) + size, err = _log.Size(context.Background()) assert.NoError(t, err) assert.Equal(t, 2, size) @@ -106,8 +106,8 @@ func TestLogOperations(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "baz", string(kv.Value)) - // Checks the length of the log primitive - size, err = _log.Len(context.Background()) + // Checks the size of the log primitive + size, err = _log.Size(context.Background()) assert.NoError(t, err) assert.Equal(t, 0, size) @@ -175,7 +175,7 @@ func TestLogStreams(t *testing.T) { log2, err := New(context.TODO(), name, conns, session.WithTimeout(5*time.Second)) assert.NoError(t, err) - size, err := log1.Len(context.TODO()) + size, err := log1.Size(context.TODO()) assert.NoError(t, err) assert.Equal(t, 5, size) @@ -191,7 +191,7 @@ func TestLogStreams(t *testing.T) { _log, err = New(context.TODO(), name, conns, session.WithTimeout(5*time.Second)) assert.NoError(t, err) - size, err = _log.Len(context.TODO()) + size, err = _log.Size(context.TODO()) assert.NoError(t, err) assert.Equal(t, 0, size) diff --git a/pkg/client/log/options.go b/pkg/client/log/options.go index 9dc8519..746df64 100644 --- a/pkg/client/log/options.go +++ b/pkg/client/log/options.go @@ -30,34 +30,6 @@ type RemoveOption interface { afterRemove(response *api.RemoveResponse) } -// IfVersion sets the required version for optimistic concurrency control -func IfVersion(version Version) VersionOption { - return VersionOption{version: version} -} - -// VersionOption is an implementation of SetOption and RemoveOption to specify the version for concurrency control -type VersionOption struct { - SetOption - RemoveOption - version Version -} - -func (o VersionOption) beforePut(request *api.AppendRequest) { - request.Version = int64(o.version) -} - -func (o VersionOption) afterPut(response *api.AppendResponse) { - -} - -func (o VersionOption) beforeRemove(request *api.RemoveRequest) { - request.Version = int64(o.version) -} - -func (o VersionOption) afterRemove(response *api.RemoveResponse) { - -} - // IfNotSet sets the value if the entry is not yet set func IfNotSet() SetOption { return &NotSetOption{} @@ -68,7 +40,6 @@ type NotSetOption struct { } func (o NotSetOption) beforePut(request *api.AppendRequest) { - request.Version = -1 } func (o NotSetOption) afterPut(response *api.AppendResponse) { @@ -94,9 +65,8 @@ func (o defaultOption) beforeGet(request *api.GetRequest) { } func (o defaultOption) afterGet(response *api.GetResponse) { - if response.Version == 0 { - response.Value = o.def - } + response.Value = o.def + } // WatchOption is an option for the Watch method From 5f7f8ccda48a433260543fa7baa6c66b8d2e0271 Mon Sep 17 00:00:00 2001 From: adibrastegarnia Date: Tue, 4 Feb 2020 12:42:25 -0800 Subject: [PATCH 4/8] Update go.mod file --- go.mod | 4 ---- 1 file changed, 4 deletions(-) diff --git a/go.mod b/go.mod index cb4efe2..4537324 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,3 @@ require ( github.com/stretchr/testify v1.4.0 google.golang.org/grpc v1.27.0 ) - -replace github.com/atomix/api => ../api - -replace github.com/atomix/go-framework => ../go-framework From 3715ca56a525df9e1b26795aa6ddf8795542f626 Mon Sep 17 00:00:00 2001 From: adibrastegarnia Date: Fri, 7 Feb 2020 11:36:06 -0800 Subject: [PATCH 5/8] Refactor the code based on new changes --- go.mod | 10 ++-- go.sum | 4 +- pkg/client/log/{session.go => handler.go} | 27 +++-------- pkg/client/log/log.go | 57 +++++++++++------------ pkg/client/log/log_test.go | 37 +++++++++------ 5 files changed, 62 insertions(+), 73 deletions(-) rename pkg/client/log/{session.go => handler.go} (70%) diff --git a/go.mod b/go.mod index 4537324..f63f9e6 100644 --- a/go.mod +++ b/go.mod @@ -2,13 +2,9 @@ module github.com/atomix/go-client require ( github.com/atomix/api v0.0.0-20200206211058-f075fb5b6d1b + github.com/atomix/atomix-go-client v0.0.0-20200114212658-58c359bc47b1 github.com/atomix/go-framework v0.0.0-20200207202010-51e205d726d2 github.com/atomix/go-local v0.0.0-20200207202057-4a81cbdd3325 - github.com/atomix/go-framework v0.0.0-20200206221034-8c7583e55420 - github.com/atomix/go-local v0.0.0-20200206221051-b1b85e86b0b7 - github.com/atomix/api v0.0.0-20200202100958-13b24edbe32d - github.com/atomix/go-framework v0.0.0-20200202102454-440bc2678f1c - github.com/atomix/go-local v0.0.0-20200202105028-743d224c66eb github.com/cenkalti/backoff v2.2.1+incompatible github.com/gogo/protobuf v1.3.1 github.com/golang/protobuf v1.3.2 @@ -16,3 +12,7 @@ require ( github.com/stretchr/testify v1.4.0 google.golang.org/grpc v1.27.0 ) + +replace github.com/atomix/api => ../api + +replace github.com/atomix/go-framework => ../go-framework diff --git a/go.sum b/go.sum index db7bbb3..576f2dc 100644 --- a/go.sum +++ b/go.sum @@ -41,6 +41,7 @@ github.com/atomix/atomix-go-client v0.0.0-20190827234201-188602d4e780/go.mod h1: github.com/atomix/atomix-go-client v0.0.0-20191002230120-837d618e27c5/go.mod h1:MoPkrAL33saKe2GbTi+NJgLzW7ejCwrrLDrYIwGYNcE= github.com/atomix/atomix-go-client v0.0.0-20191015003555-98ae5ccbe7bd/go.mod h1:jV1V8j+zo30xhC+hufKM/1XIiJKA3ODjwuMA/rBHu8g= github.com/atomix/atomix-go-client v0.0.0-20191018192841-3644d110cbec/go.mod h1:bM51i8VXdxw3+vN5spPJWllkfsfpakHYo2bOkNj1xC4= +github.com/atomix/atomix-go-client v0.0.0-20200114212658-58c359bc47b1 h1:XGjCL/okuI40whKVS2xtiAUhoMW9MHUCeyPTZMTOstA= github.com/atomix/atomix-go-client v0.0.0-20200114212658-58c359bc47b1/go.mod h1:STv1xDq3qpFABP1zE+9/R2kYuFamQWSGH18w6sfXNE4= github.com/atomix/atomix-go-local v0.0.0-20190827233944-938e35b06834 h1:4ddTmYmAaP3WrafV/X76Jq4qNir2jezMQwKgouqNdIU= github.com/atomix/atomix-go-local v0.0.0-20190827233944-938e35b06834/go.mod h1:qLBTOiVKoEqzYOjgxIgWFa+Hfa3SR+VexA6jGBcv0HA= @@ -92,10 +93,7 @@ github.com/atomix/atomix-go-node v0.0.0-20200114212450-178a2dc70336 h1:zYRXcjieg github.com/atomix/atomix-go-node v0.0.0-20200114212450-178a2dc70336/go.mod h1:DIYsaWqOiBkyE+vUgHFMM3+vCq07RUskEWN4W5cEtyE= github.com/atomix/go-client v0.0.0-20200124004211-e5e19cd4730d/go.mod h1:KBBiViOYhnvSh/U0fIYiuJ8j+k63eyRWZl42kwdseFI= github.com/atomix/go-client v0.0.0-20200203180003-61799b5ca7c2/go.mod h1:VWAEeWdocSRL1cqMs3zZ32kuIzMAbheoV02wsEVYwhw= -<<<<<<< HEAD github.com/atomix/go-client v0.0.0-20200206051325-cdc03bd1c8bc/go.mod h1:8Gdux/UtiBQK5nmzN9jtWXuH16T6JPNsAxUA2wY4xVk= -======= ->>>>>>> Add log client implementation github.com/atomix/go-framework v0.0.0-20200123235029-e29fc7d6e104/go.mod h1:Dn7tjt5LIRA/qr5afQZDh9hdtvK82uQpMrADYIlVtfQ= github.com/atomix/go-framework v0.0.0-20200124003840-f24758b13aa2 h1:4a6UlvCmvIWf+L9UIcHkR5jxtWIwr1A2PP/xcnulzIs= github.com/atomix/go-framework v0.0.0-20200124003840-f24758b13aa2/go.mod h1:vo5K/v+rc5mohoZIw9vbyj+Y/EGGaEdF6XVkEvM9CSM= diff --git a/pkg/client/log/session.go b/pkg/client/log/handler.go similarity index 70% rename from pkg/client/log/session.go rename to pkg/client/log/handler.go index 11e72ac..9bc9fd7 100644 --- a/pkg/client/log/session.go +++ b/pkg/client/log/handler.go @@ -19,34 +19,19 @@ import ( "github.com/atomix/api/proto/atomix/headers" api "github.com/atomix/api/proto/atomix/log" - "github.com/atomix/go-client/pkg/client/session" + "github.com/atomix/go-client/pkg/client/primitive" "google.golang.org/grpc" ) -type sessionHandler struct{} +type primitiveHandler struct{} -func (m *sessionHandler) Create(ctx context.Context, s *session.Session) error { +func (m *primitiveHandler) Create(ctx context.Context, s *primitive.Instance) error { return s.DoCreate(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { request := &api.CreateRequest{ - Header: header, - Timeout: &s.Timeout, - } - client := api.NewLogServiceClient(conn) - response, err := client.Create(ctx, request) - if err != nil { - return nil, nil, err - } - return response.Header, response, nil - }) -} - -func (m *sessionHandler) KeepAlive(ctx context.Context, s *session.Session) error { - return s.DoKeepAlive(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { - request := &api.KeepAliveRequest{ Header: header, } client := api.NewLogServiceClient(conn) - response, err := client.KeepAlive(ctx, request) + response, err := client.Create(ctx, request) if err != nil { return nil, nil, err } @@ -54,7 +39,7 @@ func (m *sessionHandler) KeepAlive(ctx context.Context, s *session.Session) erro }) } -func (m *sessionHandler) Close(ctx context.Context, s *session.Session) error { +func (m *primitiveHandler) Close(ctx context.Context, s *primitive.Instance) error { return s.DoClose(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { request := &api.CloseRequest{ Header: header, @@ -68,7 +53,7 @@ func (m *sessionHandler) Close(ctx context.Context, s *session.Session) error { }) } -func (m *sessionHandler) Delete(ctx context.Context, s *session.Session) error { +func (m *primitiveHandler) Delete(ctx context.Context, s *primitive.Instance) error { return s.DoClose(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { request := &api.CloseRequest{ Header: header, diff --git a/pkg/client/log/log.go b/pkg/client/log/log.go index 1d76746..e21c874 100644 --- a/pkg/client/log/log.go +++ b/pkg/client/log/log.go @@ -23,7 +23,6 @@ import ( "github.com/atomix/api/proto/atomix/headers" api "github.com/atomix/api/proto/atomix/log" "github.com/atomix/go-client/pkg/client/primitive" - "github.com/atomix/go-client/pkg/client/session" "github.com/atomix/go-client/pkg/client/util" "google.golang.org/grpc" ) @@ -37,7 +36,7 @@ type Index uint64 // Client provides an API for creating IndexedMaps type Client interface { // GetLog gets the log instance of the given name - GetLog(ctx context.Context, name string, opts ...session.Option) (Log, error) + GetLog(ctx context.Context, name string) (Log, error) } // Log is a distributed log @@ -130,30 +129,30 @@ type Event struct { } // New creates a new log primitive -func New(ctx context.Context, name primitive.Name, partitions []primitive.Partition, opts ...session.Option) (Log, error) { +func New(ctx context.Context, name primitive.Name, partitions []*primitive.Session) (Log, error) { i, err := util.GetPartitionIndex(name.Name, len(partitions)) if err != nil { return nil, err } - return newLog(ctx, name, partitions[i], opts...) + return newLog(ctx, name, partitions[i]) } // newLog creates a new Log for the given partition -func newLog(ctx context.Context, name primitive.Name, partition primitive.Partition, opts ...session.Option) (*log, error) { - sess, err := session.New(ctx, name, partition, &sessionHandler{}, opts...) +func newLog(ctx context.Context, name primitive.Name, partition *primitive.Session) (*log, error) { + instance, err := primitive.NewInstance(ctx, name, partition, &primitiveHandler{}) if err != nil { return nil, err } return &log{ - name: name, - session: sess, + name: name, + instance: instance, }, nil } // log is the default single-partition implementation of Log type log struct { - name primitive.Name - session *session.Session + name primitive.Name + instance *primitive.Instance } func (l *log) Name() primitive.Name { @@ -161,7 +160,7 @@ func (l *log) Name() primitive.Name { } func (l *log) Append(ctx context.Context, value []byte) (*Entry, error) { - r, err := l.session.DoCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + r, err := l.instance.DoCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { client := api.NewLogServiceClient(conn) request := &api.AppendRequest{ Header: header, @@ -197,7 +196,7 @@ func (l *log) Append(ctx context.Context, value []byte) (*Entry, error) { } func (l *log) Get(ctx context.Context, index int64, opts ...GetOption) (*Entry, error) { - r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { client := api.NewLogServiceClient(conn) request := &api.GetRequest{ Header: header, @@ -230,7 +229,7 @@ func (l *log) Get(ctx context.Context, index int64, opts ...GetOption) (*Entry, } func (l *log) GetIndex(ctx context.Context, index Index, opts ...GetOption) (*Entry, error) { - r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { client := api.NewLogServiceClient(conn) request := &api.GetRequest{ Header: header, @@ -262,7 +261,7 @@ func (l *log) GetIndex(ctx context.Context, index Index, opts ...GetOption) (*En } func (l *log) FirstIndex(ctx context.Context) (Index, error) { - r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { client := api.NewLogServiceClient(conn) request := &api.FirstEntryRequest{ Header: header, @@ -282,7 +281,7 @@ func (l *log) FirstIndex(ctx context.Context) (Index, error) { } func (l *log) LastIndex(ctx context.Context) (Index, error) { - r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { client := api.NewLogServiceClient(conn) request := &api.LastEntryRequest{ Header: header, @@ -303,7 +302,7 @@ func (l *log) LastIndex(ctx context.Context) (Index, error) { } func (l *log) PrevIndex(ctx context.Context, index Index) (Index, error) { - r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { client := api.NewLogServiceClient(conn) request := &api.PrevEntryRequest{ Header: header, @@ -325,7 +324,7 @@ func (l *log) PrevIndex(ctx context.Context, index Index) (Index, error) { } func (l *log) NextIndex(ctx context.Context, index Index) (Index, error) { - r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { client := api.NewLogServiceClient(conn) request := &api.NextEntryRequest{ Header: header, @@ -346,7 +345,7 @@ func (l *log) NextIndex(ctx context.Context, index Index) (Index, error) { } func (l *log) FirstEntry(ctx context.Context) (*Entry, error) { - r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { client := api.NewLogServiceClient(conn) request := &api.FirstEntryRequest{ Header: header, @@ -372,7 +371,7 @@ func (l *log) FirstEntry(ctx context.Context) (*Entry, error) { } func (l *log) LastEntry(ctx context.Context) (*Entry, error) { - r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { client := api.NewLogServiceClient(conn) request := &api.LastEntryRequest{ Header: header, @@ -398,7 +397,7 @@ func (l *log) LastEntry(ctx context.Context) (*Entry, error) { } func (l *log) PrevEntry(ctx context.Context, index Index) (*Entry, error) { - r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { client := api.NewLogServiceClient(conn) request := &api.PrevEntryRequest{ Header: header, @@ -423,7 +422,7 @@ func (l *log) PrevEntry(ctx context.Context, index Index) (*Entry, error) { } func (l *log) NextEntry(ctx context.Context, index Index) (*Entry, error) { - r, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { client := api.NewLogServiceClient(conn) request := &api.NextEntryRequest{ Header: header, @@ -448,7 +447,7 @@ func (l *log) NextEntry(ctx context.Context, index Index) (*Entry, error) { } func (l *log) Remove(ctx context.Context, index int64, opts ...RemoveOption) (*Entry, error) { - r, err := l.session.DoCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + r, err := l.instance.DoCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { client := api.NewLogServiceClient(conn) request := &api.RemoveRequest{ Header: header, @@ -486,7 +485,7 @@ func (l *log) Remove(ctx context.Context, index int64, opts ...RemoveOption) (*E } func (l *log) Size(ctx context.Context) (int, error) { - response, err := l.session.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + response, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { client := api.NewLogServiceClient(conn) request := &api.SizeRequest{ Header: header, @@ -504,7 +503,7 @@ func (l *log) Size(ctx context.Context) (int, error) { } func (l *log) Clear(ctx context.Context) error { - _, err := l.session.DoCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + _, err := l.instance.DoCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { client := api.NewLogServiceClient(conn) request := &api.ClearRequest{ Header: header, @@ -519,7 +518,7 @@ func (l *log) Clear(ctx context.Context) error { } func (l *log) Watch(ctx context.Context, ch chan<- *Event, opts ...WatchOption) error { - stream, err := l.session.DoCommandStream(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (interface{}, error) { + stream, err := l.instance.DoCommandStream(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (interface{}, error) { client := api.NewLogServiceClient(conn) request := &api.EventRequest{ Header: header, @@ -570,10 +569,10 @@ func (l *log) Watch(ctx context.Context, ch chan<- *Event, opts ...WatchOption) return nil } -func (l *log) Close() error { - return l.session.Close() +func (l *log) Close(ctx context.Context) error { + return l.instance.Close(ctx) } -func (l *log) Delete() error { - return l.session.Delete() +func (l *log) Delete(ctx context.Context) error { + return l.instance.Delete(ctx) } diff --git a/pkg/client/log/log_test.go b/pkg/client/log/log_test.go index 598ec3e..9221ddc 100644 --- a/pkg/client/log/log_test.go +++ b/pkg/client/log/log_test.go @@ -17,20 +17,23 @@ package log import ( "context" "testing" - "time" "github.com/atomix/go-client/pkg/client/primitive" - "github.com/atomix/go-client/pkg/client/session" "github.com/atomix/go-client/pkg/client/test" "github.com/stretchr/testify/assert" ) func TestLogOperations(t *testing.T) { - conns, partitions := test.StartTestPartitions(3) + partitions, closers := test.StartTestPartitions(3) + defer test.StopTestPartitions(closers) + + sessions, err := test.OpenSessions(partitions) + assert.NoError(t, err) + defer test.CloseSessions(sessions) // Creates a new log primitive name := primitive.NewName("default", "test", "default", "test") - _log, err := New(context.TODO(), name, conns, session.WithTimeout(5*time.Second)) + _log, err := New(context.TODO(), name, sessions) assert.NoError(t, err) // Gets the log entry at index 0 @@ -114,14 +117,19 @@ func TestLogOperations(t *testing.T) { err = _log.Clear(context.Background()) assert.NoError(t, err) - test.StopTestPartitions(partitions) } func TestLogStreams(t *testing.T) { - conns, partitions := test.StartTestPartitions(3) + partitions, closers := test.StartTestPartitions(3) + defer test.StopTestPartitions(closers) + sessions, err := test.OpenSessions(partitions) + assert.NoError(t, err) + defer test.CloseSessions(sessions) + + // Creates a new log primitive name := primitive.NewName("default", "test", "default", "test") - _log, err := New(context.TODO(), name, conns, session.WithTimeout(5*time.Second)) + _log, err := New(context.TODO(), name, sessions) assert.NoError(t, err) kv, err := _log.Append(context.Background(), []byte("item1")) @@ -166,34 +174,33 @@ func TestLogStreams(t *testing.T) { assert.Equal(t, "item5", string(kv.Value)) <-latch - err = _log.Close() + err = _log.Close(context.Background()) assert.NoError(t, err) - log1, err := New(context.TODO(), name, conns, session.WithTimeout(5*time.Second)) + log1, err := New(context.TODO(), name, sessions) assert.NoError(t, err) - log2, err := New(context.TODO(), name, conns, session.WithTimeout(5*time.Second)) + log2, err := New(context.TODO(), name, sessions) assert.NoError(t, err) size, err := log1.Size(context.TODO()) assert.NoError(t, err) assert.Equal(t, 5, size) - err = log1.Close() + err = log1.Close(context.Background()) assert.NoError(t, err) - err = log1.Delete() + err = log1.Delete(context.Background()) assert.NoError(t, err) - err = log2.Delete() + err = log2.Delete(context.Background()) assert.NoError(t, err) - _log, err = New(context.TODO(), name, conns, session.WithTimeout(5*time.Second)) + _log, err = New(context.TODO(), name, sessions) assert.NoError(t, err) size, err = _log.Size(context.TODO()) assert.NoError(t, err) assert.Equal(t, 0, size) - test.StopTestPartitions(partitions) } From eee79a1a74ef276db4f5a10b2175aafe498d3152 Mon Sep 17 00:00:00 2001 From: adibrastegarnia Date: Fri, 7 Feb 2020 12:55:32 -0800 Subject: [PATCH 6/8] Remove key from options --- pkg/client/log/options.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/client/log/options.go b/pkg/client/log/options.go index 746df64..ccfa65c 100644 --- a/pkg/client/log/options.go +++ b/pkg/client/log/options.go @@ -110,6 +110,5 @@ func WithFilter(filter Filter) WatchOption { // Filter is a watch filter configuration type Filter struct { - Key string Index Index } From a41557370f8b98aa43ba948bce58a88e470d5fcf Mon Sep 17 00:00:00 2001 From: adibrastegarnia Date: Fri, 7 Feb 2020 13:02:20 -0800 Subject: [PATCH 7/8] Remove replaces --- go.mod | 10 +++------- go.sum | 4 ++++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index f63f9e6..66a2921 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,10 @@ module github.com/atomix/go-client require ( - github.com/atomix/api v0.0.0-20200206211058-f075fb5b6d1b + github.com/atomix/api v0.0.0-20200207212403-a55e2fa6e823 github.com/atomix/atomix-go-client v0.0.0-20200114212658-58c359bc47b1 - github.com/atomix/go-framework v0.0.0-20200207202010-51e205d726d2 - github.com/atomix/go-local v0.0.0-20200207202057-4a81cbdd3325 + github.com/atomix/go-framework v0.0.0-20200207214715-0cee98c57cdd + github.com/atomix/go-local v0.0.0-20200207214727-4a5d923aa934 github.com/cenkalti/backoff v2.2.1+incompatible github.com/gogo/protobuf v1.3.1 github.com/golang/protobuf v1.3.2 @@ -12,7 +12,3 @@ require ( github.com/stretchr/testify v1.4.0 google.golang.org/grpc v1.27.0 ) - -replace github.com/atomix/api => ../api - -replace github.com/atomix/go-framework => ../go-framework diff --git a/go.sum b/go.sum index 576f2dc..1d01690 100644 --- a/go.sum +++ b/go.sum @@ -20,6 +20,7 @@ github.com/atomix/api v0.0.0-20200206050905-3494e48c0084 h1:u+Hcm3wJHjIpUjh1vtRK github.com/atomix/api v0.0.0-20200206050905-3494e48c0084/go.mod h1:yD3KAX7yCeVhVjM2CD/5AXe9NW4yO6+siRQ5nfY+M1s= github.com/atomix/api v0.0.0-20200206211058-f075fb5b6d1b h1:DwaZyYMUCs4fJ1i+8eQQ9Jl3gbCk753zP4lT3wH23to= github.com/atomix/api v0.0.0-20200206211058-f075fb5b6d1b/go.mod h1:yD3KAX7yCeVhVjM2CD/5AXe9NW4yO6+siRQ5nfY+M1s= +github.com/atomix/api v0.0.0-20200207212403-a55e2fa6e823/go.mod h1:N+Jv8qV9klP+/RDAVxRbPdluB0cm1ZjKLDjd40/Ccv4= github.com/atomix/atomix v0.0.0-20191223092540-19bd4940b82e h1:tdI1nxT0hw++d6zWiTzuxqRiu5ALTcbNNtLVzIrAjxw= github.com/atomix/atomix-api v0.0.0-20190826211343-dd8f4db3bf77 h1:+PUuY9wDRp+VAg/JbEguzdOMJj6ruUw6Kw/y+QYHB6s= github.com/atomix/atomix-api v0.0.0-20190826211343-dd8f4db3bf77/go.mod h1:joWKUd0zIeYbAQ0vmYHGsnV03ZgRalhceHgnJ3EN0mI= @@ -107,6 +108,7 @@ github.com/atomix/go-framework v0.0.0-20200206211522-7019b65dabc0/go.mod h1:Q/0V github.com/atomix/go-framework v0.0.0-20200206221034-8c7583e55420 h1:+DAkriIF4oDJEl4HT0LBv35xLQ0kMnDoHk2ooa6KAyI= github.com/atomix/go-framework v0.0.0-20200206221034-8c7583e55420/go.mod h1:Q/0VngSkhuTvHc9W2/k3HCgMcSkI9UaxUgRPWjO5lJI= github.com/atomix/go-framework v0.0.0-20200207202010-51e205d726d2/go.mod h1:Q/0VngSkhuTvHc9W2/k3HCgMcSkI9UaxUgRPWjO5lJI= +github.com/atomix/go-framework v0.0.0-20200207214715-0cee98c57cdd/go.mod h1:/KVF8Ab99yMqnkELF2LIwCTR9FO+KI5MW8trOfjIYSA= github.com/atomix/go-local v0.0.0-20200124003802-357f6682b2f4 h1:acDXXOuqzbqfOYDTMvz4dhckHfmH0DMfXSQE+gLFGOA= github.com/atomix/go-local v0.0.0-20200124003802-357f6682b2f4/go.mod h1:MabPkX/j2bN399GVAYGigyvDaAslu7omZoujEfzdKDg= github.com/atomix/go-local v0.0.0-20200202105028-743d224c66eb h1:MCstZMd7aizXswJ69hf3fzYU3StG4Ge/vHEfp3mS518= @@ -119,6 +121,8 @@ github.com/atomix/go-local v0.0.0-20200206221051-b1b85e86b0b7 h1:r7hBq6HSSUMXhsI github.com/atomix/go-local v0.0.0-20200206221051-b1b85e86b0b7/go.mod h1:FdvYwF2sobJJ8TAWHzNtn/5ppZX248hWh3jd/ZlZsMY= github.com/atomix/go-local v0.0.0-20200207202057-4a81cbdd3325 h1:TAnk36LvpuXDYp9qxEt3PJR4NlFUYgUPVTB+IYj2YKM= github.com/atomix/go-local v0.0.0-20200207202057-4a81cbdd3325/go.mod h1:n2xWQV3vAxEHcod1K82zOHlx/+iW9gbuu/zYzo5y060= +github.com/atomix/go-local v0.0.0-20200207214727-4a5d923aa934 h1:/H7YGguN+Qtww7HkMWGtj3/3ayXRioVeURwvDpJQ8lw= +github.com/atomix/go-local v0.0.0-20200207214727-4a5d923aa934/go.mod h1:qGUGef763ZEO4mcEJi7Bn2S7U/amLUWQp9RsAd+EtcQ= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= From 03cd21ca10b567d49b11425cfd2daa0215ccd299 Mon Sep 17 00:00:00 2001 From: adibrastegarnia Date: Fri, 7 Feb 2020 14:06:26 -0800 Subject: [PATCH 8/8] Add options test --- pkg/client/log/options_test.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 pkg/client/log/options_test.go diff --git a/pkg/client/log/options_test.go b/pkg/client/log/options_test.go new file mode 100644 index 0000000..6b8eb94 --- /dev/null +++ b/pkg/client/log/options_test.go @@ -0,0 +1,34 @@ +// Copyright 2019-present Open Networking Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "testing" + + api "github.com/atomix/api/proto/atomix/log" + "github.com/stretchr/testify/assert" +) + +func TestOptions(t *testing.T) { + getResponse := &api.GetResponse{} + assert.Nil(t, getResponse.Value) + WithDefault([]byte("foo")).afterGet(getResponse) + assert.Equal(t, "foo", string(getResponse.Value)) + + eventRequest := &api.EventRequest{} + assert.False(t, eventRequest.Replay) + WithReplay().beforeWatch(eventRequest) + assert.True(t, eventRequest.Replay) +}