Skip to content

Commit

Permalink
Feature: reduce bandwidth needed to serve clients the serverlist (#25)
Browse files Browse the repository at this point in the history
Game Coordinator protocol 4 allows us to send the client a table
of NewGRFs, and after that use an index into that table for each
server. This heavily reduces the size of the full serverlist, from
~60KB to ~20KB.
Additionally, we now also send the name of the NewGRF to the client.
In case the client doesn't have the NewGRF yet, it can now show
the name of the NewGRF instead of the grfid or md5sum.

Furthermore, this reduces the load on redis, as we only update
NewGRFs when there is something to update, instead of every 30
seconds. NewGRFs in multiplayer games can only change when starting
a new game, and never during a game.
  • Loading branch information
TrueBrain committed Jul 18, 2021
1 parent 18e5de2 commit 15ff189
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 17 deletions.
38 changes: 35 additions & 3 deletions game_coordinator/application/coordinator.py
Expand Up @@ -34,6 +34,7 @@ def __init__(self, database, shared_secret, socks_proxy):
self.socks_proxy = socks_proxy
self._servers = {}
self._tokens = {}
self._newgrf_lookup_table = {}

self.database.application = self

Expand All @@ -47,6 +48,15 @@ def disconnect(self, source):
def delete_token(self, token):
del self._tokens[token]

async def newgrf_added(self, index, newgrf):
    """Register *newgrf* under *index* in the shared NewGRF lookup table."""
    self._newgrf_lookup_table[index] = newgrf

async def remove_newgrf_from_table(self, grfid, md5sum):
    """Drop the lookup-table entry matching (grfid, md5sum), if present."""
    stale_index = next(
        (
            index
            for index, entry in self._newgrf_lookup_table.items()
            if entry["grfid"] == grfid and entry["md5sum"] == md5sum
        ),
        None,
    )
    if stale_index is not None:
        del self._newgrf_lookup_table[stale_index]

async def update_external_server(self, server_id, info):
if server_id not in self._servers:
self._servers[server_id] = ServerExternal(self, server_id)
Expand All @@ -57,6 +67,16 @@ async def update_external_server(self, server_id, info):

await self._servers[server_id].update(info)

async def update_newgrf_external_server(self, server_id, newgrfs_indexed):
    """Apply a NewGRF-list update for a server managed by another GC instance.

    Creates a ServerExternal placeholder if this is the first we hear of the
    server; refuses (with an error log) to touch servers managed by us.
    """
    if server_id not in self._servers:
        self._servers[server_id] = ServerExternal(self, server_id)

    if not isinstance(self._servers[server_id], ServerExternal):
        # Bug fix: the message used to name update_external_server(),
        # which made the log point at the wrong function.
        log.error("Internal error: update_newgrf_external_server() called on a server managed by us")
        return

    await self._servers[server_id].update_newgrf(newgrfs_indexed)

async def update_external_direct_ip(self, server_id, type, ip, port):
if server_id not in self._servers:
self._servers[server_id] = ServerExternal(self, server_id)
Expand Down Expand Up @@ -150,28 +170,40 @@ async def receive_PACKET_COORDINATOR_SERVER_REGISTER(

await token.connect()

async def receive_PACKET_COORDINATOR_SERVER_UPDATE(
    self, source, protocol_version, newgrf_serialization_type, newgrfs, **info
):
    """Forward a server's periodic update to its Server object.

    NewGRFs are handled separately from the rest of the game info.
    """
    server = source.server
    await server.update_newgrf(newgrf_serialization_type, newgrfs)
    await server.update(info)

async def receive_PACKET_COORDINATOR_CLIENT_LISTING(
    self, source, protocol_version, game_info_version, openttd_version, newgrf_lookup_table_cursor
):
    """Send the client the server listing (and, for protocol 4+, the NewGRF table)."""
    if protocol_version >= 4 and self._newgrf_lookup_table:
        await source.protocol.send_PACKET_COORDINATOR_GC_NEWGRF_LOOKUP(
            protocol_version, newgrf_lookup_table_cursor, self._newgrf_lookup_table
        )

    # Ensure servers matching "openttd_version" are at the top.
    servers_match = []
    servers_other = []
    for server in self._servers.values():
        # Servers that are not reachable shouldn't be listed.
        if server.connection_type == ConnectionType.CONNECTION_TYPE_ISOLATED:
            continue
        # Server is announced but hasn't finished registration.
        if not server.info:
            continue

        bucket = servers_match if server.info["openttd_version"] == openttd_version else servers_other
        bucket.append(server)

    await source.protocol.send_PACKET_COORDINATOR_GC_LISTING(
        protocol_version, game_info_version, servers_match + servers_other, self._newgrf_lookup_table
    )
    await self.database.stats_listing(game_info_version)

async def receive_PACKET_COORDINATOR_CLIENT_CONNECT(self, source, protocol_version, invite_code):
if not invite_code or invite_code[0] != "+" or invite_code not in self._servers:
Expand Down
37 changes: 37 additions & 0 deletions game_coordinator/application/helpers/server.py
@@ -1,10 +1,14 @@
import asyncio
import logging

from openttd_protocol.protocol.coordinator import (
ConnectionType,
NewGRFSerializationType,
ServerGameType,
)

log = logging.getLogger(__name__)


class ConnectAndCloseProtocol(asyncio.Protocol):
def connection_made(self, transport):
Expand Down Expand Up @@ -33,6 +37,9 @@ async def update(self, info):
self.connection_type = ConnectionType(info["connection_type"])
self.info = info

async def update_newgrf(self, newgrfs_indexed):
    """Store the index-based NewGRF list as broadcast by the owning GC."""
    self.newgrfs_indexed = newgrfs_indexed

async def update_direct_ip(self, ip_type, ip, port):
# Do not overwrite the connection_string if we are named by invite-code.
if self.server_id[0] != "+":
Expand Down Expand Up @@ -82,6 +89,36 @@ def __init__(self, application, server_id, game_type, source, server_port, invit
async def disconnect(self):
    """Mark this server offline in the database when its connection drops."""
    database = self._application.database
    await database.server_offline(self.server_id)

async def update_newgrf(self, newgrf_serialization_type, newgrfs):
    """Process the NewGRF list a game server sent with its update packet.

    The first update carries full NewGRF data and is converted into
    lookup-table indices; later grfid/md5sum-only updates merely refresh
    the TTL of the already-known entries.
    """
    if newgrfs is None:
        return

    # Keep-alive case: a server cannot change NewGRFs during a running
    # game, so a grfid/md5sum-only update means "these are still in use".
    if newgrf_serialization_type == NewGRFSerializationType.NST_GRFID_MD5:
        for entry in newgrfs:
            await self._application.database.newgrf_in_use(entry)
        return

    accepted = (
        NewGRFSerializationType.NST_GRFID_MD5_NAME,
        NewGRFSerializationType.NST_CONVERSION_GRFID_MD5,
    )
    if newgrf_serialization_type not in accepted:
        log.error("Unexpected NewGRF serialization type %s", newgrf_serialization_type.name)
        return

    # First update: translate every NewGRF into its lookup-table index.
    indexed = [await self._application.database.newgrf_assign_index(entry) for entry in newgrfs]

    self.newgrfs_indexed = indexed
    await self._application.database.update_newgrf(self.server_id, indexed)

async def update(self, info):
self.info = info
self.info["game_type"] = self.game_type.value
Expand Down
138 changes: 125 additions & 13 deletions game_coordinator/database/redis.py
Expand Up @@ -14,6 +14,18 @@

_redis_url = None

# All TTLs below are in seconds (redis EXPIRE / SET "ex" semantics).

# Servers send an update every 30 seconds, so if we haven't seen one for
# twice that, the server is no longer there.
TTL_SERVER = 60
# Give a bit of grace period before forgetting about NewGRFs, so server
# restarts don't bump the usage counter.
TTL_NEWGRF = TTL_SERVER + 60
# GC-id claims are refreshed every 30 seconds, so after twice that the GC
# has crashed / exited and its id can be reclaimed.
TTL_GC_ID = 60
# Keep statistics for 30 days.
TTL_STATS = 3600 * 24 * 30


class Database:
def __init__(self):
Expand All @@ -30,7 +42,7 @@ async def sync_and_monitor(self):
# Check with redis if any of the keys are available.
while True:
for i in range(16):
res = await self._redis.set(f"gc-id:{i}", 1, ex=60, nx=True)
res = await self._redis.set(f"gc-id:{i}", 1, ex=TTL_GC_ID, nx=True)
if res is not None:
self._gc_id = str(i)
break
Expand Down Expand Up @@ -68,7 +80,7 @@ async def _keep_gc_id_alive(self):
last_time = time.time()

await asyncio.sleep(30)
await self._redis.set(f"gc-id:{self._gc_id}", 1, ex=60)
await self._redis.set(f"gc-id:{self._gc_id}", 1, ex=TTL_GC_ID)

if time.time() - last_time > 50:
raise Exception("We were about to lose our GC-id, so we crash instead.")
Expand All @@ -87,6 +99,12 @@ async def _monitor_expire(self):
if message["type"] != "message":
continue

if message["data"].startswith("gc-newgrf:"):
_, _, grfid_md5sum = message["data"].partition(":")
grfid, _, md5sum = grfid_md5sum.partition("-")

await self.application.remove_newgrf_from_table(grfid, md5sum)

if message["data"].startswith("gc-server:"):
_, _, server_id = message["data"].partition(":")

Expand All @@ -96,19 +114,57 @@ async def _monitor_expire(self):
await self.application.remove_server(server_id)

async def _scan_existing_servers(self):
servers = await self._redis.keys("gc-newgrf:*")
for server in servers:
_, _, grfid_md5sum = server.partition(":")
grfid, _, md5sum = grfid_md5sum.partition("-")

newgrf_lookup_str = await self._redis.get(server)
if newgrf_lookup_str is None:
# Server left in the meantime. The stream will update the rest.
continue

newgrf_lookup = json.loads(newgrf_lookup_str)
newgrf = {
"grfid": int(grfid),
"md5sum": md5sum,
"name": newgrf_lookup["name"],
}
await self.application.newgrf_added(newgrf_lookup["index"], newgrf)

servers = await self._redis.keys("gc-server:*")
for server in servers:
_, _, server_id = server.partition(":")

info_str = await self._redis.get(server)
if info_str is None:
# Server left in the meantime. The stream will update the rest.
continue

info = json.loads(info_str)
await self.application.update_external_server(server_id, info)

servers = await self._redis.keys("gc-server-newgrf:*")
for server in servers:
_, _, server_id = server.partition(":")

newgrf_indexed_str = await self._redis.get(server)
if newgrf_indexed_str is None:
# Server left in the meantime. The stream will update the rest.
continue

newgrf_indexed = json.loads(newgrf_indexed_str)
await self.application.update_newgrf_external_server(server_id, newgrf_indexed)

direct_ipv4s = await self._redis.keys("gc-direct-ipv4:*")
for direct_ipv4 in direct_ipv4s:
_, _, server_id = direct_ipv4.partition(":")

server_str = await self._redis.get(direct_ipv4)
if server_str is None:
# Server left in the meantime. The stream will update the rest.
continue

server = json.loads(server_str)
await self.application.update_external_direct_ip(server_id, "ipv4", server["ip"], server["port"])

Expand All @@ -117,6 +173,10 @@ async def _scan_existing_servers(self):
_, _, server_id = direct_ipv6.partition(":")

server_str = await self._redis.get(direct_ipv6)
if server_str is None:
# Server left in the meantime. The stream will update the rest.
continue

server = json.loads(server_str)
await self.application.update_external_direct_ip(server_id, "ipv6", server["ip"], server["port"])

Expand All @@ -128,13 +188,15 @@ async def _scan_existing_servers(self):
# around longer. This gives a bit of a guarantee they do not live past
# this point. The most likely scenario for this is a crashed GC, and
# not all servers reconnecting.
await asyncio.sleep(70)
await asyncio.sleep(TTL_SERVER + 10)
await self._redis.keys("gc-server:*")

async def _follow_stream(self):
lookup_table = {
"new-direct-ip": self.application.update_external_direct_ip,
"update": self.application.update_external_server,
"update-newgrf": self.application.update_newgrf_external_server,
"newgrf-added": self.application.newgrf_added,
"delete": self.application.remove_server,
"stun-result": self.application.stun_result,
"send-stun-request": self.application.send_server_stun_request,
Expand Down Expand Up @@ -171,8 +233,59 @@ async def add_to_stream(self, entry_type, payload):
"gc-stream", {"gc-id": self._gc_id, "type": entry_type, "payload": json.dumps(payload)}, maxlen=1000
)

async def newgrf_in_use(self, newgrf):
    """Refresh the TTL of a NewGRF's lookup entry so it stays alive."""
    key = f"gc-newgrf:{newgrf['grfid']}-{newgrf['md5sum']}"
    await self._redis.expire(key, TTL_NEWGRF)

async def newgrf_assign_index(self, newgrf):
    """Return the lookup-table index for *newgrf*, allocating one if needed.

    The mapping lives in redis under "gc-newgrf:<grfid>-<md5sum>" and new
    indices come from the shared "gc-newgrf-counter". Allocation races
    between GC instances are resolved with SET NX: the index returned is
    always the one that actually ended up in redis.
    """
    newgrf_lookup_str = await self._redis.get(f"gc-newgrf:{newgrf['grfid']}-{newgrf['md5sum']}")
    if newgrf_lookup_str is not None:
        newgrf_lookup = json.loads(newgrf_lookup_str)

        if newgrf_lookup["name"] is not None or newgrf["name"] is None:
            # Make sure the entry lives a bit longer.
            await self.newgrf_in_use(newgrf)
            return newgrf_lookup["index"]

        # There is an entry in the table, but it doesn't have a name. This
        # happens when old servers announced the NewGRF. But we do have a
        # name now. So update the entry to tell the name.
        newgrf_lookup["name"] = newgrf["name"]
        await self._redis.set(
            f"gc-newgrf:{newgrf['grfid']}-{newgrf['md5sum']}", json.dumps(newgrf_lookup), ex=TTL_NEWGRF
        )

        # Tell ourselves and all other GC instances about the renamed entry.
        await self.application.newgrf_added(newgrf_lookup["index"], newgrf)
        await self.add_to_stream("newgrf-added", {"index": newgrf_lookup["index"], "newgrf": newgrf})

        return newgrf_lookup["index"]

    # No entry yet: draw a fresh index from the shared counter.
    newgrf_lookup = {
        "index": await self._redis.incr("gc-newgrf-counter"),
        "name": newgrf["name"],
    }
    # nx=True makes this set a no-op if another GC created the key meanwhile.
    res = await self._redis.set(
        f"gc-newgrf:{newgrf['grfid']}-{newgrf['md5sum']}", json.dumps(newgrf_lookup), nx=True, ex=TTL_NEWGRF
    )
    if res is not None:
        # We won the race (or there was none); broadcast the new entry.
        await self.application.newgrf_added(newgrf_lookup["index"], newgrf)
        await self.add_to_stream("newgrf-added", {"index": newgrf_lookup["index"], "newgrf": newgrf})

        return newgrf_lookup["index"]

    # Another instance sneaked in between our get and set, so fetch
    # the key again. This time it is guaranteed to exist.
    newgrf_lookup_str = await self._redis.get(f"gc-newgrf:{newgrf['grfid']}-{newgrf['md5sum']}")
    newgrf_lookup = json.loads(newgrf_lookup_str)

    await self.newgrf_in_use(newgrf)
    return newgrf_lookup["index"]

async def update_newgrf(self, server_id, newgrfs_indexed):
    """Persist a server's indexed NewGRF list and announce it on the GC stream."""
    payload = json.dumps(newgrfs_indexed)
    await self._redis.set(f"gc-server-newgrf:{server_id}", payload, ex=TTL_SERVER)
    await self.add_to_stream("update-newgrf", {"server_id": server_id, "newgrfs_indexed": newgrfs_indexed})

async def update_info(self, server_id, info):
    """Persist a server's game info and announce the update on the GC stream."""
    key = f"gc-server:{server_id}"
    await self._redis.set(key, json.dumps(info), ex=TTL_SERVER)
    await self.add_to_stream("update", {"server_id": server_id, "info": info})

async def direct_ip(self, server_id, server_ip, server_port):
Expand All @@ -190,29 +303,28 @@ async def server_offline(self, server_id):
await self._redis.delete(f"gc-direct-ipv4:{server_id}")
await self._redis.delete(f"gc-direct-ipv6:{server_id}")
await self._redis.delete(f"gc-server:{server_id}")
await self._redis.delete(f"gc-server-newgrf:{server_id}")
await self.add_to_stream("delete", {"server_id": server_id})

async def stats_verify(self, connection_type_name):
    """Count a verify attempt, bucketed by connection type."""
    await self._stats("verify", connection_type_name)

async def stats_connect(self, method_name, result):
    """Count a connect attempt by method, split into success / failure."""
    if result:
        key = "connect"
    else:
        key = "connect-failed"
    await self._stats(key, method_name)

async def stats_listing(self, game_info_version):
    """Count a client listing request, bucketed by game-info version."""
    await self._stats("listing", game_info_version)

async def _stats(self, key, subkey):
    """Increment a daily statistics counter and (re)arm its TTL.

    Counters are bucketed per day under "stats-<key>:<day>-<subkey>" and
    kept for TTL_STATS seconds.
    """
    # Put all stats of a single day in one bucket.
    day_since_1970 = int(time.time()) // (3600 * 24)

    key = f"stats-{key}:{day_since_1970}-{subkey}"

    # INCR first: EXPIRE on a non-existent key is a no-op, so calling it
    # before the INCR left a freshly created bucket without a TTL (it only
    # got one if a second event landed in the same bucket).
    await self._redis.incr(key)
    await self._redis.expire(key, TTL_STATS)

async def get_stats(self, key):
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Expand Up @@ -9,7 +9,7 @@ hiredis==2.0.0
idna==3.2
multidict==5.1.0
openttd-helpers==1.0.1
openttd-protocol==0.4.1
openttd-protocol==0.5.0
pproxy==2.7.8
sentry-sdk==1.1.0
typing-extensions==3.10.0.0
Expand Down

0 comments on commit 15ff189

Please sign in to comment.