From 57a7d47afb6a1956602fd298ffc87b39cb7217df Mon Sep 17 00:00:00 2001
From: Patric Stout
Date: Mon, 4 May 2020 16:58:06 +0200
Subject: [PATCH] Add: reload the database in a separate process

This means that during reloads, the server is fully responsive with the
old code. Once the reload is done, the database is swapped around in
memory, and any changes will be available from that point on.

Some rewriting was needed, as the less that is sent over the wire during
process creation, the better. This is now kept to a minimum: basically
just the configuration.

This has one downside: for a while, two full dictionaries are kept in
memory. This increases the memory footprint of the application a bit; in
return we get a non-blocking reload.
---
 bananas_server/application/bananas_server.py | 68 ++++++++++++++------
 bananas_server/index/github.py               |  2 +-
 bananas_server/index/local.py                | 51 ++++++++-------
 bananas_server/storage/s3.py                 | 17 ++++-
 bananas_server/web_routes.py                 |  2 +-
 5 files changed, 97 insertions(+), 43 deletions(-)

diff --git a/bananas_server/application/bananas_server.py b/bananas_server/application/bananas_server.py
index 97be0c9..f027245 100644
--- a/bananas_server/application/bananas_server.py
+++ b/bananas_server/application/bananas_server.py
@@ -1,6 +1,8 @@
+import asyncio
 import logging
 
 from collections import defaultdict
+from concurrent import futures
 
 from ..helpers.safe_filename import safe_filename
 from ..openttd.protocol.enums import ContentType
@@ -16,15 +18,16 @@ def __init__(self, storage, index):
         self.index = index
         self.protocol = None
 
-        self._md5sum_mapping = defaultdict(lambda: defaultdict(dict))
+        self._by_content_id = None
+        self._by_content_type = None
+        self._by_unique_id = None
+        self._by_unique_id_and_md5sum = None
 
-        self._id_mapping = defaultdict(lambda: defaultdict(dict))
-        self._by_content_id = {}
-        self._by_content_type = defaultdict(list)
-        self._by_unique_id = defaultdict(dict)
-        self._by_unique_id_and_md5sum = defaultdict(lambda: defaultdict(dict))
+        self._reload_busy = asyncio.Event()
+        self._reload_busy.set()
 
-        self.reload()
+        loop = asyncio.get_event_loop()
+        loop.run_until_complete(self.reload())
 
     async def _send_content_entry(self, source, content_entry):
         await source.protocol.send_PACKET_CONTENT_SERVER_INFO(
@@ -104,8 +107,37 @@ async def receive_PACKET_CONTENT_CLIENT_CONTENT(self, source, content_infos):
                 stream=stream,
             )
 
-    def reload_md5sum_mapping(self):
-        self.storage.clear_cache()
+    async def reload(self):
+        await self._reload_busy.wait()
+        self._reload_busy.clear()
+
+        try:
+            reload_helper = ReloadHelper(self.storage, self.index)
+            reload_helper.prepare()
+
+            # Run the reload in a new process, so we don't block the rest of the
+            # server while doing this job.
+            loop = asyncio.get_event_loop()
+            with futures.ProcessPoolExecutor(max_workers=1) as executor:
+                task = loop.run_in_executor(executor, reload_helper.reload)
+                (
+                    self._by_content_id,
+                    self._by_content_type,
+                    self._by_unique_id,
+                    self._by_unique_id_and_md5sum,
+                ) = await task
+        finally:
+            self._reload_busy.set()
+
+
+class ReloadHelper:
+    def __init__(self, storage, index):
+        self.storage = storage
+        self.index = index
+
+    def _get_md5sum_mapping(self):
+        log.info("Building md5sum mapping")
+        md5sum_mapping = defaultdict(lambda: defaultdict(dict))
 
         for content_type in ContentType:
             if content_type == ContentType.CONTENT_TYPE_END:
@@ -120,14 +152,14 @@ def reload_md5sum_mapping(self):
                 md5sum_partial = bytes.fromhex(md5sum[0:8])
                 md5sum = bytes.fromhex(md5sum)
 
-                self._md5sum_mapping[content_type][unique_id][md5sum_partial] = md5sum
+                md5sum_mapping[content_type][unique_id][md5sum_partial] = md5sum
+
+        # defaultdict() cannot be pickled, so convert to a normal dict.
+        return {key: dict(value) for key, value in md5sum_mapping.items()}
+
+    def prepare(self):
+        self.storage.clear_cache()
 
     def reload(self):
-        self.index.reload(self)
-
-    def clear(self):
-        self._by_content_id.clear()
-        self._by_content_type.clear()
-        self._by_unique_id.clear()
-        self._by_unique_id_and_md5sum.clear()
-        self._md5sum_mapping.clear()
+        md5sum_mapping = self._get_md5sum_mapping()
+        return self.index.reload(md5sum_mapping)
diff --git a/bananas_server/index/github.py b/bananas_server/index/github.py
index 1b309df..73177d3 100644
--- a/bananas_server/index/github.py
+++ b/bananas_server/index/github.py
@@ -82,7 +82,7 @@ def _fetch_latest(self):
 
     def reload(self, application):
         self._fetch_latest()
-        super().reload(application)
+        return super().reload(application)
 
 
 @click_additional_options
diff --git a/bananas_server/index/local.py b/bananas_server/index/local.py
index 388b20e..7ff4288 100644
--- a/bananas_server/index/local.py
+++ b/bananas_server/index/local.py
@@ -52,12 +52,12 @@ def __init__(
         self.max_version = max_version
         self.tags = tags
 
-    def calculate_dependencies(self, application):
+    def calculate_dependencies(self, by_unique_id_and_md5sum):
         dependencies = []
         for dependency in self.raw_dependencies:
             (content_type, unique_id, md5sum) = dependency
 
-            dep_content_entry = application.get_by_unique_id_and_md5sum(content_type, unique_id, md5sum)
+            dep_content_entry = by_unique_id_and_md5sum[content_type].get(unique_id, {}).get(md5sum)
             if dep_content_entry is None:
                 log.error("Invalid dependency: %r", dependency)
                 continue
@@ -88,7 +88,6 @@ def __repr__(self):
 class Index:
     def __init__(self):
         self._folder = _folder
-        self._content_ids = defaultdict(list)
 
     def _read_content_entry_version(self, content_type, unique_id, data, md5sum_mapping):
         unique_id = bytes.fromhex(unique_id)
@@ -179,8 +178,7 @@ def _read_content_entry_version(self, content_type, unique_id, data, md5sum_mapp
         # We take 24bits from the right side of the md5sum; the left side
         # is already given to the user as an md5sum-partial, and not a
         # secret. We only take 24bits to allow room for a counter.
-        content_id = int.from_bytes(md5sum[-3:], "little")
-        self._content_ids[content_id].append(content_entry)
+        content_entry.pre_content_id = int.from_bytes(md5sum[-3:], "little")
 
         return content_entry
 
@@ -220,24 +218,14 @@ def _read_content_entry(self, content_type, folder_name, unique_id, md5sum_mappi
 
         return content_entries, archived_content_entries
 
-    def reload(self, application):
-        application.clear()
-        application.reload_md5sum_mapping()
+    def reload(self, md5sum_mapping):
+        by_content_id = {}
+        by_content_type = defaultdict(list)
+        by_unique_id = defaultdict(dict)
+        by_unique_id_and_md5sum = defaultdict(lambda: defaultdict(dict))
 
-        self._content_ids = defaultdict(list)
+        content_ids = defaultdict(list)
 
-        self.load_all(
-            application._by_content_id,
-            application._by_content_type,
-            application._by_unique_id,
-            application._by_unique_id_and_md5sum,
-            application._md5sum_mapping,
-        )
-
-        for content_entry in application._by_content_id.values():
-            content_entry.calculate_dependencies(application)
-
-    def load_all(self, by_content_id, by_content_type, by_unique_id, by_unique_id_and_md5sum, md5sum_mapping):
         for content_type in ContentType:
             if content_type == ContentType.CONTENT_TYPE_END:
                 continue
@@ -255,10 +243,14 @@ def load_all(self, by_content_id, by_content_type, by_unique_id, by_unique_id_an
                 content_entries, archived_content_entries = self._read_content_entry(
                     content_type, folder_name, unique_id, md5sum_mapping
                 )
+
                 for content_entry in content_entries:
                     counter_entries += 1
                     by_unique_id_and_md5sum[content_type][content_entry.unique_id][content_entry.md5sum] = content_entry
 
+                    content_ids[content_entry.pre_content_id].append(content_entry)
+                    del content_entry.pre_content_id
+
                     by_content_type[content_type].append(content_entry)
                     by_unique_id[content_type][content_entry.unique_id] = content_entry
 
@@ -266,6 +258,9 @@ def load_all(self, by_content_id, by_content_type, by_unique_id, by_unique_id_an
                     counter_archived += 1
                     by_unique_id_and_md5sum[content_type][content_entry.unique_id][content_entry.md5sum] = content_entry
 
+                    content_ids[content_entry.pre_content_id].append(content_entry)
+                    del content_entry.pre_content_id
+
             log.info(
                 "Loaded %d entries and %d archived for %s", counter_entries, counter_archived, content_type_folder_name
             )
@@ -275,7 +270,7 @@ def load_all(self, by_content_id, by_content_type, by_unique_id, by_unique_id_an
         # time we have seen this part of the md5sum, sorted by upload-date.
         # This means that content_ids are stable over multiple runs, and means
         # we can scale this server horizontally.
-        for content_id, content_entries in self._content_ids.items():
+        for content_id, content_entries in content_ids.items():
             if len(content_entries) > 255:
                 raise Exception(
                     "We have more than 255 hash collisions;"
@@ -286,6 +281,18 @@ def load_all(self, by_content_id, by_content_type, by_unique_id, by_unique_id_an
                 content_entry.content_id = (i << 24) + content_id
                 by_content_id[content_entry.content_id] = content_entry
 
+        # Now everything is known, calculate the dependencies.
+        for content_entry in by_content_id.values():
+            content_entry.calculate_dependencies(by_unique_id_and_md5sum)
+
+        # defaultdict() cannot be pickled, so convert to a normal dict.
+        return (
+            by_content_id,
+            dict(by_content_type),
+            dict(by_unique_id),
+            {key: dict(value) for key, value in by_unique_id_and_md5sum.items()},
+        )
+
 
 @click_additional_options
 @click.option(
diff --git a/bananas_server/storage/s3.py b/bananas_server/storage/s3.py
index daa75f7..2094772 100644
--- a/bananas_server/storage/s3.py
+++ b/bananas_server/storage/s3.py
@@ -26,9 +26,19 @@ def __init__(self):
         if _bucket_name is None:
             raise Exception("--storage-s3-bucket has to be given if storage is s3")
 
-        self._s3 = boto3.client("s3")
+        self._s3_cache = None
         self._folder_cache = None
 
+    @property
+    def _s3(self):
+        # This class will be pickled to be used by ProcessPoolExecutor(). To
+        # avoid having to transmit the unpicklable S3 client over the wire,
+        # create it only after the new process has been created.
+        if not self._s3_cache:
+            self._s3_cache = boto3.client("s3")
+
+        return self._s3_cache
+
     def _get_filename(self, content_entry):
         content_type_folder_name = get_folder_name_from_content_type(content_entry.content_type)
         unique_id = content_entry.unique_id.hex()
@@ -67,6 +77,11 @@ def _get_folder_list(self, folder_search):
             yield folder
 
     def clear_cache(self):
+        # Reset the s3 instance, as it is not picklable. We are called just
+        # before a new process is created; on next use, a new client is
+        # created. Although this costs a few extra cycles, it happens so
+        # rarely that the overhead is negligible.
+        self._s3_cache = None
         self._folder_cache = None
 
     def list_folder(self, content_type, unique_id=None):
diff --git a/bananas_server/web_routes.py b/bananas_server/web_routes.py
index 49e6cb0..0a08b63 100644
--- a/bananas_server/web_routes.py
+++ b/bananas_server/web_routes.py
@@ -63,7 +63,7 @@ async def reload(request):
     if data["secret"] != RELOAD_SECRET:
         return web.HTTPNotFound()
 
-    BANANAS_SERVER_APPLICATION.reload()
+    await BANANAS_SERVER_APPLICATION.reload()
 
     return web.HTTPNoContent()
 
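Note (not part of the patch): below is a minimal, self-contained sketch of the reload pattern this change introduces, in case it helps review to see it in isolation. The heavy rebuild runs in a single-worker ProcessPoolExecutor while the old data keeps serving requests, an asyncio.Event prevents overlapping reloads, and the finished result is swapped in with a single assignment. The names ExampleServer and build_index are illustrative only and do not appear in the patch.

import asyncio
from concurrent import futures


def build_index():
    # Runs in the child process; return only picklable objects
    # (plain dicts, not defaultdicts or live client objects).
    return {"by_content_id": {}, "by_unique_id": {}}


class ExampleServer:
    def __init__(self):
        self._index = None
        self._reload_busy = asyncio.Event()
        self._reload_busy.set()

    async def reload(self):
        # Wait for any reload already in flight, then claim the slot.
        await self._reload_busy.wait()
        self._reload_busy.clear()

        try:
            loop = asyncio.get_running_loop()
            with futures.ProcessPoolExecutor(max_workers=1) as executor:
                # The old self._index keeps serving requests while this runs.
                new_index = await loop.run_in_executor(executor, build_index)

            # Swap in one assignment: readers see either the old or the new
            # index, never a half-built one.
            self._index = new_index
        finally:
            self._reload_busy.set()


async def main():
    server = ExampleServer()
    await server.reload()
    print(server._index)


if __name__ == "__main__":
    asyncio.run(main())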