Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: retry loop for exception caused by deadlock on badge node #404

Merged
merged 11 commits into from
Dec 3, 2020
33 changes: 25 additions & 8 deletions databuilder/publisher/neo4j_csv_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from neo4j import GraphDatabase, Transaction
import neo4j
from neo4j.exceptions import CypherError
from neo4j.exceptions import CypherError, TransientError
from pyhocon import ConfigFactory
from pyhocon import ConfigTree
from typing import Set, List
Expand All @@ -35,7 +35,7 @@
# A end point for Neo4j e.g: bolt://localhost:9999
NEO4J_END_POINT_KEY = 'neo4j_endpoint'
# A transaction size that determines how often it commits.
NEO4J_TRANSCATION_SIZE = 'neo4j_transaction_size'
NEO4J_TRANSACTION_SIZE = 'neo4j_transaction_size'
# A progress report frequency that determines how often it report the progress.
NEO4J_PROGRESS_REPORT_FREQUENCY = 'neo4j_progress_report_frequency'
# A boolean flag to make it fail if relationship is not created
Expand All @@ -46,6 +46,9 @@
# list of nodes that are create only, and not updated if match exists
NEO4J_CREATE_ONLY_NODES = 'neo4j_create_only_nodes'

# list of node labels that could attempt to be accessed simultaneously
NEO4J_DEADLOCK_NODE_LABELS = 'neo4j_deadlock_node_labels'

NEO4J_USER = 'neo4j_user'
NEO4J_PASSWORD = 'neo4j_password'
NEO4J_ENCRYPTED = 'neo4j_encrypted'
Expand Down Expand Up @@ -92,7 +95,7 @@
RELATION_END_LABEL, RELATION_END_KEY,
RELATION_TYPE, RELATION_REVERSE_TYPE}

DEFAULT_CONFIG = ConfigFactory.from_dict({NEO4J_TRANSCATION_SIZE: 500,
DEFAULT_CONFIG = ConfigFactory.from_dict({NEO4J_TRANSACTION_SIZE: 500,
NEO4J_PROGRESS_REPORT_FREQUENCY: 500,
NEO4J_RELATIONSHIP_CREATION_CONFIRM: False,
NEO4J_MAX_CONN_LIFE_TIME_SEC: 50,
Expand Down Expand Up @@ -136,13 +139,14 @@ def init(self, conf: ConfigTree) -> None:
auth=(conf.get_string(NEO4J_USER), conf.get_string(NEO4J_PASSWORD)),
encrypted=conf.get_bool(NEO4J_ENCRYPTED),
trust=trust)
self._transaction_size = conf.get_int(NEO4J_TRANSCATION_SIZE)
self._transaction_size = conf.get_int(NEO4J_TRANSACTION_SIZE)
self._session = self._driver.session()
self._confirm_rel_created = conf.get_bool(NEO4J_RELATIONSHIP_CREATION_CONFIRM)

# config is list of node label.
# When set, this list specifies a list of nodes that shouldn't be updated, if exists
self.create_only_nodes = set(conf.get_list(NEO4J_CREATE_ONLY_NODES, default=[]))
self.deadlock_node_labels = set(conf.get_list(NEO4J_DEADLOCK_NODE_LABELS, default=[]))
self.labels: Set[str] = set()
self.publish_tag: str = conf.get_string(JOB_PUBLISH_TAG)
if not self.publish_tag:
Expand Down Expand Up @@ -302,6 +306,7 @@ def _publish_relation(self, relation_file: str, tx: Transaction) -> Transaction:
count = 0
with open(relation_file, 'r', encoding='utf8') as relation_csv:
for rel_record in pandas.read_csv(relation_csv, na_filter=False).to_dict(orient="records"):
# TODO not sure if deadlock on badge node arises in preporcessing or not
stmt, params = self._relation_preprocessor.preprocess_cypher(
start_label=rel_record[RELATION_START_LABEL],
end_label=rel_record[RELATION_END_LABEL],
Expand All @@ -318,10 +323,22 @@ def _publish_relation(self, relation_file: str, tx: Transaction) -> Transaction:

with open(relation_file, 'r', encoding='utf8') as relation_csv:
for rel_record in pandas.read_csv(relation_csv, na_filter=False).to_dict(orient="records"):
stmt = self.create_relationship_merge_statement(rel_record=rel_record)
params = self._create_props_param(rel_record)
tx = self._execute_statement(stmt, tx, params,
expect_result=self._confirm_rel_created)
badge_exception = True
allisonsuarez marked this conversation as resolved.
Show resolved Hide resolved
retries_for_badge_exception = 5
allisonsuarez marked this conversation as resolved.
Show resolved Hide resolved
while badge_exception and retries_for_badge_exception > 0:
try:
stmt = self.create_relationship_merge_statement(rel_record=rel_record)
params = self._create_props_param(rel_record)
tx = self._execute_statement(stmt, tx, params,
expect_result=self._confirm_rel_created)
badge_exception = False
except TransientError as e:
if rel_record[RELATION_START_LABEL] in self.deadlock_node_labels\
or rel_record[RELATION_END_LABEL] in self.deadlock_node_labels:
time.sleep(2)
allisonsuarez marked this conversation as resolved.
Show resolved Hide resolved
retries_for_badge_exception -= 1
else:
raise e

return tx

Expand Down