Skip to content

Commit

Permalink
fix: Fixed logic in version checking, remote_is_better, and unblocked…
Browse files Browse the repository at this point in the history
… process in hub when remote file is generating
  • Loading branch information
jal347 committed May 2, 2023
1 parent 7b24dab commit 5cf7e45
Showing 1 changed file with 96 additions and 48 deletions.
144 changes: 96 additions & 48 deletions biothings/hub/dataload/dumper.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,10 @@ async def dump(self, steps=None, force=False, job_manager=None, check_only=False
# no src_doc or no download info
pass
# TODO: blocking call for now, FTP client can't be properly set in thread after
self.create_todump_list(force=force, **kwargs)
if inspect.iscoroutinefunction(self.create_todump_list):
await self.create_todump_list(force=force, job_manager=job_manager, **kwargs)
else:
self.create_todump_list(force=force, **kwargs)
# make sure we release (disconnect) client so we don't keep an open
# connection for nothing
self.release_client()
Expand Down Expand Up @@ -1318,6 +1321,7 @@ def download(self, remotefile, localfile):


class DumperManager(BaseSourceManager):

SOURCE_CLASS = BaseDumper

def get_source_ids(self):
Expand Down Expand Up @@ -1882,6 +1886,7 @@ def source_config(self):
return self._source_info

def prepare_client(self):
self.logger.info("Preparing docker client...")
assert type(self.__class__.SRC_URLS) is list, "SRC_URLS should be a list"
assert self.__class__.SRC_URLS, "SRC_URLS list is empty"
if not self._state["client"]:
Expand All @@ -1908,14 +1913,28 @@ def prepare_client(self):
raise DumperException("Can not connect to the Docker server!")
self.logger.info("Connected to Docker server")
self._state["client"] = client
# prepare docker configs
self.prepare_dumper_params()
return self._state["client"]

def need_prepare(self):
if not self.client:
return True

def set_release(self):
self.release = datetime.now().strftime("%Y%m%d%H%M%S")
_release = None
if self.custom_get_version_cmd:
container = self.client.containers.get(self.CONTAINER_NAME)
if container.status == "running":
# we can't execute the cmd in a stop container
exit_code, remote_version = container.exec_run(["sh", "-c", self.custom_get_version_cmd])
if exit_code == 0:
# make sure cmd run successfully on the remote
# the output of get_version_cmd should be the value returned from set_release method
_release = remote_version.decode().strip()
else:
self.logger.error("Failed to run get_version_cmd. Fallback to timestamp as the release.")
self.release = _release or datetime.now().strftime("%Y%m%d%H%M%S")

def release_client(self):
if self.client:
Expand All @@ -1925,39 +1944,36 @@ def release_client(self):

def post_dump(self, *args, **kwargs):
super().post_dump(*args, **kwargs)
self.release_client()
self.delete_container()

def remote_is_better(self, remote_file, local_file):
try:
os.stat(local_file)
except (FileNotFoundError, TypeError):
return True
def get_remote_lastmodified(self, remote_file):
if not self.CONTAINER_NAME:
return True # new fresh container, remote is always better
return None # new fresh container, remote is always better
try:
container = self.client.containers.get(self.CONTAINER_NAME)
except Exception:
return True # exception will be caught in the download step
if self.custom_get_version_cmd:
if container.status != "running":
return True # we can't execute the cmd in a stop container
current_version = os.popen(self.custom_get_version_cmd.replace("{}", local_file, 1)).read().strip()
exit_code, remote_version = container.exec_run(
["sh", "-c", self.custom_get_version_cmd.replace("{}", remote_file, 1)]
)
if exit_code != 0:
return True # failed when run cmd check on the remote
remote_version = remote_version.decode().strip()
self.release = (
remote_version # the output of get_version_cmd should be the value returned from set_release method
)
if current_version != remote_version:
return True
else:
# Always dump unless get_version_cmd is provided to check whether or not there is a new version.
# Prevent double click dump button
# So there is a bit trick by check file is already exist on the new_data_folder
return not os.path.isfile(f"{self.new_data_folder}/{os.path.basename(remote_file)}")
return None # exception will be caught in the download step

lastmodified = None
if container.status == "running":
docker_cmd = f'stat "{remote_file}" -c "%Y"'
exit_code, remote_lastmodified_stdout = container.exec_run(["sh", "-c", docker_cmd])
if exit_code == 0:
lastmodified = int(remote_lastmodified_stdout.decode().strip())
else:
self.logger.error("Could not run stat on remote file in get_remote_lastmodified.")
return lastmodified

def remote_is_better(self, remote_file, local_file):
try:
res = os.stat(local_file)
except (FileNotFoundError, TypeError):
return True

local_lastmodified = int(res.st_mtime)
remote_lastmodified = self.get_remote_lastmodified(remote_file)
if local_lastmodified != remote_lastmodified:
return True
return False

def set_dump_command(self):
Expand All @@ -1980,7 +1996,7 @@ def set_version_cmd(self):
else:
self.custom_get_version_cmd = self.image_metadata.get("get_version_cmd")

def get_remote_file(self, url=None):
def get_remote_file(self):
if self._data_path:
return self._data_path
if self.source_config.get("path"):
Expand All @@ -1991,12 +2007,33 @@ def get_remote_file(self, url=None):
raise DumperException("Missing the require parameter: path")
return self._data_path

def create_todump_list(self, force=False, **kwargs):
async def create_todump_list(self, force=False, job_manager=None, **kwargs):
assert type(self.__class__.SRC_URLS) is list, "SRC_URLS should be a list"
assert self.__class__.SRC_URLS, "SRC_URLS list is empty"
self.set_release() # so we can generate new_data_folder
self.prepare_client()
self.prepare_dumper_params()
self.prepare_remote_container()
# unprepare unpicklable objects so we can use multiprocessing
self.unprepare()
# set up job to generate remote file
pinfo = self.get_pinfo()
pinfo["step"] = "check"
job = await job_manager.defer_to_process(pinfo, partial(self.generate_remote_file))
remote_error = False

def remote_done(f):
nonlocal remote_error
if f.exception():
remote_error = f.exception()

job.add_done_callback(remote_done)
await job
if remote_error:
raise remote_error
# Need to reinit _state b/c of unprepare
if self.need_prepare():
self.prepare_client()
self.setup_log()

self.set_release()
for src_url in self.__class__.SRC_URLS:
remote_file = self.get_remote_file()
file_name = os.path.basename(remote_file)
Expand All @@ -2010,6 +2047,9 @@ def create_todump_list(self, force=False, **kwargs):
if force or current_local_file is None or remote_better:
new_localfile = os.path.join(self.new_data_folder, file_name)
self.to_dump.append({"remote": src_url, "local": new_localfile})
# if there is nothing to dump check if you need to delete container
if not self.to_dump:
self.delete_container()

def prepare_local_folders(self, localfile):
super().prepare_local_folders(localfile)
Expand Down Expand Up @@ -2082,37 +2122,45 @@ def prepare_remote_container(self):
if self.container.status == "exited":
raise DockerContainerException("Container is exited")

def download(self, remote_file_url, local_file):
self.prepare_client()
self.prepare_remote_container()
self.prepare_local_folders(local_file)
remote_file = self.get_remote_file(remote_file_url)
def generate_remote_file(self):
if self.need_prepare():
self.prepare_client()
if self.DUMP_COMMAND:
self.logger.info(f"Exec the command: sh -c {self.DUMP_COMMAND}")
exit_code, output = self.container.exec_run(["sh", "-c", self.DUMP_COMMAND])
self.logger.debug(output.decode())
if exit_code != 0:
self.logger.error(f"Failed to download {remote_file}, non-zero exit code from custom cmd: {exit_code}")
remote_file = self.get_remote_file()
self.logger.error(f"Failed to generate {remote_file}, non-zero exit code from custom cmd: {exit_code}")
self.logger.error(output)
raise DumperException(f"Can not execute the command: {self.DUMP_COMMAND}")
else:
raise DockerContainerException("The dump_command parameter is must be defined")

def delete_container(self):
if self.container:
if not self.keep_container:
self.logger.debug(f"Removing container: {self.container.name}")
self.container.stop()
self.container.wait(timeout=self.TIMEOUT)
self.container.remove()

def download(self, remote_file_url, local_file):
# removes local file if exists before downloading remote file to local
self.prepare_local_folders(local_file)
remote_file = self.get_remote_file()
try:
# get_archive returns a tar datastream and dict with stat info
bits, stat = self.container.get_archive(remote_file, encode_stream=True)
if stat.get("size", 0) > 0:
tmp_file = f"{local_file}.tar"
with open(tmp_file, "wb") as fp:
for chunk in bits:
fp.write(chunk)
# untar data from get_archive
untarall(folder=os.path.dirname(local_file), pattern="*.tar")
# we dont actually need to modify locallastmodified. tar file created from container.get_archive keeps the original date_modified
self.logger.info(f"Successfully download file {remote_file} to {local_file}")
except Exception as ex:
self.logger.error(f"Failed to download {remote_file} with exception: {ex}", exc_info=True)
raise DumperException("Can not download the data file")

if self.container:
if not self.keep_container:
self.logger.debug(f"Removing container: {self.container.name}")
self.container.stop()
self.container.wait(timeout=self.TIMEOUT)
self.container.remove()

0 comments on commit 5cf7e45

Please sign in to comment.