diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index f0c9047c..0b487874 100644 --- a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -1028,14 +1028,6 @@ def push_finish(transaction_id): # let's move uploaded files where they are expected to be os.renames(files_dir, version_dir) - # remove used chunks - for file in upload.changes["added"] + upload.changes["updated"]: - file_chunks = file.get("chunks", []) - for chunk_id in file_chunks: - chunk_file = os.path.join(upload.upload_dir, "chunks", chunk_id) - if os.path.exists(chunk_file): - move_to_tmp(chunk_file) - logging.info( f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}." ) diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 27e0355a..217204c1 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -14,6 +14,8 @@ from marshmallow import ValidationError from sqlalchemy.exc import IntegrityError +from mergin.sync.tasks import remove_transaction_chunks + from .schemas_v2 import ProjectSchema as ProjectSchemaV2 from ..app import db from ..auth import auth_required @@ -319,12 +321,12 @@ def create_project_version(id): os.renames(temp_files_dir, version_dir) # remove used chunks + # get chunks from added and updated files + chunks_ids = [] for file in to_be_added_files + to_be_updated_files: file_chunks = file.get("chunks", []) - for chunk_id in file_chunks: - chunk_file = get_chunk_location(chunk_id) - if os.path.exists(chunk_file): - move_to_tmp(chunk_file) + chunks_ids.extend(file_chunks) + remove_transaction_chunks.delay(chunks_ids) logging.info( f"Push finished for project: {project.id}, project version: {v_next_version}, upload id: {upload.id}." 
@@ -377,7 +379,6 @@ def upload_chunk(id: str): # we could have used request.data here, but it could eventually cause OOM issue save_to_file(request.stream, dest_file, current_app.config["MAX_CHUNK_SIZE"]) except IOError: - move_to_tmp(dest_file, chunk_id) return BigChunkError().response(413) except Exception as e: return UploadError(error="Error saving chunk").response(400) diff --git a/server/mergin/sync/tasks.py b/server/mergin/sync/tasks.py index 7688a3ee..9392997c 100644 --- a/server/mergin/sync/tasks.py +++ b/server/mergin/sync/tasks.py @@ -7,13 +7,14 @@ import os import time from datetime import datetime, timedelta, timezone +from typing import List, Optional from zipfile import ZIP_DEFLATED, ZipFile from flask import current_app from .models import Project, ProjectVersion, FileHistory from .storages.disk import move_to_tmp from .config import Configuration -from .utils import remove_outdated_files +from .utils import get_chunk_location, remove_outdated_files from ..celery import celery from ..app import db @@ -169,3 +170,14 @@ def remove_unused_chunks(): if not os.path.isdir(dir): continue remove_outdated_files(dir, time_delta) + + +@celery.task +def remove_transaction_chunks(chunks: Optional[List[str]] = None): + """Remove chunks related to a specific sync transaction""" + if not chunks: + return + for chunk in chunks: + chunk_path = get_chunk_location(chunk) + if os.path.exists(chunk_path): + os.remove(chunk_path) diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index dda0bc53..6a4243fd 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial +from mergin.sync.tasks import remove_transaction_chunks, remove_unused_chunks from . 
import DEFAULT_USER from .utils ( add_user, @@ -22,6 +23,7 @@ from mergin.app import db from mergin.config import Configuration +from mergin.sync.config import Configuration as SyncConfiguration from mergin.sync.errors import ( BigChunkError, ProjectLocked, @@ -356,6 +358,7 @@ def test_create_version(client, data, expected, err_code): assert project.latest_version == 1 chunks = [] + chunk_ids = [] if expected == 201: # mimic chunks were uploaded for f in data["changes"]["added"] + data["changes"]["updated"]: @@ -372,12 +375,21 @@ out_file.write(in_file.read(CHUNK_SIZE)) chunks.append(chunk_location) + chunk_ids.append(chunk) - response = client.post(f"v2/projects/{project.id}/versions", json=data) + with patch( + "mergin.sync.public_api_v2_controller.remove_transaction_chunks.delay" + ) as mock_remove: + response = client.post(f"v2/projects/{project.id}/versions", json=data) assert response.status_code == expected if expected == 201: assert response.json["version"] == "v2" assert project.latest_version == 2 + # chunks exist after upload, cleanup job did not remove them + assert all(os.path.exists(chunk) for chunk in chunks) + if chunk_ids: + mock_remove.assert_called_once_with(chunk_ids) + remove_transaction_chunks(chunk_ids) assert all(not os.path.exists(chunk) for chunk in chunks) else: assert project.latest_version == 1