From 351a7874a1dce817a2802d5f55e0e35ed66f8695 Mon Sep 17 00:00:00 2001 From: Radek Pasiok Date: Thu, 24 Jun 2021 22:43:29 +0200 Subject: [PATCH 1/7] Reslving conflicts for Added API calls for downloading files at specified version and getting concatenated diff of file between specified versions. Getting diffs has also its CLI. --- mergin/cli.py | 49 +++++++++ mergin/client.py | 78 +++++++++++++- mergin/client_pull.py | 209 ++++++++++++++++++++++++++++++++++--- mergin/test/test_client.py | 70 +++++++++++++ 4 files changed, 390 insertions(+), 16 deletions(-) diff --git a/mergin/cli.py b/mergin/cli.py index 0f98489f..b8d0657a 100755 --- a/mergin/cli.py +++ b/mergin/cli.py @@ -26,6 +26,8 @@ from mergin.client_pull import ( download_project_async, download_project_cancel, + download_project_file, + download_file_finalize, download_project_finalize, download_project_is_running, ) @@ -260,6 +262,53 @@ def download(ctx, project, directory, version): _print_unhandled_exception() +@cli.command() +@click.argument("project") +@click.argument("filepath") +@click.argument("output") +@click.option("--version", help="Project version tag, for example 'v3'") +@click.pass_context +def download_file(ctx, project, filepath, output, version): + """ + Download project file at specified version. `project` needs to be a combination of namespace/project. + If no version is given, the latest will be fetched. + """ + mc = ctx.obj["client"] + if mc is None: + return + + click.echo(f"project: {project}") + click.echo(f"filepath: {filepath}") + click.echo(f"output: {output}") + click.echo(f"version: {version}") + + try: + namespace, project = project.split("/") + assert namespace, "No namespace given" + assert project, "No project name given" + except (ValueError, AssertionError) as e: + click.secho(f"Incorrect namespace/project format: {e}", fg="red") + return + try: + job = download_project_file(mc, f"{namespace}/{project}", filepath, output, version) + with click.progressbar(length=job.total_size) as bar: + last_transferred_size = 0 + while download_project_is_running(job): + time.sleep(1 / 10) # 100ms + new_transferred_size = job.transferred_size + bar.update(new_transferred_size - last_transferred_size) # the update() needs increment only + last_transferred_size = new_transferred_size + download_file_finalize(job) + click.echo("Done") + except KeyboardInterrupt: + click.secho("Cancelling...") + download_project_cancel(job) + except ClientError as e: + click.secho("Error: " + str(e), fg="red") + except Exception as e: + _print_unhandled_exception() + + def num_version(name): return int(name.lstrip("v")) diff --git a/mergin/client.py b/mergin/client.py index 545ed9b7..60687571 100644 --- a/mergin/client.py +++ b/mergin/client.py @@ -13,7 +13,15 @@ from .common import ClientError, LoginError from .merginproject import MerginProject -from .client_pull import download_project_async, download_project_wait, download_project_finalize +from .client_pull import ( + download_file_finalize, + download_project_async, + download_project_file, + download_project_file_diffs, + download_project_finalize, + download_project_wait, + pull_diffs_finalize, +) from .client_pull import pull_project_async, pull_project_wait, pull_project_finalize from .client_push import push_project_async, push_project_wait, push_project_finalize from .utils import DateTimeEncoder @@ -618,3 +626,71 @@ def get_projects_by_names(self, projects): resp = self.post("/v1/project/by_names", {"projects": projects}, {"Content-Type": "application/json"}) return 
json.load(resp) + + def download_file(self, project_path, file_path, output_filename, version=None): + """ + Download project file at specified version. Get the latest if no version specified. + + :param project_path: project full name (/) + :type project_path: String + :param file_path: relative path of file to download in the project directory + :type file_path: String + :param output_filename: full destination path for saving the downloaded file + :type output_filename: String + :param version: optional version tag for downloaded file + :type version: String + """ + ver_info = f"at version {version}" if version is not None else "at latest version" + self.log.info(f"Getting {file_path} from {project_path} {ver_info}") + job = download_project_file(self, project_path, file_path, output_filename, version=version) + if job is None: + return + pull_project_wait(job) + return download_file_finalize(job) + + def get_versions_with_file_diffs( + self, project_path, file_path, version_from=None, version_to=None, file_history=None): + """ + Get versions history in which the file was changed. + If version_from and version_to are not given get all changed versions. Versions can be integers or tags ('v2'). + + :returns: dictionary {version number: version history item} + :rtype: dict + """ + if file_history is None: + file_history = self.project_file_history_info(project_path, file_path)["history"] + all_version_numbers = sorted([int(k[1:]) for k in file_history.keys()]) + version_from = all_version_numbers[0] if version_from is None else version_from + version_to = all_version_numbers[-1] if version_to is None else version_to + if version_from not in all_version_numbers or version_to not in all_version_numbers: + warn = f"Wrong version parameter: {version_from}-{version_to} while getting diffs for {file_path}" + self.log.warning(warn) + raise ClientError(warn) + + # Find versions to fetch between the 'from' and 'to' versions + idx_from = idx_to = None + for idx, version in enumerate(all_version_numbers): + if version == version_from: + idx_from = idx + elif version == version_to: + idx_to = idx + break + version_numbers_to_fetch = all_version_numbers[idx_from:idx_to + 1] + return {k: v for k, v in file_history.items() if int(k[1:]) in version_numbers_to_fetch} + + def get_file_diff(self, project_path, project_dir, file_path, output_diff, version_from, version_to): + """ Create concatenated diff for project file diffs between versions version_from and version_to. 
""" + self.log.info(f"Getting diffs for file {file_path} of {project_path}") + file_history = self.project_file_history_info(project_path, file_path)["history"] + version_from = int(version_from[1:]) if isinstance(version_from, str) else version_from + version_to = int(version_to[1:]) if isinstance(version_to, str) else version_to + versions_to_fetch = self.get_versions_with_file_diffs( + project_path, file_path, version_from=version_from, version_to=version_to, file_history=file_history + ) + job = download_project_file_diffs(self, project_dir, file_path, versions_to_fetch) + if job is None: + return + pull_project_wait(job) + + # finalize getting diffs + return pull_diffs_finalize(job, output_diff) diff --git a/mergin/client_pull.py b/mergin/client_pull.py index c1e94b40..e6e59b89 100644 --- a/mergin/client_pull.py +++ b/mergin/client_pull.py @@ -14,11 +14,12 @@ import os import pprint import shutil +import tempfile import concurrent.futures from .common import CHUNK_SIZE, ClientError -from .merginproject import MerginProject +from .merginproject import MerginProject, InvalidProject from .utils import save_to_file @@ -37,7 +38,8 @@ class DownloadJob: """ Keeps all the important data about a pending download job """ - def __init__(self, project_path, total_size, version, update_tasks, download_queue_items, directory, mp, project_info): + def __init__(self, project_path, total_size, version, update_tasks, download_queue_items, + directory, mp, mc, project_info): self.project_path = project_path self.total_size = total_size # size of data to download (in bytes) self.transferred_size = 0 @@ -45,10 +47,13 @@ def __init__(self, project_path, total_size, version, update_tasks, download_que self.update_tasks = update_tasks self.download_queue_items = download_queue_items self.directory = directory # project's directory + self.mc = mc # MerginClient instance self.mp = mp # MerginProject instance self.is_cancelled = False self.project_info = project_info # parsed JSON with project info returned from the server - + self.executor = None + self.futures = None + def dump(self): print("--- JOB ---", self.total_size, "bytes") for task in self.update_tasks: @@ -104,7 +109,6 @@ def download_project_async(mc, project_path, directory, project_version=None): """ Starts project download in background and returns handle to the pending project download. Using that object it is possible to watch progress or cancel the ongoing work. - """ if '/' not in project_path: @@ -144,7 +148,7 @@ def download_project_async(mc, project_path, directory, project_version=None): mp.log.info(f"will download {len(update_tasks)} files in {len(download_list)} chunks, total size {total_size}") - job = DownloadJob(project_path, total_size, version, update_tasks, download_list, directory, mp, project_info) + job = DownloadJob(project_path, total_size, version, update_tasks, download_list, directory, mp, mc, project_info) # start download job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) @@ -171,7 +175,8 @@ def download_project_is_running(job): """ for future in job.futures: if future.done() and future.exception() is not None: - _cleanup_failed_download(job.directory, job.mp) + if job.mp is not None: + _cleanup_failed_download(job.directory, job.mp) raise future.exception() if future.running(): return True @@ -226,26 +231,37 @@ def download_project_cancel(job): class UpdateTask: """ - Entry for each file that will be updated. At the end of a successful download of new data, all the tasks are executed. 
+ Entry for each file that will be updated. + At the end of a successful download of new data, all the tasks are executed. """ # TODO: methods other than COPY - def __init__(self, file_path, download_queue_items): + def __init__(self, file_path, download_queue_items, destination_file=None): self.file_path = file_path + self.destination_file = destination_file self.download_queue_items = download_queue_items def apply(self, directory, mp): """ assemble downloaded chunks into a single file """ - basename = os.path.basename(self.file_path) #file['diff']['path']) if diff_only else os.path.basename(file['path']) - file_dir = os.path.dirname(os.path.normpath(os.path.join(directory, self.file_path))) - dest_file_path = os.path.join(file_dir, basename) + if self.destination_file is None: + basename = os.path.basename(self.file_path) + file_dir = os.path.dirname(os.path.normpath(os.path.join(directory, self.file_path))) + dest_file_path = os.path.join(file_dir, basename) + else: + file_dir = os.path.dirname(os.path.normpath(self.destination_file)) + dest_file_path = self.destination_file os.makedirs(file_dir, exist_ok=True) # merge chunks together (and delete them afterwards) file_to_merge = FileToMerge(dest_file_path, self.download_queue_items) file_to_merge.merge() + if mp is None or dest_file_path is not None: + # Some tasks (downloading single file) do not require the project to be downloaded. + # In that case mp = None and there is no need to copy the file. + # Also skip copying if user specified the destination path for the downloaded file + return if mp.is_versioned_file(self.file_path): mp.geodiff.make_copy_sqlite(mp.fpath(self.file_path), mp.fpath_meta(self.file_path)) @@ -268,7 +284,8 @@ def __repr__(self): def download_blocking(self, mc, mp, project_path): """ Starts download and only returns once the file has been fully downloaded and saved """ - mp.log.debug(f"Downloading {self.file_path} version={self.version} diff={self.diff_only} part={self.part_index}") + log = mc.log if mp is None else mp.log + log.debug(f"Downloading {self.file_path} version={self.version} diff={self.diff_only} part={self.part_index}") start = self.part_index * (1 + CHUNK_SIZE) resp = mc.get("/v1/project/raw/{}".format(project_path), data={ "file": self.file_path, @@ -279,15 +296,16 @@ def download_blocking(self, mc, mp, project_path): } ) if resp.status in [200, 206]: - mp.log.debug(f"Download finished: {self.file_path}") + log.debug(f"Download finished: {self.file_path}") save_to_file(resp, self.download_file_path) else: - mp.log.error(f"Download failed: {self.file_path}") + log.error(f"Download failed: {self.file_path}") raise ClientError('Failed to download part {} of file {}'.format(self.part_index, self.file_path)) class PullJob: - def __init__(self, project_path, pull_changes, total_size, version, files_to_merge, download_queue_items, temp_dir, mp, project_info, basefiles_to_patch): + def __init__(self, project_path, pull_changes, total_size, version, files_to_merge, download_queue_items, + temp_dir, mp, project_info, basefiles_to_patch): self.project_path = project_path self.pull_changes = pull_changes # dictionary with changes (dict[str, list[dict]] - keys: "added", "updated", ...) self.total_size = total_size # size of data to download (in bytes) @@ -551,3 +569,164 @@ def pull_project_finalize(job): shutil.rmtree(job.temp_dir) return conflicts + + +def download_project_file(mc, project_path, file_path, output_file, version): + """ + Starts background download project file at specified versions. 
+ Returns handle to the pending download. + """ + mc.log.info(f"--- start download") + project_info = mc.project_info(project_path, version=version) + + mc.log.info(f"Got project info. version {project_info['version']}") + + # set temporary directory and make sure the destination directory exists + temp_dir = os.path.join(tempfile.gettempdir(), "mergin_temp") + os.makedirs(os.path.dirname(output_file), exist_ok=True) + + download_list = [] + update_tasks = [] + total_size = 0 + for file in project_info['files']: + if file["path"] == file_path: + file['version'] = version + items = _download_items(file, project_path) + task = UpdateTask(file['path'], items, output_file) + download_list.extend(task.download_queue_items) + for item in task.download_queue_items: + total_size += item.size + update_tasks.append(task) + break + if not download_list: + warn = f"No {file_path} exists at version {version}" + mc.log.warning(warn) + raise ClientError(warn) + + mc.log.info(f"will download file {file_path} in {len(download_list)} chunks, total size {total_size}") + job = DownloadJob( + project_path, total_size, version, update_tasks, download_list, temp_dir, None, mc, project_info + ) + job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) + job.futures = [] + for item in download_list: + future = job.executor.submit(_do_download, item, mc, None, project_path, job) + job.futures.append(future) + + return job + + +def download_file_finalize(job): + """ + To be called when download_project_file is finished + """ + job.executor.shutdown(wait=True) + + # make sure any exceptions from threads are not lost + for future in job.futures: + if future.exception() is not None: + raise future.exception() + + job.mc.log.info("--- download finished") + + for task in job.update_tasks: + task.apply(job.directory, job.mp) + + +def download_project_file_diffs(mc, project_directory, file_path, versions): + """ + Starts background download project file diffs for specified versions. + Returns handle to the pending download. 
+ """ + try: + mp = MerginProject(project_directory) + except InvalidProject as e: + mc.log.error(f"Couldn't create Mergin project for directory: {project_directory}: {repr(e)}") + return None + project_path = mp.metadata["name"] + mp.log.info(f"--- version: {mc.user_agent_info()}") + mp.log.info(f"--- start download diffs for {file_path}, versions: {[v for v in versions]}") + + try: + server_info = mc.project_info(project_path) + except ClientError as err: + mp.log.error("Error getting project info: " + str(err)) + mp.log.info("--- pull aborted") + raise + + version_numbers = sorted([int(v[1:]) for v in versions]) + temp_dir = os.path.join(tempfile.gettempdir(), "mergin_temp") + os.makedirs(temp_dir, exist_ok=True) + fetch_files = [] + + for version, version_data in versions.items(): + if int(version[1:]) == version_numbers[0]: + continue + diff_file = copy.deepcopy(version_data) + diff_file['version'] = version + diff_file['diff'] = version_data['diff'] + fetch_files.append(diff_file) + + files_to_merge = [] # list of FileToMerge instances + for file in fetch_files: + items = _download_items(file, temp_dir, diff_only=True) + dest_file_path = os.path.normpath(os.path.join(temp_dir, os.path.basename(file['diff']['path']))) + files_to_merge.append(FileToMerge(dest_file_path, items)) + + # make a single list of items to download + total_size = 0 + download_list = [] + for file_to_merge in files_to_merge: + download_list.extend(file_to_merge.downloaded_items) + for item in file_to_merge.downloaded_items: + total_size += item.size + + mp.log.info(f"will download {len(download_list)} chunks, total size {total_size}") + + job = PullJob(project_path, None, total_size, None, files_to_merge, download_list, temp_dir, mp, + server_info, {}) + + # start download + job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) + job.futures = [] + for item in download_list: + future = job.executor.submit(_do_download, item, mc, mp, project_path, job) + job.futures.append(future) + + return job + + +def pull_diffs_finalize(job, output_diff): + """ To be called after download_project_file_diffs """ + + job.executor.shutdown(wait=True) + + # make sure any exceptions from threads are not lost + for future in job.futures: + if future.exception() is not None: + job.mp.log.error("Error while pulling data: " + str(future.exception())) + job.mp.log.info("--- pull aborted") + raise future.exception() + + job.mp.log.info("finalizing diffs pull") + + # merge downloaded chunks + try: + for file_to_merge in job.files_to_merge: + file_to_merge.merge() + except ClientError as err: + job.mp.log.error("Error merging chunks of downloaded file: " + str(err)) + job.mp.log.info("--- diffs pull aborted") + raise + + job.mp.log.info("--- diffs pull finished") + + # Concatenate diffs + diffs = [] + for file_to_merge in job.files_to_merge: + diffs.append(file_to_merge.dest_file) + output_dir = os.path.dirname(output_diff) + os.makedirs(output_dir, exist_ok=True) + job.mp.geodiff.concat_changes(diffs, output_diff) + for diff in diffs: + os.remove(diff) diff --git a/mergin/test/test_client.py b/mergin/test/test_client.py index 2a182eed..ee95f203 100644 --- a/mergin/test/test_client.py +++ b/mergin/test/test_client.py @@ -810,12 +810,81 @@ def test_server_compatibility(mc): assert mc.is_server_compatible() +def create_versioned_project(mc, project_name, project_dir, updated_file, remove=True): + project = API_USER + '/' + project_name + cleanup(mc, project, [project_dir]) + + # create remote project + 
shutil.copytree(TEST_DATA_DIR, project_dir) + mc.create_project_and_push(project_name, project_dir) + + mp = MerginProject(project_dir) + + # create versions 2-5 + changes = ("inserted_1_A.gpkg", "inserted_1_A_mod.gpkg", "inserted_1_B.gpkg",) + for change in changes: + shutil.copy(mp.fpath(change), mp.fpath(updated_file)) + mc.push_project(project_dir) + + if remove: + os.remove(os.path.join(project_dir, updated_file)) + mc.push_project(project_dir) + + return mp + + +def test_download_file(mc): + """Test downloading single file at specified versions.""" + test_project = 'test_download_file' + project = API_USER + '/' + test_project + project_dir = os.path.join(TMP_DIR, test_project) + download_dir = os.path.join(project_dir, "versions") # project for downloading files at various versions + f_updated = "base.gpkg" + + mp = create_versioned_project(mc, test_project, project_dir, f_updated) + + project_info = mc.project_info(project) + assert project_info["version"] == "v5" + + # Download the base file at versions 2-4 and check their checksums + f_versioned = os.path.join(download_dir, f_updated) + for ver in range(2, 5): + mc.download_file(project, f_updated, f_versioned, version=f"v{ver}") + f_downloaded_checksum = generate_checksum(f_versioned) + proj_info = mc.project_info(project, version=f"v{ver}") + f_remote_checksum = next((f['checksum'] for f in proj_info['files'] if f['path'] == f_updated), None) + assert f_remote_checksum == f_downloaded_checksum + + with pytest.raises(ClientError, match=f"No {f_updated} exists at version v5"): + mc.download_file(project, f_updated, f_versioned, version=f"v5") + + +def test_download_diffs(mc): + """Test download diffs for a project file between specified project versions.""" + test_project = 'test_download_diffs' + project = API_USER + '/' + test_project + project_dir = os.path.join(TMP_DIR, test_project) + download_dir = os.path.join(project_dir, "diffs") # project for downloading files at various versions + f_updated = "base.gpkg" + diff = os.path.join(download_dir, f_updated + ".diff") + + mp = create_versioned_project(mc, test_project, project_dir, f_updated, remove=False) + + project_info = mc.project_info(project) + assert project_info["version"] == "v4" + + # Download diffs of updated file between versions 2 and 4 + mc.get_file_diff(project, project_dir, f_updated, diff, "v1", "v4") + assert os.path.exists(diff) + + def _use_wal(db_file): """ Ensures that sqlite database is using WAL journal mode """ con = sqlite3.connect(db_file) cursor = con.cursor() cursor.execute('PRAGMA journal_mode=wal;') + def _create_test_table(db_file): """ Creates a table called 'test' in sqlite database. Useful to simulate change of database schema. """ con = sqlite3.connect(db_file) @@ -824,6 +893,7 @@ def _create_test_table(db_file): cursor.execute('INSERT INTO test VALUES (123, \'hello\');') cursor.execute('COMMIT;') + def _check_test_table(db_file): """ Checks whether the 'test' table exists and has one row - otherwise fails with an exception. 
""" #con_verify = sqlite3.connect(db_file) From f45c15c4598767818f535a21780c7ef1ca0b65fb Mon Sep 17 00:00:00 2001 From: Radek Pasiok Date: Fri, 25 Jun 2021 09:20:01 +0200 Subject: [PATCH 2/7] Fixed failing tests --- mergin/cli.py | 6 ------ mergin/client.py | 26 +++++++++++++++++++++++--- mergin/client_pull.py | 2 +- mergin/test/test_client.py | 2 +- 4 files changed, 25 insertions(+), 11 deletions(-) diff --git a/mergin/cli.py b/mergin/cli.py index b8d0657a..5ce89805 100755 --- a/mergin/cli.py +++ b/mergin/cli.py @@ -276,12 +276,6 @@ def download_file(ctx, project, filepath, output, version): mc = ctx.obj["client"] if mc is None: return - - click.echo(f"project: {project}") - click.echo(f"filepath: {filepath}") - click.echo(f"output: {output}") - click.echo(f"version: {version}") - try: namespace, project = project.split("/") assert namespace, "No namespace given" diff --git a/mergin/client.py b/mergin/client.py index 60687571..7eac4657 100644 --- a/mergin/client.py +++ b/mergin/client.py @@ -11,7 +11,7 @@ import dateutil.parser import ssl -from .common import ClientError, LoginError +from .common import ClientError, LoginError, InvalidProject from .merginproject import MerginProject from .client_pull import ( download_file_finalize, @@ -678,8 +678,28 @@ def get_versions_with_file_diffs( version_numbers_to_fetch = all_version_numbers[idx_from:idx_to + 1] return {k: v for k, v in file_history.items() if int(k[1:]) in version_numbers_to_fetch} - def get_file_diff(self, project_path, project_dir, file_path, output_diff, version_from, version_to): - """ Create concatenated diff for project file diffs between versions version_from and version_to. """ + def get_file_diff(self, project_dir, file_path, output_diff, version_from, version_to): + """ Create concatenated diff for project file diffs between versions version_from and version_to. + + :param project_dir: project local directory + :type project_dir: String + :param file_path: relative path of file to download in the project directory + :type file_path: String + :param output_diff: full destination path for concatenated diff file + :type output_diff: String + :param version_from: starting project version tag for getting diff + :type version_from: String + :param version_to: ending project version tag for getting diff + :type version_to: String + """ + try: + mp = MerginProject(project_dir) + except InvalidProject as e: + err = f"Couldn't create Mergin project for {project_dir}: {repr(e)}" + self.log.error(err) + raise ClientError(err) + + project_path = mp.metadata["name"] self.log.info(f"Getting diffs for file {file_path} of {project_path}") file_history = self.project_file_history_info(project_path, file_path)["history"] version_from = int(version_from[1:]) if isinstance(version_from, str) else version_from diff --git a/mergin/client_pull.py b/mergin/client_pull.py index e6e59b89..b6eebc0f 100644 --- a/mergin/client_pull.py +++ b/mergin/client_pull.py @@ -257,7 +257,7 @@ def apply(self, directory, mp): file_to_merge = FileToMerge(dest_file_path, self.download_queue_items) file_to_merge.merge() - if mp is None or dest_file_path is not None: + if mp is None or self.destination_file is not None: # Some tasks (downloading single file) do not require the project to be downloaded. # In that case mp = None and there is no need to copy the file. 
# Also skip copying if user specified the destination path for the downloaded file diff --git a/mergin/test/test_client.py b/mergin/test/test_client.py index ee95f203..5d611815 100644 --- a/mergin/test/test_client.py +++ b/mergin/test/test_client.py @@ -874,7 +874,7 @@ def test_download_diffs(mc): assert project_info["version"] == "v4" # Download diffs of updated file between versions 2 and 4 - mc.get_file_diff(project, project_dir, f_updated, diff, "v1", "v4") + mc.get_file_diff(project_dir, f_updated, diff, "v1", "v4") assert os.path.exists(diff) From a784d6afa86b55fff077b04b710f19414b0e6d73 Mon Sep 17 00:00:00 2001 From: Radek Pasiok Date: Fri, 25 Jun 2021 09:37:05 +0200 Subject: [PATCH 3/7] Fixed failing tests --- mergin/client_pull.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mergin/client_pull.py b/mergin/client_pull.py index b6eebc0f..8993655c 100644 --- a/mergin/client_pull.py +++ b/mergin/client_pull.py @@ -591,7 +591,7 @@ def download_project_file(mc, project_path, file_path, output_file, version): for file in project_info['files']: if file["path"] == file_path: file['version'] = version - items = _download_items(file, project_path) + items = _download_items(file, temp_dir) task = UpdateTask(file['path'], items, output_file) download_list.extend(task.download_queue_items) for item in task.download_queue_items: From 1e474d851f2942edca5ba90c1765a0722e5e8580 Mon Sep 17 00:00:00 2001 From: Radek Pasiok Date: Fri, 2 Jul 2021 21:29:14 +0200 Subject: [PATCH 4/7] Changes suggested in the review. --- mergin/cli.py | 16 ++----- mergin/client.py | 68 +++++++++------------------ mergin/client_pull.py | 58 +++++++++++++---------- mergin/test/test_client.py | 94 ++++++++++++++++++++++++++++++++------ mergin/utils.py | 42 +++++++++++++++++ 5 files changed, 184 insertions(+), 94 deletions(-) diff --git a/mergin/cli.py b/mergin/cli.py index 5ce89805..c5a98d88 100755 --- a/mergin/cli.py +++ b/mergin/cli.py @@ -26,7 +26,7 @@ from mergin.client_pull import ( download_project_async, download_project_cancel, - download_project_file, + download_file_async, download_file_finalize, download_project_finalize, download_project_is_running, @@ -263,12 +263,11 @@ def download(ctx, project, directory, version): @cli.command() -@click.argument("project") @click.argument("filepath") @click.argument("output") @click.option("--version", help="Project version tag, for example 'v3'") @click.pass_context -def download_file(ctx, project, filepath, output, version): +def download_file(ctx, filepath, output, version): """ Download project file at specified version. `project` needs to be a combination of namespace/project. If no version is given, the latest will be fetched. 
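A hedged usage sketch of the reworked command, which now resolves the project from the Mergin project in the current working directory instead of a command argument; the `mergin` console script, the click-derived command name `download-file`, and the file names here are assumptions, not taken from this patch:

    cd my_mergin_project
    mergin download-file base.gpkg /tmp/base_v3.gpkg --version v3
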
@@ -276,15 +275,10 @@ def download_file(ctx, project, filepath, output, version): mc = ctx.obj["client"] if mc is None: return + mp = MerginProject(os.getcwd()) + project_path = mp.metadata["name"] try: - namespace, project = project.split("/") - assert namespace, "No namespace given" - assert project, "No project name given" - except (ValueError, AssertionError) as e: - click.secho(f"Incorrect namespace/project format: {e}", fg="red") - return - try: - job = download_project_file(mc, f"{namespace}/{project}", filepath, output, version) + job = download_file_async(mc, project_path, filepath, output, version) with click.progressbar(length=job.total_size) as bar: last_transferred_size = 0 while download_project_is_running(job): diff --git a/mergin/client.py b/mergin/client.py index 7eac4657..bd2ceb93 100644 --- a/mergin/client.py +++ b/mergin/client.py @@ -16,15 +16,15 @@ from .client_pull import ( download_file_finalize, download_project_async, - download_project_file, - download_project_file_diffs, + download_file_async, + download_diffs_async, download_project_finalize, download_project_wait, - pull_diffs_finalize, + download_diffs_finalize, ) from .client_pull import pull_project_async, pull_project_wait, pull_project_finalize from .client_push import push_project_async, push_project_wait, push_project_finalize -from .utils import DateTimeEncoder +from .utils import DateTimeEncoder, get_versions_with_file_changes from .version import __version__ this_dir = os.path.dirname(os.path.realpath(__file__)) @@ -627,12 +627,12 @@ def get_projects_by_names(self, projects): resp = self.post("/v1/project/by_names", {"projects": projects}, {"Content-Type": "application/json"}) return json.load(resp) - def download_file(self, project_path, file_path, output_filename, version=None): + def download_file(self, project_dir, file_path, output_filename, version=None): """ Download project file at specified version. Get the latest if no version specified. - :param project_path: project full name (/) - :type project_path: String + :param project_dir: project local directory + :type project_dir: String :param file_path: relative path of file to download in the project directory :type file_path: String :param output_filename: full destination path for saving the downloaded file @@ -640,44 +640,22 @@ def download_file(self, project_path, file_path, output_filename, version=None): :param version: optional version tag for downloaded file :type version: String """ + try: + mp = MerginProject(project_dir) + except InvalidProject as e: + err = f"Couldn't create Mergin project for {project_dir}: {repr(e)}" + self.log.error(err) + raise ClientError(err) + + project_path = mp.metadata["name"] ver_info = f"at version {version}" if version is not None else "at latest version" self.log.info(f"Getting {file_path} from {project_path} {ver_info}") - job = download_project_file(self, project_path, file_path, output_filename, version=version) + job = download_file_async(self, project_path, file_path, output_filename, version=version) if job is None: return pull_project_wait(job) return download_file_finalize(job) - def get_versions_with_file_diffs( - self, project_path, file_path, version_from=None, version_to=None, file_history=None): - """ - Get versions history in which the file was changed. - If version_from and version_to are not given get all changed versions. Versions can be integers or tags ('v2'). 
- - :returns: dictionary {version number: version history item} - :rtype: dict - """ - if file_history is None: - file_history = self.project_file_history_info(project_path, file_path)["history"] - all_version_numbers = sorted([int(k[1:]) for k in file_history.keys()]) - version_from = all_version_numbers[0] if version_from is None else version_from - version_to = all_version_numbers[-1] if version_to is None else version_to - if version_from not in all_version_numbers or version_to not in all_version_numbers: - warn = f"Wrong version parameter: {version_from}-{version_to} while getting diffs for {file_path}" - self.log.warning(warn) - raise ClientError(warn) - - # Find versions to fetch between the 'from' and 'to' versions - idx_from = idx_to = None - for idx, version in enumerate(all_version_numbers): - if version == version_from: - idx_from = idx - elif version == version_to: - idx_to = idx - break - version_numbers_to_fetch = all_version_numbers[idx_from:idx_to + 1] - return {k: v for k, v in file_history.items() if int(k[1:]) in version_numbers_to_fetch} - def get_file_diff(self, project_dir, file_path, output_diff, version_from, version_to): """ Create concatenated diff for project file diffs between versions version_from and version_to. @@ -687,7 +665,7 @@ def get_file_diff(self, project_dir, file_path, output_diff, version_from, versi :type file_path: String :param output_diff: full destination path for concatenated diff file :type output_diff: String - :param version_from: starting project version tag for getting diff + :param version_from: starting project version tag for getting diff, for example 'v3' :type version_from: String :param version_to: ending project version tag for getting diff :type version_to: String @@ -701,16 +679,14 @@ def get_file_diff(self, project_dir, file_path, output_diff, version_from, versi project_path = mp.metadata["name"] self.log.info(f"Getting diffs for file {file_path} of {project_path}") - file_history = self.project_file_history_info(project_path, file_path)["history"] - version_from = int(version_from[1:]) if isinstance(version_from, str) else version_from - version_to = int(version_to[1:]) if isinstance(version_to, str) else version_to - versions_to_fetch = self.get_versions_with_file_diffs( - project_path, file_path, version_from=version_from, version_to=version_to, file_history=file_history + file_history = self.project_file_history_info(project_path, file_path) + versions_to_fetch = get_versions_with_file_changes( + self, project_path, file_path, version_from=version_from, version_to=version_to, file_history=file_history ) - job = download_project_file_diffs(self, project_dir, file_path, versions_to_fetch) + job = download_diffs_async(self, project_dir, file_path, versions_to_fetch, file_history=file_history) if job is None: return pull_project_wait(job) # finalize getting diffs - return pull_diffs_finalize(job, output_diff) + download_diffs_finalize(job, output_diff) diff --git a/mergin/client_pull.py b/mergin/client_pull.py index 8993655c..f9cbcbcc 100644 --- a/mergin/client_pull.py +++ b/mergin/client_pull.py @@ -571,7 +571,7 @@ def pull_project_finalize(job): return conflicts -def download_project_file(mc, project_path, file_path, output_file, version): +def download_file_async(mc, project_path, file_path, output_file, version): """ Starts background download project file at specified versions. Returns handle to the pending download. 
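A minimal sketch of driving the renamed helper, mirroring how client.py wires it up in this patch; the project path, file name and output path are illustrative values only:

    job = download_file_async(mc, "namespace/project", "survey.gpkg", "/tmp/survey_v3.gpkg", version="v3")
    pull_project_wait(job)
    download_file_finalize(job)
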
@@ -618,7 +618,7 @@ def download_project_file(mc, project_path, file_path, output_file, version): def download_file_finalize(job): """ - To be called when download_project_file is finished + To be called when download_file_async is finished """ job.executor.shutdown(wait=True) @@ -633,10 +633,20 @@ def download_file_finalize(job): task.apply(job.directory, job.mp) -def download_project_file_diffs(mc, project_directory, file_path, versions): +def download_diffs_async(mc, project_directory, file_path, versions, file_history=None): """ Starts background download project file diffs for specified versions. Returns handle to the pending download. + + Args: + mc (MerginClient): MerginClient instance. + project_directory (str): local project directory. + file_path (str): file path relative to Mergin project root. + versions ([str]): list of tags of versions to fetch, for example: ["v2", "v5"]. + file_history (dict): optional file history info, result of MerginClient.project_file_history_info() + + Returns: + PullJob/None: a handle for the pending download. """ try: mp = MerginProject(project_directory) @@ -649,36 +659,33 @@ def download_project_file_diffs(mc, project_directory, file_path, versions): try: server_info = mc.project_info(project_path) + if file_history is None: + file_history = mc.project_file_history_info(project_path, file_path) except ClientError as err: mp.log.error("Error getting project info: " + str(err)) mp.log.info("--- pull aborted") raise - version_numbers = sorted([int(v[1:]) for v in versions]) temp_dir = os.path.join(tempfile.gettempdir(), "mergin_temp") os.makedirs(temp_dir, exist_ok=True) fetch_files = [] - for version, version_data in versions.items(): - if int(version[1:]) == version_numbers[0]: - continue - diff_file = copy.deepcopy(version_data) - diff_file['version'] = version - diff_file['diff'] = version_data['diff'] - fetch_files.append(diff_file) + for version in versions[1:]: + version_data = file_history["history"][version] + diff_data = copy.deepcopy(version_data) + diff_data['version'] = version + diff_data['diff'] = version_data['diff'] + fetch_files.append(diff_data) files_to_merge = [] # list of FileToMerge instances + download_list = [] # list of all items to be downloaded + total_size = 0 for file in fetch_files: items = _download_items(file, temp_dir, diff_only=True) dest_file_path = os.path.normpath(os.path.join(temp_dir, os.path.basename(file['diff']['path']))) files_to_merge.append(FileToMerge(dest_file_path, items)) - - # make a single list of items to download - total_size = 0 - download_list = [] - for file_to_merge in files_to_merge: - download_list.extend(file_to_merge.downloaded_items) - for item in file_to_merge.downloaded_items: + download_list.extend(items) + for item in items: total_size += item.size mp.log.info(f"will download {len(download_list)} chunks, total size {total_size}") @@ -696,8 +703,8 @@ def download_project_file_diffs(mc, project_directory, file_path, versions): return job -def pull_diffs_finalize(job, output_diff): - """ To be called after download_project_file_diffs """ +def download_diffs_finalize(job, output_diff): + """ To be called after download_diffs_async """ job.executor.shutdown(wait=True) @@ -721,12 +728,17 @@ def pull_diffs_finalize(job, output_diff): job.mp.log.info("--- diffs pull finished") - # Concatenate diffs + # Concatenate diffs, if needed diffs = [] for file_to_merge in job.files_to_merge: diffs.append(file_to_merge.dest_file) + output_dir = os.path.dirname(output_diff) - os.makedirs(output_dir, 
exist_ok=True) - job.mp.geodiff.concat_changes(diffs, output_diff) + if len(diffs) >= 1: + os.makedirs(output_dir, exist_ok=True) + if len(diffs) > 1: + job.mp.geodiff.concat_changes(diffs, output_diff) + elif len(diffs) == 1: + shutil.copy(diffs[0], output_diff) for diff in diffs: os.remove(diff) diff --git a/mergin/test/test_client.py b/mergin/test/test_client.py index 5d611815..5d2c4d29 100644 --- a/mergin/test/test_client.py +++ b/mergin/test/test_client.py @@ -1,3 +1,4 @@ +import json import logging import os import tempfile @@ -10,7 +11,7 @@ from ..client import MerginClient, ClientError, MerginProject, LoginError, decode_token_data, TokenError from ..client_push import push_project_async, push_project_cancel -from ..utils import generate_checksum +from ..utils import generate_checksum, get_versions_with_file_changes from ..merginproject import pygeodiff @@ -833,12 +834,37 @@ def create_versioned_project(mc, project_name, project_dir, updated_file, remove return mp +def test_get_versions_with_file_changes(mc): + """Test getting versions where the file was changed.""" + test_project = 'test_file_modified_versions' + project = API_USER + '/' + test_project + project_dir = os.path.join(TMP_DIR, test_project) + f_updated = "base.gpkg" + + mp = create_versioned_project(mc, test_project, project_dir, f_updated, remove=False) + + project_info = mc.project_info(project) + assert project_info["version"] == "v4" + file_history = mc.project_file_history_info(project, f_updated) + + wrong_versions_err = "Wrong version parameter: 1-5 while getting diffs for base.gpkg. Available versions: [1, 2, 3, 4]" + with pytest.raises(ClientError) as err: + mod_versions = get_versions_with_file_changes( + mc, project, f_updated, version_from="v1", version_to="v5", file_history=file_history + ) + assert err.value.args[0] == wrong_versions_err + + mod_versions = get_versions_with_file_changes( + mc, project, f_updated, version_from="v2", version_to="v4", file_history=file_history + ) + assert mod_versions == [f"v{i}" for i in range(2, 5)] + + def test_download_file(mc): """Test downloading single file at specified versions.""" test_project = 'test_download_file' project = API_USER + '/' + test_project project_dir = os.path.join(TMP_DIR, test_project) - download_dir = os.path.join(project_dir, "versions") # project for downloading files at various versions f_updated = "base.gpkg" mp = create_versioned_project(mc, test_project, project_dir, f_updated) @@ -846,17 +872,40 @@ def test_download_file(mc): project_info = mc.project_info(project) assert project_info["version"] == "v5" - # Download the base file at versions 2-4 and check their checksums - f_versioned = os.path.join(download_dir, f_updated) + # Download the base file at versions 2-4 and check the changes + expected = { + 2: {"table": "simple", "change": "insert", "nr_of_changes": 1}, + 3: {"table": "simple", "change": "update", "nr_of_changes": 2}, + 4: {"table": "simple", "change": "update", "nr_of_changes": 3}, + } + f_versioned = os.path.join(project_dir, f_updated) for ver in range(2, 5): - mc.download_file(project, f_updated, f_versioned, version=f"v{ver}") - f_downloaded_checksum = generate_checksum(f_versioned) - proj_info = mc.project_info(project, version=f"v{ver}") - f_remote_checksum = next((f['checksum'] for f in proj_info['files'] if f['path'] == f_updated), None) - assert f_remote_checksum == f_downloaded_checksum - + if not os.path.exists(f_versioned): + shutil.copy(os.path.join(TEST_DATA_DIR, f_updated), mp.fpath_meta(f_updated)) + else: + 
shutil.copy(f_versioned, mp.fpath_meta(f_updated)) + mc.download_file(project_dir, f_updated, f_versioned, version=f"v{ver}") + diff_file = mp.fpath_meta(f"changes_v{ver}.diff") + mp.geodiff.create_changeset( + mp.fpath_meta(f_updated), f_versioned, mp.fpath_meta(diff_file)) + + assert mp.geodiff.has_changes(diff_file) + assert mp.geodiff.changes_count(diff_file) == expected[ver]["nr_of_changes"] + + changes_file = diff_file + ".changes" + mp.geodiff.list_changes_summary(diff_file, changes_file) + with open(changes_file, 'r') as f: + expected_changed_table = expected[ver]["table"] + expected_change = expected[ver]["change"] + expected_nr_of_changes = expected[ver]["nr_of_changes"] + changes = json.loads(f.read())["geodiff_summary"][0] + assert changes["table"] == expected_changed_table + assert expected_change in changes + assert changes[expected_change] == expected_nr_of_changes + + # make sure there will be exception raised if a file doesn't exist in the version with pytest.raises(ClientError, match=f"No {f_updated} exists at version v5"): - mc.download_file(project, f_updated, f_versioned, version=f"v5") + mc.download_file(project_dir, f_updated, f_versioned, version=f"v5") def test_download_diffs(mc): @@ -866,16 +915,33 @@ def test_download_diffs(mc): project_dir = os.path.join(TMP_DIR, test_project) download_dir = os.path.join(project_dir, "diffs") # project for downloading files at various versions f_updated = "base.gpkg" - diff = os.path.join(download_dir, f_updated + ".diff") + diff_file = os.path.join(download_dir, f_updated + ".diff") mp = create_versioned_project(mc, test_project, project_dir, f_updated, remove=False) project_info = mc.project_info(project) assert project_info["version"] == "v4" + # Download diffs of updated file between versions 1 and 2 + mc.get_file_diff(project_dir, f_updated, diff_file, "v1", "v2") + assert os.path.exists(diff_file) + assert mp.geodiff.has_changes(diff_file) + assert mp.geodiff.changes_count(diff_file) == 1 + changes_file = diff_file + ".changes1-2" + mp.geodiff.list_changes_summary(diff_file, changes_file) + with open(changes_file, 'r') as f: + changes = json.loads(f.read())["geodiff_summary"][0] + assert changes["insert"] == 1 + assert changes["update"] == 0 + # Download diffs of updated file between versions 2 and 4 - mc.get_file_diff(project_dir, f_updated, diff, "v1", "v4") - assert os.path.exists(diff) + mc.get_file_diff(project_dir, f_updated, diff_file, "v2", "v4") + changes_file = diff_file + ".changes2-4" + mp.geodiff.list_changes_summary(diff_file, changes_file) + with open(changes_file, 'r') as f: + changes = json.loads(f.read())["geodiff_summary"][0] + assert changes["insert"] == 0 + assert changes["update"] == 1 def _use_wal(db_file): diff --git a/mergin/utils.py b/mergin/utils.py index 84045052..e01c45a5 100644 --- a/mergin/utils.py +++ b/mergin/utils.py @@ -5,6 +5,7 @@ import re import sqlite3 from datetime import datetime +from .common import ClientError def generate_checksum(file, chunk_size=4096): @@ -99,3 +100,44 @@ def do_sqlite_checkpoint(path, log=None): log.info("checkpoint - new size {} checksum {}".format(new_size, new_checksum)) return new_size, new_checksum + + +def get_versions_with_file_changes( + mc, project_path, file_path, version_from=None, version_to=None, file_history=None): + """ + Get the project version tags where the file was added, modified or deleted. 
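+    For illustration (a hypothetical history where the file changed in versions 4, 7 and 8):
+        get_versions_with_file_changes(mc, "user/project", "base.gpkg", version_from="v4", version_to="v8")
+        # -> ["v4", "v7", "v8"]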
+ + Args: + mc (MerginClient): MerginClient instance + project_path (str): project full name (/) + file_path (str): relative path of file to download in the project directory + version_from (str): optional minimum version to fetch, for example "v3" + version_to (str): optional maximum version to fetch + file_history (dict): optional file history info, result of project_file_history_info(). + + Returns: + list of version tags, for example ["v4", "v7", "v8"] + """ + if file_history is None: + file_history = mc.project_file_history_info(project_path, file_path) + all_version_numbers = sorted([int(k[1:]) for k in file_history["history"].keys()]) + if "v" not in version_from or "v" not in version_to: + err = f"Wrong version parameter: {version_from}-{version_to} while getting diffs for {file_path}. " + err += f"Version tags required in the form: 'v2', 'v11', etc." + raise ClientError(err) + version_from = all_version_numbers[0] if version_from is None else int(version_from[1:]) + version_to = all_version_numbers[-1] if version_to is None else int(version_to[1:]) + if version_from not in all_version_numbers or version_to not in all_version_numbers: + err = f"Wrong version parameter: {version_from}-{version_to} while getting diffs for {file_path}. " + err += f"Available versions: {all_version_numbers}" + raise ClientError(err) + + # Find versions to fetch between the 'from' and 'to' versions + idx_from = idx_to = None + for idx, version in enumerate(all_version_numbers): + if version == version_from: + idx_from = idx + elif version == version_to: + idx_to = idx + break + return [f"v{ver_nr}" for ver_nr in all_version_numbers[idx_from:idx_to + 1]] \ No newline at end of file From 0de923146578972b80be43fc059d181f800e9d6d Mon Sep 17 00:00:00 2001 From: Radek Pasiok Date: Fri, 2 Jul 2021 22:14:55 +0200 Subject: [PATCH 5/7] Changes suggested in the review. --- mergin/client_pull.py | 7 +++++-- mergin/utils.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/mergin/client_pull.py b/mergin/client_pull.py index f9cbcbcc..ec42806a 100644 --- a/mergin/client_pull.py +++ b/mergin/client_pull.py @@ -36,7 +36,10 @@ class DownloadJob: - """ Keeps all the important data about a pending download job """ + """ + Keeps all the important data about a pending download job. + Used for downloading whole projects but also single files. + """ def __init__(self, project_path, total_size, version, update_tasks, download_queue_items, directory, mp, mc, project_info): @@ -576,7 +579,7 @@ def download_file_async(mc, project_path, file_path, output_file, version): Starts background download project file at specified versions. Returns handle to the pending download. """ - mc.log.info(f"--- start download") + mc.log.info(f"--- start download {file_path} for {project_path}") project_info = mc.project_info(project_path, version=version) mc.log.info(f"Got project info. version {project_info['version']}") diff --git a/mergin/utils.py b/mergin/utils.py index e01c45a5..b36fbaf5 100644 --- a/mergin/utils.py +++ b/mergin/utils.py @@ -140,4 +140,4 @@ def get_versions_with_file_changes( elif version == version_to: idx_to = idx break - return [f"v{ver_nr}" for ver_nr in all_version_numbers[idx_from:idx_to + 1]] \ No newline at end of file + return [f"v{ver_nr}" for ver_nr in all_version_numbers[idx_from:idx_to + 1]] From f51cb7c24669727363f365f8836c6a6fefac7a3f Mon Sep 17 00:00:00 2001 From: Radek Pasiok Date: Tue, 6 Jul 2021 09:42:14 +0200 Subject: [PATCH 6/7] Changes suggested in the review. 
--- mergin/client.py | 35 ++---------- mergin/client_pull.py | 106 ++++++++++++++++++++----------------- mergin/test/test_client.py | 65 +++++++++++------------ mergin/utils.py | 14 +++-- 4 files changed, 99 insertions(+), 121 deletions(-) diff --git a/mergin/client.py b/mergin/client.py index bd2ceb93..7fc2bee7 100644 --- a/mergin/client.py +++ b/mergin/client.py @@ -640,21 +640,9 @@ def download_file(self, project_dir, file_path, output_filename, version=None): :param version: optional version tag for downloaded file :type version: String """ - try: - mp = MerginProject(project_dir) - except InvalidProject as e: - err = f"Couldn't create Mergin project for {project_dir}: {repr(e)}" - self.log.error(err) - raise ClientError(err) - - project_path = mp.metadata["name"] - ver_info = f"at version {version}" if version is not None else "at latest version" - self.log.info(f"Getting {file_path} from {project_path} {ver_info}") - job = download_file_async(self, project_path, file_path, output_filename, version=version) - if job is None: - return + job = download_file_async(self, project_dir, file_path, output_filename, version=version) pull_project_wait(job) - return download_file_finalize(job) + download_file_finalize(job) def get_file_diff(self, project_dir, file_path, output_diff, version_from, version_to): """ Create concatenated diff for project file diffs between versions version_from and version_to. @@ -670,23 +658,6 @@ def get_file_diff(self, project_dir, file_path, output_diff, version_from, versi :param version_to: ending project version tag for getting diff :type version_to: String """ - try: - mp = MerginProject(project_dir) - except InvalidProject as e: - err = f"Couldn't create Mergin project for {project_dir}: {repr(e)}" - self.log.error(err) - raise ClientError(err) - - project_path = mp.metadata["name"] - self.log.info(f"Getting diffs for file {file_path} of {project_path}") - file_history = self.project_file_history_info(project_path, file_path) - versions_to_fetch = get_versions_with_file_changes( - self, project_path, file_path, version_from=version_from, version_to=version_to, file_history=file_history - ) - job = download_diffs_async(self, project_dir, file_path, versions_to_fetch, file_history=file_history) - if job is None: - return + job = download_diffs_async(self, project_dir, file_path, version_from, version_to) pull_project_wait(job) - - # finalize getting diffs download_diffs_finalize(job, output_diff) diff --git a/mergin/client_pull.py b/mergin/client_pull.py index ec42806a..9e85b251 100644 --- a/mergin/client_pull.py +++ b/mergin/client_pull.py @@ -19,8 +19,8 @@ import concurrent.futures from .common import CHUNK_SIZE, ClientError -from .merginproject import MerginProject, InvalidProject -from .utils import save_to_file +from .merginproject import MerginProject +from .utils import save_to_file, get_versions_with_file_changes # status = download_project_async(...) @@ -41,8 +41,7 @@ class DownloadJob: Used for downloading whole projects but also single files. 
""" - def __init__(self, project_path, total_size, version, update_tasks, download_queue_items, - directory, mp, mc, project_info): + def __init__(self, project_path, total_size, version, update_tasks, download_queue_items, directory, mp, project_info): self.project_path = project_path self.total_size = total_size # size of data to download (in bytes) self.transferred_size = 0 @@ -50,12 +49,9 @@ def __init__(self, project_path, total_size, version, update_tasks, download_que self.update_tasks = update_tasks self.download_queue_items = download_queue_items self.directory = directory # project's directory - self.mc = mc # MerginClient instance self.mp = mp # MerginProject instance self.is_cancelled = False self.project_info = project_info # parsed JSON with project info returned from the server - self.executor = None - self.futures = None def dump(self): print("--- JOB ---", self.total_size, "bytes") @@ -151,7 +147,7 @@ def download_project_async(mc, project_path, directory, project_version=None): mp.log.info(f"will download {len(update_tasks)} files in {len(download_list)} chunks, total size {total_size}") - job = DownloadJob(project_path, total_size, version, update_tasks, download_list, directory, mp, mc, project_info) + job = DownloadJob(project_path, total_size, version, update_tasks, download_list, directory, mp, project_info) # start download job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) @@ -178,8 +174,7 @@ def download_project_is_running(job): """ for future in job.futures: if future.done() and future.exception() is not None: - if job.mp is not None: - _cleanup_failed_download(job.directory, job.mp) + _cleanup_failed_download(job.directory, job.mp) raise future.exception() if future.running(): return True @@ -239,10 +234,11 @@ class UpdateTask: """ # TODO: methods other than COPY - def __init__(self, file_path, download_queue_items, destination_file=None): + def __init__(self, file_path, download_queue_items, destination_file=None, remove_download_dir=False): self.file_path = file_path self.destination_file = destination_file self.download_queue_items = download_queue_items + self.remove_download_dir = remove_download_dir def apply(self, directory, mp): """ assemble downloaded chunks into a single file """ @@ -260,14 +256,18 @@ def apply(self, directory, mp): file_to_merge = FileToMerge(dest_file_path, self.download_queue_items) file_to_merge.merge() - if mp is None or self.destination_file is not None: - # Some tasks (downloading single file) do not require the project to be downloaded. - # In that case mp = None and there is no need to copy the file. - # Also skip copying if user specified the destination path for the downloaded file - return - if mp.is_versioned_file(self.file_path): + # Make a copy of the file to meta dir only if there is no user-specified path for the file. + # destination_file is None for full project download and takes a meaningful value for a single file download. 
+ if mp.is_versioned_file(self.file_path) and self.destination_file is None: mp.geodiff.make_copy_sqlite(mp.fpath(self.file_path), mp.fpath_meta(self.file_path)) + # For single file download, chunks are saved in a temporary dir that needs to be removed manually + if self.remove_download_dir: + # extract download dir from the first download item + download_item = self.download_queue_items[0] + download_dir = os.path.dirname(download_item.download_file_path) + shutil.rmtree(download_dir) + class DownloadQueueItem: """ a piece of data from a project that should be downloaded - it can be either a chunk or it can be a diff """ @@ -287,8 +287,7 @@ def __repr__(self): def download_blocking(self, mc, mp, project_path): """ Starts download and only returns once the file has been fully downloaded and saved """ - log = mc.log if mp is None else mp.log - log.debug(f"Downloading {self.file_path} version={self.version} diff={self.diff_only} part={self.part_index}") + mp.log.debug(f"Downloading {self.file_path} version={self.version} diff={self.diff_only} part={self.part_index}") start = self.part_index * (1 + CHUNK_SIZE) resp = mc.get("/v1/project/raw/{}".format(project_path), data={ "file": self.file_path, @@ -299,10 +298,10 @@ def download_blocking(self, mc, mp, project_path): } ) if resp.status in [200, 206]: - log.debug(f"Download finished: {self.file_path}") + mp.log.debug(f"Download finished: {self.file_path}") save_to_file(resp, self.download_file_path) else: - log.error(f"Download failed: {self.file_path}") + mp.log.error(f"Download failed: {self.file_path}") raise ClientError('Failed to download part {} of file {}'.format(self.part_index, self.file_path)) @@ -574,19 +573,20 @@ def pull_project_finalize(job): return conflicts -def download_file_async(mc, project_path, file_path, output_file, version): +def download_file_async(mc, project_dir, file_path, output_file, version): """ - Starts background download project file at specified versions. + Starts background download project file at specified version. Returns handle to the pending download. """ - mc.log.info(f"--- start download {file_path} for {project_path}") + mp = MerginProject(project_dir) + project_path = mp.metadata["name"] + ver_info = f"at version {version}" if version is not None else "at latest version" + mp.log.info(f"Getting {file_path} {ver_info}") project_info = mc.project_info(project_path, version=version) + mp.log.info(f"Got project info. version {project_info['version']}") - mc.log.info(f"Got project info. 
version {project_info['version']}") - - # set temporary directory and make sure the destination directory exists - temp_dir = os.path.join(tempfile.gettempdir(), "mergin_temp") - os.makedirs(os.path.dirname(output_file), exist_ok=True) + # set temporary directory for download + temp_dir = tempfile.mkdtemp(prefix="mergin-py-client-") download_list = [] update_tasks = [] @@ -595,7 +595,7 @@ def download_file_async(mc, project_path, file_path, output_file, version): if file["path"] == file_path: file['version'] = version items = _download_items(file, temp_dir) - task = UpdateTask(file['path'], items, output_file) + task = UpdateTask(file['path'], items, output_file, remove_download_dir=True) download_list.extend(task.download_queue_items) for item in task.download_queue_items: total_size += item.size @@ -603,17 +603,18 @@ def download_file_async(mc, project_path, file_path, output_file, version): break if not download_list: warn = f"No {file_path} exists at version {version}" - mc.log.warning(warn) + mp.log.warning(warn) + shutil.rmtree(temp_dir) raise ClientError(warn) - mc.log.info(f"will download file {file_path} in {len(download_list)} chunks, total size {total_size}") + mp.log.info(f"will download file {file_path} in {len(download_list)} chunks, total size {total_size}") job = DownloadJob( - project_path, total_size, version, update_tasks, download_list, temp_dir, None, mc, project_info + project_path, total_size, version, update_tasks, download_list, temp_dir, mp, project_info ) job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) job.futures = [] for item in download_list: - future = job.executor.submit(_do_download, item, mc, None, project_path, job) + future = job.executor.submit(_do_download, item, mc, mp, project_path, job) job.futures.append(future) return job @@ -630,13 +631,13 @@ def download_file_finalize(job): if future.exception() is not None: raise future.exception() - job.mc.log.info("--- download finished") + job.mp.log.info("--- download finished") for task in job.update_tasks: task.apply(job.directory, job.mp) -def download_diffs_async(mc, project_directory, file_path, versions, file_history=None): +def download_diffs_async(mc, project_directory, file_path, version_from, version_to): """ Starts background download project file diffs for specified versions. Returns handle to the pending download. @@ -645,20 +646,20 @@ def download_diffs_async(mc, project_directory, file_path, versions, file_histor mc (MerginClient): MerginClient instance. project_directory (str): local project directory. file_path (str): file path relative to Mergin project root. - versions ([str]): list of tags of versions to fetch, for example: ["v2", "v5"]. - file_history (dict): optional file history info, result of MerginClient.project_file_history_info() + version_from (str): starting project version tag for getting diff, for example 'v3'. + version_to (str): ending project version tag for getting diff. Returns: PullJob/None: a handle for the pending download. 
""" - try: - mp = MerginProject(project_directory) - except InvalidProject as e: - mc.log.error(f"Couldn't create Mergin project for directory: {project_directory}: {repr(e)}") - return None + mp = MerginProject(project_directory) project_path = mp.metadata["name"] + file_history = mc.project_file_history_info(project_path, file_path) + versions_to_fetch = get_versions_with_file_changes( + mc, project_path, file_path, version_from=version_from, version_to=version_to, file_history=file_history + ) mp.log.info(f"--- version: {mc.user_agent_info()}") - mp.log.info(f"--- start download diffs for {file_path}, versions: {[v for v in versions]}") + mp.log.info(f"--- start download diffs for {file_path} of {project_path}, versions: {[v for v in versions_to_fetch]}") try: server_info = mc.project_info(project_path) @@ -666,14 +667,13 @@ def download_diffs_async(mc, project_directory, file_path, versions, file_histor file_history = mc.project_file_history_info(project_path, file_path) except ClientError as err: mp.log.error("Error getting project info: " + str(err)) - mp.log.info("--- pull aborted") + mp.log.info("--- downloading diffs aborted") raise - temp_dir = os.path.join(tempfile.gettempdir(), "mergin_temp") - os.makedirs(temp_dir, exist_ok=True) + temp_dir = tempfile.mkdtemp(prefix="mergin-py-client-") fetch_files = [] - for version in versions[1:]: + for version in versions_to_fetch[1:]: version_data = file_history["history"][version] diff_data = copy.deepcopy(version_data) diff_data['version'] = version @@ -715,7 +715,7 @@ def download_diffs_finalize(job, output_diff): for future in job.futures: if future.exception() is not None: job.mp.log.error("Error while pulling data: " + str(future.exception())) - job.mp.log.info("--- pull aborted") + job.mp.log.info("--- diffs download aborted") raise future.exception() job.mp.log.info("finalizing diffs pull") @@ -731,17 +731,23 @@ def download_diffs_finalize(job, output_diff): job.mp.log.info("--- diffs pull finished") - # Concatenate diffs, if needed + # Collect and finally concatenate diffs, if needed diffs = [] for file_to_merge in job.files_to_merge: diffs.append(file_to_merge.dest_file) output_dir = os.path.dirname(output_diff) + temp_dir = None if len(diffs) >= 1: os.makedirs(output_dir, exist_ok=True) + temp_dir = os.path.dirname(diffs[0]) if len(diffs) > 1: job.mp.geodiff.concat_changes(diffs, output_diff) elif len(diffs) == 1: shutil.copy(diffs[0], output_diff) for diff in diffs: os.remove(diff) + + # remove the diffs download temporary directory + if temp_dir is not None: + shutil.rmtree(temp_dir) diff --git a/mergin/test/test_client.py b/mergin/test/test_client.py index 5d2c4d29..908d73f2 100644 --- a/mergin/test/test_client.py +++ b/mergin/test/test_client.py @@ -821,12 +821,12 @@ def create_versioned_project(mc, project_name, project_dir, updated_file, remove mp = MerginProject(project_dir) - # create versions 2-5 + # create versions 2-4 changes = ("inserted_1_A.gpkg", "inserted_1_A_mod.gpkg", "inserted_1_B.gpkg",) for change in changes: shutil.copy(mp.fpath(change), mp.fpath(updated_file)) mc.push_project(project_dir) - + # create version 5 with modified file removed if remove: os.remove(os.path.join(project_dir, updated_file)) mc.push_project(project_dir) @@ -847,12 +847,12 @@ def test_get_versions_with_file_changes(mc): assert project_info["version"] == "v4" file_history = mc.project_file_history_info(project, f_updated) - wrong_versions_err = "Wrong version parameter: 1-5 while getting diffs for base.gpkg. 
Available versions: [1, 2, 3, 4]" - with pytest.raises(ClientError) as err: + with pytest.raises(ClientError) as e: mod_versions = get_versions_with_file_changes( mc, project, f_updated, version_from="v1", version_to="v5", file_history=file_history ) - assert err.value.args[0] == wrong_versions_err + assert "Wrong version parameters: 1-5" in str(e.value) + assert "Available versions: [1, 2, 3, 4]" in str(e.value) mod_versions = get_versions_with_file_changes( mc, project, f_updated, version_from="v2", version_to="v4", file_history=file_history @@ -860,6 +860,14 @@ def test_get_versions_with_file_changes(mc): assert mod_versions == [f"v{i}" for i in range(2, 5)] +def check_gpkg_same_content(mergin_project, gpkg_path_1, gpkg_path_2): + """Check if the two GeoPackages have equal content.""" + with tempfile.TemporaryDirectory() as temp_dir: + diff_path = os.path.join(temp_dir, "diff_file") + mergin_project.geodiff.create_changeset(gpkg_path_1, gpkg_path_2, diff_path) + return not mergin_project.geodiff.has_changes(diff_path) + + def test_download_file(mc): """Test downloading single file at specified versions.""" test_project = 'test_download_file' @@ -872,40 +880,19 @@ def test_download_file(mc): project_info = mc.project_info(project) assert project_info["version"] == "v5" + # Versioned file should have the following content at versions 2-4 + expected_content = ("inserted_1_A.gpkg", "inserted_1_A_mod.gpkg", "inserted_1_B.gpkg") + # Download the base file at versions 2-4 and check the changes - expected = { - 2: {"table": "simple", "change": "insert", "nr_of_changes": 1}, - 3: {"table": "simple", "change": "update", "nr_of_changes": 2}, - 4: {"table": "simple", "change": "update", "nr_of_changes": 3}, - } - f_versioned = os.path.join(project_dir, f_updated) + f_downloaded = os.path.join(project_dir, f_updated) for ver in range(2, 5): - if not os.path.exists(f_versioned): - shutil.copy(os.path.join(TEST_DATA_DIR, f_updated), mp.fpath_meta(f_updated)) - else: - shutil.copy(f_versioned, mp.fpath_meta(f_updated)) - mc.download_file(project_dir, f_updated, f_versioned, version=f"v{ver}") - diff_file = mp.fpath_meta(f"changes_v{ver}.diff") - mp.geodiff.create_changeset( - mp.fpath_meta(f_updated), f_versioned, mp.fpath_meta(diff_file)) - - assert mp.geodiff.has_changes(diff_file) - assert mp.geodiff.changes_count(diff_file) == expected[ver]["nr_of_changes"] - - changes_file = diff_file + ".changes" - mp.geodiff.list_changes_summary(diff_file, changes_file) - with open(changes_file, 'r') as f: - expected_changed_table = expected[ver]["table"] - expected_change = expected[ver]["change"] - expected_nr_of_changes = expected[ver]["nr_of_changes"] - changes = json.loads(f.read())["geodiff_summary"][0] - assert changes["table"] == expected_changed_table - assert expected_change in changes - assert changes[expected_change] == expected_nr_of_changes + mc.download_file(project_dir, f_updated, f_downloaded, version=f"v{ver}") + expected = os.path.join(TEST_DATA_DIR, expected_content[ver - 2]) # GeoPackage with expected content + assert check_gpkg_same_content(mp, f_downloaded, expected) # make sure there will be exception raised if a file doesn't exist in the version with pytest.raises(ClientError, match=f"No {f_updated} exists at version v5"): - mc.download_file(project_dir, f_updated, f_versioned, version=f"v5") + mc.download_file(project_dir, f_updated, f_downloaded, version=f"v5") def test_download_diffs(mc): @@ -943,6 +930,16 @@ def test_download_diffs(mc): assert changes["insert"] == 0 assert 
changes["update"] == 1 + with pytest.raises(ClientError) as e: + mc.get_file_diff(project_dir, f_updated, diff_file, "v4", "v1") + assert "Wrong version parameters" in str(e.value) + assert "version_from needs to be smaller than version_to" in str(e.value) + + with pytest.raises(ClientError) as e: + mc.get_file_diff(project_dir, f_updated, diff_file, "v4", "v5") + assert "Wrong version parameters" in str(e.value) + assert "Available versions: [1, 2, 3, 4]" in str(e.value) + def _use_wal(db_file): """ Ensures that sqlite database is using WAL journal mode """ diff --git a/mergin/utils.py b/mergin/utils.py index b36fbaf5..b6010258 100644 --- a/mergin/utils.py +++ b/mergin/utils.py @@ -121,14 +121,18 @@ def get_versions_with_file_changes( if file_history is None: file_history = mc.project_file_history_info(project_path, file_path) all_version_numbers = sorted([int(k[1:]) for k in file_history["history"].keys()]) - if "v" not in version_from or "v" not in version_to: - err = f"Wrong version parameter: {version_from}-{version_to} while getting diffs for {file_path}. " + version_from = all_version_numbers[0] if version_from is None else int_version(version_from) + version_to = all_version_numbers[-1] if version_to is None else int_version(version_to) + if version_from is None or version_to is None: + err = f"Wrong version parameters: {version_from}-{version_to} while getting diffs for {file_path}. " err += f"Version tags required in the form: 'v2', 'v11', etc." raise ClientError(err) - version_from = all_version_numbers[0] if version_from is None else int(version_from[1:]) - version_to = all_version_numbers[-1] if version_to is None else int(version_to[1:]) + if version_from >= version_to: + err = f"Wrong version parameters: {version_from}-{version_to} while getting diffs for {file_path}. " + err += f"version_from needs to be smaller than version_to." + raise ClientError(err) if version_from not in all_version_numbers or version_to not in all_version_numbers: - err = f"Wrong version parameter: {version_from}-{version_to} while getting diffs for {file_path}. " + err = f"Wrong version parameters: {version_from}-{version_to} while getting diffs for {file_path}. " err += f"Available versions: {all_version_numbers}" raise ClientError(err) From 1416e5ec450338814ca2a7aa31cf7a6d96b59571 Mon Sep 17 00:00:00 2001 From: Radek Pasiok Date: Tue, 6 Jul 2021 10:51:35 +0200 Subject: [PATCH 7/7] Changes suggested in the review. 
--- mergin/client_pull.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/mergin/client_pull.py b/mergin/client_pull.py index 9e85b251..332269a7 100644 --- a/mergin/client_pull.py +++ b/mergin/client_pull.py @@ -234,12 +234,11 @@ class UpdateTask: """ # TODO: methods other than COPY - def __init__(self, file_path, download_queue_items, destination_file=None, remove_download_dir=False): + def __init__(self, file_path, download_queue_items, destination_file=None): self.file_path = file_path self.destination_file = destination_file self.download_queue_items = download_queue_items - self.remove_download_dir = remove_download_dir - + def apply(self, directory, mp): """ assemble downloaded chunks into a single file """ @@ -261,13 +260,6 @@ def apply(self, directory, mp): if mp.is_versioned_file(self.file_path) and self.destination_file is None: mp.geodiff.make_copy_sqlite(mp.fpath(self.file_path), mp.fpath_meta(self.file_path)) - # For single file download, chunks are saved in a temporary dir that needs to be removed manually - if self.remove_download_dir: - # extract download dir from the first download item - download_item = self.download_queue_items[0] - download_dir = os.path.dirname(download_item.download_file_path) - shutil.rmtree(download_dir) - class DownloadQueueItem: """ a piece of data from a project that should be downloaded - it can be either a chunk or it can be a diff """ @@ -595,7 +587,7 @@ def download_file_async(mc, project_dir, file_path, output_file, version): if file["path"] == file_path: file['version'] = version items = _download_items(file, temp_dir) - task = UpdateTask(file['path'], items, output_file, remove_download_dir=True) + task = UpdateTask(file['path'], items, output_file) download_list.extend(task.download_queue_items) for item in task.download_queue_items: total_size += item.size @@ -633,8 +625,15 @@ def download_file_finalize(job): job.mp.log.info("--- download finished") + temp_dir = None for task in job.update_tasks: task.apply(job.directory, job.mp) + if task.download_queue_items: + temp_dir = os.path.dirname(task.download_queue_items[0].download_file_path) + + # Remove temporary download directory + if temp_dir is not None: + shutil.rmtree(temp_dir) def download_diffs_async(mc, project_directory, file_path, version_from, version_to):
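
A short usage sketch of the blocking wrappers exercised by the tests in this series. It is
illustrative only and follows the call signatures used in those tests; `mc` is assumed to be an
authenticated MerginClient, `project_dir` an already downloaded local Mergin project, and the
file names, version tags and "namespace/project" path are placeholders:

    import os
    from mergin.utils import get_versions_with_file_changes  # helper touched in mergin/utils.py above

    # download base.gpkg as it looked at project version v2
    mc.download_file(project_dir, "base.gpkg", os.path.join(project_dir, "base_v2.gpkg"), version="v2")

    # concatenated diff of base.gpkg between project versions v2 and v4
    mc.get_file_diff(project_dir, "base.gpkg", os.path.join(project_dir, "base_v2-v4.diff"), "v2", "v4")

    # project versions in which base.gpkg actually changed
    versions = get_versions_with_file_changes(
        mc, "namespace/project", "base.gpkg", version_from="v2", version_to="v4"
    )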