Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
f4f084b
Added first draft of potential interface for Neptune
bechbd Dec 22, 2021
5f3a307
Added first draft of potential interface for Neptune
bechbd Dec 22, 2021
6db725e
Fixed __init__.py file with the correct functions for Neptune
bechbd Dec 23, 2021
2a21a5e
[skip ci] Updated signatures per initial feedback from draft PR
bechbd Jan 12, 2022
b3c4e45
Merge branch 'main' into ISSUE-1084
jaidisido Jan 15, 2022
178130f
Merge branch 'main' into ISSUE-1084
jaidisido Jan 17, 2022
41334b6
[skip ci] WIP - Initial version of oc and gremlin endpoint read queri…
bechbd Jan 20, 2022
355b8be
[skip ci] Initial working version of the basic read functionality ass…
bechbd Jan 26, 2022
ce3b697
[skip ci] Fixed tests that were not running correctly due to typo
bechbd Jan 26, 2022
2c84615
WIP on writing data
bechbd Feb 2, 2022
494e330
[skip ci] Have a working version of the complete roundtrip for proepr…
bechbd Feb 4, 2022
b4531c2
[skip ci] Refactored code to simplify the locations into a utils clas…
bechbd Feb 5, 2022
9df814c
[skip ci] Added SPARQL write functionality as well as added neptune t…
bechbd Feb 10, 2022
d768d85
[skip ci] Readded nested asyncio in order for it to work in Jupyter a…
bechbd Feb 10, 2022
4257546
Working version of all MVP features as well as changes to make valida…
bechbd Feb 12, 2022
31df062
Added GremlinParser to the init file so that it can be unit tested.
bechbd Feb 28, 2022
d61a4c9
Added better error handling on query exceptions for all languages
bechbd Mar 4, 2022
78d04ab
Fixed validation error
bechbd Mar 4, 2022
d654903
Added method to flatten dataframes to Neptune module
bechbd Mar 9, 2022
9e486d1
Fixed issues related to flattening DF
bechbd Mar 9, 2022
d725c74
Merge branch 'main' into ISSUE-1084
jaidisido Mar 11, 2022
c526960
Fix static checks issues
jaidisido Mar 11, 2022
0ae781b
Fix pydocstyle
jaidisido Mar 11, 2022
00a68e9
Added functionality to properly set the edge id values as well as add…
bechbd Mar 14, 2022
257c1e5
Merge branch 'ISSUE-1084' of https://github.com/bechbd/aws-data-wrang…
bechbd Mar 14, 2022
201de73
Validate fixes and security group in test_infra
jaidisido Mar 15, 2022
4aabf84
Minor - typing and docs
jaidisido Mar 15, 2022
52ce870
[skip ci] - Minor - Add Neptune docs entry
jaidisido Mar 15, 2022
6f77ada
Merge branch 'main' into ISSUE-1084
jaidisido Mar 15, 2022
f06c01a
Use built-in gremlin driver functionality to enable event loop nesting
kukushking Mar 15, 2022
6ba9ccd
Use built-in gremlin driver functionality to enable event loop nesting
kukushking Mar 15, 2022
6bf0450
Updated the connection handling for Gremlin and added a tutorial note…
bechbd Mar 18, 2022
71d273f
Merge branch 'ISSUE-1084' of https://github.com/bechbd/aws-data-wrang…
bechbd Mar 18, 2022
462bc0b
Fixed validation issues
bechbd Mar 18, 2022
8680416
Minor - Remove nest_asyncio and fix tutorial
jaidisido Mar 21, 2022
de1acf6
Upgrading dependencies
jaidisido Mar 21, 2022
52dd72d
Merge branch 'main' into ISSUE-1084
jaidisido Mar 21, 2022
f48830d
Fixed incomplete error message and updated tutorial to remove cluster…
bechbd Mar 21, 2022
49f5fdc
Merge branch 'ISSUE-1084' of https://github.com/bechbd/aws-data-wrang…
bechbd Mar 21, 2022
6ac2c6a
Minor - validate
jaidisido Mar 21, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions awswrangler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
exceptions,
lakeformation,
mysql,
neptune,
opensearch,
postgresql,
quicksight,
Expand Down Expand Up @@ -47,6 +48,7 @@
"redshift",
"lakeformation",
"mysql",
"neptune",
"postgresql",
"secretsmanager",
"sqlserver",
Expand Down
22 changes: 22 additions & 0 deletions awswrangler/neptune/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Utilities Module for Amazon Neptune."""
from awswrangler.neptune.gremlin_parser import GremlinParser
from awswrangler.neptune.neptune import (
connect,
execute_gremlin,
execute_opencypher,
execute_sparql,
flatten_nested_df,
to_property_graph,
to_rdf_graph,
)

# Names exported by this package; each one is imported above from
# awswrangler.neptune.neptune or awswrangler.neptune.gremlin_parser.
__all__ = [
"execute_gremlin",
"execute_opencypher",
"execute_sparql",
"to_property_graph",
"to_rdf_graph",
"connect",
"GremlinParser",
"flatten_nested_df",
]
118 changes: 118 additions & 0 deletions awswrangler/neptune/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""Amazon Neptune Utils Module (PRIVATE)."""

