Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(MQ): add local mq #10

Merged
merged 3 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/demo/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ func main() {
// you can find them from moke-kit https://github.com/GStones/moke-kit
// nats message queue
mfx.NatsModule,
// local(channel) message queue
mfx.LocalModule,

// services
// service
modules.AllModule,
)
}
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/abiosoft/ishell v2.0.0+incompatible
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/gstones/moke-kit v1.0.0
github.com/gstones/zinx v1.2.5
github.com/gstones/moke-kit v1.0.1-0.20240523024334-8a0b1e0d264e
github.com/gstones/zinx v1.2.7-0.20240522125806-6252e8c9ea5f
github.com/redis/go-redis/v9 v9.5.1
github.com/spf13/cobra v1.8.0
go.uber.org/fx v1.21.1
Expand All @@ -21,6 +21,7 @@ require (

require (
cloud.google.com/go/compute/metadata v0.2.3 // indirect
github.com/ThreeDotsLabs/watermill v1.3.5 // indirect
github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db // indirect
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
Expand Down Expand Up @@ -53,6 +54,7 @@ require (
github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/klauspost/reedsolomon v1.12.1 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
Expand All @@ -62,6 +64,7 @@ require (
github.com/nats-io/nats.go v1.33.1 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
Expand Down
15 changes: 11 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ cloud.google.com/go/compute v1.25.1/go.mod h1:oopOIR53ly6viBYxaDhBfJwzUAxf1zE//u
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK2WofkbXg=
github.com/ThreeDotsLabs/watermill v1.3.5/go.mod h1:O/u/Ptyrk5MPTxSeWM5vzTtZcZfxXfO9PK9eXTYiFZY=
github.com/abiosoft/ishell v2.0.0+incompatible h1:zpwIuEHc37EzrsIYah3cpevrIc8Oma7oZPxr03tlmmw=
github.com/abiosoft/ishell v2.0.0+incompatible/go.mod h1:HQR9AqF2R3P4XXpMpI0NAzgHf/aS6+zVXRj14cVk9qg=
github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db h1:CjPUSXOiYptLbTdr1RceuZgSFDQ7U15ITERUGrUORx8=
Expand Down Expand Up @@ -75,6 +77,7 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
Expand All @@ -83,10 +86,10 @@ github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwn
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0QDGLKzqOmktBjT+Is=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM=
github.com/gstones/moke-kit v1.0.0 h1:8qJwNC3pIw1nhPsvwCbbM25uxLqahGFfCfS293OiVIY=
github.com/gstones/moke-kit v1.0.0/go.mod h1:nKNpYdF8PE0VBCejW1NR90okBbU2LtDjgag706910AI=
github.com/gstones/zinx v1.2.5 h1:JSCxE9KHdNPxQeHNdGMkL/Memsce4EoJE+EtGoq7H7c=
github.com/gstones/zinx v1.2.5/go.mod h1:tgm/iZBMyKKZj4totfeu4+PMzSLb4JH8pnGdcG3ql+8=
github.com/gstones/moke-kit v1.0.1-0.20240523024334-8a0b1e0d264e h1:kO7OxXR+wyDzAmn2UfwLVIjluAJAf1d39T1+k5/0foc=
github.com/gstones/moke-kit v1.0.1-0.20240523024334-8a0b1e0d264e/go.mod h1:4OsiwkB0nj5fmgGEc+e+uMYKyd419irbY6UL1M/gcJU=
github.com/gstones/zinx v1.2.7-0.20240522125806-6252e8c9ea5f h1:6xqmkcQXUZ87OyHp9pYhTLqbkvwYJwtvWsdObAvoFNg=
github.com/gstones/zinx v1.2.7-0.20240522125806-6252e8c9ea5f/go.mod h1:tgm/iZBMyKKZj4totfeu4+PMzSLb4JH8pnGdcG3ql+8=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand Down Expand Up @@ -125,6 +128,8 @@ github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuV
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/reedsolomon v1.12.1 h1:NhWgum1efX1x58daOBGCFWcxtEhOhXKKl1HAPQUp03Q=
github.com/klauspost/reedsolomon v1.12.1/go.mod h1:nEi5Kjb6QqtbofI6s+cbG/j1da11c96IBYBSnVGtuBs=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
Expand All @@ -144,6 +149,8 @@ github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package handlers
package domain

import (
"context"
"fmt"
"time"

"github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -55,32 +56,55 @@ func (d *Demo) Hi(uid, message string) error {
return err
}
d.redis.Set(context.Background(), "demo", message, time.Minute)
// mq publish
//nats mq publish
natsTopic := common.NatsHeader.CreateTopic("demo")
if err := d.mq.Publish(
common.NatsHeader.CreateTopic("demo"),
miface.WithBytes([]byte(message)),
natsTopic,
miface.WithBytes([]byte(fmt.Sprintf("nats mq: %s", message))),
); err != nil {
return err
}
// local(channel) mq publish
localTopic := common.LocalHeader.CreateTopic("demo")
if err := d.mq.Publish(
localTopic,
miface.WithBytes([]byte(fmt.Sprintf("local mq: %s", message))),
); err != nil {
return err
}

return nil
}

func (d *Demo) Watch(ctx context.Context, topic string, callback func(message string) error) error {
// mq subscribe
sub, err := d.mq.Subscribe(
common.NatsHeader.CreateTopic(topic),
//nats mq subscribe
natsTopic := common.NatsHeader.CreateTopic(topic)
if _, err := d.mq.Subscribe(
ctx,
natsTopic,
func(msg miface.Message, err error) common.ConsumptionCode {
if err := callback(string(msg.Data())); err != nil {
return common.ConsumeNackPersistentFailure
}
return common.ConsumeAck
})
if err != nil {
}); err != nil {
return err
}
<-ctx.Done()
if err := sub.Unsubscribe(); err != nil {

//local(channel) mq subscribe
localTopic := common.LocalHeader.CreateTopic(topic)
if _, err := d.mq.Subscribe(
ctx,
localTopic,
func(msg miface.Message, err error) common.ConsumptionCode {
if err := callback(string(msg.Data())); err != nil {
return common.ConsumeNackPersistentFailure
}
return common.ConsumeAck
}); err != nil {
return err
}

<-ctx.Done()
return nil
}
6 changes: 3 additions & 3 deletions internal/services/demo/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (

pb "github.com/gstones/moke-layout/api/gen/demo/api"
"github.com/gstones/moke-layout/internal/services/demo/db_nosql"
"github.com/gstones/moke-layout/internal/services/demo/handlers"
"github.com/gstones/moke-layout/internal/services/demo/domain"
"github.com/gstones/moke-layout/pkg/dfx"
)

type Service struct {
logger *zap.Logger
demoHandler *handlers.Demo
demoHandler *domain.Demo
}

// ---------------- grpc ----------------
Expand Down Expand Up @@ -137,7 +137,7 @@ func NewService(
gdb *gorm.DB,
redis *redis.Client,
) (result *Service, err error) {
handler := handlers.NewDemo(
handler := domain.NewDemo(
logger,
db_nosql.OpenDatabase(logger, coll),
mq,
Expand Down
Loading