Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple servercontexts #629

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 20 additions & 36 deletions server.py
Expand Up @@ -11,7 +11,6 @@
import logging
import os
import signal
import socket
import sys
from datetime import datetime

Expand All @@ -20,9 +19,11 @@
import server
from server.api.api_accessor import ApiAccessor
from server.config import config
from server.core import create_services
from server.game_service import GameService
from server.ice_servers.nts import TwilioNTS
from server.player_service import PlayerService
from server.profiler import Profiler
from server.protocol import SimpleJsonProtocol


async def main():
Expand Down Expand Up @@ -63,65 +64,48 @@ def signal_handler(sig: int, _frame):

api_accessor = ApiAccessor()

services = create_services({
"api_accessor": api_accessor,
"database": database,
"loop": loop,
})

await asyncio.gather(*[
service.initialize() for service in services.values()
])
instance = server.ServerInstance(
"LobbyServer",
database,
api_accessor,
twilio_nts,
loop
)
player_service: PlayerService = instance.services["player_service"]
game_service: GameService = instance.services["game_service"]

profiler = Profiler(services["player_service"])
profiler = Profiler(player_service)
profiler.refresh()
config.register_callback("PROFILING_COUNT", profiler.refresh)
config.register_callback("PROFILING_DURATION", profiler.refresh)
config.register_callback("PROFILING_INTERVAL", profiler.refresh)

ctrl_server = await server.run_control_server(
services["player_service"],
services["game_service"]
)
ctrl_server = await server.run_control_server(player_service, game_service)

async def restart_control_server():
nonlocal ctrl_server
nonlocal services

await ctrl_server.shutdown()
ctrl_server = await server.run_control_server(
services["player_service"],
services["game_service"]
player_service,
game_service
)
config.register_callback("CONTROL_SERVER_PORT", restart_control_server)

lobby_server = await server.run_lobby_server(
address=('', 8001),
database=database,
geoip_service=services["geo_ip_service"],
player_service=services["player_service"],
game_service=services["game_service"],
nts_client=twilio_nts,
ladder_service=services["ladder_service"],
loop=loop
)

for sock in lobby_server.sockets:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
await instance.listen(("", 8001))
await instance.listen(("", 8002), SimpleJsonProtocol)

server.metrics.info.info({
"version": os.environ.get("VERSION") or "dev",
"python_version": ".".join(map(str, sys.version_info[:3])),
"start_time": datetime.utcnow().strftime("%m-%d %H:%M"),
"game_uid": str(services["game_service"].game_id_counter)
"game_uid": str(game_service.game_id_counter)
})

await done

# Cleanup
await asyncio.gather(*[
service.shutdown() for service in services.values()
])
await instance.shutdown()
await ctrl_server.shutdown()

# Close DB connections
Expand Down
238 changes: 157 additions & 81 deletions server/__init__.py
Expand Up @@ -6,16 +6,21 @@

