Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add event hook registration and processing. #315

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
595 changes: 330 additions & 265 deletions api/api.pb.go

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions api/api.proto
Expand Up @@ -288,6 +288,16 @@ message DeleteStorageObjectsRequest {
repeated DeleteStorageObjectId object_ids = 1;
}

// Represents an event to be passed through the server to registered event handlers.
message Event {
// An event name, type, category, or identifier.
string name = 1;
// Arbitrary event property values.
map<string, string> properties = 2;
// The time when the event was triggered.
google.protobuf.Timestamp timestamp = 3;
}

// A friend of a user.
message Friend {
// The friendship status.
Expand Down
8 changes: 7 additions & 1 deletion runtime/runtime.go
Expand Up @@ -101,7 +101,7 @@ const (
RUNTIME_CTX_ENV = "env"

// The mode associated with the execution context. It's one of these values:
// "run_once", "rpc", "before", "after", "match", "matchmaker", "leaderboard_reset", "tournament_reset", "tournament_end".
// "event", "run_once", "rpc", "before", "after", "match", "matchmaker", "leaderboard_reset", "tournament_reset", "tournament_end".
RUNTIME_CTX_MODE = "execution_mode"

// Query params that was passed through from HTTP request.
Expand Down Expand Up @@ -647,6 +647,12 @@ type Initializer interface {

// RegisterAfterGetUsers can be used to perform additional logic after retrieving users.
RegisterAfterGetUsers(fn func(ctx context.Context, logger Logger, db *sql.DB, nk NakamaModule, out *api.Users, in *api.GetUsersRequest) error) error

// RegisterEventSessionStart can be used to define functions triggered when client sessions start.
RegisterEventSessionStart(fn func(ctx context.Context, logger Logger, evt *api.Event)) error

// RegisterEventSessionStart can be used to define functions triggered when client sessions end.
RegisterEventSessionEnd(fn func(ctx context.Context, logger Logger, evt *api.Event)) error
}

type Leaderboard interface {
Expand Down
14 changes: 14 additions & 0 deletions sample_go_module/sample.go
Expand Up @@ -37,6 +37,12 @@ func InitModule(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runti
}); err != nil {
return err
}
if err := initializer.RegisterEventSessionStart(eventSessionStart); err != nil {
return err
}
if err := initializer.RegisterEventSessionEnd(eventSessionEnd); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -129,3 +135,11 @@ func (m *Match) MatchTerminate(ctx context.Context, logger runtime.Logger, db *s

return state
}

func eventSessionStart(ctx context.Context, logger runtime.Logger, evt *api.Event) {
logger.Printf("session start %v %v", ctx, evt)
}

func eventSessionEnd(ctx context.Context, logger runtime.Logger, evt *api.Event) {
logger.Printf("session end %v %v", ctx, evt)
}
2 changes: 1 addition & 1 deletion server/api.go
Expand Up @@ -177,7 +177,7 @@ func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, j
grpcGatewayRouter := mux.NewRouter()
// Special case routes. Do NOT enable compression on WebSocket route, it results in "http: response.Write on hijacked connection" errors.
grpcGatewayRouter.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) }).Methods("GET")
grpcGatewayRouter.HandleFunc("/ws", NewSocketWsAcceptor(logger, config, sessionRegistry, matchmaker, tracker, jsonpbMarshaler, jsonpbUnmarshaler, pipeline)).Methods("GET")
grpcGatewayRouter.HandleFunc("/ws", NewSocketWsAcceptor(logger, config, sessionRegistry, matchmaker, tracker, runtime, jsonpbMarshaler, jsonpbUnmarshaler, pipeline)).Methods("GET")

