Skip to content

Commit

Permalink
[ozone] Add ofs upload filehandler
Browse files Browse the repository at this point in the history
(cherry picked from commit 05e4855)
  • Loading branch information
agl29 authored and Harshg999 committed Mar 10, 2023
1 parent 045903b commit ac4d32d
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 0 deletions.
123 changes: 123 additions & 0 deletions desktop/core/src/desktop/lib/fs/ozone/upload.py
@@ -0,0 +1,123 @@
# Licensed to Cloudera, Inc. under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. Cloudera, Inc. licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import sys

from django.core.files.uploadedfile import SimpleUploadedFile
from django.core.files.uploadhandler import FileUploadHandler, StopFutureHandlers, StopUpload, UploadFileException

from desktop.lib.fsmanager import get_client
from hadoop.conf import UPLOAD_CHUNK_SIZE
from hadoop.fs.exceptions import WebHdfsException

if sys.version_info[0] > 2:
from django.utils.translation import gettext as _
else:
from django.utils.translation import ugettext as _


# Module-level logger, named after this module's import path.
LOG = logging.getLogger(__name__)


class OFSFileUploadError(UploadFileException):
  """Error raised by the OFS upload handler, e.g. when no OFS filesystem client can be obtained."""


class OFSFileUploadHandler(FileUploadHandler):
  """
  Upload handler that streams incoming file data directly to OFS (Ozone).

  The handler is active only when the request's ``dest`` GET parameter carries a scheme that
  starts with "OFS" (case insensitive), e.g. ``ofs://volume/bucket/dir``. For any other
  destination every hook is a pass-through (``receive_data_chunk`` hands the data back and
  ``file_complete`` returns None) so that other configured upload handlers can take over.
  """

  def __init__(self, request):
    """
    Set up the handler and, for OFS destinations, resolve the filesystem client.

    :param request: the Django request carrying the upload; its ``dest`` GET parameter is the
      destination directory.
    :raises OFSFileUploadError: if the destination is OFS but no OFS client is available.

    Any exception raised by the client's ``stats()`` call on the destination propagates as well.
    """
    super(OFSFileUploadHandler, self).__init__(request)
    chunk_size = UPLOAD_CHUNK_SIZE.get()

    self.chunk_size = chunk_size
    self.destination = request.GET.get('dest', None)  # GET param avoids infinite looping
    self.target_path = None
    self.file = None
    self._request = request
    self._part_size = chunk_size
    # Always defined, so that non-OFS requests do not leave the attribute missing.
    self._fs = None

    if self._is_ofs_upload():
      self._fs = self._get_ofs(request)
      # Verify that the path exists
      self._fs.stats(self.destination)

    LOG.debug("Chunk size = %d", chunk_size)

  def new_file(self, field_name, file_name, *args, **kwargs):
    """
    Start handling a new uploaded file.

    For OFS destinations this computes the target path, prepares the (empty) placeholder
    ``SimpleUploadedFile`` returned at the end of the upload and raises ``StopFutureHandlers``
    so that no later handler processes this file. For other destinations it does nothing.

    :raises StopFutureHandlers: always, for an OFS upload that was initiated successfully.
    :raises StopUpload: if an OFSFileUploadError / WebHdfsException occurs while initiating.
    """
    if not self._is_ofs_upload():
      return

    super(OFSFileUploadHandler, self).new_file(field_name, file_name, *args, **kwargs)

    LOG.info('Using OFSFileUploadHandler to handle file upload.')
    self.target_path = self._fs.join(self.destination, file_name)

    try:
      # Check access permissions before attempting upload
      # self._check_access() # Not implemented
      LOG.debug("Initiating OFS upload to target path: %s", self.target_path)
      self.file = SimpleUploadedFile(name=file_name, content='')
      raise StopFutureHandlers()
    except (OFSFileUploadError, WebHdfsException) as e:
      LOG.error("Encountered error in OFSUploadHandler check_access: %s", e)
      # Record the failure on the request so that later code can inspect it.
      self.request.META['upload_failed'] = e
      raise StopUpload()

  def receive_data_chunk(self, raw_data, start):
    """
    Write one chunk of uploaded data to the target path on OFS.

    :param raw_data: the chunk content.
    :param start: offset of the chunk within the file (currently unused).
    :return: None for OFS uploads (the chunk is consumed here), otherwise ``raw_data``
      unchanged so that the remaining handlers receive it.
    :raises StopUpload: if writing the chunk fails for any reason.
    """
    if not self._is_ofs_upload():
      return raw_data

    LOG.debug("OFSfileUploadHandler receive_data_chunk")
    try:
      # Log the real size of this chunk; the last chunk is usually smaller than the configured size.
      LOG.debug("OFSFileUploadHandler uploading file part with size: %s", len(raw_data))
      # NOTE(review): create() is called for every chunk and `start` is ignored. For files larger
      # than one chunk this presumably needs create-then-append semantics - confirm against the
      # OFS client API before relying on multi-chunk uploads.
      self._fs.create(self.target_path, data=raw_data)
      return None
    except Exception as e:
      LOG.exception('Failed to upload file to ozone at %s: %s' % (self.target_path, e))
      raise StopUpload()

  def file_complete(self, file_size):
    """
    Finish the upload.

    :param file_size: total number of bytes received for the file.
    :return: the placeholder uploaded-file object with its size set, or None when the
      destination is not OFS.
    """
    if not self._is_ofs_upload():
      return None

    # Finish the upload
    LOG.info("OFSFileUploadHandler has completed file upload to OFS, total file size is: %d." % file_size)
    self.file.size = file_size
    LOG.debug("%s", self._fs.stats(self.target_path))
    return self.file

  def _get_ofs(self, request):
    """
    Return the OFS filesystem client for the requesting user.

    :raises OFSFileUploadError: if no client could be obtained.
    """
    fs = get_client(fs='ofs', user=request.user.username)
    if not fs:
      raise OFSFileUploadError(_("No OFS filesystem found"))
    return fs

  def _is_ofs_upload(self):
    """Return True if the destination has a scheme starting with "OFS", else False."""
    scheme = self._get_scheme()
    return bool(scheme) and scheme.startswith('OFS')

  def _get_scheme(self):
    """
    Return the upper-cased scheme of the destination (the part before ``://``).

    Returns None when there is no destination or when it contains no ``://`` separator, so a
    scheme-less path is never mistaken for a scheme.
    """
    if not self.destination:
      return None

    scheme, separator, _rest = self.destination.partition('://')
    if not separator:
      return None
    return scheme.upper()
