diff --git a/cli.py b/cli.py
index 6a5c7680..1905f4d1 100755
--- a/cli.py
+++ b/cli.py
@@ -16,6 +16,9 @@
     MerginProject,
     InvalidProject
 )
+from mergin.client_pull import download_project_async, download_project_is_running, download_project_finalize, download_project_cancel
+from mergin.client_pull import pull_project_async, pull_project_is_running, pull_project_finalize, pull_project_cancel
+from mergin.client_push import push_project_async, push_project_is_running, push_project_finalize, push_project_cancel
 
 
 def get_changes_count(diff):
@@ -99,18 +102,48 @@ def init(project, directory, public):
         click.secho(str(e), fg='red')
 
 
+@cli.command()
+@click.option('--flag', help="What kind of projects to list (e.g. 'created' for just my projects, "
+                             "'shared' for projects shared with me). Without a flag, all public projects are listed.")
+def list_projects(flag):
+    """List projects on the server"""
+    filter_str = "(filter flag={})".format(flag) if flag is not None else "(all public)"
+    click.echo('List of projects {}:'.format(filter_str))
+    c = _init_client()
+    projects_list = c.projects_list(flag=flag)
+    for project in projects_list:
+        full_name = "{} / {}".format(project["namespace"], project["name"])
+        click.echo("  {:40}\t{:6.1f} MB\t{}".format(full_name, project["disk_usage"]/(1024*1024), project['version']))
+
+
 @cli.command()
 @click.argument('project')
 @click.argument('directory', type=click.Path(), required=False)
-@click.option('--parallel/--no-parallel', default=True, help='Download by sending parallel requests')
-def download(project, directory, parallel):
+def download(project, directory):
     """Download last version of mergin project"""
+    c = _init_client()
     directory = directory or os.path.basename(project)
+    click.echo('Downloading into {}'.format(directory))
     try:
-        c.download_project(project, directory, parallel)
+        job = download_project_async(c, project, directory)
+
+        import time
+        with click.progressbar(length=job.total_size) as bar:
+            last_transferred_size = 0
+            while download_project_is_running(job):
+                time.sleep(1/10)  # 100ms
+                new_transferred_size = job.transferred_size
+                bar.update(new_transferred_size - last_transferred_size)  # the update() needs increment only
+                last_transferred_size = new_transferred_size
+
+        download_project_finalize(job)
+        click.echo('Done')
+    except KeyboardInterrupt:
+        print("Cancelling...")
+        download_project_cancel(job)
     except Exception as e:
         click.secho(str(e), fg='red')
 
@@ -140,31 +173,70 @@ def status():
 
 
 @cli.command()
-@click.option('--parallel/--no-parallel', default=True, help='Upload by sending parallel requests')
-def push(parallel):
+def push():
     """Upload local changes into Mergin repository"""
     c = _init_client()
+    directory = os.getcwd()
+
     try:
-        c.push_project(os.getcwd(), parallel)
+        job = push_project_async(c, directory)
+
+        if job is not None:  # if job is None, there are no files to upload and the transaction is already finished
+            import time
+            with click.progressbar(length=job.total_size) as bar:
+                last_transferred_size = 0
+                while push_project_is_running(job):
+                    time.sleep(1/10)  # 100ms
+                    new_transferred_size = job.transferred_size
+                    bar.update(new_transferred_size - last_transferred_size)  # the update() needs increment only
+                    last_transferred_size = new_transferred_size
+
+            push_project_finalize(job)
+        click.echo('Done')
     except InvalidProject:
         click.echo('Invalid project directory')
+    except KeyboardInterrupt:
+        print("Cancelling...")
+        push_project_cancel(job)
     except Exception as e:
         click.secho(str(e), fg='red')
 
 
 @cli.command()
-@click.option('--parallel/--no-parallel', default=True, help='Download by sending parallel requests') -def pull(parallel): +def pull(): """Fetch changes from Mergin repository""" c = _init_client() + directory = os.getcwd() + try: - c.pull_project(os.getcwd(), parallel) + job = pull_project_async(c, directory) + + if job is None: + click.echo('Project is up to date') + return + + import time + with click.progressbar(length=job.total_size) as bar: + last_transferred_size = 0 + while pull_project_is_running(job): + time.sleep(1/10) # 100ms + new_transferred_size = job.transferred_size + bar.update(new_transferred_size - last_transferred_size) # the update() needs increment only + last_transferred_size = new_transferred_size + + pull_project_finalize(job) + click.echo('Done') except InvalidProject: - click.secho('Invalid project directory', fg='red') + click.echo('Invalid project directory') + except KeyboardInterrupt: + print("Cancelling...") + pull_project_cancel(job) + except Exception as e: + click.secho(str(e), fg='red') @cli.command() diff --git a/mergin/__init__.py b/mergin/__init__.py index 9bec89b6..bbbb6a53 100644 --- a/mergin/__init__.py +++ b/mergin/__init__.py @@ -1,6 +1,5 @@ -from .client import ( - MerginClient, - MerginProject, - ClientError, - InvalidProject -) +from . import common + +from .client import MerginClient +from .common import ClientError +from .merginproject import MerginProject, InvalidProject diff --git a/mergin/client.py b/mergin/client.py index 5ec9e8b0..95be389c 100644 --- a/mergin/client.py +++ b/mergin/client.py @@ -1,577 +1,32 @@ import os -import re import json import zlib import base64 -import shutil import urllib.parse import urllib.request -import uuid -import math -import hashlib -import copy +import urllib.error import platform from datetime import datetime, timezone -import concurrent.futures +import dateutil.parser import ssl from pip._vendor import distro -from .utils import save_to_file, generate_checksum, move_file, DateTimeEncoder, int_version, find, do_sqlite_checkpoint - -this_dir = os.path.dirname(os.path.realpath(__file__)) - -try: - import dateutil.parser - from dateutil.tz import tzlocal -except ImportError: - # this is to import all dependencies shipped with package (e.g. 
to use in qgis-plugin) - deps_dir = os.path.join(this_dir, 'deps') - if os.path.exists(deps_dir): - import sys - for f in os.listdir(os.path.join(deps_dir)): - sys.path.append(os.path.join(deps_dir, f)) - - import dateutil.parser - from dateutil.tz import tzlocal - -try: - from .deps import pygeodiff -except ImportError: - os.environ['GEODIFF_ENABLED'] = 'False' - -CHUNK_SIZE = 100 * 1024 * 1024 -# there is an upper limit for chunk size on server, ideally should be requested from there once implemented -UPLOAD_CHUNK_SIZE = 10 * 1024 * 1024 +from .common import ClientError, SyncError +from .merginproject import MerginProject +from .client_pull import download_project_async, download_project_wait, download_project_finalize +from .client_pull import pull_project_async, pull_project_wait, pull_project_finalize +from .client_push import push_project_async, push_project_wait, push_project_finalize +from .utils import DateTimeEncoder -class InvalidProject(Exception): - pass - - -class ClientError(Exception): - pass +this_dir = os.path.dirname(os.path.realpath(__file__)) class LoginError(Exception): pass -class SyncError(Exception): - def __init__(self, msg, detail=""): - super().__init__(msg) - self.detail = detail - - -class MerginProject: - """ Base class for Mergin local projects. - - Linked to existing local directory, with project metadata (mergin.json) and backups located in .mergin directory. - """ - def __init__(self, directory): - self.dir = os.path.abspath(directory) - if not os.path.exists(self.dir): - raise InvalidProject('Project directory does not exist') - - # make sure we can load correct pygeodiff - if os.environ.get('GEODIFF_ENABLED', 'True').lower() == 'true': - try: - self.geodiff = pygeodiff.GeoDiff() - except pygeodiff.geodifflib.GeoDiffLibVersionError: - self.geodiff = None - else: - self.geodiff = None - - self.meta_dir = os.path.join(self.dir, '.mergin') - if not os.path.exists(self.meta_dir): - os.mkdir(self.meta_dir) - - def fpath(self, file, other_dir=None): - """ - Helper function to get absolute path of project file. Defaults to project dir but - alternative dir get be provided (mostly meta or temp). Also making sure that parent dirs to file exist. - - :param file: relative file path in project (posix) - :type file: str - :param other_dir: alternative base directory for file, defaults to None - :type other_dir: str - :returns: file's absolute path - :rtype: str - """ - root = other_dir or self.dir - abs_path = os.path.abspath(os.path.join(root, file)) - f_dir = os.path.dirname(abs_path) - os.makedirs(f_dir, exist_ok=True) - return abs_path - - def fpath_meta(self, file): - """ Helper function to get absolute path of file in meta dir. """ - return self.fpath(file, self.meta_dir) - - @property - def metadata(self): - if not os.path.exists(self.fpath_meta('mergin.json')): - raise InvalidProject('Project metadata has not been created yet') - with open(self.fpath_meta('mergin.json'), 'r') as file: - return json.load(file) - - @metadata.setter - def metadata(self, data): - with open(self.fpath_meta('mergin.json'), 'w') as file: - json.dump(data, file, indent=2) - - def is_versioned_file(self, file): - """ Check if file is compatible with geodiff lib and hence suitable for versioning. 
- - :param file: file path - :type file: str - :returns: if file is compatible with geodiff lib - :rtype: bool - """ - if not self.geodiff: - return False - diff_extensions = ['.gpkg', '.sqlite'] - f_extension = os.path.splitext(file)[1] - return f_extension in diff_extensions - - def is_gpkg_open(self, path): - """ - Check whether geopackage file is open (and wal file exists) - - :param path: absolute path of file on disk - :type path: str - :returns: whether file is open - :rtype: bool - """ - f_extension = os.path.splitext(path)[1] - if f_extension != '.gpkg': - return False - if os.path.exists(f'{path}-wal'): - return True - return False - - def ignore_file(self, file): - """ - Helper function for blacklisting certain types of files. - - :param file: file path in project - :type file: str - :returns: whether file should be ignored - :rtype: bool - """ - ignore_ext = re.compile(r'({})$'.format('|'.join(re.escape(x) for x in ['-shm', '-wal', '~', 'pyc', 'swap']))) - ignore_files = ['.DS_Store', '.directory'] - name, ext = os.path.splitext(file) - if ext and ignore_ext.search(ext): - return True - if file in ignore_files: - return True - return False - - def inspect_files(self): - """ - Inspect files in project directory and return metadata. - - :returns: metadata for files in project directory in server required format - :rtype: list[dict] - """ - files_meta = [] - for root, dirs, files in os.walk(self.dir, topdown=True): - dirs[:] = [d for d in dirs if d not in ['.mergin']] - for file in files: - if self.ignore_file(file): - continue - - abs_path = os.path.abspath(os.path.join(root, file)) - rel_path = os.path.relpath(abs_path, start=self.dir) - proj_path = '/'.join(rel_path.split(os.path.sep)) # we need posix path - files_meta.append({ - "path": proj_path, - "checksum": generate_checksum(abs_path), - "size": os.path.getsize(abs_path), - "mtime": datetime.fromtimestamp(os.path.getmtime(abs_path), tzlocal()) - }) - return files_meta - - def compare_file_sets(self, origin, current): - """ - Helper function to calculate difference between two sets of files metadata using file names and checksums. 
- - :Example: - - >>> origin = [{'checksum': '08b0e8caddafe74bf5c11a45f65cedf974210fed', 'path': 'base.gpkg', 'size': 2793, 'mtime': '2019-08-26T11:08:34.051221+02:00'}] - >>> current = [{'checksum': 'c9a4fd2afd513a97aba19d450396a4c9df8b2ba4', 'path': 'test.qgs', 'size': 31980, 'mtime': '2019-08-26T11:09:30.051221+02:00'}] - >>> self.compare_file_sets(origin, current) - {"added": [{'checksum': 'c9a4fd2afd513a97aba19d450396a4c9df8b2ba4', 'path': 'test.qgs', 'size': 31980, 'mtime': '2019-08-26T11:09:30.051221+02:00'}], "removed": [[{'checksum': '08b0e8caddafe74bf5c11a45f65cedf974210fed', 'path': 'base.gpkg', 'size': 2793, 'mtime': '2019-08-26T11:08:34.051221+02:00'}]], "renamed": [], "updated": []} - - :param origin: origin set of files metadata - :type origin: list[dict] - :param current: current set of files metadata to be compared against origin - :type current: list[dict] - :returns: changes between two sets with change type - :rtype: dict[str, list[dict]]' - """ - origin_map = {f["path"]: f for f in origin} - current_map = {f["path"]: f for f in current} - removed = [f for f in origin if f["path"] not in current_map] - - added = [] - for f in current: - if f["path"] in origin_map: - continue - added.append(f) - - moved = [] - for rf in removed: - match = find( - current, - lambda f: f["checksum"] == rf["checksum"] and f["size"] == rf["size"] and all( - f["path"] != mf["path"] for mf in moved) - ) - if match: - moved.append({**rf, "new_path": match["path"]}) - - added = [f for f in added if all(f["path"] != mf["new_path"] for mf in moved)] - removed = [f for f in removed if all(f["path"] != mf["path"] for mf in moved)] - - updated = [] - for f in current: - path = f["path"] - if path not in origin_map: - continue - # with open WAL files we don't know yet, better to mark file as updated - if not self.is_gpkg_open(self.fpath(path)) and f["checksum"] == origin_map[path]["checksum"]: - continue - f["origin_checksum"] = origin_map[path]["checksum"] - updated.append(f) - - return { - "renamed": moved, - "added": added, - "removed": removed, - "updated": updated - } - - def get_pull_changes(self, server_files): - """ - Calculate changes needed to be pulled from server. - - Calculate diffs between local files metadata and server's ones. Because simple metadata like file size or - checksum are not enough to determine geodiff files changes, evaluate also their history (provided by server). - For small files ask for full versions of geodiff files, otherwise determine list of diffs needed to update file. - - .. 
seealso:: self.compare_file_sets - - :param server_files: list of server files' metadata (see also self.inspect_files()) - :type server_files: list[dict] - :returns: changes metadata for files to be pulled from server - :rtype: dict - """ - changes = self.compare_file_sets(self.metadata['files'], server_files) - if not self.geodiff: - return changes - - not_updated = [] - for file in changes['updated']: - # for small geodiff files it does not make sense to download diff and then apply it (slow) - if not self.is_versioned_file(file["path"]): - continue - - diffs = [] - diffs_size = 0 - is_updated = False - # need to track geodiff file history to see if there were any changes - for k, v in file['history'].items(): - if int_version(k) <= int_version(self.metadata['version']): - continue # ignore history of no interest - is_updated = True - if 'diff' in v: - diffs.append(v['diff']['path']) - diffs_size += v['diff']['size'] - else: - diffs = [] - break # we found force update in history, does not make sense to download diffs - - if is_updated: - file['diffs'] = diffs - else: - not_updated.append(file) - - changes['updated'] = [f for f in changes['updated'] if f not in not_updated] - return changes - - def get_push_changes(self): - """ - Calculate changes needed to be pushed to server. - - Calculate diffs between local files metadata and actual files in project directory. Because simple metadata like - file size or checksum are not enough to determine geodiff files changes, geodiff tool is used to determine change - of file content and update corresponding metadata. - - .. seealso:: self.compare_file_sets - - :returns: changes metadata for files to be pushed to server - :rtype: dict - """ - changes = self.compare_file_sets(self.metadata['files'], self.inspect_files()) - # do checkpoint to push changes from wal file to gpkg - for file in changes['added'] + changes['updated']: - size, checksum = do_sqlite_checkpoint(self.fpath(file["path"])) - if size and checksum: - file["size"] = size - file["checksum"] = checksum - file['chunks'] = [str(uuid.uuid4()) for i in range(math.ceil(file["size"] / UPLOAD_CHUNK_SIZE))] - - if not self.geodiff: - return changes - - # need to check for for real changes in geodiff files using geodiff tool (comparing checksum is not enough) - not_updated = [] - for file in changes['updated']: - path = file["path"] - if not self.is_versioned_file(path): - continue - - # we use geodiff to check if we can push only diff files - current_file = self.fpath(path) - origin_file = self.fpath_meta(path) - diff_id = str(uuid.uuid4()) - diff_name = path + '-diff-' + diff_id - diff_file = self.fpath_meta(diff_name) - try: - self.geodiff.create_changeset(origin_file, current_file, diff_file) - if self.geodiff.has_changes(diff_file): - diff_size = os.path.getsize(diff_file) - file['checksum'] = file['origin_checksum'] # need to match basefile on server - file['chunks'] = [str(uuid.uuid4()) for i in range(math.ceil(diff_size / UPLOAD_CHUNK_SIZE))] - file['mtime'] = datetime.fromtimestamp(os.path.getmtime(current_file), tzlocal()) - file['diff'] = { - "path": diff_name, - "checksum": generate_checksum(diff_file), - "size": diff_size, - 'mtime': datetime.fromtimestamp(os.path.getmtime(diff_file), tzlocal()) - } - else: - not_updated.append(file) - except (pygeodiff.GeoDiffLibError, pygeodiff.GeoDiffLibConflictError) as e: - # changes from wal file already committed - pass - - changes['updated'] = [f for f in changes['updated'] if f not in not_updated] - return changes - - def 
get_list_of_push_changes(self, push_changes): - changes = {} - for idx, file in enumerate(push_changes["updated"]): - if "diff" in file: - changeset_path = file["diff"]["path"] - changeset = self.fpath_meta(changeset_path) - result_file = self.fpath("change_list" + str(idx), self.meta_dir) - try: - self.geodiff.list_changes_summary(changeset, result_file) - with open(result_file, 'r') as f: - change = f.read() - changes[file["path"]] = json.loads(change) - os.remove(result_file) - except (pygeodiff.GeoDiffLibError, pygeodiff.GeoDiffLibConflictError): - pass - return changes - - def apply_pull_changes(self, changes, temp_dir): - """ - Apply changes pulled from server. - - Update project files according to file changes. Apply changes to geodiff basefiles as well - so they are up to date with server. In case of conflicts create backups from locally modified versions. - - .. seealso:: self.pull_changes - - :param changes: metadata for pulled files - :type changes: dict[str, list[dict]] - :param temp_dir: directory with downloaded files from server - :type temp_dir: str - :returns: files where conflicts were found - :rtype: list[str] - """ - conflicts = [] - local_changes = self.get_push_changes() - modified = {} - for f in local_changes["added"] + local_changes["updated"]: - modified.update({f['path']: f}) - for f in local_changes["renamed"]: - modified.update({f['new_path']: f}) - - local_files_map = {} - for f in self.inspect_files(): - local_files_map.update({f['path']: f}) - - for k, v in changes.items(): - for item in v: - path = item['path'] if k != 'renamed' else item['new_path'] - src = self.fpath(path, temp_dir) if k != 'renamed' else self.fpath(item["path"]) - dest = self.fpath(path) - basefile = self.fpath_meta(path) - - # special care is needed for geodiff files - # 'src' here is server version of file and 'dest' is locally modified - if self.is_versioned_file(path) and k == 'updated': - if path in modified: - server_diff = self.fpath(f'{path}-server_diff', temp_dir) # diff between server file and local basefile - local_diff = self.fpath(f'{path}-local_diff', temp_dir) - - # temporary backup of file pulled from server for recovery - f_server_backup = self.fpath(f'{path}-server_backup', temp_dir) - shutil.copy(src, f_server_backup) - - # create temp backup (ideally with geodiff) of locally modified file if needed later - f_conflict_file = self.fpath(f'{path}-local_backup', temp_dir) - try: - self.geodiff.create_changeset(basefile, dest, local_diff) - shutil.copy(basefile, f_conflict_file) - self.geodiff.apply_changeset(f_conflict_file, local_diff) - except (pygeodiff.GeoDiffLibError, pygeodiff.GeoDiffLibConflictError): - # FIXME hard copy can lead to data loss if changes from -wal file were not flushed !!! 
- shutil.copy(dest, f_conflict_file) - - # try to do rebase magic - try: - self.geodiff.create_changeset(basefile, src, server_diff) - self.geodiff.rebase(basefile, src, dest) - # make sure basefile is in the same state as remote server file (for calc of push changes) - self.geodiff.apply_changeset(basefile, server_diff) - except (pygeodiff.GeoDiffLibError, pygeodiff.GeoDiffLibConflictError) as err: - # it would not be possible to commit local changes, they need to end up in new conflict file - shutil.copy(f_conflict_file, dest) # revert file - conflict = self.backup_file(path) - conflicts.append(conflict) - # original file synced with server - shutil.copy(f_server_backup, basefile) - shutil.copy(f_server_backup, dest) - # changes in -wal have been already applied in conflict file or LOST (see above) - if os.path.exists(f'{dest}-wal'): - os.remove(f'{dest}-wal') - if os.path.exists(f'{dest}-shm'): - os.remove(f'{dest}-shm') - else: - # The local file is not modified -> no rebase needed. - # We just apply the diff between our copy and server to both the local copy and its basefile - try: - server_diff = self.fpath(f'{path}-server_diff', temp_dir) # diff between server file and local basefile - # TODO: it could happen that basefile does not exist. - # It was either never created (e.g. when pushing without geodiff) - # or it was deleted by mistake(?) by the user. We should detect that - # when starting pull and download it as well - self.geodiff.create_changeset(basefile, src, server_diff) - self.geodiff.apply_changeset(dest, server_diff) - self.geodiff.apply_changeset(basefile, server_diff) - except (pygeodiff.GeoDiffLibError, pygeodiff.GeoDiffLibConflictError): - # something bad happened and we have failed to patch our local files - this should not happen if there - # wasn't a schema change or something similar that geodiff can't handle. - # FIXME: this is a last resort and may corrupt data! (we should warn user) - shutil.copy(src, dest) - shutil.copy(src, basefile) - else: - # backup if needed - if path in modified and item['checksum'] != local_files_map[path]['checksum']: - conflict = self.backup_file(path) - conflicts.append(conflict) - - if k == 'removed': - os.remove(dest) - if self.is_versioned_file(path): - os.remove(basefile) - elif k == 'renamed': - move_file(src, dest) - if self.is_versioned_file(path): - move_file(self.fpath_meta(item["path"]), basefile) - else: - shutil.copy(src, dest) - if self.is_versioned_file(path): - shutil.copy(src, basefile) - - return conflicts - - def apply_push_changes(self, changes): - """ - For geodiff files update basefiles according to changes pushed to server. - - :param changes: metadata for pulled files - :type changes: dict[str, list[dict]] - """ - if not self.geodiff: - return - for k, v in changes.items(): - for item in v: - path = item['path'] if k != 'renamed' else item['new_path'] - if not self.is_versioned_file(path): - continue - - basefile = self.fpath_meta(path) - if k == 'renamed': - move_file(self.fpath_meta(item["path"]), basefile) - elif k == 'removed': - os.remove(basefile) - elif k == 'added': - shutil.copy(self.fpath(path), basefile) - elif k == 'updated': - # in case for geopackage cannot be created diff (e.g. 
forced update with committed changes from wal file) - if "diff" not in item: - shutil.copy(self.fpath(path), basefile) - else: - # better to apply diff to previous basefile to avoid issues with geodiff tmp files - changeset = self.fpath_meta(item['diff']['path']) - patch_error = self.apply_diffs(basefile, [changeset]) - if patch_error: - # in case of local sync issues it is safier to remove basefile, next time it will be downloaded from server - os.remove(basefile) - else: - pass - - def backup_file(self, file): - """ - Create backup file next to its origin. - - :param file: path of file in project - :type file: str - :returns: path to backupfile - :rtype: str - """ - src = self.fpath(file) - if not os.path.exists(src): - return - backup_path = self.fpath(f'{file}_conflict_copy') - index = 2 - while os.path.exists(backup_path): - backup_path = self.fpath(f'{file}_conflict_copy{index}') - index += 1 - shutil.copy(src, backup_path) - return backup_path - - def apply_diffs(self, basefile, diffs): - """ - Helper function to update content of geodiff file using list of diffs. - Input file will be overwritten (make sure to create backup if needed). - - :param basefile: abs path to file to be updated - :type basefile: str - :param diffs: list of abs paths to geodiff changeset files - :type diffs: list[str] - :returns: error message if diffs were not successfully applied or None - :rtype: str - """ - error = None - if not self.is_versioned_file(basefile): - return error - - for index, diff in enumerate(diffs): - try: - self.geodiff.apply_changeset(basefile, diff) - except (pygeodiff.GeoDiffLibError, pygeodiff.GeoDiffLibConflictError) as e: - error = str(e) - break - return error - - def decode_token_data(token): token_prefix = "Bearer ." if not token.startswith(token_prefix): @@ -860,7 +315,7 @@ def project_versions(self, project_path): resp = self.get("/v1/project/version/{}".format(project_path)) return json.load(resp) - def download_project(self, project_path, directory, parallel=True): + def download_project(self, project_path, directory): """ Download latest version of project into given directory. 
@@ -869,48 +324,10 @@ def download_project(self, project_path, directory, parallel=True): :param directory: Target directory :type directory: String - - :param parallel: Use multi-thread approach to download files in parallel requests, default True - :type parallel: Boolean """ - if os.path.exists(directory): - raise Exception("Project directory already exists") - os.makedirs(directory) - mp = MerginProject(directory) - - project_info = self.project_info(project_path) - version = project_info['version'] if project_info['version'] else 'v0' - - # sending parallel requests is good for projects with a lot of small files - if parallel: - with concurrent.futures.ThreadPoolExecutor() as executor: - futures_map = {} - for file in project_info['files']: - file['version'] = version - future = executor.submit(self._download_file, project_path, file, directory, parallel) - futures_map[future] = file - - for future in concurrent.futures.as_completed(futures_map): - file = futures_map[future] - try: - f_downloaded = future.result(600) - # create local backups, so called basefiles - if mp.is_versioned_file(file['path']): - shutil.copy(mp.fpath(f_downloaded), mp.fpath_meta(f_downloaded)) - except concurrent.futures.TimeoutError: - raise ClientError("Timeout error: failed to download {}".format(file)) - else: - for file in project_info['files']: - file['version'] = version - f_downloaded = self._download_file(project_path, file, directory, parallel) - if mp.is_versioned_file(file['path']): - shutil.copy(mp.fpath(f_downloaded), mp.fpath_meta(f_downloaded)) - - mp.metadata = { - "name": project_path, - "version": version, - "files": project_info["files"] - } + job = download_project_async(self, project_path, directory) + download_project_wait(job) + download_project_finalize(job) def enough_storage_available(self, data): user_name = self._user_info["username"] @@ -925,247 +342,31 @@ def enough_storage_available(self, data): return True, free_space - def push_project(self, directory, parallel=True): + def push_project(self, directory): """ Upload local changes to the repository. :param directory: Project's directory :type directory: String - :param parallel: Use multi-thread approach to upload files in parallel requests, defaults to True - :type parallel: Boolean """ - mp = MerginProject(directory) - project_path = mp.metadata["name"] - local_version = mp.metadata["version"] - server_info = self.project_info(project_path) - server_version = server_info["version"] if server_info["version"] else "v0" - if local_version != server_version: - raise ClientError("Update your local repository") - - changes = mp.get_push_changes() - enough_free_space, freespace = self.enough_storage_available(changes) - if not enough_free_space: - freespace = int(freespace/(1024*1024)) - raise SyncError("Storage limit has been reached. Only " + str(freespace) + "MB left") - - if not sum(len(v) for v in changes.values()): - return - # drop internal info from being sent to server - for item in changes['updated']: - item.pop('origin_checksum', None) - data = { - "version": local_version, - "changes": changes - } - server_resp = self._push_changes(mp, data, parallel) - if 'error' in server_resp: - #TODO would be good to get some detailed info from server so user could decide what to do with it - # e.g. 
diff conflicts, basefiles issues, or any other failure - raise ClientError(server_resp['error']) - - mp.metadata = { - 'name': project_path, - 'version': server_resp['version'], - 'files': server_resp["files"] - } - mp.apply_push_changes(changes) + job = push_project_async(self, directory) + if job is None: + return # there is nothing to push (or we only deleted some files) + push_project_wait(job) + push_project_finalize(job) - def _push_changes(self, mp, data, parallel): - project = mp.metadata['name'] - resp = self.post(f'/v1/project/push/{project}', data, {"Content-Type": "application/json"}) - info = json.load(resp) - - upload_files = data['changes']["added"] + data['changes']["updated"] - # upload files' chunks and close transaction - if upload_files: - if parallel: - with concurrent.futures.ThreadPoolExecutor() as executor: - futures_map = {} - for file in upload_files: - file['location'] = mp.fpath_meta(file['diff']['path']) if 'diff' in file else mp.fpath(file['path']) - future = executor.submit(self._upload_file, info["transaction"], file, parallel) - futures_map[future] = file - - for future in concurrent.futures.as_completed(futures_map): - file = futures_map[future] - try: - future.result(600) - except concurrent.futures.TimeoutError: - raise ClientError("Timeout error: failed to upload {}".format(file)) - else: - for file in upload_files: - file['location'] = mp.fpath_meta(file['diff']['path']) if 'diff' in file else mp.fpath(file['path']) - self._upload_file(info["transaction"], file, parallel) - - try: - resp = self.post("/v1/project/push/finish/%s" % info["transaction"]) - info = json.load(resp) - except ClientError as err: - self.post("/v1/project/push/cancel/%s" % info["transaction"]) - # server returns various error messages with filename or something generic - # it would be better if it returned list of failed files (and reasons) whenever possible - return {'error': str(err)} - return info - - def pull_project(self, directory, parallel=True): + def pull_project(self, directory): """ Fetch and apply changes from repository. :param directory: Project's directory :type directory: String - :param parallel: Use multi-thread approach to fetch files in parallel requests, defaults to True - :type parallel: Boolean - """ - mp = MerginProject(directory) - project_path = mp.metadata["name"] - local_version = mp.metadata["version"] - server_info = self.project_info(project_path, since=local_version) - if local_version == server_info["version"]: - return # Project is up to date - - # we either download a versioned file using diffs (strongly preferred), - # but if we don't have history with diffs (e.g. 
uploaded without diffs) - # then we just download the whole file - _pulling_file_with_diffs = lambda f: 'diffs' in f and len(f['diffs']) != 0 - - temp_dir = mp.fpath_meta(f'fetch_{local_version}-{server_info["version"]}') - os.makedirs(temp_dir, exist_ok=True) - pull_changes = mp.get_pull_changes(server_info["files"]) - fetch_files = [] - for f in pull_changes["added"]: - f['version'] = server_info['version'] - fetch_files.append(f) - # extend fetch files download list with various version of diff files (if needed) - for f in pull_changes["updated"]: - if _pulling_file_with_diffs(f): - for diff in f['diffs']: - diff_file = copy.deepcopy(f) - for k, v in f['history'].items(): - if 'diff' not in v: - continue - if diff == v['diff']['path']: - diff_file['version'] = k - diff_file['diff'] = v['diff'] - fetch_files.append(diff_file) - else: - f['version'] = server_info['version'] - fetch_files.append(f) - - # sending parallel requests is good for projects with a lot of small files - if parallel: - with concurrent.futures.ThreadPoolExecutor() as executor: - futures_map = {} - for file in fetch_files: - diff_only = _pulling_file_with_diffs(file) - future = executor.submit(self._download_file, project_path, file, temp_dir, parallel, diff_only) - futures_map[future] = file - - for future in concurrent.futures.as_completed(futures_map): - file = futures_map[future] - try: - future.result(600) - except concurrent.futures.TimeoutError: - raise ClientError("Timeout error: failed to download {}".format(file)) - else: - for file in fetch_files: - # TODO check it does not fail, do some retry on ClientError - diff_only = _pulling_file_with_diffs(file) - self._download_file(project_path, file, temp_dir, parallel, diff_only) - - # make sure we can update geodiff reference files (aka. basefiles) with diffs or - # download their full versions so we have them up-to-date for applying changes - for file in pull_changes['updated']: - if not _pulling_file_with_diffs(file): - continue - file['version'] = server_info['version'] - basefile = mp.fpath_meta(file['path']) - server_file = mp.fpath(file["path"], temp_dir) - if os.path.exists(basefile): - shutil.copy(basefile, server_file) - diffs = [mp.fpath(f, temp_dir) for f in file['diffs']] - patch_error = mp.apply_diffs(server_file, diffs) - if patch_error: - # we can't use diffs, overwrite with full version of file fetched from server - self._download_file(project_path, file, temp_dir, parallel, diff_only=False) - else: - self._download_file(project_path, file, temp_dir, parallel, diff_only=False) - - conflicts = mp.apply_pull_changes(pull_changes, temp_dir) - mp.metadata = { - 'name': project_path, - 'version': server_info['version'] if server_info['version'] else 'v0', - 'files': server_info['files'] - } - - shutil.rmtree(temp_dir) - return conflicts - - def _download_file(self, project_path, file, directory, parallel=True, diff_only=False): - """ - Helper to download single project file from server in chunks. 
- - :param project_path: Project's full name (/) - :type project_path: String - :param file: File metadata item from Project['files'] - :type file: dict - :param directory: Project's directory - :type directory: String - :param parallel: Use multi-thread approach to download parts in parallel requests, default True - :type parallel: Boolean """ - query_params = { - "file": file['path'], - "version": file['version'], - "diff": diff_only - } - file_dir = os.path.dirname(os.path.normpath(os.path.join(directory, file['path']))) - basename = os.path.basename(file['diff']['path']) if diff_only else os.path.basename(file['path']) - expected_size = file['diff']['size'] if diff_only else file['size'] - - if file['size'] == 0: - os.makedirs(file_dir, exist_ok=True) - open(os.path.join(file_dir, basename), 'w').close() - return file["path"] - - def download_file_part(part): - """Callback to get a part of file using request to server with Range header.""" - start = part * (1 + CHUNK_SIZE) - range_header = {"Range": "bytes={}-{}".format(start, start + CHUNK_SIZE)} - resp = self.get("/v1/project/raw/{}".format(project_path), data=query_params, headers=range_header) - if resp.status in [200, 206]: - save_to_file(resp, os.path.join(file_dir, basename + ".{}".format(part))) - else: - raise ClientError('Failed to download part {} of file {}'.format(part, basename)) - - # download large files in chunks is beneficial mostly for retry on failure - chunks = math.ceil(file['size'] / CHUNK_SIZE) - if parallel: - # create separate n threads, default as cores * 5 - with concurrent.futures.ThreadPoolExecutor() as executor: - futures_map = {executor.submit(download_file_part, i): i for i in range(chunks)} - for future in concurrent.futures.as_completed(futures_map): - i = futures_map[future] - try: - future.result(300) - except concurrent.futures.TimeoutError: - raise ClientError('Timeout error: failed to download part {} of file {}'.format(i, basename)) - else: - for i in range(chunks): - download_file_part(i) - - # merge chunks together - with open(os.path.join(file_dir, basename), 'wb') as final: - for i in range(chunks): - file_part = os.path.join(file_dir, basename + ".{}".format(i)) - with open(file_part, 'rb') as chunk: - shutil.copyfileobj(chunk, final) - os.remove(file_part) - - if os.path.getsize(os.path.join(file_dir, basename)) != expected_size: - os.remove(os.path.join(file_dir, basename)) - raise ClientError(f'Download of file {basename} failed. Please try it again.') - - return file['path'] + job = pull_project_async(self, directory) + if job is None: + return # project is up to date + pull_project_wait(job) + return pull_project_finalize(job) def delete_project(self, project_path): """ @@ -1180,46 +381,6 @@ def delete_project(self, project_path): request = urllib.request.Request(url, method="DELETE") self._do_request(request) - def _upload_file(self, transaction, file_meta, parallel=True): - """ - Upload file in open upload transaction. 
- - :param transaction: transaction uuid - :type transaction: String - :param project_dir: local project directory - :type project_dir: String - :param file_meta: metadata for file to upload - :type file_meta: Dict - :param parallel: Use multi-thread approach to upload file chunks in parallel requests, defaults to True - :type parallel: Boolean - :raises ClientError: raise on data integrity check failure - """ - headers = {"Content-Type": "application/octet-stream"} - - def upload_chunk(chunk_id, data): - checksum = hashlib.sha1() - checksum.update(data) - size = len(data) - resp = self.post("/v1/project/push/chunk/{}/{}".format(transaction, chunk_id), data, headers) - data = json.load(resp) - if not (data['size'] == size and data['checksum'] == checksum.hexdigest()): - self.post("/v1/project/push/cancel/{}".format(transaction)) - raise ClientError("Mismatch between uploaded file chunk {} and local one".format(chunk)) - - with open(file_meta['location'], 'rb') as file: - if parallel: - with concurrent.futures.ThreadPoolExecutor() as executor: - futures_map = {executor.submit(upload_chunk, chunk, file.read(UPLOAD_CHUNK_SIZE)): chunk for chunk in file_meta["chunks"]} - for future in concurrent.futures.as_completed(futures_map): - chunk = futures_map[future] - try: - future.result(300) - except concurrent.futures.TimeoutError: - raise ClientError('Timeout error: failed to upload chunk {}'.format(chunk)) - else: - for chunk in file_meta["chunks"]: - data = file.read(UPLOAD_CHUNK_SIZE) - upload_chunk(chunk, data) def project_status(self, directory): """ Get project status, e.g. server and local changes. diff --git a/mergin/client_pull.py b/mergin/client_pull.py new file mode 100644 index 00000000..7be93a3e --- /dev/null +++ b/mergin/client_pull.py @@ -0,0 +1,483 @@ +""" +To download projects asynchronously. Start download: (does not block) + +job = download_project_async(mergin_client, 'user/project', '/tmp/my_project) + +Then we need to wait until we are finished downloading - either by periodically +calling download_project_is_running(job) that will just return True/False or by calling +download_project_wait(job) that will block the current thread (not good for GUI). +To finish the download job, we have to call download_project_finalize(job). +""" + +import copy +import math +import os +import shutil + +import concurrent.futures +import threading + +from .common import CHUNK_SIZE, ClientError +from .merginproject import MerginProject +from .utils import save_to_file + + +# status = download_project_async(...) 
+
+#
+# for completely async approach:
+# - a method called (in worker thread(!)) when new data are received -- to update progress bar
+# - a method called (in worker thread(!)) when download is complete -- and we just need to do the final steps (in main thread)
+# - the methods in worker threads could send queued signals to some QObject instances owned by main thread to do updating/finalization
+#
+# polling approach:
+# - caller will call a method every X ms to check the status
+# - once status says download is finished, the caller would call a function to do finalization
+
+
+class DownloadJob:
+    """ Keeps all the important data about a pending download job """
+
+    def __init__(self, project_path, total_size, version, update_tasks, download_queue_items, directory, mp, project_info):
+        self.project_path = project_path
+        self.total_size = total_size    # size of data to download (in bytes)
+        self.transferred_size = 0
+        self.version = version
+        self.update_tasks = update_tasks
+        self.download_queue_items = download_queue_items
+        self.directory = directory      # project's directory
+        self.mp = mp                    # MerginProject instance
+        self.is_cancelled = False
+        self.project_info = project_info  # parsed JSON with project info returned from the server
+
+    def dump(self):
+        print("--- JOB ---", self.total_size, "bytes")
+        for task in self.update_tasks:
+            print("- {} ... {}".format(task.file_path, len(task.download_queue_items)))
+        print("--")
+        for item in self.download_queue_items:
+            print("- {} {} {} {}".format(item.file_path, item.version, item.part_index, item.size))
+        print("--- END ---")
+
+
+def _download_items(file, directory, diff_only=False):
+    """ Returns an array of download queue items """
+
+    file_dir = os.path.dirname(os.path.normpath(os.path.join(directory, file['path'])))
+    basename = os.path.basename(file['diff']['path']) if diff_only else os.path.basename(file['path'])
+    file_size = file['diff']['size'] if diff_only else file['size']
+    chunks = math.ceil(file_size / CHUNK_SIZE)
+
+    items = []
+    for part_index in range(chunks):
+        download_file_path = os.path.join(file_dir, basename + ".{}".format(part_index))
+        size = min(CHUNK_SIZE, file_size - part_index * CHUNK_SIZE)
+        items.append(DownloadQueueItem(file['path'], size, file['version'], diff_only, part_index, download_file_path))
+
+    return items
+
+
+def _do_download(item, mc, project_path, job):
+    """ runs in worker thread """
+    if job.is_cancelled:
+        return
+
+    # TODO: make download_blocking / save_to_file cancellable so that we can cancel as soon as possible
+
+    item.download_blocking(mc, project_path)
+    job.transferred_size += item.size
+
+
+def download_project_async(mc, project_path, directory):
+    """
+    Starts project download in background and returns handle to the pending project download.
+    Using that object it is possible to watch progress or cancel the ongoing work.
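+
+    A minimal polling sketch (illustrative only; assumes ``mc`` is a logged-in MerginClient
+    and ``time`` is imported -- a GUI would typically drive this from a timer instead of sleeping):
+
+        job = download_project_async(mc, 'user/project', '/tmp/my_project')
+        while download_project_is_running(job):
+            time.sleep(0.1)                                   # let the worker threads make progress
+            print(job.transferred_size, "/", job.total_size)  # e.g. update a progress bar
+        download_project_finalize(job)                        # merge chunks, write project metadata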
+ + """ + + if os.path.exists(directory): + raise Exception("Project directory already exists") + os.makedirs(directory) + mp = MerginProject(directory) + + project_info = mc.project_info(project_path) + version = project_info['version'] if project_info['version'] else 'v0' + + # prepare download + update_tasks = [] # stuff to do at the end of download + for file in project_info['files']: + file['version'] = version + items = _download_items(file, directory) + update_tasks.append(UpdateTask(file['path'], items)) + + # make a single list of items to download + total_size = 0 + download_list = [] + for task in update_tasks: + download_list.extend(task.download_queue_items) + for item in task.download_queue_items: + total_size += item.size + + job = DownloadJob(project_path, total_size, version, update_tasks, download_list, directory, mp, project_info) + + # start download + job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) + job.futures = [] + for item in download_list: + future = job.executor.submit(_do_download, item, mc, project_path, job) + job.futures.append(future) + + return job + + +def download_project_wait(job): + """ blocks until all download tasks are finished """ + + concurrent.futures.wait(job.futures) + + +def download_project_is_running(job): + """ + Returns true/false depending on whether we have some pending downloads. + + It also forwards any exceptions from workers (e.g. some network errors). If an exception + is raised, it is advised to call download_project_cancel() to abort the job. + """ + for future in job.futures: + if future.done() and future.exception() is not None: + raise future.exception() + if future.running(): + return True + return False + + +def download_project_finalize(job): + """ + To be called when download in the background is finished and we need to do the finalization (merge chunks etc.) + + This should not be called from a worker thread (e.g. directly from a handler when download is complete). + + If any of the workers has thrown any exception, it will be re-raised (e.g. some network errors). + That also means that the whole job has been aborted. + """ + + job.executor.shutdown(wait=True) + + # make sure any exceptions from threads are not lost + for future in job.futures: + if future.exception() is not None: + raise future.exception() + + for task in job.update_tasks: + + # right now only copy tasks... + task.apply(job.directory, job.mp) + + # final update of project metadata + # TODO: why not exact copy of project info JSON ? + job.mp.metadata = { + "name": job.project_path, + "version": job.version, + "files": job.project_info["files"] + } + + +def download_project_cancel(job): + """ + To be called (from main thread) to cancel a job that has downloads in progress. + Returns once all background tasks have exited (may block for a bit of time). + """ + + # set job as cancelled + job.is_cancelled = True + + job.executor.shutdown(wait=True) + + + +class UpdateTask: + """ + Entry for each file that will be updated. At the end of a successful download of new data, all the tasks are executed. 
+ """ + + # TODO: methods other than COPY + def __init__(self, file_path, download_queue_items): + self.file_path = file_path + self.download_queue_items = download_queue_items + + def apply(self, directory, mp): + """ assemble downloaded chunks into a single file """ + + basename = os.path.basename(self.file_path) #file['diff']['path']) if diff_only else os.path.basename(file['path']) + file_dir = os.path.dirname(os.path.normpath(os.path.join(directory, self.file_path))) + dest_file_path = os.path.join(file_dir, basename) + + # merge chunks together (and delete them afterwards) + file_to_merge = FileToMerge(dest_file_path, self.download_queue_items) + file_to_merge.merge() + + if mp.is_versioned_file(self.file_path): + shutil.copy(mp.fpath(self.file_path), mp.fpath_meta(self.file_path)) + + + +class DownloadQueueItem: + """ a piece of data from a project that should be downloaded - it can be either a chunk or it can be a diff """ + + def __init__(self, file_path, size, version, diff_only, part_index, download_file_path): + self.file_path = file_path # relative path to the file within project + self.size = size # size of the item in bytes + self.version = version # version of the file ("v123") + self.diff_only = diff_only # whether downloading diff or full version + self.part_index = part_index # index of the chunk + self.download_file_path = download_file_path # full path to a temporary file which will receive the content + + def __repr__(self): + return "".format( + self.file_path, self.version, self.diff_only, self.part_index, self.size, self.download_file_path) + + def download_blocking(self, mc, project_path): + """ Starts download and only returns once the file has been fully downloaded and saved """ + + start = self.part_index * (1 + CHUNK_SIZE) + resp = mc.get("/v1/project/raw/{}".format(project_path), data={ + "file": self.file_path, + "version": self.version, + "diff": self.diff_only + }, headers={ + "Range": "bytes={}-{}".format(start, start + CHUNK_SIZE) + } + ) + if resp.status in [200, 206]: + save_to_file(resp, self.download_file_path) + else: + raise ClientError('Failed to download part {} of file {}'.format(part, basename)) + + +class PullJob: + def __init__(self, project_path, pull_changes, total_size, version, files_to_merge, download_queue_items, temp_dir, mp, project_info, basefiles_to_patch): + self.project_path = project_path + self.pull_changes = pull_changes # dictionary with changes (dict[str, list[dict]] - keys: "added", "updated", ...) + self.total_size = total_size # size of data to download (in bytes) + self.transferred_size = 0 + self.version = version + self.files_to_merge = files_to_merge # list of FileToMerge instances + self.download_queue_items = download_queue_items + self.temp_dir = temp_dir # full path to temporary directory where we store downloaded files + self.mp = mp # MerginProject instance + self.is_cancelled = False + self.project_info = project_info # parsed JSON with project info returned from the server + self.basefiles_to_patch = basefiles_to_patch # list of tuples (relative path within project, list of diff files in temp dir to apply) + + def dump(self): + print("--- JOB ---", self.total_size, "bytes") + for file_to_merge in self.files_to_merge: + print("- {} ... 
download items={}".format(file_to_merge.dest_file, len(file_to_merge.downloaded_items))) + print("--") + for basefile, diffs in self.basefiles_to_patch: + print("patch basefile {} with {} diffs".format(basefile, len(diffs))) + print("--") + for item in self.download_queue_items: + print("- {} {} {} {}".format(item.file_path, item.version, item.part_index, item.size)) + print("--- END ---") + + +def pull_project_async(mc, directory): + """ + Starts project pull in background and returns handle to the pending job. + Using that object it is possible to watch progress or cancel the ongoing work. + """ + + mp = MerginProject(directory) + project_path = mp.metadata["name"] + local_version = mp.metadata["version"] + server_info = mc.project_info(project_path, since=local_version) + if local_version == server_info["version"]: + return # Project is up to date + + # we either download a versioned file using diffs (strongly preferred), + # but if we don't have history with diffs (e.g. uploaded without diffs) + # then we just download the whole file + _pulling_file_with_diffs = lambda f: 'diffs' in f and len(f['diffs']) != 0 + + server_version = server_info["version"] + temp_dir = mp.fpath_meta(f'fetch_{local_version}-{server_version}') + os.makedirs(temp_dir, exist_ok=True) + pull_changes = mp.get_pull_changes(server_info["files"]) + fetch_files = [] + for f in pull_changes["added"]: + f['version'] = server_version + fetch_files.append(f) + # extend fetch files download list with various version of diff files (if needed) + for f in pull_changes["updated"]: + if _pulling_file_with_diffs(f): + for diff in f['diffs']: + diff_file = copy.deepcopy(f) + for k, v in f['history'].items(): + if 'diff' not in v: + continue + if diff == v['diff']['path']: + diff_file['version'] = k + diff_file['diff'] = v['diff'] + fetch_files.append(diff_file) + else: + f['version'] = server_version + fetch_files.append(f) + + files_to_merge = [] # list of FileToMerge instances + + for file in fetch_files: + diff_only = _pulling_file_with_diffs(file) + items = _download_items(file, temp_dir, diff_only) + + # figure out destination path for the file + file_dir = os.path.dirname(os.path.normpath(os.path.join(temp_dir, file['path']))) + basename = os.path.basename(file['diff']['path']) if diff_only else os.path.basename(file['path']) + dest_file_path = os.path.join(file_dir, basename) + files_to_merge.append( FileToMerge(dest_file_path, items) ) + + + # make sure we can update geodiff reference files (aka. basefiles) with diffs or + # download their full versions so we have them up-to-date for applying changes + basefiles_to_patch = [] # list of tuples (relative path within project, list of diff files in temp dir to apply) + for file in pull_changes['updated']: + if not _pulling_file_with_diffs(file): + continue # this is only for diffable files (e.g. geopackages) + + basefile = mp.fpath_meta(file['path']) + if not os.path.exists(basefile): + # The basefile does not exist for some reason. This should not happen normally (maybe user removed the file + # or we removed it within previous pull because we failed to apply patch the older version for some reason). + # But it's not a problem - we will download the newest version and we're sorted. 
+ items = _download_items(file, temp_dir, diff_only=False) + dest_file_path = mp.fpath(file["path"], temp_dir) + #dest_file_path = os.path.join(os.path.dirname(os.path.normpath(os.path.join(temp_dir, file['path']))), os.path.basename(file['path'])) + files_to_merge.append( FileToMerge(dest_file_path, items) ) + continue + + basefiles_to_patch.append( (file['path'], file['diffs']) ) + + # make a single list of items to download + total_size = 0 + download_list = [] + for file_to_merge in files_to_merge: + download_list.extend(file_to_merge.downloaded_items) + for item in file_to_merge.downloaded_items: + total_size += item.size + + job = PullJob(project_path, pull_changes, total_size, server_version, files_to_merge, download_list, temp_dir, mp, server_info, basefiles_to_patch) + + # start download + job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) + job.futures = [] + for item in download_list: + future = job.executor.submit(_do_download, item, mc, project_path, job) + job.futures.append(future) + + return job + + +def pull_project_wait(job): + """ blocks until all download tasks are finished """ + + concurrent.futures.wait(job.futures) + + +def pull_project_is_running(job): + """ + Returns true/false depending on whether we have some pending downloads + + It also forwards any exceptions from workers (e.g. some network errors). If an exception + is raised, it is advised to call pull_project_cancel() to abort the job. + """ + for future in job.futures: + if future.done() and future.exception() is not None: + raise future.exception() + if future.running(): + return True + return False + + +def pull_project_cancel(job): + """ + To be called (from main thread) to cancel a job that has downloads in progress. + Returns once all background tasks have exited (may block for a bit of time). + """ + + # set job as cancelled + job.is_cancelled = True + + job.executor.shutdown(wait=True) + + +class FileToMerge: + """ + Keeps information about how to create a file (path specified by dest_file) from a couple + of downloaded items (chunks) - each item is DownloadQueueItem object which has path + to the temporary file containing its data. Calling merge() will create the destination file + and remove the temporary files of the chunks + """ + def __init__(self, dest_file, downloaded_items): + self.dest_file = dest_file # full path to the destination file to be created + self.downloaded_items = downloaded_items # list of pieces of the destination file to be merged + + def merge(self): + with open(self.dest_file, 'wb') as final: + for item in self.downloaded_items: + with open(item.download_file_path, 'rb') as chunk: + shutil.copyfileobj(chunk, final) + os.remove(item.download_file_path) + + expected_size = sum(item.size for item in self.downloaded_items) + if os.path.getsize(self.dest_file) != expected_size: + os.remove(self.dest_file) + raise ClientError('Download of file {} failed. Please try it again.'.format(self.dest_file)) + + +def pull_project_finalize(job): + """ + To be called when pull in the background is finished and we need to do the finalization (merge chunks etc.) + + This should not be called from a worker thread (e.g. directly from a handler when download is complete) + + If any of the workers has thrown any exception, it will be re-raised (e.g. some network errors). + That also means that the whole job has been aborted. 
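+
+    A typical calling sequence around this function (a sketch only; error handling omitted,
+    ``mc`` assumed to be a logged-in MerginClient):
+
+        job = pull_project_async(mc, directory)
+        if job is not None:                        # None means the local project is already up to date
+            pull_project_wait(job)
+            conflicts = pull_project_finalize(job)   # returns paths of any conflict copies created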
+ """ + + job.executor.shutdown(wait=True) + + # make sure any exceptions from threads are not lost + for future in job.futures: + if future.exception() is not None: + raise future.exception() + + # merge downloaded chunks + for file_to_merge in job.files_to_merge: + file_to_merge.merge() + + # make sure we can update geodiff reference files (aka. basefiles) with diffs or + # download their full versions so we have them up-to-date for applying changes + for file_path, file_diffs in job.basefiles_to_patch: + basefile = job.mp.fpath_meta(file_path) + server_file = job.mp.fpath(file_path, job.temp_dir) + + shutil.copy(basefile, server_file) + diffs = [job.mp.fpath(f, job.temp_dir) for f in file_diffs] + patch_error = job.mp.apply_diffs(server_file, diffs) + if patch_error: + # that's weird that we are unable to apply diffs to the basefile! + # because it should be possible to apply them cleanly since the server + # was also able to apply those diffs. It could be that someone modified + # the basefile and we ended up in this inconsistent state. + # let's remove the basefile and let the user retry - we should download clean version again + os.remove(basefile) + raise Exception("Cannot patch basefile {}! Please try syncing again.".format(basefile)) + + conflicts = job.mp.apply_pull_changes(job.pull_changes, job.temp_dir) + job.mp.metadata = { + 'name': job.project_path, + 'version': job.version, + 'files': job.project_info['files'] + } + + shutil.rmtree(job.temp_dir) + return conflicts diff --git a/mergin/client_push.py b/mergin/client_push.py new file mode 100644 index 00000000..2b756ba6 --- /dev/null +++ b/mergin/client_push.py @@ -0,0 +1,228 @@ +""" +To push projects asynchronously. Start push: (does not block) + +job = push_project_async(mergin_client, '/tmp/my_project') + +Then we need to wait until we are finished uploading - either by periodically +calling push_project_is_running(job) that will just return True/False or by calling +push_project_wait(job) that will block the current thread (not good for GUI). +To finish the upload job, we have to call push_project_finalize(job). 
+""" + +import json +import hashlib +import concurrent.futures +import threading + +from .common import UPLOAD_CHUNK_SIZE, ClientError, SyncError +from .merginproject import MerginProject +from .utils import generate_checksum, do_sqlite_checkpoint + + +class UploadJob: + """ Keeps all the important data about a pending upload job """ + + def __init__(self, project_path, changes, transaction_id, mp, mc): + self.project_path = project_path # full project name ("username/projectname") + self.changes = changes # dictionary of local changes to the project + self.transaction_id = transaction_id # ID of the transaction assigned by the server + self.total_size = 0 # size of data to upload (in bytes) + self.transferred_size = 0 # size of data already uploaded (in bytes) + self.upload_queue_items = [] # list of items to upload in the background + self.mp = mp # MerginProject instance + self.mc = mc # MerginClient instance + self.is_cancelled = False # whether upload has been cancelled + self.executor = None # ThreadPoolExecutor that manages background upload tasks + self.futures = [] # list of futures submitted to the executor + self.server_resp = None # server response when transaction is finished + + def dump(self): + print("--- JOB ---", self.total_size, "bytes") + for item in self.upload_queue_items: + print("- {} {} {}".format(item.file_path, item.chunk_index, item.size)) + print("--- END ---") + + +class UploadQueueItem: + """ A single chunk of data that needs to be uploaded """ + + def __init__(self, file_path, size, transaction_id, chunk_id, chunk_index): + self.file_path = file_path # full path to the file + self.size = size # size of the chunk in bytes + self.chunk_id = chunk_id # ID of the chunk within transaction + self.chunk_index = chunk_index # index (starting from zero) of the chunk within the file + self.transaction_id = transaction_id # ID of the transaction + + def upload_blocking(self, mc): + + file_handle = open(self.file_path, 'rb') + file_handle.seek(self.chunk_index * UPLOAD_CHUNK_SIZE) + data = file_handle.read(UPLOAD_CHUNK_SIZE) + + checksum = hashlib.sha1() + checksum.update(data) + + headers = {"Content-Type": "application/octet-stream"} + resp = mc.post("/v1/project/push/chunk/{}/{}".format(self.transaction_id, self.chunk_id), data, headers) + resp_dict = json.load(resp) + if not (resp_dict['size'] == len(data) and resp_dict['checksum'] == checksum.hexdigest()): + mc.post("/v1/project/push/cancel/{}".format(self.transaction_id)) + raise ClientError("Mismatch between uploaded file chunk {} and local one".format(self.chunk_id)) + + + +def push_project_async(mc, directory): + """ Starts push of a project and returns pending upload job """ + + mp = MerginProject(directory) + project_path = mp.metadata["name"] + local_version = mp.metadata["version"] + server_info = mc.project_info(project_path) + server_version = server_info["version"] if server_info["version"] else "v0" + if local_version != server_version: + raise ClientError("Update your local repository") + + changes = mp.get_push_changes() + enough_free_space, freespace = mc.enough_storage_available(changes) + if not enough_free_space: + freespace = int(freespace/(1024*1024)) + raise SyncError("Storage limit has been reached. 
Only " + str(freespace) + "MB left") + + if not sum(len(v) for v in changes.values()): + return + # drop internal info from being sent to server + for item in changes['updated']: + item.pop('origin_checksum', None) + data = { + "version": local_version, + "changes": changes + } + + resp = mc.post(f'/v1/project/push/{project_path}', data, {"Content-Type": "application/json"}) + server_resp = json.load(resp) + + upload_files = data['changes']["added"] + data['changes']["updated"] + + transaction_id = server_resp["transaction"] if upload_files else None + job = UploadJob(project_path, changes, transaction_id, mp, mc) + + if not upload_files: + job.server_resp = server_resp + push_project_finalize(job) + return None # all done - no pending job + + upload_queue_items = [] + total_size = 0 + # prepare file chunks for upload + for file in upload_files: + # do checkpoint to push changes from wal file to gpkg if there is no diff + if "diff" not in file and mp.is_versioned_file(file["path"]): + do_sqlite_checkpoint(mp.fpath(file["path"])) + file["checksum"] = generate_checksum(mp.fpath(file["path"])) + file['location'] = mp.fpath_meta(file['diff']['path']) if 'diff' in file else mp.fpath(file['path']) + + for chunk_index, chunk_id in enumerate(file["chunks"]): + size = min(UPLOAD_CHUNK_SIZE, file['size'] - chunk_index * UPLOAD_CHUNK_SIZE) + upload_queue_items.append(UploadQueueItem(file['location'], size, transaction_id, chunk_id, chunk_index)) + + total_size += file['size'] + + job.total_size = total_size + job.upload_queue_items = upload_queue_items + + # start uploads in background + job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) + for item in upload_queue_items: + future = job.executor.submit(_do_upload, item, job) + job.futures.append(future) + + return job + + + +def push_project_wait(job): + """ blocks until all upload tasks are finished """ + + concurrent.futures.wait(job.futures) + + +def push_project_is_running(job): + """ + Returns true/false depending on whether we have some pending uploads + + It also forwards any exceptions from workers (e.g. some network errors). If an exception + is raised, it is advised to call push_project_cancel() to abort the job. + """ + for future in job.futures: + if future.done() and future.exception() is not None: + raise future.exception() + if future.running(): + return True + return False + + +def push_project_finalize(job): + """ + To be called when push in the background is finished and we need to do the finalization + + This should not be called from a worker thread (e.g. directly from a handler when push is complete). + + If any of the workers has thrown any exception, it will be re-raised (e.g. some network errors). + That also means that the whole job has been aborted. 
+ """ + + with_upload_of_files = job.executor is not None + + if with_upload_of_files: + job.executor.shutdown(wait=True) + + # make sure any exceptions from threads are not lost + for future in job.futures: + if future.exception() is not None: + raise future.exception() + + if job.transferred_size != job.total_size: + raise ClientError("Upload error: transferred size and expected total size do not match!") + + if with_upload_of_files: + try: + resp = job.mc.post("/v1/project/push/finish/%s" % job.transaction_id) + job.server_resp = json.load(resp) + except ClientError as err: + job.mc.post("/v1/project/push/cancel/%s" % job.transaction_id) + # server returns various error messages with filename or something generic + # it would be better if it returned list of failed files (and reasons) whenever possible + return {'error': str(err)} + + if 'error' in job.server_resp: + #TODO would be good to get some detailed info from server so user could decide what to do with it + # e.g. diff conflicts, basefiles issues, or any other failure + raise ClientError(job.server_resp['error']) + + job.mp.metadata = { + 'name': job.project_path, + 'version': job.server_resp['version'], + 'files': job.server_resp["files"] + } + job.mp.apply_push_changes(job.changes) + + +def push_project_cancel(job): + """ + To be called (from main thread) to cancel a job that has uploads in progress. + Returns once all background tasks have exited (may block for a bit of time). + """ + + # set job as cancelled + job.is_cancelled = True + + job.executor.shutdown(wait=True) + + +def _do_upload(item, job): + """ runs in worker thread """ + if job.is_cancelled: + return + + item.upload_blocking(job.mc) + job.transferred_size += item.size diff --git a/mergin/common.py b/mergin/common.py new file mode 100644 index 00000000..720a9b83 --- /dev/null +++ b/mergin/common.py @@ -0,0 +1,36 @@ + +import os + + +CHUNK_SIZE = 100 * 1024 * 1024 + +# there is an upper limit for chunk size on server, ideally should be requested from there once implemented +UPLOAD_CHUNK_SIZE = 10 * 1024 * 1024 + + +this_dir = os.path.dirname(os.path.realpath(__file__)) + + +class ClientError(Exception): + pass + + +class SyncError(Exception): + def __init__(self, msg, detail=""): + super().__init__(msg) + self.detail = detail + + +try: + import dateutil.parser + from dateutil.tz import tzlocal +except ImportError: + # this is to import all dependencies shipped with package (e.g. to use in qgis-plugin) + deps_dir = os.path.join(this_dir, 'deps') + if os.path.exists(deps_dir): + import sys + for f in os.listdir(os.path.join(deps_dir)): + sys.path.append(os.path.join(deps_dir, f)) + + import dateutil.parser + from dateutil.tz import tzlocal diff --git a/mergin/merginproject.py b/mergin/merginproject.py new file mode 100644 index 00000000..bfc8e420 --- /dev/null +++ b/mergin/merginproject.py @@ -0,0 +1,534 @@ + +import json +import math +import os +import re +import shutil +import uuid +from datetime import datetime +from dateutil.tz import tzlocal + +from .common import UPLOAD_CHUNK_SIZE +from .utils import generate_checksum, move_file, int_version, find, do_sqlite_checkpoint + + +this_dir = os.path.dirname(os.path.realpath(__file__)) + + +try: + from .deps import pygeodiff +except ImportError: + os.environ['GEODIFF_ENABLED'] = 'False' + + +class InvalidProject(Exception): + pass + + +class MerginProject: + """ Base class for Mergin local projects. + + Linked to existing local directory, with project metadata (mergin.json) and backups located in .mergin directory. 
+ """ + def __init__(self, directory): + self.dir = os.path.abspath(directory) + if not os.path.exists(self.dir): + raise InvalidProject('Project directory does not exist') + + # make sure we can load correct pygeodiff + if os.environ.get('GEODIFF_ENABLED', 'True').lower() == 'true': + try: + self.geodiff = pygeodiff.GeoDiff() + except pygeodiff.geodifflib.GeoDiffLibVersionError: + self.geodiff = None + else: + self.geodiff = None + + self.meta_dir = os.path.join(self.dir, '.mergin') + if not os.path.exists(self.meta_dir): + os.mkdir(self.meta_dir) + + def fpath(self, file, other_dir=None): + """ + Helper function to get absolute path of project file. Defaults to project dir but + alternative dir get be provided (mostly meta or temp). Also making sure that parent dirs to file exist. + + :param file: relative file path in project (posix) + :type file: str + :param other_dir: alternative base directory for file, defaults to None + :type other_dir: str + :returns: file's absolute path + :rtype: str + """ + root = other_dir or self.dir + abs_path = os.path.abspath(os.path.join(root, file)) + f_dir = os.path.dirname(abs_path) + os.makedirs(f_dir, exist_ok=True) + return abs_path + + def fpath_meta(self, file): + """ Helper function to get absolute path of file in meta dir. """ + return self.fpath(file, self.meta_dir) + + @property + def metadata(self): + if not os.path.exists(self.fpath_meta('mergin.json')): + raise InvalidProject('Project metadata has not been created yet') + with open(self.fpath_meta('mergin.json'), 'r') as file: + return json.load(file) + + @metadata.setter + def metadata(self, data): + with open(self.fpath_meta('mergin.json'), 'w') as file: + json.dump(data, file, indent=2) + + def is_versioned_file(self, file): + """ Check if file is compatible with geodiff lib and hence suitable for versioning. + + :param file: file path + :type file: str + :returns: if file is compatible with geodiff lib + :rtype: bool + """ + if not self.geodiff: + return False + diff_extensions = ['.gpkg', '.sqlite'] + f_extension = os.path.splitext(file)[1] + return f_extension in diff_extensions + + def is_gpkg_open(self, path): + """ + Check whether geopackage file is open (and wal file exists) + + :param path: absolute path of file on disk + :type path: str + :returns: whether file is open + :rtype: bool + """ + f_extension = os.path.splitext(path)[1] + if f_extension != '.gpkg': + return False + if os.path.exists(f'{path}-wal'): + return True + return False + + def ignore_file(self, file): + """ + Helper function for blacklisting certain types of files. + + :param file: file path in project + :type file: str + :returns: whether file should be ignored + :rtype: bool + """ + ignore_ext = re.compile(r'({})$'.format('|'.join(re.escape(x) for x in ['-shm', '-wal', '~', 'pyc', 'swap']))) + ignore_files = ['.DS_Store', '.directory'] + name, ext = os.path.splitext(file) + if ext and ignore_ext.search(ext): + return True + if file in ignore_files: + return True + return False + + def inspect_files(self): + """ + Inspect files in project directory and return metadata. 
+ + :returns: metadata for files in project directory in server required format + :rtype: list[dict] + """ + files_meta = [] + for root, dirs, files in os.walk(self.dir, topdown=True): + dirs[:] = [d for d in dirs if d not in ['.mergin']] + for file in files: + if self.ignore_file(file): + continue + + abs_path = os.path.abspath(os.path.join(root, file)) + rel_path = os.path.relpath(abs_path, start=self.dir) + proj_path = '/'.join(rel_path.split(os.path.sep)) # we need posix path + files_meta.append({ + "path": proj_path, + "checksum": generate_checksum(abs_path), + "size": os.path.getsize(abs_path), + "mtime": datetime.fromtimestamp(os.path.getmtime(abs_path), tzlocal()) + }) + return files_meta + + def compare_file_sets(self, origin, current): + """ + Helper function to calculate difference between two sets of files metadata using file names and checksums. + + :Example: + + >>> origin = [{'checksum': '08b0e8caddafe74bf5c11a45f65cedf974210fed', 'path': 'base.gpkg', 'size': 2793, 'mtime': '2019-08-26T11:08:34.051221+02:00'}] + >>> current = [{'checksum': 'c9a4fd2afd513a97aba19d450396a4c9df8b2ba4', 'path': 'test.qgs', 'size': 31980, 'mtime': '2019-08-26T11:09:30.051221+02:00'}] + >>> self.compare_file_sets(origin, current) + {"added": [{'checksum': 'c9a4fd2afd513a97aba19d450396a4c9df8b2ba4', 'path': 'test.qgs', 'size': 31980, 'mtime': '2019-08-26T11:09:30.051221+02:00'}], "removed": [[{'checksum': '08b0e8caddafe74bf5c11a45f65cedf974210fed', 'path': 'base.gpkg', 'size': 2793, 'mtime': '2019-08-26T11:08:34.051221+02:00'}]], "renamed": [], "updated": []} + + :param origin: origin set of files metadata + :type origin: list[dict] + :param current: current set of files metadata to be compared against origin + :type current: list[dict] + :returns: changes between two sets with change type + :rtype: dict[str, list[dict]]' + """ + origin_map = {f["path"]: f for f in origin} + current_map = {f["path"]: f for f in current} + removed = [f for f in origin if f["path"] not in current_map] + + added = [] + for f in current: + if f["path"] in origin_map: + continue + added.append(f) + + moved = [] + for rf in removed: + match = find( + current, + lambda f: f["checksum"] == rf["checksum"] and f["size"] == rf["size"] and all( + f["path"] != mf["path"] for mf in moved) + ) + if match: + moved.append({**rf, "new_path": match["path"]}) + + added = [f for f in added if all(f["path"] != mf["new_path"] for mf in moved)] + removed = [f for f in removed if all(f["path"] != mf["path"] for mf in moved)] + + updated = [] + for f in current: + path = f["path"] + if path not in origin_map: + continue + # with open WAL files we don't know yet, better to mark file as updated + if not self.is_gpkg_open(self.fpath(path)) and f["checksum"] == origin_map[path]["checksum"]: + continue + f["origin_checksum"] = origin_map[path]["checksum"] + updated.append(f) + + return { + "renamed": moved, + "added": added, + "removed": removed, + "updated": updated + } + + def get_pull_changes(self, server_files): + """ + Calculate changes needed to be pulled from server. + + Calculate diffs between local files metadata and server's ones. Because simple metadata like file size or + checksum are not enough to determine geodiff files changes, evaluate also their history (provided by server). + For small files ask for full versions of geodiff files, otherwise determine list of diffs needed to update file. + + .. 
seealso:: self.compare_file_sets + + :param server_files: list of server files' metadata (see also self.inspect_files()) + :type server_files: list[dict] + :returns: changes metadata for files to be pulled from server + :rtype: dict + """ + changes = self.compare_file_sets(self.metadata['files'], server_files) + if not self.geodiff: + return changes + + not_updated = [] + for file in changes['updated']: + # for small geodiff files it does not make sense to download diff and then apply it (slow) + if not self.is_versioned_file(file["path"]): + continue + + diffs = [] + diffs_size = 0 + is_updated = False + # need to track geodiff file history to see if there were any changes + for k, v in file['history'].items(): + if int_version(k) <= int_version(self.metadata['version']): + continue # ignore history of no interest + is_updated = True + if 'diff' in v: + diffs.append(v['diff']['path']) + diffs_size += v['diff']['size'] + else: + diffs = [] + break # we found force update in history, does not make sense to download diffs + + if is_updated: + file['diffs'] = diffs + else: + not_updated.append(file) + + changes['updated'] = [f for f in changes['updated'] if f not in not_updated] + return changes + + def get_push_changes(self): + """ + Calculate changes needed to be pushed to server. + + Calculate diffs between local files metadata and actual files in project directory. Because simple metadata like + file size or checksum are not enough to determine geodiff files changes, geodiff tool is used to determine change + of file content and update corresponding metadata. + + .. seealso:: self.compare_file_sets + + :returns: changes metadata for files to be pushed to server + :rtype: dict + """ + changes = self.compare_file_sets(self.metadata['files'], self.inspect_files()) + # do checkpoint to push changes from wal file to gpkg + for file in changes['added'] + changes['updated']: + size, checksum = do_sqlite_checkpoint(self.fpath(file["path"])) + if size and checksum: + file["size"] = size + file["checksum"] = checksum + file['chunks'] = [str(uuid.uuid4()) for i in range(math.ceil(file["size"] / UPLOAD_CHUNK_SIZE))] + + if not self.geodiff: + return changes + + # need to check for for real changes in geodiff files using geodiff tool (comparing checksum is not enough) + not_updated = [] + for file in changes['updated']: + path = file["path"] + if not self.is_versioned_file(path): + continue + + # we use geodiff to check if we can push only diff files + current_file = self.fpath(path) + origin_file = self.fpath_meta(path) + diff_id = str(uuid.uuid4()) + diff_name = path + '-diff-' + diff_id + diff_file = self.fpath_meta(diff_name) + try: + self.geodiff.create_changeset(origin_file, current_file, diff_file) + if self.geodiff.has_changes(diff_file): + diff_size = os.path.getsize(diff_file) + file['checksum'] = file['origin_checksum'] # need to match basefile on server + file['chunks'] = [str(uuid.uuid4()) for i in range(math.ceil(diff_size / UPLOAD_CHUNK_SIZE))] + file['mtime'] = datetime.fromtimestamp(os.path.getmtime(current_file), tzlocal()) + file['diff'] = { + "path": diff_name, + "checksum": generate_checksum(diff_file), + "size": diff_size, + 'mtime': datetime.fromtimestamp(os.path.getmtime(diff_file), tzlocal()) + } + else: + not_updated.append(file) + except (pygeodiff.GeoDiffLibError, pygeodiff.GeoDiffLibConflictError) as e: + # changes from wal file already committed + pass + + changes['updated'] = [f for f in changes['updated'] if f not in not_updated] + return changes + + def 
get_list_of_push_changes(self, push_changes): + changes = {} + for idx, file in enumerate(push_changes["updated"]): + if "diff" in file: + changeset_path = file["diff"]["path"] + changeset = self.fpath_meta(changeset_path) + result_file = self.fpath("change_list" + str(idx), self.meta_dir) + try: + self.geodiff.list_changes_summary(changeset, result_file) + with open(result_file, 'r') as f: + change = f.read() + changes[file["path"]] = json.loads(change) + os.remove(result_file) + except (pygeodiff.GeoDiffLibError, pygeodiff.GeoDiffLibConflictError): + pass + return changes + + def apply_pull_changes(self, changes, temp_dir): + """ + Apply changes pulled from server. + + Update project files according to file changes. Apply changes to geodiff basefiles as well + so they are up to date with server. In case of conflicts create backups from locally modified versions. + + .. seealso:: self.pull_changes + + :param changes: metadata for pulled files + :type changes: dict[str, list[dict]] + :param temp_dir: directory with downloaded files from server + :type temp_dir: str + :returns: files where conflicts were found + :rtype: list[str] + """ + conflicts = [] + local_changes = self.get_push_changes() + modified = {} + for f in local_changes["added"] + local_changes["updated"]: + modified.update({f['path']: f}) + for f in local_changes["renamed"]: + modified.update({f['new_path']: f}) + + local_files_map = {} + for f in self.inspect_files(): + local_files_map.update({f['path']: f}) + + for k, v in changes.items(): + for item in v: + path = item['path'] if k != 'renamed' else item['new_path'] + src = self.fpath(path, temp_dir) if k != 'renamed' else self.fpath(item["path"]) + dest = self.fpath(path) + basefile = self.fpath_meta(path) + + # special care is needed for geodiff files + # 'src' here is server version of file and 'dest' is locally modified + if self.is_versioned_file(path) and k == 'updated': + if path in modified: + server_diff = self.fpath(f'{path}-server_diff', temp_dir) # diff between server file and local basefile + local_diff = self.fpath(f'{path}-local_diff', temp_dir) + + # temporary backup of file pulled from server for recovery + f_server_backup = self.fpath(f'{path}-server_backup', temp_dir) + shutil.copy(src, f_server_backup) + + # create temp backup (ideally with geodiff) of locally modified file if needed later + f_conflict_file = self.fpath(f'{path}-local_backup', temp_dir) + try: + self.geodiff.create_changeset(basefile, dest, local_diff) + shutil.copy(basefile, f_conflict_file) + self.geodiff.apply_changeset(f_conflict_file, local_diff) + except (pygeodiff.GeoDiffLibError, pygeodiff.GeoDiffLibConflictError): + # FIXME hard copy can lead to data loss if changes from -wal file were not flushed !!! 
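+ # (geodiff failed to express the local edits as a changeset, so fall back to
+ # a plain copy of the current working file as the conflict backup)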
+ shutil.copy(dest, f_conflict_file) + + # try to do rebase magic + try: + self.geodiff.create_changeset(basefile, src, server_diff) + self.geodiff.rebase(basefile, src, dest) + # make sure basefile is in the same state as remote server file (for calc of push changes) + self.geodiff.apply_changeset(basefile, server_diff) + except (pygeodiff.GeoDiffLibError, pygeodiff.GeoDiffLibConflictError) as err: + # it would not be possible to commit local changes, they need to end up in new conflict file + shutil.copy(f_conflict_file, dest) # revert file + conflict = self.backup_file(path) + conflicts.append(conflict) + # original file synced with server + shutil.copy(f_server_backup, basefile) + shutil.copy(f_server_backup, dest) + # changes in -wal have been already applied in conflict file or LOST (see above) + if os.path.exists(f'{dest}-wal'): + os.remove(f'{dest}-wal') + if os.path.exists(f'{dest}-shm'): + os.remove(f'{dest}-shm') + else: + # The local file is not modified -> no rebase needed. + # We just apply the diff between our copy and server to both the local copy and its basefile + try: + server_diff = self.fpath(f'{path}-server_diff', temp_dir) # diff between server file and local basefile + # TODO: it could happen that basefile does not exist. + # It was either never created (e.g. when pushing without geodiff) + # or it was deleted by mistake(?) by the user. We should detect that + # when starting pull and download it as well + self.geodiff.create_changeset(basefile, src, server_diff) + self.geodiff.apply_changeset(dest, server_diff) + self.geodiff.apply_changeset(basefile, server_diff) + except (pygeodiff.GeoDiffLibError, pygeodiff.GeoDiffLibConflictError): + # something bad happened and we have failed to patch our local files - this should not happen if there + # wasn't a schema change or something similar that geodiff can't handle. + # FIXME: this is a last resort and may corrupt data! (we should warn user) + shutil.copy(src, dest) + shutil.copy(src, basefile) + else: + # backup if needed + if path in modified and item['checksum'] != local_files_map[path]['checksum']: + conflict = self.backup_file(path) + conflicts.append(conflict) + + if k == 'removed': + os.remove(dest) + if self.is_versioned_file(path): + os.remove(basefile) + elif k == 'renamed': + move_file(src, dest) + if self.is_versioned_file(path): + move_file(self.fpath_meta(item["path"]), basefile) + else: + shutil.copy(src, dest) + if self.is_versioned_file(path): + shutil.copy(src, basefile) + + return conflicts + + def apply_push_changes(self, changes): + """ + For geodiff files update basefiles according to changes pushed to server. + + :param changes: metadata for pulled files + :type changes: dict[str, list[dict]] + """ + if not self.geodiff: + return + for k, v in changes.items(): + for item in v: + path = item['path'] if k != 'renamed' else item['new_path'] + if not self.is_versioned_file(path): + continue + + basefile = self.fpath_meta(path) + if k == 'renamed': + move_file(self.fpath_meta(item["path"]), basefile) + elif k == 'removed': + os.remove(basefile) + elif k == 'added': + shutil.copy(self.fpath(path), basefile) + elif k == 'updated': + # in case for geopackage cannot be created diff (e.g. 
forced update with committed changes from wal file) + if "diff" not in item: + shutil.copy(self.fpath(path), basefile) + else: + # better to apply diff to previous basefile to avoid issues with geodiff tmp files + changeset = self.fpath_meta(item['diff']['path']) + patch_error = self.apply_diffs(basefile, [changeset]) + if patch_error: + # in case of local sync issues it is safier to remove basefile, next time it will be downloaded from server + os.remove(basefile) + else: + pass + + def backup_file(self, file): + """ + Create backup file next to its origin. + + :param file: path of file in project + :type file: str + :returns: path to backupfile + :rtype: str + """ + src = self.fpath(file) + if not os.path.exists(src): + return + backup_path = self.fpath(f'{file}_conflict_copy') + index = 2 + while os.path.exists(backup_path): + backup_path = self.fpath(f'{file}_conflict_copy{index}') + index += 1 + shutil.copy(src, backup_path) + return backup_path + + def apply_diffs(self, basefile, diffs): + """ + Helper function to update content of geodiff file using list of diffs. + Input file will be overwritten (make sure to create backup if needed). + + :param basefile: abs path to file to be updated + :type basefile: str + :param diffs: list of abs paths to geodiff changeset files + :type diffs: list[str] + :returns: error message if diffs were not successfully applied or None + :rtype: str + """ + error = None + if not self.is_versioned_file(basefile): + return error + + for index, diff in enumerate(diffs): + try: + self.geodiff.apply_changeset(basefile, diff) + except (pygeodiff.GeoDiffLibError, pygeodiff.GeoDiffLibConflictError) as e: + error = str(e) + break + return error diff --git a/mergin/test/test_client.py b/mergin/test/test_client.py index 048e1325..89ce1487 100644 --- a/mergin/test/test_client.py +++ b/mergin/test/test_client.py @@ -110,8 +110,7 @@ def test_create_remote_project_from_local(mc): mc.download_project(project, download_dir) -@pytest.mark.parametrize("parallel", [True, False]) -def test_push_pull_changes(mc, parallel): +def test_push_pull_changes(mc): test_project = 'test_push' project = API_USER + '/' + test_project project_dir = os.path.join(TMP_DIR, test_project) # primary project dir for updates @@ -145,7 +144,7 @@ def test_push_pull_changes(mc, parallel): assert next((f for f in push_changes['updated'] if f['path'] == f_updated), None) assert next((f for f in push_changes['renamed'] if f['path'] == f_renamed), None) - mc.push_project(project_dir, parallel=parallel) + mc.push_project(project_dir) project_info = mc.project_info(project) assert project_info['version'] == 'v2' assert not next((f for f in project_info['files'] if f['path'] == f_removed), None) @@ -178,7 +177,7 @@ def test_push_pull_changes(mc, parallel): assert next((f for f in pull_changes['renamed'] if f['path'] == f_renamed), None) assert next((f for f in push_changes['updated'] if f['path'] == f_updated), None) - mc.pull_project(project_dir_2, parallel=parallel) + mc.pull_project(project_dir_2) assert os.path.exists(os.path.join(project_dir_2, f_added)) assert not os.path.exists(os.path.join(project_dir_2, f_removed)) assert not os.path.exists(os.path.join(project_dir_2, f_renamed))