Skip to content

Commit

Permalink
Add ChannelMessageSend runtime function. (#660)
Browse files Browse the repository at this point in the history
  • Loading branch information
sesposito committed Aug 10, 2021
1 parent 710a99a commit 6317c6a
Show file tree
Hide file tree
Showing 11 changed files with 399 additions and 205 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Expand Up @@ -7,6 +7,8 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
### Added
- Handle thrown JS runtime custom exceptions containing a message and a grpc code to be returned in the server response.
- Add runtime function to retrieve a random set of users.
- Add runtime ChannelMessageSend functions.
- Add runtime BuildChannelId functions.

### Changed
- Size limit for status messages increased from 128 to 2048 characters.
Expand All @@ -24,6 +26,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- Fix JS runtime missing fields from leaderboards/tournaments get, list and write functions.
- Fix JS runtime ownerId field not working correctly in leaderboard/tournament records list functions.
- Fix parameter usage in leaderboard score set operator.
- Fix JS runtime storageList not returning the cursor.

## [3.4.0] - 2021-07-08
### Added
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -13,7 +13,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/grpc-ecosystem/grpc-gateway/v2 v2.3.0
github.com/heroiclabs/nakama-common v0.0.0-20210809163116-633338b7c44b
github.com/heroiclabs/nakama-common v1.15.1-0.20210810152014-15328b1175ec
github.com/jackc/pgconn v1.8.1
github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451
github.com/jackc/pgtype v1.7.0
Expand Down
10 changes: 2 additions & 8 deletions go.sum
Expand Up @@ -310,14 +310,8 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/heroiclabs/nakama-common v0.0.0-20210809163116-633338b7c44b h1:Nn0UeBw6/bDTidJ75MwjfRBzu47iGklqtnPVZrY2tgY=
github.com/heroiclabs/nakama-common v0.0.0-20210809163116-633338b7c44b/go.mod h1:jzIGV5bI45ALRQFzHPkJn4Z0tV+xhtho1+pZhOXVAsk=
github.com/heroiclabs/nakama-common v1.14.1-0.20210707135714-ed381a338271 h1:iZt1PkdjXKYg7bSFw2X5oEUf5m7NFuku7nTJZibGNOY=
github.com/heroiclabs/nakama-common v1.14.1-0.20210707135714-ed381a338271/go.mod h1:jzIGV5bI45ALRQFzHPkJn4Z0tV+xhtho1+pZhOXVAsk=
github.com/heroiclabs/nakama-common v1.14.1-0.20210707144747-9d4fc7ac4406 h1:lBA9SThV3uTwtnq9pD+0eEPJpUERjP+/WlFBmgaYI1s=
github.com/heroiclabs/nakama-common v1.14.1-0.20210707144747-9d4fc7ac4406/go.mod h1:jzIGV5bI45ALRQFzHPkJn4Z0tV+xhtho1+pZhOXVAsk=
github.com/heroiclabs/nakama-common v1.15.0 h1:DTd1CIjoQepod6RnycQ21FkMa1cl2PzCYRCQVIFw7VI=
github.com/heroiclabs/nakama-common v1.15.0/go.mod h1:jzIGV5bI45ALRQFzHPkJn4Z0tV+xhtho1+pZhOXVAsk=
github.com/heroiclabs/nakama-common v1.15.1-0.20210810152014-15328b1175ec h1:aFSmHs1eUP5vBJEikVbx5RaKu5uMH8clmqedBUaKGXY=
github.com/heroiclabs/nakama-common v1.15.1-0.20210810152014-15328b1175ec/go.mod h1:jzIGV5bI45ALRQFzHPkJn4Z0tV+xhtho1+pZhOXVAsk=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
Expand Down
92 changes: 92 additions & 0 deletions server/core_channel.go
Expand Up @@ -22,8 +22,10 @@ import (
"encoding/gob"
"errors"
"fmt"
"github.com/heroiclabs/nakama-common/rtapi"
"strings"
"time"
"unicode/utf8"

"github.com/gofrs/uuid"
"github.com/heroiclabs/nakama-common/api"
Expand Down Expand Up @@ -411,3 +413,93 @@ func StreamToChannelId(stream PresenceStream) (string, error) {

return fmt.Sprintf("%v.%v.%v.%v", stream.Mode, subject, subcontext, stream.Label), nil
}

var errInvalidChannelTarget = errors.New("Invalid channel target")
var errInvalidChannelType = errors.New("Invalid channel type")

func BuildChannelId(ctx context.Context, logger *zap.Logger, db *sql.DB, userID uuid.UUID, target string, chanType rtapi.ChannelJoin_Type) (string, PresenceStream, error) {
if target == "" {
return "", PresenceStream{}, errInvalidChannelTarget
}

stream := PresenceStream{
Mode: StreamModeChannel,
}

switch chanType {
case rtapi.ChannelJoin_TYPE_UNSPECIFIED:
// Defaults to room channel.
fallthrough
case rtapi.ChannelJoin_ROOM:
if len(target) < 1 || len(target) > 64 {
return "", PresenceStream{}, fmt.Errorf("Channel name is required and must be 1-64 chars: %w", errInvalidChannelTarget)
}
if controlCharsRegex.MatchString(target) {
return "", PresenceStream{}, fmt.Errorf("Channel name must not contain control chars: %w", errInvalidChannelTarget)
}
if !utf8.ValidString(target) {
return "", PresenceStream{}, fmt.Errorf("Channel name must only contain valid UTF-8 bytes: %w", errInvalidChannelTarget)
}
stream.Label = target
// Channel mode is already set by default above.
case rtapi.ChannelJoin_DIRECT_MESSAGE:
// Check if user ID is valid.
uid, err := uuid.FromString(target)
if err != nil {
return "", PresenceStream{}, fmt.Errorf("Invalid user ID in direct message join: %w", errInvalidChannelTarget)
}
// Not allowed to chat to the nil uuid.
if uid == uuid.Nil {
return "", PresenceStream{}, fmt.Errorf("Invalid user ID in direct message join: %w", errInvalidChannelTarget)
}
// If userID is the system user, skip these checks
if userID != uuid.Nil {
// Check if the other user exists and has not blocked this user.
allowed, err := UserExistsAndDoesNotBlock(ctx, db, uid, userID)
if err != nil {
return "", PresenceStream{}, errors.New("Failed to look up user ID")
}
if !allowed {
return "", PresenceStream{}, fmt.Errorf("User ID not found: %w", errInvalidChannelTarget)
}
// Assign the ID pair in a consistent order.
if uid.String() > userID.String() {
stream.Subject = userID
stream.Subcontext = uid
} else {
stream.Subject = uid
stream.Subcontext = userID
}
stream.Mode = StreamModeDM
}
case rtapi.ChannelJoin_GROUP:
// Check if group ID is valid.
gid, err := uuid.FromString(target)
if err != nil {
return "", PresenceStream{}, fmt.Errorf("Invalid group ID in group channel join: %w", errInvalidChannelTarget)
}
if userID != uuid.Nil {
allowed, err := groupCheckUserPermission(ctx, logger, db, gid, userID, 2)
if err != nil {
return "", PresenceStream{}, errors.New("Failed to look up group membership")
}
if !allowed {
return "", PresenceStream{}, fmt.Errorf("Group not found: %w", errInvalidChannelTarget)
}
}

stream.Subject = gid
stream.Mode = StreamModeGroup
default:
return "", PresenceStream{}, errInvalidChannelType
}

channelID, err := StreamToChannelId(stream)
if err != nil {
// Should not happen after the input validation above, but guard just in case.
logger.Error("Error converting stream to channel identifier", zap.Error(err), zap.Any("stream", stream))
return "", PresenceStream{}, err
}

return channelID, stream, nil
}
72 changes: 72 additions & 0 deletions server/core_message.go
@@ -0,0 +1,72 @@
package server

import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"github.com/gofrs/uuid"
"github.com/heroiclabs/nakama-common/api"
"github.com/heroiclabs/nakama-common/rtapi"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
"time"
)

var errInvalidMessageContent = errors.New("Message content must be a valid JSON object")
var errMessagePersist = errors.New("Error persisting channel message")

func ChannelMessageSend(ctx context.Context, logger *zap.Logger, db *sql.DB, router MessageRouter, channelStream PresenceStream, channelId, content, senderId, senderUsername string, persist bool) (*rtapi.ChannelMessageAck, error) {
if maybeJSON := []byte(content); !json.Valid(maybeJSON) || bytes.TrimSpace(maybeJSON)[0] != byteBracket {
return nil, errInvalidMessageContent
}

ts := time.Now().Unix()
message := &api.ChannelMessage{
ChannelId: channelId,
MessageId: uuid.Must(uuid.NewV4()).String(),
Code: &wrapperspb.Int32Value{Value: ChannelMessageTypeChat},
SenderId: senderId,
Username: senderUsername,
Content: content,
CreateTime: &timestamppb.Timestamp{Seconds: ts},
UpdateTime: &timestamppb.Timestamp{Seconds: ts},
Persistent: &wrapperspb.BoolValue{Value: persist},
}

ack := &rtapi.ChannelMessageAck{
ChannelId: message.ChannelId,
MessageId: message.MessageId,
Code: message.Code,
Username: message.Username,
CreateTime: message.CreateTime,
UpdateTime: message.UpdateTime,
Persistent: message.Persistent,
}
switch channelStream.Mode {
case StreamModeChannel:
message.RoomName, ack.RoomName = channelStream.Label, channelStream.Label
case StreamModeGroup:
message.GroupId, ack.GroupId = channelStream.Subject.String(), channelStream.Subject.String()
case StreamModeDM:
message.UserIdOne, ack.UserIdOne = channelStream.Subject.String(), channelStream.Subject.String()
message.UserIdTwo, ack.UserIdTwo = channelStream.Subcontext.String(), channelStream.Subcontext.String()
}

if persist {
query := `INSERT INTO message (id, code, sender_id, username, stream_mode, stream_subject, stream_descriptor, stream_label, content, create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6::UUID, $7::UUID, $8, $9, $10, $10)`
_, err := db.ExecContext(ctx, query, message.MessageId, message.Code.Value, message.SenderId, message.Username, channelStream.Mode, channelStream.Subject, channelStream.Subcontext, channelStream.Label, message.Content, time.Unix(message.CreateTime.Seconds, 0).UTC())
if err != nil {
logger.Error("Error persisting channel message", zap.Error(err))

return nil, errMessagePersist
}
}

router.SendToStream(logger, channelStream, &rtapi.Envelope{Message: &rtapi.Envelope_ChannelMessage{ChannelMessage: message}}, true)

return ack, nil
}

0 comments on commit 6317c6a

Please sign in to comment.