Skip to content

Commit

Permalink
feat(service): support chunked file uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
Panaetius committed May 10, 2022
1 parent eb58098 commit 11125d6
Show file tree
Hide file tree
Showing 18 changed files with 601 additions and 233 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Expand Up @@ -67,7 +67,7 @@ services:
- redis
- reverse-proxy
ports:
- "8081:8080"
- "8082:8080"
volumes:
- ${PWD}:/code/renku
command: ["service", "api"]
Expand Down
2 changes: 2 additions & 0 deletions helm-chart/renku-core/templates/deployment.yaml
Expand Up @@ -104,6 +104,8 @@ spec:
value: {{ $.Values.projectCloneDepth | quote }}
- name: TEMPLATE_CLONE_DEPTH_DEFAULT
value: {{ $.Values.templateCloneDepth | quote }}
- name: MAX_CONTENT_LENGTH
value: {{ $.Values.maximumUploadSizeBytes | quote }}
- name: CORE_SERVICE_PREFIX
value: /renku
- name: CORE_SERVICE_API_BASE_PATH
Expand Down
5 changes: 5 additions & 0 deletions helm-chart/renku-core/values.schema.json
Expand Up @@ -26,6 +26,11 @@
"type": "integer",
"minimum": 1
},
"maximumUploadSizeBytes": {
"description": "Maximum allowed file upload size.",
"type": "integer",
"minimum": 1
},
"datasetsWorkerQueues": {
"description": "Name of the worker queue for dataset jobs",
"type": "string"
Expand Down
1 change: 1 addition & 0 deletions helm-chart/renku-core/values.yaml
Expand Up @@ -39,6 +39,7 @@ cacheDirectory: /svc/cache
cleanupInterval: 60 # NOTE: This needs to be a divisor of, and less than cleanupFilesTTL|cleanupProjectsTTL.
projectCloneDepth: 1
templateCloneDepth: 1
maximumUploadSizeBytes: 1073741824 # 1 Gigabyte

datasetsWorkerQueues: datasets.jobs,delayed.ctrl.DatasetsCreateCtrl,delayed.ctrl.DatasetsAddFileCtrl,delayed.ctrl.DatasetsRemoveCtrl,delayed.ctrl.DatasetsImportCtrl,delayed.ctrl.DatasetsEditCtrl,delayed.ctrl.DatasetsUnlinkCtrl
managementWorkerQueues: cache.cleanup.files,cache.cleanup.projects,delayed.ctrl.MigrateProjectCtrl,delayed.ctrl.SetConfigCtrl
Expand Down
493 changes: 284 additions & 209 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
Expand Up @@ -69,7 +69,7 @@ cwltool = "==3.1.20211107152837"
deepmerge = "==1.0.1"
docker = "<6,>=3.7.2"
environ-config = "<21.3.0,>=18.2.0"
fakeredis = { version = ">=1.4.1,<1.7.2", optional = true }
fakeredis = { version = ">=1.4.1,<1.7.2", extras = ["lua"], optional = true }
filelock = ">=3.0.0,<3.6.1"
flake8 = { version = "<4.0,>=3.8", optional = true } #wait for https://github.com/flakehell/flakehell/pull/23 to be merged before bumping
flakehell = { version = ">=0.9.0,<1.0.*", optional = true }
Expand Down Expand Up @@ -129,7 +129,7 @@ responses = { version = "<0.19.0,>=0.7.0", optional = true }
rich = "<11.3.0,>=9.3.0"
rq = { version = "==1.10.1", optional = true }
rq-scheduler = { version = "==0.11.0", optional = true }
sentry-sdk = { version = ">=1.0.0,<1.5.6", extras = ["flask"], optional = true }
sentry-sdk = { version = ">=1.0.0,<1.5.12,!=1.5.10", extras = ["flask"], optional = true }
shellingham = "1.4.0"
sphinxcontrib-spelling = { version = "7.*", optional = true }
sphinx-rtd-theme = { version = "<1.1,>=0.5.0", optional = true }
Expand Down
34 changes: 32 additions & 2 deletions renku/ui/service/cache/files.py
Expand Up @@ -17,15 +17,16 @@
# limitations under the License.
"""Renku service files cache management."""
from renku.ui.service.cache.base import BaseCache
from renku.ui.service.cache.models.file import File
from renku.ui.service.cache.models.file import File, FileChunk
from renku.ui.service.cache.models.user import User
from renku.ui.service.cache.serializers.file import FileSchema
from renku.ui.service.cache.serializers.file import FileChunkSchema, FileSchema


class FileManagementCache(BaseCache):
"""File management cache."""

file_schema = FileSchema()
chunk_schema = FileChunkSchema()

def set_file(self, user, file_data):
"""Cache file metadata."""
Expand All @@ -40,6 +41,15 @@ def set_files(self, user, files):
"""Cache files metadata."""
return [self.set_file(user, file_) for file_ in files]

def set_file_chunk(self, user, chunk_data):
    """Cache metadata for one uploaded file chunk.

    Tags the chunk with the owning user's id before persisting it.
    NOTE: mutates ``chunk_data`` in place, matching ``set_file``.
    """
    chunk_data["user_id"] = user.user_id

    record = self.chunk_schema.load(chunk_data)
    record.save()
    return record

@staticmethod
def get_file(user, file_id):
"""Get user file."""
Expand All @@ -55,6 +65,21 @@ def get_files(user):
"""Get all user cached files."""
return File.query(File.user_id == user.user_id)

@staticmethod
def get_chunks(user, chunked_id=None):
    """Get all user chunks, optionally restricted to one chunked upload.

    :param user: Cache user whose chunks are queried.
    :param chunked_id: Optional id of a single chunked upload to filter on.
    :returns: Query result of matching ``FileChunk`` records.
    """
    if chunked_id is not None:
        # NOTE: the filters must be combined with the model query ``&``
        # operator. Python's ``and`` would evaluate to only the second
        # expression, silently dropping the user filter and returning
        # chunks belonging to other users.
        return FileChunk.query((FileChunk.user_id == user.user_id) & (FileChunk.chunked_id == chunked_id))
    return FileChunk.query(FileChunk.user_id == user.user_id)

@staticmethod
def invalidate_chunks(user, chunked_id):
    """Remove all of a user's chunk records for one chunked upload.

    :param user: Cache user whose chunks are removed.
    :param chunked_id: Id of the chunked upload being invalidated.
    """
    # NOTE: combine filters with the model query ``&`` operator; Python's
    # ``and`` would evaluate to only the second expression and delete
    # chunks with this ``chunked_id`` regardless of the owning user.
    chunks = FileChunk.query((FileChunk.user_id == user.user_id) & (FileChunk.chunked_id == chunked_id))

    for chunk in chunks:
        chunk.delete()

@staticmethod
def invalidate_file(user, file_id):
"""Remove users file records."""
Expand All @@ -69,3 +94,8 @@ def user_files(self):
"""Iterate through all cached files."""
for user in User.all():
yield user, self.get_files(user)

def user_chunks(self):
    """Iterate through all cached file chunks, grouped by user.

    Yields ``(user, chunks)`` tuples, one per known cache user.
    """
    for user in User.all():
        yield user, self.get_chunks(user)
51 changes: 51 additions & 0 deletions renku/ui/service/cache/models/file.py
Expand Up @@ -91,3 +91,54 @@ def purge(self):
def is_locked(self, jobs):
"""Check if file locked by given jobs."""
return bool(next((job for job in jobs if self.file_id in job.locked), False))


class FileChunk(Model):
    """User file chunk object.

    One record per uploaded chunk; chunks of the same upload share a
    ``chunked_id`` and live under a per-upload directory in the cache.
    """

    __database__ = BaseCache.model_db
    __namespace__ = BaseCache.namespace

    created_at = DateTimeField()

    file_id = TextField(primary_key=True, index=True)
    user_id = TextField(index=True)
    chunked_id = TextField(index=True)

    file_name = TextField()
    relative_path = TextField()

    @property
    def abs_path(self):
        """Absolute path of the cached chunk on disk."""
        return CACHE_UPLOADS_PATH / self.user_id / self.chunked_id / self.relative_path

    @property
    def age(self):
        """Age of the chunk in whole seconds.

        NOTE: ``created_at`` is aligned to the UTC timezone.
        """
        return int((datetime.utcnow() - self.created_at).total_seconds())

    def exists(self):
        """Check whether the chunk file is present on the file system."""
        return self.abs_path.exists()

    def ttl_expired(self, ttl=None):
        """Check whether the chunk's time-to-live has expired.

        :param ttl: Optional TTL override in seconds; falls back to the
            ``RENKU_SVC_CLEANUP_TTL_FILES`` environment variable (1800s default).
        """
        if not self.created_at:
            # Records without ``created_at`` predate this field; mark them
            # for deletion.
            return True

        effective_ttl = ttl or int(os.getenv("RENKU_SVC_CLEANUP_TTL_FILES", 1800))
        return self.age >= effective_ttl

    def purge(self):
        """Remove the chunk from the file system and from the cache."""
        self.abs_path.unlink()
        self.delete()
21 changes: 19 additions & 2 deletions renku/ui/service/cache/serializers/file.py
Expand Up @@ -16,10 +16,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Renku service file cache serializers."""
import uuid

from marshmallow import fields, post_load

from renku.ui.service.cache.models.file import File
from renku.ui.service.serializers.common import FileDetailsSchema, MandatoryUserSchema
from renku.ui.service.cache.models.file import File, FileChunk
from renku.ui.service.serializers.common import CreationSchema, FileDetailsSchema, MandatoryUserSchema


class FileSchema(FileDetailsSchema, MandatoryUserSchema):
Expand All @@ -34,3 +36,18 @@ class FileSchema(FileDetailsSchema, MandatoryUserSchema):
def make_file(self, data, **options):
"""Construct file object."""
return File(**data)


class FileChunkSchema(CreationSchema, MandatoryUserSchema):
    """Schema for a cached file chunk."""

    # NOTE: each chunk record gets its own random id when none is supplied.
    file_id = fields.String(missing=lambda: uuid.uuid4().hex)
    file_name = fields.String(required=True)

    # Id shared by all chunks of the same upload; relative_path locates the
    # chunk inside the per-upload directory.
    chunked_id = fields.String(required=True)
    relative_path = fields.String(required=True)

    @post_load
    def make_file(self, data, **options):
        """Construct file object."""
        return FileChunk(**data)
3 changes: 3 additions & 0 deletions renku/ui/service/config.py
Expand Up @@ -42,6 +42,9 @@
PROJECT_CLONE_DEPTH_DEFAULT = int(os.getenv("PROJECT_CLONE_DEPTH_DEFAULT", 1))
TEMPLATE_CLONE_DEPTH_DEFAULT = int(os.getenv("TEMPLATE_CLONE_DEPTH_DEFAULT", 0))

# NOTE: Defaults to 1GB
MAX_CONTENT_LENGTH = int(os.getenv("MAX_CONTENT_LENGTH", 1073741824))

CACHE_DIR = os.getenv("CACHE_DIR", os.path.realpath(tempfile.TemporaryDirectory().name))
CACHE_UPLOADS_PATH = Path(CACHE_DIR) / Path("uploads")
CACHE_UPLOADS_PATH.mkdir(parents=True, exist_ok=True)
Expand Down
82 changes: 74 additions & 8 deletions renku/ui/service/controllers/cache_files_upload.py
Expand Up @@ -24,10 +24,11 @@
from patoolib.util import PatoolError

from renku.core.errors import RenkuException
from renku.ui.service.config import CACHE_UPLOADS_PATH, SUPPORTED_ARCHIVES
from renku.core.util.file_size import bytes_to_unit
from renku.ui.service.config import CACHE_UPLOADS_PATH, MAX_CONTENT_LENGTH, SUPPORTED_ARCHIVES
from renku.ui.service.controllers.api.abstract import ServiceCtrl
from renku.ui.service.controllers.api.mixins import RenkuOperationMixin
from renku.ui.service.errors import IntermittentFileExistsError
from renku.ui.service.errors import IntermittentFileExistsError, UserUploadTooLargeError
from renku.ui.service.serializers.cache import FileUploadRequest, FileUploadResponseRPC, extract_file
from renku.ui.service.views import result_response

Expand All @@ -47,7 +48,8 @@ def __init__(self, cache, user_data, flask_request):
"content_type": self.file.content_type,
"is_archive": self.file.content_type in SUPPORTED_ARCHIVES,
}
self.response_builder.update(UploadFilesCtrl.REQUEST_SERIALIZER.load(flask_request.args))
args = {**flask_request.args, **flask_request.form}
self.response_builder.update(UploadFilesCtrl.REQUEST_SERIALIZER.load(args))

super(UploadFilesCtrl, self).__init__(cache, user_data, {})

Expand All @@ -56,6 +58,62 @@ def context(self):
"""Controller operation context."""
return self.response_builder

def process_upload(self):
    """Dispatch the upload to the chunked or the single-file handler.

    A request carrying a ``chunked_id`` belongs to a chunked upload;
    anything else is processed as a plain single-file upload.
    """
    chunked_id = self.response_builder.get("chunked_id")
    if chunked_id is not None:
        return self.process_chunked_upload()
    return self.process_file()

def process_chunked_upload(self):
    """Process upload done in chunks.

    Saves the current chunk to a per-upload directory; once all chunks of
    ``chunked_id`` have arrived, concatenates them in index order into the
    final file and hands it to ``postprocess_file``.

    :raises UserUploadTooLargeError: if the declared total size exceeds
        ``MAX_CONTENT_LENGTH``.
    :returns: ``{}`` while chunks are still outstanding, otherwise the
        ``postprocess_file`` result.
    """
    # Reject early based on the client-declared total size, before writing
    # any chunk data to disk.
    if self.response_builder["total_size"] > MAX_CONTENT_LENGTH:
        # 524288000 bytes == 500 MB: report the limit in GB above that, MB below.
        if MAX_CONTENT_LENGTH > 524288000:
            max_size = bytes_to_unit(MAX_CONTENT_LENGTH, "gb")
            max_size_str = f"{max_size} gb"
        else:
            max_size = bytes_to_unit(MAX_CONTENT_LENGTH, "mb")
            max_size_str = f"{max_size} mb"
        raise UserUploadTooLargeError(maximum_size=max_size_str, exception=None)

    chunked_id = self.response_builder["chunked_id"]

    user_cache_dir = CACHE_UPLOADS_PATH / self.user.user_id
    chunks_dir = user_cache_dir / chunked_id
    chunks_dir.mkdir(exist_ok=True, parents=True)

    current_chunk = self.response_builder["chunk_index"]
    total_chunks = self.response_builder["chunk_count"]

    # Each chunk is stored under its zero-based index so reassembly can
    # simply iterate ``range(total_chunks)``.
    file_path = chunks_dir / str(current_chunk)
    relative_path = file_path.relative_to(CACHE_UPLOADS_PATH / self.user.user_id)

    self.file.save(str(file_path))

    # The save-and-count must happen under a per-upload lock so two chunk
    # requests arriving together cannot both see an incomplete count (or
    # both see completion) for the same upload.
    with self.cache.model_db.lock(f"chunked_upload_{self.user.user_id}_{chunked_id}"):
        self.cache.set_file_chunk(
            self.user,
            {
                "chunked_id": chunked_id,
                "file_name": str(current_chunk),
                "relative_path": str(relative_path),
            },
        )
        completed = len(list(self.cache.get_chunks(self.user, chunked_id))) == total_chunks

    if not completed:
        # More chunks are expected; nothing to report yet.
        return {}

    # All chunks present: stitch them together into the final upload file.
    file_path = user_cache_dir / self.file.filename

    with open(file_path, "wb") as f:
        for file_number in range(total_chunks):
            f.write((chunks_dir / str(file_number)).read_bytes())
    # Drop the chunk files and their cache records now that the file is whole.
    shutil.rmtree(chunks_dir)
    self.cache.invalidate_chunks(self.user, chunked_id)

    return self.postprocess_file(file_path, user_cache_dir)

def process_file(self):
"""Process uploaded file."""
user_cache_dir = CACHE_UPLOADS_PATH / self.user.user_id
Expand All @@ -70,6 +128,10 @@ def process_file(self):

self.file.save(str(file_path))

return self.postprocess_file(file_path, user_cache_dir)

def postprocess_file(self, file_path, user_cache_dir):
"""Postprocessing of uploaded file."""
files = []
if self.response_builder["unpack_archive"] and self.response_builder["is_archive"]:
unpack_dir = "{0}.unpacked".format(file_path.name)
Expand Down Expand Up @@ -98,11 +160,15 @@ def process_file(self):
else:
relative_path = file_path.relative_to(CACHE_UPLOADS_PATH / self.user.user_id)

self.response_builder["file_size"] = os.stat(file_path).st_size
self.response_builder["relative_path"] = str(relative_path)
self.response_builder["is_dir"] = file_path.is_dir()
file_obj = {
"file_name": self.response_builder["file_name"],
"file_size": os.stat(file_path).st_size,
"relative_path": str(relative_path),
"is_dir": file_path.is_dir(),
"is_archive": self.response_builder["is_archive"],
}

files.append(self.response_builder)
files.append(file_obj)

files = self.cache.set_files(self.user, files)
return {"files": files}
Expand All @@ -114,4 +180,4 @@ def renku_op(self):

def to_response(self):
    """Execute the controller flow and serialize it to a service response."""
    result = self.process_upload()
    return result_response(UploadFilesCtrl.RESPONSE_SERIALIZER, result)
8 changes: 3 additions & 5 deletions renku/ui/service/entrypoint.py
Expand Up @@ -29,7 +29,7 @@
from sentry_sdk.integrations.rq import RqIntegration

from renku.ui.service.cache import cache
from renku.ui.service.config import CACHE_DIR, SENTRY_ENABLED, SENTRY_SAMPLERATE, SERVICE_PREFIX
from renku.ui.service.config import CACHE_DIR, MAX_CONTENT_LENGTH, SENTRY_ENABLED, SENTRY_SAMPLERATE, SERVICE_PREFIX
from renku.ui.service.errors import (
ProgramHttpMethodError,
ProgramHttpMissingError,
Expand Down Expand Up @@ -69,9 +69,7 @@ def create_app(custom_exceptions=True):
app.json_encoder = SvcJSONEncoder
app.config["UPLOAD_FOLDER"] = CACHE_DIR

max_content_size = os.getenv("MAX_CONTENT_LENGTH")
if max_content_size:
app.config["MAX_CONTENT_LENGTH"] = max_content_size
app.config["MAX_CONTENT_LENGTH"] = MAX_CONTENT_LENGTH

app.config["cache"] = cache

Expand Down Expand Up @@ -102,7 +100,7 @@ def register_exceptions(app):

@app.errorhandler(Exception)
def exceptions(e):
"""This exceptions handler manages Flask/Werkzeug exceptions.
"""Exception handler that manages Flask/Werkzeug exceptions.
For the other exception handlers check ``service/decorators.py``
"""
Expand Down
19 changes: 19 additions & 0 deletions renku/ui/service/errors.py
Expand Up @@ -365,6 +365,25 @@ def __init__(self, exception):
)


class UserUploadTooLargeError(ServiceError):
    """The user tried to upload a file that is too large.

    Maximum upload size can be set with the ``maximumUploadSizeBytes`` chart value or ``MAX_CONTENT_LENGTH``
    environment value.
    """

    code = SVC_ERROR_USER + 150
    userMessage = "The file you are trying to upload is too large. Maximum allowed size is: {maximum_size}"
    devMessage = "Uploaded file size was larger than ``MAX_CONTENT_LENGTH``."

    def __init__(self, exception, maximum_size: str):
        # Only userMessage carries a placeholder; devMessage is static.
        super().__init__(
            userMessage=self.userMessage.format(maximum_size=maximum_size),
            devMessage=self.devMessage,
            exception=exception,
        )


class ProgramInvalidGenericFieldsError(ServiceError):
"""One or more fields are unexpected.
Expand Down

0 comments on commit 11125d6

Please sign in to comment.