Add: reload the database in a separate process
This means that during a reload, the server stays fully
responsive, serving the old data. Once the reload is done, the
database is swapped in memory, and any changes are available
from that point on.

Some rewriting was needed, as the less that is sent over the
wire during process creation, the better. This is now kept to a
minimum: little more than the configuration. One downside is
that, for a while, two full dictionaries are in memory at the
same time. This increases the memory footprint of the
application a bit; in return we get a non-blocking reload.
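
The pattern described above, as a minimal self-contained sketch (an
illustration, not the commit's actual code; rebuild_database and the
config dict are placeholders): the blocking rebuild runs in a one-worker
ProcessPoolExecutor, only the configuration crosses the process boundary,
and the new database is swapped in once the worker returns.

    import asyncio
    import time
    from concurrent import futures


    def rebuild_database(config):
        # Runs in the worker process; only `config` is pickled and sent over.
        time.sleep(2)  # stand-in for scanning storage and rebuilding the index
        return {"entries": ["loaded", "from", config["folder"]]}


    class Server:
        def __init__(self, config):
            self.config = config
            self.database = {"entries": []}  # the old data keeps serving requests

        async def reload(self):
            loop = asyncio.get_running_loop()
            with futures.ProcessPoolExecutor(max_workers=1) as executor:
                # The event loop stays free while the worker does the heavy lifting.
                new_database = await loop.run_in_executor(
                    executor, rebuild_database, self.config
                )
            self.database = new_database  # swap once the reload is done


    if __name__ == "__main__":
        server = Server({"folder": "local_storage"})
        asyncio.run(server.reload())
        print(server.database)
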
TrueBrain committed May 4, 2020
1 parent ba04b7f commit 57a7d47
Showing 5 changed files with 97 additions and 43 deletions.
68 changes: 50 additions & 18 deletions bananas_server/application/bananas_server.py
@@ -1,6 +1,8 @@
import asyncio
import logging

from collections import defaultdict
from concurrent import futures

from ..helpers.safe_filename import safe_filename
from ..openttd.protocol.enums import ContentType
@@ -16,15 +18,16 @@ def __init__(self, storage, index):
self.index = index
self.protocol = None

self._md5sum_mapping = defaultdict(lambda: defaultdict(dict))
self._by_content_id = None
self._by_content_type = None
self._by_unique_id = None
self._by_unique_id_and_md5sum = None

self._id_mapping = defaultdict(lambda: defaultdict(dict))
self._by_content_id = {}
self._by_content_type = defaultdict(list)
self._by_unique_id = defaultdict(dict)
self._by_unique_id_and_md5sum = defaultdict(lambda: defaultdict(dict))
self._reload_busy = asyncio.Event()
self._reload_busy.set()

self.reload()
loop = asyncio.get_event_loop()
loop.run_until_complete(self.reload())

async def _send_content_entry(self, source, content_entry):
await source.protocol.send_PACKET_CONTENT_SERVER_INFO(
@@ -104,8 +107,37 @@ async def receive_PACKET_CONTENT_CLIENT_CONTENT(self, source, content_infos):
stream=stream,
)

def reload_md5sum_mapping(self):
self.storage.clear_cache()
async def reload(self):
await self._reload_busy.wait()
self._reload_busy.clear()

try:
reload_helper = ReloadHelper(self.storage, self.index)
reload_helper.prepare()

# Run the reload in a new process, so we don't block the rest of the
# server while doing this job.
loop = asyncio.get_event_loop()
with futures.ProcessPoolExecutor(max_workers=1) as executor:
task = loop.run_in_executor(executor, reload_helper.reload)
(
self._by_content_id,
self._by_content_type,
self._by_unique_id,
self._by_unique_id_and_md5sum,
) = await task
finally:
self._reload_busy.set()


class ReloadHelper:
def __init__(self, storage, index):
self.storage = storage
self.index = index

def _get_md5sum_mapping(self):
log.info("Building md5sum mapping")
md5sum_mapping = defaultdict(lambda: defaultdict(dict))

for content_type in ContentType:
if content_type == ContentType.CONTENT_TYPE_END:
@@ -120,14 +152,14 @@ def reload_md5sum_mapping(self):
md5sum_partial = bytes.fromhex(md5sum[0:8])
md5sum = bytes.fromhex(md5sum)

self._md5sum_mapping[content_type][unique_id][md5sum_partial] = md5sum
md5sum_mapping[content_type][unique_id][md5sum_partial] = md5sum

# defaultdict() cannot be pickled, so convert to a normal dict.
return {key: dict(value) for key, value in md5sum_mapping.items()}

def prepare(self):
self.storage.clear_cache()

def reload(self):
self.index.reload(self)

def clear(self):
self._by_content_id.clear()
self._by_content_type.clear()
self._by_unique_id.clear()
self._by_unique_id_and_md5sum.clear()
self._md5sum_mapping.clear()
md5sum_mapping = self._get_md5sum_mapping()
return self.index.reload(md5sum_mapping)
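
The dict conversions at the end of _get_md5sum_mapping() (and of
Index.reload() in local.py) are needed because the worker's return value
is sent back via pickle, and a defaultdict whose default_factory is a
lambda cannot be pickled. A small stand-alone demonstration of that
constraint (illustration only, with made-up keys):

    import pickle
    from collections import defaultdict

    mapping = defaultdict(lambda: defaultdict(dict))
    mapping[b"newgrf"][b"unique-id"][b"partial"] = b"full-md5sum"

    try:
        pickle.dumps(mapping)
    except Exception as exc:
        # The lambda used as default_factory cannot be pickled.
        print("cannot pickle defaultdict:", exc)

    # Converting to plain dicts, as the reload code does, makes the result
    # safe to send back from the worker process.
    plain = {key: dict(value) for key, value in mapping.items()}
    print(pickle.loads(pickle.dumps(plain)))
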
2 changes: 1 addition & 1 deletion bananas_server/index/github.py
@@ -82,7 +82,7 @@ def _fetch_latest(self):

def reload(self, application):
self._fetch_latest()
super().reload(application)
return super().reload(application)


@click_additional_options
51 changes: 29 additions & 22 deletions bananas_server/index/local.py
@@ -52,12 +52,12 @@ def __init__(
self.max_version = max_version
self.tags = tags

def calculate_dependencies(self, application):
def calculate_dependencies(self, by_unique_id_and_md5sum):
dependencies = []

for dependency in self.raw_dependencies:
(content_type, unique_id, md5sum) = dependency
dep_content_entry = application.get_by_unique_id_and_md5sum(content_type, unique_id, md5sum)
dep_content_entry = by_unique_id_and_md5sum[content_type].get(unique_id, {}).get(md5sum)
if dep_content_entry is None:
log.error("Invalid dependency: %r", dependency)
continue
@@ -88,7 +88,6 @@ def __repr__(self):
class Index:
def __init__(self):
self._folder = _folder
self._content_ids = defaultdict(list)

def _read_content_entry_version(self, content_type, unique_id, data, md5sum_mapping):
unique_id = bytes.fromhex(unique_id)
@@ -179,8 +178,7 @@ def _read_content_entry_version(self, content_type, unique_id, data, md5sum_mapping):
# We take 24bits from the right side of the md5sum; the left side
# is already given to the user as an md5sum-partial, and not a
# secret. We only take 24bits to allow room for a counter.
content_id = int.from_bytes(md5sum[-3:], "little")
self._content_ids[content_id].append(content_entry)
content_entry.pre_content_id = int.from_bytes(md5sum[-3:], "little")

return content_entry

@@ -220,24 +218,14 @@ def _read_content_entry(self, content_type, folder_name, unique_id, md5sum_mapping):

return content_entries, archived_content_entries

def reload(self, application):
application.clear()
application.reload_md5sum_mapping()
def reload(self, md5sum_mapping):
by_content_id = {}
by_content_type = defaultdict(list)
by_unique_id = defaultdict(dict)
by_unique_id_and_md5sum = defaultdict(lambda: defaultdict(dict))

self._content_ids = defaultdict(list)
content_ids = defaultdict(list)

self.load_all(
application._by_content_id,
application._by_content_type,
application._by_unique_id,
application._by_unique_id_and_md5sum,
application._md5sum_mapping,
)

for content_entry in application._by_content_id.values():
content_entry.calculate_dependencies(application)

def load_all(self, by_content_id, by_content_type, by_unique_id, by_unique_id_and_md5sum, md5sum_mapping):
for content_type in ContentType:
if content_type == ContentType.CONTENT_TYPE_END:
continue
@@ -255,17 +243,24 @@ def load_all(self, by_content_id, by_content_type, by_unique_id, by_unique_id_and_md5sum, md5sum_mapping):
content_entries, archived_content_entries = self._read_content_entry(
content_type, folder_name, unique_id, md5sum_mapping
)

for content_entry in content_entries:
counter_entries += 1
by_unique_id_and_md5sum[content_type][content_entry.unique_id][content_entry.md5sum] = content_entry

content_ids[content_entry.pre_content_id].append(content_entry)
del content_entry.pre_content_id

by_content_type[content_type].append(content_entry)
by_unique_id[content_type][content_entry.unique_id] = content_entry

for content_entry in archived_content_entries:
counter_archived += 1
by_unique_id_and_md5sum[content_type][content_entry.unique_id][content_entry.md5sum] = content_entry

content_ids[content_entry.pre_content_id].append(content_entry)
del content_entry.pre_content_id

log.info(
"Loaded %d entries and %d archived for %s", counter_entries, counter_archived, content_type_folder_name
)
@@ -275,7 +270,7 @@ def load_all(self, by_content_id, by_content_type, by_unique_id, by_unique_id_and_md5sum, md5sum_mapping):
# time we have seen this part of the md5sum, sorted by upload-date.
# This means that content_ids are stable over multiple runs, and means
# we can scale this server horizontally.
for content_id, content_entries in self._content_ids.items():
for content_id, content_entries in content_ids.items():
if len(content_entries) > 255:
raise Exception(
"We have more than 255 hash collisions;"
@@ -286,6 +281,18 @@ def load_all(self, by_content_id, by_content_type, by_unique_id, by_unique_id_and_md5sum, md5sum_mapping):
content_entry.content_id = (i << 24) + content_id
by_content_id[content_entry.content_id] = content_entry

# Now everything is known, calculate the dependencies.
for content_entry in by_content_id.values():
content_entry.calculate_dependencies(by_unique_id_and_md5sum)

# defaultdict() cannot be pickled, so convert to a normal dict.
return (
by_content_id,
dict(by_content_type),
dict(by_unique_id),
{key: dict(value) for key, value in by_unique_id_and_md5sum.items()},
)


@click_additional_options
@click.option(
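
The content-id scheme in load_all() above, written out as a stand-alone
sketch (the entry objects and the upload_date sort key are placeholders
for this illustration): the low 24 bits come from the right-hand tail of
the md5sum, and entries that collide on those bits get a counter in the
top 8 bits, assigned in a stable order so that ids stay the same across
runs and across horizontally scaled servers.

    import types
    from collections import defaultdict


    def assign_content_ids(entries):
        # `entries` holds objects with a 16-byte `md5sum` and a stable sort
        # key (here called `upload_date`); both are placeholders for this sketch.
        buckets = defaultdict(list)
        for entry in entries:
            # The low 24 bits come from the right side of the md5sum; the left
            # side is already exposed to users as the md5sum-partial.
            buckets[int.from_bytes(entry.md5sum[-3:], "little")].append(entry)

        by_content_id = {}
        for base_id, colliding in buckets.items():
            if len(colliding) > 255:
                raise Exception("more than 255 collisions; the 8-bit counter overflows")
            for i, entry in enumerate(sorted(colliding, key=lambda e: e.upload_date)):
                entry.content_id = (i << 24) + base_id
                by_content_id[entry.content_id] = entry
        return by_content_id


    # Two entries with the same md5sum tail collide and get counters 0 and 1.
    a = types.SimpleNamespace(md5sum=bytes(range(16)), upload_date=1)
    b = types.SimpleNamespace(md5sum=bytes(range(16)), upload_date=2)
    print({hex(cid): e.upload_date for cid, e in assign_content_ids([a, b]).items()})
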
17 changes: 16 additions & 1 deletion bananas_server/storage/s3.py
@@ -26,9 +26,19 @@ def __init__(self):
if _bucket_name is None:
raise Exception("--storage-s3-bucket has to be given if storage is s3")

self._s3 = boto3.client("s3")
self._s3_cache = None
self._folder_cache = None

@property
def _s3(self):
# This class will be pickled to be used by ProcessPoolExecutor(). To
# prevent the unpicklable S3 client having to be transmitted over the
# wire, create it only after the process is created.
if not self._s3_cache:
self._s3_cache = boto3.client("s3")

return self._s3_cache

def _get_filename(self, content_entry):
content_type_folder_name = get_folder_name_from_content_type(content_entry.content_type)
unique_id = content_entry.unique_id.hex()
@@ -67,6 +77,11 @@ def _get_folder_list(self, folder_search):
yield folder

def clear_cache(self):
# Reset the s3 instance, as it is not pickable. We are called just
# before a new process is created. On next use, a new object is
# created. Although this takes a few more cycles, the amount of times
# this happens makes it not worth mentioning.
self._s3_cache = None
self._folder_cache = None

def list_folder(self, content_type, unique_id=None):
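
The lazy _s3 property and the extended clear_cache() above follow a
general rule: an object that is pickled into the worker process should
carry only plain data and recreate unpicklable resources on first use.
A self-contained illustration of the same idea, with threading.Lock
standing in for the boto3 client (class and attribute names are made up
for this sketch):

    import pickle
    import threading


    class LazyResourceHolder:
        def __init__(self, name):
            self.name = name        # plain data: crosses the process boundary fine
            self._resource = None   # created on demand, never sent over the wire

        @property
        def resource(self):
            if self._resource is None:
                # Stand-in for boto3.client("s3"), which is also not picklable.
                self._resource = threading.Lock()
            return self._resource

        def clear_cache(self):
            # Called just before the object is pickled for the worker process;
            # the resource is rebuilt on next use.
            self._resource = None


    holder = LazyResourceHolder("storage")
    _ = holder.resource               # resource now exists and would break pickling
    holder.clear_cache()              # drop it ...
    print(len(pickle.dumps(holder)))  # ... and pickling works again
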
2 changes: 1 addition & 1 deletion bananas_server/web_routes.py
@@ -63,7 +63,7 @@ async def reload(request):
if data["secret"] != RELOAD_SECRET:
return web.HTTPNotFound()

BANANAS_SERVER_APPLICATION.reload()
await BANANAS_SERVER_APPLICATION.reload()

return web.HTTPNoContent()

