Skip to content

Commit

Permalink
feat(MQ): add local mq
Browse files Browse the repository at this point in the history
  • Loading branch information
GStones committed May 23, 2024
1 parent efcc067 commit eaed91d
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 16 deletions.
4 changes: 3 additions & 1 deletion cmd/demo/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ func main() {
// you can find them from moke-kit https://github.com/GStones/moke-kit
// nats message queue
mfx.NatsModule,
// local(channel) message queue
mfx.LocalModule,

// services
// service
modules.AllModule,
)
}
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/abiosoft/ishell v2.0.0+incompatible
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/gstones/moke-kit v1.0.0
github.com/gstones/zinx v1.2.5
github.com/gstones/moke-kit v1.0.1-0.20240523024334-8a0b1e0d264e
github.com/gstones/zinx v1.2.7-0.20240522125806-6252e8c9ea5f
github.com/redis/go-redis/v9 v9.5.1
github.com/spf13/cobra v1.8.0
go.uber.org/fx v1.21.1
Expand All @@ -21,6 +21,7 @@ require (

require (
cloud.google.com/go/compute/metadata v0.2.3 // indirect
github.com/ThreeDotsLabs/watermill v1.3.5 // indirect
github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db // indirect
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
Expand Down Expand Up @@ -53,6 +54,7 @@ require (
github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/klauspost/reedsolomon v1.12.1 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
Expand All @@ -62,6 +64,7 @@ require (
github.com/nats-io/nats.go v1.33.1 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package handlers
package domain

import (
"context"
"fmt"
"time"

"github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -55,20 +56,32 @@ func (d *Demo) Hi(uid, message string) error {
return err
}
d.redis.Set(context.Background(), "demo", message, time.Minute)
// mq publish
//nats mq publish
natsTopic := common.NatsHeader.CreateTopic("demo")
if err := d.mq.Publish(
common.NatsHeader.CreateTopic("demo"),
miface.WithBytes([]byte(message)),
natsTopic,
miface.WithBytes([]byte(fmt.Sprintf("nats mq: %s", message))),
); err != nil {
return err
}
// local(channel) mq publish
localTopic := common.LocalHeader.CreateTopic("demo")
if err := d.mq.Publish(
localTopic,
miface.WithBytes([]byte(fmt.Sprintf("local mq: %s", message))),
); err != nil {
return err
}

return nil
}

func (d *Demo) Watch(ctx context.Context, topic string, callback func(message string) error) error {
// mq subscribe
sub, err := d.mq.Subscribe(
common.NatsHeader.CreateTopic(topic),
//nats mq subscribe
natsTopic := common.NatsHeader.CreateTopic(topic)
_, err := d.mq.Subscribe(
ctx,
natsTopic,
func(msg miface.Message, err error) common.ConsumptionCode {
if err := callback(string(msg.Data())); err != nil {
return common.ConsumeNackPersistentFailure
Expand All @@ -78,9 +91,18 @@ func (d *Demo) Watch(ctx context.Context, topic string, callback func(message st
if err != nil {
return err
}
//local(channel) mq subscribe
localTopic := common.LocalHeader.CreateTopic(topic)
_, err = d.mq.Subscribe(
ctx,
localTopic,
func(msg miface.Message, err error) common.ConsumptionCode {
if err := callback(string(msg.Data())); err != nil {
return common.ConsumeNackPersistentFailure
}
return common.ConsumeAck
})

<-ctx.Done()
if err := sub.Unsubscribe(); err != nil {
return err
}
return nil
}
6 changes: 3 additions & 3 deletions internal/services/demo/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (

pb "github.com/gstones/moke-layout/api/gen/demo/api"
"github.com/gstones/moke-layout/internal/services/demo/db_nosql"
"github.com/gstones/moke-layout/internal/services/demo/handlers"
"github.com/gstones/moke-layout/internal/services/demo/domain"
"github.com/gstones/moke-layout/pkg/dfx"
)

type Service struct {
logger *zap.Logger
demoHandler *handlers.Demo
demoHandler *domain.Demo
}

// ---------------- grpc ----------------
Expand Down Expand Up @@ -137,7 +137,7 @@ func NewService(
gdb *gorm.DB,
redis *redis.Client,
) (result *Service, err error) {
handler := handlers.NewDemo(
handler := domain.NewDemo(
logger,
db_nosql.OpenDatabase(logger, coll),
mq,
Expand Down

0 comments on commit eaed91d

Please sign in to comment.