diff --git a/server/.test.env b/server/.test.env index bdaa7bfa..63294a3f 100644 --- a/server/.test.env +++ b/server/.test.env @@ -24,3 +24,4 @@ SECURITY_BEARER_SALT='bearer' SECURITY_EMAIL_SALT='email' SECURITY_PASSWORD_SALT='password' DIAGNOSTIC_LOGS_DIR=/tmp/diagnostic_logs +GEVENT_WORKER=0 \ No newline at end of file diff --git a/server/mergin/sync/commands.py b/server/mergin/sync/commands.py index 21f5ef15..882b73b6 100644 --- a/server/mergin/sync/commands.py +++ b/server/mergin/sync/commands.py @@ -127,3 +127,50 @@ def remove(project_name): project.removed_by = None db.session.commit() click.secho("Project removed", fg="green") + + @project.command() + @click.argument("project-name", callback=normalize_input(lowercase=False)) + @click.option("--since", type=int, required=False) + @click.option("--to", type=int, required=False) + def create_checkpoint(project_name, since=None, to=None): + """Create project delta checkpoint, corresponding lower checkpoints and merged diffs for project""" + ws, name = split_project_path(project_name) + workspace = current_app.ws_handler.get_by_name(ws) + if not workspace: + click.secho("ERROR: Workspace does not exist", fg="red", err=True) + sys.exit(1) + project = ( + Project.query.filter_by(workspace_id=workspace.id, name=name) + .filter(Project.storage_params.isnot(None)) + .first() + ) + if not project: + click.secho("ERROR: Project does not exist", fg="red", err=True) + sys.exit(1) + + since = since if since is not None else 0 + to = to if to is not None else project.latest_version + if since < 0 or to < 1: + click.secho( + "ERROR: Invalid version number, minimum version for 'since' is 0 and minimum version for 'to' is 1", + fg="red", + err=True, + ) + sys.exit(1) + + if to > project.latest_version: + click.secho( + "ERROR: 'to' version exceeds latest project version", fg="red", err=True + ) + sys.exit(1) + + if since >= to: + click.secho( + "ERROR: 'since' version must be less than 'to' version", + fg="red", + err=True, + 
) + sys.exit(1) + + project.get_delta_changes(since, to) + click.secho("Project checkpoint(s) created", fg="green") diff --git a/server/mergin/sync/config.py b/server/mergin/sync/config.py index 7200dae5..c2556f25 100644 --- a/server/mergin/sync/config.py +++ b/server/mergin/sync/config.py @@ -75,3 +75,5 @@ class Configuration(object): UPLOAD_CHUNKS_EXPIRATION = config( "UPLOAD_CHUNKS_EXPIRATION", default=86400, cast=int ) + # whether client can pull using v2 apis + V2_PULL_ENABLED = config("V2_PULL_ENABLED", default=True, cast=bool) diff --git a/server/mergin/sync/errors.py b/server/mergin/sync/errors.py index 35985ab9..33b80d74 100644 --- a/server/mergin/sync/errors.py +++ b/server/mergin/sync/errors.py @@ -95,3 +95,10 @@ def to_dict(self) -> Dict: class BigChunkError(ResponseError): code = "BigChunkError" detail = f"Chunk size exceeds maximum allowed size {MAX_CHUNK_SIZE} MB" + + +class DiffDownloadError(ResponseError): + code = "DiffDownloadError" + detail = ( + "Required diff file could not be downloaded as it could not be reconstructed" + ) diff --git a/server/mergin/sync/files.py b/server/mergin/sync/files.py index a85bb5e6..5e28ed70 100644 --- a/server/mergin/sync/files.py +++ b/server/mergin/sync/files.py @@ -4,11 +4,18 @@ import datetime from enum import Enum import os -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Optional, List import uuid from flask import current_app -from marshmallow import ValidationError, fields, EXCLUDE, post_dump, validates_schema +from marshmallow import ( + ValidationError, + fields, + EXCLUDE, + post_dump, + validates_schema, + post_load, +) from pathvalidate import sanitize_filename from .utils import ( @@ -231,7 +238,9 @@ def validate(self, data, **kwargs): class ProjectFileSchema(FileSchema): mtime = DateTimeWithZ() - diff = fields.Nested(FileSchema()) + diff = fields.Nested( + FileSchema(), + ) @post_dump def patch_field(self, data, **kwargs): @@ -239,3 +248,114 @@ def 
patch_field(self, data, **kwargs): if not data.get("diff"): data.pop("diff", None) return data + + +@dataclass +class DeltaDiffFile: + """Diff file path in diffs list""" + + path: str + + +class DeltaChangeDiffFileSchema(ma.Schema): + """Schema for diff file path in diffs list""" + + path = fields.String(required=True) + + +@dataclass +class DeltaChangeBase(File): + """Base class for changes stored in json list or returned from delta endpoint""" + + change: PushChangeType + version: int + + +@dataclass +class DeltaChangeMerged(DeltaChangeBase): + """Delta item with merged diffs to list of multiple diff files""" + + diffs: List[DeltaDiffFile] = field(default_factory=list) + + def to_data_delta(self): + """Convert DeltaMerged to DeltaData with single diff""" + result = DeltaChange( + path=self.path, + size=self.size, + checksum=self.checksum, + change=self.change, + version=self.version, + ) + if self.diffs: + result.diff = self.diffs[0].path + return result + + +@dataclass +class DeltaChange(DeltaChangeBase): + """Change items stored in database as list of this item with single diff file""" + + diff: Optional[str] = None + + def to_merged(self) -> DeltaChangeMerged: + """Convert to DeltaMerged with multiple diffs""" + result = DeltaChangeMerged( + path=self.path, + size=self.size, + checksum=self.checksum, + change=self.change, + version=self.version, + ) + if self.diff: + result.diffs = [DeltaDiffFile(path=self.diff)] + return result + + +class DeltaChangeBaseSchema(ma.Schema): + """Base schema for delta json and response from delta endpoint""" + + path = fields.String(required=True) + size = fields.Integer(required=True) + checksum = fields.String(required=True) + version = fields.Integer(required=True) + change = fields.Enum(PushChangeType, by_value=True, required=True) + + +class DeltaChangeSchema(DeltaChangeBaseSchema): + """Schema for change data in changes column""" + + diff = fields.String(required=False) + + @post_load + def make_object(self, data, 
**kwargs): + return DeltaChange(**data) + + @post_dump + def patch_field(self, data, **kwargs): + # drop 'diff' key entirely if empty or None as database would expect + if not data.get("diff"): + data.pop("diff", None) + return data + + +class DeltaChangeItemSchema(DeltaChangeBaseSchema): + """Schema for delta changes response""" + + version = fields.Function(lambda obj: f"v{obj.version}") + diffs = fields.List(fields.Nested(DeltaChangeDiffFileSchema())) + + @post_dump + def patch_field(self, data, **kwargs): + # drop 'diffs' key entirely if empty or None as clients would expect + if not data.get("diffs"): + data.pop("diffs", None) + return data + + +class DeltaChangeRespSchema(ma.Schema): + """Schema for list of delta changes wrapped in items field""" + + items = fields.List(fields.Nested(DeltaChangeItemSchema())) + + class Meta: + unknown = EXCLUDE diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py index 9574a69d..30e0901b 100644 --- a/server/mergin/sync/models.py +++ b/server/mergin/sync/models.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial from __future__ import annotations import json +import logging import os import time import uuid @@ -15,15 +16,18 @@ from blinker import signal from flask_login import current_user from pygeodiff import GeoDiff -from sqlalchemy import text, null, desc, nullslast +from sqlalchemy import text, null, desc, nullslast, tuple_ from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM from sqlalchemy.types import String from sqlalchemy.ext.hybrid import hybrid_property -from pygeodiff.geodifflib import GeoDiffLibError +from pygeodiff.geodifflib import GeoDiffLibError, GeoDiffLibConflictError from flask import current_app from .files import ( - File, + DeltaChangeMerged, + DeltaDiffFile, + DeltaChange, + DeltaChangeSchema, ProjectDiffFile, ProjectFileChange, ChangesSchema, @@ -37,6 +41,9 @@ from ..app import db from .storages import DiskStorage 
from .utils import ( + LOG_BASE, + Checkpoint, + generate_checksum, Toucher, get_chunk_location, get_project_path, @@ -58,6 +65,16 @@ class FileSyncErrorType(Enum): SYNC_ERROR = "sync error" +class ChangeComparisonAction(Enum): + """Actions to take when comparing two changes""" + + REPLACE = "replace" + DELETE = "delete" + UPDATE = "update" + UPDATE_DIFF = "update_diff" + EXCLUDE = "exclude" # Return None to exclude the file + + class Project(db.Model): id = db.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) name = db.Column(db.String, index=True) @@ -173,14 +190,18 @@ def files(self) -> List[ProjectFile]: SELECT fp.path, fh.size, - fh.diff, fh.location, fh.checksum, - pv.created AS mtime + pv.created AS mtime, + fd.path as diff_path, + fd.size as diff_size, + fd.checksum as diff_checksum, + fd.location as diff_location FROM files_ids LEFT OUTER JOIN file_history fh ON fh.id = files_ids.fh_id LEFT OUTER JOIN project_file_path fp ON fp.id = fh.file_path_id - LEFT OUTER JOIN project_version pv ON pv.id = fh.version_id; + LEFT OUTER JOIN project_version pv ON pv.id = fh.version_id + LEFT OUTER JOIN file_diff fd ON fd.file_path_id = fh.file_path_id AND fd.version = fh.project_version_name and fd.rank = 0; """ params = {"project_id": self.id} files = [ @@ -190,7 +211,16 @@ def files(self) -> List[ProjectFile]: checksum=row.checksum, location=row.location, mtime=row.mtime, - diff=ProjectDiffFile(**row.diff) if row.diff else None, + diff=( + ProjectDiffFile( + path=row.diff_path, + size=row.diff_size, + checksum=row.diff_checksum, + location=row.diff_location, + ) + if row.diff_path + else None + ), ) for row in db.session.execute(query, params).fetchall() ] @@ -255,6 +285,11 @@ def delete(self, removed_by: int = None): db.session.execute( upload_table.delete().where(upload_table.c.project_id == self.id) ) + # remove project version delta related to project + delta_table = ProjectVersionDelta.__table__ + db.session.execute( + 
delta_table.delete().where(delta_table.c.project_id == self.id) + ) self.project_users.clear() access_requests = ( AccessRequest.query.filter_by(project_id=self.id) @@ -333,6 +368,63 @@ def bulk_roles_update(self, access: Dict) -> Set[int]: return set(id_diffs) + def get_delta_changes( + self, since: int, to: int + ) -> Optional[List[DeltaChangeMerged]]: + """ + Get changes between two versions, merging them if needed. + - create FileDiff checkpoints if needed + - create ProjectVersionDelta checkpoints if needed with changes json + """ + if since > to: + logging.error( + f"Start version {since} is higher than end version {to} - broken history" + ) + return + if since == to: + return None + project_id = self.id + expected_checkpoints = Checkpoint.get_checkpoints(since + 1, to) + expected_deltas: List[ProjectVersionDelta] = ( + ProjectVersionDelta.query.filter( + ProjectVersionDelta.project_id == project_id, + ProjectVersionDelta.version > since, + ProjectVersionDelta.version <= to, + tuple_(ProjectVersionDelta.rank, ProjectVersionDelta.version).in_( + [(item.rank, item.end) for item in expected_checkpoints] + ), + ) + .order_by(ProjectVersionDelta.version) + .all() + ) + existing_delta_map = {(c.rank, c.version): c for c in expected_deltas} + + result: List[DeltaChange] = [] + for checkpoint in expected_checkpoints: + existing_delta = existing_delta_map.get((checkpoint.rank, checkpoint.end)) + + # we have delta in database, just return delta data from it + if existing_delta: + result.extend(DeltaChangeSchema(many=True).load(existing_delta.changes)) + continue + + # If higher rank delta checkopoint does not exists we need to create it + if checkpoint.rank > 0: + new_checkpoint = ProjectVersionDelta.create_checkpoint( + project_id, checkpoint + ) + if new_checkpoint: + result.extend( + DeltaChangeSchema(many=True).load(new_checkpoint.changes) + ) + else: + logging.error( + f"Not possible to create checkpoint for project {project_id} in range 
{checkpoint.start}-{checkpoint.end}" + ) + return + + return ProjectVersionDelta.merge_changes(result) + class ProjectRole(Enum): """Project roles ordered by rank (do not change)""" @@ -398,10 +490,19 @@ class ProjectFilePath(db.Model): ), ) + project = db.relationship( + "Project", + uselist=False, + ) + def __init__(self, project_id, path): self.project_id = project_id self.path = path + def generate_diff_name(self): + """Generate uniqute diff file name for server generated diff""" + return mergin_secure_filename(f"{self.path}-diff-{uuid.uuid4()}") + class LatestProjectFiles(db.Model): """Store project latest version files history ids""" @@ -445,7 +546,6 @@ class FileHistory(db.Model): location = db.Column(db.String) size = db.Column(db.BigInteger, nullable=False) checksum = db.Column(db.String, nullable=False) - diff = db.Column(JSONB) change = db.Column( ENUM( *PushChangeType.values(), @@ -479,17 +579,6 @@ class FileHistory(db.Model): file_path_id, project_version_name.desc(), ), - db.CheckConstraint( - text( - """ - CASE - WHEN (change = 'update_diff') THEN diff IS NOT NULL - ELSE diff IS NULL - END - """ - ), - name="changes_with_diff", - ), ) def __init__( @@ -500,22 +589,53 @@ def __init__( location: str, change: PushChangeType, diff: dict = None, + version_name: int = None, ): self.file = file self.size = size self.checksum = checksum self.location = location - self.diff = diff if diff is not None else null() self.change = change.value + self.project_version_name = version_name + + if diff is not None: + basefile = FileHistory.get_basefile(file.id, version_name) + diff_file = FileDiff( + basefile=basefile, + path=diff.get("path"), + rank=0, + version=version_name, + size=diff.get("size"), + checksum=diff.get("checksum"), + ) + db.session.add(diff_file) @property def path(self) -> str: return self.file.path + @property + def diff(self) -> Optional[FileDiff]: + """Diff file pushed with UPDATE_DIFF change type. 
+ + In FileDiff table it is defined as diff related to file, saved for the same project version with rank 0 (elementar diff) + """ + if self.change != PushChangeType.UPDATE_DIFF.value: + return + + return FileDiff.query.filter_by( + file_path_id=self.file_path_id, version=self.project_version_name, rank=0 + ).first() + @property def diff_file(self) -> Optional[ProjectDiffFile]: if self.diff: - return ProjectDiffFile(**self.diff) + return ProjectDiffFile( + path=self.diff.path, + size=self.diff.size, + checksum=self.diff.checksum, + location=self.diff.location, + ) @property def mtime(self) -> datetime: @@ -527,7 +647,7 @@ def abs_path(self) -> str: @property def expiration(self) -> Optional[datetime]: - if not self.diff: + if not self.diff_file: return if os.path.exists(self.abs_path): @@ -573,107 +693,640 @@ def changes( break # if we are interested only in 'diffable' history (not broken with forced update) - if ( - diffable - and item.change == PushChangeType.UPDATE.value - and not item.diff - ): + if diffable and item.change == PushChangeType.UPDATE.value: break return history @classmethod def diffs_chain( - cls, project: Project, file: str, version: int - ) -> Tuple[Optional[FileHistory], List[Optional[File]]]: - """Find chain of diffs from the closest basefile that leads to a given file at certain project version. + cls, file_id: int, version: int + ) -> Tuple[Optional[FileHistory], List[Optional[FileDiff]]]: + """Find chain of diffs from the basefile that leads to a given file at certain project version. Returns basefile and list of diffs for gpkg that needs to be applied to reconstruct file. List of diffs can be empty if basefile was eventually asked. Basefile can be empty if file cannot be reconstructed (removed/renamed). 
""" + latest_change = ( + cls.query.filter_by(file_path_id=file_id) + .filter(cls.project_version_name <= version) + .order_by(desc(cls.project_version_name)) + .first() + ) + # file never existed prior that version + if not latest_change: + return None, [] + + # the last update to file was a delete + if latest_change.change == PushChangeType.DELETE.value: + return None, [] + + # the last update to file was a create / force update + if latest_change.change in ( + PushChangeType.CREATE.value, + PushChangeType.UPDATE.value, + ): + return latest_change, [] + + basefile = cls.get_basefile(file_id, version) + if not basefile: + return None, [] + diffs = [] - basefile = None - v_x = version # the version of interest - v_last = project.latest_version - - # we ask for the latest version which is always a basefile if the file has not been removed - if v_x == v_last: - latest_change = ( - FileHistory.query.join(ProjectFilePath) - .join(FileHistory.version) - .filter( - ProjectFilePath.path == file, - ProjectVersion.project_id == project.id, + cached_items = Checkpoint.get_checkpoints( + basefile.project_version_name, version + ) + expected_diffs = ( + FileDiff.query.filter_by( + basefile_id=basefile.id, + ) + .filter( + tuple_(FileDiff.rank, FileDiff.version).in_( + [(item.rank, item.end) for item in cached_items] ) - .order_by(desc(ProjectVersion.created)) - .first() ) - if latest_change.change != PushChangeType.DELETE.value: - return latest_change, [] + .all() + ) + + for item in cached_items: + diff = next( + ( + d + for d in expected_diffs + if d.rank == item.rank and d.version == item.end + ), + None, + ) + if diff and os.path.exists(diff.abs_path): + diffs.append(diff) + elif item.rank > 0: + # fallback if checkpoint does not exist: replace merged diff with individual diffs + individual_diffs = ( + FileDiff.query.filter_by( + basefile_id=basefile.id, + rank=0, + ) + .filter( + FileDiff.version >= item.start, FileDiff.version <= item.end + ) + 
.order_by(FileDiff.version) + .all() + ) + diffs.extend(individual_diffs) else: - # file is actually not in the latest project version - return None, [] - - # check if it would not be faster to look up from the latest version - backward = (v_last - v_x) < v_x - - if backward: - # get list of file history changes starting with the latest version (v_last, ..., v_x+n, (..., v_x)) - history = FileHistory.changes(project.id, file, v_x, v_last, diffable=True) - if history: - first_change = history[-1] - # we have either full history of changes or v_x = v_x+n => no basefile in way, it is 'diffable' from the end - if first_change.diff: - # omit diff for target version as it would lead to previous version if reconstructed backward - diffs = [ - value.diff_file - for value in reversed(history) - if value.version.name != v_x - ] - basefile = history[0] - return basefile, diffs - # there was either breaking change or v_x is a basefile itself - else: - # we asked for basefile - if v_x == first_change.version.name and first_change.change in [ - PushChangeType.CREATE.value, - PushChangeType.UPDATE.value, - ]: - return first_change, [] - # file was removed (or renamed for backward compatibility) - elif v_x == first_change.version.name: - return basefile, diffs - # there was a breaking change in v_x+n, and we need to search from start - else: - pass - - # we haven't found anything so far, search from v1 - if not (basefile and diffs): - # get ordered dict of file history starting with version of interest (v_x, ..., v_x-n, (..., v_1)) - history = FileHistory.changes(project.id, file, 1, v_x, diffable=True) - if history: - first_change = history[-1] - # we found basefile - if first_change.change in [ - PushChangeType.CREATE.value, - PushChangeType.UPDATE.value, - ]: - basefile = first_change - if v_x == first_change.version.name: - # we asked for basefile - diffs = [] - else: - # basefile has no diff - diffs = [ - value.diff_file for value in list(reversed(history))[1:] - ] - # file 
was removed (or renamed for backward compatibility) - else: - pass + # we asked for individual diff but there is no such diff as there was not change at that version + continue return basefile, diffs + @classmethod + def get_basefile(cls, file_path_id: int, version: int) -> Optional[FileHistory]: + """Get basefile (start of file diffable history) for diff file change at some version""" + return ( + cls.query.filter_by(file_path_id=file_path_id) + .filter( + cls.project_version_name < version, + cls.change.in_( + [PushChangeType.CREATE.value, PushChangeType.UPDATE.value] + ), + ) + .order_by(desc(cls.project_version_name)) + .first() + ) + + +class FileDiff(db.Model): + """File diffs related to versioned files, also contain higher order (rank) merged diffs""" + + id = db.Column(db.BigInteger, primary_key=True, autoincrement=True) + file_path_id = db.Column( + db.BigInteger, + db.ForeignKey("project_file_path.id", ondelete="CASCADE"), + nullable=False, + ) + # reference to actual full gpkg file + basefile_id = db.Column( + db.BigInteger, + db.ForeignKey("file_history.id", ondelete="CASCADE"), + index=True, + nullable=False, + ) + path = db.Column(db.String, nullable=False, index=True) + # exponential order of merged diff, 0 is a source diff file uploaded by user, > 0 is merged diff + rank = db.Column(db.Integer, nullable=False, index=True) + # to which project version is this linked + version = db.Column(db.Integer, nullable=False, index=True) + # path on FS relative to project directory + location = db.Column(db.String) + # size and checksum are nullable as for merged diffs (higher orders) they might not exist on disk yet + size = db.Column(db.BigInteger, nullable=True) + checksum = db.Column(db.String, nullable=True) + + __table_args__ = ( + db.UniqueConstraint("file_path_id", "rank", "version", name="unique_diff"), + db.Index("ix_file_diff_file_path_id_version_rank", file_path_id, version, rank), + ) + + file = db.relationship("ProjectFilePath", uselist=False) + + 
def __init__( + self, + basefile: FileHistory, + path: str, + rank: int, + version: int, + size: int = None, + checksum: str = None, + ): + self.basefile_id = basefile.id + self.file_path_id = basefile.file_path_id + self.path = path + self.size = size + self.checksum = checksum + self.rank = rank + self.version = version + self.location = ( + os.path.join("diffs", path) + if rank > 0 + else os.path.join(f"v{version}", path) + ) + + @property + def abs_path(self) -> str: + """ + Return absolute path of the diff file on the file system. + """ + return os.path.join(self.file.project.storage.project_dir, self.location) + + @staticmethod + def can_create_checkpoint(file_path_id: int, checkpoint: Checkpoint) -> bool: + """Check if it makes sense to create a diff file for a checkpoint, e.g. there were relevant changes within the range without breaking changes""" + + basefile = FileHistory.get_basefile(file_path_id, checkpoint.end) + if not basefile: + return False + + file_was_deleted = ( + FileHistory.query.filter_by(file_path_id=file_path_id) + .filter( + FileHistory.project_version_name + >= max(basefile.project_version_name, checkpoint.start), + FileHistory.project_version_name <= checkpoint.end, + FileHistory.change == PushChangeType.DELETE.value, + ) + .count() + > 0 + ) + if file_was_deleted: + return False + + query = FileDiff.query.filter_by(basefile_id=basefile.id).filter( + FileDiff.rank == 0 + ) + + # rank 0 is a special case we only verify it exists + if checkpoint.rank == 0: + query = query.filter(FileDiff.version == checkpoint.end) + # for higher ranks we need to check if there were diff updates in that range + else: + query = query.filter( + FileDiff.version >= checkpoint.start, + FileDiff.version <= checkpoint.end, + ) + + return query.count() > 0 + + def construct_checkpoint(self) -> bool: + """Create a diff file checkpoint (aka. merged diff). + Find all smaller diffs which are needed to create the final diff file and merge them. 
+ In case of missing some lower rank checkpoints, create them recursively. + + Once checkpoint is created, size and checksum are updated in the database. + + Returns: + bool: True if checkpoint was successfully created or already present + """ + logging.debug( + f"Construct checkpoint for file {self.path} v{self.version} of rank {self.rank}" + ) + + if os.path.exists(self.abs_path): + return True + + # merged diffs can only be created for certain versions + if self.version % LOG_BASE: + return False + + cache_level_index = self.version // LOG_BASE**self.rank + try: + cache_level = Checkpoint(rank=self.rank, index=cache_level_index) + except ValueError: + logging.error( + f"Invalid record for cached level of rank {self.rank} and index {cache_level_index} for file {self.file_path_id}" + ) + return False + + basefile = FileHistory.get_basefile(self.file_path_id, cache_level.end) + if not basefile: + logging.error(f"Unable to find basefile for file {self.file_path_id}") + return False + + if basefile.project_version_name > cache_level.start: + logging.error( + f"Basefile version {basefile.project_version_name} is higher than start version {cache_level.start} - broken history" + ) + return False + + diffs_paths = [] + # let's confirm we have all intermediate diffs needed, if not, we need to create them (recursively) first + cached_items = Checkpoint.get_checkpoints( + cache_level.start, cache_level.end - 1 + ) + expected_diffs = ( + FileDiff.query.filter_by( + basefile_id=basefile.id, + ) + .filter( + tuple_(FileDiff.rank, FileDiff.version).in_( + [(item.rank, item.end) for item in cached_items] + ) + ) + .all() + ) + + for item in cached_items: + # basefile is a start of the diff chain + if item.start <= basefile.project_version_name: + continue + + # find diff in table and on disk + # diffs might not exist because theye were not created yet or there were no changes (e.g. 
for zeroth rank diffs) + diff = next( + ( + d + for d in expected_diffs + if d.rank == item.rank and d.version == item.end + ), + None, + ) + + if not diff: + # lower rank diff not even in DB yet - create it and try to construct merged file + if item.rank > 0 and FileDiff.can_create_checkpoint( + self.file_path_id, item + ): + diff = FileDiff( + basefile=basefile, + version=item.end, + rank=item.rank, + path=basefile.file.generate_diff_name(), + size=None, + checksum=None, + ) + db.session.add(diff) + db.session.commit() + else: + # such diff is not expected to exist + continue + + diff_exists = diff.construct_checkpoint() + if diff_exists: + diffs_paths.append(diff.abs_path) + else: + logging.error( + f"Unable to create checkpoint diff for {item} for file {self.file_path_id}" + ) + return False + + # we apply latest change (if any) on previous version + end_diff = FileDiff.query.filter_by( + basefile_id=basefile.id, + rank=0, + version=cache_level.end, + ).first() + + if end_diff: + diffs_paths.append(end_diff.abs_path) + + if not diffs_paths: + logging.warning( + f"No diffs for next checkpoint for file {self.file_path_id}" + ) + return False + + project: Project = basefile.file.project + os.makedirs(project.storage.diffs_dir, exist_ok=True) + try: + project.storage.geodiff.concat_changes(diffs_paths, self.abs_path) + except (GeoDiffLibError, GeoDiffLibConflictError): + logging.error( + f"Geodiff: Failed to merge diffs for file {self.file_path_id}" + ) + return False + + self.size = os.path.getsize(self.abs_path) + self.checksum = generate_checksum(self.abs_path) + db.session.commit() + return True + + +class ProjectVersionDelta(db.Model): + id = db.Column(db.BigInteger, primary_key=True, autoincrement=True) + version = db.Column(db.Integer, nullable=False, index=True) + # exponential order of changes json + rank = db.Column(db.Integer, nullable=False, index=True) + # to which project is this linked + project_id = db.Column( + UUID(as_uuid=True), + 
db.ForeignKey("project.id", ondelete="CASCADE"), + index=True, + nullable=False, + ) + # cached changes for versions from start to end (inclusive) + changes = db.Column(JSONB, nullable=False) + + __table_args__ = ( + db.UniqueConstraint("project_id", "version", "rank", name="unique_deltas"), + db.Index( + "ix_project_version_delta_project_id_version_rank", + project_id, + version, + rank, + ), + ) + project = db.relationship( + "Project", + uselist=False, + ) + + @staticmethod + def merge_changes( + items: List[DeltaChange], + ) -> List[DeltaChangeMerged]: + """ + Merge changes json array objects into one list of changes. + Changes are merged based on file path and change type. + """ + updating_files: Set[str] = set() + # sorting changes by version to apply them in correct order + items.sort(key=lambda x: x.version) + + # Merge changes for each file in a single pass + result: Dict[str, DeltaChangeMerged] = {} + for item in items: + path = item.path + current = item.to_merged() + + # First change for this file + if path not in result: + result[path] = current + # track existing paths to avoid deleting files that are already in history before + if current.change != PushChangeType.CREATE: + updating_files.add(path) + continue + + # Compare and merge with previous change for this file + can_delete = path in updating_files + new_change = ProjectVersionDelta._compare_changes( + result[path], current, can_delete + ) + + # Update result (or remove if no change is detected) + if new_change is not None: + result[path] = new_change + else: + del result[path] + + return list(result.values()) + + @staticmethod + def _compare_changes( + previous: DeltaChangeMerged, + new: DeltaChangeMerged, + prevent_delete_change: bool, + ) -> Optional[DeltaChangeMerged]: + """ + Compare and merge two changes for the same file. 
+ + Args: + previous: Previously accumulated change + new: New change to compare + prevent_delete_change: Whether the change can be deleted when resolving create+delete sequences + + Returns: + Merged change or None if file should be excluded + """ + + # Map change type pairs to actions + action_map = { + # create + delete = file is transparent for current changes -> delete it + ( + PushChangeType.CREATE, + PushChangeType.DELETE, + ): ChangeComparisonAction.DELETE, + # create + update = create with updated info + ( + PushChangeType.CREATE, + PushChangeType.UPDATE, + ): ChangeComparisonAction.UPDATE, + ( + PushChangeType.CREATE, + PushChangeType.UPDATE_DIFF, + ): ChangeComparisonAction.UPDATE, + ( + PushChangeType.CREATE, + PushChangeType.CREATE, + ): ChangeComparisonAction.EXCLUDE, + # update + update_diff = update with latest info + ( + PushChangeType.UPDATE, + PushChangeType.UPDATE_DIFF, + ): ChangeComparisonAction.UPDATE, + ( + PushChangeType.UPDATE, + PushChangeType.UPDATE, + ): ChangeComparisonAction.REPLACE, + ( + PushChangeType.UPDATE, + PushChangeType.DELETE, + ): ChangeComparisonAction.REPLACE, + ( + PushChangeType.UPDATE, + PushChangeType.CREATE, + ): ChangeComparisonAction.REPLACE, + # update_diff + update_diff = update_diff with latest info with proper order of diffs + ( + PushChangeType.UPDATE_DIFF, + PushChangeType.UPDATE_DIFF, + ): ChangeComparisonAction.UPDATE_DIFF, + ( + PushChangeType.UPDATE_DIFF, + PushChangeType.UPDATE, + ): ChangeComparisonAction.REPLACE, + ( + PushChangeType.UPDATE_DIFF, + PushChangeType.DELETE, + ): ChangeComparisonAction.REPLACE, + ( + PushChangeType.UPDATE_DIFF, + PushChangeType.CREATE, + ): ChangeComparisonAction.EXCLUDE, + ( + PushChangeType.DELETE, + PushChangeType.CREATE, + ): ChangeComparisonAction.REPLACE, + # delete + update = invalid sequence + ( + PushChangeType.DELETE, + PushChangeType.UPDATE, + ): ChangeComparisonAction.EXCLUDE, + ( + PushChangeType.DELETE, + PushChangeType.UPDATE_DIFF, + ): 
ChangeComparisonAction.EXCLUDE, + ( + PushChangeType.DELETE, + PushChangeType.DELETE, + ): ChangeComparisonAction.EXCLUDE, + } + + action = action_map.get((previous.change, new.change)) + result = None + if action == ChangeComparisonAction.REPLACE: + result = new + + elif action == ChangeComparisonAction.DELETE: + # if change is create + delete, we can just remove the change from accumulated changes + # only if this action is allowed (file existed before) + if prevent_delete_change: + result = new + + elif action == ChangeComparisonAction.UPDATE: + # handle update case, when previous change was create - just revert to create with new metadata + new.change = previous.change + new.diffs = [] + result = new + + elif action == ChangeComparisonAction.UPDATE_DIFF: + new.diffs = (previous.diffs or []) + (new.diffs or []) + result = new + + return result + + @classmethod + def create_checkpoint( + cls, + project_id: str, + checkpoint: Checkpoint, + ) -> Optional[ProjectVersionDelta]: + """ + Creates and caches new checkpoint and any required FileDiff checkpoints recursively if needed. 
+ """ + delta_range = [] + # our new checkpoint will be created by adding last individual delta to previous checkpoints + expected_checkpoints = Checkpoint.get_checkpoints( + checkpoint.start, checkpoint.end - 1 + ) + expected_checkpoints.append(Checkpoint(rank=0, index=checkpoint.end)) + + expected_deltas = ( + ProjectVersionDelta.query.filter( + ProjectVersionDelta.project_id == project_id, + tuple_(ProjectVersionDelta.rank, ProjectVersionDelta.version).in_( + [(item.rank, item.end) for item in expected_checkpoints] + ), + ) + .order_by(ProjectVersionDelta.version) + .all() + ) + + existing_delta_map = {(c.rank, c.version): c for c in expected_deltas} + # make sure we have all components, if not, created them (recursively) + for item in expected_checkpoints: + existing_delta = existing_delta_map.get((item.rank, item.end)) + if not existing_delta: + existing_delta = cls.create_checkpoint(project_id, item) + + if existing_delta: + delta_range.append(existing_delta) + else: + logging.error( + f"Missing project delta endpoint for {project_id} v{item.end} rank {item.rank} which could not be recreated" + ) + return + + if not delta_range: + logging.warning( + f"No changes found for project {project_id} in range v{checkpoint.start}-v{checkpoint.end} to create checkpoint." 
+ ) + return None + + # dump changes lists from database and flatten list for merging + delta_range = sorted(delta_range, key=lambda x: x.version) + changes = [] + for delta in delta_range: + changes.extend(DeltaChangeSchema(many=True).load(delta.changes)) + merged_delta_items: List[DeltaChange] = [ + d.to_data_delta() for d in cls.merge_changes(changes) + ] + + # Pre-fetch data for all versioned files to create FileDiff checkpoints where it makes sense + versioned_delta_items = [ + item + for item in merged_delta_items + if is_versioned_file(item.path) + and item.change == PushChangeType.UPDATE_DIFF + ] + versioned_file_paths = [delta.path for delta in versioned_delta_items] + if versioned_file_paths: + # get versioned files from DB and lookup their paths to next processing + file_paths = ProjectFilePath.query.filter( + ProjectFilePath.project_id == project_id, + ProjectFilePath.path.in_(versioned_file_paths), + ).all() + file_path_map = {fp.path: fp.id for fp in file_paths} + + for item in versioned_delta_items: + file_path_id = file_path_map.get(item.path) + if not file_path_id: + continue + + # Check if a FileDiff checkpoint already exists + existing_diff_checkpoint = FileDiff.query.filter_by( + file_path_id=file_path_id, + rank=checkpoint.rank, + version=checkpoint.end, + ).first() + # If does not exists, let's create diff with higher rank and some generated path (name of diff file) + if not existing_diff_checkpoint: + base_file = FileHistory.get_basefile(file_path_id, checkpoint.end) + if not base_file: + continue + + if not FileDiff.can_create_checkpoint(file_path_id, checkpoint): + continue + + checkpoint_diff = FileDiff( + basefile=base_file, + path=base_file.file.generate_diff_name(), + rank=checkpoint.rank, + version=checkpoint.end, + ) + # Patch the delta with the path to the new diff checkpoint + item.diff = checkpoint_diff.path + db.session.add(checkpoint_diff) + + checkpoint_delta = ProjectVersionDelta( + project_id=project_id, + 
version=checkpoint.end, + rank=checkpoint.rank, + changes=DeltaChangeSchema(many=True).dump(merged_delta_items), + ) + db.session.add(checkpoint_delta) + db.session.commit() + return checkpoint_delta + class ProjectVersion(db.Model): id = db.Column(db.Integer, primary_key=True, autoincrement=True) @@ -741,7 +1394,6 @@ def __init__( .filter(ProjectFilePath.path.in_(changed_files_paths)) .all() } - for item in changes: # get existing DB file reference or create a new one (for added files) db_file = existing_files_map.get( @@ -755,9 +1407,10 @@ def __init__( diff=( asdict(item.diff) if (item.change is PushChangeType.UPDATE_DIFF and item.diff) - else null() + else None ), change=item.change, + version_name=self.name, ) fh.version = self fh.project_version_name = self.name @@ -769,6 +1422,29 @@ def __init__( else: latest_files_map[fh.path] = fh.id + # cache changes data json for version checkpoints + # rank 0 is for all changes from start to current version + delta_data = [ + DeltaChange( + path=c.path, + change=c.change, + size=c.size, + checksum=c.checksum, + version=name, + diff=c.diff.path if c.diff else None, + ) + for c in changes + ] + pvd = ProjectVersionDelta( + project_id=project.id, + version=name, + rank=0, + changes=DeltaChangeSchema(many=True).dump(delta_data), + ) + + db.session.add(pvd) + db.session.flush() + # update cached values in project and push to transaction buffer so that self.files is up-to-date self.project.latest_project_files.file_history_ids = latest_files_map.values() db.session.flush() @@ -815,14 +1491,18 @@ def _files_from_start(self): SELECT fp.path, fh.size, - fh.diff, fh.location, fh.checksum, - pv.created AS mtime + pv.created AS mtime, + fd.path as diff_path, + fd.size as diff_size, + fd.checksum as diff_checksum, + fd.location as diff_location FROM latest_changes ch LEFT OUTER JOIN file_history fh ON (fh.file_path_id = ch.id AND fh.project_version_name = ch.version) LEFT OUTER JOIN project_file_path fp ON fp.id = fh.file_path_id 
LEFT OUTER JOIN project_version pv ON pv.id = fh.version_id + LEFT OUTER JOIN file_diff fd ON fd.file_path_id = fh.file_path_id AND fd.version = fh.project_version_name and fd.rank = 0 WHERE fh.change != 'delete'; """ params = {"project_id": self.project_id, "version": self.name} @@ -871,14 +1551,18 @@ def _files_from_end(self): SELECT fp.path, fh.size, - fh.diff, fh.location, fh.checksum, - pv.created AS mtime + pv.created AS mtime, + fd.path as diff_path, + fd.size as diff_size, + fd.checksum as diff_checksum, + fd.location as diff_location FROM files_changes_before_version ch INNER JOIN file_history fh ON (fh.file_path_id = ch.file_id AND fh.project_version_name = ch.version) INNER JOIN project_file_path fp ON fp.id = fh.file_path_id INNER JOIN project_version pv ON pv.id = fh.version_id + LEFT OUTER JOIN file_diff fd ON fd.file_path_id = fh.file_path_id AND fd.version = fh.project_version_name and fd.rank = 0 WHERE fh.change != 'delete' ORDER BY fp.path; """ @@ -902,7 +1586,16 @@ def files(self) -> List[ProjectFile]: checksum=row.checksum, location=row.location, mtime=row.mtime, - diff=ProjectDiffFile(**row.diff) if row.diff else None, + diff=( + ProjectDiffFile( + path=row.diff_path, + checksum=row.diff_checksum, + size=row.diff_size, + location=row.diff_location, + ) + if row.diff_path + else None + ), ) for row in result ] diff --git a/server/mergin/sync/private_api_controller.py b/server/mergin/sync/private_api_controller.py index dd870aae..2d364ebc 100644 --- a/server/mergin/sync/private_api_controller.py +++ b/server/mergin/sync/private_api_controller.py @@ -5,15 +5,7 @@ from datetime import datetime, timedelta, timezone from urllib.parse import quote from connexion import NoContent -from flask import ( - render_template, - request, - current_app, - jsonify, - abort, - make_response, - send_file, -) +from flask import render_template, request, current_app, jsonify, abort from flask_login import current_user from sqlalchemy.orm import defer from sqlalchemy 
import text @@ -41,8 +33,7 @@ ) from ..utils import parse_order_params, split_order_param, get_order_param from .tasks import create_project_version_zip -from .storages.disk import move_to_tmp -from .utils import get_x_accel_uri +from .utils import prepare_download_response @auth_required @@ -333,22 +324,20 @@ def download_project(id: str, version=None): # noqa: E501 # pylint: disable=W06 # check zip is already created if os.path.exists(project_version.zip_path): + response = prepare_download_response( + os.path.dirname(project_version.zip_path), + os.path.basename(project_version.zip_path), + ) if current_app.config["USE_X_ACCEL"]: - resp = make_response() - resp.headers["X-Accel-Redirect"] = get_x_accel_uri(project_version.zip_path) - resp.headers["X-Accel-Buffering"] = current_app.config.get( + response.headers["X-Accel-Buffering"] = current_app.config.get( "PROJECTS_ARCHIVES_X_ACCEL_BUFFERING" ) - resp.headers["X-Accel-Expires"] = "off" - resp.headers["Content-Type"] = "application/zip" - else: - resp = send_file(project_version.zip_path, mimetype="application/zip") - + # set custom file in header file_name = quote(f"{project.name}-v{lookup_version}.zip".encode("utf-8")) - resp.headers["Content-Disposition"] = ( + response.headers["Content-Disposition"] = ( f"attachment; filename*=UTF-8''{file_name}" ) - return resp + return response return "Project zip being prepared", 202 diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index 0b487874..0aadceff 100644 --- a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -9,8 +9,6 @@ import logging from dataclasses import asdict from typing import Dict -from urllib.parse import quote -import uuid from datetime import datetime import gevent @@ -20,7 +18,6 @@ from flask import ( abort, current_app, - send_from_directory, jsonify, make_response, ) @@ -28,10 +25,8 @@ from flask_login import current_user from sqlalchemy import 
and_, desc, asc from sqlalchemy.exc import IntegrityError -from binaryornot.check import is_binary from gevent import sleep import base64 - from werkzeug.exceptions import HTTPException, Conflict from mergin.sync.forms import project_name_validation @@ -80,13 +75,13 @@ from .utils import ( generate_checksum, Toucher, - get_x_accel_uri, get_ip, get_user_agent, generate_location, is_valid_uuid, get_device_id, - get_mimetype, + prepare_download_response, + get_device_id, wkb2wkt, ) from .errors import StorageLimitHit, ProjectLocked @@ -353,30 +348,8 @@ def download_project_file( logging.error(f"Missing file {namespace}/{project_name}/{file_path}") abort(404) - if current_app.config["USE_X_ACCEL"]: - # encoding for nginx to be able to download file with non-ascii chars - encoded_file_path = quote(file_path.encode("utf-8")) - resp = make_response() - resp.headers["X-Accel-Redirect"] = get_x_accel_uri( - project.storage_params["location"], encoded_file_path - ) - resp.headers["X-Accel-Buffering"] = True - resp.headers["X-Accel-Expires"] = "off" - else: - resp = send_from_directory( - os.path.dirname(abs_path), os.path.basename(abs_path) - ) - - if not is_binary(abs_path): - mime_type = "text/plain" - else: - mime_type = get_mimetype(abs_path) - resp.headers["Content-Type"] = mime_type - resp.headers["Content-Disposition"] = "attachment; filename={}".format( - quote(os.path.basename(file).encode("utf-8")) - ) - resp.direct_passthrough = False - return resp + response = prepare_download_response(project.storage.project_dir, file_path) + return response def get_project(project_name, namespace, since="", version=None): # noqa: E501 diff --git a/server/mergin/sync/public_api_v2.yaml b/server/mergin/sync/public_api_v2.yaml index bf3db007..b4d76944 100644 --- a/server/mergin/sync/public_api_v2.yaml +++ b/server/mergin/sync/public_api_v2.yaml @@ -281,6 +281,46 @@ paths: "404": $ref: "#/components/responses/NotFound" x-openapi-router-controller: 
mergin.sync.public_api_v2_controller + /projects/{id}/raw/diff/{file}: + get: + tags: + - project + summary: Download project geopackage diff file + operationId: download_diff_file + parameters: + - $ref: "#/components/parameters/ProjectId" + - name: file + required: true + in: path + description: File path + schema: + type: string + example: survey.gpkg-diff-1b9fe848-d2e4-4c53-958d-3dd97e5486f6 + responses: + "200": + description: File content (or its part) + content: + application/octet-stream: + schema: + type: string + format: binary + "400": + $ref: "#/components/responses/BadRequest" + "401": + $ref: "#/components/responses/Unauthorized" + "403": + $ref: "#/components/responses/Forbidden" + "404": + $ref: "#/components/responses/NotFound" + "422": + description: Requested diff file could not be downloaded as it was not created + # mixing content types would trigger 500 on response validation + # might be related to issue https://github.com/spec-first/connexion/issues/2054 + # content: + # application/json+problem: + # schema: + # $ref: "#/components/schemas/DiffDownloadError" + x-openapi-router-controller: mergin.sync.public_api_v2_controller /projects/{id}/versions: post: tags: @@ -370,6 +410,37 @@ paths: $ref: "#/components/schemas/ProjectLocked" x-openapi-router-controller: mergin.sync.public_api_v2_controller + /projects/{id}/delta: + get: + tags: + - project + summary: Get project changes (delta) between two versions + operationId: get_project_delta + parameters: + - $ref: "#/components/parameters/ProjectId" + - name: since + in: query + required: true + schema: + $ref: "#/components/schemas/VersionName" + description: Start version (exclusive) + - name: to + in: query + schema: + $ref: "#/components/schemas/VersionName" + description: End version (inclusive) + responses: + "200": + description: Project changes between two versions + content: + application/json: + schema: + $ref: "#/components/schemas/ProjectDeltaResponse" + "400": + $ref: 
"#/components/responses/BadRequest" + "404": + $ref: "#/components/responses/NotFound" + x-openapi-router-controller: mergin.sync.public_api_v2_controller components: responses: NoContent: @@ -476,6 +547,12 @@ components: example: code: UploadError detail: "Project version could not be created (UploadError)" + DiffDownloadError: + allOf: + - $ref: "#/components/schemas/CustomError" + example: + code: DiffDownloadError + detail: Required diff file could not be created (DiffDownloadError) # Data ProjectRole: type: string @@ -849,3 +926,51 @@ components: - editor - writer - owner + ProjectChangeType: + type: string + enum: [create, update, delete, update_diff] + example: update + ProjectDeltaChange: + type: object + required: + - path + - size + - checksum + - version + - change + properties: + path: + type: string + example: survey.gpkg + size: + type: integer + example: 1024 + checksum: + type: string + example: 9adb76bf81a34880209040ffe5ee262a090b62ab + version: + $ref: "#/components/schemas/VersionName" + change: + $ref: "#/components/schemas/ProjectChangeType" + diffs: + type: array + nullable: true + items: + type: object + properties: + path: + type: string + example: survey.gpkg-diff-1 + ProjectDeltaResponse: + type: object + required: + - items + properties: + items: + type: array + items: + $ref: "#/components/schemas/ProjectDeltaChange" + VersionName: + type: string + pattern: '^v\d+$' + example: v2 diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 217204c1..1f9848cc 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -2,6 +2,9 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial +import os +from datetime import datetime +from typing import Optional import uuid import gevent import logging @@ -14,9 +17,6 @@ from marshmallow import ValidationError from sqlalchemy.exc import IntegrityError -from 
mergin.sync.tasks import remove_transaction_chunks - -from .schemas_v2 import ProjectSchema as ProjectSchemaV2 from ..app import db from ..auth import auth_required from ..auth.models import User @@ -24,14 +24,16 @@ AnotherUploadRunning, BigChunkError, DataSyncError, + DiffDownloadError, ProjectLocked, ProjectVersionExists, StorageLimitHit, UploadError, ) -from .files import ChangesSchema, ProjectFileSchema +from .files import ChangesSchema, DeltaChangeRespSchema, ProjectFileSchema from .forms import project_name_validation from .models import ( + FileDiff, Project, ProjectRole, ProjectMember, @@ -44,12 +46,19 @@ from .public_api_controller import catch_sync_failure from .schemas import ( ProjectMemberSchema, - ProjectVersionSchema, UploadChunkSchema, ProjectSchema, ) +from .schemas_v2 import ProjectSchema as ProjectSchemaV2 from .storages.disk import move_to_tmp, save_to_file -from .utils import get_device_id, get_ip, get_user_agent, get_chunk_location +from .utils import ( + get_device_id, + get_ip, + get_user_agent, + get_chunk_location, + prepare_download_response, +) +from .tasks import remove_transaction_chunks from .workspace import WorkspaceRole @@ -165,6 +174,23 @@ def remove_project_collaborator(id, user_id): return NoContent, 204 +def download_diff_file(id: str, file: str): + """Download project geopackage diff file""" + project = require_project_by_uuid(id, ProjectPermissions.Read) + diff_file = FileDiff.query.filter_by(path=file).first_or_404() + + # create merged diff if it does not exist + if not os.path.exists(diff_file.abs_path): + diff_created = diff_file.construct_checkpoint() + if not diff_created: + return DiffDownloadError().response(422) + + response = prepare_download_response( + project.storage.project_dir, diff_file.location + ) + return response + + def get_project(id, files_at_version=None): """Get project info. 
Include list of files at specific version if requested.""" project = require_project_by_uuid(id, ProjectPermissions.Read) @@ -392,3 +418,26 @@ def upload_chunk(id: str): UploadChunkSchema().dump({"id": chunk_id, "valid_until": valid_until}), 200, ) + + +def get_project_delta(id: str, since: str, to: Optional[str] = None): + """Get project changes (delta) between two versions""" + + project: Project = require_project_by_uuid(id, ProjectPermissions.Read) + since = ProjectVersion.from_v_name(since) + to = project.latest_version if to is None else ProjectVersion.from_v_name(to) + if since < 0 or to < 1: + abort( + 400, + "Invalid version number, minimum version for 'since' is 0 and minimum version for 'to' is 1", + ) + + if to > project.latest_version: + abort(400, "'to' version exceeds latest project version") + + if since >= to: + abort(400, "'since' version must be less than 'to' version") + + delta_changes = project.get_delta_changes(since, to) or [] + + return DeltaChangeRespSchema().dump({"items": delta_changes}), 200 diff --git a/server/mergin/sync/storages/disk.py b/server/mergin/sync/storages/disk.py index 4491ad98..3946f8d7 100644 --- a/server/mergin/sync/storages/disk.py +++ b/server/mergin/sync/storages/disk.py @@ -134,6 +134,7 @@ def __init__(self, project): str(uuid.uuid4()), ) ) + self.diffs_dir = os.path.join(self.project_dir, "diffs") def _logger_callback(level, text_bytes): text = text_bytes.decode() @@ -362,7 +363,12 @@ def restore_versioned_file(self, file: str, version: int): :param file: path of file in project to recover :param version: project version (e.g. 
2) """ - from ..models import GeodiffActionHistory, ProjectVersion, FileHistory + from ..models import ( + GeodiffActionHistory, + ProjectVersion, + FileHistory, + ProjectFilePath, + ) if not is_versioned_file(file): return @@ -383,7 +389,13 @@ def restore_versioned_file(self, file: str, version: int): ): return - base_meta, diffs = FileHistory.diffs_chain(self.project, file, version) + file_id = ( + ProjectFilePath.query.filter_by(path=file, project_id=self.project.id) + .first() + .id + ) + + base_meta, diffs = FileHistory.diffs_chain(file_id, version) if not (base_meta and diffs): return @@ -402,31 +414,17 @@ def restore_versioned_file(self, file: str, version: int): ) if len(diffs) > 1: # concatenate multiple diffs into single one - partials = [ - os.path.join(self.project_dir, d.location) for d in diffs - ] + partials = [d.abs_path for d in diffs] self.geodiff.concat_changes(partials, changeset) else: - copy_file( - os.path.join(self.project_dir, diffs[0].location), changeset - ) + copy_file(diffs[0].abs_path, changeset) logging.info( f"Geodiff: apply changeset {changeset} of size {os.path.getsize(changeset)}" ) - # if we are going backwards we need to reverse changeset! 
- if base_meta.version.name > version: - logging.info(f"Geodiff: inverting changeset") - changes = os.path.join( - self.geodiff_working_dir, - os.path.basename(base_meta.abs_path) + "-diff-inv", - ) - self.geodiff.invert_changeset(changeset, changes) - else: - changes = changeset start = time.time() - self.geodiff.apply_changeset(restored_file, changes) + self.geodiff.apply_changeset(restored_file, changeset) # track geodiff event for performance analysis gh = GeodiffActionHistory( self.project.id, @@ -435,7 +433,7 @@ def restore_versioned_file(self, file: str, version: int): base_meta.size, ProjectVersion.to_v_name(project_version.name), "restore_file", - changes, + changeset, ) apply_time = time.time() - start gh.geodiff_time = apply_time @@ -447,7 +445,6 @@ def restore_versioned_file(self, file: str, version: int): ) return finally: - move_to_tmp(changes) move_to_tmp(changeset) # move final restored file to place where it is expected (only after it is successfully created) logging.info( diff --git a/server/mergin/sync/tasks.py b/server/mergin/sync/tasks.py index 9392997c..ac78956f 100644 --- a/server/mergin/sync/tasks.py +++ b/server/mergin/sync/tasks.py @@ -91,7 +91,7 @@ def optimize_storage(project_id): for item in f_history: # no diffs, it is a basefile for geodiff - if not item.diff: + if not item.diff_file: continue # skip the latest file version (high chance of being used) diff --git a/server/mergin/sync/utils.py b/server/mergin/sync/utils.py index de0fbe94..d3553870 100644 --- a/server/mergin/sync/utils.py +++ b/server/mergin/sync/utils.py @@ -2,20 +2,26 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial +from __future__ import annotations import logging import math import os import hashlib import re import secrets +from binaryornot.check import is_binary +from dataclasses import dataclass from datetime import datetime, timedelta, timezone from threading import Timer +from urllib.parse import quote from uuid import UUID from 
shapely import wkb from shapely.errors import ShapelyError from gevent import sleep +from flask import Request, Response, make_response, send_from_directory +from typing import List, Optional from flask import Request -from typing import Optional, Tuple +from typing import Optional from sqlalchemy import text from pathvalidate import ( validate_filename, @@ -27,6 +33,9 @@ from flask import current_app from pathlib import Path +# log base for caching strategy, diff checkpoints, etc. +LOG_BASE = 4 + def generate_checksum(file, chunk_size=4096): """ @@ -315,9 +324,8 @@ def files_size(): WHERE change = 'create'::push_change_type OR change = 'update'::push_change_type UNION SELECT - SUM(COALESCE((diff ->> 'size')::bigint, 0)) - FROM file_history - WHERE change = 'update_diff'::push_change_type + SUM(size) + FROM file_diff UNION SELECT SUM(size) @@ -555,6 +563,87 @@ def get_x_accel_uri(*url_parts): return result +def prepare_download_response(project_dir: str, path: str) -> Response: + """Prepare flask response for file download with custom headers""" + abs_path = os.path.join(project_dir, path) + if current_app.config["USE_X_ACCEL"]: + # encoding for nginx to be able to download file with non-ascii chars + resp = make_response() + resp.headers["X-Accel-Redirect"] = get_x_accel_uri( + project_dir, quote(path.encode("utf-8")) + ) + resp.headers["X-Accel-Buffering"] = True + resp.headers["X-Accel-Expires"] = "off" + else: + resp = send_from_directory( + os.path.dirname(abs_path), os.path.basename(abs_path) + ) + + mime_type = "text/plain" if not is_binary(abs_path) else get_mimetype(abs_path) + resp.headers["Content-Type"] = mime_type + file_name = quote(os.path.basename(path).encode("utf-8")) + resp.headers["Content-Disposition"] = f"attachment; filename*=UTF-8''{file_name}" + resp.direct_passthrough = False + return resp + + +@dataclass +class Checkpoint: + """ + Cached level of version tree. 
+ Used as a checkpoint to merge individual versions / diff files into bigger chunks. + """ + + rank: int # power of base + index: int # index of level - multiplyer of rank + + def __post_init__(self): + if type(self.rank) is not int or type(self.index) is not int: + raise ValueError("rank and index must be integers") + + if self.rank < 0 or self.index < 1: + raise ValueError("rank must be positive and index starts from 1") + + @property + def start(self) -> int: + """Start of the range covered by this level""" + return 1 + (LOG_BASE**self.rank * (self.index - 1)) + + @property + def end(self) -> int: + """End of the range covered by this level""" + return LOG_BASE**self.rank * self.index + + def __str__(self) -> str: + return f"Checkpoint(rank={self.rank}, index={self.index}, versions=v{self.start}-v{self.end})" + + def __repr__(self) -> str: + return str(self) + + @classmethod + def get_checkpoints(cls, start: int, end: int) -> List[Checkpoint]: + """ + Get all checkpoints in a range. + This basically provide a list of smaller versions (checkpoints) to be merged in order to get the final version. + """ + levels = [] + while start <= end: + if start == end: + rank_max = 0 + else: + rank_max = math.floor(math.log(end - start + 1, LOG_BASE)) + for rank in reversed(range(0, rank_max + 1)): + if (start - 1) % LOG_BASE**rank: + continue + + index = (start - 1) // LOG_BASE**rank + 1 + levels.append(cls(rank=rank, index=index)) + start = start + LOG_BASE**rank + break + + return levels + + def get_chunk_location(id: str): """ Get file location for chunk on FS diff --git a/server/mergin/tests/fixtures.py b/server/mergin/tests/fixtures.py index 8c1366f4..5d719878 100644 --- a/server/mergin/tests/fixtures.py +++ b/server/mergin/tests/fixtures.py @@ -18,7 +18,7 @@ from ..stats.models import MerginInfo from . 
import test_project, test_workspace_id, test_project_dir, TMP_DIR from .utils import login_as_admin, initialize, cleanup, file_info -from ..sync.files import ChangesSchema, files_changes_from_upload +from ..sync.files import files_changes_from_upload thisdir = os.path.dirname(os.path.realpath(__file__)) sys.path.append(os.path.join(thisdir, os.pardir)) @@ -36,6 +36,7 @@ def flask_app(request): "COLLECT_STATISTICS", "USER_SELF_REGISTRATION", "V2_PUSH_ENABLED", + "V2_PULL_ENABLED", ] ) register(application) @@ -105,9 +106,9 @@ def diff_project(app): Following changes are applied to base.gpkg in tests project (v1): v2: removed file -> previous version is lost (unless requested explicitly) - v3: uploaded again + v3: uploaded again -> new basefile v4: patched with changes from inserted_1_A.gpkg (1 inserted feature) - v5: replaced with original file base.gpkg (mimic of force update) + v5: replaced with original file base.gpkg (mimic of force update) -> new basefile again v6: patched with changes from modified_1_geom.gpkg (translated feature) v7: patched with changes from inserted_1_B.gpkg (1 inserted feature), final state is modified_1_geom.gpkg + inserted_1_B.gpkg v8: nothing happened, just to ensure last diff is not last version of project file diff --git a/server/mergin/tests/test_celery.py b/server/mergin/tests/test_celery.py index 7aad01da..cb814622 100644 --- a/server/mergin/tests/test_celery.py +++ b/server/mergin/tests/test_celery.py @@ -7,14 +7,18 @@ import uuid from datetime import datetime, timedelta, timezone from pathlib import Path - from flask import current_app from flask_mail import Mail from unittest.mock import patch from ..app import db from ..config import Configuration -from ..sync.models import Project, AccessRequest, ProjectRole, ProjectVersion +from ..sync.models import ( + Project, + AccessRequest, + ProjectRole, + ProjectVersion, +) from ..celery import send_email_async from ..sync.config import Configuration as SyncConfiguration from 
..sync.tasks import ( @@ -25,6 +29,7 @@ remove_unused_chunks, ) from ..sync.storages.disk import move_to_tmp +from . import test_project, test_workspace_name, test_workspace_id from ..sync.utils import get_chunk_location from . import ( test_project, @@ -175,7 +180,7 @@ def test_create_project_version_zip(diff_project): # mock expired partial zip -> celery removes it and creates new zip partial_zip_path.parent.mkdir(parents=True, exist_ok=True) partial_zip_path.touch() - new_time = datetime.now() - timedelta( + new_time = datetime.now(timezone.utc) - timedelta( seconds=current_app.config["PARTIAL_ZIP_EXPIRATION"] + 1 ) modify_file_times(partial_zip_path, new_time) @@ -198,7 +203,7 @@ def test_create_project_version_zip(diff_project): os.remove(partial_zip_path) remove_projects_archives() # zip is valid -> keep assert zip_path.exists() - new_time = datetime.now() - timedelta( + new_time = datetime.now(timezone.utc) - timedelta( days=current_app.config["PROJECTS_ARCHIVES_EXPIRATION"] + 1 ) modify_file_times(latest_version.zip_path, new_time) diff --git a/server/mergin/tests/test_cli.py b/server/mergin/tests/test_cli.py index d0b91717..d4197f86 100644 --- a/server/mergin/tests/test_cli.py +++ b/server/mergin/tests/test_cli.py @@ -13,7 +13,7 @@ from mergin.auth.models import User from mergin.commands import _check_permissions, _check_celery from mergin.stats.models import MerginInfo -from mergin.sync.models import Project, ProjectVersion +from mergin.sync.models import FileDiff, Project, ProjectVersion, ProjectVersionDelta from mergin.tests import ( test_project, test_workspace_id, @@ -545,3 +545,70 @@ def test_check_celery(mock_ping, ping, result, output, capsys): out, err = capsys.readouterr() # capture what was echoed to stdout assert ("Error: " not in out) == result assert output in out + + +create_project_checkpoint_data = [ + ( + f"{test_workspace_name}/non-existing", + 0, + 1, + "ERROR: Project does not exist", + ), + ( + f"{test_workspace_name}/{test_project}", 
+ 4, + 1, + "ERROR: 'since' version must be less than 'to' version", + ), + ( + f"{test_workspace_name}/{test_project}", + 0, + 100, + "ERROR: 'to' version exceeds latest project version", + ), + ( + f"{test_workspace_name}/{test_project}", + 0, + 0, + "ERROR: Invalid version number, minimum version for 'since' is 0 and minimum version for 'to' is 1", + ), + ( + f"{test_workspace_name}/{test_project}", + 0, + 4, + "Project checkpoint(s) created", + ), + ( + f"{test_workspace_name}/{test_project}", + None, + None, + "Project checkpoint(s) created", + ), +] + + +@pytest.mark.parametrize("project_name,since,to,output", create_project_checkpoint_data) +def test_create_checkpoint(runner, project_name, since, to, output, diff_project): + """Test 'project remove' command""" + ProjectVersionDelta.query.filter_by(project_id=diff_project.id).filter( + ProjectVersionDelta.rank > 0 + ).delete() + db.session.commit() + + remove = runner.invoke( + args=[ + "project", + "create-checkpoint", + project_name, + "--since", + since, + "--to", + to, + ] + ) + assert output in remove.output + checkpoints = ProjectVersionDelta.query.filter(ProjectVersionDelta.rank > 0).count() + if remove.exit_code == 0: + assert checkpoints > 0 + else: + assert checkpoints == 0 diff --git a/server/mergin/tests/test_config.py b/server/mergin/tests/test_config.py index af677cb0..494bb438 100644 --- a/server/mergin/tests/test_config.py +++ b/server/mergin/tests/test_config.py @@ -22,6 +22,7 @@ def test_config(client): "user_self_registration", "build_hash", "v2_push_enabled", + "v2_pull_enabled", } resp = client.get("/config") assert resp.status_code == 200 diff --git a/server/mergin/tests/test_file_restore.py b/server/mergin/tests/test_file_restore.py index 278837d3..19e02ce2 100644 --- a/server/mergin/tests/test_file_restore.py +++ b/server/mergin/tests/test_file_restore.py @@ -112,17 +112,12 @@ def test_crud_in_version_file_restore(app, forward_check): assert gpkgs_are_equal(test_file, test_file + 
"_backup") -@pytest.mark.parametrize("forward_check", [True, False]) -def test_version_file_restore_with_no_changes(app, forward_check): +def test_version_file_restore_with_no_changes(app): """Test to restore gpkg file from diffs where history contains some blank versions (no changes).""" working_dir = os.path.join(TMP_DIR, "restore_from_diffs_with_gaps") basefile = os.path.join(working_dir, "base.gpkg") p = _prepare_restore_project(working_dir) - if not forward_check: - for _ in range(6): - create_blank_version(p) - base_version = p.get_latest_version().name for i in range(3): sql = "INSERT INTO simple (geometry, name) VALUES (GeomFromText('POINT(24.5, 38.2)', 4326), 'insert_test')" @@ -183,25 +178,8 @@ def test_version_file_restore(diff_project): diff_project.storage.restore_versioned_file("base.gpkg", 6) assert os.path.exists(test_file) assert gpkgs_are_equal(test_file, test_file + "_backup") - - # remove v9 and v10 to mimic that project history end with existing file - pv_8 = ProjectVersion.query.filter_by(project_id=diff_project.id, name=8).first() - pv_9 = ProjectVersion.query.filter_by(project_id=diff_project.id, name=9).first() - pv_10 = ProjectVersion.query.filter_by(project_id=diff_project.id, name=10).first() - diff_project.latest_version = 8 - db.session.delete(pv_9) - db.session.delete(pv_10) - db.session.commit() - diff_project.cache_latest_files() - # restore v6 backward, from the latest file (v7=v8) - test_file = os.path.join(diff_project.storage.project_dir, "v6", "base.gpkg") - if os.path.exists(test_file): - os.remove(test_file) - diff_project.storage.restore_versioned_file("base.gpkg", 6) - assert os.path.exists(test_file) - assert gpkgs_are_equal(test_file, test_file + "_backup") gh = GeodiffActionHistory.query.filter_by( - project_id=diff_project.id, base_version="v7", target_version="v6" + project_id=diff_project.id, base_version="v5", target_version="v6" ).first() assert gh.geodiff_time assert gh.copy_time diff --git 
a/server/mergin/tests/test_middleware.py b/server/mergin/tests/test_middleware.py index 82b9cf26..2f5cbe4f 100644 --- a/server/mergin/tests/test_middleware.py +++ b/server/mergin/tests/test_middleware.py @@ -6,6 +6,7 @@ import psycogreen.gevent import pytest import sqlalchemy +from unittest.mock import patch from ..app import create_simple_app, GeventTimeoutMiddleware, db from ..config import Configuration @@ -14,58 +15,74 @@ @pytest.mark.parametrize("use_middleware", [True, False]) def test_use_middleware(use_middleware): """Test using middleware""" - Configuration.GEVENT_WORKER = use_middleware - Configuration.GEVENT_REQUEST_TIMEOUT = 1 - application = create_simple_app() + with patch.object( + Configuration, + "GEVENT_WORKER", + use_middleware, + ), patch.object( + Configuration, + "GEVENT_REQUEST_TIMEOUT", + 1, + ): + application = create_simple_app() - def ping(): - gevent.sleep(Configuration.GEVENT_REQUEST_TIMEOUT + 1) - return "pong" + def ping(): + gevent.sleep(Configuration.GEVENT_REQUEST_TIMEOUT + 1) + return "pong" - application.add_url_rule("/test", "ping", ping) - app_context = application.app_context() - app_context.push() + application.add_url_rule("/test", "ping", ping) + app_context = application.app_context() + app_context.push() - assert isinstance(application.wsgi_app, GeventTimeoutMiddleware) == use_middleware - # in case of gevent, dummy endpoint it set to time out - assert application.test_client().get("/test").status_code == ( - 504 if use_middleware else 200 - ) + assert ( + isinstance(application.wsgi_app, GeventTimeoutMiddleware) == use_middleware + ) + # in case of gevent, dummy endpoint it set to time out + assert application.test_client().get("/test").status_code == ( + 504 if use_middleware else 200 + ) def test_catch_timeout(): """Test proper handling of gevent timeout with db.session.rollback""" psycogreen.gevent.patch_psycopg() - Configuration.GEVENT_WORKER = True - Configuration.GEVENT_REQUEST_TIMEOUT = 1 - application = 
create_simple_app() + with patch.object( + Configuration, + "GEVENT_WORKER", + True, + ), patch.object( + Configuration, + "GEVENT_REQUEST_TIMEOUT", + 1, + ): + application = create_simple_app() - def unhandled(): - try: - db.session.execute("SELECT pg_sleep(1.1);") - finally: - db.session.execute("SELECT 1;") - return "" + def unhandled(): + try: + db.session.execute("SELECT pg_sleep(1.1);") + finally: + db.session.execute("SELECT 1;") + return "" - def timeout(): - try: - db.session.execute("SELECT pg_sleep(1.1);") - except gevent.timeout.Timeout: - db.session.rollback() - raise - finally: - db.session.execute("SELECT 1;") - return "" + def timeout(): + try: + db.session.execute("SELECT pg_sleep(1.1);") + except gevent.timeout.Timeout: + db.session.rollback() + raise + finally: + db.session.execute("SELECT 1;") + return "" - application.add_url_rule("/unhandled", "unhandled", unhandled) - application.add_url_rule("/timeout", "timeout", timeout) - app_context = application.app_context() - app_context.push() + application.add_url_rule("/unhandled", "unhandled", unhandled) + application.add_url_rule("/timeout", "timeout", timeout) + app_context = application.app_context() + app_context.push() - assert application.test_client().get("/timeout").status_code == 504 + assert application.test_client().get("/timeout").status_code == 504 - # in case of missing rollback sqlalchemy would raise error - with pytest.raises(sqlalchemy.exc.PendingRollbackError): - application.test_client().get("/unhandled") + # in case of missing rollback sqlalchemy would raise error + with pytest.raises(sqlalchemy.exc.PendingRollbackError): + application.test_client().get("/unhandled") - db.session.rollback() + db.session.rollback() diff --git a/server/mergin/tests/test_private_project_api.py b/server/mergin/tests/test_private_project_api.py index 5f062ac2..56838506 100644 --- a/server/mergin/tests/test_private_project_api.py +++ b/server/mergin/tests/test_private_project_api.py @@ -8,7 +8,7 @@ 
import pytest from unittest.mock import patch from pathlib import Path -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from flask import url_for from ..app import db, current_app @@ -524,7 +524,7 @@ def test_prepare_archive( temp_zip_path.parent.mkdir(parents=True, exist_ok=True) temp_zip_path.touch() if expired: - new_time = datetime.now() - timedelta( + new_time = datetime.now(timezone.utc) - timedelta( seconds=current_app.config["PARTIAL_ZIP_EXPIRATION"] + 1 ) modify_file_times(temp_zip_path, new_time) diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index c7a0550e..a83f3f84 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -26,7 +26,9 @@ from sqlalchemy import desc from ..app import db from ..sync.models import ( + FileDiff, Project, + ProjectVersionDelta, Upload, ProjectVersion, SyncFailuresHistory, @@ -444,7 +446,7 @@ def test_add_project(client, app, data, expected): assert not any(file.diff for file in proj_files) assert not any(file.diff for file in pv.files) assert all( - item.change == PushChangeType.CREATE.value and not item.diff + item.change == PushChangeType.CREATE.value and not item.diff_file for item in pv.changes.all() ) # cleanup @@ -535,6 +537,7 @@ def test_delete_project(client): assert not Project.query.filter_by( workspace_id=test_workspace_id, name=test_project ).count() + assert not ProjectVersionDelta.query.filter_by(project_id=project.id).count() assert not os.path.exists(project_dir) rm_project = Project.query.get(project.id) assert rm_project.removed_at and not rm_project.storage_params @@ -1603,7 +1606,7 @@ def test_push_no_diff_finish(client): file_meta = latest_version.changes.filter( FileHistory.change == PushChangeType.UPDATE_DIFF.value ).first() - assert file_meta.diff is not None + assert file_meta.diff_file is not None assert os.path.exists( 
os.path.join(upload.project.storage.project_dir, file_meta.diff_file.location) ) @@ -1780,6 +1783,8 @@ def test_optimize_storage(app, client, diff_project): diff_project.latest_version = 8 ProjectVersion.query.filter_by(project_id=diff_project.id, name=9).delete() ProjectVersion.query.filter_by(project_id=diff_project.id, name=10).delete() + ProjectVersionDelta.query.filter_by(project_id=diff_project.id, version=9).delete() + ProjectVersionDelta.query.filter_by(project_id=diff_project.id, version=10).delete() db.session.commit() diff_project.cache_latest_files() assert diff_project.latest_version == 8 @@ -1834,90 +1839,73 @@ def test_optimize_storage(app, client, diff_project): def test_file_diffs_chain(diff_project): # file test.gpkg was added only in v9, and then left intact + file_id = ( + ProjectFilePath.query.filter_by(project_id=diff_project.id, path="test.gpkg") + .first() + .id + ) + # direct search - basefile, diffs = FileHistory.diffs_chain(diff_project, "test.gpkg", 2) - assert not basefile - assert not diffs - # reverse search - basefile, diffs = FileHistory.diffs_chain(diff_project, "test.gpkg", 8) + basefile, diffs = FileHistory.diffs_chain(file_id, 2) assert not basefile assert not diffs # ask for basefile - basefile, diffs = FileHistory.diffs_chain(diff_project, "test.gpkg", 9) + basefile, diffs = FileHistory.diffs_chain(file_id, 9) assert basefile.version.name == 9 assert basefile.change == "create" assert not diffs + file_id = ( + ProjectFilePath.query.filter_by(project_id=diff_project.id, path="base.gpkg") + .first() + .id + ) + # version history has been broken by removal of file in v2 - basefile, diffs = FileHistory.diffs_chain(diff_project, "base.gpkg", 2) + basefile, diffs = FileHistory.diffs_chain(file_id, 2) assert not basefile assert not diffs # file was re-added in v3 - basefile, diffs = FileHistory.diffs_chain(diff_project, "base.gpkg", 3) + basefile, diffs = FileHistory.diffs_chain(file_id, 3) assert basefile.version.name == 3 assert 
basefile.change == "create" assert not diffs # diff was used in v4, direct search - basefile, diffs = FileHistory.diffs_chain(diff_project, "base.gpkg", 4) + basefile, diffs = FileHistory.diffs_chain(file_id, 4) assert basefile.version.name == 3 assert len(diffs) == 1 - assert "v4" in diffs[0].location + assert diffs[0].version == 4 # file was overwritten in v5 - basefile, diffs = FileHistory.diffs_chain(diff_project, "base.gpkg", 5) + basefile, diffs = FileHistory.diffs_chain(file_id, 5) assert basefile.version.name == 5 assert basefile.change == "update" assert not diffs - # diff was used in v6, reverse search followed by direct search - basefile, diffs = FileHistory.diffs_chain(diff_project, "base.gpkg", 6) + # diff was used in v6 + basefile, diffs = FileHistory.diffs_chain(file_id, 6) assert basefile.version.name == 5 assert len(diffs) == 1 - assert "v6" in diffs[0].location + assert diffs[0].version == 6 - # diff was used in v7, nothing happened in v8 (=v7), reverse search followed by direct search - basefile, diffs = FileHistory.diffs_chain(diff_project, "base.gpkg", 8) + # diff was used in v7, nothing happened in v8 (=v7) + basefile, diffs = FileHistory.diffs_chain(file_id, 8) assert basefile.version.name == 5 assert len(diffs) == 2 # file was removed in v9 - basefile, diffs = FileHistory.diffs_chain(diff_project, "base.gpkg", 9) + basefile, diffs = FileHistory.diffs_chain(file_id, 9) assert not basefile assert not diffs # ask for latest version, but file is already gone - basefile, diffs = FileHistory.diffs_chain(diff_project, "base.gpkg", 10) + basefile, diffs = FileHistory.diffs_chain(file_id, 10) assert not basefile assert not diffs - # remove v9 and v10 to mimic that project history end with existing file - pv_8 = ProjectVersion.query.filter_by(project_id=diff_project.id, name=8).first() - pv_9 = ProjectVersion.query.filter_by(project_id=diff_project.id, name=9).first() - pv_10 = ProjectVersion.query.filter_by(project_id=diff_project.id, 
name=10).first() - diff_project.latest_version = 8 - db.session.delete(pv_9) - db.session.delete(pv_10) - db.session.commit() - - # diff was used in v6, v7, nothing happened in v8 => v7 = v8, reverse search - basefile, diffs = FileHistory.diffs_chain(diff_project, "base.gpkg", 6) - assert basefile.version.name == 7 - assert len(diffs) == 1 - assert "v7" in diffs[0].location - - # we asked for last existing file version - basefile - basefile, diffs = FileHistory.diffs_chain(diff_project, "base.gpkg", 7) - assert basefile.version.name == 7 - assert not diffs - - # we asked for last project version - basefile, diffs = FileHistory.diffs_chain(diff_project, "base.gpkg", 8) - assert basefile.version.name == 7 - assert not diffs - changeset_data = [ ("v1", "test.gpkg", 404), @@ -2353,6 +2341,10 @@ def _get_user_agent(): .order_by(desc(ProjectVersion.created)) .first() ) + # remove project version delta entries + ProjectVersionDelta.query.filter_by( + project_id=upload.project_id, version=pv.name + ).delete() db.session.delete(pv) db.session.commit() upload.project.cache_latest_files() @@ -2380,7 +2372,8 @@ def test_version_files(client, diff_project): x.checksum == y.checksum and x.path == y.path and x.location == y.location - and x.diff == y.diff + and x.diff_path == y.diff_path + and x.diff_checksum == y.diff_checksum for x, y in zip( sorted(forward_search, key=lambda f: f.path), sorted(backward_search, key=lambda f: f.path), @@ -2407,7 +2400,7 @@ def test_delete_diff_file(client): project_version_name=upload.project.latest_version, change=PushChangeType.UPDATE_DIFF.value, ).first() - assert fh.diff is not None + assert fh.diff_file is not None # delete file diff_change = next( @@ -2434,7 +2427,7 @@ def test_delete_diff_file(client): project_version_name=upload.project.latest_version, change=PushChangeType.DELETE.value, ).first() - assert fh.path == "base.gpkg" and fh.diff is None + assert fh.path == "base.gpkg" and fh.diff_file is None def test_cache_files_ids(client): 
diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index 6a4243fd..1db387c0 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -16,7 +16,33 @@ from ..auth.models import User import os import shutil +from typing import List from unittest.mock import patch +import uuid +from pygeodiff import GeoDiffLibError + +from .utils import ( + add_user, + create_project, + create_workspace, + diffs_are_equal, + execute_query, + login_as_admin, + push_change, +) +from ..app import db +from tests import test_project, test_workspace_id +from ..config import Configuration +from ..sync.models import ( + FileDiff, + FileHistory, + Project, + ProjectFilePath, + ProjectRole, + ProjectVersionDelta, +) +from ..sync.files import DeltaChange, PushChangeType +from ..sync.utils import Checkpoint, is_versioned_file from sqlalchemy.exc import IntegrityError import pytest from datetime import datetime, timedelta, timezone @@ -26,6 +52,7 @@ from mergin.sync.config import Configuration as SyncConfiguration from mergin.sync.errors import ( BigChunkError, + DiffDownloadError, ProjectLocked, ProjectVersionExists, AnotherUploadRunning, @@ -170,6 +197,447 @@ def test_project_members(client): assert response.status_code == 404 +def test_file_diff_download(client, diff_project): + """Test download of gpkg diff files""" + gpkg_file = ProjectFilePath.query.filter_by( + project_id=diff_project.id, path="base.gpkg" + ).first() + + diff_file = FileDiff.query.filter_by( + file_path_id=gpkg_file.id, version=4, rank=0 + ).first() + + response = client.get(f"v2/projects/{diff_project.id}/raw/diff/{diff_file.path}") + assert response.status_code == 200 + assert response.content_type == "application/octet-stream" + + # add some indented merged diff to db, v5-v8 + basefile = FileHistory.get_basefile(gpkg_file.id, 8) + diff = FileDiff( + basefile=basefile, + version=8, + rank=1, + path=f"base.gpkg-{uuid.uuid4()}", 
+ size=None, + checksum=None, + ) + db.session.add(diff) + db.session.commit() + assert not os.path.exists(diff.abs_path) + + # download merged diff with its reconstuction on the fly + response = client.get(f"v2/projects/{diff_project.id}/raw/diff/{diff.path}") + assert response.status_code == 200 + assert response.content_type == "application/octet-stream" + assert os.path.exists(diff.abs_path) + + # try with reconstruction failure + with patch.object(FileDiff, "construct_checkpoint") as construct_checkpoint_mock: + os.remove(diff.abs_path) + construct_checkpoint_mock.return_value = False + response = client.get(f"v2/projects/{diff_project.id}/raw/diff/{diff.path}") + assert response.status_code == 422 + assert response.json["code"] == DiffDownloadError.code + + response = client.get(f"v2/projects/{diff_project.id}/raw/diff/{diff.path}+1") + assert response.status_code == 404 + + +def test_create_diff_checkpoint(diff_project): + """Test creation of diff checkpoints""" + # add changes v11-v32 where v9 is a basefile + file_path_id = ( + ProjectFilePath.query.filter_by(project_id=diff_project.id, path="test.gpkg") + .first() + .id + ) + + base_gpkg = os.path.join(diff_project.storage.project_dir, "test.gpkg") + shutil.copy( + os.path.join(diff_project.storage.project_dir, "v9", "test.gpkg"), base_gpkg + ) + for i in range(22): + sql = f"UPDATE simple SET rating={i}" + execute_query(base_gpkg, sql) + pv = push_change( + diff_project, "updated", "test.gpkg", diff_project.storage.project_dir + ) + assert diff_project.latest_version == pv.name == (11 + i) + file_diff = FileDiff.query.filter_by( + file_path_id=file_path_id, version=pv.name, rank=0 + ).first() + assert file_diff and os.path.exists(file_diff.abs_path) + + basefile, diffs = FileHistory.diffs_chain(file_path_id, 32) + assert basefile.project_version_name == 9 + # so far we only have individual diffs + assert len(diffs) == 22 + + # diff for v17-v20 from individual diffs + assert 
FileDiff.can_create_checkpoint(file_path_id, Checkpoint(1, 5)) is True + diff = FileDiff( + basefile=basefile, path=f"test.gpkg-diff-{uuid.uuid4()}", version=20, rank=1 + ) + db.session.add(diff) + db.session.commit() + assert not os.path.exists(diff.abs_path) + diff.construct_checkpoint() + assert os.path.exists(diff.abs_path) + + basefile, diffs = FileHistory.diffs_chain(file_path_id, 20) + assert basefile.project_version_name == 9 + # 6 individual diffs (v11-v16) + merged diff (v17-v20) as the last one + assert len(diffs) == 7 + assert diffs[-1] == diff + + # repeat - nothing to do + mtime = os.path.getmtime(diff.abs_path) + diff.construct_checkpoint() + assert mtime == os.path.getmtime(diff.abs_path) + + # some lower rank diffs still missing + assert not FileDiff.query.filter_by(version=24, rank=1).count() + + # diff for v17-v32 with merged diffs, this will also create lower missing ranks + diff = FileDiff( + basefile=basefile, path=f"test.gpkg-diff-{uuid.uuid4()}", version=32, rank=2 + ) + db.session.add(diff) + db.session.commit() + diff.construct_checkpoint() + assert os.path.exists(diff.abs_path) + lower_diff = FileDiff.query.filter_by(version=24, rank=1).first() + assert os.path.exists(lower_diff.abs_path) + + # assert gpkg diff is the same as it would be from merging all individual diffs + individual_diffs = ( + FileDiff.query.filter_by(file_path_id=file_path_id, rank=0) + .filter(FileDiff.version.between(17, 32)) + .all() + ) + merged_diff = os.path.join(diff_project.storage.diffs_dir, "merged-diff") + diff_project.storage.geodiff.concat_changes( + [d.abs_path for d in individual_diffs], merged_diff + ) + assert diffs_are_equal(diff.abs_path, merged_diff) + + # test various failures + with patch.object(diff_project.storage.geodiff, "concat_changes") as mock: + # diff for missing basefile (e.g. 
deleted file or not-existing file) + diff = FileDiff( + basefile=basefile, path=f"test.gpkg-diff-{uuid.uuid4()}", version=8, rank=1 + ) + db.session.add(diff) + db.session.commit() + diff.construct_checkpoint() + assert not mock.called + + # geodiff failure + mock.side_effect = GeoDiffLibError + diff = FileDiff( + basefile=basefile, path=f"test.gpkg-diff-{uuid.uuid4()}", version=16, rank=1 + ) + db.session.add(diff) + db.session.commit() + diff.construct_checkpoint() + assert mock.called + assert not os.path.exists(diff.abs_path) + + +def test_can_create_checkpoint(diff_project): + """Test if diff file checkpoint can be created""" + file_path_id = ( + ProjectFilePath.query.filter_by(project_id=diff_project.id, path="base.gpkg") + .first() + .id + ) + + # we target v1 where file was uploaded => no diff + assert FileDiff.can_create_checkpoint(file_path_id, Checkpoint(0, 1)) is False + + # for zero rank diffs we can always create a checkpoint (but that should already exist) + assert FileDiff.can_create_checkpoint(file_path_id, Checkpoint(0, 4)) is True + + # there are diffs in both ranges, v1-v4 and v5-v8 + assert FileDiff.can_create_checkpoint(file_path_id, Checkpoint(1, 1)) is True + assert FileDiff.can_create_checkpoint(file_path_id, Checkpoint(1, 2)) is True + + # higher ranks cannot be created as file was removed at v9 + assert FileDiff.can_create_checkpoint(file_path_id, Checkpoint(2, 1)) is False + + # there is no diff for such file in this range + file_path_id = ( + ProjectFilePath.query.filter_by( + project_id=diff_project.id, path="inserted_1_A.gpkg" + ) + .first() + .id + ) + assert FileDiff.can_create_checkpoint(file_path_id, Checkpoint(1, 1)) is False + + +def test_delta_merge_changes(): + """Test merging of delta changes works as expected""" + + create = DeltaChange( + path="file1.gpkg", + change=PushChangeType.CREATE, + version=1, + size=100, + checksum="abc", + ) + update = DeltaChange( + path="file1.gpkg", + change=PushChangeType.UPDATE, + version=2, 
+ size=120, + checksum="def", + ) + delete = DeltaChange( + path="file1.gpkg", + change=PushChangeType.DELETE, + version=3, + size=0, + checksum="ghi", + ) + update_diff1 = DeltaChange( + path="file1.gpkg", + change=PushChangeType.UPDATE_DIFF, + version=4, + size=130, + checksum="xyz", + diff="diff1", + ) + update_diff2 = DeltaChange( + path="file1.gpkg", + change=PushChangeType.UPDATE_DIFF, + version=5, + size=140, + checksum="uvw", + diff="diff2", + ) + + # CREATE + UPDATE -> CREATE + merged = ProjectVersionDelta.merge_changes([create, update]) + assert len(merged) == 1 + assert merged[0].change == PushChangeType.CREATE + assert merged[0].version == update.version + # check reverse order as well + merged = ProjectVersionDelta.merge_changes([update, create]) + assert len(merged) == 1 + assert merged[0].change == PushChangeType.CREATE + assert merged[0].version == update.version + + # CREATE + DELETE -> removed + merged = ProjectVersionDelta.merge_changes([create, delete]) + assert len(merged) == 0 + + # UPDATE + DELETE -> DELETE + merged = ProjectVersionDelta.merge_changes([update, delete]) + assert len(merged) == 1 + assert merged[0].change == PushChangeType.DELETE + + # CREATE + UPDATE_DIFF -> CREATE + merged = ProjectVersionDelta.merge_changes([create, update_diff1]) + assert len(merged) == 1 + assert merged[0].change == PushChangeType.CREATE + assert merged[0].diffs == [] + + # UPDATE + UPDATE_DIFF -> UPDATE + merged = ProjectVersionDelta.merge_changes([update, update_diff1]) + assert len(merged) == 1 + assert merged[0].change == PushChangeType.UPDATE + assert merged[0].diffs == [] + + # UPDATE_DIFF + UPDATE_DIFF -> merged diffs + merged = ProjectVersionDelta.merge_changes([update_diff1, update_diff2]) + assert len(merged) == 1 + assert merged[0].change == PushChangeType.UPDATE_DIFF + assert merged[0].version == update_diff2.version + assert merged[0].size == update_diff2.size + assert merged[0].checksum == update_diff2.checksum + assert [d.path for d in 
merged[0].diffs] == ["diff1", "diff2"] + + # case when trying to delete already existing file in history + # copy create with new version number + delete = DeltaChange( + path="file1.gpkg", + change=PushChangeType.DELETE, + version=6, + size=0, + checksum="ghi", + ) + create = DeltaChange( + path="file1.gpkg", + change=PushChangeType.CREATE, + version=7, + size=100, + checksum="abc", + ) + delete8 = DeltaChange( + path="file1.gpkg", + change=PushChangeType.DELETE, + version=8, + size=0, + checksum="abc2", + ) + merged = ProjectVersionDelta.merge_changes([delete, create, delete8]) + assert len(merged) == 1 + assert merged[0].change == PushChangeType.DELETE + assert merged[0].version == delete8.version + assert merged[0].size == delete8.size + assert merged[0].checksum == delete8.checksum + + +def test_project_version_delta_changes(client, diff_project: Project): + """Test that get_delta_changes and its schema work as expected""" + latest_version = diff_project.get_latest_version() + project_id = diff_project.id + assert latest_version.name == 10 + assert diff_project.get_delta_changes(2, 1) is None + assert diff_project.get_delta_changes(2, 2) is None + deltas: List[ProjectVersionDelta] = ( + ProjectVersionDelta.query.filter_by(project_id=project_id) + .order_by(ProjectVersionDelta.version) + .all() + ) + # check if deltas are created after pushes within ProjectVersion creation + assert len(deltas) == 10 + initial_delta = deltas[0] + initial_version = ProjectVersion.query.filter_by( + project_id=project_id, name=initial_delta.version + ).first() + assert initial_version + assert initial_delta.version + assert initial_delta.rank == 0 + assert initial_delta.version == 1 + + # delete file + delta = diff_project.get_delta_changes(1, 2) + assert len(delta) == 1 + assert delta[0].change == PushChangeType.DELETE + + # delete + create version + delta = diff_project.get_delta_changes(1, 3) + assert len(delta) == 1 + assert delta[0].change == PushChangeType.CREATE + # file 
was created in v3 + assert delta[0].version == 3 + assert delta[0].checksum == deltas[3].changes[0]["checksum"] + + # get_delta with update diff + delta = diff_project.get_delta_changes(1, 4) + assert len(delta) == 1 + assert delta[0].change == PushChangeType.CREATE + assert ProjectVersionDelta.query.filter_by(rank=1).count() == 0 + + # create rank 1 checkpoint for v4 + delta = diff_project.get_delta_changes(0, 4) + checkpoint = ProjectVersionDelta.query.filter_by(rank=1) + filediff_checkpoints = FileDiff.query.filter_by(rank=1) + checkpoint_change = checkpoint.first() + assert checkpoint.count() == 1 + assert checkpoint_change.version == deltas[3].version + assert filediff_checkpoints.count() == 0 + # check if filediff basefile is correctly set + file_history = FileHistory.query.filter_by(project_version_name=4).first() + assert len(delta) == len(initial_version.files) + delta_base_gpkg = next((d for d in delta if d.path == "base.gpkg"), None) + assert delta_base_gpkg + # from history is clear, that we are just creating geopackage in this range + assert delta_base_gpkg.change == PushChangeType.CREATE + assert delta_base_gpkg.version == 4 + assert delta_base_gpkg.path == file_history.path + assert delta_base_gpkg.size == file_history.size + assert delta_base_gpkg.checksum == file_history.checksum + assert len(delta_base_gpkg.diffs) == 0 + + # get data with multiple ranks = 1 level checkpoints 1-4, 5-8 + checkpoint 9 and 10 + assert not ProjectVersionDelta.query.filter_by(rank=1, version=8).first() + delta = diff_project.get_delta_changes(0, 10) + assert len(delta) == len(latest_version.files) + delta_test_gpkg = next((d for d in delta if d.path == "test.gpkg"), None) + assert delta_test_gpkg + assert delta_test_gpkg.change == PushChangeType.CREATE + assert ProjectVersionDelta.query.filter_by(rank=1).count() == 2 + assert ProjectVersionDelta.query.filter_by(rank=2).count() == 0 + # check if version is having rank 1 checkpoint with proper end version + assert 
ProjectVersionDelta.query.filter_by(rank=1, version=4).first() + # missing lower checkpoint is recreated + assert ProjectVersionDelta.query.filter_by(rank=1, version=8).first() + # base gpgk is transparent, bacause we are requesting from 0 + assert not next((c for c in delta if c.path == "base.gpkg"), None) + + delta = diff_project.get_delta_changes(latest_version.name - 3, latest_version.name) + delta_base_gpkg = next((c for c in delta if c.path == "base.gpkg"), None) + assert delta_base_gpkg.change == PushChangeType.DELETE + + # create just update_diff versions with checkpoint + base_gpkg = os.path.join(diff_project.storage.project_dir, "test.gpkg") + shutil.copy( + os.path.join(diff_project.storage.project_dir, "v9", "test.gpkg"), base_gpkg + ) + for i in range(6): + sql = f"UPDATE simple SET rating={i}" + execute_query(base_gpkg, sql) + push_change( + diff_project, "updated", "test.gpkg", diff_project.storage.project_dir + ) + delta = diff_project.get_delta_changes(8, latest_version.name + 6) + assert len(delta) == 2 + # file history in 9.th version is basefile + fh = FileHistory.query.filter_by( + project_version_name=latest_version.name - 1 + ).first() + # testing constistency of db entries FileDiff and ProjectVersionDelta + test_gpkg_checkpoint = FileDiff.query.filter_by(basefile_id=fh.id, rank=1).first() + assert test_gpkg_checkpoint + assert test_gpkg_checkpoint.version == latest_version.name + 6 + delta_checkpoint = ProjectVersionDelta.query.filter_by( + project_id=diff_project.id, version=latest_version.name + 6, rank=1 + ).first() + assert delta_checkpoint + assert len(delta_checkpoint.changes) == 1 + assert delta_checkpoint.changes[0]["version"] == latest_version.name + 6 + assert delta_checkpoint.changes[0]["change"] == PushChangeType.UPDATE_DIFF.value + assert delta_checkpoint.changes[0]["diff"] == test_gpkg_checkpoint.path + + fh = FileHistory.query.filter_by( + project_version_name=latest_version.name + 6 + ).first() + delta = 
diff_project.get_delta_changes(12, latest_version.name + 6) + assert len(delta) == 1 + assert len(delta[0].diffs) == 1 + assert delta[0].diffs[0].path == test_gpkg_checkpoint.path + assert delta[0].change == PushChangeType.UPDATE_DIFF + assert delta[0].checksum == fh.checksum + assert delta[0].size == fh.size + + # check if checkpoint will be there + response = client.get( + f"v2/projects/{diff_project.id}/raw/diff/{delta[0].diffs[0].path}" + ) + assert response.status_code == 200 + + # remove intermediate deltas and assert they would be recreated if needed for higher ranks + ProjectVersionDelta.query.filter_by(project_id=diff_project.id).filter( + ProjectVersionDelta.rank > 0 + ).delete() + db.session.commit() + # v1-v16 would be created from v1-v4, v5-v8 and v9-v12 and 4 individual deltas + delta = diff_project.get_delta_changes(0, diff_project.latest_version) + assert ( + ProjectVersionDelta.query.filter_by(project_id=diff_project.id, rank=1).count() + == 3 + ) + assert ( + ProjectVersionDelta.query.filter_by( + project_id=diff_project.id, rank=2, version=16 + ).count() + == 1 + ) + + def test_get_project(client): """Test get project info endpoint""" admin = User.query.filter_by(username=DEFAULT_USER[0]).first() @@ -588,3 +1056,102 @@ def test_full_push(client): os.path.join(project.storage.project_dir, "v2", test_file["path"]) ) assert not Upload.query.filter_by(project_id=project.id).first() + + +def test_project_delta(client, diff_project): + """Test project delta endpoint""" + login_as_admin(client) + user = add_user() + workspace = create_workspace() + initial_project = create_project("empty_project", workspace=workspace, user=user) + working_dir = os.path.join(TMP_DIR, "empty_work_dir") + os.makedirs(os.path.join(TMP_DIR, "empty_work_dir"), exist_ok=True) + # add basefile + shutil.copy( + os.path.join(test_project_dir, "base.gpkg"), + os.path.join(working_dir, "base.gpkg"), + ) + push_change(initial_project, "added", "base.gpkg", working_dir) + response = 
client.get(f"v2/projects/{initial_project.id}/delta?since=v0") + assert response.status_code == 200 + changes = response.json["items"] + assert len(changes) == 1 + assert changes[0]["change"] == PushChangeType.CREATE.value + assert changes[0]["version"] == "v1" + + # remove the file and get changes from 0 -> 2 where base gpgkg is removed -> transparent + push_change(initial_project, "removed", "base.gpkg", working_dir) + response = client.get(f"v2/projects/{initial_project.id}/delta?since=v0") + assert response.status_code == 200 + changes = response.json["items"] + assert len(changes) == 0 + + # non valid cases + response = client.get(f"v2/projects/{diff_project.id}/delta") + assert response.status_code == 400 + response = client.get(f"v2/projects/{diff_project.id}/delta?since=v2&to=v1") + assert response.status_code == 400 + response = client.get(f"v2/projects/{diff_project.id}/delta?since=v-2") + assert response.status_code == 400 + response = client.get(f"v2/projects/{diff_project.id}/delta?since=v-2&to=v-1") + assert response.status_code == 400 + # exceeding latest version + response = client.get(f"v2/projects/{diff_project.id}/delta?since=v0&to=v2000") + assert response.status_code == 400 + # no changes between versions with same number + response = client.get(f"v2/projects/{diff_project.id}/delta?since=v1&to=v1") + assert response.status_code == 400 + + # since 1 to latest version + response = client.get(f"v2/projects/{diff_project.id}/delta?since=v1") + assert response.status_code == 200 + changes = response.json["items"] + # create of test.gpkg and delete base.gpkg + assert len(changes) == 2 + assert changes[0]["change"] == PushChangeType.DELETE.value + assert changes[0]["version"] == "v9" + assert changes[0]["path"] == "base.gpkg" + assert changes[0]["size"] == 98304 + + assert changes[1]["change"] == PushChangeType.CREATE.value + assert changes[1]["version"] == "v9" + assert changes[1]["path"] == "test.gpkg" + assert changes[1]["size"] == 98304 + + # 
simple update + response = client.get(f"v2/projects/{diff_project.id}/delta?since=v4&to=v8") + assert response.status_code == 200 + changes = response.json["items"] + assert len(changes) == 1 + assert changes[0]["change"] == PushChangeType.UPDATE.value + # version is new latest version of the change + assert changes[0]["version"] == "v7" + assert not changes[0].get("diffs") + + +def test_project_pull_diffs(client, diff_project): + """Test project pull mechanisom in v2 with diff files. Integration test for pull mechanism""" + since = 5 + to = 7 + # check diff files in database where we can get them with right order and metadata + current_diffs = ( + FileDiff.query.filter(FileDiff.version > since, FileDiff.version <= to) + .order_by(FileDiff.version) + .all() + ) + response = client.get( + f"v2/projects/{diff_project.id}/delta?since=v{since}&to=v{to}" + ) + assert response.status_code == 200 + delta = response.json["items"] + assert len(delta) == 1 + assert delta[0]["change"] == PushChangeType.UPDATE_DIFF.value + assert delta[0]["version"] == "v7" + first_diff = delta[0]["diffs"][0] + second_diff = delta[0]["diffs"][1] + assert first_diff["path"] == current_diffs[0].path + assert second_diff["path"] == current_diffs[1].path + response = client.get( + f"v2/projects/{diff_project.id}/raw/diff/{first_diff['path']}" + ) + assert response.status_code == 200 diff --git a/server/mergin/tests/test_utils.py b/server/mergin/tests/test_utils.py index bf5f4666..bf7260b7 100644 --- a/server/mergin/tests/test_utils.py +++ b/server/mergin/tests/test_utils.py @@ -23,6 +23,7 @@ check_filename, is_valid_path, get_x_accel_uri, + Checkpoint, wkb2wkt, has_trailing_space, ) @@ -297,3 +298,36 @@ def test_save_diagnostic_log_file(client, app): with open(saved_file_path, "r") as f: content = f.read() assert content == body.decode("utf-8") + + +def test_checkpoint_utils(): + """Test util functions to construct merged versions of higher ranks (checkpoints)""" + # exact match to single rank + 
versions = Checkpoint.get_checkpoints(1, 64) + assert len(versions) == 1 + assert versions[0].rank == 3 + assert versions[0].index == 1 + + # v21 would be created from v1-16, v17-20 and v21 + versions = Checkpoint.get_checkpoints(1, 21) + assert len(versions) == 3 + assert versions[0].rank == 2 + assert versions[0].index == 1 + assert versions[1].rank == 1 + assert versions[1].index == 5 + assert versions[2].rank == 0 + assert versions[2].index == 21 + + # no cached versions at all, only basic levels v1-v3 + versions = Checkpoint.get_checkpoints(1, 3) + assert len(versions) == 3 + assert versions[0].rank == 0 + assert versions[0].index == 1 + assert versions[1].rank == 0 + assert versions[1].index == 2 + assert versions[2].rank == 0 + assert versions[2].index == 3 + + # dummy request + versions = Checkpoint.get_checkpoints(2, 1) + assert len(versions) == 0 diff --git a/server/mergin/tests/test_workspace.py b/server/mergin/tests/test_workspace.py index 2aafc268..e25f4365 100644 --- a/server/mergin/tests/test_workspace.py +++ b/server/mergin/tests/test_workspace.py @@ -52,6 +52,7 @@ def test_workspace_implementation(client): Configuration.GLOBAL_ADMIN = True # create project with dummy file to count for workspace usage project = create_project("test_permissions", ws, user) + latest_version = project.get_latest_version() file = ProjectFilePath(project.id, path="some_file.txt") file_history = FileHistory( file, @@ -60,12 +61,11 @@ def test_workspace_implementation(client): ), checksum="89469a6482267de394c7c7270cb7ffafe694ea76", size=1024, - diff=null(), + diff=None, change=PushChangeType.CREATE, + version_name=latest_version.name, ) - latest_version = project.get_latest_version() file_history.version = latest_version - file_history.project_version_name = file_history.version.name default_project_usage = ws.disk_usage() db.session.add(file_history) project.disk_usage = 1024 diff --git a/server/mergin/tests/utils.py b/server/mergin/tests/utils.py index dab9c02c..c669e3c6 
100644 --- a/server/mergin/tests/utils.py +++ b/server/mergin/tests/utils.py @@ -17,7 +17,13 @@ from ..auth.models import User, UserProfile from ..sync.utils import generate_location, generate_checksum -from ..sync.models import Project, ProjectVersion, FileHistory, ProjectRole +from ..sync.models import ( + Project, + ProjectVersion, + FileHistory, + ProjectRole, + PushChangeType, +) from ..sync.files import ProjectFileChange, PushChangeType, files_changes_from_upload from ..sync.workspace import GlobalWorkspace from ..app import db @@ -178,7 +184,7 @@ def initialize(): # make sure for history without diff there is a proper Null in database jsonb column assert FileHistory.query.filter_by(version_id=pv.id).filter( - FileHistory.diff.is_(None) + FileHistory.changes != PushChangeType.UPDATE_DIFF.value ).count() == len(project_files) # mimic files were uploaded @@ -297,9 +303,12 @@ def push_change(project, action, path, src_dir): current_files = project.files new_version = ProjectVersion.to_v_name(project.next_version()) changes = {"added": [], "updated": [], "removed": []} - metadata = {**file_info(src_dir, path), "location": os.path.join(new_version, path)} if action == "added": + metadata = { + **file_info(src_dir, path), + "location": os.path.join(new_version, path), + } new_file = os.path.join(project.storage.project_dir, metadata["location"]) os.makedirs(os.path.dirname(new_file), exist_ok=True) shutil.copy(os.path.join(src_dir, metadata["path"]), new_file) @@ -343,6 +352,7 @@ def push_change(project, action, path, src_dir): changes["updated"].append(metadata) elif action == "removed": f_removed = next(f for f in current_files if f.path == path) + os.remove(os.path.join(project.storage.project_dir, f_removed.location)) changes["removed"].append(asdict(f_removed)) else: return @@ -381,6 +391,17 @@ def modify_file_times(path, time: datetime, accessed=True, modified=True): os.utime(path, (atime, mtime)) +def diffs_are_equal(diff1, diff2): + """Compare changes of 
two geodiff files""" + changes1 = os.path.join(TMP_DIR, "changeset" + str(uuid.uuid4())) + changes2 = os.path.join(TMP_DIR, "changeset" + str(uuid.uuid4())) + geodiff = GeoDiff() + geodiff.list_changes_summary(diff1, changes1) + geodiff.list_changes_summary(diff2, changes2) + with open(changes1) as f, open(changes2) as f2: + return json.load(f) == json.load(f2) + + def logout(client): """Test helper to log out the client""" resp = client.get(url_for("/.mergin_auth_controller_logout")) diff --git a/server/migrations/community/4b4648483770_add_project_version_delta.py b/server/migrations/community/4b4648483770_add_project_version_delta.py new file mode 100644 index 00000000..9f13eced --- /dev/null +++ b/server/migrations/community/4b4648483770_add_project_version_delta.py @@ -0,0 +1,129 @@ +"""Add project version delta + +Revision ID: 4b4648483770 +Revises: bd1ec73db389 +Create Date: 2025-10-24 09:55:18.286286 + +""" + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "4b4648483770" +down_revision = "bd1ec73db389" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table( + "project_version_delta", + sa.Column("id", sa.BigInteger(), autoincrement=True, nullable=False), + sa.Column("version", sa.Integer(), nullable=False), + sa.Column("rank", sa.Integer(), nullable=False), + sa.Column("project_id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("changes", postgresql.JSONB(astext_type=sa.Text()), nullable=False), + sa.ForeignKeyConstraint( + ["project_id"], + ["project.id"], + name=op.f("fk_project_version_delta_project_id_project"), + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("id", name=op.f("pk_project_version_delta")), + sa.UniqueConstraint("project_id", "version", "rank", name="unique_deltas"), + ) + op.create_index( + op.f("ix_project_version_delta_project_id"), + "project_version_delta", + ["project_id"], + unique=False, + ) + op.create_index( + "ix_project_version_delta_project_id_version_rank", + "project_version_delta", + ["project_id", "version", "rank"], + unique=False, + ) + op.create_index( + op.f("ix_project_version_delta_rank"), + "project_version_delta", + ["rank"], + unique=False, + ) + op.create_index( + op.f("ix_project_version_delta_version"), + "project_version_delta", + ["version"], + unique=False, + ) + # ### end Alembic commands ### + + op.execute( + """ + INSERT INTO project_version_delta (project_id, version, rank, changes) + WITH delta AS ( + SELECT + h.version_id, + jsonb_agg( + jsonb_strip_nulls( + jsonb_build_object( + 'path', fp.path, + 'size', h.size, + 'change', h.change, + 'version', h.project_version_name, + 'checksum', h.checksum, + 'diff', fdj.diff_path + ) + ) + ) AS changes + FROM + file_history h + JOIN + project_file_path fp ON h.file_path_id = fp.id + LEFT JOIN LATERAL ( + SELECT + fd.path AS diff_path + FROM + file_diff fd + WHERE + fd.file_path_id = fp.id + AND fd.version = h.project_version_name + AND fd.rank = 0 + ) fdj ON TRUE + GROUP BY + -- Group by the single unique version identifier + h.version_id + ) + SELECT + pv.project_id, + pv.name, + 
0 AS rank, + d.changes + FROM + delta AS d + JOIN project_version AS pv ON d.version_id = pv.id + ; + """ + ) + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index( + op.f("ix_project_version_delta_version"), table_name="project_version_delta" + ) + op.drop_index( + op.f("ix_project_version_delta_rank"), table_name="project_version_delta" + ) + op.drop_index( + "ix_project_version_delta_project_id_version_rank", + table_name="project_version_delta", + ) + op.drop_index( + op.f("ix_project_version_delta_project_id"), table_name="project_version_delta" + ) + op.drop_table("project_version_delta") + # ### end Alembic commands ### diff --git a/server/migrations/community/bd1ec73db389_create_file_diff_table.py b/server/migrations/community/bd1ec73db389_create_file_diff_table.py new file mode 100644 index 00000000..1ee671dd --- /dev/null +++ b/server/migrations/community/bd1ec73db389_create_file_diff_table.py @@ -0,0 +1,173 @@ +"""create file diff table + +Revision ID: bd1ec73db389 +Revises: b9ec9ab6694f +Create Date: 2025-07-17 14:17:02.373645 + +""" + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. 
+revision = "bd1ec73db389" +down_revision = "b9ec9ab6694f" +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + "file_diff", + sa.Column("id", sa.BigInteger(), autoincrement=True, nullable=False), + sa.Column("file_path_id", sa.BigInteger(), nullable=False), + sa.Column("basefile_id", sa.BigInteger(), nullable=False), + sa.Column("path", sa.String(), nullable=False), + sa.Column("rank", sa.Integer(), nullable=False), + sa.Column("version", sa.Integer(), nullable=False), + sa.Column("location", sa.String(), nullable=True), + sa.Column("size", sa.BigInteger(), nullable=True), + sa.Column("checksum", sa.String(), nullable=True), + sa.ForeignKeyConstraint( + ["basefile_id"], + ["file_history.id"], + name=op.f("fk_file_diff_basefile_id_file_history"), + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint( + ["file_path_id"], + ["project_file_path.id"], + name=op.f("fk_file_diff_file_path_id_project_file_path"), + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("id", name=op.f("pk_file_diff")), + sa.UniqueConstraint("file_path_id", "rank", "version", name="unique_diff"), + ) + op.create_index( + op.f("ix_file_diff_basefile_id"), "file_diff", ["basefile_id"], unique=False + ) + op.create_index( + "ix_file_diff_file_path_id_version_rank", + "file_diff", + ["file_path_id", "version", "rank"], + unique=False, + ) + op.create_index(op.f("ix_file_diff_path"), "file_diff", ["path"], unique=False) + op.create_index(op.f("ix_file_diff_rank"), "file_diff", ["rank"], unique=False) + op.create_index( + op.f("ix_file_diff_version"), "file_diff", ["version"], unique=False + ) + + # migrate data + conn = op.get_bind() + conn.execute( + """ + WITH diffs AS ( + SELECT * + FROM file_history + WHERE diff IS NOT NULL + ), + basefiles AS ( + SELECT DISTINCT + fh.id AS basefile_id, + fh.file_path_id, + fh.project_version_name AS basefile_version + FROM diffs d + LEFT OUTER JOIN file_history fh ON fh.file_path_id = d.file_path_id + WHERE + fh.change = 
ANY(ARRAY['create'::push_change_type, 'update'::push_change_type]) + ), + relevant_basefiles AS ( + SELECT + d.id, + d.project_version_name, + b.basefile_id, + b.basefile_version + FROM diffs d + LEFT OUTER JOIN basefiles b ON b.file_path_id = d.file_path_id AND b.basefile_version < d.project_version_name + ), + file_diffs AS ( + SELECT DISTINCT + d.file_path_id, + FIRST_VALUE(rb.basefile_id) OVER (PARTITION BY rb.id ORDER BY rb.basefile_version DESC) as basefile_id, + 0 AS rank, + d.project_version_name AS version, + (d.diff ->> 'path') AS path, + (d.diff ->> 'size')::bigint AS size, + d.diff ->> 'checksum' AS checksum, + d.diff ->> 'location' AS location + FROM diffs d + LEFT OUTER JOIN relevant_basefiles rb ON rb.id = d.id + ) + INSERT INTO file_diff (file_path_id, basefile_id, rank, version, path, size, checksum, location) + -- it seems that some projects / files might be broken so we need to play it safe here + SELECT * FROM file_diffs WHERE basefile_id IS NOT NULL; + """ + ) + + op.drop_column("file_history", "diff") + + +def downgrade(): + op.add_column( + "file_history", + sa.Column( + "diff", + postgresql.JSONB(astext_type=sa.Text()), + autoincrement=False, + nullable=True, + ), + ) + + # migrate data + conn = op.get_bind() + conn.execute( + """ + UPDATE file_history fh + SET diff = jsonb_build_object( + 'path', fd.path, + 'size', fd.size, + 'checksum', fd.checksum, + 'location', fd.location + ) + FROM file_diff fd + WHERE fh.file_path_id = fd.file_path_id AND fh.project_version_name = fd.version AND fd.rank = 0; + """ + ) + + # if there were any broken gpkg files (omitted in upgrade), let's add a dummy diff there + conn.execute( + """ + UPDATE file_history fh + SET diff = jsonb_build_object( + 'path', 'missing-diff', + 'size', 0, + 'checksum', '', + 'location', '' + ) + WHERE fh.change = 'update_diff' AND fh.diff IS NULL; + """ + ) + + # add back consistency constraint + conn.execute( + sa.text( + """ + ALTER TABLE file_history + ADD CONSTRAINT 
ck_file_history_changes_with_diff CHECK ( + CASE + WHEN (change = 'update_diff') THEN diff IS NOT NULL + ELSE diff IS NULL + END + ); + """ + ) + ) + + op.drop_index(op.f("ix_file_diff_version"), table_name="file_diff") + op.drop_index(op.f("ix_file_diff_rank"), table_name="file_diff") + op.drop_index(op.f("ix_file_diff_path"), table_name="file_diff") + op.drop_index("ix_file_diff_file_path_id_version_rank", table_name="file_diff") + op.drop_index(op.f("ix_file_diff_basefile_id"), table_name="file_diff") + op.drop_table("file_diff")