Skip to content

Commit

Permalink
Move websocket event to publish by subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
elct9620 committed Mar 19, 2024
1 parent b3c852e commit 38b1fae
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 34 deletions.
3 changes: 2 additions & 1 deletion cmd/server/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions internal/db/subscriber.go
Expand Up @@ -94,9 +94,8 @@ func (s *Subscriber) dispatch(ctx context.Context, change *memdb.Change) {

func (s *Subscriber) send(ctx context.Context, out chan *message.Message, msg *message.Message) {
msgCtx, cancel := context.WithCancel(ctx)
defer cancel()

msg.SetContext(msgCtx)
defer cancel()

ResendLoop:
for {
Expand Down
18 changes: 18 additions & 0 deletions internal/subscriber/database.go
@@ -0,0 +1,18 @@
package subscriber

type DatabaseChange[T any] struct {
Before *T
After *T
}

func (c *DatabaseChange[T]) Created() bool {
return c.Before == nil && c.After != nil
}

func (c *DatabaseChange[T]) Updated() bool {
return c.Before != nil && c.After != nil
}

func (c *DatabaseChange[T]) Deleted() bool {
return c.Before != nil && c.After == nil
}
33 changes: 31 additions & 2 deletions internal/subscriber/match_changed.go
@@ -1,7 +1,11 @@
package subscriber

import (
"encoding/json"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/elct9620/wvs/internal/db"
"github.com/elct9620/wvs/internal/usecase"
)

const (
Expand All @@ -12,10 +16,13 @@ const (
var _ Subscriber = &MatchChangedSubscriber{}

type MatchChangedSubscriber struct {
notifyJoinMatch *usecase.NotifyJoinMatchCommand
}

func NewMatchChangedSubscriber() *MatchChangedSubscriber {
return &MatchChangedSubscriber{}
func NewMatchChangedSubscriber(notifyJoinMatch *usecase.NotifyJoinMatchCommand) *MatchChangedSubscriber {
return &MatchChangedSubscriber{
notifyJoinMatch: notifyJoinMatch,
}
}

func (s *MatchChangedSubscriber) Name() string {
Expand All @@ -27,5 +34,27 @@ func (s *MatchChangedSubscriber) Topic() string {
}

func (s *MatchChangedSubscriber) Handler(msg *message.Message) error {
var change DatabaseChange[db.Match]
if err := json.Unmarshal(msg.Payload, &change); err != nil {
return err
}

isNewPlayerJoined := change.Created() || change.Updated()
if !isNewPlayerJoined {
return nil
}

for _, player := range change.After.Players {
_, err := s.notifyJoinMatch.Execute(msg.Context(), &usecase.NotifyJoinMatchCommandInput{
MatchId: change.After.Id,
PlayerId: player.Id,
})

if err != nil {
msg.Ack()
return err
}
}

return nil
}
7 changes: 5 additions & 2 deletions internal/subscriber/subscriber.go
Expand Up @@ -2,6 +2,7 @@ package subscriber

import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/elct9620/wvs/internal/usecase"
"github.com/google/wire"
)

Expand All @@ -17,8 +18,10 @@ type Subscriber interface {

type DatabaseSubscriber Subscriber

func ProvideDatabaseSubscribers() []DatabaseSubscriber {
func ProvideDatabaseSubscribers(
notifyJoinMatch *usecase.NotifyJoinMatchCommand,
) []DatabaseSubscriber {
return []DatabaseSubscriber{
NewMatchChangedSubscriber(),
NewMatchChangedSubscriber(notifyJoinMatch),
}
}
26 changes: 0 additions & 26 deletions internal/usecase/create_match_command.go
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"github.com/elct9620/wvs/internal/entity/match"
"github.com/elct9620/wvs/pkg/event"
"github.com/google/uuid"
)

Expand Down Expand Up @@ -39,10 +38,6 @@ func (c *CreateMatchCommand) Execute(ctx context.Context, input *CreateMatchInpu

playerJoined := entity != nil
if playerJoined {
if err := c.publishEvents(ctx, entity); err != nil {
return nil, err
}

return &CreateMatchOutput{MatchId: entity.Id()}, nil
}

Expand All @@ -66,10 +61,6 @@ func (c *CreateMatchCommand) joinOrCreate(ctx context.Context, input *CreateMatc
return nil, err
}

if err := c.publishEvents(ctx, entity); err != nil {
return nil, err
}

return &CreateMatchOutput{MatchId: entity.Id()}, nil
}

Expand All @@ -82,20 +73,3 @@ func (c *CreateMatchCommand) nextAvailableMatch(matches []*match.Match, team mat

return match.NewMatch(uuid.NewString())
}

func (c *CreateMatchCommand) publishEvents(ctx context.Context, entity *match.Match) error {
for _, player := range entity.Players() {
stream, err := c.streams.Find(ctx, player.Id())
if err != nil {
continue
}

event := event.NewJoinMatchEvent(entity.Id(), player.Id())

if err := stream.Publish(event); err != nil {
return err
}
}

return nil
}
41 changes: 41 additions & 0 deletions internal/usecase/notify_join_match_command.go
@@ -0,0 +1,41 @@
package usecase

import (
"context"

"github.com/elct9620/wvs/pkg/event"
)

type NotifyJoinMatchCommandInput struct {
MatchId string
PlayerId string
}

type NotifyJoinMatchCommandOutput struct {
}

var _ Command[*NotifyJoinMatchCommandInput, *NotifyJoinMatchCommandOutput] = &NotifyJoinMatchCommand{}

type NotifyJoinMatchCommand struct {
streams StreamRepository
}

func NewNotifyJoinMatchCommand(streams StreamRepository) *NotifyJoinMatchCommand {
return &NotifyJoinMatchCommand{
streams: streams,
}
}

func (c *NotifyJoinMatchCommand) Execute(ctx context.Context, input *NotifyJoinMatchCommandInput) (*NotifyJoinMatchCommandOutput, error) {
stream, err := c.streams.Find(ctx, input.PlayerId)
if err != nil {
return nil, err
}

event := event.NewJoinMatchEvent(input.MatchId, input.PlayerId)
if err := stream.Publish(event); err != nil {
return nil, err
}

return &NotifyJoinMatchCommandOutput{}, nil
}
2 changes: 2 additions & 0 deletions internal/usecase/usecase.go
Expand Up @@ -11,6 +11,8 @@ var DefaultSet = wire.NewSet(
wire.Bind(new(Command[*CreateMatchInput, *CreateMatchOutput]), new(*CreateMatchCommand)),
NewSubscribeCommand,
wire.Bind(new(Command[*SubscribeCommandInput, *SubscribeCommandOutput]), new(*SubscribeCommand)),
NewNotifyJoinMatchCommand,
wire.Bind(new(Command[*NotifyJoinMatchCommandInput, *NotifyJoinMatchCommandOutput]), new(*NotifyJoinMatchCommand)),
)

type Command[I any, O any] interface {
Expand Down
3 changes: 2 additions & 1 deletion wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 38b1fae

Please sign in to comment.