Skip to content

Commit

Permalink
[ozone] Improve fs_defaultfs check and uploadhandler
Browse files Browse the repository at this point in the history
- Check for fs_defaultFS null scenarios and raise error.
- Use urlparse for better code readability.
- Improve comments.
  • Loading branch information
Harshg999 committed Mar 9, 2023
1 parent 4588703 commit 9393ae7
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 22 deletions.
8 changes: 4 additions & 4 deletions desktop/conf.dist/hue.ini
Expand Up @@ -932,10 +932,10 @@ tls=no
# NameNode logical name.
## logical_name=

# Use WebHdfs/HttpFS as the communication mechanism.
# Domain should be the NameNode or HttpFS host.
# Default port is 14000 for HttpFS.
## webhdfs_url=http://localhost:50070/webhdfs/v1
# Use HttpFS as the communication mechanism.
# Domain should be the HttpFS host and port.
# Default port is 9778 for HttpFS.
## webhdfs_url=http://localhost:9778/webhdfs/v1

# Whether Ozone requires client to perform Kerberos authentication.
## security_enabled=false
Expand Down
8 changes: 4 additions & 4 deletions desktop/conf/pseudo-distributed.ini.tmpl
Expand Up @@ -915,10 +915,10 @@
# NameNode logical name.
## logical_name=

# Use WebHdfs/HttpFS as the communication mechanism.
# Domain should be the NameNode or HttpFS host.
# Default port is 14000 for HttpFS.
## webhdfs_url=http://localhost:50070/webhdfs/v1
# Use HttpFS as the communication mechanism.
# Domain should be the HttpFS host and port.
# Default port is 9778 for HttpFS.
## webhdfs_url=http://localhost:9778/webhdfs/v1

# Whether Ozone requires client to perform Kerberos authentication.
## security_enabled=false
Expand Down
22 changes: 14 additions & 8 deletions desktop/core/src/desktop/lib/fs/ozone/upload.py
Expand Up @@ -25,8 +25,10 @@
from hadoop.fs.exceptions import WebHdfsException

if sys.version_info[0] > 2:
from urllib.parse import urlparse as lib_urlparse
from django.utils.translation import gettext as _
else:
from urlparse import urlparse as lib_urlparse
from django.utils.translation import ugettext as _


Expand Down Expand Up @@ -54,8 +56,12 @@ def __init__(self, request):

if self._is_ofs_upload():
self._fs = self._get_ofs(request)
# Verify that the path exists
self._fs.stats(self.destination)

# Verify that the path exists
try:
self._fs.stats(self.destination)
except Exception as e:
raise OFSFileUploadError(_('Destination path does not exist: %s' % self.destination))

LOG.debug("Chunk size = %d" % UPLOAD_CHUNK_SIZE.get())

Expand Down Expand Up @@ -106,18 +112,18 @@ def file_complete(self, file_size):
def _get_ofs(self, request):
# Look up the 'ofs' filesystem client for the requesting user (request.user.username)
# via get_client() and return it.
# Raises OFSFileUploadError when get_client() returns a falsy value.
# NOTE(review): this excerpt is a rendered diff whose +/- markers and indentation were
# stripped; the two consecutive raise lines below look like the before/after versions
# of the same statement (the message gains a trailing period) -- confirm against the repository.
fs = get_client(fs='ofs', user=request.user.username)
if not fs:
raise OFSFileUploadError(_("No OFS filesystem found"))
raise OFSFileUploadError(_("No OFS filesystem found."))
return fs

def _is_ofs_upload(self):
# Tell whether the upload destination uses the OFS scheme: truthy only when
# self._get_scheme() returns a non-empty value that starts with the OFS prefix.
# NOTE(review): this excerpt is a rendered diff whose +/- markers and indentation were
# stripped; the two return lines below look like the before ('OFS') and after ('ofs')
# versions of the same statement, matching a switch of _get_scheme() from upper() to
# lower() -- confirm against the repository.
return self._get_scheme() and self._get_scheme().startswith('OFS')
return self._get_scheme() and self._get_scheme().startswith('ofs')

def _get_scheme(self):
# Extract the URI scheme of self.destination.
# Returns None when self.destination is falsy; raises when a destination is set
# but no scheme can be extracted from it.
# NOTE(review): this excerpt is a rendered diff whose +/- markers and indentation were
# stripped, so two variants are interleaved below -- presumably the removed one
# (split on '://', upper-cased result, WebHdfsException) and the added one
# (lib_urlparse, lower-cased scheme, OFSFileUploadError). Confirm against the repository.
if self.destination:
# Variant based on str.split: takes the text before the first '://' and upper-cases it.
dst_parts = self.destination.split('://')
if dst_parts:
return dst_parts[0].upper()
# Variant based on lib_urlparse: takes the parsed .scheme and lower-cases it.
dst_parse = lib_urlparse(self.destination)
if dst_parse.scheme:
return dst_parse.scheme.lower()
else:
raise WebHdfsException('Destination does not start with a valid scheme.')
raise OFSFileUploadError('Destination does not start with a valid scheme.')
else:
return None
18 changes: 12 additions & 6 deletions desktop/libs/indexer/src/indexer/indexers/sql.py
Expand Up @@ -84,7 +84,6 @@ def create_table_from_a_file(self, source, destination, start_time=-1, file_enco
comment = destination['description']

source_path = source['path']
ozone_service_id = OZONE.get()['default'].FS_DEFAULTFS.get()[6:]
load_data = destination['importData']
isIceberg = destination['isIceberg']

Expand Down Expand Up @@ -184,11 +183,18 @@ def create_table_from_a_file(self, source, destination, start_time=-1, file_enco

if external_path.lower().startswith("abfs"): #this is to check if its using an ABFS path
external_path = abfspath(external_path)
elif external_path.lower().startswith("ofs") or source_path.lower().startswith("ofs"): #this is to check if its using an OFS path
# as of now ozone has 3 managers and impala/hive don't know on which manager
# they need to ping hence we are sending the ozone_service_id with path
external_path = external_path[:6] + ozone_service_id + external_path[5:]
source_path = source_path[:6] + ozone_service_id + source_path[5:]
elif external_path.lower().startswith("ofs") or source_path.lower().startswith("ofs"): # This is to check if its using an OFS path
if OZONE['default'].FS_DEFAULTFS.get():
fs_defaultfs_schemeless = OZONE['default'].FS_DEFAULTFS.get()[6:]

# Add the fs_defaultfs netloc to the OFS path for Hive/Impala.
# fs_defaultfs is the Ozone service ID when Ozone runs in HA mode, or the Ozone Manager URI in non-HA mode.
# E.g.: ofs:// + fs_defaultfs_schemeless + /vol1/buk1/key

external_path = external_path[:6] + fs_defaultfs_schemeless + external_path[5:]
source_path = source_path[:6] + fs_defaultfs_schemeless + source_path[5:]
else:
raise PopupException('Ozone fs_defaultFS is not configured.')

tbl_properties = OrderedDict()
if skip_header:
Expand Down

0 comments on commit 9393ae7

Please sign in to comment.