Skip to content

Commit

Permalink
send websocket event when a new message is created
Browse files Browse the repository at this point in the history
  • Loading branch information
iammuho committed Aug 30, 2023
1 parent 225bdcc commit f5abbcb
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 14 deletions.
20 changes: 20 additions & 0 deletions internal/chat/domain/services/message_command_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/iammuho/natternet/internal/chat/domain/repository"
"github.com/iammuho/natternet/internal/chat/domain/values"
websocketTypes "github.com/iammuho/natternet/internal/user/domain/event/types"
websocketValues "github.com/iammuho/natternet/internal/user/domain/values/websocket"
"github.com/iammuho/natternet/pkg/errorhandler"

"github.com/gofiber/fiber/v2"
Expand Down Expand Up @@ -90,5 +91,24 @@ func (r *messageCommandDomainServices) CreateMessage(req *dto.CreateMessageReqDT
r.ctx.GetLogger().Error(publishErr.Error())
}

// Publishes to user websocket
websocketEventValue := websocketValues.RoomNewMessageWebsocketValue{
SenderID: req.SenderID,
RoomID: roomEntity.GetID(),
Message: values.NewMessageValueFromMessage(messageEntity),
}
// Loop users and add them to the event with userid
for _, user := range roomEntity.GetRoomUsers() {
websocketEventValue.Users = append(websocketEventValue.Users, user.UserID)
}

messageJSON, _ = json.Marshal(websocketEventValue)

_, publishErr = r.ctx.GetNatsContext().GetJetStreamContext().Publish(websocketTypes.MessageCreatedEvent, messageJSON)

if publishErr != nil {
r.ctx.GetLogger().Error(publishErr.Error())
}

return values.NewMessageValueFromMessage(messageEntity), nil
}
2 changes: 1 addition & 1 deletion internal/chat/domain/services/room_command_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (r *roomCommandDomainServices) JoinRoom(req *dto.JoinRoomReqDTO) (*values.R
}

// Publishes to user websocket
websocketEventValue := websocketValues.RoomUserJoinedEventValue{
websocketEventValue := websocketValues.RoomUserJoinedWebsocketValue{
UserID: req.UserID,
RoomID: roomEntity.GetID(),
}
Expand Down
8 changes: 8 additions & 0 deletions internal/user/domain/values/websocket/room_new_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package websocket

type RoomNewMessageWebsocketValue struct {
SenderID string `json:"sender_id"`
RoomID string `json:"room_id"`
Users []string `json:"room_users"`
Message interface{} `json:"message"`
}
2 changes: 1 addition & 1 deletion internal/user/domain/values/websocket/room_user_joined.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package websocket

type RoomUserJoinedEventValue struct {
type RoomUserJoinedWebsocketValue struct {
Username string `json:"username"`
UserID string `json:"user_id"`
Users []string `json:"room_users"`
Expand Down
27 changes: 15 additions & 12 deletions internal/user/interfaces/ws/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

"github.com/dgrr/websocket"
"github.com/iammuho/natternet/internal/chat/domain/values"
eventTypes "github.com/iammuho/natternet/internal/user/domain/event/types"
websocketValues "github.com/iammuho/natternet/internal/user/domain/values/websocket"
"github.com/iammuho/natternet/internal/user/interfaces/ws/types"
Expand Down Expand Up @@ -34,7 +33,7 @@ func (h *handler) onMessageCreated(msg *nats.Msg) error {
msg.Ack()

// Unmarshal the message
var event values.MessageValue
var event websocketValues.RoomNewMessageWebsocketValue
err := json.Unmarshal(msg.Data, &event)

if err != nil {
Expand All @@ -45,18 +44,22 @@ func (h *handler) onMessageCreated(msg *nats.Msg) error {
h.clients.Range(func(client, v interface{}) bool {
nc := v.(*websocket.Conn)

if nc.UserValue("ID").(string) == event.SenderID {
h.application.AppContext.GetLogger().Logger.Info("Sending message created to ws client with ID: ", zap.String("ID", nc.UserValue("ID").(string)))
// Range the users
for _, user := range event.Users {

// Create the event model
eventModel := &types.WebsocketMessage{}
eventModel.New(types.MessageTypeMessageCreated)
eventModel.ConnectionID = fmt.Sprintf("%d", nc.ID())
eventModel.Message = event
if nc.UserValue("ID").(string) == user {
h.application.AppContext.GetLogger().Logger.Info("Sending message created to ws client with ID: ", zap.String("ID", nc.UserValue("ID").(string)))

nc.Write(eventModel.ToJson())
// Create the event model
eventModel := &types.WebsocketMessage{}
eventModel.New(types.MessageTypeMessageCreated)
eventModel.ConnectionID = fmt.Sprintf("%d", nc.ID())
eventModel.Message = event.Message

return true
nc.Write(eventModel.ToJson())

return true
}
}

return true
Expand All @@ -70,7 +73,7 @@ func (h *handler) onUserJoinedRoom(msg *nats.Msg) error {
msg.Ack()

// Unmarshal the message
var event websocketValues.RoomUserJoinedEventValue
var event websocketValues.RoomUserJoinedWebsocketValue
err := json.Unmarshal(msg.Data, &event)

if err != nil {
Expand Down

0 comments on commit f5abbcb

Please sign in to comment.