Skip to content

Commit

Permalink
Merge pull request #1 from awakari/rework
Browse files Browse the repository at this point in the history
rework
  • Loading branch information
akurilov committed Jun 4, 2023
2 parents 4e03678 + a7cf2a2 commit 18901b0
Show file tree
Hide file tree
Showing 32 changed files with 1,170 additions and 1,043 deletions.
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ proto:
go install github.com/golang/protobuf/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2.0
PATH=${PATH}:~/go/bin protoc --go_out=plugins=grpc:. --go_opt=paths=source_relative \
api/grpc/messages/*.proto \
api/grpc/subscriptions/*.proto \
api/grpc/writer/*.proto \
api/grpc/subject/*.proto \
api/grpc/limits/*.proto \
api/grpc/permits/*.proto
api/grpc/permits/*.proto \
api/grpc/reader/*.proto \
api/grpc/subject/*.proto \
api/grpc/subscriptions/*.proto \
api/grpc/writer/*.proto

vet: proto
go vet
Expand Down
19 changes: 10 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func main() {
ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
defer cancel()
var ws model.WriteStream[*pb.CloudEvent]
ws, err = client.WriteMessages(ctx, userId)
ws, err = client.OpenMessagesWriter(ctx, userId)
if err == nil {
panic(err)
}
Expand Down Expand Up @@ -390,29 +390,30 @@ import (
"github.com/awakari/client-sdk-go/api"
"github.com/awakari/client-sdk-go/model/usage"
"time"
...
...
)

func main() {
...
var client api.Client // initialize client
var userId string // set this to "sub" field value from an authentication token, for example
batchSize := uint32(16)
...
ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
defer cancel()
var rs model.ReadStream[*pb.CloudEvent]
rs, err = client.ReadMessages(ctx, userId, subId)
var r model.ReadStream[*pb.CloudEvent]
r, err = client.OpenMessagesReader(ctx, userId, subId, batchSize)
if err != nil {
panic(err)
panic(err)
}
defer rs.Close()
var msg *pb.CloudEvent
defer r.Close()
var msgs []*pb.CloudEvent
for {
msg, err = rs.Read()
msgs, err = r.Read()
if err != nil {
break
}
fmt.Printf("subscription %s - received the next message: %+v\n", subId, msg)
fmt.Printf("subscription %s - received the next messages batch: %+v\n", subId, msgs)
}
if err != nil {
panic(err)
Expand Down
46 changes: 23 additions & 23 deletions api/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"crypto/tls"
"crypto/x509"
"github.com/awakari/client-sdk-go/api/grpc/limits"
"github.com/awakari/client-sdk-go/api/grpc/messages"
"github.com/awakari/client-sdk-go/api/grpc/permits"
"github.com/awakari/client-sdk-go/api/grpc/reader"
"github.com/awakari/client-sdk-go/api/grpc/subscriptions"
"github.com/awakari/client-sdk-go/api/grpc/writer"
"google.golang.org/grpc"
Expand All @@ -28,17 +28,17 @@ type ClientBuilder interface {
// Enables additionally the API methods to read the usage limits and permits.
ApiUri(apiUri string) ClientBuilder

// ReadUri sets the Awakari messages reading API URI. Overrides any value set by ApiUri.
// ReaderUri sets the Awakari messages reading API URI. Overrides any value set by ApiUri.
// Useful when the specific message reading API is needed by the client.
ReadUri(readUri string) ClientBuilder
ReaderUri(readerUri string) ClientBuilder

// SubscriptionsUri sets the Awakari subscriptions API URI. Overrides any value set by ApiUri.
// Useful when the specific subscriptions management API is needed by the client.
SubscriptionsUri(subsUri string) ClientBuilder

// WriteUri sets the Awakari messages publishing API URI. Overrides any value set by ApiUri.
// WriterUri sets the Awakari messages publishing API URI. Overrides any value set by ApiUri.
// Useful when the specific message publishing API is needed by the client.
WriteUri(writeUri string) ClientBuilder
WriterUri(writerUri string) ClientBuilder

// Build instantiates the Client instance and returns it.
Build() (c Client, err error)
Expand All @@ -49,9 +49,9 @@ type builder struct {
clientCrt []byte
clientKey []byte
apiUri string
readUri string
readerUri string
subsUri string
writeUri string
writerUri string
}

func NewClientBuilder() ClientBuilder {
Expand All @@ -74,8 +74,8 @@ func (b *builder) ApiUri(apiUri string) ClientBuilder {
return b
}

func (b *builder) ReadUri(readUri string) ClientBuilder {
b.readUri = readUri
func (b *builder) ReaderUri(readerUri string) ClientBuilder {
b.readerUri = readerUri
return b
}

Expand All @@ -84,8 +84,8 @@ func (b *builder) SubscriptionsUri(subsUri string) ClientBuilder {
return b
}

func (b *builder) WriteUri(writeUri string) ClientBuilder {
b.writeUri = writeUri
func (b *builder) WriterUri(writerUri string) ClientBuilder {
b.writerUri = writerUri
return b
}

Expand Down Expand Up @@ -124,16 +124,16 @@ func (b *builder) Build() (c Client, err error) {
svcLimits = limits.NewService(clientLimits)
}
//
var connMsgs *grpc.ClientConn
if b.readUri != "" {
connMsgs, err = grpc.Dial(b.readUri, optsDial...)
var connReader *grpc.ClientConn
if b.readerUri != "" {
connReader, err = grpc.Dial(b.readerUri, optsDial...)
} else if b.apiUri != "" {
connMsgs, err = grpc.Dial(b.apiUri, optsDial...)
connReader, err = grpc.Dial(b.apiUri, optsDial...)
}
var svcMsgs messages.Service
if connMsgs != nil {
clientMsgs := messages.NewServiceClient(connMsgs)
svcMsgs = messages.NewService(clientMsgs)
var svcReader reader.Service
if connReader != nil {
clientReader := reader.NewServiceClient(connReader)
svcReader = reader.NewService(clientReader)
}
//
var connPermits *grpc.ClientConn
Expand All @@ -157,8 +157,8 @@ func (b *builder) Build() (c Client, err error) {
}
//
var connWriter *grpc.ClientConn
if b.writeUri != "" {
connWriter, err = grpc.Dial(b.writeUri, optsDial...)
if b.writerUri != "" {
connWriter, err = grpc.Dial(b.writerUri, optsDial...)
} else if b.apiUri != "" {
connWriter, err = grpc.Dial(b.apiUri, optsDial...)
}
Expand All @@ -170,12 +170,12 @@ func (b *builder) Build() (c Client, err error) {
//
c = client{
connLimits: connLimits,
connMsgs: connMsgs,
connReader: connReader,
connPermits: connPermits,
connSubs: connSubs,
connWriter: connWriter,
svcLimits: svcLimits,
svcMsgs: svcMsgs,
svcReader: svcReader,
svcPermits: svcPermits,
svcSubs: svcSubs,
svcWriter: svcWriter,
Expand Down
34 changes: 16 additions & 18 deletions api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"errors"
"fmt"
"github.com/awakari/client-sdk-go/api/grpc/limits"
"github.com/awakari/client-sdk-go/api/grpc/messages"
"github.com/awakari/client-sdk-go/api/grpc/permits"
"github.com/awakari/client-sdk-go/api/grpc/reader"
"github.com/awakari/client-sdk-go/api/grpc/subscriptions"
"github.com/awakari/client-sdk-go/api/grpc/writer"
"github.com/awakari/client-sdk-go/model"
Expand All @@ -30,11 +30,11 @@ type Client interface {

// Messages

// WriteMessages opens the stream for publishing the messages.
WriteMessages(ctx context.Context, userId string) (ws model.WriteStream[*pb.CloudEvent], err error)
// OpenMessagesWriter opens the batch message writer. A client should close it once done.
OpenMessagesWriter(ctx context.Context, userId string) (w model.Writer[*pb.CloudEvent], err error)

// ReadMessages opens the stream for receiving the messages matching the requested subscription.
ReadMessages(ctx context.Context, userId, subId string) (rs model.ReadStream[*pb.CloudEvent], err error)
// OpenMessagesReader opens batch message reader. A client should close it once done.
OpenMessagesReader(ctx context.Context, userId, subId string, batchSize uint32) (r model.Reader[[]*pb.CloudEvent], err error)

// Subscriptions

Expand All @@ -57,27 +57,25 @@ type Client interface {

type client struct {
connLimits *grpc.ClientConn
connMsgs *grpc.ClientConn
connReader *grpc.ClientConn
connPermits *grpc.ClientConn
connSubs *grpc.ClientConn
connWriter *grpc.ClientConn
svcLimits limits.Service
svcMsgs messages.Service
svcReader reader.Service
svcPermits permits.Service
svcSubs subscriptions.Service
svcWriter writer.Service
}

var ErrApiDisabled = errors.New("the API call is not enabled for this client")

var _ Client = (*client)(nil)

func (c client) Close() (err error) {
if c.connLimits != nil {
err = errors.Join(err, c.connLimits.Close())
}
if c.connMsgs != nil {
err = errors.Join(err, c.connMsgs.Close())
if c.connReader != nil {
err = errors.Join(err, c.connReader.Close())
}
if c.connPermits != nil {
err = errors.Join(err, c.connPermits.Close())
Expand Down Expand Up @@ -109,20 +107,20 @@ func (c client) ReadUsageLimit(ctx context.Context, userId string, subj usage.Su
return
}

func (c client) WriteMessages(ctx context.Context, userId string) (ws model.WriteStream[*pb.CloudEvent], err error) {
func (c client) OpenMessagesWriter(ctx context.Context, userId string) (ws model.Writer[*pb.CloudEvent], err error) {
if c.svcWriter == nil {
err = fmt.Errorf("%w: WriteMessages(...)", ErrApiDisabled)
err = fmt.Errorf("%w: OpenMessagesWriter(...)", ErrApiDisabled)
} else {
ws, err = c.svcWriter.OpenStream(ctx, userId)
ws, err = c.svcWriter.OpenWriter(ctx, userId)
}
return
}

func (c client) ReadMessages(ctx context.Context, userId, subId string) (rs model.ReadStream[*pb.CloudEvent], err error) {
if c.svcMsgs == nil {
err = fmt.Errorf("%w: ReadMessages(...)", ErrApiDisabled)
func (c client) OpenMessagesReader(ctx context.Context, userId, subId string, batchSize uint32) (rs model.Reader[[]*pb.CloudEvent], err error) {
if c.svcReader == nil {
err = fmt.Errorf("%w: OpenMessagesReader(...)", ErrApiDisabled)
} else {
rs, err = c.svcMsgs.Read(ctx, userId, subId)
rs, err = c.svcReader.OpenReader(ctx, userId, subId, batchSize)
}
return
}
Expand Down
Loading

0 comments on commit 18901b0

Please sign in to comment.