Skip to content

Commit

Permalink
Add Fleet Manager support
Browse files Browse the repository at this point in the history
  • Loading branch information
sesposito committed Mar 15, 2024
1 parent 7de80ce commit cab254b
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 13 deletions.
6 changes: 3 additions & 3 deletions build/Dockerfile.local
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
## See the License for the specific language governing permissions and
## limitations under the License.

# docker build .. -f Dockerfile.local -t nakama:dev
# docker build "$PWD"/.. -f Dockerfile.local -t nakama:dev

FROM golang:1.21.6-bookworm as builder
FROM --platform=linux/amd64 golang:1.21.6-bookworm AS builder

ENV GOOS linux
ENV CGO_ENABLED 1
Expand All @@ -29,7 +29,7 @@ COPY . .

RUN go build -o /go/build-out/nakama -trimpath -mod=vendor -gcflags "-trimpath $PWD" -asmflags "-trimpath $PWD" -ldflags "-s -w -X main.version=local"

FROM debian:bookworm-slim
FROM --platform=linux/amd64 debian:bookworm-slim

MAINTAINER Heroic Labs <support@heroiclabs.com>

Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,13 @@ func main() {
tracker.SetMatchJoinListener(matchRegistry.Join)
tracker.SetMatchLeaveListener(matchRegistry.Leave)
streamManager := server.NewLocalStreamManager(config, sessionRegistry, tracker)
fmCallbackHandler := server.NewLocalFmCallbackHandler(config)

storageIndex, err := server.NewLocalStorageIndex(logger, db, config.GetStorage(), metrics)
if err != nil {
logger.Fatal("Failed to initialize storage index", zap.Error(err))
}
runtime, runtimeInfo, err := server.NewRuntime(ctx, logger, startupLogger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, version, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, sessionCache, statusRegistry, matchRegistry, tracker, metrics, streamManager, router, storageIndex)
runtime, runtimeInfo, err := server.NewRuntime(ctx, logger, startupLogger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, version, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, sessionCache, statusRegistry, matchRegistry, tracker, metrics, streamManager, router, storageIndex, fmCallbackHandler)
if err != nil {
startupLogger.Fatal("Failed initializing runtime modules", zap.Error(err))
}
Expand Down
61 changes: 61 additions & 0 deletions server/fleet_manager_callback_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 The Nakama Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"github.com/gofrs/uuid/v5"
"github.com/heroiclabs/nakama-common/runtime"
"net"
"sync"
)

type LocalFmCallbackHandler struct {
callbackRegistry sync.Map
idGenerator uuid.Generator

nodeHash [6]byte
}

func NewLocalFmCallbackHandler(config Config) runtime.FmCallbackHandler {
hash := NodeToHash(config.GetName())
callbackIdGen := uuid.NewGenWithHWAF(func() (net.HardwareAddr, error) {
return hash[:], nil
})

return &LocalFmCallbackHandler{
callbackRegistry: sync.Map{},
idGenerator: callbackIdGen,

nodeHash: hash,
}
}

func (fch *LocalFmCallbackHandler) GenerateCallbackId() string {
return uuid.Must(fch.idGenerator.NewV1()).String()
}

func (fch *LocalFmCallbackHandler) SetCallback(callbackId string, fn runtime.FmCreateCallbackFn) {
fch.callbackRegistry.Store(callbackId, fn)
}

func (fch *LocalFmCallbackHandler) InvokeCallback(callbackId string, status runtime.FmCreateStatus, instanceInfo *runtime.InstanceInfo, sessionInfo []*runtime.SessionInfo, metadata map[string]any, err error) {
callback, ok := fch.callbackRegistry.LoadAndDelete(callbackId)
if !ok || callback == nil {
return
}

fn := callback.(runtime.FmCreateCallbackFn)
fn(status, instanceInfo, sessionInfo, metadata, err)
}
12 changes: 8 additions & 4 deletions server/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,8 @@ type Runtime struct {
leaderboardResetFunction RuntimeLeaderboardResetFunction

eventFunctions *RuntimeEventFunctions

fleetManager runtime.FleetManager
}

