Skip to content

Commit

Permalink
[DPTOOLS-2299] Separate Index creation step from Node publish (#56)
Browse files Browse the repository at this point in the history
* [DPTOOLS-2299] Separate Index creation step from Node publish

* Update
  • Loading branch information
jinhyukchang authored and Hans Adriaans committed Jun 30, 2022
1 parent 24fb14f commit f7171a8
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 14 deletions.
36 changes: 24 additions & 12 deletions databuilder/databuilder/publisher/neo4j_csv_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
RELATION_END_LABEL, RELATION_END_KEY,
RELATION_TYPE, RELATION_REVERSE_TYPE}


DEFAULT_CONFIG = ConfigFactory.from_dict({NEO4J_TRANSCATION_SIZE: 500,
NEO4J_RELATIONSHIP_CREATION_CONFIRM: False,
NEO4J_MAX_CONN_LIFE_TIME_SEC: 50})
Expand Down Expand Up @@ -99,6 +98,7 @@ class Neo4jCsvPublisher(Publisher):
#TODO User UNWIND batch operation for better performance
"""

def __init__(self):
# type: () -> None
pass
Expand Down Expand Up @@ -155,6 +155,10 @@ def publish(self):

start = time.time()

LOGGER.info('Creating indices using Node files: {}'.format(self._node_files))
for node_file in self._node_files:
self._create_indices(node_file=node_file)

LOGGER.info('Publishing Node files: {}'.format(self._node_files))
while True:
try:
Expand All @@ -178,6 +182,24 @@ def get_scope(self):
# type: () -> str
return 'publisher.neo4j'

def _create_indices(self, node_file):
"""
Go over the node file and try creating unique index
:param node_file:
:return:
"""
# type: (str) -> None
LOGGER.info('Creating indices. (Existing indices will be ignored)')

with open(node_file, 'r') as node_csv:
for node_record in csv.DictReader(node_csv):
label = node_record[NODE_LABEL_KEY]
if label not in self.labels:
self._try_create_index(label)
self.labels.add(label)

LOGGER.info('Indices have been created.')

def _publish_node(self, node_file):
# type: (str) -> None
"""
Expand All @@ -199,16 +221,6 @@ def _publish_node(self, node_file):
tx = self._session.begin_transaction()
with open(node_file, 'r') as node_csv:
for count, node_record in enumerate(csv.DictReader(node_csv)):
label = node_record[NODE_LABEL_KEY]
# If label is seen for the first time, try creating unique index
if label not in self.labels:
tx.commit() # Transaction needs to be committed as index update will make transaction to abort.
LOGGER.info('Committed {} records'.format(count + 1))

self._try_create_index(label)
self.labels.add(label)
tx = self._session.begin_transaction()

stmt = self.create_node_merge_statement(node_record=node_record)
tx = self._execute_statement(stmt, tx, count)

Expand Down Expand Up @@ -358,7 +370,7 @@ def _execute_statement(self,
LOGGER.debug('Executing statement: {}'.format(stmt))

if six.PY2:
result = tx.run(unicode(stmt, errors='ignore')) # noqa
result = tx.run(unicode(stmt, errors='ignore')) # noqa
else:
result = tx.run(str(stmt).encode('utf-8', 'ignore'))
if expect_result and not result.single():
Expand Down
4 changes: 2 additions & 2 deletions databuilder/tests/unit/publisher/test_neo4j_csv_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ def test_publisher(self):

self.assertEqual(mock_run.call_count, 6)

# 2 node files, 1 relation file, and 2 more commits before index creation
self.assertEqual(mock_commit.call_count, 5)
# 2 node files, 1 relation file
self.assertEqual(mock_commit.call_count, 3)


if __name__ == '__main__':
Expand Down

0 comments on commit f7171a8

Please sign in to comment.