Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added endpoints entities/<id>/collections #686

Merged
merged 1 commit into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 182 additions & 0 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2095,6 +2095,188 @@ def get_next_revisions(id):

return jsonify(final_result)


"""
Get all collections of the given entity

The gateway treats this endpoint as publicly accessible

Result filtering based on query string
For example: /entities/<id>/collections?property=uuid

Parameters
----------
id : str
The HuBMAP ID (e.g. HBM123.ABCD.456) or UUID of given entity

Returns
-------
json
A list of all the collections of the target entity
"""
@app.route('/entities/<id>/collections', methods = ['GET'])
def get_collections(id):
    # Returns, as JSON, the collections linked to the target Dataset: either
    # the complete collection entities or, with '?property=uuid', only that
    # property's value for each collection.

    # A token is optional for this endpoint; however a token that is
    # present but invalid must be reported to the client
    validate_token_if_auth_header_exists(request)

    # The target entity is looked up with the internal token, so the lookup
    # itself never depends on the caller having supplied a token
    token = get_internal_token()
    target_entity = query_target_entity(id, token)
    entity_type = target_entity['entity_type']
    uuid = target_entity['uuid']

    # Only Dataset (and anything that counts as an instance of it) is supported
    is_dataset = schema_manager.entity_type_instanceof(entity_type, 'Dataset')
    if not is_dataset:
        bad_request_error(f"Unsupported entity type of id {id}: {entity_type}")

    # A dataset that is not published requires a user token with
    # non-public access; that token replaces the internal one from here on
    current_status = target_entity['status'].lower()
    if current_status != DATASET_STATUS_PUBLISHED:
        token = get_user_token(request, non_public_access_required = True)

    # No query string at all: respond with the full collection entities
    if not request.args:
        collection_nodes = schema_neo4j_queries.get_collections(neo4j_driver_instance, uuid)

        # Trigger-generated properties left out of the response: the
        # time-consuming ones, plus the revision uuids so that no extra
        # access checks are needed for non-public revisions
        skipped_properties = [
            # Sample
            'direct_ancestor',
            # Dataset
            'direct_ancestors',
            'collections',
            'upload',
            'title',
            'next_revision_uuid',
            'previous_revision_uuid'
        ]

        completed = schema_manager.get_complete_entities_list(token, collection_nodes, skipped_properties)
        normalized = schema_manager.normalize_entities_list_for_response(completed)

        return jsonify(normalized)

    # A query string was supplied: the only supported form is '?property=<key>'
    final_result = []
    property_key = request.args.get('property')

    if property_key is None:
        bad_request_error("The specified query string is not supported. Use '?property=<key>' to filter the result")
    else:
        supported_keys = ['uuid']

        # Reject any property key outside the supported set
        if property_key not in supported_keys:
            bad_request_error(f"Only the following property keys are supported in the query string: {COMMA_SEPARATOR.join(supported_keys)}")

        # The query hands back just the requested property value per collection
        final_result = schema_neo4j_queries.get_collections(neo4j_driver_instance, uuid, property_key)

    return jsonify(final_result)


"""
Get all uploads of the given entity

The gateway treats this endpoint as publicly accessible

Result filtering based on query string
For example: /entities/<id>/uploads?property=uuid

Parameters
----------
id : str
The HuBMAP ID (e.g. HBM123.ABCD.456) or UUID of given entity

Returns
-------
json
A list of all the uploads of the target entity
"""
@app.route('/entities/<id>/uploads', methods = ['GET'])
def get_uploads(id):
    # Returns, as JSON, the uploads linked to the target Dataset: either the
    # complete upload entities or, with '?property=uuid', only that
    # property's value for each upload.
    response_data = []

    # The auth header is optional, but an invalid token in it
    # has to be reported back to the client
    validate_token_if_auth_header_exists(request)

    # Resolve the target entity using the internal token; the user's own
    # token only becomes necessary further down for unpublished datasets
    token = get_internal_token()
    dataset_dict = query_target_entity(id, token)
    dataset_type = dataset_dict['entity_type']
    uuid = dataset_dict['uuid']

    # This endpoint is only defined for Dataset-like entities
    if not schema_manager.entity_type_instanceof(dataset_type, 'Dataset'):
        bad_request_error(f"Unsupported entity type of id {id}: {dataset_type}")

    # Unpublished dataset: switch to a user token that carries
    # non-public access
    if entity_status_differs := (dataset_dict['status'].lower() != DATASET_STATUS_PUBLISHED):
        token = get_user_token(request, non_public_access_required = True)

    if request.args:
        # With a query string present, '?property=<key>' is the only accepted form
        requested_key = request.args.get('property')

        if requested_key is None:
            bad_request_error("The specified query string is not supported. Use '?property=<key>' to filter the result")
        else:
            allowed_keys = ['uuid']

            # Make sure the requested property key is one we can filter on
            if requested_key not in allowed_keys:
                bad_request_error(f"Only the following property keys are supported in the query string: {COMMA_SEPARATOR.join(allowed_keys)}")

            # The response is just the list of property values, one per upload
            response_data = schema_neo4j_queries.get_uploads(neo4j_driver_instance, uuid, requested_key)
    else:
        # Without a query string, respond with the complete upload entities
        upload_nodes = schema_neo4j_queries.get_uploads(neo4j_driver_instance, uuid)

        # Properties whose triggers are skipped: the time-consuming ones, and
        # the revision uuids so that no additional access checks are needed
        # when the revisions are not public
        trigger_properties_to_skip = [
            # Sample
            'direct_ancestor',
            # Dataset
            'direct_ancestors',
            'collections',
            'upload',
            'title',
            'next_revision_uuid',
            'previous_revision_uuid'
        ]

        complete_uploads = schema_manager.get_complete_entities_list(token, upload_nodes, trigger_properties_to_skip)

        # Normalize before sending back
        response_data = schema_manager.normalize_entities_list_for_response(complete_uploads)

    return jsonify(response_data)