type MatchNamesListFunction func() []string
Expand Down Expand Up @@ -625,7 +627,7 @@ func CheckRuntime(logger *zap.Logger, config Config, version string) error {
return nil
}

func NewRuntime(ctx context.Context, logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, config Config, version string, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, sessionCache SessionCache, statusRegistry *StatusRegistry, matchRegistry MatchRegistry, tracker Tracker, metrics Metrics, streamManager StreamManager, router MessageRouter, storageIndex StorageIndex) (*Runtime, *RuntimeInfo, error) {
func NewRuntime(ctx context.Context, logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, config Config, version string, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, sessionCache SessionCache, statusRegistry *StatusRegistry, matchRegistry MatchRegistry, tracker Tracker, metrics Metrics, streamManager StreamManager, router MessageRouter, storageIndex StorageIndex, fmCallbackHandler runtime.FmCallbackHandler) (*Runtime, *RuntimeInfo, error) {
runtimeConfig := config.GetRuntime()
startupLogger.Info("Initialising runtime", zap.String("path", runtimeConfig.Path))

Expand All @@ -640,19 +642,19 @@ func NewRuntime(ctx context.Context, logger, startupLogger *zap.Logger, db *sql.

matchProvider := NewMatchProvider()

goModules, goRPCFns, goBeforeRtFns, goAfterRtFns, goBeforeReqFns, goAfterReqFns, goMatchmakerMatchedFn, goMatchmakerCustomMatchingFn, goTournamentEndFn, goTournamentResetFn, goLeaderboardResetFn, goPurchaseNotificationAppleFn, goSubscriptionNotificationAppleFn, goPurchaseNotificationGoogleFn, goSubscriptionNotificationGoogleFn, goIndexFilterFns, allEventFns, goMatchNamesListFn, err := NewRuntimeProviderGo(ctx, logger, startupLogger, db, protojsonMarshaler, config, version, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, sessionCache, statusRegistry, matchRegistry, tracker, metrics, streamManager, router, storageIndex, runtimeConfig.Path, paths, eventQueue, matchProvider)
goModules, goRPCFns, goBeforeRtFns, goAfterRtFns, goBeforeReqFns, goAfterReqFns, goMatchmakerMatchedFn, goMatchmakerCustomMatchingFn, goTournamentEndFn, goTournamentResetFn, goLeaderboardResetFn, goPurchaseNotificationAppleFn, goSubscriptionNotificationAppleFn, goPurchaseNotificationGoogleFn, goSubscriptionNotificationGoogleFn, goIndexFilterFns, fleetManager, allEventFns, goMatchNamesListFn, err := NewRuntimeProviderGo(ctx, logger, startupLogger, db, protojsonMarshaler, config, version, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, sessionCache, statusRegistry, matchRegistry, tracker, metrics, streamManager, router, storageIndex, runtimeConfig.Path, paths, eventQueue, matchProvider, fmCallbackHandler)
if err != nil {
startupLogger.Error("Error initialising Go runtime provider", zap.Error(err))
return nil, nil, err
}

luaModules, luaRPCFns, luaBeforeRtFns, luaAfterRtFns, luaBeforeReqFns, luaAfterReqFns, luaMatchmakerMatchedFn, luaTournamentEndFn, luaTournamentResetFn, luaLeaderboardResetFn, luaPurchaseNotificationAppleFn, luaSubscriptionNotificationAppleFn, luaPurchaseNotificationGoogleFn, luaSubscriptionNotificationGoogleFn, luaIndexFilterFns, err := NewRuntimeProviderLua(ctx, logger, startupLogger, db, protojsonMarshaler, protojsonUnmarshaler, config, version, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, sessionCache, statusRegistry, matchRegistry, tracker, metrics, streamManager, router, allEventFns.eventFunction, runtimeConfig.Path, paths, matchProvider, storageIndex)
luaModules, luaRPCFns, luaBeforeRtFns, luaAfterRtFns, luaBeforeReqFns, luaAfterReqFns, luaMatchmakerMatchedFn, luaTournamentEndFn, luaTournamentResetFn, luaLeaderboardResetFn, luaPurchaseNotificationAppleFn, luaSubscriptionNotificationAppleFn, luaPurchaseNotificationGoogleFn, luaSubscriptionNotificationGoogleFn, luaIndexFilterFns, err := NewRuntimeProviderLua(ctx, logger, startupLogger, db, protojsonMarshaler, protojsonUnmarshaler, config, version, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, sessionCache, statusRegistry, matchRegistry, tracker, metrics, streamManager, router, allEventFns.eventFunction, runtimeConfig.Path, paths, matchProvider, storageIndex) //, fleetManager)
if err != nil {
startupLogger.Error("Error initialising Lua runtime provider", zap.Error(err))
return nil, nil, err
}

jsModules, jsRPCFns, jsBeforeRtFns, jsAfterRtFns, jsBeforeReqFns, jsAfterReqFns, jsMatchmakerMatchedFn, jsTournamentEndFn, jsTournamentResetFn, jsLeaderboardResetFn, jsPurchaseNotificationAppleFn, jsSubscriptionNotificationAppleFn, jsPurchaseNotificationGoogleFn, jsSubscriptionNotificationGoogleFn, jsIndexFilterFns, err := NewRuntimeProviderJS(ctx, logger, startupLogger, db, protojsonMarshaler, protojsonUnmarshaler, config, version, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, sessionCache, statusRegistry, matchRegistry, tracker, metrics, streamManager, router, allEventFns.eventFunction, runtimeConfig.Path, runtimeConfig.JsEntrypoint, matchProvider, storageIndex)
jsModules, jsRPCFns, jsBeforeRtFns, jsAfterRtFns, jsBeforeReqFns, jsAfterReqFns, jsMatchmakerMatchedFn, jsTournamentEndFn, jsTournamentResetFn, jsLeaderboardResetFn, jsPurchaseNotificationAppleFn, jsSubscriptionNotificationAppleFn, jsPurchaseNotificationGoogleFn, jsSubscriptionNotificationGoogleFn, jsIndexFilterFns, err := NewRuntimeProviderJS(ctx, logger, startupLogger, db, protojsonMarshaler, protojsonUnmarshaler, config, version, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, sessionCache, statusRegistry, matchRegistry, tracker, metrics, streamManager, router, allEventFns.eventFunction, runtimeConfig.Path, runtimeConfig.JsEntrypoint, matchProvider, storageIndex) //, fleetManager)
if err != nil {
startupLogger.Error("Error initialising JavaScript runtime provider", zap.Error(err))
return nil, nil, err
Expand Down Expand Up @@ -2632,6 +2634,8 @@ func NewRuntime(ctx context.Context, logger, startupLogger *zap.Logger, db *sql.
subscriptionNotificationGoogleFunction: allSubscriptionNotificationGoogleFunction,
storageIndexFilterFunctions: allStorageIndexFilterFunctions,

fleetManager: fleetManager,

eventFunctions: allEventFns,
}, rInfo, nil
}
Expand Down
32 changes: 27 additions & 5 deletions server/runtime_go.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"database/sql"
"errors"
"fmt"
"path/filepath"
"plugin"
"strings"
Expand All @@ -43,7 +44,7 @@ type RuntimeGoInitializer struct {
node string
version string
env map[string]string
nk runtime.NakamaModule
nk runtime.NakamaModule // *RuntimeGoNakamaModule

rpc map[string]RuntimeRpcFunction
beforeRt map[string]RuntimeBeforeRtFunction
Expand All @@ -61,6 +62,8 @@ type RuntimeGoInitializer struct {
matchmakerOverride RuntimeMatchmakerOverrideFunction
storageIndexFunctions map[string]RuntimeStorageIndexFilterFunction

fleetManager runtime.FleetManager

eventFunctions []RuntimeEventFunction
sessionStartFunctions []RuntimeEventFunction
sessionEndFunctions []RuntimeEventFunction
Expand All @@ -69,6 +72,8 @@ type RuntimeGoInitializer struct {

match map[string]func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule) (runtime.Match, error)
matchLock *sync.RWMutex

fmCallbackHandler runtime.FmCallbackHandler
}

func (ri *RuntimeGoInitializer) RegisterEvent(fn func(ctx context.Context, logger runtime.Logger, evt *api.Event)) error {
Expand Down Expand Up @@ -2621,17 +2626,30 @@ func (ri *RuntimeGoInitializer) RegisterStorageIndexFilter(indexName string, fn
return nil
}

// TODO: wrap runtime.FleetManager interface so that Init is not exposed
func (ri *RuntimeGoInitializer) RegisterFleetManager(fleetManager runtime.FleetManagerInitializer) error {
if fleetManager == nil {
return errors.New("fleet manager cannot be nil")
}
if err := fleetManager.Init(ri.nk, ri.fmCallbackHandler); err != nil {
return fmt.Errorf("failed to run fleet manager Init function: %s", err.Error())
}
ri.fleetManager = fleetManager
return nil
}

func (ri *RuntimeGoInitializer) RegisterMatch(name string, fn func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule) (runtime.Match, error)) error {
ri.matchLock.Lock()
ri.match[name] = fn
ri.matchLock.Unlock()
return nil
}

func NewRuntimeProviderGo(ctx context.Context, logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, config Config, version string, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, sessionCache SessionCache, statusRegistry *StatusRegistry, matchRegistry MatchRegistry, tracker Tracker, metrics Metrics, streamManager StreamManager, router MessageRouter, storageIndex StorageIndex, rootPath string, paths []string, eventQueue *RuntimeEventQueue, matchProvider *MatchProvider) ([]string, map[string]RuntimeRpcFunction, map[string]RuntimeBeforeRtFunction, map[string]RuntimeAfterRtFunction, *RuntimeBeforeReqFunctions, *RuntimeAfterReqFunctions, RuntimeMatchmakerMatchedFunction, RuntimeMatchmakerOverrideFunction, RuntimeTournamentEndFunction, RuntimeTournamentResetFunction, RuntimeLeaderboardResetFunction, RuntimePurchaseNotificationAppleFunction, RuntimeSubscriptionNotificationAppleFunction, RuntimePurchaseNotificationGoogleFunction, RuntimeSubscriptionNotificationGoogleFunction, map[string]RuntimeStorageIndexFilterFunction, *RuntimeEventFunctions, func() []string, error) {
func NewRuntimeProviderGo(ctx context.Context, logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, config Config, version string, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, sessionCache SessionCache, statusRegistry *StatusRegistry, matchRegistry MatchRegistry, tracker Tracker, metrics Metrics, streamManager StreamManager, router MessageRouter, storageIndex StorageIndex, rootPath string, paths []string, eventQueue *RuntimeEventQueue, matchProvider *MatchProvider, fmCallbackHandler runtime.FmCallbackHandler) ([]string, map[string]RuntimeRpcFunction, map[string]RuntimeBeforeRtFunction, map[string]RuntimeAfterRtFunction, *RuntimeBeforeReqFunctions, *RuntimeAfterReqFunctions, RuntimeMatchmakerMatchedFunction, RuntimeMatchmakerOverrideFunction, RuntimeTournamentEndFunction, RuntimeTournamentResetFunction, RuntimeLeaderboardResetFunction, RuntimePurchaseNotificationAppleFunction, RuntimeSubscriptionNotificationAppleFunction, RuntimePurchaseNotificationGoogleFunction, RuntimeSubscriptionNotificationGoogleFunction, map[string]RuntimeStorageIndexFilterFunction, runtime.FleetManager, *RuntimeEventFunctions, func() []string, error) {
runtimeLogger := NewRuntimeGoLogger(logger)
node := config.GetName()
env := config.GetRuntime().Environment

nk := NewRuntimeGoNakamaModule(logger, db, protojsonMarshaler, config, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, sessionCache, statusRegistry, matchRegistry, tracker, metrics, streamManager, router, storageIndex)

match := make(map[string]func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule) (runtime.Match, error), 0)
Expand Down Expand Up @@ -2668,6 +2686,8 @@ func NewRuntimeProviderGo(ctx context.Context, logger, startupLogger *zap.Logger
})
nk.matchCreateFn = matchProvider.CreateMatch

var _ runtime.NakamaModule = nk

initializer := &RuntimeGoInitializer{
logger: runtimeLogger,
db: db,
Expand All @@ -2693,6 +2713,8 @@ func NewRuntimeProviderGo(ctx context.Context, logger, startupLogger *zap.Logger

match: match,
matchLock: matchLock,

fmCallbackHandler: fmCallbackHandler,
}

// The baseline context that will be passed to all InitModule calls.
Expand All @@ -2711,13 +2733,13 @@ func NewRuntimeProviderGo(ctx context.Context, logger, startupLogger *zap.Logger
relPath, name, fn, err := openGoModule(startupLogger, rootPath, path)
if err != nil {
// Errors are already logged in the function above.
return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err
}

// Run the initialisation.
if err = fn(ctx, runtimeLogger, db, nk, initializer); err != nil {
startupLogger.Fatal("Error returned by InitModule function in Go module", zap.String("name", name), zap.Error(err))
return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, errors.New("error returned by InitModule function in Go module")
return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, errors.New("error returned by InitModule function in Go module")
}
modulePaths = append(modulePaths, relPath)
}
Expand Down Expand Up @@ -2765,7 +2787,7 @@ func NewRuntimeProviderGo(ctx context.Context, logger, startupLogger *zap.Logger
}
}

return modulePaths, initializer.rpc, initializer.beforeRt, initializer.afterRt, initializer.beforeReq, initializer.afterReq, initializer.matchmakerMatched, initializer.matchmakerOverride, initializer.tournamentEnd, initializer.tournamentReset, initializer.leaderboardReset, initializer.purchaseNotificationApple, initializer.subscriptionNotificationApple, initializer.purchaseNotificationGoogle, initializer.subscriptionNotificationGoogle, initializer.storageIndexFunctions, events, matchNamesListFn, nil
return modulePaths, initializer.rpc, initializer.beforeRt, initializer.afterRt, initializer.beforeReq, initializer.afterReq, initializer.matchmakerMatched, initializer.matchmakerOverride, initializer.tournamentEnd, initializer.tournamentReset, initializer.leaderboardReset, initializer.purchaseNotificationApple, initializer.subscriptionNotificationApple, initializer.purchaseNotificationGoogle, initializer.subscriptionNotificationGoogle, initializer.storageIndexFunctions, initializer.fleetManager, events, matchNamesListFn, nil
}

func CheckRuntimeProviderGo(logger *zap.Logger, rootPath string, paths []string) error {
Expand Down
6 changes: 6 additions & 0 deletions server/runtime_go_nakama.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type RuntimeGoNakamaModule struct {
node string
matchCreateFn RuntimeMatchCreateFunction
satori runtime.Satori
fleetManager runtime.FleetManager
storageIndex StorageIndex
}

Expand Down Expand Up @@ -4210,3 +4211,8 @@ func (n *RuntimeGoNakamaModule) ChannelIdBuild(ctx context.Context, senderId, ta
func (n *RuntimeGoNakamaModule) GetSatori() runtime.Satori {
return n.satori
}

func (n *RuntimeGoNakamaModule) GetFleetManager() runtime.FleetManager {
// TODO: Should we return error if this is nil?
return n.fleetManager
}

0 comments on commit cab254b

Please sign in to comment.