Distributed under GPLv3, see license.txt
"""
import asyncio
import logging
from typing import Optional, Tuple, Type
from typing import Dict, Optional, Set, Tuple, Type

from prometheus_client import start_http_server

from server.db import FAFDatabase
import server.metrics as metrics

from .config import config
from .api.api_accessor import ApiAccessor
from .asyncio_extensions import synchronizedmethod
from .config import TRACE, config
from .configuration_service import ConfigurationService
from .control import run_control_server
from .core import Service, create_services
from .db import FAFDatabase
from .game_service import GameService
from .gameconnection import GameConnection
from .games import GameState, VisibilityState
Expand All @@ -37,81 +42,126 @@
__copyright__ = 'Copyright (c) 2011-2015 ' + __author__

__all__ = (
'GameConnection',
'GameStatsService',
'GameService',
'LadderService',
'RatingService',
'run_lobby_server',
'run_control_server',
'game_service',
'control',
'abc',
'protocol',
'ConfigurationService',
'MessageQueueService',
'RatingService'
"ConfigurationService",
"GameConnection",
"GameService",
"GameStatsService",
"GeoIpService",
"LadderService",
"MessageQueueService",
"RatingService",
"RatingService",
"ServerInstance",
"abc",
"control",
"game_service",
"protocol",
"run_control_server",
)

DIRTY_REPORT_INTERVAL = 1 # Seconds
stats = None
logger = logging.getLogger("server")

if config.ENABLE_METRICS:
logger.info("Using prometheus on port: %i", config.METRICS_PORT)
start_http_server(config.METRICS_PORT)


async def run_lobby_server(
address: Tuple[str, int],
database: FAFDatabase,
player_service: PlayerService,
game_service: GameService,
nts_client: Optional[TwilioNTS],
geoip_service: GeoIpService,
ladder_service: LadderService,
loop,
protocol_class: Type[Protocol] = QDataStreamProtocol,
) -> ServerContext:
class ServerInstance(object):
"""
Run the lobby server
A class representing a shared server state. Each ServerInstance may be
exposed on multiple ports, but each port will share the same internal server
state, i.e. the same players, games, etc.
"""

@at_interval(DIRTY_REPORT_INTERVAL, loop=loop)
async def do_report_dirties():
game_service.update_active_game_metrics()
dirty_games = game_service.dirty_games
dirty_queues = game_service.dirty_queues
dirty_players = player_service.dirty_players
game_service.clear_dirty()
player_service.clear_dirty()
def __init__(
self,
name: str,
database: FAFDatabase,
api_accessor: Optional[ApiAccessor],
twilio_nts: Optional[TwilioNTS],
loop: asyncio.BaseEventLoop,
# For testing
_override_services: Optional[Dict[str, Service]] = None
):
self.name = name
self._logger = logging.getLogger(self.name)
self.database = database
self.api_accessor = api_accessor
self.twilio_nts = twilio_nts
self.loop = loop

self.started = False

self.contexts: Set[ServerContext] = set()

self.services = _override_services or create_services({
"database": self.database,
"api_accessor": self.api_accessor,
"loop": self.loop,
})

self.connection_factory = lambda: LobbyConnection(
database=database,
geoip=self.services["geo_ip_service"],
game_service=self.services["game_service"],
nts_client=twilio_nts,
players=self.services["player_service"],
ladder_service=self.services["ladder_service"]
)

def write_broadcast(self, message, predicate=lambda conn: conn.authenticated):
self._logger.log(TRACE, "]]: %s", message)
metrics.server_broadcasts.inc()

for ctx in self.contexts:
try:
ctx.write_broadcast(message, predicate)
except Exception:
self._logger.exception(
"Error writing '%s'",
message.get("command", message)
)

@synchronizedmethod
async def _start_services(self) -> None:
if self.started:
return

await asyncio.gather(*[
service.initialize() for service in self.services.values()
])

game_service: GameService = self.services["game_service"]
player_service: PlayerService = self.services["player_service"]

@at_interval(DIRTY_REPORT_INTERVAL, loop=self.loop)
def do_report_dirties():
game_service.update_active_game_metrics()
dirty_games = game_service.dirty_games
dirty_queues = game_service.dirty_queues
dirty_players = player_service.dirty_players
game_service.clear_dirty()
player_service.clear_dirty()

try:
if dirty_queues:
ctx.write_broadcast({
'command': 'matchmaker_info',
'queues': [queue.to_dict() for queue in dirty_queues]
},
lambda lobby_conn: lobby_conn.authenticated
self.write_broadcast({
"command": "matchmaker_info",
"queues": [queue.to_dict() for queue in dirty_queues]
}
)
except Exception:
logger.exception("Error writing matchmaker_info")

try:
if dirty_players:
ctx.write_broadcast({
'command': 'player_info',
'players': [player.to_dict() for player in dirty_players]
self.write_broadcast({
"command": "player_info",
"players": [player.to_dict() for player in dirty_players]
},
lambda lobby_conn: lobby_conn.authenticated
)
except Exception:
logger.exception("Error writing player_info")

# TODO: This spams squillions of messages: we should implement per-
# connection message aggregation at the next abstraction layer down :P
for game in dirty_games:
try:
# TODO: This spams squillions of messages: we should implement per-
# connection message aggregation at the next abstraction layer down :P
for game in dirty_games:
if game.state == GameState.ENDED:
game_service.remove_game(game)

Expand All @@ -123,37 +173,63 @@ async def do_report_dirties():
if game.visibility == VisibilityState.FRIENDS:
# To see this game, you must have an authenticated
# connection and be a friend of the host, or the host.
def validation_func(lobby_conn):
return lobby_conn.player.id in game.host.friends or \
lobby_conn.player == game.host
def validation_func(conn):
return conn.player.id in game.host.friends or \
conn.player == game.host
else:
def validation_func(lobby_conn):
return lobby_conn.player.id not in game.host.foes
def validation_func(conn):
return conn.player.id not in game.host.foes

ctx.write_broadcast(
self.write_broadcast(
message,
lambda lobby_conn: lobby_conn.authenticated and validation_func(lobby_conn)
lambda conn:
conn.authenticated and validation_func(conn)
)
except Exception:
logger.exception("Error writing game_info %s", game.id)

ping_msg = protocol_class.encode_message({"command": "ping"})
@at_interval(45, loop=self.loop)
def ping_broadcast():
self.write_broadcast({"command": "ping"})

self.started = True

async def listen(
self,
address: Tuple[str, int],
protocol_class: Type[Protocol] = QDataStreamProtocol
) -> ServerContext:
"""
Start listening on a new address.
"""
if not self.started:
Askaholic marked this conversation as resolved.
Show resolved Hide resolved
await self._start_services()

ctx = ServerContext(
f"{self.name}[{protocol_class.__name__}]",
self.connection_factory,
protocol_class
)
self.contexts.add(ctx)

@at_interval(45, loop=loop)
def ping_broadcast():
ctx.write_broadcast_raw(ping_msg)
await ctx.listen(*address)

def make_connection() -> LobbyConnection:
return LobbyConnection(
database=database,
geoip=geoip_service,
game_service=game_service,
nts_client=nts_client,
players=player_service,
ladder_service=ladder_service
)
return ctx

async def shutdown(self):
for ctx in self.contexts:
ctx.close()

for ctx in self.contexts:
try:
await ctx.wait_closed()
except Exception:
self._logger.error(
"Encountered unexpected error when trying to shut down "
"context %s",
ctx
)

ctx = ServerContext("LobbyServer", make_connection, protocol_class)
await asyncio.gather(*[
service.shutdown() for service in self.services.values()
])

await ctx.listen(*address)
return ctx
self.started = False