Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions server/mergin/sync/public_api_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -1028,14 +1028,6 @@ def push_finish(transaction_id):
# let's move uploaded files where they are expected to be
os.renames(files_dir, version_dir)

# remove used chunks
for file in upload.changes["added"] + upload.changes["updated"]:
file_chunks = file.get("chunks", [])
for chunk_id in file_chunks:
chunk_file = os.path.join(upload.upload_dir, "chunks", chunk_id)
if os.path.exists(chunk_file):
move_to_tmp(chunk_file)

logging.info(
f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}."
)
Expand Down
11 changes: 6 additions & 5 deletions server/mergin/sync/public_api_v2_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from marshmallow import ValidationError
from sqlalchemy.exc import IntegrityError

from mergin.sync.tasks import remove_transaction_chunks

from .schemas_v2 import ProjectSchema as ProjectSchemaV2
from ..app import db
from ..auth import auth_required
Expand Down Expand Up @@ -319,12 +321,12 @@ def create_project_version(id):
os.renames(temp_files_dir, version_dir)

# remove used chunks
# get chunks from added and updated files
chunks_ids = []
for file in to_be_added_files + to_be_updated_files:
file_chunks = file.get("chunks", [])
for chunk_id in file_chunks:
chunk_file = get_chunk_location(chunk_id)
if os.path.exists(chunk_file):
move_to_tmp(chunk_file)
chunks_ids.extend(file_chunks)
remove_transaction_chunks.delay(chunks_ids)

logging.info(
f"Push finished for project: {project.id}, project version: {v_next_version}, upload id: {upload.id}."
Expand Down Expand Up @@ -377,7 +379,6 @@ def upload_chunk(id: str):
# we could have used request.data here, but it could eventually cause OOM issue
save_to_file(request.stream, dest_file, current_app.config["MAX_CHUNK_SIZE"])
except IOError:
move_to_tmp(dest_file, chunk_id)
return BigChunkError().response(413)
except Exception as e:
return UploadError(error="Error saving chunk").response(400)
Expand Down
14 changes: 13 additions & 1 deletion server/mergin/sync/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
import os
import time
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from zipfile import ZIP_DEFLATED, ZipFile
from flask import current_app

from .models import Project, ProjectVersion, FileHistory
from .storages.disk import move_to_tmp
from .config import Configuration
from .utils import remove_outdated_files
from .utils import get_chunk_location, remove_outdated_files
from ..celery import celery
from ..app import db

Expand Down Expand Up @@ -169,3 +170,14 @@ def remove_unused_chunks():
if not os.path.isdir(dir):
continue
remove_outdated_files(dir, time_delta)


@celery.task
def remove_transaction_chunks(chunks: Optional[List[str]] = None):
    """Remove uploaded chunk files that belonged to a finished sync transaction.

    Intended to be dispatched asynchronously (``.delay``) once a project
    version has been created, so the request handler does not pay for the
    filesystem cleanup.

    :param chunks: ids of the chunks whose backing files should be deleted;
        the task is a no-op when the list is empty or ``None``.
    """
    if not chunks:
        return
    for chunk in chunks:
        chunk_path = get_chunk_location(chunk)
        # EAFP instead of exists()+remove(): another worker (e.g. the periodic
        # remove_unused_chunks cleanup) may delete the file between the
        # existence check and the removal, so tolerate an already-missing file.
        try:
            os.remove(chunk_path)
        except FileNotFoundError:
            pass
14 changes: 13 additions & 1 deletion server/mergin/tests/test_public_api_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#
# SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial

from mergin.sync.tasks import remove_transaction_chunks, remove_unused_chunks
from . import DEFAULT_USER
from .utils import (
add_user,
Expand All @@ -22,6 +23,7 @@

from mergin.app import db
from mergin.config import Configuration
from mergin.sync.config import Configuration as SyncConfiguration
from mergin.sync.errors import (
BigChunkError,
ProjectLocked,
Expand Down Expand Up @@ -356,6 +358,7 @@ def test_create_version(client, data, expected, err_code):
assert project.latest_version == 1

chunks = []
chunk_ids = []
if expected == 201:
# mimic chunks were uploaded
for f in data["changes"]["added"] + data["changes"]["updated"]:
Expand All @@ -372,12 +375,21 @@ def test_create_version(client, data, expected, err_code):
out_file.write(in_file.read(CHUNK_SIZE))

chunks.append(chunk_location)
chunk_ids.append(chunk)

response = client.post(f"v2/projects/{project.id}/versions", json=data)
with patch(
"mergin.sync.public_api_v2_controller.remove_transaction_chunks.delay"
) as mock_remove:
response = client.post(f"v2/projects/{project.id}/versions", json=data)
assert response.status_code == expected
if expected == 201:
assert response.json["version"] == "v2"
assert project.latest_version == 2
# chunks exists after upload, cleanup job did not remove them
assert all(os.path.exists(chunk) for chunk in chunks)
if chunk_ids:
assert mock_remove.called_once_with(chunk_ids)
remove_transaction_chunks(chunk_ids)
assert all(not os.path.exists(chunk) for chunk in chunks)
else:
assert project.latest_version == 1
Expand Down
Loading