Skip to content

Commit

Permalink
Add match signal function.
Browse files Browse the repository at this point in the history
  • Loading branch information
zyro committed Oct 6, 2021
1 parent e5f52b5 commit 5d74bb0
Show file tree
Hide file tree
Showing 21 changed files with 422 additions and 80 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
## [Unreleased]
### Added
- Add final notification sent to sockets closed via single socket option.
- Add match signal function to server framework.

### Changed
- Match handlers are now required to implement a signal handler function.

## [3.7.0] - 2021-09-28
### Added
Expand Down
62 changes: 54 additions & 8 deletions data/modules/match.lua
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ Dispatcher exposes useful functions to the match. Format:
}
Tick is the current match tick number, starts at 0 and increments after every match_loop call. Does not increment with
calls to match_join_attempt, match_join, or match_leave.
calls to match_join_attempt, match_join, match_leave, match_terminate, or match_signal.
State is the current in-memory match state, may be any Lua term except nil.
Expand Down Expand Up @@ -137,7 +137,7 @@ Dispatcher exposes useful functions to the match. Format:
}
Tick is the current match tick number, starts at 0 and increments after every match_loop call. Does not increment with
calls to match_join_attempt, match_join, or match_leave.
calls to match_join_attempt, match_join, match_leave, match_terminate, or match_signal.
State is the current in-memory match state, may be any Lua term except nil.
Expand Down Expand Up @@ -189,7 +189,7 @@ Dispatcher exposes useful functions to the match. Format:
}
Tick is the current match tick number, starts at 0 and increments after every match_loop call. Does not increment with
calls to match_join_attempt, match_join, or match_leave.
calls to match_join_attempt, match_join, match_leave, match_terminate, or match_signal.
State is the current in-memory match state, may be any Lua term except nil.
Expand Down Expand Up @@ -241,7 +241,7 @@ Dispatcher exposes useful functions to the match. Format:
}
Tick is the current match tick number, starts at 0 and increments after every match_loop call. Does not increment with
calls to match_join_attempt, match_join, or match_leave.
calls to match_join_attempt, match_join, match_leave, match_terminate, or match_signal.
State is the current in-memory match state, may be any Lua term except nil.
Expand All @@ -268,9 +268,9 @@ local function match_loop(context, dispatcher, tick, state, messages)
print("match " .. context.match_id .. " tick " .. tick)
print("match " .. context.match_id .. " messages:\n" .. du.print_r(messages))
end
if tick < 10 then
-- if tick < 10 then
return state
end
-- end
end

