diff --git a/awswrangler/neptune/neptune.py b/awswrangler/neptune/neptune.py
index b7a6e48fb..790a12c98 100644
--- a/awswrangler/neptune/neptune.py
+++ b/awswrangler/neptune/neptune.py
@@ -290,7 +290,7 @@ def _set_properties(g: GraphTraversalSource, use_header_cardinality: bool, row:
         if column not in ["~id", "~label", "~to", "~from"]:
             # If the column header is specifying the cardinality then use it
             if use_header_cardinality:
-                if column.lower().find("(single)") > 0:
+                if column.lower().find("(single)") > 0 and pd.notna(value):
                     g = g.property(Cardinality.single, _get_column_name(column), value)
                 else:
                     g = _expand_properties(g, _get_column_name(column), value)
@@ -305,7 +305,7 @@ def _expand_properties(g: GraphTraversalSource, column: str, value: Any) -> Grap
     if isinstance(value, list) and len(value) > 0:
         for item in value:
             g = g.property(Cardinality.set_, column, item)
-    else:
+    elif pd.notna(value):
         g = g.property(Cardinality.set_, column, value)
     return g
 
diff --git a/tests/test_neptune.py b/tests/test_neptune.py
index 29af98bb4..92c05e206 100644
--- a/tests/test_neptune.py
+++ b/tests/test_neptune.py
@@ -4,6 +4,7 @@
 import uuid
 from typing import Any, Dict
 
+import numpy as np
 import pandas as pd
 import pytest  # type: ignore
 from gremlin_python.process.traversal import Direction, T
@@ -169,6 +170,33 @@ def test_sparql_query(neptune_endpoint, neptune_port) -> Dict[str, Any]:
     assert df.shape == (2, 3)
 
 
+def test_gremlin_write_different_cols(neptune_endpoint, neptune_port) -> Dict[str, Any]:
+    client = wr.neptune.connect(neptune_endpoint, neptune_port, iam_enabled=False)
+    id = uuid.uuid4()
+    wr.neptune.execute_gremlin(client, f"g.addV().property(T.id, '{str(id)}')")
+
+    data = [_create_dummy_vertex(), _create_dummy_vertex()]
+    del data[1]["str"]
+    data[1]["int"] = np.nan
+    df = pd.DataFrame(data)
+    res = wr.neptune.to_property_graph(client, df)
+    assert res
+
+    data = [_create_dummy_edge(), _create_dummy_edge()]
+    del data[1]["str"]
+    data[1]["int"] = np.nan
+    df = pd.DataFrame(data)
+    res = wr.neptune.to_property_graph(client, df)
+    assert res
+
+    data = [{"~id": id, "age(single)": 50, "name": "foo"}, {"~id": id, "age(single)": 55}, {"~id": id, "name": "foo"}]
+    df = pd.DataFrame(data)
+    res = wr.neptune.to_property_graph(client, df)
+    res = wr.neptune.execute_gremlin(client, f"g.V('{id}').valueMap().with(WithOptions.tokens)")
+    saved_row = res.iloc[0]
+    assert saved_row["age"] == 55
+
+
 def test_gremlin_write_updates(neptune_endpoint, neptune_port) -> Dict[str, Any]:
     client = wr.neptune.connect(neptune_endpoint, neptune_port, iam_enabled=False)
     id = uuid.uuid4()
@@ -191,12 +219,14 @@ def test_gremlin_write_updates(neptune_endpoint, neptune_port) -> Dict[str, Any]
     res = wr.neptune.execute_gremlin(client, f"g.V('{id}').valueMap().with(WithOptions.tokens)")
     saved_row = res.iloc[0]
     assert saved_row["age"] == 55
-    assert saved_row["name"] == ["foo", "bar"]
+    assert "foo" in saved_row["name"]
+    assert "bar" in saved_row["name"]
     res = wr.neptune.to_property_graph(client, df, use_header_cardinality=False)
     res = wr.neptune.execute_gremlin(client, f"g.V('{id}').valueMap().with(WithOptions.tokens)")
     saved_row = res.iloc[0]
     assert saved_row["age(single)"] == 55
-    assert saved_row["name"] == ["foo", "bar"]
+    assert "foo" in saved_row["name"]
+    assert "bar" in saved_row["name"]
 
 
 def test_gremlin_write_vertices(neptune_endpoint, neptune_port) -> Dict[str, Any]:
@@ -298,6 +328,22 @@ def test_gremlin_write_edges(neptune_endpoint, neptune_port) -> Dict[str, Any]:
     assert batch_cnt_df.iloc[0][0] == final_cnt_df.iloc[0][0] + 50
 
 
+def test_sparql_write_different_cols(neptune_endpoint, neptune_port) -> Dict[str, Any]:
+    client = wr.neptune.connect(neptune_endpoint, neptune_port, iam_enabled=False)
+
+    data = [_create_dummy_triple(), _create_dummy_triple()]
+    del data[1]["o"]
+    df = pd.DataFrame(data)
+    res = wr.neptune.to_rdf_graph(client, df)
+    assert res
+
+    data = [_create_dummy_quad(), _create_dummy_quad()]
+    del data[1]["o"]
+    df = pd.DataFrame(data)
+    res = wr.neptune.to_rdf_graph(client, df)
+    assert res
+
+
 def test_sparql_write_triples(neptune_endpoint, neptune_port) -> Dict[str, Any]:
     client = wr.neptune.connect(neptune_endpoint, neptune_port, iam_enabled=False)
     initial_df = wr.neptune.execute_sparql(client, "SELECT ?p ?o WHERE { ?p ?o .}")