diff --git a/go.mod b/go.mod index 5a254e0..66a2921 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,14 @@ module github.com/atomix/go-client require ( - github.com/atomix/api v0.0.0-20200206211058-f075fb5b6d1b - github.com/atomix/go-framework v0.0.0-20200207202010-51e205d726d2 - github.com/atomix/go-local v0.0.0-20200207202057-4a81cbdd3325 + github.com/atomix/api v0.0.0-20200207212403-a55e2fa6e823 + github.com/atomix/atomix-go-client v0.0.0-20200114212658-58c359bc47b1 + github.com/atomix/go-framework v0.0.0-20200207214715-0cee98c57cdd + github.com/atomix/go-local v0.0.0-20200207214727-4a5d923aa934 github.com/cenkalti/backoff v2.2.1+incompatible github.com/gogo/protobuf v1.3.1 github.com/golang/protobuf v1.3.2 github.com/google/uuid v1.1.1 github.com/stretchr/testify v1.4.0 - google.golang.org/grpc v1.23.1 + google.golang.org/grpc v1.27.0 ) diff --git a/go.sum b/go.sum index 622f24d..1d01690 100644 --- a/go.sum +++ b/go.sum @@ -20,6 +20,7 @@ github.com/atomix/api v0.0.0-20200206050905-3494e48c0084 h1:u+Hcm3wJHjIpUjh1vtRK github.com/atomix/api v0.0.0-20200206050905-3494e48c0084/go.mod h1:yD3KAX7yCeVhVjM2CD/5AXe9NW4yO6+siRQ5nfY+M1s= github.com/atomix/api v0.0.0-20200206211058-f075fb5b6d1b h1:DwaZyYMUCs4fJ1i+8eQQ9Jl3gbCk753zP4lT3wH23to= github.com/atomix/api v0.0.0-20200206211058-f075fb5b6d1b/go.mod h1:yD3KAX7yCeVhVjM2CD/5AXe9NW4yO6+siRQ5nfY+M1s= +github.com/atomix/api v0.0.0-20200207212403-a55e2fa6e823/go.mod h1:N+Jv8qV9klP+/RDAVxRbPdluB0cm1ZjKLDjd40/Ccv4= github.com/atomix/atomix v0.0.0-20191223092540-19bd4940b82e h1:tdI1nxT0hw++d6zWiTzuxqRiu5ALTcbNNtLVzIrAjxw= github.com/atomix/atomix-api v0.0.0-20190826211343-dd8f4db3bf77 h1:+PUuY9wDRp+VAg/JbEguzdOMJj6ruUw6Kw/y+QYHB6s= github.com/atomix/atomix-api v0.0.0-20190826211343-dd8f4db3bf77/go.mod h1:joWKUd0zIeYbAQ0vmYHGsnV03ZgRalhceHgnJ3EN0mI= @@ -41,6 +42,7 @@ github.com/atomix/atomix-go-client v0.0.0-20190827234201-188602d4e780/go.mod h1: github.com/atomix/atomix-go-client v0.0.0-20191002230120-837d618e27c5/go.mod h1:MoPkrAL33saKe2GbTi+NJgLzW7ejCwrrLDrYIwGYNcE= github.com/atomix/atomix-go-client v0.0.0-20191015003555-98ae5ccbe7bd/go.mod h1:jV1V8j+zo30xhC+hufKM/1XIiJKA3ODjwuMA/rBHu8g= github.com/atomix/atomix-go-client v0.0.0-20191018192841-3644d110cbec/go.mod h1:bM51i8VXdxw3+vN5spPJWllkfsfpakHYo2bOkNj1xC4= +github.com/atomix/atomix-go-client v0.0.0-20200114212658-58c359bc47b1 h1:XGjCL/okuI40whKVS2xtiAUhoMW9MHUCeyPTZMTOstA= github.com/atomix/atomix-go-client v0.0.0-20200114212658-58c359bc47b1/go.mod h1:STv1xDq3qpFABP1zE+9/R2kYuFamQWSGH18w6sfXNE4= github.com/atomix/atomix-go-local v0.0.0-20190827233944-938e35b06834 h1:4ddTmYmAaP3WrafV/X76Jq4qNir2jezMQwKgouqNdIU= github.com/atomix/atomix-go-local v0.0.0-20190827233944-938e35b06834/go.mod h1:qLBTOiVKoEqzYOjgxIgWFa+Hfa3SR+VexA6jGBcv0HA= @@ -106,6 +108,7 @@ github.com/atomix/go-framework v0.0.0-20200206211522-7019b65dabc0/go.mod h1:Q/0V github.com/atomix/go-framework v0.0.0-20200206221034-8c7583e55420 h1:+DAkriIF4oDJEl4HT0LBv35xLQ0kMnDoHk2ooa6KAyI= github.com/atomix/go-framework v0.0.0-20200206221034-8c7583e55420/go.mod h1:Q/0VngSkhuTvHc9W2/k3HCgMcSkI9UaxUgRPWjO5lJI= github.com/atomix/go-framework v0.0.0-20200207202010-51e205d726d2/go.mod h1:Q/0VngSkhuTvHc9W2/k3HCgMcSkI9UaxUgRPWjO5lJI= +github.com/atomix/go-framework v0.0.0-20200207214715-0cee98c57cdd/go.mod h1:/KVF8Ab99yMqnkELF2LIwCTR9FO+KI5MW8trOfjIYSA= github.com/atomix/go-local v0.0.0-20200124003802-357f6682b2f4 h1:acDXXOuqzbqfOYDTMvz4dhckHfmH0DMfXSQE+gLFGOA= github.com/atomix/go-local v0.0.0-20200124003802-357f6682b2f4/go.mod h1:MabPkX/j2bN399GVAYGigyvDaAslu7omZoujEfzdKDg= github.com/atomix/go-local v0.0.0-20200202105028-743d224c66eb h1:MCstZMd7aizXswJ69hf3fzYU3StG4Ge/vHEfp3mS518= @@ -118,13 +121,18 @@ github.com/atomix/go-local v0.0.0-20200206221051-b1b85e86b0b7 h1:r7hBq6HSSUMXhsI github.com/atomix/go-local v0.0.0-20200206221051-b1b85e86b0b7/go.mod h1:FdvYwF2sobJJ8TAWHzNtn/5ppZX248hWh3jd/ZlZsMY= github.com/atomix/go-local v0.0.0-20200207202057-4a81cbdd3325 h1:TAnk36LvpuXDYp9qxEt3PJR4NlFUYgUPVTB+IYj2YKM= github.com/atomix/go-local v0.0.0-20200207202057-4a81cbdd3325/go.mod h1:n2xWQV3vAxEHcod1K82zOHlx/+iW9gbuu/zYzo5y060= +github.com/atomix/go-local v0.0.0-20200207214727-4a5d923aa934 h1:/H7YGguN+Qtww7HkMWGtj3/3ayXRioVeURwvDpJQ8lw= +github.com/atomix/go-local v0.0.0-20200207214727-4a5d923aa934/go.mod h1:qGUGef763ZEO4mcEJi7Bn2S7U/amLUWQp9RsAd+EtcQ= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= @@ -174,6 +182,7 @@ github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -287,8 +296,11 @@ google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ij google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.22.1 h1:/7cs52RnTJmD43s3uxzlq2U7nqVTd/37viQwMrMNlOM= google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.23.1 h1:q4XQuHFC6I28BKZpo6IYyb3mNO+l7lSOxRuYTCiDfXk= google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/client/log/handler.go b/pkg/client/log/handler.go new file mode 100644 index 0000000..9bc9fd7 --- /dev/null +++ b/pkg/client/log/handler.go @@ -0,0 +1,69 @@ +// Copyright 2019-present Open Networking Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "context" + + "github.com/atomix/api/proto/atomix/headers" + api "github.com/atomix/api/proto/atomix/log" + "github.com/atomix/go-client/pkg/client/primitive" + "google.golang.org/grpc" +) + +type primitiveHandler struct{} + +func (m *primitiveHandler) Create(ctx context.Context, s *primitive.Instance) error { + return s.DoCreate(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + request := &api.CreateRequest{ + Header: header, + } + client := api.NewLogServiceClient(conn) + response, err := client.Create(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) +} + +func (m *primitiveHandler) Close(ctx context.Context, s *primitive.Instance) error { + return s.DoClose(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + request := &api.CloseRequest{ + Header: header, + } + client := api.NewLogServiceClient(conn) + response, err := client.Close(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) +} + +func (m *primitiveHandler) Delete(ctx context.Context, s *primitive.Instance) error { + return s.DoClose(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + request := &api.CloseRequest{ + Header: header, + Delete: true, + } + client := api.NewLogServiceClient(conn) + response, err := client.Close(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) +} diff --git a/pkg/client/log/log.go b/pkg/client/log/log.go new file mode 100644 index 0000000..e21c874 --- /dev/null +++ b/pkg/client/log/log.go @@ -0,0 +1,578 @@ +// Copyright 2019-present Open Networking Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/atomix/api/proto/atomix/headers" + api "github.com/atomix/api/proto/atomix/log" + "github.com/atomix/go-client/pkg/client/primitive" + "github.com/atomix/go-client/pkg/client/util" + "google.golang.org/grpc" +) + +// Type is the log type +const Type primitive.Type = "Log" + +// Index is the index of an entry +type Index uint64 + +// Client provides an API for creating IndexedMaps +type Client interface { + // GetLog gets the log instance of the given name + GetLog(ctx context.Context, name string) (Log, error) +} + +// Log is a distributed log +type Log interface { + primitive.Primitive + + // Appends appends the given value to the end of the log + Append(ctx context.Context, value []byte) (*Entry, error) + + // Get gets the value of the given index + Get(ctx context.Context, index int64, opts ...GetOption) (*Entry, error) + + // FirstIndex gets the first index in the log + FirstIndex(ctx context.Context) (Index, error) + + // LastIndex gets the last index in the log + LastIndex(ctx context.Context) (Index, error) + + // PrevIndex gets the index before the given index + PrevIndex(ctx context.Context, index Index) (Index, error) + + // NextIndex gets the index after the given index + NextIndex(ctx context.Context, index Index) (Index, error) + + // FirstEntry gets the first entry in the log + FirstEntry(ctx context.Context) (*Entry, error) + + // LastEntry gets the last entry in the log + LastEntry(ctx context.Context) (*Entry, error) + + // PrevEntry gets the entry before the given index + PrevEntry(ctx context.Context, index Index) (*Entry, error) + + // NextEntry gets the entry after the given index + NextEntry(ctx context.Context, index Index) (*Entry, error) + + // Remove removes an entry from the log + Remove(ctx context.Context, index int64, opts ...RemoveOption) (*Entry, error) + + // Size returns the number of entries in the log + Size(ctx context.Context) (int, error) + + // Clear removes all entries from the log + Clear(ctx context.Context) error + + // Watch watches the log for changes + // This is a non-blocking method. If the method returns without error, log events will be pushed onto + // the given channel in the order in which they occur. + Watch(ctx context.Context, ch chan<- *Event, opts ...WatchOption) error +} + +// Entry is an indexed key/value pair +type Entry struct { + // Index is the unique, monotonically increasing, globally unique index of the entry. The index is static + // for the lifetime of a key. + Index Index + + // Value is the value of the pair + Value []byte + + // Timestamp + Timestamp time.Time +} + +func (kv Entry) String() string { + return fmt.Sprintf("index: %d\nvalue: %s\n", kv.Index, string(kv.Value)) +} + +// EventType is the type of a log event +type EventType string + +const ( + // EventNone indicates the event is not a change event + EventNone EventType = "" + + // EventAppended indicates an entry was appended to the log + EventAppended EventType = "appended" + + // EventRemoved indicates an entry was removed from the log + EventRemoved EventType = "removed" +) + +// Event is a log change event +type Event struct { + // Type indicates the change event type + Type EventType + + // Entry is the event entry + Entry *Entry +} + +// New creates a new log primitive +func New(ctx context.Context, name primitive.Name, partitions []*primitive.Session) (Log, error) { + i, err := util.GetPartitionIndex(name.Name, len(partitions)) + if err != nil { + return nil, err + } + return newLog(ctx, name, partitions[i]) +} + +// newLog creates a new Log for the given partition +func newLog(ctx context.Context, name primitive.Name, partition *primitive.Session) (*log, error) { + instance, err := primitive.NewInstance(ctx, name, partition, &primitiveHandler{}) + if err != nil { + return nil, err + } + return &log{ + name: name, + instance: instance, + }, nil +} + +// log is the default single-partition implementation of Log +type log struct { + name primitive.Name + instance *primitive.Instance +} + +func (l *log) Name() primitive.Name { + return l.name +} + +func (l *log) Append(ctx context.Context, value []byte) (*Entry, error) { + r, err := l.instance.DoCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.AppendRequest{ + Header: header, + Value: value, + } + response, err := client.Append(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return nil, err + } + + response := r.(*api.AppendResponse) + if response.Status == api.ResponseStatus_OK { + return &Entry{ + Index: Index(response.Index), + Value: value, + }, nil + } else if response.Status == api.ResponseStatus_PRECONDITION_FAILED { + return nil, errors.New("write condition failed") + } else if response.Status == api.ResponseStatus_WRITE_LOCK { + return nil, errors.New("write lock failed") + } else { + return &Entry{ + Index: Index(response.Index), + Value: value, + Timestamp: response.Timestamp, + }, nil + } +} + +func (l *log) Get(ctx context.Context, index int64, opts ...GetOption) (*Entry, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.GetRequest{ + Header: header, + Index: index, + } + for i := range opts { + opts[i].beforeGet(request) + } + response, err := client.Get(ctx, request) + if err != nil { + return nil, nil, err + } + for i := range opts { + opts[i].afterGet(response) + } + return response.Header, response, nil + }) + if err != nil { + return nil, err + } + + response := r.(*api.GetResponse) + + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + Timestamp: response.Timestamp, + }, nil + +} + +func (l *log) GetIndex(ctx context.Context, index Index, opts ...GetOption) (*Entry, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.GetRequest{ + Header: header, + Index: int64(index), + } + for i := range opts { + opts[i].beforeGet(request) + } + response, err := client.Get(ctx, request) + if err != nil { + return nil, nil, err + } + for i := range opts { + opts[i].afterGet(response) + } + return response.Header, response, nil + }) + if err != nil { + return nil, err + } + + response := r.(*api.GetResponse) + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + Timestamp: response.Timestamp, + }, nil + +} + +func (l *log) FirstIndex(ctx context.Context) (Index, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.FirstEntryRequest{ + Header: header, + } + response, err := client.FirstEntry(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return 0, err + } + + response := r.(*api.FirstEntryResponse) + return Index(response.Index), nil +} + +func (l *log) LastIndex(ctx context.Context) (Index, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.LastEntryRequest{ + Header: header, + } + response, err := client.LastEntry(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return 0, err + } + + response := r.(*api.LastEntryResponse) + return Index(response.Index), nil + +} + +func (l *log) PrevIndex(ctx context.Context, index Index) (Index, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.PrevEntryRequest{ + Header: header, + Index: int64(index), + } + response, err := client.PrevEntry(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return 0, err + } + + response := r.(*api.PrevEntryResponse) + return Index(response.Index), nil + +} + +func (l *log) NextIndex(ctx context.Context, index Index) (Index, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.NextEntryRequest{ + Header: header, + Index: int64(index), + } + response, err := client.NextEntry(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return 0, err + } + + response := r.(*api.NextEntryResponse) + return Index(response.Index), nil +} + +func (l *log) FirstEntry(ctx context.Context) (*Entry, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.FirstEntryRequest{ + Header: header, + } + response, err := client.FirstEntry(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return nil, err + } + + response := r.(*api.FirstEntryResponse) + + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + Timestamp: response.Timestamp, + }, nil + +} + +func (l *log) LastEntry(ctx context.Context) (*Entry, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.LastEntryRequest{ + Header: header, + } + response, err := client.LastEntry(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return nil, err + } + + response := r.(*api.LastEntryResponse) + + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + Timestamp: response.Timestamp, + }, nil + +} + +func (l *log) PrevEntry(ctx context.Context, index Index) (*Entry, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.PrevEntryRequest{ + Header: header, + Index: int64(index), + } + response, err := client.PrevEntry(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return nil, err + } + + response := r.(*api.PrevEntryResponse) + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + }, nil + +} + +func (l *log) NextEntry(ctx context.Context, index Index) (*Entry, error) { + r, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.NextEntryRequest{ + Header: header, + Index: int64(index), + } + response, err := client.NextEntry(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return nil, err + } + + response := r.(*api.NextEntryResponse) + return &Entry{ + Index: Index(response.Index), + Value: response.Value, + Timestamp: response.Timestamp, + }, nil +} + +func (l *log) Remove(ctx context.Context, index int64, opts ...RemoveOption) (*Entry, error) { + r, err := l.instance.DoCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.RemoveRequest{ + Header: header, + Index: index, + } + for i := range opts { + opts[i].beforeRemove(request) + } + response, err := client.Remove(ctx, request) + if err != nil { + return nil, nil, err + } + for i := range opts { + opts[i].afterRemove(response) + } + return response.Header, response, nil + }) + if err != nil { + return nil, err + } + + response := r.(*api.RemoveResponse) + if response.Status == api.ResponseStatus_OK { + return &Entry{ + Index: Index(response.Index), + Value: response.PreviousValue, + }, nil + } else if response.Status == api.ResponseStatus_PRECONDITION_FAILED { + return nil, errors.New("write condition failed") + } else if response.Status == api.ResponseStatus_WRITE_LOCK { + return nil, errors.New("write lock failed") + } else { + return nil, nil + } +} + +func (l *log) Size(ctx context.Context) (int, error) { + response, err := l.instance.DoQuery(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.SizeRequest{ + Header: header, + } + response, err := client.Size(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + if err != nil { + return 0, err + } + return int(response.(*api.SizeResponse).Size_), nil +} + +func (l *log) Clear(ctx context.Context) error { + _, err := l.instance.DoCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (*headers.ResponseHeader, interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.ClearRequest{ + Header: header, + } + response, err := client.Clear(ctx, request) + if err != nil { + return nil, nil, err + } + return response.Header, response, nil + }) + return err +} + +func (l *log) Watch(ctx context.Context, ch chan<- *Event, opts ...WatchOption) error { + stream, err := l.instance.DoCommandStream(ctx, func(ctx context.Context, conn *grpc.ClientConn, header *headers.RequestHeader) (interface{}, error) { + client := api.NewLogServiceClient(conn) + request := &api.EventRequest{ + Header: header, + } + for _, opt := range opts { + opt.beforeWatch(request) + } + return client.Events(ctx, request) + }, func(responses interface{}) (*headers.ResponseHeader, interface{}, error) { + response, err := responses.(api.LogService_EventsClient).Recv() + if err != nil { + return nil, nil, err + } + for _, opt := range opts { + opt.afterWatch(response) + } + return response.Header, response, nil + }) + if err != nil { + return err + } + + go func() { + defer close(ch) + for event := range stream { + response := event.(*api.EventResponse) + + // If this is a normal event (not a handshake response), write the event to the watch channel + var t EventType + switch response.Type { + case api.EventResponse_NONE: + t = EventNone + case api.EventResponse_APPENDED: + t = EventAppended + case api.EventResponse_REMOVED: + t = EventRemoved + } + ch <- &Event{ + Type: t, + Entry: &Entry{ + Index: Index(response.Index), + Value: response.Value, + Timestamp: response.Timestamp, + }, + } + } + }() + return nil +} + +func (l *log) Close(ctx context.Context) error { + return l.instance.Close(ctx) +} + +func (l *log) Delete(ctx context.Context) error { + return l.instance.Delete(ctx) +} diff --git a/pkg/client/log/log_test.go b/pkg/client/log/log_test.go new file mode 100644 index 0000000..9221ddc --- /dev/null +++ b/pkg/client/log/log_test.go @@ -0,0 +1,206 @@ +// Copyright 2019-present Open Networking Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "context" + "testing" + + "github.com/atomix/go-client/pkg/client/primitive" + "github.com/atomix/go-client/pkg/client/test" + "github.com/stretchr/testify/assert" +) + +func TestLogOperations(t *testing.T) { + partitions, closers := test.StartTestPartitions(3) + defer test.StopTestPartitions(closers) + + sessions, err := test.OpenSessions(partitions) + assert.NoError(t, err) + defer test.CloseSessions(sessions) + + // Creates a new log primitive + name := primitive.NewName("default", "test", "default", "test") + _log, err := New(context.TODO(), name, sessions) + assert.NoError(t, err) + + // Gets the log entry at index 0 + kv, err := _log.Get(context.Background(), 0) + assert.NoError(t, err) + assert.NotNil(t, kv) + + // Checks the size of log primitive + size, err := _log.Size(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 0, size) + + // Appends an entry to the log + kv, err = _log.Append(context.Background(), []byte("bar")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "bar", string(kv.Value)) + + // Appends an entry to the log + kv, err = _log.Append(context.Background(), []byte("baz")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "baz", string(kv.Value)) + + // Gets the first entry in the log + kv, err = _log.FirstEntry(context.Background()) + assert.NoError(t, err) + assert.Equal(t, "bar", string(kv.Value)) + + // Gets the first index + firstIndex, err := _log.FirstIndex(context.Background()) + assert.NoError(t, err) + assert.Equal(t, uint64(0x1), uint64(firstIndex)) + + // Gets the last entry in the log + kv, err = _log.LastEntry(context.Background()) + assert.NoError(t, err) + assert.Equal(t, "baz", string(kv.Value)) + + // Gets the last index + lastIndex, err := _log.LastIndex(context.Background()) + assert.NoError(t, err) + assert.Equal(t, uint64(0x2), uint64(lastIndex)) + + // Gets the next entry of the given index in the log + kv, err = _log.NextEntry(context.Background(), 1) + assert.NoError(t, err) + assert.Equal(t, "baz", string(kv.Value)) + + // Gets the previous entry of the given index in the log + kv, err = _log.PrevEntry(context.Background(), 2) + assert.NoError(t, err) + assert.Equal(t, "bar", string(kv.Value)) + + // Gets the log entry at index 1 + kv, err = _log.Get(context.Background(), 1) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "bar", string(kv.Value)) + + // Gets the size of the log primitive + size, err = _log.Size(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 2, size) + + // Removes the entry at index 1 from the log + kv, err = _log.Remove(context.Background(), 1) + assert.NoError(t, err) + assert.Equal(t, "bar", string(kv.Value)) + + // Removes the entry at index 2 from the log + kv, err = _log.Remove(context.Background(), 2) + assert.NoError(t, err) + assert.Equal(t, "baz", string(kv.Value)) + + // Checks the size of the log primitive + size, err = _log.Size(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 0, size) + + err = _log.Clear(context.Background()) + assert.NoError(t, err) + +} + +func TestLogStreams(t *testing.T) { + partitions, closers := test.StartTestPartitions(3) + defer test.StopTestPartitions(closers) + + sessions, err := test.OpenSessions(partitions) + assert.NoError(t, err) + defer test.CloseSessions(sessions) + + // Creates a new log primitive + name := primitive.NewName("default", "test", "default", "test") + _log, err := New(context.TODO(), name, sessions) + assert.NoError(t, err) + + kv, err := _log.Append(context.Background(), []byte("item1")) + assert.NoError(t, err) + assert.NotNil(t, kv) + + c := make(chan *Event) + latch := make(chan struct{}) + go func() { + e := <-c + assert.Equal(t, "item2", string(e.Entry.Value)) + e = <-c + assert.Equal(t, "item3", string(e.Entry.Value)) + e = <-c + assert.Equal(t, "item4", string(e.Entry.Value)) + e = <-c + assert.Equal(t, "item5", string(e.Entry.Value)) + latch <- struct{}{} + }() + + err = _log.Watch(context.Background(), c) + assert.NoError(t, err) + + kv, err = _log.Append(context.Background(), []byte("item2")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "item2", string(kv.Value)) + + kv, err = _log.Append(context.Background(), []byte("item3")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "item3", string(kv.Value)) + + kv, err = _log.Append(context.Background(), []byte("item4")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "item4", string(kv.Value)) + + kv, err = _log.Append(context.Background(), []byte("item5")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "item5", string(kv.Value)) + + <-latch + err = _log.Close(context.Background()) + assert.NoError(t, err) + + log1, err := New(context.TODO(), name, sessions) + assert.NoError(t, err) + + log2, err := New(context.TODO(), name, sessions) + assert.NoError(t, err) + + size, err := log1.Size(context.TODO()) + assert.NoError(t, err) + assert.Equal(t, 5, size) + + err = log1.Close(context.Background()) + assert.NoError(t, err) + + err = log1.Delete(context.Background()) + assert.NoError(t, err) + + err = log2.Delete(context.Background()) + assert.NoError(t, err) + + _log, err = New(context.TODO(), name, sessions) + assert.NoError(t, err) + + size, err = _log.Size(context.TODO()) + assert.NoError(t, err) + assert.Equal(t, 0, size) + +} diff --git a/pkg/client/log/options.go b/pkg/client/log/options.go new file mode 100644 index 0000000..ccfa65c --- /dev/null +++ b/pkg/client/log/options.go @@ -0,0 +1,114 @@ +// Copyright 2019-present Open Networking Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + api "github.com/atomix/api/proto/atomix/log" +) + +// SetOption is an option for the Append method +type SetOption interface { + beforePut(request *api.AppendRequest) + afterPut(response *api.AppendResponse) +} + +// RemoveOption is an option for the Remove method +type RemoveOption interface { + beforeRemove(request *api.RemoveRequest) + afterRemove(response *api.RemoveResponse) +} + +// IfNotSet sets the value if the entry is not yet set +func IfNotSet() SetOption { + return &NotSetOption{} +} + +// NotSetOption is a SetOption that sets the value only if it's not already set +type NotSetOption struct { +} + +func (o NotSetOption) beforePut(request *api.AppendRequest) { +} + +func (o NotSetOption) afterPut(response *api.AppendResponse) { + +} + +// GetOption is an option for the Get method +type GetOption interface { + beforeGet(request *api.GetRequest) + afterGet(response *api.GetResponse) +} + +// WithDefault sets the default value to return if the key is not present +func WithDefault(def []byte) GetOption { + return defaultOption{def: def} +} + +type defaultOption struct { + def []byte +} + +func (o defaultOption) beforeGet(request *api.GetRequest) { +} + +func (o defaultOption) afterGet(response *api.GetResponse) { + response.Value = o.def + +} + +// WatchOption is an option for the Watch method +type WatchOption interface { + beforeWatch(request *api.EventRequest) + afterWatch(response *api.EventResponse) +} + +// WithReplay returns a watch option that enables replay of watch events +func WithReplay() WatchOption { + return replayOption{} +} + +type replayOption struct{} + +func (o replayOption) beforeWatch(request *api.EventRequest) { + request.Replay = true +} + +func (o replayOption) afterWatch(response *api.EventResponse) { + +} + +type filterOption struct { + filter Filter +} + +func (o filterOption) beforeWatch(request *api.EventRequest) { + if o.filter.Index > 0 { + request.Index = int64(o.filter.Index) + } +} + +func (o filterOption) afterWatch(response *api.EventResponse) { +} + +// WithFilter returns a watch option that filters the watch events +func WithFilter(filter Filter) WatchOption { + return filterOption{filter: filter} +} + +// Filter is a watch filter configuration +type Filter struct { + Index Index +} diff --git a/pkg/client/log/options_test.go b/pkg/client/log/options_test.go new file mode 100644 index 0000000..6b8eb94 --- /dev/null +++ b/pkg/client/log/options_test.go @@ -0,0 +1,34 @@ +// Copyright 2019-present Open Networking Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "testing" + + api "github.com/atomix/api/proto/atomix/log" + "github.com/stretchr/testify/assert" +) + +func TestOptions(t *testing.T) { + getResponse := &api.GetResponse{} + assert.Nil(t, getResponse.Value) + WithDefault([]byte("foo")).afterGet(getResponse) + assert.Equal(t, "foo", string(getResponse.Value)) + + eventRequest := &api.EventRequest{} + assert.False(t, eventRequest.Replay) + WithReplay().beforeWatch(eventRequest) + assert.True(t, eventRequest.Replay) +}