--[[
Expand Down Expand Up @@ -300,7 +300,7 @@ Dispatcher exposes useful functions to the match. Format:
}
Tick is the current match tick number, starts at 0 and increments after every match_loop call. Does not increment with
calls to match_join_attempt, match_join, or match_leave.
calls to match_join_attempt, match_join, match_leave, match_terminate, or match_signal.
State is the current in-memory match state, may be any Lua term except nil.
Expand All @@ -317,12 +317,58 @@ local function match_terminate(context, dispatcher, tick, state, grace_seconds)
return state
end

--[[
Called when the match handler receives a runtime signal.
Context represents information about the match and server, for information purposes. Format:
{
env = {}, -- key-value data set in the runtime.env server configuration.
executionMode = "Match",
match_id = "client-friendly match ID, can be shared with clients and used in match join operations",
match_node = "name of the Nakama node hosting this match",
match_label = "the label string returned from match_init",
match_tick_rate = 1 -- the tick rate returned by match_init
}
Dispatcher exposes useful functions to the match. Format:
{
broadcast_message = function(op_code, data, presences, sender),
-- numeric message op code
-- a data payload string, or nil
-- list of presences (a subset of match participants) to use as message targets, or nil to send to the whole match
-- a presence to tag on the message as the 'sender', or nil
match_kick = function(presences)
-- a list of presences to remove from the match
match_label_update = function(label)
-- a new label to set for the match
}
Tick is the current match tick number, starts at 0 and increments after every match_loop call. Does not increment with
calls to match_join_attempt, match_join, match_leave, match_terminate, or match_signal.
State is the current in-memory match state, may be any Lua term except nil.
Data is arbitrary input supplied by the runtime caller of the signal.
Expected return these values (all required) in order:
1. An (optionally) updated state. May be any non-nil Lua term, or nil to end the match.
1. Arbitrary data to return to the runtime caller of the signal. May be a string, or nil.
--]]
local function match_signal(context, dispatcher, tick, state, data)
if state.debug then
print("match " .. context.match_id .. " tick " .. tick)
print("match " .. context.match_id .. " data " .. data)
end
return state, "signal received: " .. data
end

-- Match modules must return a table with these functions defined. All functions are required.
return {
match_init = match_init,
match_join_attempt = match_join_attempt,
match_join = match_join,
match_leave = match_leave,
match_loop = match_loop,
match_terminate = match_terminate
match_terminate = match_terminate,
match_signal = match_signal
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/grpc-ecosystem/grpc-gateway/v2 v2.3.0
github.com/heroiclabs/nakama-common v1.18.0
github.com/heroiclabs/nakama-common v0.0.0-20211005124542-143e8d8ce05e
github.com/jackc/pgconn v1.8.1
github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451
github.com/jackc/pgtype v1.7.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/heroiclabs/nakama-common v0.0.0-20211005124542-143e8d8ce05e h1:U056PgGxvN0QSgdOR2JrUIkDLydUxh+EOiw5ilLdOg4=
github.com/heroiclabs/nakama-common v0.0.0-20211005124542-143e8d8ce05e/go.mod h1:jzIGV5bI45ALRQFzHPkJn4Z0tV+xhtho1+pZhOXVAsk=
github.com/heroiclabs/nakama-common v1.18.0 h1:afrtYzS9M73j5d02SX4h9ylFpgO0qKxLewgkc16l1jA=
github.com/heroiclabs/nakama-common v1.18.0/go.mod h1:jzIGV5bI45ALRQFzHPkJn4Z0tV+xhtho1+pZhOXVAsk=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
Expand Down
59 changes: 4 additions & 55 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@ package main

import (
"context"
"database/sql"
"flag"
"fmt"
"google.golang.org/protobuf/encoding/protojson"
"io/ioutil"
"math/rand"
"net/http"
"net/url"
"os"
"os/signal"
"path/filepath"
"runtime"
"strings"
"syscall"
"time"

"io/ioutil"
"path/filepath"

"github.com/gofrs/uuid"
"github.com/heroiclabs/nakama/v3/ga"
"github.com/heroiclabs/nakama/v3/migrate"
Expand All @@ -41,6 +38,7 @@ import (
_ "github.com/jackc/pgx/v4/stdlib"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/encoding/protojson"
)

const cookieFilename = ".cookie"
Expand Down Expand Up @@ -116,7 +114,7 @@ func main() {
}
startupLogger.Info("Database connections", zap.Strings("dsns", redactedAddresses))

db, dbVersion := dbConnect(startupLogger, config)
db, dbVersion := server.DbConnect(startupLogger, config)
startupLogger.Info("Database information", zap.String("version", dbVersion))

// Global server context.
Expand Down Expand Up @@ -234,55 +232,6 @@ func main() {
os.Exit(0)
}

func dbConnect(multiLogger *zap.Logger, config server.Config) (*sql.DB, string) {
rawURL := config.GetDatabase().Addresses[0]
if !(strings.HasPrefix(rawURL, "postgresql://") || strings.HasPrefix(rawURL, "postgres://")) {
rawURL = fmt.Sprintf("postgres://%s", rawURL)
}
parsedURL, err := url.Parse(rawURL)
if err != nil {
multiLogger.Fatal("Bad database connection URL", zap.Error(err))
}
query := parsedURL.Query()
if len(query.Get("sslmode")) == 0 {
query.Set("sslmode", "prefer")
parsedURL.RawQuery = query.Encode()
}

if len(parsedURL.User.Username()) < 1 {
parsedURL.User = url.User("root")
}
if len(parsedURL.Path) < 1 {
parsedURL.Path = "/nakama"
}

multiLogger.Debug("Complete database connection URL", zap.String("raw_url", parsedURL.String()))
db, err := sql.Open("pgx", parsedURL.String())
if err != nil {
multiLogger.Fatal("Error connecting to database", zap.Error(err))
}
// Limit the time allowed to ping database and get version to 15 seconds total.
ctx, ctxCancelFn := context.WithTimeout(context.Background(), 15*time.Second)
defer ctxCancelFn()
if err = db.PingContext(ctx); err != nil {
if strings.HasSuffix(err.Error(), "does not exist (SQLSTATE 3D000)") {
multiLogger.Fatal("Database schema not found, run `nakama migrate up`", zap.Error(err))
}
multiLogger.Fatal("Error pinging database", zap.Error(err))
}

db.SetConnMaxLifetime(time.Millisecond * time.Duration(config.GetDatabase().ConnMaxLifetimeMs))
db.SetMaxOpenConns(config.GetDatabase().MaxOpenConns)
db.SetMaxIdleConns(config.GetDatabase().MaxIdleConns)

var dbVersion string
if err = db.QueryRowContext(ctx, "SELECT version()").Scan(&dbVersion); err != nil {
multiLogger.Fatal("Error querying database version", zap.Error(err))
}

return db, dbVersion
}

// Help improve Nakama by sending anonymous usage statistics.
//
// You can disable the telemetry completely before server start by setting the
Expand Down
8 changes: 8 additions & 0 deletions sample_go_module/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ func (m *Match) MatchTerminate(ctx context.Context, logger runtime.Logger, db *s
return state
}

func (m *Match) MatchSignal(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, dispatcher runtime.MatchDispatcher, tick int64, state interface{}, data string) (interface{}, string) {
if state.(*MatchState).debug {
logger.Info("match signal match_id %v tick %v", ctx.Value(runtime.RUNTIME_CTX_MATCH_ID), tick)
logger.Info("match signal match_id %v data %v", ctx.Value(runtime.RUNTIME_CTX_MATCH_ID), data)
}
return state, "signal received: " + data
}

func eventSessionStart(ctx context.Context, logger runtime.Logger, evt *api.Event) {
logger.Info("session start %v %v", ctx, evt)
}
Expand Down
3 changes: 2 additions & 1 deletion server/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ func (d *DummySession) SendBytes(payload []byte, reliable bool) error {
return nil
}

func (d *DummySession) Close(msg string, reason runtime.PresenceReason) {}
func (d *DummySession) Close(msg string, reason runtime.PresenceReason, envelopes ...*rtapi.Envelope) {
}

type loggerEnabler struct{}

Expand Down
5 changes: 5 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ func CheckConfig(logger *zap.Logger, config Config) map[string]string {
if config.GetMatch().CallQueueSize < 1 {
logger.Fatal("Match call queue size must be >= 1", zap.Int("match.call_queue_size", config.GetMatch().CallQueueSize))
}
if config.GetMatch().SignalQueueSize < 1 {
logger.Fatal("Match signal queue size must be >= 1", zap.Int("match.signal_queue_size", config.GetMatch().SignalQueueSize))
}
if config.GetMatch().JoinAttemptQueueSize < 1 {
logger.Fatal("Match join attempt queue size must be >= 1", zap.Int("match.join_attempt_queue_size", config.GetMatch().JoinAttemptQueueSize))
}
Expand Down Expand Up @@ -839,6 +842,7 @@ func NewRuntimeConfig() *RuntimeConfig {
type MatchConfig struct {
InputQueueSize int `yaml:"input_queue_size" json:"input_queue_size" usage:"Size of the authoritative match buffer that stores client messages until they can be processed by the next tick. Default 128."`
CallQueueSize int `yaml:"call_queue_size" json:"call_queue_size" usage:"Size of the authoritative match buffer that sequences calls to match handler callbacks to ensure no overlaps. Default 128."`
SignalQueueSize int `yaml:"signal_queue_size" json:"signal_queue_size" usage:"Size of the authoritative match buffer that sequences signal operations to match handler callbacks to ensure no overlaps. Default 10."`
JoinAttemptQueueSize int `yaml:"join_attempt_queue_size" json:"join_attempt_queue_size" usage:"Size of the authoritative match buffer that limits the number of in-progress join attempts. Default 128."`
DeferredQueueSize int `yaml:"deferred_queue_size" json:"deferred_queue_size" usage:"Size of the authoritative match buffer that holds deferred message broadcasts until the end of each loop execution. Default 128."`
JoinMarkerDeadlineMs int `yaml:"join_marker_deadline_ms" json:"join_marker_deadline_ms" usage:"Deadline in milliseconds that client authoritative match joins will wait for match handlers to acknowledge joins. Default 15000."`
Expand All @@ -851,6 +855,7 @@ func NewMatchConfig() *MatchConfig {
return &MatchConfig{
InputQueueSize: 128,
CallQueueSize: 128,
SignalQueueSize: 10,
JoinAttemptQueueSize: 128,
DeferredQueueSize: 128,
JoinMarkerDeadlineMs: 15000,
Expand Down
54 changes: 54 additions & 0 deletions server/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,65 @@ import (
"context"
"database/sql"
"errors"
"fmt"
"net/url"
"strings"
"time"

"github.com/jackc/pgconn"
"github.com/jackc/pgerrcode"
"go.uber.org/zap"
)

func DbConnect(multiLogger *zap.Logger, config Config) (*sql.DB, string) {
rawURL := config.GetDatabase().Addresses[0]
if !(strings.HasPrefix(rawURL, "postgresql://") || strings.HasPrefix(rawURL, "postgres://")) {
rawURL = fmt.Sprintf("postgres://%s", rawURL)
}
parsedURL, err := url.Parse(rawURL)
if err != nil {
multiLogger.Fatal("Bad database connection URL", zap.Error(err))
}
query := parsedURL.Query()
if len(query.Get("sslmode")) == 0 {
query.Set("sslmode", "prefer")
parsedURL.RawQuery = query.Encode()
}

if len(parsedURL.User.Username()) < 1 {
parsedURL.User = url.User("root")
}
if len(parsedURL.Path) < 1 {
parsedURL.Path = "/nakama"
}

multiLogger.Debug("Complete database connection URL", zap.String("raw_url", parsedURL.String()))
db, err := sql.Open("pgx", parsedURL.String())
if err != nil {
multiLogger.Fatal("Error connecting to database", zap.Error(err))
}
// Limit the time allowed to ping database and get version to 15 seconds total.
ctx, ctxCancelFn := context.WithTimeout(context.Background(), 15*time.Second)
defer ctxCancelFn()
if err = db.PingContext(ctx); err != nil {
if strings.HasSuffix(err.Error(), "does not exist (SQLSTATE 3D000)") {
multiLogger.Fatal("Database schema not found, run `nakama migrate up`", zap.Error(err))
}
multiLogger.Fatal("Error pinging database", zap.Error(err))
}

db.SetConnMaxLifetime(time.Millisecond * time.Duration(config.GetDatabase().ConnMaxLifetimeMs))
db.SetMaxOpenConns(config.GetDatabase().MaxOpenConns)
db.SetMaxIdleConns(config.GetDatabase().MaxIdleConns)

var dbVersion string
if err = db.QueryRowContext(ctx, "SELECT version()").Scan(&dbVersion); err != nil {
multiLogger.Fatal("Error querying database version", zap.Error(err))
}

return db, dbVersion
}

// Tx is used to permit clients to implement custom transaction logic.
type Tx interface {
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
Expand Down
Loading

0 comments on commit 5d74bb0

Please sign in to comment.