Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(service): support chunked file uploads #2892

Merged
merged 14 commits into from Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions helm-chart/renku-core/templates/configmap.yaml
Expand Up @@ -7,6 +7,8 @@ data:
{{- range $version := .Values.versions }}
upstream {{ $version.name }} {
server {{ include "renku-core.fullname" $ }}-{{ $version.name }};
keepalive 32;
keepalive_timeout 60s;
}
{{ end }}

Expand All @@ -30,12 +32,20 @@ data:
rewrite /renku/{{ $version.prefix }}/(.*) /renku/$1 break;
proxy_set_header Host $host;
proxy_pass http://{{ $version.name }};
proxy_send_timeout {{ $.Values.requestTimeout }}s;
proxy_read_timeout {{ $.Values.requestTimeout }}s;
proxy_http_version 1.1;
proxy_set_header "Connection" "";
}
{{- end }}

location /renku {
proxy_set_header Host $host;
proxy_pass http://{{ .Values.versions.latest.name }};
proxy_send_timeout {{ $.Values.requestTimeout }}s;
proxy_read_timeout {{ $.Values.requestTimeout }}s;
proxy_http_version 1.1;
proxy_set_header "Connection" "";
}
}
version.json: |
Expand Down
4 changes: 4 additions & 0 deletions helm-chart/renku-core/templates/deployment.yaml
Expand Up @@ -104,6 +104,10 @@ spec:
value: {{ $.Values.projectCloneDepth | quote }}
- name: TEMPLATE_CLONE_DEPTH_DEFAULT
value: {{ $.Values.templateCloneDepth | quote }}
- name: MAX_CONTENT_LENGTH
value: {{ $.Values.maximumUploadSizeBytes | quote }}
- name: REQUEST_TIMEOUT
value: {{ $.Values.requestTimeout | quote }}
- name: CORE_SERVICE_PREFIX
value: /renku
- name: CORE_SERVICE_API_BASE_PATH
Expand Down
10 changes: 10 additions & 0 deletions helm-chart/renku-core/values.schema.json
Expand Up @@ -26,6 +26,16 @@
"type": "integer",
"minimum": 1
},
"maximumUploadSizeBytes": {
"description": "Maximum allowed file upload size.",
"type": "string",
"minimum": 1,
"pattern": "^\\d+"
},
"requestTimeout": {
"description": "Time before requests time out.",
"type": "integer"
},
"datasetsWorkerQueues": {
"description": "Name of the worker queue for dataset jobs",
"type": "string"
Expand Down
2 changes: 2 additions & 0 deletions helm-chart/renku-core/values.yaml
Expand Up @@ -39,6 +39,8 @@ cacheDirectory: /svc/cache
cleanupInterval: 60 # NOTE: This needs to be a divisor of, and less than cleanupFilesTTL|cleanupProjectsTTL.
projectCloneDepth: 1
templateCloneDepth: 1
maximumUploadSizeBytes: "1073741824" # 1 Gigabyte, store as string to keep Helm from converting it to scientific notation
requestTimeout: 600
Panaetius marked this conversation as resolved.
Show resolved Hide resolved

datasetsWorkerQueues: datasets.jobs,delayed.ctrl.DatasetsCreateCtrl,delayed.ctrl.DatasetsAddFileCtrl,delayed.ctrl.DatasetsRemoveCtrl,delayed.ctrl.DatasetsImportCtrl,delayed.ctrl.DatasetsEditCtrl,delayed.ctrl.DatasetsUnlinkCtrl
managementWorkerQueues: cache.cleanup.files,cache.cleanup.projects,delayed.ctrl.MigrateProjectCtrl,delayed.ctrl.SetConfigCtrl
Expand Down
85 changes: 81 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Expand Up @@ -70,7 +70,7 @@ deepdiff = "^5.8.0"
deepmerge = "==1.0.1"
docker = "<6,>=3.7.2"
environ-config = ">=18.2.0,<22.2.0"
fakeredis = { version = ">=1.4.1,<1.7.2", optional = true }
fakeredis = { version = ">=1.4.1,<1.7.2", extras = ["lua"], optional = true }
filelock = ">=3.3.0,<3.6.1"
flake8 = { version = "<4.0,>=3.8", optional = true } #wait for https://github.com/flakehell/flakehell/pull/23 to be merged before bumping
flakehell = { version = ">=0.9.0,<1.0.*", optional = true }
Expand Down
4 changes: 3 additions & 1 deletion renku/ui/cli/service.py
Expand Up @@ -44,6 +44,8 @@ def run_api(addr="0.0.0.0", port=8080, timeout=600):
svc_num_workers = os.getenv("RENKU_SVC_NUM_WORKERS", "1")
svc_num_threads = os.getenv("RENKU_SVC_NUM_THREADS", "2")

svc_timeout = int(os.getenv("REQUEST_TIMEOUT", timeout))

loading_opt = "--preload"

sys.argv = [
Expand All @@ -53,7 +55,7 @@ def run_api(addr="0.0.0.0", port=8080, timeout=600):
"-b",
f"{addr}:{port}",
"--timeout",
f"{timeout}",
f"{svc_timeout}",
"--workers",
svc_num_workers,
"--worker-class",
Expand Down
34 changes: 32 additions & 2 deletions renku/ui/service/cache/files.py
Expand Up @@ -17,15 +17,16 @@
# limitations under the License.
"""Renku service files cache management."""
from renku.ui.service.cache.base import BaseCache
from renku.ui.service.cache.models.file import File
from renku.ui.service.cache.models.file import File, FileChunk
from renku.ui.service.cache.models.user import User
from renku.ui.service.cache.serializers.file import FileSchema
from renku.ui.service.cache.serializers.file import FileChunkSchema, FileSchema


class FileManagementCache(BaseCache):
"""File management cache."""

file_schema = FileSchema()
chunk_schema = FileChunkSchema()

def set_file(self, user, file_data):
"""Cache file metadata."""
Expand All @@ -40,6 +41,15 @@ def set_files(self, user, files):
"""Cache files metadata."""
return [self.set_file(user, file_) for file_ in files]

def set_file_chunk(self, user, chunk_data):
    """Store the metadata of a single upload chunk in the cache.

    The id of ``user`` is written into ``chunk_data`` (in place) under the ``user_id`` key before
    the data is loaded through ``self.chunk_schema``. The loaded object is saved and returned.
    """
    owner_info = {"user_id": user.user_id}
    chunk_data.update(owner_info)

    cached_chunk = self.chunk_schema.load(chunk_data)
    cached_chunk.save()
    return cached_chunk

@staticmethod
def get_file(user, file_id):
"""Get user file."""
Expand All @@ -55,6 +65,21 @@ def get_files(user):
"""Get all user cached files."""
return File.query(File.user_id == user.user_id)

@staticmethod
def get_chunks(user, chunked_id=None):
    """Get a user's cached chunks, optionally restricted to one chunked upload.

    Args:
        user: Owner of the chunks; only its ``user_id`` is read.
        chunked_id: Identifier of the chunked upload to filter by. When ``None`` (the default),
            all chunks owned by the user are returned.

    Returns:
        The result of ``FileChunk.query`` for the resulting filter.
    """
    owned_by_user = FileChunk.user_id == user.user_id
    if chunked_id is None:
        return FileChunk.query(owned_by_user)

    # NOTE: Filter expressions must be combined with ``&``. Python's ``and`` cannot be overloaded; it
    # evaluates to just one of its operands, so one of the two conditions would be silently dropped.
    # NOTE(review): assumes the model's expressions support ``&`` (as walrus expressions do) - confirm.
    return FileChunk.query(owned_by_user & (FileChunk.chunked_id == chunked_id))

@staticmethod
def invalidate_chunks(user, chunked_id):
    """Remove all cached chunks of one chunked upload that belong to a user.

    Args:
        user: Owner of the chunks; only its ``user_id`` is read.
        chunked_id: Identifier of the chunked upload whose chunks are deleted.
    """
    # NOTE: Both conditions must hold, so they are combined with ``&``. Python's ``and`` cannot be
    # overloaded; it evaluates to just one of its operands, which would drop one condition and could
    # delete chunks that do not belong to ``user``.
    # NOTE(review): assumes the model's expressions support ``&`` (as walrus expressions do) - confirm.
    belongs_to_upload = (FileChunk.user_id == user.user_id) & (FileChunk.chunked_id == chunked_id)

    for chunk in FileChunk.query(belongs_to_upload):
        chunk.delete()

@staticmethod
def invalidate_file(user, file_id):
"""Remove users file records."""
Expand All @@ -69,3 +94,8 @@ def user_files(self):
"""Iterate through all cached files."""
for user in User.all():
yield user, self.get_files(user)

def user_chunks(self):
    """Iterate through the cached chunks of every user.

    Yields:
        ``(user, chunks)`` pairs, one per entry of ``User.all()``, where ``chunks`` is the result of
        ``self.get_chunks(user)``.
    """
    for cached_user in User.all():
        chunks = self.get_chunks(cached_user)
        yield cached_user, chunks