import logging
from enum import Enum
from typing import Any

import pandas as pd
from gremlin_python.process.graph_traversal import GraphTraversalSource, __
from gremlin_python.process.translator import Translator
from gremlin_python.process.traversal import Cardinality, T
from gremlin_python.structure.graph import Graph

from awswrangler import exceptions
from awswrangler.neptune.client import NeptuneClient

_logger: logging.Logger = logging.getLogger(__name__)


class WriteDFType(Enum):
    """Kind of write that ``write_gremlin_df`` performs for each DataFrame row."""

    # Each row is written through the vertex builder.
    VERTEX = 1
    # Each row is written through the edge builder.
    EDGE = 2
    # Each row is written through the update builder (any mode that is not VERTEX or EDGE).
    UPDATE = 3


def write_gremlin_df(client: NeptuneClient, df: pd.DataFrame, mode: WriteDFType, batch_size: int) -> bool:
    """Write the provided dataframe using Gremlin.

    Every row is appended to a single Gremlin traversal. The traversal is
    submitted after each row whose zero-based position is a positive multiple
    of ``batch_size``, and once more after the last row.

    Parameters
    ----------
    client : NeptuneClient
        The Neptune client to write the dataframe
    df : pd.DataFrame
        The dataframe to write
    mode : WriteDFType
        The type of dataframe to write. ``EDGE`` and ``VERTEX`` select the edge and
        vertex builders; any other value selects the update builder.
    batch_size : int
        The number of rows between two intermediate writes

    Returns
    -------
    bool
        The result of the final write operation

    Raises
    ------
    exceptions.QueryFailed
        If an intermediate batch write returns a falsy result
    """
    g = Graph().traversal()
    # Count rows by position, not by index label: the DataFrame index may be
    # non-numeric, duplicated or non-sequential (e.g. after filtering or set_index),
    # which would break or skew the batching arithmetic below.
    for position, (_, row) in enumerate(df.iterrows()):
        row_dict = row.to_dict()
        # build up a query
        if mode == WriteDFType.EDGE:
            g = _build_gremlin_edges(g, row_dict)
        elif mode == WriteDFType.VERTEX:
            g = _build_gremlin_vertices(g, row_dict)
        else:
            g = _build_gremlin_update(g, row_dict)
        # run the query
        if position > 0 and position % batch_size == 0:
            res = _run_gremlin_insert(client, g)
            if not res:
                _logger.debug(res)
                raise exceptions.QueryFailed(
                    """Failed to insert part or all of the data in the DataFrame, please check the log output."""
                )
            # Start a fresh traversal for the next batch.
            g = Graph().traversal()

    # NOTE(review): when the last row triggered a batch write (or df is empty) this
    # submits a traversal without any steps - confirm the client tolerates that.
    return _run_gremlin_insert(client, g)