// Enable stats recording on all request paths except:
// "/" is not tracked at all.
Expand Down
42 changes: 26 additions & 16 deletions server/config.go
Expand Up @@ -185,6 +185,12 @@ func CheckConfig(logger *zap.Logger, config Config) map[string]string {
if config.GetRuntime().CallStackSize < 1 {
logger.Fatal("Runtime instance call stack size must be >= 1", zap.Int("runtime.call_stack_size", config.GetRuntime().CallStackSize))
}
if config.GetRuntime().EventQueueSize < 1 {
logger.Fatal("Runtime event queue stack size must be >= 1", zap.Int("runtime.event_queue_size", config.GetRuntime().EventQueueSize))
}
if config.GetRuntime().EventQueueWorkers < 1 {
logger.Fatal("Runtime event queue workers must be >= 1", zap.Int("runtime.event_queue_workers", config.GetRuntime().EventQueueWorkers))
}
if config.GetRuntime().RegistrySize < 128 {
logger.Fatal("Runtime instance registry size must be >= 128", zap.Int("runtime.registry_size", config.GetRuntime().RegistrySize))
}
Expand Down Expand Up @@ -591,27 +597,31 @@ func NewSocialConfig() *SocialConfig {

// RuntimeConfig is configuration relevant to the Runtime Lua VM.
type RuntimeConfig struct {
Environment map[string]string `yaml:"-" json:"-"`
Env []string `yaml:"env" json:"env" usage:"Values to pass into Runtime as environment variables."`
Path string `yaml:"path" json:"path" usage:"Path for the server to scan for Lua and Go library files."`
HTTPKey string `yaml:"http_key" json:"http_key" usage:"Runtime HTTP Invocation key."`
MinCount int `yaml:"min_count" json:"min_count" usage:"Minimum number of runtime instances to allocate. Default 16."`
MaxCount int `yaml:"max_count" json:"max_count" usage:"Maximum number of runtime instances to allocate. Default 256."`
CallStackSize int `yaml:"call_stack_size" json:"call_stack_size" usage:"Size of each runtime instance's call stack. Default 128."`
RegistrySize int `yaml:"registry_size" json:"registry_size" usage:"Size of each runtime instance's registry. Default 512."`
Environment map[string]string `yaml:"-" json:"-"`
Env []string `yaml:"env" json:"env" usage:"Values to pass into Runtime as environment variables."`
Path string `yaml:"path" json:"path" usage:"Path for the server to scan for Lua and Go library files."`
HTTPKey string `yaml:"http_key" json:"http_key" usage:"Runtime HTTP Invocation key."`
MinCount int `yaml:"min_count" json:"min_count" usage:"Minimum number of runtime instances to allocate. Default 16."`
MaxCount int `yaml:"max_count" json:"max_count" usage:"Maximum number of runtime instances to allocate. Default 256."`
CallStackSize int `yaml:"call_stack_size" json:"call_stack_size" usage:"Size of each runtime instance's call stack. Default 128."`
RegistrySize int `yaml:"registry_size" json:"registry_size" usage:"Size of each runtime instance's registry. Default 512."`
EventQueueSize int `yaml:"event_queue_size" json:"event_queue_size" usage:"Size of the event queue buffer. Default 8192."`
EventQueueWorkers int `yaml:"event_queue_workers" json:"event_queue_workers" usage:"Number of workers to use for concurrent processing of events. Default 8."`
}

// NewRuntimeConfig creates a new RuntimeConfig struct.
func NewRuntimeConfig() *RuntimeConfig {
return &RuntimeConfig{
Environment: make(map[string]string, 0),
Env: make([]string, 0),
Path: "",
HTTPKey: "defaultkey",
MinCount: 16,
MaxCount: 256,
CallStackSize: 128,
RegistrySize: 512,
Environment: make(map[string]string, 0),
Env: make([]string, 0),
Path: "",
HTTPKey: "defaultkey",
MinCount: 16,
MaxCount: 256,
CallStackSize: 128,
RegistrySize: 512,
EventQueueSize: 8192,
EventQueueWorkers: 8,
}
}

Expand Down
42 changes: 38 additions & 4 deletions server/runtime.go
Expand Up @@ -17,12 +17,11 @@ package server
import (
"context"
"database/sql"
"github.com/heroiclabs/nakama/runtime"
"os"
"path/filepath"
"strings"

"github.com/heroiclabs/nakama/runtime"

"github.com/gofrs/uuid"
"github.com/golang/protobuf/jsonpb"
"github.com/heroiclabs/nakama/api"
Expand Down Expand Up @@ -170,12 +169,18 @@ type (
RuntimeTournamentResetFunction func(ctx context.Context, tournament *api.Tournament, end, reset int64) error

RuntimeLeaderboardResetFunction func(ctx context.Context, leaderboard runtime.Leaderboard, reset int64) error

RuntimeEventFunction func(ctx context.Context, logger runtime.Logger, evt *api.Event)

RuntimeEventSessionStartFunction func(userID, username string, expiry int64, sessionID, clientIP, clientPort string, evtTimeSec int64)
RuntimeEventSessionEndFunction func(userID, username string, expiry int64, sessionID, clientIP, clientPort string, evtTimeSec int64)
)

type RuntimeExecutionMode int

const (
RuntimeExecutionModeRunOnce RuntimeExecutionMode = iota
RuntimeExecutionModeEvent RuntimeExecutionMode = iota
RuntimeExecutionModeRunOnce
RuntimeExecutionModeRPC
RuntimeExecutionModeBefore
RuntimeExecutionModeAfter
Expand All @@ -189,6 +194,8 @@ const (

func (e RuntimeExecutionMode) String() string {
switch e {
case RuntimeExecutionModeEvent:
return "event"
case RuntimeExecutionModeRunOnce:
return "run_once"
case RuntimeExecutionModeRPC:
Expand Down Expand Up @@ -225,6 +232,11 @@ type RuntimeMatchCore interface {
Cancel()
}

type RuntimeEventFunctions struct {
sessionStartFunction RuntimeEventSessionStartFunction
sessionEndFunction RuntimeEventSessionEndFunction
}

type RuntimeBeforeReqFunctions struct {
beforeGetAccountFunction RuntimeBeforeGetAccountFunction
beforeUpdateAccountFunction RuntimeBeforeUpdateAccountFunction
Expand Down Expand Up @@ -362,6 +374,8 @@ type Runtime struct {
tournamentResetFunction RuntimeTournamentResetFunction

leaderboardResetFunction RuntimeLeaderboardResetFunction

eventFunctions *RuntimeEventFunctions
}

func NewRuntime(logger, startupLogger *zap.Logger, db *sql.DB, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, matchRegistry MatchRegistry, tracker Tracker, streamManager StreamManager, router MessageRouter) (*Runtime, error) {
Expand Down Expand Up @@ -389,7 +403,11 @@ func NewRuntime(logger, startupLogger *zap.Logger, db *sql.DB, jsonpbMarshaler *
return nil, err
}

goModules, goRpcFunctions, goBeforeRtFunctions, goAfterRtFunctions, goBeforeReqFunctions, goAfterReqFunctions, goMatchmakerMatchedFunction, goMatchCreateFn, goTournamentEndFunction, goTournamentResetFunction, goLeaderboardResetFunction, goSetMatchCreateFn, goMatchNamesListFn, err := NewRuntimeProviderGo(logger, startupLogger, db, config, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, matchRegistry, tracker, streamManager, router, runtimeConfig.Path, paths)
startupLogger.Info("Initialising runtime event queue processor")
eventQueue := NewRuntimeEventQueue(logger, config)
startupLogger.Info("Runtime event queue processor started", zap.Int("size", config.GetRuntime().EventQueueSize), zap.Int("workers", config.GetRuntime().EventQueueWorkers))

goModules, goRpcFunctions, goBeforeRtFunctions, goAfterRtFunctions, goBeforeReqFunctions, goAfterReqFunctions, goMatchmakerMatchedFunction, goMatchCreateFn, goTournamentEndFunction, goTournamentResetFunction, goLeaderboardResetFunction, allEventFunctions, goSetMatchCreateFn, goMatchNamesListFn, err := NewRuntimeProviderGo(logger, startupLogger, db, config, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, matchRegistry, tracker, streamManager, router, runtimeConfig.Path, paths, eventQueue)
if err != nil {
startupLogger.Error("Error initialising Go runtime provider", zap.Error(err))
return nil, err
Expand All @@ -413,6 +431,13 @@ func NewRuntime(logger, startupLogger *zap.Logger, db *sql.DB, jsonpbMarshaler *
}
startupLogger.Info("Found runtime modules", zap.Int("count", len(allModules)), zap.Strings("modules", allModules))

if allEventFunctions.sessionStartFunction != nil {
startupLogger.Info("Registered event function invocation", zap.String("id", "session_start"))
}
if allEventFunctions.sessionEndFunction != nil {
startupLogger.Info("Registered event function invocation", zap.String("id", "session_end"))
}

allRpcFunctions := make(map[string]RuntimeRpcFunction, len(goRpcFunctions)+len(luaRpcFunctions))
for id, fn := range luaRpcFunctions {
allRpcFunctions[id] = fn
Expand Down Expand Up @@ -1302,6 +1327,7 @@ func NewRuntime(logger, startupLogger *zap.Logger, db *sql.DB, jsonpbMarshaler *
tournamentEndFunction: allTournamentEndFunction,
tournamentResetFunction: allTournamentResetFunction,
leaderboardResetFunction: allLeaderboardResetFunction,
eventFunctions: allEventFunctions,
}, nil
}

Expand Down Expand Up @@ -1792,3 +1818,11 @@ func (r *Runtime) TournamentReset() RuntimeTournamentResetFunction {
func (r *Runtime) LeaderboardReset() RuntimeLeaderboardResetFunction {
return r.leaderboardResetFunction
}

func (r *Runtime) EventSessionStart() RuntimeEventSessionStartFunction {
return r.eventFunctions.sessionStartFunction
}

func (r *Runtime) EventSessionEnd() RuntimeEventSessionEndFunction {
return r.eventFunctions.sessionEndFunction
}
68 changes: 68 additions & 0 deletions server/runtime_event.go
@@ -0,0 +1,68 @@
// Copyright 2019 The Nakama Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"go.uber.org/zap"
)

type RuntimeEventQueue struct {
logger *zap.Logger

ch chan func()

ctx context.Context
ctxCancelFn context.CancelFunc
}

func NewRuntimeEventQueue(logger *zap.Logger, config Config) *RuntimeEventQueue {
b := &RuntimeEventQueue{
logger: logger,

ch: make(chan func(), config.GetRuntime().EventQueueSize),
}
b.ctx, b.ctxCancelFn = context.WithCancel(context.Background())

// Start a fixed number of workers.
for i := 0; i < config.GetRuntime().EventQueueWorkers; i++ {
go func() {
for {
select {
case <-b.ctx.Done():
return
case fn := <-b.ch:
fn()
}
}
}()
}

return b
}

func (b *RuntimeEventQueue) Queue(fn func()) {
select {
case b.ch <- fn:
// Event queued successfully.
default:
// Event queue is full, drop it to avoid blocking the caller.
b.logger.Warn("Runtime event queue full, events may be lost")
}
}

func (b *RuntimeEventQueue) Stop() {
b.ctxCancelFn()
}