Implement APIDumper
zcqian committed Jan 4, 2022
1 parent 8ff3bba commit ca993b6
176 changes: 175 additions & 1 deletion biothings/hub/dataload/dumper.py
@@ -1,7 +1,10 @@
import asyncio
import cgi
import concurrent.futures
import email.utils
import inspect
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
import os
import os.path
import stat
@@ -11,7 +14,9 @@
import time
from datetime import datetime, timezone
from functools import partial
from typing import Optional, Union, List
from typing import Optional, Union, List, Generator, Tuple, Any, Dict, Callable

import orjson

from biothings import config as btconfig
from biothings.hub import DUMPER_CATEGORY, UPLOADER_CATEGORY
@@ -1497,3 +1502,172 @@ def dump_info(self):
        for name, klasses in self.register.items():
            res[name] = [klass.__name__ for klass in klasses]
        return res


class APIDumper(BaseDumper):
    """
    Dump data from APIs

    Subclasses implement `get_document`, which is run in a separate
    (spawned) process; the documents it yields are buffered and written
    to disk as NDJSON files.
    """
    _CHECK_JOIN_TIMEOUT = 20
    _TARGET_BUFFER_SIZE = 2 << 13  # 16 KiB

    def create_todump_list(self, force=False, **kwargs):
        """
        This gets called by method `dump`, to populate self.to_dump
        """
        self.to_dump = [
            {'remote': 'remote', 'local': 'local'}
        ]
        self.release = self.get_release()

    def remote_is_better(self, remotefile, localfile):
        """
        Always consider the remote data newer; override this if there is
        a simple way to check whether the remote really is better
        """
        return True

    def download(self, remotefile, localfile):
        """
        This method is called in the coroutine
        """
        if not (remotefile == 'remote' and localfile == 'local'):
            raise RuntimeError("This method is not supposed to be "
                               "called outside dump/do_dump")
        wd = os.path.abspath(os.path.realpath(self.new_data_folder))
        os.makedirs(wd, exist_ok=True)
        self.to_dump = []
        mp_context = multiprocessing.get_context('spawn')
        executor = ProcessPoolExecutor(
            max_workers=1,
            mp_context=mp_context,
        )

        f = executor.submit(
            _run_api_and_store_to_disk,
            fn=self.get_document,
            buffer_size=self._TARGET_BUFFER_SIZE,
            working_directory=wd,
        )
        self.logger.debug("run_api submitted to executor...")
        ex = None
        while True:
            try:
                _ = f.result(timeout=self._CHECK_JOIN_TIMEOUT)
                self.logger.info("run_api exited successfully")
                break
            except concurrent.futures.TimeoutError:
                self.logger.debug("run_api is still running...")
            except concurrent.futures.CancelledError:
                self.logger.error("run_api has been unexpectedly cancelled...")
            except Exception as e:
                self.logger.warning("run_api exited with exception: %s", e)
                ex = e
                break
        # we could have scheduled the shutdown right after submitting the
        # task, but that approach has some bugs
        executor.shutdown(wait=True)
        self.logger.info("executor shutdown successfully")
        if ex is not None:
            raise ex

    @staticmethod
    def get_document() -> Generator[Tuple[str, Any], None, None]:
        """
        Get documents from the API source

        Override this method to yield documents to be stored on disk.
        Every time you want to save something to disk, do this:

        >>> yield 'name_of_file.ndjson', {'stuff': 'you want saved'}

        While the type annotation says Any is accepted, the object has to
        be JSON serializable, so basically Python dictionaries/lists with
        strings and numbers as the most basic elements.

        Later on in your uploader, you can treat the files as NDJSON
        documents, i.e. one JSON document per line.

        It is recommended that you only do the minimal necessary
        processing in this step.

        A default HTTP client is not provided, so you have the flexibility
        of choosing your favorite tool.

        This MUST be a static method, or it cannot be properly serialized
        to run in a separate process.

        This method is expected to be blocking (synchronous). However, be
        sure to properly SET TIMEOUTS. You open the resources here in this
        function, so you have to check and close them properly. If the
        invoker forcefully stopped this method, it would leave a mess
        behind, therefore we do not do that.

        You can set a 5 second timeout using the popular requests package
        by doing something like this:

        >>> import requests
        >>> r = requests.get('https://example.org', timeout=5.0)

        You can catch the exception or set up retries. If you cannot
        handle the situation, just raise an exception or let it propagate.
        APIDumper will handle it properly: documents are only saved when
        the entire method completes successfully.
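
        One possible way to set up retries with requests (a sketch only;
        it assumes urllib3's Retry helper, adapt to your client of choice):

        >>> import requests
        >>> from requests.adapters import HTTPAdapter
        >>> from urllib3.util.retry import Retry
        >>> session = requests.Session()
        >>> session.mount('https://', HTTPAdapter(
        ...     max_retries=Retry(total=3, backoff_factor=1.0)))
        >>> r = session.get('https://example.org', timeout=5.0)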
"""
raise NotImplementedError

def get_release(self) -> str:
raise NotImplementedError

@property
def client(self):
return None

    def prepare_client(self):
        # Having the client in the main process is not a good idea anyway.
        # For some MongoDB client related things it does make sense, but
        # just closing the connection is not enough to free it from memory
        # and eliminate its threads. The best way is to spawn a new process
        # and run the client there, or do some kind of IPC and have the
        # client in one process only.
        raise RuntimeError("prepare_client method of APIDumper and its "
                           "descendants must not be called")

    def release_client(self):
        # dump will always call this method, so we have to allow it
        if inspect.stack()[1].function == 'dump':
            return
        raise RuntimeError("release_client method of APIDumper and its "
                           "descendants must not be called")

    def need_prepare(self):
        raise RuntimeError("need_prepare method of APIDumper and its "
                           "descendants must not be called")


def _run_api_and_store_to_disk(
        fn: Callable[[], Generator[Tuple[str, Any], None, None]],
        buffer_size: int,
        working_directory: str,
):
    pid = os.getpid()
    ppid = os.getppid()  # TODO: check parent process
    os.chdir(working_directory)
    buffer: Dict[str, bytearray] = {}
    try:
        for filename, obj in fn():
            fn_byte_arr = buffer.setdefault(filename, bytearray())
            fn_byte_arr.extend(orjson.dumps(obj) + b'\n')
            # flush a per-file buffer to its temporary file once it fills up
            if len(fn_byte_arr) >= buffer_size:
                with open(f'{filename}.{pid}', 'ab') as f:
                    f.write(fn_byte_arr)
                fn_byte_arr.clear()
    except Exception as e:
        # cleanup: remove any partially written temporary files
        for filename in buffer.keys():
            if os.path.exists(f'{filename}.{pid}'):
                os.unlink(f'{filename}.{pid}')
        buffer.clear()  # so that the remaining code does nothing
        raise e
    # flush whatever is left in the buffers
    for filename, fn_byte_arr in buffer.items():
        with open(f'{filename}.{pid}', 'ab') as f:
            f.write(fn_byte_arr)
    # documents only appear under their final filenames after the entire
    # generator completes successfully
    for filename in buffer.keys():
        os.rename(src=f'{filename}.{pid}', dst=filename)
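
# For illustration only, a minimal concrete subclass might look like the
# sketch below. MyAPIDumper, SRC_NAME, SRC_ROOT_FOLDER, the URL, and
# 'docs.ndjson' are hypothetical names, and DATA_ARCHIVE_ROOT is assumed
# to exist in the hub config:
#
#     class MyAPIDumper(APIDumper):
#         SRC_NAME = "my_api_source"
#         SRC_ROOT_FOLDER = os.path.join(btconfig.DATA_ARCHIVE_ROOT, SRC_NAME)
#
#         @staticmethod
#         def get_document():
#             import requests  # imported here: runs in the spawned process
#             r = requests.get('https://example.org/api/docs', timeout=5.0)
#             r.raise_for_status()
#             for doc in r.json():
#                 # each yield appends one JSON line to docs.ndjson
#                 yield 'docs.ndjson', doc
#
#         def get_release(self):
#             return datetime.now(timezone.utc).strftime('%Y-%m-%d')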
