Skip to content

Commit

Permalink
Update to python 3.9 (#828)
Browse files Browse the repository at this point in the history
* Change imports to use unittest.mock instead of mock

Note that this commit does not remove the mock package from the Pipenv
dependencies.

* Replace asynctest functions with builtins and copied code

Note that this commit does not remove asynctest from the Pipenv dependencies.

* Use errors exported through asyncio

* Remove deprecated asyncio features

* Ensure ongoing rating tasks are finished when shutting down

* Remove mock and asynctest

* Use platform.python_version

* Implement typing.Protocol refactors

* Use dict union to merge dicts

* Use standard collections as typing generics where possible

* Fix flake8 errors

* Ignore errors thrown during disconnect handlers

* Unpin pytest-asyncio version

* Bump python version to 3.9

* Run pipenv lock

* Refactor test_backpressure_handling

* Don't wait for protocol to close when disconnecting

* Force warnings to error in tests

* Close sockets opened in fixtures

* Add cleanup code to event_loop fixture for better isolation

* Limit how long to wait when shutting down PlayerService

* Better logging in test_backpressure_handling

* Refactor ServerContext shutdown to always close connections

* Code cleanup

* Don't drain protos in PlayerService shutdown

* Improve logging

* Initialize services before starting control server

* Add mark for flaky tests

* Don't import from unittest.mock

* Copy wrap_func from new version of aiocron

* Refactor tests.utils module

* Revert "Force warnings to error in tests"

This reverts commit 27283fe.
  • Loading branch information
Askaholic committed Dec 18, 2021
1 parent 0b987e8 commit 78a8ef2
Show file tree
Hide file tree
Showing 93 changed files with 1,104 additions and 1,016 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/doc.yml
Expand Up @@ -14,7 +14,7 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v1
with:
python-version: 3.7
python-version: 3.9

- name: Install dependencies with pipenv
run: |
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Expand Up @@ -24,7 +24,7 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v1
with:
python-version: 3.7
python-version: 3.9

- name: Install dependencies
run: pip install isort
Expand Down Expand Up @@ -78,7 +78,7 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v1
with:
python-version: 3.7
python-version: 3.9

- name: Run flyway db migrations
env:
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
FROM python:3.7-slim
FROM python:3.9-slim

# Need git for installing aiomysql
RUN apt-get update
Expand Down
6 changes: 2 additions & 4 deletions Pipfile
Expand Up @@ -33,12 +33,10 @@ uvloop = "*"
pytest = "*"
pytest-mock = "*"
pytest-cov = "*"
pytest-asyncio = "==0.12.0"
mock = "*"
pytest-asyncio = "*"
vulture = "*"
asynctest = "*"
hypothesis = "*"
pdoc3 = "*"

[requires]
python_version = "3.7"
python_version = "3.9"
214 changes: 73 additions & 141 deletions Pipfile.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -3,7 +3,7 @@
[![codecov](https://codecov.io/gh/FAForever/server/branch/develop/graph/badge.svg?token=55ndgNQdUv)](https://codecov.io/gh/FAForever/server)
[![docs](https://img.shields.io/badge/docs-latest-purple)](https://faforever.github.io/server/)
[![license](https://img.shields.io/badge/license-GPLv3-blue)](license.txt)
![python](https://img.shields.io/badge/python-3.7-3776AB)
![python](https://img.shields.io/badge/python-3.9-3776AB)

This is the source code for the
[Forged Alliance Forever](https://www.faforever.com/) lobby server.
Expand Down
30 changes: 29 additions & 1 deletion server.py
Expand Up @@ -10,10 +10,13 @@
import asyncio
import logging
import os
import platform
import signal
import sys
import time
from datetime import datetime

import humanize
from docopt import docopt

import server
Expand All @@ -27,8 +30,10 @@


async def main():
global startup_time, shutdown_time

version = os.environ.get("VERSION") or "dev"
python_version = ".".join(map(str, sys.version_info[:3]))
python_version = platform.python_version()

logger.info(
"Lobby %s (Python %s) on %s",
Expand Down Expand Up @@ -90,6 +95,8 @@ def signal_handler(sig: int, _frame):
config.register_callback("PROFILING_DURATION", profiler.refresh)
config.register_callback("PROFILING_INTERVAL", profiler.refresh)

await instance.start_services()

ctrl_server = await server.run_control_server(player_service, game_service)

async def restart_control_server():
Expand All @@ -111,9 +118,15 @@ async def restart_control_server():
"start_time": datetime.utcnow().strftime("%m-%d %H:%M"),
"game_uid": str(game_service.game_id_counter)
})
logger.info(
"Server started in %0.2f seconds",
time.perf_counter() - startup_time
)

await done

shutdown_time = time.perf_counter()

# Cleanup
await instance.shutdown()
await ctrl_server.shutdown()
Expand All @@ -123,6 +136,9 @@ async def restart_control_server():


if __name__ == "__main__":
startup_time = time.perf_counter()
shutdown_time = None

args = docopt(__doc__, version="FAF Server")
config_file = args.get("--configuration-file")
if config_file:
Expand All @@ -144,3 +160,15 @@ async def restart_control_server():
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

asyncio.run(main())

stop_time = time.perf_counter()
logger.info(
"Total server uptime: %s",
humanize.naturaldelta(stop_time - startup_time)
)

if shutdown_time is not None:
logger.info(
"Server shut down in %0.2f seconds",
stop_time - shutdown_time
)
72 changes: 52 additions & 20 deletions server/__init__.py
Expand Up @@ -87,7 +87,8 @@

import asyncio
import logging
from typing import Dict, Optional, Set, Tuple, Type
import time
from typing import Optional

from prometheus_client import start_http_server

Expand Down Expand Up @@ -164,7 +165,7 @@ def __init__(
twilio_nts: Optional[TwilioNTS],
loop: asyncio.BaseEventLoop,
# For testing
_override_services: Optional[Dict[str, Service]] = None
_override_services: Optional[dict[str, Service]] = None
):
self.name = name
self._logger = logging.getLogger(self.name)
Expand All @@ -175,7 +176,7 @@ def __init__(

self.started = False

self.contexts: Set[ServerContext] = set()
self.contexts: set[ServerContext] = set()

self.services = _override_services or create_services({
"server": self,
Expand Down Expand Up @@ -217,26 +218,40 @@ def write_broadcast(
)

@synchronizedmethod
async def _start_services(self) -> None:
async def start_services(self) -> None:
if self.started:
return

num_services = len(self.services)
self._logger.debug("Initializing %s services", num_services)

async def initialize(service):
start = time.perf_counter()
await service.initialize()
service._logger.debug(
"%s initialized in %0.2f seconds",
service.__class__.__name__,
time.perf_counter() - start
)

await asyncio.gather(*[
service.initialize() for service in self.services.values()
initialize(service) for service in self.services.values()
])

self._logger.debug("Initialized %s services", num_services)

self.started = True

async def listen(
self,
address: Tuple[str, int],
protocol_class: Type[Protocol] = QDataStreamProtocol
address: tuple[str, int],
protocol_class: type[Protocol] = QDataStreamProtocol
) -> ServerContext:
"""
Start listening on a new address.
"""
if not self.started:
await self._start_services()
await self.start_services()

ctx = ServerContext(
f"{self.name}[{protocol_class.__name__}]",
Expand All @@ -251,21 +266,38 @@ async def listen(
return ctx

async def shutdown(self):
for ctx in self.contexts:
ctx.close()

for ctx in self.contexts:
try:
await ctx.wait_closed()
except Exception:
results = await asyncio.gather(
*(ctx.stop() for ctx in self.contexts),
return_exceptions=True
)
for result, ctx in zip(results, self.contexts):
if isinstance(result, BaseException):
self._logger.error(
"Encountered unexpected error when trying to shut down "
"context %s",
"Unexpected error when stopping context %s",
ctx
)

await asyncio.gather(*[
service.shutdown() for service in self.services.values()
])
results = await asyncio.gather(
*(service.shutdown() for service in self.services.values()),
return_exceptions=True
)
for result, service in zip(results, self.services.values()):
if isinstance(result, BaseException):
self._logger.error(
"Unexpected error when shutting down service %s",
service
)

results = await asyncio.gather(
*(ctx.shutdown() for ctx in self.contexts),
return_exceptions=True
)
for result, ctx in zip(results, self.contexts):
if isinstance(result, BaseException):
self._logger.error(
"Unexpected error when shutting down context %s",
ctx
)

self.contexts.clear()
self.started = False
5 changes: 2 additions & 3 deletions server/api/oauth_session.py
@@ -1,6 +1,5 @@
import os
import time
from typing import Dict

import aiohttp
from oauthlib.oauth2.rfc6749.errors import (
Expand Down Expand Up @@ -57,15 +56,15 @@ async def refresh_tokens(self) -> None:
creds = await self._make_request(data=data)
self.update_tokens(creds)

def update_tokens(self, creds: Dict[str, str]) -> None:
def update_tokens(self, creds: dict[str, str]) -> None:
self.token = creds["access_token"]
self.refresh_token = creds.get("refresh_token")
expires_in = creds.get("expires_in")
if expires_in is not None:
self.token_expires_in = int(expires_in)
self.token_time = time.time()

async def _make_request(self, data: Dict[str, str]) -> Dict[str, str]:
async def _make_request(self, data: dict[str, str]) -> dict[str, str]:
async with aiohttp.ClientSession(raise_for_status=True) as session:
async with session.post(self.token_url, data=data) as resp:
return await resp.json()
Expand Down
24 changes: 13 additions & 11 deletions server/asyncio_extensions.py
Expand Up @@ -9,12 +9,12 @@
from functools import wraps
from typing import (
Any,
AsyncContextManager,
Callable,
Coroutine,
List,
Optional,
Type,
Union,
Protocol,
cast,
overload
)

Expand All @@ -24,14 +24,16 @@
AsyncDecorator = Callable[[AsyncFunc], AsyncFunc]


# TODO: Need python >= 3.8 for typing.Protocol
AsyncLockable = Union[asyncio.Lock, "SpinLock"]
class AsyncLock(Protocol, AsyncContextManager["AsyncLock"]):
def locked(self) -> bool: ...
async def acquire(self) -> bool: ...
def release(self) -> None: ...


async def gather_without_exceptions(
tasks: List[asyncio.Task],
*exceptions: Type[BaseException],
) -> List[Any]:
tasks: list[asyncio.Task],
*exceptions: type[BaseException],
) -> list[Any]:
"""
Run coroutines in parallel, raising the first exception that dosen't
match any of the specified exception classes.
Expand Down Expand Up @@ -119,7 +121,7 @@ def synchronized() -> AsyncDecorator: ...
@overload
def synchronized(function: AsyncFunc) -> AsyncFunc: ...
@overload
def synchronized(lock: Optional[AsyncLockable]) -> AsyncDecorator: ...
def synchronized(lock: Optional[AsyncLock]) -> AsyncDecorator: ...


def synchronized(*args):
Expand All @@ -139,15 +141,15 @@ def synchronized(*args):

def _synchronize(
function: AsyncFunc,
lock: Optional[AsyncLockable] = None
lock: Optional[AsyncLock] = None
) -> AsyncFunc:
"""Wrap an async function with an async lock."""
@wraps(function)
async def wrapped(*args, **kwargs):
nonlocal lock

if lock is None:
lock = asyncio.Lock()
lock = lock or cast(AsyncLock, asyncio.Lock())

async with lock:
return await function(*args, **kwargs)
Expand Down
1 change: 0 additions & 1 deletion server/broadcast_service.py
Expand Up @@ -41,7 +41,6 @@ async def initialize(self):
self.broadcast_ping,
start=True
)
self._logger.debug("Broadcast service initialized")

async def report_dirties(self):
"""
Expand Down
4 changes: 2 additions & 2 deletions server/config.py
Expand Up @@ -5,7 +5,7 @@
import asyncio
import logging
import os
from typing import Callable, Dict
from typing import Callable

import trueskill
import yaml
Expand Down Expand Up @@ -128,7 +128,7 @@ def __init__(self):
key: value for key, value in vars(self).items() if key.isupper()
}

self._callbacks: Dict[str, Callable] = {}
self._callbacks: dict[str, Callable] = {}
self.refresh()

def refresh(self) -> None:
Expand Down
3 changes: 1 addition & 2 deletions server/configuration_service.py
Expand Up @@ -12,13 +12,12 @@
@with_logger
class ConfigurationService(Service):
def __init__(self) -> None:
self._logger.info("Configuration service created.")
self._store = config
self._task = None

async def initialize(self) -> None:
self._task = asyncio.create_task(self._worker_loop())
self._logger.info("Configuration service started.")
self._logger.info("Configuration service initialized")

async def _worker_loop(self) -> None:
while True:
Expand Down

0 comments on commit 78a8ef2

Please sign in to comment.