feat(service): support chunked file uploads (#2892)
Panaetius committed Jun 29, 2022
1 parent 73866f2 commit 610e88a
Showing 25 changed files with 719 additions and 33 deletions.
10 changes: 10 additions & 0 deletions helm-chart/renku-core/templates/configmap.yaml
@@ -7,6 +7,8 @@ data:
{{- range $version := .Values.versions }}
upstream {{ $version.name }} {
server {{ include "renku-core.fullname" $ }}-{{ $version.name }};
keepalive 32;
keepalive_timeout 60s;
}
{{ end }}
@@ -30,12 +32,20 @@ data:
rewrite /renku/{{ $version.prefix }}/(.*) /renku/$1 break;
proxy_set_header Host $host;
proxy_pass http://{{ $version.name }};
proxy_send_timeout {{ $.Values.requestTimeout }}s;
proxy_read_timeout {{ $.Values.requestTimeout }}s;
proxy_http_version 1.1;
proxy_set_header "Connection" "";
}
{{- end }}
location /renku {
proxy_set_header Host $host;
proxy_pass http://{{ .Values.versions.latest.name }};
proxy_send_timeout {{ $.Values.requestTimeout }}s;
proxy_read_timeout {{ $.Values.requestTimeout }}s;
proxy_http_version 1.1;
proxy_set_header "Connection" "";
}
}
version.json: |
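The keepalive and extended proxy timeouts above keep long-running upload requests alive between nginx and the core service. For illustration only, the Python sketch below shows how a client might split a file into chunks and upload them one request at a time; the endpoint URL and the form fields chunked_id, chunk_index and chunk_count are hypothetical placeholders, not the service's actual API.

# Hypothetical client-side sketch of a chunked upload; the endpoint and the
# form field names below are illustrative placeholders, not the real API.
import os
import uuid
from pathlib import Path

import requests

CHUNK_SIZE = 16 * 1024 * 1024  # 16 MiB per request


def upload_in_chunks(url: str, path: Path, token: str) -> None:
    """Split ``path`` into chunks and POST them one by one."""
    chunked_id = str(uuid.uuid4())  # groups all chunks belonging to one file
    total = os.path.getsize(path)
    chunk_count = (total + CHUNK_SIZE - 1) // CHUNK_SIZE

    with open(path, "rb") as stream:
        for index in range(chunk_count):
            data = stream.read(CHUNK_SIZE)
            response = requests.post(
                url,
                headers={"Authorization": f"Bearer {token}"},
                data={
                    "chunked_id": chunked_id,    # placeholder field name
                    "chunk_index": index,        # placeholder field name
                    "chunk_count": chunk_count,  # placeholder field name
                },
                files={"file": (path.name, data)},
                timeout=600,  # matches the requestTimeout default below
            )
            response.raise_for_status()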
4 changes: 4 additions & 0 deletions helm-chart/renku-core/templates/deployment.yaml
@@ -104,6 +104,10 @@ spec:
value: {{ $.Values.projectCloneDepth | quote }}
- name: TEMPLATE_CLONE_DEPTH_DEFAULT
value: {{ $.Values.templateCloneDepth | quote }}
- name: MAX_CONTENT_LENGTH
value: {{ $.Values.maximumUploadSizeBytes | quote }}
- name: REQUEST_TIMEOUT
value: {{ $.Values.requestTimeout | quote }}
- name: CORE_SERVICE_PREFIX
value: /renku
- name: CORE_SERVICE_API_BASE_PATH
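MAX_CONTENT_LENGTH and REQUEST_TIMEOUT are passed into the service container from the new Helm values. A minimal sketch of how they could be consumed, assuming a Flask-based service (Flask enforces MAX_CONTENT_LENGTH by rejecting oversized request bodies with HTTP 413); the wiring shown here is illustrative, not the actual service startup code:

# Illustrative only: how MAX_CONTENT_LENGTH and REQUEST_TIMEOUT could be read.
import os

from flask import Flask

app = Flask(__name__)

# Flask rejects request bodies larger than MAX_CONTENT_LENGTH with HTTP 413.
max_content_length = os.getenv("MAX_CONTENT_LENGTH")
if max_content_length:
    app.config["MAX_CONTENT_LENGTH"] = int(max_content_length)

# Used for the gunicorn worker timeout (see the renku/ui/cli/service.py change below).
request_timeout = int(os.getenv("REQUEST_TIMEOUT", "600"))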
10 changes: 10 additions & 0 deletions helm-chart/renku-core/values.schema.json
@@ -26,6 +26,16 @@
"type": "integer",
"minimum": 1
},
"maximumUploadSizeBytes": {
"description": "Maximum allowed file upload size.",
"type": "string",
"minimum": 1,
"pattern": "^\\d+"
},
"requestTimeout": {
"description": "Time before requests time out.",
"type": "integer"
},
"datasetsWorkerQueues": {
"description": "Name of the worker queue for dataset jobs",
"type": "string"
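maximumUploadSizeBytes is declared as a string with a digit pattern so that Helm does not coerce the large number into scientific notation (see values.yaml below). Note that the "minimum" keyword has no effect on a string-typed property in JSON Schema; only the pattern is enforced. A small check of the fragment above, assuming the jsonschema package:

# Quick check of the schema fragment above using the jsonschema package.
from jsonschema import validate

schema = {
    "type": "object",
    "properties": {
        "maximumUploadSizeBytes": {"type": "string", "pattern": "^\\d+"},
        "requestTimeout": {"type": "integer"},
    },
}

# Passes: the upload size is a digit string, the timeout an integer.
validate({"maximumUploadSizeBytes": "1073741824", "requestTimeout": 600}, schema)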
2 changes: 2 additions & 0 deletions helm-chart/renku-core/values.yaml
@@ -39,6 +39,8 @@ cacheDirectory: /svc/cache
cleanupInterval: 60 # NOTE: This needs to be a divisor of, and less than cleanupFilesTTL|cleanupProjectsTTL.
projectCloneDepth: 1
templateCloneDepth: 1
maximumUploadSizeBytes: "1073741824" # 1 Gigabyte, store as string to keep Helm from converting it to scientific notation
requestTimeout: 600

datasetsWorkerQueues: datasets.jobs,delayed.ctrl.DatasetsCreateCtrl,delayed.ctrl.DatasetsAddFileCtrl,delayed.ctrl.DatasetsRemoveCtrl,delayed.ctrl.DatasetsImportCtrl,delayed.ctrl.DatasetsEditCtrl,delayed.ctrl.DatasetsUnlinkCtrl
managementWorkerQueues: cache.cleanup.files,cache.cleanup.projects,delayed.ctrl.MigrateProjectCtrl,delayed.ctrl.SetConfigCtrl
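The quoted default of 1073741824 bytes is exactly 1 GiB (1024 cubed), and requestTimeout is expressed in seconds (600 s, i.e. 10 minutes). A one-line sanity check:

assert 1073741824 == 1024 ** 3  # the default upload limit equals exactly 1 GiB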
85 changes: 81 additions & 4 deletions poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -70,7 +70,7 @@ deepdiff = "^5.8.0"
deepmerge = "==1.0.1"
docker = "<6,>=3.7.2"
environ-config = ">=18.2.0,<22.2.0"
fakeredis = { version = ">=1.4.1,<1.7.2", optional = true }
fakeredis = { version = ">=1.4.1,<1.7.2", extras = ["lua"], optional = true }
filelock = ">=3.3.0,<3.6.1"
flake8 = { version = "<4.0,>=3.8", optional = true } #wait for https://github.com/flakehell/flakehell/pull/23 to be merged before bumping
flakehell = { version = ">=0.9.0,<1.0.*", optional = true }
4 changes: 3 additions & 1 deletion renku/ui/cli/service.py
@@ -44,6 +44,8 @@ def run_api(addr="0.0.0.0", port=8080, timeout=600):
svc_num_workers = os.getenv("RENKU_SVC_NUM_WORKERS", "1")
svc_num_threads = os.getenv("RENKU_SVC_NUM_THREADS", "2")

svc_timeout = int(os.getenv("REQUEST_TIMEOUT", timeout))

loading_opt = "--preload"

sys.argv = [
@@ -53,7 +55,7 @@ def run_api(addr="0.0.0.0", port=8080, timeout=600):
"-b",
f"{addr}:{port}",
"--timeout",
f"{timeout}",
f"{svc_timeout}",
"--workers",
svc_num_workers,
"--worker-class",
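With this change the gunicorn worker timeout follows the REQUEST_TIMEOUT environment variable set by the chart, falling back to the previous default of 600 seconds. A minimal sketch of the resulting fallback behaviour, for illustration only:

# Illustration of the fallback logic introduced above.
import os


def resolve_timeout(default: int = 600) -> int:
    """Return the gunicorn --timeout value, preferring REQUEST_TIMEOUT."""
    return int(os.getenv("REQUEST_TIMEOUT", default))

# Unset -> 600; REQUEST_TIMEOUT=1200 -> 1200, matching nginx proxy_read_timeout.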
34 changes: 32 additions & 2 deletions renku/ui/service/cache/files.py
@@ -17,15 +17,16 @@
# limitations under the License.
"""Renku service files cache management."""
from renku.ui.service.cache.base import BaseCache
from renku.ui.service.cache.models.file import File
from renku.ui.service.cache.models.file import File, FileChunk
from renku.ui.service.cache.models.user import User
from renku.ui.service.cache.serializers.file import FileSchema
from renku.ui.service.cache.serializers.file import FileChunkSchema, FileSchema


class FileManagementCache(BaseCache):
"""File management cache."""

file_schema = FileSchema()
chunk_schema = FileChunkSchema()

def set_file(self, user, file_data):
"""Cache file metadata."""
@@ -40,6 +41,15 @@ def set_files(self, user, files):
"""Cache files metadata."""
return [self.set_file(user, file_) for file_ in files]

def set_file_chunk(self, user, chunk_data):
"""Cache chunk metadata."""
chunk_data.update({"user_id": user.user_id})

chunk_obj = self.chunk_schema.load(chunk_data)
chunk_obj.save()

return chunk_obj

@staticmethod
def get_file(user, file_id):
"""Get user file."""
@@ -55,6 +65,21 @@ def get_files(user):
"""Get all user cached files."""
return File.query(File.user_id == user.user_id)

@staticmethod
def get_chunks(user, chunked_id=None):
"""Get all user chunks for a file."""
if chunked_id is not None:
return FileChunk.query((FileChunk.user_id == user.user_id) & (FileChunk.chunked_id == chunked_id))
return FileChunk.query(FileChunk.user_id == user.user_id)

@staticmethod
def invalidate_chunks(user, chunked_id):
"""Remove all user chunks for a file."""
chunks = FileChunk.query((FileChunk.user_id == user.user_id) & (FileChunk.chunked_id == chunked_id))

for chunk in chunks:
chunk.delete()

@staticmethod
def invalidate_file(user, file_id):
"""Remove users file records."""
@@ -69,3 +94,8 @@ def user_files(self):
"""Iterate through all cached files."""
for user in User.all():
yield user, self.get_files(user)

def user_chunks(self):
"""Iterate through all cached files."""
for user in User.all():
yield user, self.get_chunks(user)
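The cache now tracks per-user FileChunk records grouped by a chunked_id, so that once every chunk of an upload has arrived the pieces can be merged into a single file and the chunk records invalidated. The merge step itself is not part of this excerpt; the sketch below shows one hypothetical way to assemble chunks stored on disk, assuming chunk files are named by their index, which is an assumption for illustration rather than the service's actual layout:

# Hypothetical reassembly of uploaded chunks; the on-disk layout and naming are
# assumptions for illustration, not the service's actual implementation.
import shutil
from pathlib import Path


def merge_chunks(chunk_dir: Path, target: Path) -> None:
    """Concatenate chunk files named 0, 1, 2, ... into ``target``."""
    chunks = sorted(chunk_dir.iterdir(), key=lambda p: int(p.name))
    with open(target, "wb") as out:
        for chunk in chunks:
            with open(chunk, "rb") as part:
                shutil.copyfileobj(part, out)
    shutil.rmtree(chunk_dir)  # on-disk counterpart of invalidate_chunks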
