Skip to content

Commit

Permalink
Merge pull request #343 from biolink/neo4j-client-upgrade
Browse files Browse the repository at this point in the history
Switch from neo4jrestclient to neo4j for Neo4j 4.3 compatibility
  • Loading branch information
sierra-moxon committed Jan 13, 2022
2 parents 1445b17 + 21729b1 commit 2e20cc5
Show file tree
Hide file tree
Showing 14 changed files with 182 additions and 131 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/run_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ jobs:
- name: Setup Neo4j Docker
run: |
docker run --detach --name kgx-neo4j-unit-test -p 8484:7474 -p 8888:7687 --env NEO4J_AUTH=neo4j/test neo4j:3.5.19
docker run --detach --name kgx-neo4j-integration-test -p 7474:7474 -p 7687:7687 --env NEO4J_AUTH=neo4j/test neo4j:3.5.19
docker run --detach --name kgx-neo4j-unit-test -p 8484:7474 -p 8888:7687 --env NEO4J_AUTH=neo4j/test neo4j:4.3.0
docker run --detach --name kgx-neo4j-integration-test -p 7474:7474 -p 7687:7687 --env NEO4J_AUTH=neo4j/test neo4j:4.3.0
docker ps -a
- name: Wait
Expand Down
12 changes: 5 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,7 @@ python setup.py install

### Setting up a testing environment for Neo4j

The current release of KGX supports graph source and sink transactions with the 3.5 release of Neo4j.