4 changes: 4 additions & 0 deletions desktop/core/src/desktop/settings.py
Expand Up @@ -37,6 +37,7 @@

from aws.conf import is_enabled as is_s3_enabled
from azure.conf import is_abfs_enabled
from desktop.conf import is_ofs_enabled

if sys.version_info[0] > 2:
from django.utils.translation import gettext_lazy as _
Expand Down Expand Up @@ -665,6 +666,9 @@ def is_oidc_configured():
if is_abfs_enabled():
file_upload_handlers.insert(0, 'azure.abfs.upload.ABFSFileUploadHandler')

if is_ofs_enabled():
file_upload_handlers.insert(0, 'desktop.lib.fs.ozone.upload.OFSFileUploadHandler')


FILE_UPLOAD_HANDLERS = tuple(file_upload_handlers)

Expand Down
2 changes: 2 additions & 0 deletions desktop/core/src/desktop/templates/global_js_constants.mako
Expand Up @@ -26,6 +26,7 @@
from beeswax.conf import DOWNLOAD_BYTES_LIMIT, DOWNLOAD_ROW_LIMIT, LIST_PARTITIONS_LIMIT, CLOSE_SESSIONS
from dashboard.conf import HAS_SQL_ENABLED
from hadoop.conf import UPLOAD_CHUNK_SIZE
from jobbrowser.conf import ENABLE_HISTORY_V2
from filebrowser.conf import SHOW_UPLOAD_BUTTON, REMOTE_STORAGE_HOME
from indexer.conf import ENABLE_NEW_INDEXER
Expand Down Expand Up @@ -135,6 +136,7 @@

window.SHOW_NOTEBOOKS = '${ SHOW_NOTEBOOKS.get() }' === 'True'
window.SHOW_UPLOAD_BUTTON = '${ hasattr(SHOW_UPLOAD_BUTTON, 'get') and SHOW_UPLOAD_BUTTON.get() }' === 'True'
window.UPLOAD_CHUNK_SIZE = '${ UPLOAD_CHUNK_SIZE.get() }';

window.IS_MULTICLUSTER_ONLY = '${ IS_MULTICLUSTER_ONLY.get() }' === 'True';
window.IS_K8S_ONLY = '${ IS_K8S_ONLY.get() }' === 'True';
Expand Down

0 comments on commit ac4d32d

Please sign in to comment.