"""
Redirect a request from a doi service for a dataset or collection

Expand Down
99 changes: 99 additions & 0 deletions src/schema/schema_neo4j_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,105 @@ def get_descendants(neo4j_driver, uuid, property_key = None):
return results


"""
Get all collections by uuid

Parameters
----------
neo4j_driver : neo4j.Driver object
The neo4j database connection pool
uuid : str
The uuid of target entity
property_key : str
A target property key for result filtering

Returns
-------
list
A list of unique collection dictionaries returned from the Cypher query
"""
def get_collections(neo4j_driver, uuid, property_key = None):
    # With a property key, collect that single property from every matched
    # collection node; without one, collect the nodes themselves
    collect_target = f"c.{property_key}" if property_key else "c"

    # COLLECT() gathers the matches into a list and
    # apoc.coll.toSet() returns that list with duplicates removed
    query = (f"MATCH (c:Collection)<-[:IN_COLLECTION]-(ds:Dataset) "
             f"WHERE ds.uuid='{uuid}' "
             f"RETURN apoc.coll.toSet(COLLECT({collect_target})) AS {record_field_name}")

    logger.info("======get_collections() query======")
    logger.info(query)

    with neo4j_driver.session() as session:
        record = session.read_transaction(execute_readonly_tx, query)
        matched = record[record_field_name] if record else None

        # No record, or an empty collected list: nothing to report
        if not matched:
            return []

        # Property values are returned as they come back from the query;
        # whole nodes are converted to a list of dicts first
        if property_key:
            return matched

        return nodes_to_dicts(matched)



"""
Get all uploads by uuid

Parameters
----------
neo4j_driver : neo4j.Driver object
The neo4j database connection pool
uuid : str
The uuid of target entity
property_key : str
A target property key for result filtering

Returns
-------
list
A list of unique upload dictionaries returned from the Cypher query
"""
def get_uploads(neo4j_driver, uuid, property_key = None):
    uploads = []

    # The MATCH/WHERE part is the same for both variants of the query
    match_clause = (f"MATCH (u:Upload)<-[:IN_UPLOAD]-(ds:Dataset) "
                    f"WHERE ds.uuid='{uuid}' ")

    # COLLECT() builds a list; apoc.coll.toSet() returns it without duplicates.
    # Either one property of each upload node is collected or the node itself.
    if property_key:
        return_clause = f"RETURN apoc.coll.toSet(COLLECT(u.{property_key})) AS {record_field_name}"
    else:
        return_clause = f"RETURN apoc.coll.toSet(COLLECT(u)) AS {record_field_name}"

    query = match_clause + return_clause

    logger.info("======get_uploads() query======")
    logger.info(query)

    with neo4j_driver.session() as session:
        record = session.read_transaction(execute_readonly_tx, query)
        has_matches = bool(record) and bool(record[record_field_name])

        if has_matches:
            collected = record[record_field_name]
            # A property filter yields plain values that are returned as-is;
            # otherwise the nodes are converted to a list of dicts
            uploads = collected if property_key else nodes_to_dicts(collected)

    return uploads


"""
Get the direct ancestors uuids of a given dataset by uuid

Expand Down
Loading