Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: retry loop for exception caused by deadlock on badge node #404

Merged
merged 11 commits into from Dec 3, 2020
34 changes: 26 additions & 8 deletions databuilder/publisher/neo4j_csv_publisher.py
Expand Up @@ -13,13 +13,14 @@

from neo4j import GraphDatabase, Transaction
import neo4j
from neo4j.exceptions import CypherError
from neo4j.exceptions import CypherError, TransientError
from pyhocon import ConfigFactory
from pyhocon import ConfigTree
from typing import Set, List

from databuilder.publisher.base_publisher import Publisher
from databuilder.publisher.neo4j_preprocessor import NoopRelationPreprocessor
from databuilder.models.badge import BadgeMetadata


# Setting field_size_limit to solve the error below
Expand All @@ -35,7 +36,7 @@
# A end point for Neo4j e.g: bolt://localhost:9999
NEO4J_END_POINT_KEY = 'neo4j_endpoint'
# A transaction size that determines how often it commits.
NEO4J_TRANSCATION_SIZE = 'neo4j_transaction_size'
NEO4J_TRANSACTION_SIZE = 'neo4j_transaction_size'
# A progress report frequency that determines how often it report the progress.
NEO4J_PROGRESS_REPORT_FREQUENCY = 'neo4j_progress_report_frequency'
# A boolean flag to make it fail if relationship is not created
Expand Down Expand Up @@ -92,7 +93,7 @@
RELATION_END_LABEL, RELATION_END_KEY,
RELATION_TYPE, RELATION_REVERSE_TYPE}

DEFAULT_CONFIG = ConfigFactory.from_dict({NEO4J_TRANSCATION_SIZE: 500,
DEFAULT_CONFIG = ConfigFactory.from_dict({NEO4J_TRANSACTION_SIZE: 500,
NEO4J_PROGRESS_REPORT_FREQUENCY: 500,
NEO4J_RELATIONSHIP_CREATION_CONFIRM: False,
NEO4J_MAX_CONN_LIFE_TIME_SEC: 50,
Expand Down Expand Up @@ -136,7 +137,7 @@ def init(self, conf: ConfigTree) -> None:
auth=(conf.get_string(NEO4J_USER), conf.get_string(NEO4J_PASSWORD)),
encrypted=conf.get_bool(NEO4J_ENCRYPTED),
trust=trust)
self._transaction_size = conf.get_int(NEO4J_TRANSCATION_SIZE)
self._transaction_size = conf.get_int(NEO4J_TRANSACTION_SIZE)
self._session = self._driver.session()
self._confirm_rel_created = conf.get_bool(NEO4J_RELATIONSHIP_CREATION_CONFIRM)

Expand Down Expand Up @@ -302,6 +303,7 @@ def _publish_relation(self, relation_file: str, tx: Transaction) -> Transaction:
count = 0
with open(relation_file, 'r', encoding='utf8') as relation_csv:
for rel_record in pandas.read_csv(relation_csv, na_filter=False).to_dict(orient="records"):
# TODO not sure if deadlock on badge node arises in preporcessing or not
stmt, params = self._relation_preprocessor.preprocess_cypher(
start_label=rel_record[RELATION_START_LABEL],
end_label=rel_record[RELATION_END_LABEL],
Expand All @@ -318,10 +320,26 @@ def _publish_relation(self, relation_file: str, tx: Transaction) -> Transaction:

with open(relation_file, 'r', encoding='utf8') as relation_csv:
for rel_record in pandas.read_csv(relation_csv, na_filter=False).to_dict(orient="records"):
stmt = self.create_relationship_merge_statement(rel_record=rel_record)
params = self._create_props_param(rel_record)
tx = self._execute_statement(stmt, tx, params,
expect_result=self._confirm_rel_created)
badge_exception = True
allisonsuarez marked this conversation as resolved.
Show resolved Hide resolved
retries_for_badge_exception = 10 # TODO not sure how many times to retry
allisonsuarez marked this conversation as resolved.
Show resolved Hide resolved
while badge_exception and retries_for_badge_exception > 0:
try:
stmt = self.create_relationship_merge_statement(rel_record=rel_record)
params = self._create_props_param(rel_record)
tx = self._execute_statement(stmt, tx, params,
expect_result=self._confirm_rel_created)
badge_exception = False # if no exception happens we stop inner loop
# TODO use break instead ^ ?
except TransientError as e:
# if exception is due to badge relation issue TODO could use a specific
# exception for this condition (using TransientError)
if rel_record[RELATION_START_LABEL] == BadgeMetadata.BADGE_NODE_LABEL\
allisonsuarez marked this conversation as resolved.
Show resolved Hide resolved
or rel_record[RELATION_END_LABEL] == BadgeMetadata.BADGE_NODE_LABEL:
# TODO not sure how long this op usually takes and how long should wait be
time.sleep(2)
allisonsuarez marked this conversation as resolved.
Show resolved Hide resolved
retries_for_badge_exception -= 1
else: # if other exception happens oh well
raise e

return tx

Expand Down