Skip to content

Commit

Permalink
disable hdfs endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
dchhabda committed Feb 29, 2024
1 parent 3e210be commit 3985234
Showing 1 changed file with 60 additions and 58 deletions.
118 changes: 60 additions & 58 deletions pybossa/view/fileproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from pybossa.contributions_guard import ContributionsGuard
from pybossa.core import task_repo, signer
from pybossa.encryption import AESWithGCM
from pybossa.pybhdfs.client import HDFSKerberos
# from pybossa.pybhdfs.client import HDFSKerberos
from pybossa.sched import has_lock
from pybossa.cloud_store_api.s3 import get_content_and_key_from_s3

Expand All @@ -50,18 +50,6 @@ def decorated(*args, **kwargs):
return decorated


def is_valid_hdfs_url(attempt_path, attempt_args):
    """Build a predicate that recognises the HDFS URL being requested.

    The returned callable accepts a single value and reports whether it is
    a string URL whose path equals ``attempt_path`` and whose ``offset`` and
    ``length`` query arguments equal the ones in ``attempt_args``.

    :param attempt_path: path component of the URL being requested.
    :param attempt_args: mapping of query-argument names to values; it is
        compared against ``parse_qs`` output, so values are presumably
        lists of strings (verify against the caller).
    :returns: a one-argument function returning ``True`` or ``False``.
    """
    def is_valid_url(v):
        # Anything that is not a string cannot be a URL at all.
        if isinstance(v, six.string_types):
            candidate = urlparse(v)
            if candidate.path != attempt_path:
                return False
            query = parse_qs(candidate.query)
            # A missing argument yields None on both sides, so "absent in
            # both" counts as a match.
            if query.get('offset') != attempt_args.get('offset'):
                return False
            return query.get('length') == attempt_args.get('length')
        return False
    return is_valid_url


def check_allowed(user_id, task_id, project, is_valid_url):
task = task_repo.get_task(task_id)

Expand Down Expand Up @@ -197,55 +185,69 @@ def encrypt_task_response_data(task_id, project_id, data):
return content


# def is_valid_hdfs_url(attempt_path, attempt_args):
# def is_valid_url(v):
# if not isinstance(v, six.string_types):
# return False
# parsed = urlparse(v)
# parsed_args = parse_qs(parsed.query)
# return (parsed.path == attempt_path
# and parsed_args.get('offset') == attempt_args.get('offset')
# and parsed_args.get('length') == attempt_args.get('length'))
# return is_valid_url



@blueprint.route('/hdfs/<string:cluster>/<int:project_id>/<path:path>')
@no_cache
@login_required
def hdfs_file(project_id, cluster, path):
    """Reject every request to the retired HDFS file-proxy route.

    HDFS is no longer supported. The route stays registered so that tasks
    still pointing at an ``/hdfs/...`` URL get an explicit error; no
    signature check, HDFS read or decryption is performed any more.

    The retired implementation is intentionally not kept here as
    commented-out code; it remains available in version control history.

    :param project_id: project id taken from the URL (unused).
    :param cluster: HDFS cluster name taken from the URL (unused).
    :param path: file path taken from the URL (unused).
    :raises BadRequest: always.
    """
    # This must be the only statement: any code left in front of it that
    # returns a response would make the rejection unreachable.
    raise BadRequest("Invalid task. HDFS is not supported")


def validate_task(project, task_id, user_id):
Expand Down

0 comments on commit 3985234

Please sign in to comment.