updated documentation for APIDumper
and other minor tweaks
zcqian committed Jan 4, 2022
1 parent ca993b6 commit b1d565e
Showing 1 changed file with 78 additions and 7 deletions.
85 changes: 78 additions & 7 deletions biothings/hub/dataload/dumper.py
@@ -14,7 +14,7 @@
import time
from datetime import datetime, timezone
from functools import partial
from typing import Optional, Union, List, Generator, Tuple, Any, Dict, Callable
from typing import Optional, Union, List, Generator, Tuple, Any, Dict, Callable, Iterable

import orjson

@@ -1508,7 +1508,17 @@ class APIDumper(BaseDumper):
"""
Dump data from APIs
This will run API calls in a clean process and write the results to
one or more NDJSON documents.
Populate the static methods get_document and get_release in your
subclass, along with other necessary bits common to all dumpers.
For details on specific parts, read the docstrings of the
individual methods.
An example subclass implementation can be found in the unii data
source for MyGene.info.
"""
_CHECK_JOIN_TIMEOUT = 20
_TARGET_BUFFER_SIZE = 2 << 13 # 16KiB
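For illustration, a minimal sketch of what a subclass might look like, based only on the docstring above (the class name, source name, and data values are hypothetical, not the actual unii dumper):

    from datetime import datetime, timezone

    class ExampleAPIDumper(APIDumper):
        SRC_NAME = "example_api"

        @staticmethod
        def get_release() -> str:
            # runs in the main process/thread, so it must return quickly
            return datetime.now(timezone.utc).strftime("%Y-%m-%d")

        @staticmethod
        def get_document():
            # yield (output_filename, JSON-serializable doc) tuples;
            # the worker process buffers them and writes NDJSON files
            for i in range(10):
                yield "docs.ndjson", {"_id": str(i), "squared": i * i}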
@@ -1520,6 +1530,8 @@ def create_todump_list(self, force=False, **kwargs):
self.to_dump = [
{'remote': 'remote', 'local': 'local'}
]
# TODO: we can have get_release in another process as well
# but I don't think it is worth it.
self.release = self.get_release()

def remote_is_better(self, remotefile, localfile):
@@ -1530,7 +1542,24 @@ def remote_is_better(self, remotefile, localfile):

def download(self, remotefile, localfile):
"""
This method is called in the coroutine
Runs a helper function in a new process to download the data.
This is run in a new process by the do_dump coroutine of the
parent class, and it in turn spawns another process that actually
does all the work. This method mostly sets up the environment,
configures the process pool executor to correctly use spawn, uses
concurrent.futures to run the task in the new process, and
periodically checks the status of the task.
Explanation: because this is actually running inside a process
forked from a threaded process, the internal state is more or
less corrupt/broken; see `man 2 fork` for details. There is more
discussion in Slack, from some time in 2021, on why it has to be
forked and why it is broken.
Caveats: the existing job manager will not know how much memory
the actual worker process is using.
"""
if not (remotefile == 'remote' and localfile == 'local'):
raise RuntimeError("This method is not supposed to be"
@@ -1550,6 +1579,8 @@ def download(self, remotefile, localfile):
buffer_size=self._TARGET_BUFFER_SIZE,
working_directory=wd,
)
# we can schedule shutdown of the executor right now
# but it has bugs, see below
self.logger.debug("run_api submitted to executor...")
ex = None
while True:
@@ -1566,7 +1597,7 @@
ex = e
break
# we could have scheduled the shutdown right after submitting the task
# but it has some bugs
# but it has some bugs, see https://bugs.python.org/issue39104
executor.shutdown(wait=True)
self.logger.info("executor shutdown successfully")
if ex is not None:
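The pattern the docstring above describes, sketched in isolation (function names here are placeholders; the real implementation submits _run_api_and_store_to_disk, adds logging, and handles errors more carefully):

    import concurrent.futures
    import multiprocessing
    import time

    def _do_work():
        # stand-in for the real task (_run_api_and_store_to_disk)
        time.sleep(1)

    def run_in_clean_process():
        # use spawn so the child does not inherit the broken state of a
        # process that was forked from a threaded parent
        ctx = multiprocessing.get_context("spawn")
        executor = concurrent.futures.ProcessPoolExecutor(
            max_workers=1, mp_context=ctx)
        future = executor.submit(_do_work)
        while True:
            try:
                # wake up periodically to check on the task
                future.result(timeout=20)
                break
            except concurrent.futures.TimeoutError:
                continue
        # shut down only after the task is done, see bpo-39104
        executor.shutdown(wait=True)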
@@ -1613,11 +1644,22 @@ def get_document() -> Generator[Tuple[str, Any], None, None]:
"""
raise NotImplementedError

def get_release(self) -> str:
@staticmethod
def get_release() -> str:
"""
Get the string for the release information.
This is run in the main process and thread, so it must return quickly.
This must be populated in your subclass.
Returns:
string representing the release.
"""
raise NotImplementedError

@property
def client(self):
# overrides the parent class
return None

def prepare_client(self):
@@ -1643,12 +1685,41 @@ def need_prepare(self):


def _run_api_and_store_to_disk(
fn: Callable[[], Generator[Tuple[str, Any], None, None]],
fn: Callable[[], Iterable[Tuple[str, Any]]],
buffer_size: int,
working_directory: str,
):
) -> None:
"""
Runs an API callable and stores the results as NDJSON
This is a helper function used by APIDumper and is supposed to be
run in a separate process.
It is defined in the module so that it can be serialized. The
arguments must also be serializable.
Args:
fn: Callable (function or static method) that takes no
positional or keyword arguments. Must return an Iterable
whose individual items are tuples. Each tuple must contain
two elements: the first is a string naming the output file,
and the second is the object to be saved. The object must
be JSON serializable.
buffer_size: target buffer size per file, in bytes. The buffer is
always allowed to overrun, but it is written to disk
immediately once it does. This is so that very large dumps do
not cause out-of-memory errors; it is not meant to optimize
disk IO performance.
working_directory: absolute path of the working directory. Files
will be written in the given directory.
"""
pid = os.getpid()
ppid = os.getppid() # TODO: check parent process
if not os.path.isabs(working_directory):
raise ValueError(
f"desired working_directory {working_directory}"
f" is not absolute"
)
os.chdir(working_directory)
buffer: Dict[str, bytearray] = {}
try:
@@ -1664,7 +1735,7 @@ def _run_api_and_store_to_disk(
for filename in buffer.keys():
if os.path.exists(f'{filename}.{pid}'):
os.unlink(f'{filename}.{pid}')
buffer.clear() # so that the remainder code does nothing
buffer.clear()
raise e
for filename, fn_byte_arr in buffer.items():
with open(f'{filename}.{pid}', 'ab') as f:
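As a standalone illustration of the buffering scheme documented above (assuming orjson, and mirroring the flush-on-overrun and '.pid' suffix behavior of the code in this diff; error handling omitted):

    import os
    from typing import Any, Dict, Iterable, Tuple

    import orjson

    def write_buffered(docs: Iterable[Tuple[str, Any]], buffer_size: int) -> None:
        pid = os.getpid()
        buffer: Dict[str, bytearray] = {}
        for filename, doc in docs:
            buf = buffer.setdefault(filename, bytearray())
            buf += orjson.dumps(doc) + b"\n"  # one JSON document per line
            if len(buf) > buffer_size:
                # flush immediately on overrun to bound memory use
                with open(f"{filename}.{pid}", "ab") as f:
                    f.write(buf)
                buf.clear()
        for filename, buf in buffer.items():
            # write whatever remains in each buffer
            with open(f"{filename}.{pid}", "ab") as f:
                f.write(buf)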