diff --git a/.gitignore b/.gitignore index 7614b44b..5ca81560 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ projects*/ data/ mergin_db +diagnostic_logs logs *.log diff --git a/README.md b/README.md index 6fbc30d3..5d3a912b 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ You are currently browsing repository for Mergin Maps web server and web client. - 🌱 **Sharing with collaborators** - Projects can be shared with other team members - 🏰 **Permission system** - Decide who can read, write or manage projects - 🌈 **Web interface** - Simple user interface to view and manage projects -- ⚡️ **Fast** - Efficient sync protocol transfering data between clients and server +- ⚡️ **Fast** - Efficient sync protocol transferring data between clients and server - 🧑‍💻 **Developer friendly** - Mergin Maps is open platform. CLI tools and client libraries are available for [Python](https://github.com/MerginMaps/python-api-client) and [C++](https://github.com/MerginMaps/cpp-api-client) - :camera: **Sync images** - Supporting sync of photos with common cloud storage using [mergin-media-sync](https://github.com/MerginMaps/media-sync) tool - 💽 **Sync with database** - Supporting two-way sync of data with PostGIS using [mergin-db-sync](https://github.com/MerginMaps/db-sync) tool @@ -74,7 +74,7 @@ Admin users can enter the admin interface available at `/admin` URL which provid ### Contributing -Contributions are welcomed! You can set up development environment by following a guide in [development.md](./deployment/community/development.md). Before you create your first pull request, we kindly ask you to sign the CLA with your GitHub user name and date [here](LICENSES/CLA-signed-list.md). +Contributions are welcomed! You can set up development environment by following a guide in [development.md](./development.md). Before you create your first pull request, we kindly ask you to sign the CLA with your GitHub user name and date [here](LICENSES/CLA-signed-list.md). ## Documentation @@ -93,7 +93,7 @@ If you need support, a custom deployment, extending the service capabilities and Contributions are welcome! -More information for developers can be found in the dedicated [development](./deployment/community/development.md) page. +More information for developers can be found in the dedicated [development](./development.md) page. 
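[Editor's aside] The README hunk above points contributors to the Python client library and the development guide. As a quick orientation only, the sketch below shows how a developer might exercise a locally running server with the python-api-client. Everything in it is an assumption rather than part of this diff: the server URL, credentials and project name are placeholders, and the method names (`download_project`, `push_project`, `pull_project`) follow the python-api-client documentation.

```python
# Sketch only: assumes the mergin-client package (python-api-client) is installed
# and a Mergin Maps server is running locally; URL, credentials and project name
# are placeholders, not values taken from this repository.
from mergin import MerginClient

client = MerginClient("http://localhost:5000", login="admin", password="topsecret")

# initial checkout of an existing project into a local folder
client.download_project("admin/my-survey", "./my-survey")

# ... edit files in ./my-survey ...

client.push_project("./my-survey")   # upload local changes to the server
client.pull_project("./my-survey")   # fetch changes made by other collaborators
```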
Client side modules: - [Python](https://github.com/MerginMaps/python-api-client) client library + CLI diff --git a/deployment/common/set_permissions.sh b/deployment/common/set_permissions.sh old mode 100644 new mode 100755 diff --git a/deployment/community/.env.template b/deployment/community/.env.template index 4526246e..86e4a6b0 100644 --- a/deployment/community/.env.template +++ b/deployment/community/.env.template @@ -98,12 +98,10 @@ MAIL_SUPPRESS_SEND=0 #MAIL_BCC='' -#MERGIN_LOGO_URL= # for link to logo in emails +MERGIN_LOGO_URL=https://merginmaps.com/MM_logo_HORIZ_COLOR_TRANSPARENT_no_padding.png #MAIL_DEBUG=MAIL_SUPPRESS_SEND | False - - # data sync #LOCAL_PROJECTS=os.path.join(config_dir, os.pardir, os.pardir, 'projects') # for local storage type @@ -215,5 +213,7 @@ NO_MONKEY_PATCH=False # Diagnostic logs +DIAGNOSTIC_LOGS_URL= + DIAGNOSTIC_LOGS_DIR=/diagnostic_logs diff --git a/deployment/community/docker-compose.yml b/deployment/community/docker-compose.yml index 8b5d5156..3a5b2665 100644 --- a/deployment/community/docker-compose.yml +++ b/deployment/community/docker-compose.yml @@ -30,7 +30,6 @@ services: volumes: - ./projects:/data - ./diagnostic_logs:/diagnostic_logs - - ../common/entrypoint.sh:/app/entrypoint.sh env_file: - .prod.env depends_on: @@ -48,8 +47,6 @@ services: environment: - GEVENT_WORKER=0 - NO_MONKEY_PATCH=1 - volumes: - - ../common/entrypoint.sh:/app/entrypoint.sh depends_on: - redis - server @@ -68,7 +65,6 @@ services: - NO_MONKEY_PATCH=1 volumes: - ./projects:/data - - ../common/entrypoint.sh:/app/entrypoint.sh depends_on: - redis - server diff --git a/deployment/enterprise/.env.template b/deployment/enterprise/.env.template index f51c8bef..663ba5ac 100644 --- a/deployment/enterprise/.env.template +++ b/deployment/enterprise/.env.template @@ -175,11 +175,8 @@ ACCOUNT_EXPIRATION=1 # for links generated in emails -#MERGIN_BASE_URL=http://localhost:5000 -MERGIN_BASE_URL=fixme - #MERGIN_LOGO_URL= # for link to logo in emails -MERGIN_LOGO_URL=fixme +MERGIN_LOGO_URL=https://merginmaps.com/MM_logo_HORIZ_COLOR_TRANSPARENT_no_padding.png # global workspace related bits - ignored in non-CE versions # GLOBAL_WORKSPACE mergin @@ -228,6 +225,8 @@ VECTOR_TILES_STYLE_URL=https://tiles-ee.merginmaps.com//styles/default.json ### Diagnostic logs from Mobile and QGIS Plugin DIAGNOSTIC_LOGS_DIR=/diagnostic_logs +DIAGNOSTIC_LOGS_URL= + ### SSO ################################################################################################################ SSO_ENABLED=False diff --git a/deployment/enterprise/docker-compose.yml b/deployment/enterprise/docker-compose.yml index c4524707..902a66ef 100644 --- a/deployment/enterprise/docker-compose.yml +++ b/deployment/enterprise/docker-compose.yml @@ -13,9 +13,10 @@ services: volumes: - ./data:/data # map data dir to host - ./diagnostic_logs:/diagnostic_logs # diagnostic logs dir - - ../common/entrypoint.sh:/app/entrypoint.sh env_file: - .prod.env + environment: + - GUNICORN_CMD_ARGS="--limit-request-line 8190" depends_on: - db networks: @@ -56,8 +57,6 @@ services: restart: always user: 901:999 command: ["celery -A application.celery beat --loglevel=info"] - volumes: - - ../common/entrypoint.sh:/app/entrypoint.sh env_file: - .prod.env depends_on: @@ -75,7 +74,6 @@ services: volumes: - ./data:/data # map data dir to host - ./map_data:/overviews - - ../common/entrypoint.sh:/app/entrypoint.sh env_file: - .prod.env depends_on: diff --git a/development.md b/development.md index b2a1be08..72c79ec4 100644 --- a/development.md +++ 
b/development.md @@ -43,7 +43,7 @@ $ yarn install $ yarn link:dependencies # link dependencies $ yarn build:libs # bild libraries @mergin/lib @mergin/admin-lib @mergin/lib-vue2 $ yarn dev # development client web application dev server on port 8080 (package @mergin/app) -$ yarn dev:admin # development admin appplication dev server on port 8081 (package @mergin/admin-app) +$ yarn dev:admin # development admin application dev server on port 8081 (package @mergin/admin-app) ``` If you are developing a library package (named **-lib*), it is useful to watch the library for changes instead of rebuilding it each time. @@ -71,8 +71,6 @@ cd deployment/community/ # Create .prod.env file from .env.template cp .env.template .prod.env -# Run the docker composition with the current Dockerfiles -cp .env.template .prod.env docker compose -f docker-compose.yml -f docker-compose.dev.yml up -d # Give ownership of the ./projects folder to user that is running the gunicorn container @@ -98,7 +96,7 @@ docker exec -it merginmaps-server flask server send-check-email --email admin@e In docker-compose.dev.yml is started maildev/maildev image that can be used to test emails (see [https://github.com/maildev/maildev/](https://github.com/maildev/maildev/)). In localhost:1080 you can see the emails sent by the application in web interface. ### Running with remote debugger -If you want to run the application with remote debugger, you can use debug compose file with attatched source code and reload. +If you want to run the application with remote debugger, you can use debug compose file with attached source code and reload. It starts a debugpy session on port 5678 you can attach to. ```shell diff --git a/server/application.py b/server/application.py index b1ab79ac..b5fae1d1 100644 --- a/server/application.py +++ b/server/application.py @@ -27,6 +27,7 @@ remove_projects_archives, remove_temp_files, remove_projects_backups, + remove_unused_chunks, ) from mergin.celery import celery, configure_celery from mergin.stats.config import Configuration @@ -47,6 +48,7 @@ "GLOBAL_WRITE", "ENABLE_SUPERADMIN_ASSIGNMENT", "DIAGNOSTIC_LOGS_URL", + "V2_PUSH_ENABLED", ] ) register_stats(application) @@ -85,4 +87,9 @@ def setup_periodic_tasks(sender, **kwargs): crontab(hour=3, minute=0), remove_projects_archives, name="remove old project archives", + ), + sender.add_periodic_task( + crontab(hour="*/4", minute=0), + remove_unused_chunks, + name="clean up of outdated chunks", ) diff --git a/server/mergin/app.py b/server/mergin/app.py index d0fd2f3a..2f8c7dcf 100644 --- a/server/mergin/app.py +++ b/server/mergin/app.py @@ -12,7 +12,17 @@ from sqlalchemy.schema import MetaData from flask_sqlalchemy import SQLAlchemy from flask_marshmallow import Marshmallow -from flask import json, jsonify, request, abort, current_app, Flask, Request, Response +from flask import ( + json, + jsonify, + make_response, + request, + abort, + current_app, + Flask, + Request, + Response, +) from flask_login import current_user, LoginManager from flask_wtf.csrf import generate_csrf, CSRFProtect from flask_migrate import Migrate @@ -25,7 +35,7 @@ import time import traceback from werkzeug.exceptions import HTTPException -from typing import List, Dict, Optional +from typing import List, Dict, Optional, Tuple from .sync.utils import get_blacklisted_dirs, get_blacklisted_files from .config import Configuration @@ -347,6 +357,16 @@ def ping(): # pylint: disable=W0612 ) return status, 200 + # reading raw input stream not supported in connexion so far + # 
https://github.com/zalando/connexion/issues/592 + # and as workaround we use custom Flask endpoint in create_app function + @app.route("/v2/projects//chunks", methods=["POST"]) + @auth_required + def upload_chunk_v2(id: str): + from .sync import public_api_v2_controller + + return public_api_v2_controller.upload_chunk(id) + # reading raw input stream not supported in connexion so far # https://github.com/zalando/connexion/issues/592 # and as workaround we use custom Flask endpoint in create_app function @@ -485,6 +505,12 @@ class ResponseError: def to_dict(self) -> Dict: return dict(code=self.code, detail=self.detail + f" ({self.code})") + def response(self, status_code: int) -> Tuple[Response, int]: + """Returns a custom error response with the given code.""" + response = make_response(jsonify(self.to_dict()), status_code) + response.headers["Content-Type"] = "application/problem+json" + return response, status_code + def whitespace_filter(obj): return obj.strip() if isinstance(obj, str) else obj diff --git a/server/mergin/auth/models.py b/server/mergin/auth/models.py index 39b94e91..5dcf275e 100644 --- a/server/mergin/auth/models.py +++ b/server/mergin/auth/models.py @@ -36,6 +36,8 @@ class User(db.Model): default=datetime.datetime.utcnow, ) + last_signed_in = db.Column(db.DateTime(), nullable=True) + __table_args__ = ( db.Index("ix_user_username", func.lower(username), unique=True), db.Index("ix_user_email", func.lower(email), unique=True), @@ -289,6 +291,7 @@ def __init__(self, user_id: int, ua: str, ip: str, device_id: Optional[str] = No self.user_agent = ua self.ip_address = ip self.device_id = device_id + self.timestamp = datetime.datetime.now(tz=datetime.timezone.utc) @staticmethod def add_record(user_id: int, req: request) -> None: @@ -300,4 +303,37 @@ def add_record(user_id: int, req: request) -> None: return lh = LoginHistory(user_id, ua, ip, device_id) db.session.add(lh) + + # cache user last login + User.query.filter_by(id=user_id).update({"last_signed_in": lh.timestamp}) + db.session.commit() + + @staticmethod + def get_users_last_signed_in(user_ids: list) -> dict: + """Get users last signed in dates. + Result is also cached in User table for future use. 
+ """ + result = ( + db.session.query( + LoginHistory.user_id, + func.max(LoginHistory.timestamp).label("last_signed_in"), + ) + .filter(LoginHistory.user_id.in_(user_ids)) + .group_by(LoginHistory.user_id) + .all() + ) + + user_mapping = [ + { + "id": row.user_id, # user_id as PK in User table + "last_signed_in": row.last_signed_in, + } + for row in result + ] + if not user_mapping: + return {} + + # cache users last signed in + db.session.bulk_update_mappings(User, user_mapping) db.session.commit() + return {item["id"]: item["last_signed_in"] for item in user_mapping} diff --git a/server/mergin/sync/commands.py b/server/mergin/sync/commands.py index 6e70d913..21f5ef15 100644 --- a/server/mergin/sync/commands.py +++ b/server/mergin/sync/commands.py @@ -11,7 +11,6 @@ from flask import Flask, current_app from sqlalchemy import func -from .files import UploadChanges from ..app import db from .models import Project, ProjectVersion from .utils import split_project_path @@ -55,8 +54,8 @@ def create(name, namespace, username): # pylint: disable=W0612 p = Project(**project_params) p.updated = datetime.utcnow() db.session.add(p) - changes = UploadChanges(added=[], updated=[], removed=[]) - pv = ProjectVersion(p, 0, user.id, changes, "127.0.0.1") + pv = ProjectVersion(p, 0, user.id, [], "127.0.0.1") + pv.project = p db.session.add(pv) db.session.commit() os.makedirs(p.storage.project_dir, exist_ok=True) diff --git a/server/mergin/sync/config.py b/server/mergin/sync/config.py index b182da6d..7200dae5 100644 --- a/server/mergin/sync/config.py +++ b/server/mergin/sync/config.py @@ -64,3 +64,14 @@ class Configuration(object): ) # in seconds, older unfinished zips are moved to temp PARTIAL_ZIP_EXPIRATION = config("PARTIAL_ZIP_EXPIRATION", default=600, cast=int) + # whether new push is allowed + V2_PUSH_ENABLED = config("V2_PUSH_ENABLED", default=True, cast=bool) + # directory for file chunks + UPLOAD_CHUNKS_DIR = config( + "UPLOAD_CHUNKS_DIR", + default=os.path.join(LOCAL_PROJECTS, "chunks"), + ) + # time in seconds after chunks are permanently deleted (1 day) + UPLOAD_CHUNKS_EXPIRATION = config( + "UPLOAD_CHUNKS_EXPIRATION", default=86400, cast=int + ) diff --git a/server/mergin/sync/db_events.py b/server/mergin/sync/db_events.py index 18d1ce60..48a1756d 100644 --- a/server/mergin/sync/db_events.py +++ b/server/mergin/sync/db_events.py @@ -6,6 +6,8 @@ from flask import current_app, abort from sqlalchemy import event +from .models import ProjectVersion +from .tasks import optimize_storage from ..app import db @@ -14,9 +16,17 @@ def check(session): abort(503, "Service unavailable due to maintenance, please try later") +def optimize_gpgk_storage(mapper, connection, project_version): + # do not optimize on every version, every 10th is just fine + if not project_version.name % 10: + optimize_storage.delay(project_version.project_id) + + def register_events(): event.listen(db.session, "before_commit", check) + event.listen(ProjectVersion, "after_insert", optimize_gpgk_storage) def remove_events(): event.remove(db.session, "before_commit", check) + event.listen(ProjectVersion, "after_insert", optimize_gpgk_storage) diff --git a/server/mergin/sync/errors.py b/server/mergin/sync/errors.py index d253ef4c..35985ab9 100644 --- a/server/mergin/sync/errors.py +++ b/server/mergin/sync/errors.py @@ -3,8 +3,12 @@ # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial from typing import List, Dict + +from .config import Configuration from ..app import ResponseError +MAX_CHUNK_SIZE = 
Configuration.MAX_CHUNK_SIZE / 1024 / 1024 + class UpdateProjectAccessError(ResponseError): code = "UpdateProjectAccessError" @@ -39,3 +43,55 @@ def to_dict(self) -> Dict: class ProjectLocked(ResponseError): code = "ProjectLocked" detail = "The project is currently locked and you cannot make changes to it" + + +class DataSyncError(ResponseError): + code = "DataSyncError" + detail = "There are either corrupted files or it is not possible to create version with provided geopackage data" + + def __init__(self, failed_files: Dict): + self.failed_files = failed_files + + def to_dict(self) -> Dict: + data = super().to_dict() + data["failed_files"] = self.failed_files + return data + + +class ProjectVersionExists(ResponseError): + code = "ProjectVersionExists" + detail = "Project version mismatch" + + def __init__(self, client_version: int, server_version: int): + self.client_version = client_version + self.server_version = server_version + + def to_dict(self) -> Dict: + data = super().to_dict() + data["client_version"] = f"v{self.client_version}" + data["server_version"] = f"v{self.server_version}" + return data + + +class AnotherUploadRunning(ResponseError): + code = "AnotherUploadRunning" + detail = "Another process is running" + + +class UploadError(ResponseError): + code = "UploadError" + detail = "Project version could not be created" + + def __init__(self, error: str = None): + self.error = error + + def to_dict(self) -> Dict: + data = super().to_dict() + if self.error is not None: + data["detail"] = self.error + f" ({self.code})" + return data + + +class BigChunkError(ResponseError): + code = "BigChunkError" + detail = f"Chunk size exceeds maximum allowed size {MAX_CHUNK_SIZE} MB" diff --git a/server/mergin/sync/files.py b/server/mergin/sync/files.py index 12b30afe..fd77c597 100644 --- a/server/mergin/sync/files.py +++ b/server/mergin/sync/files.py @@ -2,15 +2,36 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial import datetime +from enum import Enum import os from dataclasses import dataclass from typing import Optional, List -from marshmallow import fields, EXCLUDE, pre_load, post_load, post_dump +import uuid +from flask import current_app +from marshmallow import ValidationError, fields, EXCLUDE, post_dump, validates_schema from pathvalidate import sanitize_filename +from .utils import ( + is_file_name_blacklisted, + is_qgis, + is_supported_extension, + is_valid_path, + is_versioned_file, +) from ..app import DateTimeWithZ, ma +class PushChangeType(Enum): + CREATE = "create" + UPDATE = "update" + DELETE = "delete" + UPDATE_DIFF = "update_diff" + + @classmethod + def values(cls): + return [member.value for member in cls.__members__.values()] + + def mergin_secure_filename(filename: str) -> str: """Generate secure filename for given file""" filename = os.path.normpath(filename) @@ -24,94 +45,181 @@ def mergin_secure_filename(filename: str) -> str: @dataclass class File: - """Base class for every file object""" + """Base class for every file object, either intended to upload or already existing in project""" path: str checksum: str size: int - location: str def is_valid_gpkg(self): """Check if diff file is valid""" return self.size != 0 +@dataclass +class ProjectDiffFile(File): + """Metadata for geodiff diff file (aka. 
changeset) associated with geopackage""" + + # location where file is actually stored + location: str + + @dataclass class ProjectFile(File): - """Project file metadata including metadata for diff file""" + """Project file metadata including metadata for diff file and location where it is stored""" # metadata for gpkg diff file - diff: Optional[File] + diff: Optional[ProjectDiffFile] # deprecated attribute kept for public API compatibility mtime: Optional[datetime.datetime] + # location where file is actually stored + location: str @dataclass -class UploadFile(File): - """File to be uploaded coming from client push process""" - - # determined by client - chunks: Optional[List[str]] - diff: Optional[File] - +class ProjectFileChange(ProjectFile): + """Metadata of changed file in project version. + + This item is saved into database into file_history. + """ + + change: PushChangeType + + +def files_changes_from_upload( + changes: dict, location_dir: str +) -> List["ProjectFileChange"]: + """Create a list of version file changes from upload changes dictionary used by public API. + + It flattens changes dict and adds change type to each item. Also generates location for each file. + """ + secure_filenames = [] + version_changes = [] + for key in ("added", "updated", "removed"): + for item in changes.get(key, []): + location = os.path.join(location_dir, mergin_secure_filename(item["path"])) + diff = None + + # make sure we have unique location for each file + if location in secure_filenames: + filename, file_extension = os.path.splitext(location) + location = filename + f".{str(uuid.uuid4())}" + file_extension + + secure_filenames.append(location) + + if key == "removed": + change = PushChangeType.DELETE + location = None + elif key == "added": + change = PushChangeType.CREATE + else: + change = PushChangeType.UPDATE + if item.get("diff"): + change = PushChangeType.UPDATE_DIFF + diff_location = os.path.join( + location_dir, mergin_secure_filename(item["diff"]["path"]) + ) + if diff_location in secure_filenames: + filename, file_extension = os.path.splitext(diff_location) + diff_location = ( + filename + f".{str(uuid.uuid4())}" + file_extension + ) + + secure_filenames.append(diff_location) + diff = ProjectDiffFile( + path=item["diff"]["path"], + checksum=item["diff"]["checksum"], + size=item["diff"]["size"], + location=diff_location, + ) + + file_change = ProjectFileChange( + path=item["path"], + checksum=item["checksum"], + size=item["size"], + mtime=None, + change=change, + location=location, + diff=diff, + ) + version_changes.append(file_change) -@dataclass -class UploadChanges: - added: List[UploadFile] - updated: List[UploadFile] - removed: List[UploadFile] + return version_changes class FileSchema(ma.Schema): path = fields.String() size = fields.Integer() checksum = fields.String() - location = fields.String(load_default="", load_only=True) class Meta: unknown = EXCLUDE - @post_load - def create_obj(self, data, **kwargs): - return File(**data) - class UploadFileSchema(FileSchema): chunks = fields.List(fields.String(), load_default=[]) diff = fields.Nested(FileSchema(), many=False, load_default=None) - @pre_load - def pre_load(self, data, **kwargs): - # add future location based on context version - version = f"v{self.context.get('version')}" - if not data.get("location"): - data["location"] = os.path.join( - version, mergin_secure_filename(data["path"]) - ) - if data.get("diff") and not data.get("diff").get("location"): - data["diff"]["location"] = os.path.join( - version, 
mergin_secure_filename(data["diff"]["path"]) - ) - return data - - @post_load - def create_obj(self, data, **kwargs): - return UploadFile(**data) - class ChangesSchema(ma.Schema): """Schema for upload changes""" - added = fields.List(fields.Nested(UploadFileSchema()), load_default=[]) - updated = fields.List(fields.Nested(UploadFileSchema()), load_default=[]) - removed = fields.List(fields.Nested(UploadFileSchema()), load_default=[]) + added = fields.List( + fields.Nested(UploadFileSchema()), load_default=[], dump_default=[] + ) + updated = fields.List( + fields.Nested(UploadFileSchema()), load_default=[], dump_default=[] + ) + removed = fields.List( + fields.Nested(UploadFileSchema()), load_default=[], dump_default=[] + ) class Meta: unknown = EXCLUDE - @post_load - def create_obj(self, data, **kwargs): - return UploadChanges(**data) + @post_dump + def remove_blacklisted_files(self, data, **kwargs): + """Files which are blacklisted are not allowed to be uploaded and are simple ignored.""" + for key in ("added", "updated", "removed"): + data[key] = [ + f + for f in data[key] + if not is_file_name_blacklisted( + f["path"], current_app.config["BLACKLIST"] + ) + ] + return data + + @validates_schema + def validate(self, data, **kwargs): + """Basic consistency validations for upload metadata""" + changes_files = [ + f["path"] for f in data["added"] + data["updated"] + data["removed"] + ] + + if len(changes_files) == 0: + raise ValidationError("No changes") + + # changes' files must be unique + if len(set(changes_files)) != len(changes_files): + raise ValidationError("Not unique changes") + + # check if all files are valid + for file in data["added"] + data["updated"]: + file_path = file["path"] + if is_versioned_file(file_path) and file["size"] == 0: + raise ValidationError("File is not valid") + + if not is_valid_path(file_path): + raise ValidationError( + f"Unsupported file name detected: {file_path}. Please remove the invalid characters." + ) + + if not is_supported_extension(file_path): + raise ValidationError( + f"Unsupported file type detected: {file_path}. 
" + f"Please remove the file or try compressing it into a ZIP file before uploading.", + ) class ProjectFileSchema(FileSchema): diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py index 3854e4d2..9574a69d 100644 --- a/server/mergin/sync/models.py +++ b/server/mergin/sync/models.py @@ -10,6 +10,7 @@ from enum import Enum from typing import Optional, List, Dict, Set, Tuple from dataclasses import dataclass, asdict +import logging from blinker import signal from flask_login import current_user @@ -23,30 +24,38 @@ from .files import ( File, - UploadChanges, + ProjectDiffFile, + ProjectFileChange, ChangesSchema, ProjectFile, + files_changes_from_upload, + mergin_secure_filename, + PushChangeType, ) from .interfaces import WorkspaceRole from .storages.disk import move_to_tmp from ..app import db from .storages import DiskStorage -from .utils import is_versioned_file, is_qgis +from .utils import ( + Toucher, + get_chunk_location, + get_project_path, + is_supported_type, + is_versioned_file, + is_qgis, +) Storages = {"local": DiskStorage} project_deleted = signal("project_deleted") project_access_granted = signal("project_access_granted") +push_finished = signal("push_finished") +project_version_created = signal("project_version_created") -class PushChangeType(Enum): - CREATE = "create" - UPDATE = "update" - DELETE = "delete" - UPDATE_DIFF = "update_diff" - - @classmethod - def values(cls): - return [member.value for member in cls.__members__.values()] +class FileSyncErrorType(Enum): + CORRUPTED = "corrupted" + UNSUPPORTED = "unsupported" + SYNC_ERROR = "sync error" class Project(db.Model): @@ -181,7 +190,7 @@ def files(self) -> List[ProjectFile]: checksum=row.checksum, location=row.location, mtime=row.mtime, - diff=File(**row.diff) if row.diff else None, + diff=ProjectDiffFile(**row.diff) if row.diff else None, ) for row in db.session.execute(query, params).fetchall() ] @@ -504,9 +513,9 @@ def path(self) -> str: return self.file.path @property - def diff_file(self) -> Optional[File]: + def diff_file(self) -> Optional[ProjectDiffFile]: if self.diff: - return File(**self.diff) + return ProjectDiffFile(**self.diff) @property def mtime(self) -> datetime: @@ -705,7 +714,7 @@ def __init__( project: Project, name: int, author_id: int, - changes: UploadChanges, + changes: List[ProjectFileChange], ip: str, user_agent: str = None, device_id: str = None, @@ -725,9 +734,7 @@ def __init__( ).all() } - changed_files_paths = [ - f.path for f in changes.updated + changes.removed + changes.added - ] + changed_files_paths = set(change.path for change in changes) existing_files_map = { f.path: f for f in ProjectFilePath.query.filter_by(project_id=self.project_id) @@ -735,46 +742,32 @@ def __init__( .all() } - for key in ( - ("added", PushChangeType.CREATE), - ("updated", PushChangeType.UPDATE), - ("removed", PushChangeType.DELETE), - ): - change_attr = key[0] - change_type = key[1] - - for upload_file in getattr(changes, change_attr): - is_diff_change = ( - change_type is PushChangeType.UPDATE - and upload_file.diff is not None - ) - - file = existing_files_map.get( - upload_file.path, ProjectFilePath(self.project_id, upload_file.path) - ) - fh = FileHistory( - file=file, - size=upload_file.size, - checksum=upload_file.checksum, - location=upload_file.location, - diff=( - asdict(upload_file.diff) - if (is_diff_change and upload_file.diff) - else null() - ), - change=( - PushChangeType.UPDATE_DIFF if is_diff_change else change_type - ), - ) - fh.version = self - fh.project_version_name = 
self.name - db.session.add(fh) - db.session.flush() + for item in changes: + # get existing DB file reference or create a new one (for added files) + db_file = existing_files_map.get( + item.path, ProjectFilePath(self.project_id, item.path) + ) + fh = FileHistory( + file=db_file, + size=item.size, + checksum=item.checksum, + location=item.location, + diff=( + asdict(item.diff) + if (item.change is PushChangeType.UPDATE_DIFF and item.diff) + else null() + ), + change=item.change, + ) + fh.version = self + fh.project_version_name = self.name + db.session.add(fh) + db.session.flush() - if change_type is PushChangeType.DELETE: - latest_files_map.pop(fh.path, None) - else: - latest_files_map[fh.path] = fh.id + if item.change is PushChangeType.DELETE: + latest_files_map.pop(fh.path, None) + else: + latest_files_map[fh.path] = fh.id # update cached values in project and push to transaction buffer so that self.files is up-to-date self.project.latest_project_files.file_history_ids = latest_files_map.values() @@ -909,7 +902,7 @@ def files(self) -> List[ProjectFile]: checksum=row.checksum, location=row.location, mtime=row.mtime, - diff=File(**row.diff) if row.diff else None, + diff=ProjectDiffFile(**row.diff) if row.diff else None, ) for row in result ] @@ -1021,9 +1014,7 @@ class Upload(db.Model): ) __table_args__ = (db.UniqueConstraint("project_id", "version"),) - def __init__( - self, project: Project, version: int, changes: UploadChanges, user_id: int - ): + def __init__(self, project: Project, version: int, changes: dict, user_id: int): self.id = str(uuid.uuid4()) self.project_id = project.id self.version = version @@ -1053,6 +1044,143 @@ def clear(self): db.session.delete(self) db.session.commit() + def process_chunks( + self, use_shared_chunk_dir: bool + ) -> Tuple[List[ProjectFileChange], Dict]: + """Concatenate chunks into single file and apply gpkg updates if needed""" + errors = {} + project_path = get_project_path(self.project) + v_next_version = ProjectVersion.to_v_name(self.project.next_version()) + chunks_map = { + f["path"]: f["chunks"] + for f in self.changes["added"] + self.changes["updated"] + } + file_changes = files_changes_from_upload(self.changes, v_next_version) + to_remove = [i.path for i in file_changes if i.change == PushChangeType.DELETE] + current_files = [f for f in self.project.files if f.path not in to_remove] + + with Toucher(self.lockfile, 5): + for f in file_changes: + if f.change == PushChangeType.DELETE: + continue + + f_location = ( + f.diff.location + if f.change == PushChangeType.UPDATE_DIFF + else f.location + ) + temporary_location = os.path.join(self.upload_dir, "files", f_location) + os.makedirs(os.path.dirname(temporary_location), exist_ok=True) + with open(temporary_location, "wb") as dest: + try: + for chunk_id in chunks_map.get(f.path, []): + # based on API version location for uploaded chunks differs + if use_shared_chunk_dir: + chunk_file = get_chunk_location(chunk_id) + else: + chunk_file = os.path.join( + self.upload_dir, "chunks", chunk_id + ) + + if not os.path.exists(chunk_file): + errors[f.path] = FileSyncErrorType.CORRUPTED.value + continue + + with open(chunk_file, "rb") as src: + data = src.read(8192) + while data: + dest.write(data) + data = src.read(8192) + + except IOError: + logging.exception( + f"Failed to process chunk: {chunk_id} in project {project_path}" + ) + errors[f.path] = FileSyncErrorType.CORRUPTED.value + continue + + if ( + not f.change == PushChangeType.UPDATE_DIFF + and not is_supported_type(temporary_location) + ): + 
logging.info(f"Rejecting blacklisted file: {temporary_location}") + errors[f.path] = FileSyncErrorType.UNSUPPORTED.value + continue + + # check if .gpkg file is valid + if is_versioned_file(temporary_location) and not f.is_valid_gpkg(): + errors[f.path] = FileSyncErrorType.CORRUPTED.value + continue + + expected_size = ( + f.diff.size if f.change == PushChangeType.UPDATE_DIFF else f.size + ) + if expected_size != os.path.getsize(temporary_location): + logging.error( + f"Data integrity check has failed on file {f.path} in project {project_path}", + exc_info=True, + ) + errors[f.path] = FileSyncErrorType.CORRUPTED.value + continue + + # for updates try to apply diff to create a full updated gpkg file or from full .gpkg try to create corresponding diff + if f.change in ( + PushChangeType.UPDATE, + PushChangeType.UPDATE_DIFF, + ) and is_versioned_file(f.path): + current_file = next( + (i for i in current_files if i.path == f.path), None + ) + if not current_file: + errors[f.path] = ( + f"{FileSyncErrorType.SYNC_ERROR.value}: file not found on server" + ) + continue + + if f.diff: + changeset = temporary_location + patched_file = os.path.join( + self.upload_dir, "files", f.location + ) + + result = self.project.storage.apply_diff( + current_file, changeset, patched_file + ) + if result.ok(): + checksum, size = result.value + f.checksum = checksum + f.size = size + else: + errors[f.path] = ( + f"{FileSyncErrorType.SYNC_ERROR.value}: project {self.project.workspace.name}/{self.project.name}, {result.value}" + ) + else: + diff_name = mergin_secure_filename( + f.path + "-diff-" + str(uuid.uuid4()) + ) + changeset = os.path.join( + self.upload_dir, "files", v_next_version, diff_name + ) + patched_file = temporary_location + result = self.project.storage.construct_diff( + current_file, changeset, patched_file + ) + if result.ok(): + checksum, size = result.value + f.diff = ProjectDiffFile( + checksum=checksum, + size=size, + path=diff_name, + location=os.path.join(v_next_version, diff_name), + ) + f.change = PushChangeType.UPDATE_DIFF + else: + # if diff cannot be constructed it would be a force update + logging.warning( + f"Geodiff: create changeset error {result.value}" + ) + return file_changes, errors + class RequestStatus(Enum): ACCEPTED = "accepted" diff --git a/server/mergin/sync/private_api.yaml b/server/mergin/sync/private_api.yaml index b5fa2c87..48a88933 100644 --- a/server/mergin/sync/private_api.yaml +++ b/server/mergin/sync/private_api.yaml @@ -384,20 +384,20 @@ paths: $ref: "#/components/responses/NotFoundResp" x-openapi-router-controller: mergin.sync.private_api_controller /projects/{id}/download: + parameters: + - $ref: "#/components/parameters/ProjectId" + - name: version + in: query + description: Particular version to download + required: false + schema: + $ref: "#/components/schemas/VersionName" get: tags: - project summary: Download full project description: Download whole project folder as zip file operationId: download_project - parameters: - - $ref: "#/components/parameters/ProjectId" - - name: version - in: query - description: Particular version to download - required: false - schema: - $ref: "#/components/schemas/VersionName" responses: "200": description: Zip file @@ -417,6 +417,26 @@ paths: "404": $ref: "#/components/responses/NotFoundResp" x-openapi-router-controller: mergin.sync.private_api_controller + post: + tags: + - project + summary: Prepare project archive + description: Prepare project zip archive to download + operationId: prepare_archive + responses: + "201": 
+ description: Project archive creation started + "204": + $ref: "#/components/responses/NoContent" + "400": + $ref: "#/components/responses/BadStatusResp" + "422": + $ref: "#/components/responses/UnprocessableEntity" + "403": + $ref: "#/components/responses/Forbidden" + "404": + $ref: "#/components/responses/NotFoundResp" + x-openapi-router-controller: mergin.sync.private_api_controller components: responses: UnauthorizedError: @@ -436,7 +456,9 @@ components: UnsupportedMediaType: description: Payload format is in an unsupported format. ConflictResp: - description: Request could not be processed becuase of conflict in resources + description: Request could not be processed because of conflict in resources + NoContent: + description: Success. No content returned. parameters: Page: name: page diff --git a/server/mergin/sync/private_api_controller.py b/server/mergin/sync/private_api_controller.py index 13f059b9..dd870aae 100644 --- a/server/mergin/sync/private_api_controller.py +++ b/server/mergin/sync/private_api_controller.py @@ -322,7 +322,7 @@ def get_project_access(id: str): def download_project(id: str, version=None): # noqa: E501 # pylint: disable=W0622 """Download whole project folder as zip file in any version - Return zip file if it exists, otherwise trigger background job to create it""" + Return zip file if it exists, otherwise return 202""" project = require_project_by_uuid(id, ProjectPermissions.Read) lookup_version = ( ProjectVersion.from_v_name(version) if version else project.latest_version @@ -331,9 +331,6 @@ def download_project(id: str, version=None): # noqa: E501 # pylint: disable=W06 project_id=project.id, name=lookup_version ).first_or_404("Project version does not exist") - if project_version.project_size > current_app.config["MAX_DOWNLOAD_ARCHIVE_SIZE"]: - abort(400) - # check zip is already created if os.path.exists(project_version.zip_path): if current_app.config["USE_X_ACCEL"]: @@ -352,17 +349,36 @@ def download_project(id: str, version=None): # noqa: E501 # pylint: disable=W06 f"attachment; filename*=UTF-8''{file_name}" ) return resp - # GET request triggers background job if no partial zip or expired one - if request.method == "GET": - temp_zip_path = project_version.zip_path + ".partial" - # create zip if it does not exist yet or has expired - partial_exists = os.path.exists(temp_zip_path) - is_expired = partial_exists and datetime.fromtimestamp( - os.path.getmtime(temp_zip_path), tz=timezone.utc - ) < datetime.now(timezone.utc) - timedelta( - seconds=current_app.config["PARTIAL_ZIP_EXPIRATION"] - ) - if not partial_exists or is_expired: - create_project_version_zip.delay(project_version.id) - return "Project zip being prepared, please try again later", 202 + return "Project zip being prepared", 202 + + +def prepare_archive(id: str, version=None): + """Triggers background job to create project archive""" + project = require_project_by_uuid(id, ProjectPermissions.Read) + lookup_version = ( + ProjectVersion.from_v_name(version) if version else project.latest_version + ) + pv = ProjectVersion.query.filter_by( + project_id=project.id, name=lookup_version + ).first_or_404() + + if pv.project_size > current_app.config["MAX_DOWNLOAD_ARCHIVE_SIZE"]: + abort(400) + + if os.path.exists(pv.zip_path): + return NoContent, 204 + + # trigger job if no recent partial + temp_zip_path = pv.zip_path + ".partial" + partial_exists = os.path.exists(temp_zip_path) + is_expired = partial_exists and datetime.fromtimestamp( + os.path.getmtime(temp_zip_path), tz=timezone.utc + ) < 
datetime.now(timezone.utc) - timedelta( + seconds=current_app.config["PARTIAL_ZIP_EXPIRATION"] + ) + if partial_exists and not is_expired: + return NoContent, 204 + + create_project_version_zip.delay(pv.id) + return "Project zip creation started", 201 diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index 9fd229a1..b3bf42df 100644 --- a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -14,8 +14,8 @@ from datetime import datetime import gevent +from marshmallow import ValidationError import psycopg2 -from blinker import signal from connexion import NoContent, request from flask import ( abort, @@ -32,7 +32,7 @@ from gevent import sleep import base64 -from werkzeug.exceptions import HTTPException +from werkzeug.exceptions import HTTPException, Conflict from mergin.sync.forms import project_name_validation from .interfaces import WorkspaceRole @@ -40,6 +40,7 @@ from ..auth import auth_required from ..auth.models import User from .models import ( + FileSyncErrorType, Project, ProjectVersion, Upload, @@ -48,13 +49,17 @@ ProjectFilePath, ProjectUser, ProjectRole, + project_version_created, + push_finished, ) from .files import ( - UploadChanges, + ProjectFileChange, ChangesSchema, UploadFileSchema, ProjectFileSchema, FileSchema, + files_changes_from_upload, + mergin_secure_filename, ) from .schemas import ( ProjectSchema, @@ -65,7 +70,7 @@ FileHistorySchema, ProjectVersionListSchema, ) -from .storages.storage import DataSyncError, InitializationError +from .storages.storage import InitializationError from .storages.disk import save_to_file, move_to_tmp from .permissions import ( require_project, @@ -96,11 +101,6 @@ from ..utils import format_time_delta -push_finished = signal("push_finished") -# TODO: Move to database events to handle all commits to project versions -project_version_created = signal("project_version_created") - - def parse_project_access_update_request(access: Dict) -> Dict: """Parse raw project access update request and filter out invalid entries. New access can be specified either by list of usernames or ids -> convert only to ids fur further processing. 
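[Editor's aside] The change above splits project archive downloads into two steps: a POST to `/projects/{id}/download` triggers (or confirms) background zip creation, and a GET returns the zip once it exists or 202 while it is still being prepared, so a client is expected to poll. Below is a minimal client-side sketch of that flow. It assumes an already-authenticated `requests.Session`, and the `/app` URL prefix for the private API is an assumption, not something stated in this diff.

```python
# Illustrative polling for the prepare-then-download archive flow.
# Assumptions: `session` is an authenticated requests.Session, `base_url`
# points at the server, and the private API is mounted under /app.
import time

import requests


def download_archive(session: requests.Session, base_url: str, project_id: str,
                     dest_path: str, poll_interval: int = 5, timeout: int = 600) -> None:
    url = f"{base_url}/app/projects/{project_id}/download"

    # step 1: ask the server to prepare the zip (201 = job started, 204 = already available)
    resp = session.post(url)
    resp.raise_for_status()

    # step 2: poll the GET endpoint until the zip is returned (202 = still being prepared)
    deadline = time.time() + timeout
    while time.time() < deadline:
        resp = session.get(url, stream=True)
        if resp.status_code == 200:
            with open(dest_path, "wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    f.write(chunk)
            return
        if resp.status_code != 202:
            resp.raise_for_status()
        time.sleep(poll_interval)
    raise TimeoutError("Project archive was not ready in time")
```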
@@ -239,15 +239,24 @@ def add_project(namespace): # noqa: E501 .first_or_404() ) version_name = 1 - files = UploadFileSchema(context={"version": 1}, many=True).load( - FileSchema(exclude=("location",), many=True).dump(template.files) - ) - changes = UploadChanges(added=files, updated=[], removed=[]) + file_changes = [] + for file in template.files: + file_changes.append( + ProjectFileChange( + file.path, + file.checksum, + file.size, + diff=None, + mtime=None, + location=os.path.join("v1", mergin_secure_filename(file.path)), + change=PushChangeType.CREATE, + ) + ) else: template = None version_name = 0 - changes = UploadChanges(added=[], updated=[], removed=[]) + file_changes = [] try: p.storage.initialize(template_project=template) @@ -258,7 +267,7 @@ def add_project(namespace): # noqa: E501 p, version_name, current_user.id, - changes, + file_changes, get_ip(request), get_user_agent(request), get_device_id(request), @@ -694,8 +703,13 @@ def catch_sync_failure(f): @functools.wraps(f) def wrapper(*args, **kwargs): try: - return f(*args, **kwargs) - except (HTTPException, IntegrityError) as e: + response, status_code = f(*args, **kwargs) + if status_code >= 400: + raise HTTPException(response=response) + return response, status_code + except IntegrityError: + raise Conflict("Database integrity error") + except HTTPException as e: if e.code in [401, 403, 404]: raise # nothing to do, just propagate downstream @@ -711,16 +725,22 @@ def wrapper(*args, **kwargs): error_type = "push_finish" elif request.endpoint == "chunk_upload": error_type = "chunk_upload" + elif ( + request.endpoint + == "/v2.mergin_sync_public_api_v2_controller_create_project_version" + ): + error_type = "project_push" + + description = ( + e.description if e.description else e.response.json.get("detail", "") + ) - if not e.description: # custom error cases (e.g. StorageLimitHit) - e.description = e.response.json["detail"] if project: project.sync_failed( - user_agent, error_type, str(e.description), current_user.id + user_agent, error_type, str(description), current_user.id ) else: logging.warning("Missing project info in sync failure") - raise return wrapper @@ -745,7 +765,7 @@ def project_push(namespace, project_name): project_permission = current_app.project_handler.get_push_permission(changes) project = require_project(namespace, project_name, project_permission) if project.locked_until: - abort(make_response(jsonify(ProjectLocked().to_dict()), 422)) + return ProjectLocked().response(422) # pass full project object to request for later use request.view_args["project"] = project ws = project.workspace @@ -771,76 +791,36 @@ def project_push(namespace, project_name): if pending_upload and pending_upload.is_active(): abort(400, "Another process is running. Please try later.") - upload_changes = ChangesSchema(context={"version": version + 1}).load(changes) + try: + ChangesSchema().validate(changes) + upload_changes = ChangesSchema().dump(changes) + except ValidationError as err: + msg = err.messages[0] if type(err.messages) == list else "Invalid input data" + abort(400, msg) - for item in upload_changes.added: + for item in upload_changes["added"]: # check if same file is not already uploaded - if not all(ele.path != item.path for ele in project.files): - abort(400, f"File {item.path} has been already uploaded") - if not is_valid_path(item.path): - abort( - 400, - f"Unsupported file name detected: {item.path}. 
Please remove the invalid characters.", - ) - if not is_supported_extension(item.path): - abort( - 400, - f"Unsupported file type detected: {item.path}. " - f"Please remove the file or try compressing it into a ZIP file before uploading.", - ) - - # changes' files must be unique - changes_files = [ - f.path - for f in upload_changes.added + upload_changes.updated + upload_changes.removed - ] - if len(set(changes_files)) != len(changes_files): - abort(400, "Not unique changes") - - sanitized_files = [] - blacklisted_files = [] - for f in upload_changes.added + upload_changes.updated + upload_changes.removed: - # check if .gpkg file is valid - if is_versioned_file(f.path): - if not f.is_valid_gpkg(): - abort(400, f"File {f.path} is not valid") - if is_file_name_blacklisted(f.path, current_app.config["BLACKLIST"]): - blacklisted_files.append(f.path) - # all file need to be unique after sanitized - if f.location in sanitized_files: - filename, file_extension = os.path.splitext(f.location) - f.location = filename + f".{str(uuid.uuid4())}" + file_extension - sanitized_files.append(f.location) - if f.diff: - if f.diff.location in sanitized_files: - filename, file_extension = os.path.splitext(f.diff.location) - f.diff.location = filename + f".{str(uuid.uuid4())}" + file_extension - sanitized_files.append(f.diff.location) - - # remove blacklisted files from changes - for key in upload_changes.__dict__.keys(): - new_value = [ - f for f in getattr(upload_changes, key) if f.path not in blacklisted_files - ] - setattr(upload_changes, key, new_value) + if not all(ele.path != item["path"] for ele in project.files): + abort(400, f"File {item['path']} has been already uploaded") # Check user data limit - updates = [f.path for f in upload_changes.updated] - updated_files = list(filter(lambda i: i.path in updates, project.files)) + updated_files = list( + filter( + lambda i: i.path in [f["path"] for f in upload_changes["updated"]], + project.files, + ) + ) additional_disk_usage = ( - sum(file.size for file in upload_changes.added + upload_changes.updated) + sum( + file["size"] for file in upload_changes["added"] + upload_changes["updated"] + ) - sum(file.size for file in updated_files) - - sum(file.size for file in upload_changes.removed) + - sum(file["size"] for file in upload_changes["removed"]) ) - current_usage = ws.disk_usage() requested_storage = current_usage + additional_disk_usage if requested_storage > ws.storage: - abort( - make_response( - jsonify(StorageLimitHit(current_usage, ws.storage).to_dict()), 422 - ) - ) + return StorageLimitHit(current_usage, ws.storage).response(422) upload = Upload(project, version, upload_changes, current_user.id) db.session.add(upload) @@ -885,6 +865,9 @@ def project_push(namespace, project_name): # Update immediately without uploading of new/modified files and remove transaction/lockfile after successful commit if not (changes["added"] or changes["updated"]): next_version = version + 1 + file_changes = files_changes_from_upload( + upload.changes, ProjectVersion.to_v_name(next_version) + ) user_agent = get_user_agent(request) device_id = get_device_id(request) try: @@ -892,7 +875,7 @@ def project_push(namespace, project_name): project, next_version, current_user.id, - upload_changes, + file_changes, get_ip(request), user_agent, device_id, @@ -919,7 +902,7 @@ def project_push(namespace, project_name): finally: upload.clear() - return {"transaction": upload.id} + return {"transaction": upload.id}, 200 @auth_required @@ -938,29 +921,27 @@ def 
chunk_upload(transaction_id, chunk_id): """ upload, upload_dir = get_upload(transaction_id) request.view_args["project"] = upload.project - upload_changes = ChangesSchema(context={"version": upload.version + 1}).load( - upload.changes - ) - for f in upload_changes.added + upload_changes.updated: - if chunk_id in f.chunks: - dest = os.path.join(upload_dir, "chunks", chunk_id) - lockfile = os.path.join(upload_dir, "lockfile") - with Toucher(lockfile, 30): - try: - # we could have used request.data here, but it could eventually cause OOM issue - save_to_file( - request.stream, dest, current_app.config["MAX_CHUNK_SIZE"] - ) - except IOError: - move_to_tmp(dest, transaction_id) - abort(400, "Too big chunk") - if os.path.exists(dest): - checksum = generate_checksum(dest) - size = os.path.getsize(dest) - return jsonify({"checksum": checksum, "size": size}), 200 - else: - abort(400, "Upload was probably canceled") - abort(404) + chunks = [] + for file in upload.changes["added"] + upload.changes["updated"]: + chunks += file.get("chunks", []) + + if chunk_id not in chunks: + abort(404) + + dest = os.path.join(upload_dir, "chunks", chunk_id) + with Toucher(upload.lockfile, 30): + try: + # we could have used request.data here, but it could eventually cause OOM issue + save_to_file(request.stream, dest, current_app.config["MAX_CHUNK_SIZE"]) + except IOError: + move_to_tmp(dest, transaction_id) + abort(400, "Too big chunk") + if os.path.exists(dest): + checksum = generate_checksum(dest) + size = os.path.getsize(dest) + return jsonify({"checksum": checksum, "size": size}), 200 + else: + abort(400, "Upload was probably canceled") @auth_required @@ -980,73 +961,45 @@ def push_finish(transaction_id): :rtype: None """ - from .tasks import optimize_storage - upload, upload_dir = get_upload(transaction_id) request.view_args["project"] = upload.project - changes = ChangesSchema(context={"version": upload.version + 1}).load( - upload.changes - ) project = upload.project + next_version = project.next_version() + v_next_version = ProjectVersion.to_v_name(next_version) + version_dir = os.path.join(project.storage.project_dir, v_next_version) if project.locked_until: - abort(make_response(jsonify(ProjectLocked().to_dict()), 422)) - project_path = get_project_path(project) - corrupted_files = [] - - for f in changes.added + changes.updated: - if f.diff is not None: - dest_file = os.path.join(upload_dir, "files", f.diff.location) - expected_size = f.diff.size - else: - dest_file = os.path.join(upload_dir, "files", f.location) - expected_size = f.size - - # Concatenate chunks into single file - # TODO we need to move this elsewhere since it can fail for large files (and slow FS) - os.makedirs(os.path.dirname(dest_file), exist_ok=True) - with open(dest_file, "wb") as dest: - try: - for chunk_id in f.chunks: - sleep(0) # to unblock greenlet - chunk_file = os.path.join(upload_dir, "chunks", chunk_id) - with open(chunk_file, "rb") as src: - data = src.read(8192) - while data: - dest.write(data) - data = src.read(8192) - except IOError: - logging.exception( - "Failed to process chunk: %s in project %s" - % (chunk_id, project_path) - ) - corrupted_files.append(f.path) - continue - if not is_supported_type(dest_file): - logging.info(f"Rejecting blacklisted file: {dest_file}") + return ProjectLocked().response(422) + + file_changes, errors = upload.process_chunks(use_shared_chunk_dir=False) + if errors: + upload.clear() + + unsupported_files = [ + k for k, v in errors.items() if v == FileSyncErrorType.UNSUPPORTED.value + ] + 
if len(unsupported_files): abort( 400, - f"Unsupported file type detected: {f.path}. " + f"Unsupported file type detected: {unsupported_files[0]}. " f"Please remove the file or try compressing it into a ZIP file before uploading.", ) - if expected_size != os.path.getsize(dest_file): - logging.error( - "Data integrity check has failed on file %s in project %s" - % (f.path, project_path), - exc_info=True, - ) - # check if .gpkg file is valid - if is_versioned_file(dest_file): - if not f.is_valid_gpkg(): - corrupted_files.append(f.path) - corrupted_files.append(f.path) + corrupted_files = [ + k for k, v in errors.items() if v == FileSyncErrorType.CORRUPTED.value + ] + if corrupted_files: + abort(422, {"corrupted_files": corrupted_files}) - if corrupted_files: - move_to_tmp(upload_dir) - abort(422, {"corrupted_files": corrupted_files}) + sync_errors = { + k: v for k, v in errors.items() if FileSyncErrorType.SYNC_ERROR.value in v + } + if sync_errors: + msg = "" + for key, value in sync_errors.items(): + msg += key + " error=" + value + "\n" + + abort(422, f"Failed to create new version: {msg}") - next_version = upload.version + 1 - v_next_version = ProjectVersion.to_v_name(next_version) files_dir = os.path.join(upload_dir, "files", v_next_version) target_dir = os.path.join(project.storage.project_dir, v_next_version) if os.path.exists(target_dir): @@ -1065,58 +1018,13 @@ def push_finish(transaction_id): move_to_tmp(target_dir) try: - # let's move uploaded files where they are expected to be - os.renames(files_dir, target_dir) - # apply gpkg updates - sync_errors = {} - to_remove = [i.path for i in changes.removed] - current_files = [f for f in project.files if f.path not in to_remove] - for updated_file in changes.updated: - # yield to gevent hub since geodiff action can take some time to prevent worker timeout - sleep(0) - current_file = next( - (i for i in current_files if i.path == updated_file.path), None - ) - if not current_file: - sync_errors[updated_file.path] = "file not found on server " - continue - - if updated_file.diff: - result = project.storage.apply_diff( - current_file, updated_file, next_version - ) - if result.ok(): - checksum, size = result.value - updated_file.checksum = checksum - updated_file.size = size - else: - sync_errors[updated_file.path] = ( - f"project: {project.workspace.name}/{project.name}, {result.value}" - ) - - elif is_versioned_file(updated_file.path): - result = project.storage.construct_diff( - current_file, updated_file, next_version - ) - if result.ok(): - updated_file.diff = result.value - else: - # if diff cannot be constructed it would be force update - logging.warning(f"Geodiff: create changeset error {result.value}") - - if sync_errors: - msg = "" - for key, value in sync_errors.items(): - msg += key + " error=" + value + "\n" - raise DataSyncError(msg) - user_agent = get_user_agent(request) device_id = get_device_id(request) pv = ProjectVersion( project, next_version, current_user.id, - changes, + file_changes, get_ip(request), user_agent, device_id, @@ -1124,12 +1032,24 @@ def push_finish(transaction_id): db.session.add(pv) db.session.add(project) db.session.commit() + + # let's move uploaded files where they are expected to be + os.renames(files_dir, version_dir) + + # remove used chunks + for file in upload.changes["added"] + upload.changes["updated"]: + file_chunks = file.get("chunks", []) + for chunk_id in file_chunks: + chunk_file = os.path.join(upload.upload_dir, "chunks", chunk_id) + if os.path.exists(chunk_file): + 
move_to_tmp(chunk_file) + logging.info( f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}." ) project_version_created.send(pv) push_finished.send(pv) - except (psycopg2.Error, FileNotFoundError, DataSyncError, IntegrityError) as err: + except (psycopg2.Error, FileNotFoundError, IntegrityError) as err: db.session.rollback() logging.exception( f"Failed to finish push for project: {project.id}, project version: {v_next_version}, " @@ -1144,9 +1064,6 @@ def push_finish(transaction_id): # remove artifacts upload.clear() - # do not optimize on every version, every 10th is just fine - if not project.latest_version % 10: - optimize_storage.delay(project.id) return jsonify(ProjectSchema().dump(project)), 200 @@ -1246,15 +1163,24 @@ def clone_project(namespace, project_name): # noqa: E501 user_agent = get_user_agent(request) device_id = get_device_id(request) # transform source files to new uploaded files - files = UploadFileSchema(context={"version": 1}, many=True).load( - FileSchema(exclude=("location",), many=True).dump(cloned_project.files) - ) - changes = UploadChanges(added=files, updated=[], removed=[]) + file_changes = [] + for file in cloned_project.files: + file_changes.append( + ProjectFileChange( + file.path, + file.checksum, + file.size, + diff=None, + mtime=None, + location=os.path.join("v1", mergin_secure_filename(file.path)), + change=PushChangeType.CREATE, + ) + ) project_version = ProjectVersion( p, version, current_user.id, - changes, + file_changes, get_ip(request), user_agent, device_id, diff --git a/server/mergin/sync/public_api_v2.yaml b/server/mergin/sync/public_api_v2.yaml index 04dbce61..9ed062d5 100644 --- a/server/mergin/sync/public_api_v2.yaml +++ b/server/mergin/sync/public_api_v2.yaml @@ -96,6 +96,39 @@ paths: "404": $ref: "#/components/responses/NotFound" x-openapi-router-controller: mergin.sync.public_api_v2_controller + # /projects/{id}/chunks: + # post: + # tags: + # - project + # summary: Upload file chunk. + # operationId: upload_chunk + # parameters: + # - $ref: "#/components/parameters/ProjectId" + # requestBody: + # x-stream-upload: true + # content: + # application/octet-stream: + # schema: + # type: string + # format: binary + # responses: + # "200": + # description: Chunk upload response + # content: + # application/json: + # schema: + # $ref: "#/components/schemas/UploadChunk" + # "400": + # $ref: "#/components/responses/BadRequest" + # "401": + # $ref: "#/components/responses/Unauthorized" + # "403": + # $ref: "#/components/responses/Forbidden" + # "404": + # $ref: "#/components/responses/NotFound" + # "413": + # $ref: "#/components/responses/RequestTooBig" + # x-openapi-router-controller: mergin.sync.public_api_v2_controller /projects/{id}/collaborators: parameters: - $ref: "#/components/parameters/ProjectId" @@ -219,6 +252,95 @@ paths: "404": $ref: "#/components/responses/NotFound" x-openapi-router-controller: mergin.sync.public_api_v2_controller + /projects/{id}/versions: + post: + tags: + - project + summary: Create a new project version from pushed data + operationId: create_project_version + parameters: + - $ref: "#/components/parameters/ProjectId" + requestBody: + description: Project files changes and current version head. 
+ required: true + content: + application/json: + schema: + type: object + required: + - version + - changes + properties: + check_only: + type: boolean + default: false + example: true + version: + type: string + pattern: '^$|^v\d+$' + example: v2 + changes: + type: object + required: + - added + - updated + - removed + properties: + added: + type: array + items: + $ref: "#/components/schemas/UploadFile" + updated: + type: array + items: + $ref: "#/components/schemas/UpdateFile" + removed: + type: array + items: + $ref: "#/components/schemas/File" + responses: + "201": + description: New project version + content: + application/json: + schema: + $ref: "#/components/schemas/Project" + "204": + $ref: "#/components/responses/NoContent" + "400": + $ref: "#/components/responses/BadRequest" + "401": + $ref: "#/components/responses/Unauthorized" + "403": + $ref: "#/components/responses/Forbidden" + "404": + $ref: "#/components/responses/NotFound" + "409": + description: Version already exists or another process is already running + content: + application/problem+json: + schema: + anyOf: + - $ref: "#/components/schemas/ProjectVersionExists" + - $ref: "#/components/schemas/AnotherUploadRunning" + "422": + description: Request could not be processed by server + content: + application/problem+json: + schema: + anyOf: + - $ref: "#/components/schemas/UploadError" + - $ref: "#/components/schemas/TrialExpired" + - $ref: "#/components/schemas/StorageLimitHit" + - $ref: "#/components/schemas/DataSyncError" + "423": + description: Project is locked for any upload + content: + application/problem+json: + schema: + $ref: "#/components/schemas/ProjectLocked" + + x-openapi-router-controller: mergin.sync.public_api_v2_controller components: responses: NoContent: @@ -233,6 +355,8 @@ components: description: Not found Conflict: description: Conflict + RequestTooBig: + description: Request Entity Too Large parameters: ProjectId: name: id @@ -244,6 +368,86 @@ components: format: uuid pattern: \b[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\b[0-9a-f]{12}\b schemas: + # Errors + CustomError: + type: object + properties: + code: + type: string + detail: + type: string + required: + - code + - detail + TrialExpired: + allOf: + - $ref: "#/components/schemas/CustomError" + example: + code: TrialExpired + detail: Failed to push changes. 
Ask the workspace owner to log in to their Mergin Maps dashboard + StorageLimitHit: + allOf: + - $ref: "#/components/schemas/CustomError" + type: object + properties: + current_usage: + type: integer + storage_limit: + type: integer + example: + code: StorageLimitHit + detail: You have reached a data limit (StorageLimitHit) + current_usage: 24865 + storage_limit: 24865 + ProjectLocked: + allOf: + - $ref: "#/components/schemas/CustomError" + example: + code: ProjectLocked + detail: The project is currently locked and you cannot make changes to it (ProjectLocked) + ProjectVersionExists: + allOf: + - $ref: "#/components/schemas/CustomError" + type: object + properties: + client_version: + type: string + server_version: + type: string + example: + code: ProjectVersionExists + detail: Project version mismatch (ProjectVersionExists) + AnotherUploadRunning: + allOf: + - $ref: "#/components/schemas/CustomError" + type: object + properties: + client_version: + type: string + server_version: + type: string + example: + code: AnotherUploadRunning + detail: Another process is running (AnotherUploadRunning) + DataSyncError: + allOf: + - $ref: "#/components/schemas/CustomError" + type: object + properties: + failed_files: + type: object + example: + code: DataSyncError + detail: "There are either corrupted files or it is not possible to create version with provided geopackage data (DataSyncError)" + failed_files: + "survey.gpkg": "Corrupted file" + UploadError: + allOf: + - $ref: "#/components/schemas/CustomError" + example: + code: UploadError + detail: "Project version could not be created (UploadError)" + # Data ProjectRole: type: string nullable: true @@ -268,6 +472,17 @@ components: - $ref: "#/components/schemas/ProjectRole" nullable: false description: combination of workspace role and project role + UploadChunk: + type: object + properties: + id: + type: string + format: uuid + example: "123e4567-e89b-12d3-a456-426614174000" + valid_until: + type: string + format: date-time + example: "2023-10-01T12:00:00Z" ProjectMember: type: object properties: @@ -287,3 +502,255 @@ components: $ref: "#/components/schemas/ProjectRole" role: $ref: "#/components/schemas/Role" + File: + type: object + description: Project file metadata + required: + - path + - size + properties: + path: + type: string + example: media/favicon.ico + size: + type: integer + format: int64 + example: 1024 + checksum: + description: sha1 hash of file + type: string + example: 9adb76bf81a34880209040ffe5ee262a090b62ab + UploadFile: + description: Metadata of uploaded file with chunks it is composed of + allOf: + - $ref: "#/components/schemas/File" + - type: object + properties: + chunks: + type: array + items: + type: string + example: d17a60eb-6581-431c-adfc-3451231455bb + UpdateFile: + description: Metadata of updated file with optional metadata about uploaded file diff + allOf: + - $ref: "#/components/schemas/UploadFile" + - type: object + properties: + diff: + nullable: true + allOf: + - $ref: "#/components/schemas/File" + MerginTag: + type: string + enum: + - valid_qgis + - mappin_use + - input_use + example: valid_qgis + Access: + type: object + properties: + ownersnames: + type: array + nullable: false + items: + type: string + example: [john.doe] + writersnames: + type: array + nullable: false + items: + type: string + example: [john.doe] + editorsnames: + type: array + nullable: false + items: + type: string + example: [john.doe] + readersnames: + type: array + nullable: false + items: + type: string + example: [john.doe] + 
public: + type: boolean + example: true + owners: + type: array + nullable: false + items: + type: integer + example: [1] + writers: + type: array + nullable: false + items: + type: integer + example: [1] + editors: + type: array + nullable: false + items: + type: integer + example: [1] + readers: + type: array + nullable: false + items: + type: integer + example: [1] + FileInfo: + type: object + required: + - path + - size + - checksum + properties: + path: + type: string + example: media/favicon.ico + checksum: + description: sha1 hash + type: string + example: 9adb76bf81a34880209040ffe5ee262a090b62ab + size: + type: integer + format: int64 + example: 1024 + mtime: + deprecated: true + type: string + format: date-time + example: 2018-11-30T08:47:58.636074Z + diff: + type: object + nullable: true + required: + - path + - checksum + properties: + path: + type: string + description: unique diff filename + example: survey.gpkg-diff-15eqn2q + checksum: + type: string + example: 45dfdfbf81a34asdf209040ffe5fasdf2a090bfa + size: + type: integer + example: 512 + history: + type: object + description: map with version names as keys and file info as values + additionalProperties: + type: object + required: + - path + - size + - checksum + - change + properties: + path: + type: string + example: media/favicon.ico + checksum: + description: sha1 hash + type: string + example: 9adb76bf81a34880209040ffe5ee262a090b62ab + size: + type: integer + format: int64 + example: 1024 + change: + type: string + example: added + enum: + - added + - updated + - removed + expiration: + nullable: true + type: string + format: date-time + example: 2019-02-26T08:47:58.636074Z + Project: + type: object + required: + - name + properties: + id: + type: string + example: f9ef87ac-1dae-48ab-85cb-062a4784fb83 + description: Project UUID + name: + type: string + example: mergin + namespace: + type: string + example: mergin + creator: + nullable: true + type: integer + example: 1 + description: Project creator ID + created: + type: string + format: date-time + example: 2018-11-30T08:47:58.636074Z + updated: + type: string + nullable: true + format: date-time + example: 2018-11-30T08:47:58.636074Z + description: Last modified + version: + type: string + nullable: true + example: v2 + description: Last project version + disk_usage: + type: integer + example: 25324373 + description: Project size in bytes + permissions: + type: object + properties: + delete: + type: boolean + example: false + update: + type: boolean + example: false + upload: + type: boolean + example: true + tags: + type: array + nullable: true + items: + $ref: "#/components/schemas/MerginTag" + uploads: + type: array + nullable: true + items: + type: string + example: 669b838e-a30b-4338-b2b6-3da144742a82 + description: UUID for ongoing upload + access: + $ref: "#/components/schemas/Access" + files: + type: array + items: + allOf: + - $ref: "#/components/schemas/FileInfo" + role: + nullable: true + type: string + enum: + - reader + - editor + - writer + - owner diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 7f40c54b..6717a083 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -2,20 +2,52 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial -from datetime import datetime +import uuid +import gevent +import logging +import os +import psycopg2 from connexion import NoContent, request -from flask import 
abort, jsonify +from datetime import datetime, timedelta, timezone +from flask import abort, jsonify, current_app from flask_login import current_user +from marshmallow import ValidationError +from sqlalchemy.exc import IntegrityError -from mergin.sync.forms import project_name_validation - -from .schemas import ProjectMemberSchema -from .workspace import WorkspaceRole from ..app import db from ..auth import auth_required from ..auth.models import User -from .models import Project, ProjectRole, ProjectMember +from .errors import ( + AnotherUploadRunning, + BigChunkError, + DataSyncError, + ProjectLocked, + ProjectVersionExists, + StorageLimitHit, + UploadError, +) +from .files import ChangesSchema +from .forms import project_name_validation +from .models import ( + Project, + ProjectRole, + ProjectMember, + ProjectVersion, + Upload, + project_version_created, + push_finished, +) from .permissions import ProjectPermissions, require_project_by_uuid +from .public_api_controller import catch_sync_failure +from .schemas import ( + ProjectMemberSchema, + ProjectVersionSchema, + UploadChunkSchema, + ProjectSchema, +) +from .storages.disk import move_to_tmp, save_to_file +from .utils import get_device_id, get_ip, get_user_agent, get_chunk_location +from .workspace import WorkspaceRole @auth_required @@ -128,3 +160,217 @@ def remove_project_collaborator(id, user_id): project.unset_role(user_id) db.session.commit() return NoContent, 204 + + +@auth_required +@catch_sync_failure +def create_project_version(id): + """Create a new project version from pushed data""" + version: int = ProjectVersion.from_v_name(request.json["version"]) + changes = request.json["changes"] + project_permission: ProjectPermissions = ( + current_app.project_handler.get_push_permission(changes) + ) + project = require_project_by_uuid(id, project_permission) + # pass full project object to request for later use + request.view_args["project"] = project + + if project.locked_until: + return ProjectLocked().response(423) + + next_version = project.next_version() + v_next_version = ProjectVersion.to_v_name(next_version) + version_dir = os.path.join(project.storage.project_dir, v_next_version) + + pv = project.get_latest_version() + if pv and pv.name != version: + return ProjectVersionExists(version, pv.name).response(409) + + # reject push if there is another one already running + pending_upload = Upload.query.filter_by(project_id=project.id).first() + if pending_upload and pending_upload.is_active(): + return AnotherUploadRunning().response(409) + + try: + ChangesSchema().validate(changes) + upload_changes = ChangesSchema().dump(changes) + except ValidationError as err: + msg = err.messages[0] if type(err.messages) == list else "Invalid input data" + return UploadError(error=msg).response(422) + + to_be_added_files = upload_changes["added"] + to_be_updated_files = upload_changes["updated"] + to_be_removed_files = upload_changes["removed"] + + # check consistency of changes + current_files = set(file.path for file in project.files) + added_files = set(file["path"] for file in to_be_added_files) + if added_files and added_files.issubset(current_files): + return UploadError( + error=f"Add changes contain files which already exist" + ).response(422) + + modified_files = set( + file["path"] for file in to_be_updated_files + to_be_removed_files + ) + if modified_files and not modified_files.issubset(current_files): + return UploadError( + error="Update or remove changes contain files that are not in project" + ).response(422) + + # 
Check user data limit + updated_files = list( + filter( + lambda i: i.path in [f["path"] for f in to_be_updated_files], + project.files, + ) + ) + additional_disk_usage = ( + sum(file["size"] for file in to_be_added_files + to_be_updated_files) + - sum(file.size for file in updated_files) + - sum(file["size"] for file in to_be_removed_files) + ) + + current_usage = project.workspace.disk_usage() + requested_storage = current_usage + additional_disk_usage + if requested_storage > project.workspace.storage: + return StorageLimitHit(current_usage, project.workspace.storage).response(422) + + # we have done all checks but this request is just a dry-run + if request.json.get("check_only", False): + return NoContent, 204 + + try: + # while processing data, block other uploads + upload = Upload(project, version, upload_changes, current_user.id) + db.session.add(upload) + # Creating blocking upload can fail, e.g. in case of racing condition + db.session.commit() + except IntegrityError: + db.session.rollback() + # check and clean dangling blocking uploads or abort + for current_upload in project.uploads.all(): + if current_upload.is_active(): + return AnotherUploadRunning().response(409) + db.session.delete(current_upload) + db.session.commit() + # previous push attempt is definitely lost + project.sync_failed( + "", + "push_lost", + "Push artefact removed by subsequent push", + current_user.id, + ) + + try: + # Try again after cleanup + upload = Upload(project, version, upload_changes, current_user.id) + db.session.add(upload) + db.session.commit() + move_to_tmp(upload.upload_dir) + except IntegrityError as err: + logging.error(f"Failed to create upload session: {str(err)}") + return AnotherUploadRunning().response(409) + + # Create transaction folder and lockfile + os.makedirs(upload.upload_dir) + open(upload.lockfile, "w").close() + + file_changes, errors = upload.process_chunks(use_shared_chunk_dir=True) + # files consistency or geodiff related issues, project push would never succeed, whole upload is aborted + if errors: + upload.clear() + return DataSyncError(failed_files=errors).response(422) + + try: + pv = ProjectVersion( + project, + next_version, + current_user.id, + file_changes, + get_ip(request), + get_user_agent(request), + get_device_id(request), + ) + db.session.add(pv) + db.session.add(project) + db.session.commit() + + # let's move uploaded files where they are expected to be + if to_be_added_files or to_be_updated_files: + temp_files_dir = os.path.join(upload.upload_dir, "files", v_next_version) + os.renames(temp_files_dir, version_dir) + + # remove used chunks + for file in to_be_added_files + to_be_updated_files: + file_chunks = file.get("chunks", []) + for chunk_id in file_chunks: + chunk_file = get_chunk_location(chunk_id) + if os.path.exists(chunk_file): + move_to_tmp(chunk_file) + + logging.info( + f"Push finished for project: {project.id}, project version: {v_next_version}, upload id: {upload.id}." 
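# A minimal sketch of where v2 chunks live (assuming the get_chunk_location() helper from
# sync/utils.py): chunks sit in a shared directory and are sharded by the first two
# characters of their id, e.g.
#   get_chunk_location("d17a60eb-6581-431c-adfc-3451231455bb")
#   -> <UPLOAD_CHUNKS_DIR>/d1/7a60eb-6581-431c-adfc-3451231455bb
# so the loop above removes exactly the chunks consumed by this version, while anything
# left behind by failed uploads is swept later by the remove_unused_chunks celery task.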
+ ) + project_version_created.send(pv) + push_finished.send(pv) + except (psycopg2.Error, FileNotFoundError, IntegrityError) as err: + db.session.rollback() + logging.exception( + f"Failed to finish push for project: {project.id}, project version: {v_next_version}, " + f"upload id: {upload.id}.: {str(err)}" + ) + if ( + os.path.exists(version_dir) + and not ProjectVersion.query.filter_by( + project_id=project.id, name=next_version + ).count() + ): + move_to_tmp(version_dir) + return UploadError().response(422) + # catch exception during pg transaction so we can rollback and prevent PendingRollbackError during upload clean up + except gevent.timeout.Timeout: + db.session.rollback() + if ( + os.path.exists(version_dir) + and not ProjectVersion.query.filter_by( + project_id=project.id, name=next_version + ).count() + ): + move_to_tmp(version_dir) + raise + finally: + # remove artifacts + upload.clear() + return ProjectSchema().dump(project), 201 + + +@auth_required +def upload_chunk(id: str): + """ + Push chunk to chunks location. + """ + project = require_project_by_uuid(id, ProjectPermissions.Edit) + if project.locked_until: + return ProjectLocked().response(423) + # generate uuid for chunk + chunk_id = str(uuid.uuid4()) + dest_file = get_chunk_location(chunk_id) + try: + # we could have used request.data here, but it could eventually cause OOM issue + save_to_file(request.stream, dest_file, current_app.config["MAX_CHUNK_SIZE"]) + except IOError: + move_to_tmp(dest_file, chunk_id) + return BigChunkError().response(413) + except Exception as e: + return UploadError(error="Error saving chunk").response(400) + + # Add valid_until timestamp to the response, remove tzinfo for compatibility with DateTimeWithZ + valid_until = ( + datetime.now(timezone.utc) + + timedelta(seconds=current_app.config["UPLOAD_CHUNKS_EXPIRATION"]) + ).replace(tzinfo=None) + return ( + UploadChunkSchema().dump({"id": chunk_id, "valid_until": valid_until}), + 200, + ) diff --git a/server/mergin/sync/schemas.py b/server/mergin/sync/schemas.py index 75b6f09e..8d1df050 100644 --- a/server/mergin/sync/schemas.py +++ b/server/mergin/sync/schemas.py @@ -75,7 +75,7 @@ def project_user_permissions(project): class FileHistorySchema(ma.SQLAlchemyAutoSchema): mtime = DateTimeWithZ() - diff = fields.Nested(FileSchema(), attribute="diff_file", exclude=("location",)) + diff = fields.Nested(FileSchema(), attribute="diff_file") expiration = DateTimeWithZ(attribute="expiration", dump_only=True) class Meta: @@ -405,3 +405,10 @@ class ProjectMemberSchema(Schema): project_role = fields.Enum(enum=ProjectRole, by_value=True) workspace_role = fields.Enum(enum=WorkspaceRole, by_value=True) role = fields.Enum(enum=ProjectRole, by_value=True) + + +class UploadChunkSchema(Schema): + """Schema for chunk upload response""" + + id = fields.UUID() + valid_until = DateTimeWithZ() diff --git a/server/mergin/sync/storages/disk.py b/server/mergin/sync/storages/disk.py index 4debb255..4491ad98 100644 --- a/server/mergin/sync/storages/disk.py +++ b/server/mergin/sync/storages/disk.py @@ -21,7 +21,7 @@ generate_checksum, is_versioned_file, ) -from ..files import mergin_secure_filename, ProjectFile, UploadFile, File +from ..files import mergin_secure_filename, ProjectFile, File def save_to_file(stream, path, max_size=None): @@ -245,22 +245,20 @@ def _generator(): return _generator() def apply_diff( - self, current_file: ProjectFile, upload_file: UploadFile, version: int + self, current_file: ProjectFile, diff_file: str, patched_file: str ) -> Result: """Apply 
geodiff diff file on current gpkg basefile. Creates GeodiffActionHistory record of the action. Returns checksum and size of generated file. If action fails it returns geodiff error message. """ from ..models import GeodiffActionHistory, ProjectVersion - v_name = ProjectVersion.to_v_name(version) + v_name = ProjectVersion.to_v_name(self.project.next_version()) basefile = os.path.join(self.project_dir, current_file.location) - changeset = os.path.join(self.project_dir, upload_file.diff.location) - patchedfile = os.path.join(self.project_dir, upload_file.location) # create local copy of basefile which will be updated in next version and changeset needed # TODO this can potentially fail for large files - logging.info(f"Apply changes: copying {basefile} to {patchedfile}") + logging.info(f"Apply changes: copying {basefile} to {patched_file}") start = time.time() - with self.geodiff_copy(changeset) as changeset_tmp, self.geodiff_copy( + with self.geodiff_copy(diff_file) as changeset_tmp, self.geodiff_copy( basefile ) as patchedfile_tmp: copy_time = time.time() - start @@ -269,7 +267,7 @@ def apply_diff( # clean geodiff logger self.flush_geodiff_logger() logging.info( - f"Geodiff: apply changeset {changeset} of size {os.path.getsize(changeset)} with changes to {patchedfile}" + f"Geodiff: apply changeset {diff_file} of size {os.path.getsize(diff_file)} with changes to {patched_file}" ) start = time.time() self.geodiff.apply_changeset(patchedfile_tmp, changeset_tmp) @@ -283,7 +281,7 @@ def apply_diff( current_file.size, v_name, "apply_changes", - changeset, + diff_file, ) gh.copy_time = copy_time gh.geodiff_time = geodiff_apply_time @@ -291,11 +289,11 @@ def apply_diff( # move constructed file where is belongs logging.info(f"Apply changes: moving patchfile {patchedfile_tmp}") start = time.time() - copy_file(patchedfile_tmp, patchedfile) + copy_file(patchedfile_tmp, patched_file) gh.copy_time = copy_time + (time.time() - start) # TODO this can potentially fail for large files - logging.info(f"Apply changes: calculating checksum of {patchedfile}") + logging.info(f"Apply changes: calculating checksum of {patched_file}") start = time.time() checksum = generate_checksum(patchedfile_tmp) checksumming_time = time.time() - start @@ -309,53 +307,46 @@ def apply_diff( ) ) except (GeoDiffLibError, GeoDiffLibConflictError): - move_to_tmp(changeset) + move_to_tmp(diff_file) return Err(self.gediff_log.getvalue()) def construct_diff( - self, current_file: ProjectFile, upload_file: UploadFile, version: int + self, + current_file: ProjectFile, + diff_file: str, + uploaded_file: str, ) -> Result: """Construct geodiff diff file from uploaded gpkg and current basefile. Returns diff metadata as a result. If action fails it returns geodiff error message. 
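        Illustrative usage (a sketch; `storage` stands for this storage instance and
        `diff_file` / `uploaded_file` are paths prepared by the caller):

            result = storage.construct_diff(current_file, diff_file, uploaded_file)
            if result.ok():
                checksum, size = result.value
            else:
                logging.warning(f"Geodiff: create changeset error {result.value}")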
""" - from ..models import ProjectVersion - - v_name = ProjectVersion.to_v_name(version) basefile = os.path.join(self.project_dir, current_file.location) - uploaded_file = os.path.join(self.project_dir, upload_file.location) - diff_name = upload_file.path + "-diff-" + str(uuid.uuid4()) - changeset = os.path.join(self.project_dir, v_name, diff_name) + diff_name = os.path.basename(diff_file) with self.geodiff_copy(basefile) as basefile_tmp, self.geodiff_copy( uploaded_file ) as uploaded_file_tmp: try: # create changeset next to uploaded file copy changeset_tmp = os.path.join( - uploaded_file_tmp.replace(upload_file.location, "").rstrip( - os.path.sep - ), - v_name, + os.path.dirname(uploaded_file_tmp), diff_name, ) self.flush_geodiff_logger() logging.info( - f"Geodiff: create changeset {changeset} from {uploaded_file}" + f"Geodiff: create changeset {diff_file} from {uploaded_file}" ) self.geodiff.create_changeset( basefile_tmp, uploaded_file_tmp, changeset_tmp ) - # create diff metadata as it would be created by other clients - diff_file = File( - path=diff_name, - checksum=generate_checksum(changeset_tmp), - size=os.path.getsize(changeset_tmp), - location=os.path.join(v_name, mergin_secure_filename(diff_name)), + copy_file(changeset_tmp, diff_file) + return Ok( + ( + generate_checksum(changeset_tmp), + os.path.getsize(changeset_tmp), + ) ) - copy_file(changeset_tmp, changeset) - return Ok(diff_file) except (GeoDiffLibError, GeoDiffLibConflictError) as e: # diff is not possible to create - file will be overwritten - move_to_tmp(changeset) + move_to_tmp(diff_file) return Err(self.gediff_log.getvalue()) finally: move_to_tmp(changeset_tmp) diff --git a/server/mergin/sync/storages/storage.py b/server/mergin/sync/storages/storage.py index fd4c1e81..3b9699a6 100644 --- a/server/mergin/sync/storages/storage.py +++ b/server/mergin/sync/storages/storage.py @@ -11,10 +11,6 @@ class FileNotFound(Exception): pass -class DataSyncError(Exception): - pass - - class InitializationError(Exception): pass diff --git a/server/mergin/sync/tasks.py b/server/mergin/sync/tasks.py index ce92171e..2e23d375 100644 --- a/server/mergin/sync/tasks.py +++ b/server/mergin/sync/tasks.py @@ -6,13 +6,14 @@ import shutil import os import time -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from zipfile import ZIP_DEFLATED, ZipFile from flask import current_app from .models import Project, ProjectVersion, FileHistory from .storages.disk import move_to_tmp from .config import Configuration +from .utils import remove_outdated_files from ..celery import celery from ..app import db @@ -123,7 +124,7 @@ def create_project_version_zip(version_id: int): # partial zip is recent -> another job is likely running return else: - # partial zip is too old -> remove and creating new one + # partial zip is too old -> remove and create new one os.remove(zip_path) os.makedirs(os.path.dirname(zip_path), exist_ok=True) @@ -152,14 +153,19 @@ def create_project_version_zip(version_id: int): @celery.task def remove_projects_archives(): """Remove created zip files for project versions if they were not accessed for certain time""" - for file in os.listdir(current_app.config["PROJECTS_ARCHIVES_DIR"]): - path = os.path.join(current_app.config["PROJECTS_ARCHIVES_DIR"], file) - if datetime.fromtimestamp( - os.path.getatime(path), tz=timezone.utc - ) < datetime.now(timezone.utc) - timedelta( - days=current_app.config["PROJECTS_ARCHIVES_EXPIRATION"] - ): - try: - os.remove(path) - except OSError as e: - 
logging.error(f"Unable to remove {path}: {str(e)}") + remove_outdated_files( + Configuration.PROJECTS_ARCHIVES_DIR, + timedelta(days=Configuration.PROJECTS_ARCHIVES_EXPIRATION), + ) + + +@celery.task +def remove_unused_chunks(): + """Remove old chunks in shared directory. These are basically just residual from failed uploads.""" + small_hash_dirs = os.listdir(Configuration.UPLOAD_CHUNKS_DIR) + time_delta = timedelta(seconds=Configuration.UPLOAD_CHUNKS_EXPIRATION) + for _dir in small_hash_dirs: + dir = os.path.join(Configuration.UPLOAD_CHUNKS_DIR, _dir) + if not os.path.isdir(dir): + continue + remove_outdated_files(dir, time_delta) diff --git a/server/mergin/sync/utils.py b/server/mergin/sync/utils.py index c4d5fa16..321c23d2 100644 --- a/server/mergin/sync/utils.py +++ b/server/mergin/sync/utils.py @@ -2,18 +2,20 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial +import logging import math import os import hashlib import re import secrets +from datetime import datetime, timedelta, timezone from threading import Timer from uuid import UUID from shapely import wkb from shapely.errors import ShapelyError from gevent import sleep from flask import Request -from typing import Optional +from typing import Optional, Tuple from sqlalchemy import text from pathvalidate import ( validate_filename, @@ -83,6 +85,8 @@ def touch_lockfile(self): os.access(self.lockfile, os.W_OK) with open(self.lockfile, "a"): os.utime(self.lockfile, None) + + sleep(0) # to unblock greenlet if self.running: self.timer = Timer(self.interval, self.touch_lockfile) self.timer.start() @@ -578,3 +582,33 @@ def get_x_accel_uri(*url_parts): url = url.lstrip(os.path.sep) result = os.path.join(download_accell_uri, url) return result + + +def get_chunk_location(id: str): + """ + Get file location for chunk on FS + + Splits the given identifier into two parts where the first two characters of the identifier are the small hash, + and the remaining characters is a file identifier. + """ + chunk_dir = current_app.config.get("UPLOAD_CHUNKS_DIR") + small_hash = id[:2] + file_name = id[2:] + return os.path.join(chunk_dir, small_hash, file_name) + + +def remove_outdated_files(dir: str, time_delta: timedelta): + """Remove all files within directory where last access time passed expiration date""" + for file in os.listdir(dir): + path = os.path.join(dir, file) + if not os.path.isfile(path): + continue + + if ( + datetime.fromtimestamp(os.path.getatime(path), tz=timezone.utc) + < datetime.now(timezone.utc) - time_delta + ): + try: + os.remove(path) + except OSError as e: + logging.error(f"Unable to remove {path}: {str(e)}") diff --git a/server/mergin/tests/fixtures.py b/server/mergin/tests/fixtures.py index 368eba6b..8c1366f4 100644 --- a/server/mergin/tests/fixtures.py +++ b/server/mergin/tests/fixtures.py @@ -18,7 +18,7 @@ from ..stats.models import MerginInfo from . 
import test_project, test_workspace_id, test_project_dir, TMP_DIR from .utils import login_as_admin, initialize, cleanup, file_info -from ..sync.files import ChangesSchema +from ..sync.files import ChangesSchema, files_changes_from_upload thisdir = os.path.dirname(os.path.realpath(__file__)) sys.path.append(os.path.join(thisdir, os.pardir)) @@ -35,6 +35,7 @@ def flask_app(request): "DOCS_URL", "COLLECT_STATISTICS", "USER_SELF_REGISTRATION", + "V2_PUSH_ENABLED", ] ) register(application) @@ -212,12 +213,13 @@ def diff_project(app): else: # no files uploaded, hence no action needed pass - upload_changes = ChangesSchema(context={"version": i + 2}).load(change) + + file_changes = files_changes_from_upload(change, location_dir=f"v{i + 2}") pv = ProjectVersion( project, i + 2, project.creator.id, - upload_changes, + file_changes, "127.0.0.1", ) assert pv.project_size == sum(file.size for file in pv.files) diff --git a/server/mergin/tests/test_auth.py b/server/mergin/tests/test_auth.py index 90777122..d53b01bc 100644 --- a/server/mergin/tests/test_auth.py +++ b/server/mergin/tests/test_auth.py @@ -77,6 +77,15 @@ def test_login(client, data, headers, expected): ) assert login_history assert login_history.device_id == str(headers.get("X-Device-Id")) + assert user.last_signed_in == login_history.timestamp + users_last_signed_in = LoginHistory.get_users_last_signed_in([user.id]) + assert users_last_signed_in[user.id] == login_history.timestamp + + # verify missing value is cached on first LoginHistory access + user.last_signed_in = None + db.session.commit() + users_last_signed_in = LoginHistory.get_users_last_signed_in([user.id]) + assert user.last_signed_in == users_last_signed_in[user.id] def test_logout(client): @@ -376,6 +385,7 @@ def test_api_login(client, data, headers, expected): .first() ) assert login_history + assert user.last_signed_in == login_history.timestamp def test_api_login_from_urllib(client): @@ -394,6 +404,8 @@ def test_api_login_from_urllib(client): .first() ) assert not login_history + # we do not have recored last login yet + assert user.last_signed_in is None def test_api_user_profile(client): diff --git a/server/mergin/tests/test_celery.py b/server/mergin/tests/test_celery.py index 50420cf0..7aad01da 100644 --- a/server/mergin/tests/test_celery.py +++ b/server/mergin/tests/test_celery.py @@ -2,8 +2,10 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial +import math import os -from datetime import datetime, timedelta +import uuid +from datetime import datetime, timedelta, timezone from pathlib import Path from flask import current_app @@ -14,17 +16,32 @@ from ..config import Configuration from ..sync.models import Project, AccessRequest, ProjectRole, ProjectVersion from ..celery import send_email_async +from ..sync.config import Configuration as SyncConfiguration from ..sync.tasks import ( remove_temp_files, remove_projects_backups, create_project_version_zip, remove_projects_archives, + remove_unused_chunks, ) from ..sync.storages.disk import move_to_tmp -from . import test_project, test_workspace_name, test_workspace_id -from .utils import add_user, create_workspace, create_project, login, modify_file_times +from ..sync.utils import get_chunk_location +from . import ( + test_project, + test_workspace_name, + test_workspace_id, + test_project_dir, + json_headers, +) +from .utils import ( + CHUNK_SIZE, + add_user, + create_workspace, + create_project, + login, + modify_file_times, +) from ..auth.models import User -from . 
import json_headers def test_send_email(app): @@ -187,3 +204,33 @@ def test_create_project_version_zip(diff_project): modify_file_times(latest_version.zip_path, new_time) remove_projects_archives() # zip has expired -> remove assert not os.path.exists(latest_version.zip_path) + + +def test_remove_chunks(app): + """Test cleanup of outdated chunks""" + # pretend chunks were uploaded + chunks = [] + src_file = os.path.join(test_project_dir, "base.gpkg") + with open(src_file, "rb") as in_file: + f_size = os.path.getsize(src_file) + for i in range(math.ceil(f_size / CHUNK_SIZE)): + chunk_id = str(uuid.uuid4()) + chunk_location = get_chunk_location(chunk_id) + os.makedirs(os.path.dirname(chunk_location), exist_ok=True) + with open(chunk_location, "wb") as out_file: + out_file.write(in_file.read(CHUNK_SIZE)) + chunks.append(chunk_location) + + remove_unused_chunks() + assert all(os.path.exists(chunk) for chunk in chunks) + + def _atime_mock(path: str) -> float: + """Mock file stats to be already expired""" + return ( + datetime.now(timezone.utc) + - timedelta(seconds=SyncConfiguration.UPLOAD_CHUNKS_EXPIRATION) + ).timestamp() - 1 + + with patch("os.path.getatime", _atime_mock): + remove_unused_chunks() + assert not any(os.path.exists(chunk) for chunk in chunks) diff --git a/server/mergin/tests/test_config.py b/server/mergin/tests/test_config.py index 8b745a0a..af677cb0 100644 --- a/server/mergin/tests/test_config.py +++ b/server/mergin/tests/test_config.py @@ -21,6 +21,7 @@ def test_config(client): "minor", "user_self_registration", "build_hash", + "v2_push_enabled", } resp = client.get("/config") assert resp.status_code == 200 diff --git a/server/mergin/tests/test_db_hooks.py b/server/mergin/tests/test_db_hooks.py index e7f9e270..044294c5 100644 --- a/server/mergin/tests/test_db_hooks.py +++ b/server/mergin/tests/test_db_hooks.py @@ -18,7 +18,6 @@ ProjectRole, ProjectUser, ) -from ..sync.files import UploadChanges from ..auth.models import User from ..app import db from . 
import DEFAULT_USER @@ -40,8 +39,7 @@ def test_close_user_account(client, diff_project): # user has access to mergin user diff_project diff_project.set_role(user.id, ProjectRole.WRITER) # user contributed to another user project so he is listed in projects history - changes = UploadChanges(added=[], updated=[], removed=[]) - pv = ProjectVersion(diff_project, 11, user.id, changes, "127.0.0.1") + pv = ProjectVersion(diff_project, 11, user.id, [], "127.0.0.1") diff_project.latest_version = pv.name pv.project = diff_project db.session.add(pv) @@ -116,8 +114,7 @@ def test_remove_project(client, diff_project): # set up mergin_user = User.query.filter_by(username=DEFAULT_USER[0]).first() project_dir = Path(diff_project.storage.project_dir) - changes = UploadChanges(added=[], removed=[], updated=[]) - upload = Upload(diff_project, 10, changes, mergin_user.id) + upload = Upload(diff_project, 10, [], mergin_user.id) db.session.add(upload) project_id = diff_project.id user = add_user("user", "user") diff --git a/server/mergin/tests/test_private_project_api.py b/server/mergin/tests/test_private_project_api.py index 54575114..5f062ac2 100644 --- a/server/mergin/tests/test_private_project_api.py +++ b/server/mergin/tests/test_private_project_api.py @@ -424,27 +424,81 @@ def test_admin_project_list(client): assert len(resp.json["items"]) == 14 -test_download_proj_data = [ - # zips do not exist, version not specified -> call celery task to create zip with latest version - (0, 0, 0, None, 202, 1), +def test_download_project( + client, + diff_project, +): + """Test download endpoint responses""" + resp = client.head( + url_for( + "/app.mergin_sync_private_api_controller_download_project", + id=diff_project.id, + version="", + ) + ) + # zip archive does not exist yet + assert resp.status_code == 202 + project_version = diff_project.get_latest_version() + # pretend archive has been created + zip_path = Path(project_version.zip_path) + if zip_path.parent.exists(): + shutil.rmtree(zip_path.parent, ignore_errors=True) + zip_path.parent.mkdir(parents=True, exist_ok=True) + zip_path.touch() + resp = client.head( + url_for( + "/app.mergin_sync_private_api_controller_download_project", + id=diff_project.id, + version="", + ) + ) + # zip archive is ready -> download can start + assert resp.status_code == 200 + + +def test_prepare_large_project_fail(client, diff_project): + """Test asking for too large project is refused""" + resp = client.post( + url_for( + "/app.mergin_sync_private_api_controller_prepare_archive", + id=diff_project.id, + version="v1", + ) + ) + assert resp.status_code == 201 + # pretend testing project to be too large by lowering limit + client.application.config["MAX_DOWNLOAD_ARCHIVE_SIZE"] = 10 + resp = client.post( + url_for( + "/app.mergin_sync_private_api_controller_prepare_archive", + id=diff_project.id, + version="v1", + ) + ) + assert resp.status_code == 400 + + +test_prepare_proj_data = [ + # zips do not exist, version not specified -> trigger celery to create zip with latest version + (0, 0, 0, None, 201, 1), # expired partial zip exists -> call celery task - (0, 1, 1, None, 202, 1), - # valid partial zip exists -> return, do not call celery - (0, 1, 0, None, 202, 0), + (0, 1, 1, None, 201, 1), + # valid partial zip exists -> do not call celery + (0, 1, 0, None, 204, 0), # zips do not exist, version specified -> call celery task with specified version - (0, 0, 0, "v1", 202, 1), + (0, 0, 0, "v1", 201, 1), # specified version does not exist -> 404 (0, 0, 0, "v100", 404, 0), - # zip is ready 
to download - (1, 0, 0, None, 200, 0), + # zip is already prepared to download -> do not call celery + (1, 0, 0, None, 204, 0), ] @pytest.mark.parametrize( - "zipfile,partial,expired,version,exp_resp,exp_call", test_download_proj_data + "zipfile,partial,expired,version,exp_resp,exp_call", test_prepare_proj_data ) @patch("mergin.sync.tasks.create_project_version_zip.delay") -def test_download_project( +def test_prepare_archive( mock_create_zip, client, zipfile, @@ -455,7 +509,7 @@ def test_download_project( exp_call, diff_project, ): - """Test download endpoint responses and celery task calling""" + """Test prepare archive endpoint responses and celery task calling""" # prepare initial state according to testcase project_version = diff_project.get_latest_version() if zipfile: @@ -474,7 +528,7 @@ def test_download_project( seconds=current_app.config["PARTIAL_ZIP_EXPIRATION"] + 1 ) modify_file_times(temp_zip_path, new_time) - resp = client.get( + resp = client.post( url_for( "/app.mergin_sync_private_api_controller_download_project", id=diff_project.id, @@ -487,68 +541,3 @@ def test_download_project( call_args, _ = mock_create_zip.call_args args = call_args[0] assert args == diff_project.latest_version - - -def test_large_project_download_fail(client, diff_project): - """Test downloading too large project is refused""" - resp = client.get( - url_for( - "/app.mergin_sync_private_api_controller_download_project", - id=diff_project.id, - version="v1", - ) - ) - assert resp.status_code == 202 - # pretend testing project to be too large by lowering limit - client.application.config["MAX_DOWNLOAD_ARCHIVE_SIZE"] = 10 - resp = client.get( - url_for( - "/app.mergin_sync_private_api_controller_download_project", - id=diff_project.id, - version="v1", - ) - ) - assert resp.status_code == 400 - - -@patch("mergin.sync.tasks.create_project_version_zip.delay") -def test_remove_abandoned_zip(mock_prepare_zip, client, diff_project): - """Test project download removes partial zip which is inactive for some time""" - latest_version = diff_project.get_latest_version() - # pretend an incomplete zip remained - partial_zip_path = latest_version.zip_path + ".partial" - os.makedirs(os.path.dirname(partial_zip_path), exist_ok=True) - os.mknod(partial_zip_path) - assert os.path.exists(partial_zip_path) - # pretend abandoned partial zip by lowering the expiration limit - client.application.config["PARTIAL_ZIP_EXPIRATION"] = 0 - # download should remove it (move to temp folder) and call a celery task which will try to create a correct zip - resp = client.get( - url_for( - "/app.mergin_sync_private_api_controller_download_project", - id=diff_project.id, - ) - ) - assert mock_prepare_zip.called - assert resp.status_code == 202 - - -@patch("mergin.sync.tasks.create_project_version_zip.delay") -def test_download_project_request_method(mock_prepare_zip, client, diff_project): - """Test head request does not create a celery job""" - resp = client.head( - url_for( - "/app.mergin_sync_private_api_controller_download_project", - id=diff_project.id, - ) - ) - assert not mock_prepare_zip.called - assert resp.status_code == 202 - resp = client.get( - url_for( - "/app.mergin_sync_private_api_controller_download_project", - id=diff_project.id, - ) - ) - assert mock_prepare_zip.called - assert resp.status_code == 202 diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index b1f60a8f..1cba91cc 100644 --- a/server/mergin/tests/test_project_controller.py +++ 
b/server/mergin/tests/test_project_controller.py @@ -7,6 +7,7 @@ from dataclasses import asdict from unittest.mock import patch from urllib.parse import quote +from psycopg2 import IntegrityError import pysqlite3 import pytest import json @@ -35,7 +36,7 @@ PushChangeType, ProjectFilePath, ) -from ..sync.files import ChangesSchema +from ..sync.files import files_changes_from_upload from ..sync.schemas import ProjectListSchema from ..sync.utils import generate_checksum, is_versioned_file from ..auth.models import User, UserProfile @@ -1277,8 +1278,7 @@ def create_transaction(username, changes, version=1): project = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id ).first() - upload_changes = ChangesSchema(context={"version": version}).load(changes) - upload = Upload(project, version, upload_changes, user.id) + upload = Upload(project, version, changes, user.id) db.session.add(upload) db.session.commit() upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", upload.id) @@ -1354,9 +1354,8 @@ def upload_chunks(upload_dir, changes, src_dir=test_project_dir): def test_push_finish(client): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) - url = "/v1/project/push/finish/{}".format(upload.id) - resp = client.post(url, headers=json_headers) + resp = client.post(f"/v1/project/push/finish/{upload.id}", headers=json_headers) assert resp.status_code == 422 assert "corrupted_files" in resp.json["detail"].keys() assert not os.path.exists(os.path.join(upload_dir, "files", "test.txt")) @@ -1364,21 +1363,28 @@ def test_push_finish(client): assert failure.error_type == "push_finish" assert "corrupted_files" in failure.error_details + upload, upload_dir = create_transaction("mergin", changes) os.mkdir(os.path.join(upload.project.storage.project_dir, "v2")) # mimic chunks were uploaded + chunks = [] os.makedirs(os.path.join(upload_dir, "chunks")) for f in upload.changes["added"] + upload.changes["updated"]: with open(os.path.join(test_project_dir, f["path"]), "rb") as in_file: for chunk in f["chunks"]: with open(os.path.join(upload_dir, "chunks", chunk), "wb") as out_file: out_file.write(in_file.read(CHUNK_SIZE)) + chunks.append(chunk) - resp2 = client.post(url, headers={**json_headers, "User-Agent": "Werkzeug"}) + resp2 = client.post( + f"/v1/project/push/finish/{upload.id}", + headers={**json_headers, "User-Agent": "Werkzeug"}, + ) assert resp2.status_code == 200 assert not os.path.exists(upload_dir) version = upload.project.get_latest_version() assert version.user_agent assert version.device_id == json_headers["X-Device-Id"] + assert all(not os.path.exists(chunk) for chunk in chunks) # tests basic failures resp3 = client.post("/v1/project/push/finish/not-existing") @@ -2274,12 +2280,12 @@ def add_project_version(project, changes, version=None): else User.query.filter_by(username=DEFAULT_USER[0]).first() ) next_version = version or project.next_version() - upload_changes = ChangesSchema(context={"version": next_version}).load(changes) + file_changes = files_changes_from_upload(changes, location_dir=f"v{next_version}") pv = ProjectVersion( project, next_version, author.id, - upload_changes, + file_changes, ip="127.0.0.1", ) db.session.add(pv) @@ -2293,19 +2299,23 @@ def test_project_version_integrity(client): changes = _get_changes_with_diff(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes) - # manually create an identical project version in db - pv = 
add_project_version(upload.project, changes) - # try to finish the transaction - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) - assert resp.status_code == 422 - assert "Failed to create new version" in resp.json["detail"] - failure = SyncFailuresHistory.query.filter_by(project_id=upload.project.id).first() - assert failure.error_type == "push_finish" - assert "Failed to create new version" in failure.error_details - upload.project.latest_version = pv.name - 1 - db.session.delete(pv) - db.session.delete(failure) - db.session.commit() + + # try to finish the transaction which would fail on version created integrity error, e.g. race conditions + with patch.object( + ProjectVersion, + "__init__", + side_effect=IntegrityError("Project version already exists", None, None), + ): + resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + assert resp.status_code == 422 + assert "Failed to create new version" in resp.json["detail"] + failure = SyncFailuresHistory.query.filter_by( + project_id=upload.project.id + ).first() + assert failure.error_type == "push_finish" + assert "Failed to create new version" in failure.error_details + db.session.delete(failure) + db.session.commit() # changes without an upload with patch("mergin.sync.public_api_controller.get_user_agent") as mock: @@ -2320,7 +2330,7 @@ def test_project_version_integrity(client): # to insert an identical project version when no upload (only one endpoint used), # we need to pretend side effect of a function called just before project version insertion def _get_user_agent(): - add_project_version(project, changes) + add_project_version(project, {}) # bypass endpoint checks upload.project.latest_version = ProjectVersion.from_v_name(data["version"]) return "Input" diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index 2d88d652..85177190 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -1,13 +1,41 @@ # Copyright (C) Lutra Consulting Limited # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial -from .utils import add_user -from ..app import db -from mergin.sync.models import Project -from tests import test_project, test_workspace_id +import os +import shutil +from unittest.mock import patch +from sqlalchemy.exc import IntegrityError +import pytest +from datetime import datetime, timedelta, timezone -from ..config import Configuration -from ..sync.models import ProjectRole +from mergin.app import db +from mergin.config import Configuration +from mergin.sync.errors import ( + BigChunkError, + ProjectLocked, + ProjectVersionExists, + AnotherUploadRunning, + StorageLimitHit, + UploadError, +) +from mergin.sync.files import ChangesSchema +from mergin.sync.models import ( + Project, + ProjectRole, + ProjectVersion, + SyncFailuresHistory, + Upload, +) +from mergin.sync.utils import get_chunk_location +from . 
import TMP_DIR, test_project, test_workspace_id, test_project_dir +from .test_project_controller import ( + CHUNK_SIZE, + _get_changes, + _get_changes_with_diff, + _get_changes_with_diff_0_size, + _get_changes_without_added, +) +from .utils import add_user, file_info def test_schedule_delete_project(client): @@ -126,3 +154,346 @@ def test_project_members(client): # access provided by workspace role cannot be removed directly response = client.delete(url + f"/{user.id}") assert response.status_code == 404 + + +push_data = [ + # success + ( + {"version": "v1", "changes": _get_changes_without_added(test_project_dir)}, + 201, + None, + ), + # with diff, success + ({"version": "v1", "changes": _get_changes_with_diff(test_project_dir)}, 201, None), + # just a dry-run + ( + { + "version": "v1", + "changes": _get_changes_with_diff(test_project_dir), + "check_only": True, + }, + 204, + None, + ), + # only delete files + ( + { + "version": "v1", + "changes": { + "added": [], + "removed": [ + file_info(test_project_dir, "base.gpkg"), + ], + "updated": [], + }, + }, + 201, + None, + ), + # broken .gpkg file + ( + {"version": "v1", "changes": _get_changes_with_diff_0_size(test_project_dir)}, + 422, + UploadError.code, + ), + # contains already uploaded file + ( + {"version": "v1", "changes": _get_changes(test_project_dir)}, + 422, + UploadError.code, + ), + # version mismatch + ( + {"version": "v0", "changes": _get_changes_without_added(test_project_dir)}, + 409, + ProjectVersionExists.code, + ), + # no changes requested + ( + {"version": "v1", "changes": {"added": [], "removed": [], "updated": []}}, + 422, + UploadError.code, + ), + # inconsistent changes, a file cannot be added and updated at the same time + ( + { + "version": "v1", + "changes": { + "added": [ + { + "path": "test.txt", + "size": 1234, + "checksum": "9adb76bf81a34880209040ffe5ee262a090b62ab", + "chunks": [], + } + ], + "removed": [], + "updated": [ + { + "path": "test.txt", + "size": 1234, + "checksum": "9adb76bf81a34880209040ffe5ee262a090b62ab", + "chunks": [], + } + ], + }, + }, + 422, + UploadError.code, + ), + # inconsistent changes, a file which does not exist cannot be deleted + ( + { + "version": "v1", + "changes": { + "added": [], + "removed": [ + { + "path": "not-existing.txt", + "size": 1234, + "checksum": "9adb76bf81a34880209040ffe5ee262a090b62ab", + } + ], + "updated": [], + }, + }, + 422, + UploadError.code, + ), + # missing version (required parameter) + ({"changes": _get_changes_without_added(test_project_dir)}, 400, None), + # incorrect changes format + ({"version": "v1", "changes": {}}, 400, None), +] + + +@pytest.mark.parametrize("data,expected,err_code", push_data) +def test_create_version(client, data, expected, err_code): + """Test project push endpoint with different payloads.""" + + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + assert project.latest_version == 1 + + chunks = [] + if expected == 201: + # mimic chunks were uploaded + for f in data["changes"]["added"] + data["changes"]["updated"]: + src_file = ( + os.path.join(TMP_DIR, f["diff"]["path"]) + if f.get("diff") + else os.path.join(test_project_dir, f["path"]) + ) + with open(src_file, "rb") as in_file: + for chunk in f["chunks"]: + chunk_location = get_chunk_location(chunk) + os.makedirs(os.path.dirname(chunk_location), exist_ok=True) + with open(chunk_location, "wb") as out_file: + out_file.write(in_file.read(CHUNK_SIZE)) + + chunks.append(chunk_location) + + response = 
client.post(f"v2/projects/{project.id}/versions", json=data) + assert response.status_code == expected + if expected == 201: + assert response.json["version"] == "v2" + assert project.latest_version == 2 + assert all(not os.path.exists(chunk) for chunk in chunks) + else: + assert project.latest_version == 1 + if err_code: + assert response.json["code"] == err_code + failure = SyncFailuresHistory.query.filter_by(project_id=project.id).first() + # failures are not created when POST request body is invalid (caught by connexion validators) + if failure: + assert failure.last_version == "v1" + assert failure.error_type == "project_push" + + +def test_create_version_failures(client): + """Test various project push failures beyond invalid payload""" + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + + data = {"version": "v1", "changes": _get_changes_without_added(test_project_dir)} + + # somebody else is syncing + upload = Upload(project, 1, _get_changes(test_project_dir), 1) + db.session.add(upload) + db.session.commit() + os.makedirs(upload.upload_dir) + open(upload.lockfile, "w").close() + + response = client.post(f"v2/projects/{project.id}/versions", json=data) + assert response.status_code == 409 + assert response.json["code"] == AnotherUploadRunning.code + upload.clear() + + # project is locked + project.locked_until = datetime.now(timezone.utc) + timedelta(days=1) + db.session.commit() + response = client.post(f"v2/projects/{project.id}/versions", json=data) + assert response.status_code == 423 + assert response.json["code"] == ProjectLocked.code + project.locked_until = None + db.session.commit() + + # try to finish the transaction which would fail on storage limit + with patch.object( + Configuration, + "GLOBAL_STORAGE", + 0, + ): + response = client.post(f"v2/projects/{project.id}/versions", json=data) + assert response.status_code == 422 + assert response.json["code"] == StorageLimitHit.code + + # try to finish the transaction which would fail on version created integrity error, e.g. race conditions + with patch.object( + ProjectVersion, + "__init__", + side_effect=IntegrityError("Cannot insert new version", None, None), + ): + # keep just deleted data to avoid messing with chunks + data["changes"]["added"] = data["changes"]["updated"] = [] + response = client.post(f"v2/projects/{project.id}/versions", json=data) + assert response.status_code == 422 + assert response.json["code"] == UploadError.code + + # try to finish the transaction which would fail on existing Upload integrity error, e.g. 
race conditions + with patch.object( + Upload, + "__init__", + side_effect=IntegrityError("Cannot insert upload", None, None), + ): + response = client.post(f"v2/projects/{project.id}/versions", json=data) + assert response.status_code == 409 + assert response.json["code"] == AnotherUploadRunning.code + + # try to finish the transaction which would fail on unexpected integrity error + # patch of ChangesSchema is just a workaround to trigger and error + with patch.object( + ChangesSchema, + "validate", + side_effect=IntegrityError("Cannot insert upload", None, None), + ): + response = client.post(f"v2/projects/{project.id}/versions", json=data) + assert response.status_code == 409 + + +def test_upload_chunk(client): + """Test pushing a chunk to a project""" + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + url = f"/v2/projects/{project.id}/chunks" + client.application.config["MAX_CHUNK_SIZE"] = ( + 1024 # Set a small max chunk size for testing + ) + max_chunk_size = client.application.config["MAX_CHUNK_SIZE"] + + response = client.post( + url, + data=b"a" * (max_chunk_size + 1), # Exceeding max chunk size + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == 413 + assert response.json["code"] == BigChunkError.code + + # Project is locked, cannot push chunks + project.locked_until = datetime.now(timezone.utc) + timedelta(weeks=26) + db.session.commit() + response = client.post( + url, + data=b"a", + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == 423 + assert response.json["code"] == ProjectLocked.code + + project.locked_until = None # Unlock the project + project.removed_at = datetime.now(timezone.utc) - timedelta( + days=(client.application.config["DELETED_PROJECT_EXPIRATION"] + 1) + ) # Ensure project is removed + db.session.commit() + response = client.post( + url, + data=b"a", + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == 404 + + # Push a chunk successfully + project.removed_at = None # Ensure project is not removed + db.session.commit() + response = client.post( + url, + data=b"a" * max_chunk_size, + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == 200 + chunk_id = response.json["id"] + assert chunk_id + valid_until = response.json["valid_until"] + valid_until_dt = datetime.strptime(valid_until, "%Y-%m-%dT%H:%M:%S%z") + assert valid_until_dt > datetime.now(timezone.utc) + assert valid_until_dt < datetime.now(timezone.utc) + timedelta( + seconds=client.application.config["UPLOAD_CHUNKS_EXPIRATION"] + ) + # Check if the chunk is stored correctly + stored_chunk = get_chunk_location(chunk_id) + assert os.path.exists(stored_chunk) + with open(stored_chunk, "rb") as f: + assert f.read() == b"a" * max_chunk_size + + +def test_full_push(client): + """Test full project push with upload of chunks and project version creation""" + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + + # prepare data to push + project_dir = os.path.join(TMP_DIR, test_project) + if os.path.exists(project_dir): + shutil.rmtree(project_dir) + shutil.copytree(test_project_dir, project_dir) + os.rename( + os.path.join(project_dir, "base.gpkg"), + os.path.join(project_dir, "new_base.gpkg"), + ) + + test_file = file_info(project_dir, "new_base.gpkg", chunk_size=CHUNK_SIZE) + uploaded_chunks = [] + + with open(os.path.join(project_dir, test_file["path"]), 
"rb") as in_file: + for _ in test_file["chunks"]: + data = in_file.read(CHUNK_SIZE) + response = client.post( + f"/v2/projects/{project.id}/chunks", + data=data, + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == 200 + uploaded_chunks.append(response.json["id"]) + chunk_location = get_chunk_location(response.json["id"]) + assert os.path.exists(chunk_location) + + test_file["chunks"] = uploaded_chunks + + response = client.post( + f"v2/projects/{project.id}/versions", + json={ + "version": "v1", + "changes": {"added": [test_file], "updated": [], "removed": []}, + }, + ) + assert response.status_code == 201 + assert response.json["version"] == "v2" + assert project.latest_version == 2 + assert os.path.exists( + os.path.join(project.storage.project_dir, "v2", test_file["path"]) + ) + assert not Upload.query.filter_by(project_id=project.id).first() diff --git a/server/mergin/tests/utils.py b/server/mergin/tests/utils.py index 94fc033f..6dcfd157 100644 --- a/server/mergin/tests/utils.py +++ b/server/mergin/tests/utils.py @@ -4,13 +4,11 @@ import json import shutil -from typing import Tuple import pysqlite3 import uuid import math from dataclasses import asdict from datetime import datetime - import pysqlite3 from flask import url_for, current_app import os @@ -20,7 +18,7 @@ from ..auth.models import User, UserProfile from ..sync.utils import generate_location, generate_checksum from ..sync.models import Project, ProjectVersion, FileHistory, ProjectRole -from ..sync.files import UploadChanges, ChangesSchema +from ..sync.files import ProjectFileChange, PushChangeType, files_changes_from_upload from ..sync.workspace import GlobalWorkspace from ..app import db from . import json_headers, DEFAULT_USER, test_project, test_project_dir, TMP_DIR @@ -82,8 +80,7 @@ def create_project(name, workspace, user, **kwargs): p.updated = datetime.utcnow() db.session.add(p) db.session.flush() - changes = UploadChanges(added=[], updated=[], removed=[]) - pv = ProjectVersion(p, 0, user.id, changes, "127.0.0.1") + pv = ProjectVersion(p, 0, user.id, [], "127.0.0.1") db.session.add(pv) db.session.commit() @@ -156,15 +153,17 @@ def initialize(): for f in files: abs_path = os.path.join(root, f) project_files.append( - { - "path": abs_path.replace(test_project_dir, "").lstrip("/"), - "location": os.path.join( + ProjectFileChange( + path=abs_path.replace(test_project_dir, "").lstrip("/"), + checksum=generate_checksum(abs_path), + size=os.path.getsize(abs_path), + mtime=str(datetime.fromtimestamp(os.path.getmtime(abs_path))), + change=PushChangeType.CREATE, + location=os.path.join( "v1", abs_path.replace(test_project_dir, "").lstrip("/") ), - "size": os.path.getsize(abs_path), - "checksum": generate_checksum(abs_path), - "mtime": str(datetime.fromtimestamp(os.path.getmtime(abs_path))), - } + diff=None, + ) ) p.latest_version = 1 p.public = True @@ -173,14 +172,7 @@ def initialize(): db.session.add(p) db.session.commit() - upload_changes = ChangesSchema(context={"version": 1}).load( - { - "added": project_files, - "updated": [], - "removed": [], - } - ) - pv = ProjectVersion(p, 1, user.id, upload_changes, "127.0.0.1") + pv = ProjectVersion(p, 1, user.id, project_files, "127.0.0.1") db.session.add(pv) db.session.commit() @@ -285,7 +277,7 @@ def create_blank_version(project): project, project.next_version(), project.creator.id, - UploadChanges(added=[], updated=[], removed=[]), + [], "127.0.0.1", ) db.session.add(pv) @@ -355,14 +347,14 @@ def push_change(project, action, path, src_dir): 
else:
         return
 
-    upload_changes = ChangesSchema(context={"version": project.next_version()}).load(
-        changes
+    file_changes = files_changes_from_upload(
+        changes, location_dir=f"v{project.next_version()}"
     )
     pv = ProjectVersion(
         project,
         project.next_version(),
         project.creator.id,
-        upload_changes,
+        file_changes,
         "127.0.0.1",
     )
     db.session.add(pv)
diff --git a/server/mergin/version.py b/server/mergin/version.py
index 214212f7..cdacf710 100644
--- a/server/mergin/version.py
+++ b/server/mergin/version.py
@@ -4,4 +4,4 @@
 
 
 def get_version():
-    return "2025.6.2"
+    return "2025.7.3"
diff --git a/server/migrations/community/b9ec9ab6694f_add_user_last_signed_in.py b/server/migrations/community/b9ec9ab6694f_add_user_last_signed_in.py
new file mode 100644
index 00000000..0ebc9250
--- /dev/null
+++ b/server/migrations/community/b9ec9ab6694f_add_user_last_signed_in.py
@@ -0,0 +1,25 @@
+"""Add user last signed in
+
+Revision ID: b9ec9ab6694f
+Revises: 6cb54659c1de
+Create Date: 2025-09-09 15:43:19.554498
+
+"""
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = "b9ec9ab6694f"
+down_revision = "6cb54659c1de"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    op.add_column("user", sa.Column("last_signed_in", sa.DateTime(), nullable=True))
+
+
+def downgrade():
+    op.drop_column("user", "last_signed_in")
diff --git a/server/setup.py b/server/setup.py
index 2abd701d..33f41ea7 100644
--- a/server/setup.py
+++ b/server/setup.py
@@ -6,7 +6,7 @@
 
 setup(
     name="mergin",
-    version="2025.6.2",
+    version="2025.7.3",
     url="https://github.com/MerginMaps/mergin",
     license="AGPL-3.0-only",
     author="Lutra Consulting Limited",
diff --git a/web-app/packages/lib/src/common/components/AppSidebarRight.vue b/web-app/packages/lib/src/common/components/AppSidebarRight.vue
index 89a839ba..008c36eb 100644
--- a/web-app/packages/lib/src/common/components/AppSidebarRight.vue
+++ b/web-app/packages/lib/src/common/components/AppSidebarRight.vue
@@ -12,7 +12,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial
     block-scroll
     :modal="true"
     position="right"
-    class="w-11 lg:w-5 xl:w-3"
+    class="w-11 lg:w-30rem"
     :pt="{
       mask: {
         style: {
diff --git a/web-app/packages/lib/src/modules/project/projectApi.ts b/web-app/packages/lib/src/modules/project/projectApi.ts
index 91b6edbb..657675c8 100644
--- a/web-app/packages/lib/src/modules/project/projectApi.ts
+++ b/web-app/packages/lib/src/modules/project/projectApi.ts
@@ -311,6 +311,10 @@ export const ProjectApi = {
     return ProjectModule.httpService.get(url, { responseType: 'blob' })
   },
 
+  async prepareArchive(url: string): Promise<AxiosResponse> {
+    return ProjectModule.httpService.post(url)
+  },
+
   /** Request head of file download */
   async getHeadDownloadFile(url: string): Promise<AxiosResponse> {
     return ProjectModule.httpService.head(url)
diff --git a/web-app/packages/lib/src/modules/project/store.ts b/web-app/packages/lib/src/modules/project/store.ts
index 182bdea2..6b2f2cb9 100644
--- a/web-app/packages/lib/src/modules/project/store.ts
+++ b/web-app/packages/lib/src/modules/project/store.ts
@@ -719,8 +719,12 @@ export const useProjectStore = defineStore('projectModule', {
       const delays = [...Array(3).fill(1000), ...Array(3).fill(3000), 5000]
       let retryCount = 0
 
-      const pollDownloadArchive = async () => {
-        try {
+      try {
+        // STEP 1: request archive creation
+        await ProjectApi.prepareArchive(payload.url)
+
+        // STEP 2: start polling HEAD for readiness
+        const pollDownloadArchive = async () => {
if (retryCount > 125) { notificationStore.warn({ text: exceedMessage, @@ -729,38 +733,42 @@ export const useProjectStore = defineStore('projectModule', { await this.cancelDownloadArchive() return } - - const head = await ProjectApi.getHeadDownloadFile(payload.url) - const polling = head.status == 202 - if (polling) { - const delay = delays[Math.min(retryCount, delays.length - 1)] // Select delay based on retry count - retryCount++ // Increment retry count - downloadArchiveTimeout = setTimeout(async () => { - await pollDownloadArchive() - }, delay) - return - } - - // Use browser download instead of playing around with the blob - FileSaver.saveAs(payload.url) - notificationStore.closeNotification() - this.cancelDownloadArchive() - } catch (e) { - if (axios.isAxiosError(e) && e.response?.status === 400) { - notificationStore.error({ - group: 'download-large-error', - text: '', - life: 6000 - }) - } else { - notificationStore.error({ - text: errorMessage - }) + try { + const head = await ProjectApi.getHeadDownloadFile(payload.url) + const polling = head.status === 202 + if (polling) { + const delay = delays[Math.min(retryCount, delays.length - 1)] // Select delay based on retry count + retryCount++ // Increment retry count + downloadArchiveTimeout = setTimeout(async () => { + await pollDownloadArchive() + }, delay) + return + } + + // Use browser download instead of playing around with the blob + FileSaver.saveAs(payload.url) + notificationStore.closeNotification() + this.cancelDownloadArchive() + } catch (e) { + notificationStore.error({ text: errorMessage }) + this.cancelDownloadArchive() } - this.cancelDownloadArchive() } + pollDownloadArchive() + } catch (e) { + if (axios.isAxiosError(e) && e.response?.status === 400) { + notificationStore.error({ + group: 'download-large-error', + text: '', + life: 6000 + }) + } else { + notificationStore.error({ + text: errorMessage + }) + } + this.cancelDownloadArchive() } - pollDownloadArchive() }, async cancelDownloadArchive() {