Skip to content

Commit

Permalink
cleanup: __fetch_adb_docs
Browse files Browse the repository at this point in the history
  • Loading branch information
aMahanna committed Jan 19, 2024
1 parent ddbd331 commit 704cbba
Showing 1 changed file with 12 additions and 15 deletions.
27 changes: 12 additions & 15 deletions adbpyg_adapter/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ def __process_adb_v_col(

# 1. Fetch ArangoDB vertices
v_col_cursor, v_col_size = self.__fetch_adb_docs(
v_col, meta, **adb_export_kwargs
v_col, False, meta, **adb_export_kwargs
)

# 2. Process ArangoDB vertices
Expand Down Expand Up @@ -782,7 +782,7 @@ def __process_adb_e_col(

# 1. Fetch ArangoDB edges
e_col_cursor, e_col_size = self.__fetch_adb_docs(
e_col, meta, **adb_export_kwargs
e_col, True, meta, **adb_export_kwargs
)

# 2. Process ArangoDB edges
Expand All @@ -805,6 +805,7 @@ def __process_adb_e_col(
def __fetch_adb_docs(
self,
col: str,
is_edge: bool,
meta: Union[Set[str], Dict[str, ADBMetagraphValues]],
**adb_export_kwargs: Any,
) -> Tuple[Cursor, int]:
Expand All @@ -813,6 +814,8 @@ def __fetch_adb_docs(
:param col: The ArangoDB collection.
:type col: str
:param is_edge: True if **col** is an edge collection.
:type is_edge: bool
:param meta: The MetaGraph associated to **col**
:type meta: Set[str] | Dict[str, adbpyg_adapter.typings.ADBMetagraphValues]
:param adb_export_kwargs: Keyword arguments to specify AQL query options
Expand All @@ -822,34 +825,28 @@ def __fetch_adb_docs(
:rtype: pandas.DataFrame
"""

def get_aql_return_value(
meta: Union[Set[str], Dict[str, ADBMetagraphValues]]
) -> str:
def get_aql_return_value() -> str:
"""Helper method to formulate the AQL `RETURN` value based on
the document attributes specified in **meta**
"""
attributes = []
attributes = ["_key"]
attributes += ["_from", "_to"] if is_edge else []

if type(meta) is set:
attributes = list(meta)
attributes += list(meta)

elif type(meta) is dict:
for value in meta.values():
if type(value) is str:
attributes.append(value)
elif type(value) is dict:
attributes.extend(list(value.keys()))
attributes += list(value.keys())
elif callable(value):
# Cannot determine which attributes to extract if UDFs are used
# Therefore we just return the entire document
return "doc"

return f"""
MERGE(
{{ _key: doc._key, _from: doc._from, _to: doc._to }},
KEEP(doc, {list(attributes)})
)
"""
return f"KEEP(doc, {attributes})"

col_size: int = self.__db.collection(col).count()
TracingManager.set_attributes(col=col, col_size=col_size, meta=meta)
Expand All @@ -858,7 +855,7 @@ def get_aql_return_value(
p.add_task(col)

cursor: Cursor = self.__db.aql.execute(
f"FOR doc IN @@col RETURN {get_aql_return_value(meta)}",
f"FOR doc IN @@col RETURN {get_aql_return_value()}",
bind_vars={"@col": col},
**{**adb_export_kwargs, "stream": True},
)
Expand Down

0 comments on commit 704cbba

Please sign in to comment.