def _run_gremlin_insert(client: NeptuneClient, g: GraphTraversalSource) -> bool:
    """Translate the traversal bytecode to a Gremlin script and submit it via ``client.write_gremlin``."""
    query = Translator("g").translate(g.bytecode)
    # hack to fix parser error for set cardinality
    query = query.replace("Cardinality.", "")
    _logger.debug(query)
    return client.write_gremlin(query)


def _build_gremlin_update(g: GraphTraversalSource, row: Any) -> GraphTraversalSource:
    """Select the vertex whose id is ``row["~id"]`` and append the row's property steps."""
    selected = g.V(str(row["~id"]))
    return _build_gremlin_properties(selected, row)


def _build_gremlin_vertices(g: GraphTraversalSource, row: Any) -> GraphTraversalSource:
    """Append a look-up-or-add step for the vertex in ``row``, followed by its property steps.

    The traversal looks the vertex up by ``row["~id"]``; ``coalesce`` keeps the match
    (``unfold``) or otherwise adds a vertex labelled ``row["~label"]`` with that id.
    """
    vertex_id = str(row["~id"])
    # Keep the original evaluation order: extend ``g`` first, then build the anonymous branches.
    lookup = g.V(vertex_id).fold()
    existing = __.unfold()
    created = __.addV(row["~label"]).property(T.id, vertex_id)
    return _build_gremlin_properties(lookup.coalesce(existing, created), row)


def _build_gremlin_edges(g: GraphTraversalSource, row: pd.Series) -> GraphTraversalSource:
    """Append an edge from ``row["~from"]`` to ``row["~to"]``, followed by the row's property steps.

    Both endpoints are looked up by id; an endpoint that is not found is added through
    ``_build_gremlin_vertices`` with the label ``"Vertex"``. The edge is labelled ``row["~label"]``.
    """
    # Source endpoint: look it up, falling back to adding a placeholder vertex.
    from_id = row["~from"]
    traversal = g.V(str(from_id)).fold()
    traversal = traversal.coalesce(
        __.unfold(),
        _build_gremlin_vertices(__, {"~id": from_id, "~label": "Vertex"}),
    )

    traversal = traversal.addE(row["~label"])

    # Target endpoint: same look-up-or-add pattern, built as an anonymous traversal.
    to_id = row["~to"]
    target = __.V(str(to_id)).fold()
    target = target.coalesce(
        __.unfold(),
        _build_gremlin_vertices(__, {"~id": to_id, "~label": "Vertex"}),
    )

    traversal = traversal.to(target)
    return _build_gremlin_properties(traversal, row)


def _build_gremlin_properties(g: GraphTraversalSource, row: Any) -> GraphTraversalSource:
    """Append one ``property`` step per non-reserved, non-null value in ``row``.

    Parameters
    ----------
    g : GraphTraversalSource
        The traversal to extend
    row : Any
        A mapping of column name to value (anything exposing ``items()``)

    Returns
    -------
    GraphTraversalSource
        The traversal with the property steps appended
    """
    # Columns that describe the element itself rather than one of its properties.
    reserved_columns = ("~id", "~label", "~to", "~from")
    for (column, value) in row.items():
        if column in reserved_columns:
            continue
        if isinstance(value, list):
            # One set-cardinality step per item. An empty list adds nothing; it must
            # not reach pd.isna below, which returns an empty array for a list and
            # has no usable truth value.
            for item in value:
                g = g.property(Cardinality.set_, column, item)
        elif not pd.isna(value):
            # pd.isnull is an alias of pd.isna, so a single check is enough.
            g = g.property(column, value)
    return g
Loading