Skip to content

Commit

Permalink
Updates (#27)
Browse files Browse the repository at this point in the history
- Modify JetStream defaults
- Output logs to multiple streams
- Update nats dependency and related issues
- Update easeq/go-consul-registry
  • Loading branch information
easeq committed Jan 4, 2023
1 parent 0370aff commit 94d0c95
Show file tree
Hide file tree
Showing 9 changed files with 1,276 additions and 161 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
*.vim
vendor/

29 changes: 19 additions & 10 deletions broker/jetstream/jetstream.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"time"

"github.com/easeq/go-service/broker"
"github.com/easeq/go-service/component"
Expand Down Expand Up @@ -68,7 +69,7 @@ func NewJetStream(opts ...broker.Option) *JetStream {
func AddStream(name string, subjects ...string) broker.Option {
return func(b broker.Broker) {
if len(subjects) == 0 {
subjectAll := fmt.Sprintf("%s.*", name)
subjectAll := fmt.Sprintf("%s.>", name)
subjects = []string{subjectAll}
}

Expand Down Expand Up @@ -97,20 +98,23 @@ func (j *JetStream) streamExists(name string) *nats.StreamInfo {
func (j *JetStream) createStream(name string, subjects ...string) error {
if streamInfo := j.streamExists(name); streamInfo != nil {
newStreamCfg := streamInfo.Config
newStreamCfg.Retention = nats.InterestPolicy
newStreamCfg.Subjects = utils.Unique(
append(newStreamCfg.Subjects, subjects...),
)

_, err := j.jsCtx.UpdateStream(&newStreamCfg)
return err
}

_, err := j.jsCtx.AddStream(&nats.StreamConfig{
Name: name,
Subjects: subjects,
// Duplicates: 0 * time.Second,
Discard: nats.DiscardOld,
Retention: nats.WorkQueuePolicy,
Replicas: 1,
Name: name,
Subjects: subjects,
Retention: nats.InterestPolicy,
Duplicates: 0 * time.Second,
Discard: nats.DiscardOld,
// Retention: nats.InterestPolicy,
// Replicas: 1,
})

return err
Expand All @@ -127,13 +131,15 @@ func (j *JetStream) Publish(ctx context.Context, topic string, message interface
return j.w.Publish(ctx, topic, payload, func(t *broker.TraceMsgCarrier) error {
data, err := t.Bytes()
if err != nil {
return fmt.Errorf("trace message carrier error: %v", err)
j.logger.Errorw("publish[data error]", "topic", topic, "err", err)
return err
}

// Send the message with span over NATS
_, err = j.jsCtx.Publish(topic, data, publisher.opts...)
if err != nil {
return fmt.Errorf("publish error: %v", err)
j.logger.Errorw("publish error", "topic", topic, "err", err)
return err
}

return nil
Expand All @@ -157,6 +163,7 @@ func (j *JetStream) Subscribe(ctx context.Context, topic string, handler broker.
},
}); err != nil {
m.Nak()
j.logger.Errorw("subscribe handle error", "topic", topic, "err", err)
return fmt.Errorf("subscribe handle error: %v", err)
}

Expand All @@ -166,8 +173,10 @@ func (j *JetStream) Subscribe(ctx context.Context, topic string, handler broker.
}

subscription, err := subscriber.Subscribe(natsHandler)
j.logger.Infow("subscription", "s", subscription)
if err != nil {
return fmt.Errorf("subscription error[topic: %s]: %v", topic, err)
j.logger.Errorw("subscribe error", "topic", topic, "err", err)
return err
}

j.Subscriptions[topic] = subscription
Expand Down
2 changes: 2 additions & 0 deletions broker/jetstream/subscriber.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ func NewSubscriber(j *JetStream, topic string, opts ...broker.SubscribeOption) *
s := &subscriber{
j: j,
topic: topic,
sType: DEFAULT,
opts: []nats.SubOpt{
nats.MaxDeliver(3),
nats.ManualAck(),
nats.AckExplicit(),
nats.DeliverNew(),
},
}
Expand Down
40 changes: 25 additions & 15 deletions client/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package grpc
import (
"context"
"errors"
"fmt"
"sync"

"github.com/easeq/go-service/client"
Expand Down Expand Up @@ -34,6 +33,8 @@ var (
ErrInvalidGrpcClient = errors.New("invalid GrpcClient")
// ErrInvalidStreamDescription returned when the variable passed is not of grpc.StreamDesc type
ErrInvalidStreamDescription = errors.New("invalid stream description")
// ErrInvalidFactoryConn returned when factory conn is invalid
ErrInvalidFactoryConn = errors.New("invalid factory client connection")
)

// ClientOption to pass as arg while creating new service
Expand All @@ -60,13 +61,15 @@ func NewGrpc(opts ...ClientOption) *Grpc {
opt(c)
}

c.i = NewInitializer(c)

c.pool = pool.NewPool(
pool.WithFactory(c.factory),
pool.WithSize(10),
pool.WithCloseFunc(c.closeFunc),
pool.WithLogger(c.logger),
)

c.i = NewInitializer(c)
return c
}

Expand Down Expand Up @@ -94,9 +97,25 @@ func WithCloseFunc(closeFunc pool.CloseFunc) ClientOption {
// Dial creates/gets a connection from the pool using the address from the service registry
func (c *Grpc) Dial(name string, opts ...client.DialOption) (pool.Connection, error) {
address := c.Registry.ConnectionString(name, defaultScheme)
c.logger.Debugf("dial: %s", address)
return c.pool.Get(address)
}

// Get client conn
func (c *Grpc) GetConnFromPool(serviceName string) (pool.Connection, *grpc.ClientConn, error) {
pcc, err := c.Dial(serviceName)
if err != nil {
return nil, nil, err
}

cc, ok := pcc.Conn().(*grpc.ClientConn)
if !ok {
return nil, nil, ErrInvalidFactoryConn
}

return pcc, cc, nil
}

// Call gRPC method
func (c *Grpc) Call(
ctx context.Context,
Expand All @@ -106,19 +125,14 @@ func (c *Grpc) Call(
res interface{},
opts ...client.CallOption,
) error {
pcc, err := c.Dial(sc.GetServiceName())
pcc, cc, err := c.GetConnFromPool(sc.GetServiceName())
if err != nil {
c.logger.Errorf("conn error: %s", err)
return err
}

// Put the connection back or close connection
defer pcc.Close()

cc, ok := pcc.Conn().(*grpc.ClientConn)
if !ok {
return fmt.Errorf("invalid factory client connection")
}

callOpts := make([]grpc.CallOption, len(opts))
for i, opt := range opts {
callOpts[i] = opt.(grpc.CallOption)
Expand All @@ -136,16 +150,12 @@ func (c *Grpc) Stream(
req interface{},
opts ...client.CallOption,
) (client.StreamClient, error) {
pcc, err := c.Dial(sc.GetServiceName())
pcc, cc, err := c.GetConnFromPool(sc.GetServiceName())
if err != nil {
c.logger.Errorf("conn error: %s", err)
return nil, err
}

cc, ok := pcc.Conn().(*grpc.ClientConn)
if !ok {
return nil, fmt.Errorf("invalid connection")
}

callOpts := make([]grpc.CallOption, len(opts))
for i, opt := range opts {
callOpts[i] = opt.(grpc.CallOption)
Expand Down
33 changes: 15 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,24 @@ go 1.16

require (
github.com/Netflix/go-env v0.0.0-20210215222557-e437a7e7f9fb
github.com/easeq/go-consul-registry/v2 v2.0.2-0.20210513113717-eab4959d405c
github.com/easeq/go-consul-registry/v2 v2.1.0
github.com/easeq/go-redis-access-control v0.0.6
github.com/go-redis/redis/v8 v8.11.4
github.com/gofiber/fiber/v2 v2.32.0
github.com/golang-migrate/migrate/v4 v4.14.1
github.com/gofiber/fiber/v2 v2.40.0
github.com/golang-migrate/migrate/v4 v4.15.2
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0
github.com/kr/text v0.2.0 // indirect
github.com/hashicorp/consul/api v1.8.1 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/nats-io/nats-server/v2 v2.6.4 // indirect
github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/nsqio/go-nsq v1.0.8
github.com/stretchr/testify v1.7.0
go.etcd.io/etcd/client/v3 v3.5.0
go.opentelemetry.io/otel v1.4.1
go.opentelemetry.io/otel/exporters/jaeger v1.4.1
go.opentelemetry.io/otel/sdk v1.4.1
go.opentelemetry.io/otel/trace v1.4.1
go.uber.org/zap v1.17.0
google.golang.org/grpc v1.42.0
github.com/nats-io/nats-server/v2 v2.9.8 // indirect
github.com/nats-io/nats.go v1.20.0
github.com/nsqio/go-nsq v1.1.0
github.com/stretchr/testify v1.8.0
go.etcd.io/etcd/client/v3 v3.5.6
go.opentelemetry.io/otel v1.11.1
go.opentelemetry.io/otel/exporters/jaeger v1.11.1
go.opentelemetry.io/otel/sdk v1.11.1
go.opentelemetry.io/otel/trace v1.11.1
go.uber.org/zap v1.23.0
google.golang.org/grpc v1.45.0
google.golang.org/protobuf v1.27.1
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
)
Loading

0 comments on commit 94d0c95

Please sign in to comment.