**_Note:_** Support for release 4.3 of Neo4j is pending in the next KGX release (see the [neo4j-client-upgrade](https://github.com/biolink/kgx/tree/neo4j-client-upgrade) branch for the beta release of this code).
This release of KGX supports graph source and sink transactions with the 4.3 release of Neo4j.

KGX has a suite of tests that rely on Docker containers to run Neo4j specific tests.

Expand All @@ -120,17 +118,17 @@ on your local machine.
Once Docker is up and running, run the following commands:

```bash
docker run -d --name kgx-neo4j-integration-test \
docker run -d --rm --name kgx-neo4j-integration-test \
-p 7474:7474 -p 7687:7687 \
--env NEO4J_AUTH=neo4j/test \
neo4j:3.5.25
neo4j:4.3
```

```bash
docker run -d --name kgx-neo4j-unit-test \
docker run -d --rm --name kgx-neo4j-unit-test \
-p 8484:7474 -p 8888:7687 \
--env NEO4J_AUTH=neo4j/test \
neo4j:3.5.25
neo4j:4.3
```


Expand Down
4 changes: 2 additions & 2 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ This folder also contains sample YAML files that can
be used as input to KGX to drive certain operations.

For additional design patterns of KGX to consider, see also the [unit tests folder](../tests).
Note: the currently supported release of Neo4j is 3.5.

Note: the currently supported release of Neo4j is 4.3 (or later).
29 changes: 15 additions & 14 deletions kgx/sink/neo_sink.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import List, Union, Any

from neo4jrestclient.client import GraphDatabase
from neo4jrestclient.query import CypherException
from neo4j import GraphDatabase, Neo4jDriver, Session

from kgx.config import get_logger
from kgx.sink.sink import Sink
Expand Down Expand Up @@ -42,9 +41,10 @@ def __init__(self, uri: str, username: str, password: str, **kwargs: Any):
super().__init__()
if "cache_size" in kwargs:
self.CACHE_SIZE = kwargs["cache_size"]
self.http_driver: GraphDatabase = GraphDatabase(
uri, username=username, password=password
self.http_driver:Neo4jDriver = GraphDatabase.driver(
uri, auth=(username, password)
)
self.session:Session = self.http_driver.session()

def _flush_node_cache(self):
self._write_node_cache()
Expand Down Expand Up @@ -87,16 +87,17 @@ def _write_node_cache(self) -> None:
self.CATEGORY_DELIMITER, self.CYPHER_CATEGORY_DELIMITER
)
query = self.generate_unwind_node_query(cypher_category)

log.debug(query)
nodes = self.node_cache[category]
for x in range(0, len(nodes), batch_size):
y = min(x + batch_size, len(nodes))
log.debug(f"Batch {x} - {y}")
batch = nodes[x:y]
try:
self.http_driver.query(query, params={"nodes": batch})
except CypherException as ce:
log.error(ce)
self.session.run(query, parameters={"nodes": batch})
except Exception as e:
log.error(e)

def _flush_edge_cache(self):
self._flush_node_cache()
Expand Down Expand Up @@ -140,11 +141,11 @@ def _write_edge_cache(self) -> None:
batch = edges[x:y]
log.debug(f"Batch {x} - {y}")
try:
self.http_driver.query(
query, params={"relationship": predicate, "edges": batch}
self.session.run(
query, parameters={"relationship": predicate, "edges": batch}
)
except CypherException as ce:
log.error(ce)
except Exception as e:
log.error(e)

def finalize(self) -> None:
"""
Expand Down Expand Up @@ -248,10 +249,10 @@ def create_constraints(self, categories: Union[set, list]) -> None:
else:
query = NeoSink.create_constraint_query(category)
try:
self.http_driver.query(query)
self.session.run(query)
self._seen_categories.add(category)
except CypherException as ce:
log.error(ce)
except Exception as e:
log.error(e)

@staticmethod
def create_constraint_query(category: str) -> str:
Expand Down
79 changes: 61 additions & 18 deletions kgx/source/neo_source.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import itertools
from typing import Any, Dict, List, Optional, Iterator, Tuple, Generator

from neo4jrestclient.client import Node, Relationship, GraphDatabase
from neo4jrestclient.query import CypherException
from neo4j import GraphDatabase, Neo4jDriver
from neo4j.graph import Node, Relationship

from kgx.config import get_logger
from kgx.source.source import Source
Expand All @@ -26,11 +26,18 @@ class NeoSource(Source):

def __init__(self):
super().__init__()
self.http_driver = None
self.http_driver: Optional[Neo4jDriver] = None
self.session = None
self.node_count = 0
self.edge_count = 0
self.seen_nodes = set()

def _connect_db(self, uri: str, username: str, password: str):
    """
    Open a Neo4j driver and a session on it for this source.

    The driver is stored on ``self.http_driver`` and the session on
    ``self.session``. Any session/driver left over from an earlier call
    is closed first, so that connecting again on the same instance does
    not leak the previous connection.

    Parameters
    ----------
    uri: str
        The URI of the Neo4j instance, e.g. ``neo4j://localhost:7687``
    username: str
        The username
    password: str
        The password

    """
    # Close the session before its driver; both are absent or None on first use.
    previous_session = getattr(self, "session", None)
    if previous_session is not None:
        previous_session.close()
    previous_driver = getattr(self, "http_driver", None)
    if previous_driver is not None:
        previous_driver.close()

    self.http_driver = GraphDatabase.driver(uri, auth=(username, password))
    self.session = self.http_driver.session()

def parse(
self,
uri: str,
Expand Down Expand Up @@ -77,9 +84,7 @@ def parse(
A generator for records
"""
self.http_driver: GraphDatabase = GraphDatabase(
uri, username=username, password=password
)
self._connect_db(uri, username, password)

self.set_provenance_map(kwargs)

Expand Down Expand Up @@ -140,11 +145,11 @@ def count(self, is_directed: bool = True) -> int:
query_result: Any
counts: int = 0
try:
query_result = self.http_driver.query(query)
query_result = self.session.run(query)
for result in query_result:
counts = result[0]
except CypherException as ce:
log.error(ce)
except Exception as e:
log.error(e)
return counts

def get_nodes(self, skip: int = 0, limit: int = 0, **kwargs: Any) -> List:
Expand Down Expand Up @@ -189,11 +194,19 @@ def get_nodes(self, skip: int = 0, limit: int = 0, **kwargs: Any) -> List:
log.debug(query)
nodes = []
try:
results = self.http_driver.query(query, returns=Node, data_contents=True)
results = self.session.run(query)
if results:
nodes = [node[0] for node in results.rows]
except CypherException as ce:
log.error(ce)
nodes = [
{
"id": node[0].get('id', f"{node[0].id}"),
"name": node[0].get('name', ''),
"category": node[0].get('category', ['biolink:NamedThing'])
}
for node in results.values()
]

except Exception as e:
log.error(e)
return nodes

def get_edges(
Expand Down Expand Up @@ -251,13 +264,43 @@ def get_edges(
log.debug(query)
edges = []
try:
results = self.http_driver.query(
query, returns=(Node, Relationship, Node), data_contents=True
results = self.session.run(
query
)
if results:
edges = [x for x in results.rows]
except CypherException as ce:
log.error(ce)
edges = list()
for entry in results.values():
edge = list()
# subject
edge.append(
{
"id": entry[0].get('id', f"{entry[0].id}"),
"name": entry[0].get('name', ''),
"category": entry[0].get('category', ['biolink:NamedThing'])
}
)

# edge
edge.append(
{
"subject": entry[1].get('subject', f"{entry[0].id}"),
"predicate": entry[1].get('predicate', "biolink:related_to"),
"relation": entry[1].get('relation', "biolink:related_to"),
"object": entry[1].get('object', f"{entry[2].id}")
}
)

# object
edge.append(
{
"id": entry[2].get('id', f"{entry[2].id}"),
"name": entry[2].get('name', ''),
"category": entry[2].get('category', ['biolink:NamedThing'])
}
)
edges.append(edge)
except Exception as e:
log.error(e)

return edges

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pytest>=0.0
mypy>=0.0
rdflib~=5.0.0
Click~=7.0
neo4jrestclient>=0.0
neo4j==4.3
pyyaml>=0.0
prologterms>=0.0.5
shexjsg>=0.6.5
Expand Down
18 changes: 8 additions & 10 deletions tests/integration/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import docker
import pytest
from neo4jrestclient.client import GraphDatabase
from neo4jrestclient.query import CypherException
from neo4j import GraphDatabase

from kgx.graph.nx_graph import NxGraph

CONTAINER_NAME = "kgx-neo4j-integration-test"
DEFAULT_NEO4J_URL = "http://localhost:7474"
DEFAULT_NEO4J_URL = "neo4j://localhost:7687"
DEFAULT_NEO4J_USERNAME = "neo4j"
DEFAULT_NEO4J_PASSWORD = "test"

Expand All @@ -28,16 +27,15 @@ def check_container():

@pytest.fixture(scope="function")
def clean_slate():
http_driver = GraphDatabase(
DEFAULT_NEO4J_URL,
username=DEFAULT_NEO4J_USERNAME,
password=DEFAULT_NEO4J_PASSWORD,
http_driver = GraphDatabase.driver(
DEFAULT_NEO4J_URL, auth=(DEFAULT_NEO4J_USERNAME, DEFAULT_NEO4J_PASSWORD)
)

q = "MATCH (n) DETACH DELETE (n)"
try:
http_driver.query(q)
except CypherException as ce:
print(ce)
http_driver.session().run(q)
except Exception as e:
print(e)


def get_graph(source):
Expand Down
18 changes: 10 additions & 8 deletions tests/integration/test_neo_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

from kgx.transformer import Transformer
from tests import RESOURCE_DIR, TARGET_DIR
from tests.unit import clean_database
from kgx.config import get_logger

from tests.integration import (
check_container,
CONTAINER_NAME,
Expand All @@ -11,14 +14,17 @@
DEFAULT_NEO4J_PASSWORD,
)

logger = get_logger()


@pytest.mark.skipif(
not check_container(), reason=f"Container {CONTAINER_NAME} is not running"
)
def test_csv_to_neo_load():
def test_csv_to_neo4j_load_to_graph_transform(clean_database):
"""
Test to load a CSV to Neo4j.
Test to load a csv KGX file into Neo4j.
"""
logger.debug("test_csv_to_neo4j_load...")
input_args1 = {
"filename": [
os.path.join(RESOURCE_DIR, "cm_nodes.csv"),
Expand All @@ -37,14 +43,10 @@ def test_csv_to_neo_load():
}
t1.save(output_args)


@pytest.mark.skipif(
not check_container(), reason=f"Container {CONTAINER_NAME} is not running"
)
def test_neo_to_graph_transform():
"""
Test to read from Neo4j and write to CSV.
Continue sequentially to test read from Neo4j to write out back to CSV.
"""
logger.debug("test_neo4j_to_graph_transform")
input_args = {
"uri": DEFAULT_NEO4J_URL,
"username": DEFAULT_NEO4J_USERNAME,
Expand Down

0 comments on commit 2e20cc5

Please sign in to comment.