In [1]:
import os
from dotenv import load_dotenv


# Read the .env file so the settings below can be taken from the environment.
load_dotenv()

# --- Main database: where the inferred labels/relationships are written ---
NEO4J_URI = os.environ.get("NEO4J_URI")
NEO4J_USERNAME = os.environ.get("NEO4J_USERNAME")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD")
NEO4J_DB_NAME = os.environ.get("NEO4J_DB_NAME", "neo4j")  # "neo4j" when unset

# --- Ontology database: where the inference rule definitions are read ---
# URI and credentials fall back to the main database's values when the
# ontology-specific variables are not set.
NEO4J_URI_ONTOLOGY = os.environ.get("NEO4J_URI_ONTOLOGY", NEO4J_URI)
NEO4J_USERNAME_ONTOLOGY = os.environ.get("NEO4J_USERNAME_ONTOLOGY", NEO4J_USERNAME)
NEO4J_PASSWORD_ONTOLOGY = os.environ.get("NEO4J_PASSWORD_ONTOLOGY", NEO4J_PASSWORD)
NEO4J_ONTOLOGY_DB_NAME = os.environ.get("NEO4J_ONTOLOGY_DB_NAME", "movie_ontology")


In [2]:
from neo4j import GraphDatabase
# One driver per role: `driver_main` for the data being enriched and
# `driver_ontology` for the database holding the inference rule definitions.
driver_main = GraphDatabase.driver(
    NEO4J_URI,
    auth=(NEO4J_USERNAME, NEO4J_PASSWORD),
)
driver_ontology = GraphDatabase.driver(
    NEO4J_URI_ONTOLOGY,
    auth=(NEO4J_USERNAME_ONTOLOGY, NEO4J_PASSWORD_ONTOLOGY),
)

In [3]:
def genPatternDefinedLabelInferenceQueries():
    """Yield one label-inference Cypher statement per ``PatternDefinedLabel``.

    The ontology database is asked for every ``PatternDefinedLabel`` node.
    For each one, the yielded statement matches the node's stored ``pattern``
    and sets the label ``name`` on the variable named by
    ``classElementVariable``, in transactions of 100 rows.
    """
    ontology_lookup = (
        "MATCH (mdl:PatternDefinedLabel)\n"
        "        RETURN mdl.name AS name, mdl.pattern AS pattern, "
        "mdl.classElementVariable AS classElementVariable"
    )
    rows, _, _ = driver_ontology.execute_query(
        ontology_lookup,
        database_=NEO4J_ONTOLOGY_DB_NAME,
    )
    for row in rows:
        pattern = row['pattern']
        element = row['classElementVariable']
        label = row['name']
        yield (
            f"MATCH {pattern}\n"
            f"        CALL ({element}) {{\n"
            f"            SET {element}:{label}\n"
            "        } IN TRANSACTIONS OF 100 ROWS"
        )

# Preview the generated label-inference statements.
for statement in genPatternDefinedLabelInferenceQueries():
    print(statement)


MATCH (p:Person) WHERE EXISTS {(p)-[:ACTED_IN]->()}
        CALL (p) {
            SET p:_PersonActedInSome
        } IN TRANSACTIONS OF 100 ROWS
MATCH (p:Person) WHERE EXISTS {(p)-[:DIRECTED]->()}
        CALL (p) {
            SET p:_PersonDirectedSome
        } IN TRANSACTIONS OF 100 ROWS
MATCH (p:Person {name: 'Kevin Bacon'})
        CALL (p) {
            SET p:_KevinBacon
        } IN TRANSACTIONS OF 100 ROWS
MATCH (p:Person {name: 'Emil Eifrem'})
        CALL (p) {
            SET p:_EasterEggActor
        } IN TRANSACTIONS OF 100 ROWS


In [4]:
def genPatternDefinedRelationshipInferenceQueries():
    """Yield one relationship-inference statement per ``PatternDefinedRelationship``.

    Each yielded statement matches the stored ``pattern`` and merges a
    relationship of type ``name`` from the variable named by
    ``sourceElementVariable`` to the one named by ``targetElementVariable``,
    in transactions of 100 rows.
    """
    ontology_lookup = (
        "MATCH (mdr:PatternDefinedRelationship)\n"
        "        RETURN mdr.name AS name, mdr.pattern AS pattern, "
        "mdr.sourceElementVariable AS sourceElementVariable, "
        "mdr.targetElementVariable AS targetElementVariable"
    )
    rows, _, _ = driver_ontology.execute_query(
        ontology_lookup,
        database_=NEO4J_ONTOLOGY_DB_NAME,
    )
    for row in rows:
        pattern = row['pattern']
        source = row['sourceElementVariable']
        target = row['targetElementVariable']
        rel_type = row['name']
        yield (
            f"MATCH {pattern}\n"
            f"        CALL ({source}, {target}) {{\n"
            f"        MERGE ({source})-[:{rel_type}]->({target})\n"
            "        } IN TRANSACTIONS OF 100 ROWS"
        )

# Preview the generated relationship-inference statements.
for statement in genPatternDefinedRelationshipInferenceQueries():
    print(statement)


MATCH (s:Person)-[:ACTED_IN]->()<-[:ACTED_IN]-(t:Person)
        CALL (s, t) {
        MERGE (s)-[:_COACTOR]->(t)
        } IN TRANSACTIONS OF 100 ROWS
MATCH (s:Person)-[:INVOLVED_IN]->()<-[:INVOLVED_IN]-(t:Person)
        CALL (s, t) {
        MERGE (s)-[:_COLLABORATOR]->(t)
        } IN TRANSACTIONS OF 100 ROWS


In [5]:
def genSCOLabelInferenceQueries():
    """Yield one statement per ``(narrower:Label)-[:SCO]->(broader:Label)`` pair.

    Each yielded statement adds the broader label to every node that carries
    the narrower label, in concurrent transactions of 100 rows.
    """
    ontology_lookup = (
        "MATCH (narrower:Label)-[:SCO]->(broader:Label)\n"
        "        RETURN narrower.name AS narrower, broader.name AS broader"
    )
    rows, _, _ = driver_ontology.execute_query(
        ontology_lookup,
        database_=NEO4J_ONTOLOGY_DB_NAME,
    )
    for row in rows:
        narrower = row['narrower']
        broader = row['broader']
        yield (
            f"MATCH (n:{narrower})\n"
            "        CALL (n) {\n"
            f"        SET n:{broader}\n"
            "        } IN CONCURRENT TRANSACTIONS OF 100 ROWS"
        )

# Preview the generated sub-label (SCO) inference statements.
for statement in genSCOLabelInferenceQueries():
    print(statement)

MATCH (n:Actor)
        CALL (n) {
        SET n:Person
        } IN CONCURRENT TRANSACTIONS OF 100 ROWS
MATCH (n:Director)
        CALL (n) {
        SET n:Person
        } IN CONCURRENT TRANSACTIONS OF 100 ROWS
MATCH (n:_KevinBacon)
        CALL (n) {
        SET n:Actor
        } IN CONCURRENT TRANSACTIONS OF 100 ROWS
MATCH (n:_EasterEggActor)
        CALL (n) {
        SET n:Actor
        } IN CONCURRENT TRANSACTIONS OF 100 ROWS
MATCH (n:_EasterEggActor)
        CALL (n) {
        SET n:Anomalous
        } IN CONCURRENT TRANSACTIONS OF 100 ROWS


In [6]:
def genImpliesRelationshipInferenceQueries():
    """Yield one statement per ``(narrower:Relationship)-[:IMPLIES]->(broader)`` pair.

    Each yielded statement merges a relationship of the broader type between
    every pair of nodes connected by the narrower type (same direction), in
    transactions of 100 rows.
    """
    ontology_lookup = (
        "MATCH (narrower:Relationship)-[:IMPLIES]->(broader:Relationship)\n"
        "        RETURN narrower.name AS narrower, broader.name AS broader"
    )
    rows, _, _ = driver_ontology.execute_query(
        ontology_lookup,
        database_=NEO4J_ONTOLOGY_DB_NAME,
    )
    for row in rows:
        narrower = row['narrower']
        broader = row['broader']
        yield (
            f"MATCH (n)-[:{narrower}]->(m)\n"
            "        CALL (n, m) {\n"
            f"        MERGE (n)-[:{broader}]->(m)\n"
            "        } IN TRANSACTIONS OF 100 ROWS"
        )

# Preview the generated IMPLIES-based relationship statements.
for statement in genImpliesRelationshipInferenceQueries():
    print(statement)

MATCH (n)-[:ACTED_IN]->(m)
        CALL (n, m) {
        MERGE (n)-[:INVOLVED_IN]->(m)
        } IN TRANSACTIONS OF 100 ROWS
MATCH (n)-[:DIRECTED]->(m)
        CALL (n, m) {
        MERGE (n)-[:INVOLVED_IN]->(m)
        } IN TRANSACTIONS OF 100 ROWS
MATCH (n)-[:COACTOR]->(m)
        CALL (n, m) {
        MERGE (n)-[:COLLABORATOR]->(m)
        } IN TRANSACTIONS OF 100 ROWS


In [7]:
def genEquivalentLabelInferenceQueries():
    """Yield one statement per ``(l1:Label)-[:EQUIVALENT]->(l2:Label)`` pair.

    Each yielded statement matches nodes carrying either label and sets both
    labels on them, in concurrent transactions of 100 rows.
    """
    ontology_lookup = (
        "MATCH (l1:Label)-[:EQUIVALENT]->(l2:Label)\n"
        "        RETURN l1.name AS l1, l2.name AS l2"
    )
    rows, _, _ = driver_ontology.execute_query(
        ontology_lookup,
        database_=NEO4J_ONTOLOGY_DB_NAME,
    )
    for row in rows:
        first = row['l1']
        second = row['l2']
        yield (
            f"MATCH (n:{first}|{second})\n"
            "        CALL (n) {\n"
            f"            SET n:{first}:{second}\n"
            "        } IN CONCURRENT TRANSACTIONS OF 100 ROWS"
        )

# Preview the generated equivalent-label statements.
for statement in genEquivalentLabelInferenceQueries():
    print(statement)

MATCH (n:_PersonActedInSome|Actor)
        CALL (n) {
            SET n:_PersonActedInSome:Actor
        } IN CONCURRENT TRANSACTIONS OF 100 ROWS
MATCH (n:_PersonDirectedSome|Director)
        CALL (n) {
            SET n:_PersonDirectedSome:Director
        } IN CONCURRENT TRANSACTIONS OF 100 ROWS


In [8]:
def genEquivalentRelationshipInferenceQueries():
    """Yield one statement per ``(r1:Relationship)-[:EQUIVALENT]->(r2)`` pair.

    Each yielded statement matches node pairs connected by either relationship
    type and merges both types between them (same direction), in transactions
    of 100 rows.
    """
    ontology_lookup = (
        "MATCH (r1:Relationship)-[:EQUIVALENT]->(r2:Relationship)\n"
        "        RETURN r1.name AS r1, r2.name AS r2"
    )
    rows, _, _ = driver_ontology.execute_query(
        ontology_lookup,
        database_=NEO4J_ONTOLOGY_DB_NAME,
    )
    for row in rows:
        first = row['r1']
        second = row['r2']
        yield (
            f"MATCH (n)-[:{first}|{second}]->(m)\n"
            "        CALL (n, m) {\n"
            f"            MERGE (n)-[:{first}]->(m)\n"
            f"            MERGE (n)-[:{second}]->(m)\n"
            "        } IN TRANSACTIONS OF 100 ROWS"
        )

# Preview the generated equivalent-relationship statements.
for statement in genEquivalentRelationshipInferenceQueries():
    print(statement)

MATCH (n)-[:_COACTOR|COACTOR]->(m)
        CALL (n, m) {
            MERGE (n)-[:_COACTOR]->(m)
            MERGE (n)-[:COACTOR]->(m)
        } IN TRANSACTIONS OF 100 ROWS
MATCH (n)-[:_COLLABORATOR|COLLABORATOR]->(m)
        CALL (n, m) {
            MERGE (n)-[:_COLLABORATOR]->(m)
            MERGE (n)-[:COLLABORATOR]->(m)
        } IN TRANSACTIONS OF 100 ROWS


In [9]:
def genSymmetricRelationshipInferenceQueries():
    """Yield one statement per ontology node labelled ``Relationship`` and ``Symmetric``.

    Each yielded statement merges the reverse-direction relationship for every
    existing relationship of that type, in transactions of 100 rows.
    """
    ontology_lookup = (
        "MATCH (r:Relationship&Symmetric)\n"
        "        RETURN r.name AS sim_rel"
    )
    rows, _, _ = driver_ontology.execute_query(
        ontology_lookup,
        database_=NEO4J_ONTOLOGY_DB_NAME,
    )
    for row in rows:
        rel_type = row['sim_rel']
        yield (
            f"MATCH (n)-[:{rel_type}]->(m)\n"
            "        CALL (n, m) {\n"
            f"            MERGE (m)-[:{rel_type}]->(n)\n"
            "        } IN TRANSACTIONS OF 100 ROWS"
        )

# Preview the generated symmetric-relationship statements.
for statement in genSymmetricRelationshipInferenceQueries():
    print(statement)

MATCH (n)-[:COACTOR]->(m)
        CALL (n, m) {
            MERGE (m)-[:COACTOR]->(n)
        } IN TRANSACTIONS OF 100 ROWS
MATCH (n)-[:COLLABORATOR]->(m)
        CALL (n, m) {
            MERGE (m)-[:COLLABORATOR]->(n)
        } IN TRANSACTIONS OF 100 ROWS
MATCH (n)-[:_COACTOR]->(m)
        CALL (n, m) {
            MERGE (m)-[:_COACTOR]->(n)
        } IN TRANSACTIONS OF 100 ROWS
MATCH (n)-[:_COLLABORATOR]->(m)
        CALL (n, m) {
            MERGE (m)-[:_COLLABORATOR]->(n)
        } IN TRANSACTIONS OF 100 ROWS


In [10]:
def genPatternDefinedNodePropertyInferenceQueries():
    """Yield one property-inference statement per ``PatternDefinedNodeProperty``.

    The ontology lookup returns, for each ``Label`` that ``HAS_PROPERTY`` a
    ``PatternDefinedNodeProperty``: the label name, the property name, the
    stored pattern, the owner variable and the value variable. The yielded
    statement matches nodes with that label, evaluates the pattern per node
    and sets the property to the value variable when the property is missing
    or different, in concurrent transactions of 100 rows.
    """
    ontology_lookup = (
        "MATCH (n:Label)-[:HAS_PROPERTY]->(qdp:PatternDefinedNodeProperty)\n"
        "        RETURN n.name AS label, qdp.name AS property_name, "
        "qdp.pattern AS pattern, qdp.propertyOwnerVariable AS variable, "
        "qdp.valueVariable AS val_variable"
    )
    rows, _, _ = driver_ontology.execute_query(
        ontology_lookup,
        database_=NEO4J_ONTOLOGY_DB_NAME,
    )
    for row in rows:
        owner = row['variable']
        label = row['label']
        pattern = row['pattern']
        value = row['val_variable']
        # Fully qualified property reference, e.g. "x.kb_number".
        prop = f"{owner}.{row['property_name']}"
        yield (
            f"MATCH ({owner}:{label})\n"
            f"        CALL ({owner}) {{\n"
            f"            MATCH {pattern}\n"
            f"            WITH {owner}, {value}\n"
            f"            WHERE {prop} IS NULL OR {prop} <> {value}\n"
            f"            SET {prop} = {value}\n"
            "            } IN CONCURRENT TRANSACTIONS OF 100 ROWS"
        )

# Preview the generated node-property inference statements.
for statement in genPatternDefinedNodePropertyInferenceQueries():
    print(statement)

MATCH (x:Actor)
        CALL (x) {
            MATCH SHORTEST 1 (x)-[ca:COACTOR]-*(y:_KevinBacon) WITH ca LIMIT 1 WITH size(ca) AS kbn
            WITH x, kbn
            WHERE x.kb_number IS NULL OR x.kb_number <> kbn
            SET x.kb_number = kbn
            } IN CONCURRENT TRANSACTIONS OF 100 ROWS


In [11]:
# Rule generators, listed in the order in which their queries are produced.
inferenceRulesGenerators = [
    genPatternDefinedLabelInferenceQueries,
    genPatternDefinedRelationshipInferenceQueries,
    genSCOLabelInferenceQueries,
    genImpliesRelationshipInferenceQueries,
    genEquivalentLabelInferenceQueries,
    genEquivalentRelationshipInferenceQueries,
    genSymmetricRelationshipInferenceQueries,
    genPatternDefinedNodePropertyInferenceQueries,
]

def genOntologyInferenceQueries(inferenceRulesGenerators):
    """Chain the output of several rule generators into one stream of queries.

    Args:
        inferenceRulesGenerators: iterable of zero-argument callables, each
            returning an iterable of query strings.

    Yields:
        Every query from every generator, in the order given.
    """
    for make_queries in inferenceRulesGenerators:
        yield from make_queries()

# Materialise every generated query so the full rule set is shown as the cell output.
list(genOntologyInferenceQueries(inferenceRulesGenerators))

['MATCH (p:Person) WHERE EXISTS {(p)-[:ACTED_IN]->()}\n        CALL (p) {\n            SET p:_PersonActedInSome\n        } IN TRANSACTIONS OF 100 ROWS',
 'MATCH (p:Person) WHERE EXISTS {(p)-[:DIRECTED]->()}\n        CALL (p) {\n            SET p:_PersonDirectedSome\n        } IN TRANSACTIONS OF 100 ROWS',
 "MATCH (p:Person {name: 'Kevin Bacon'})\n        CALL (p) {\n            SET p:_KevinBacon\n        } IN TRANSACTIONS OF 100 ROWS",
 "MATCH (p:Person {name: 'Emil Eifrem'})\n        CALL (p) {\n            SET p:_EasterEggActor\n        } IN TRANSACTIONS OF 100 ROWS",
 'MATCH (s:Person)-[:ACTED_IN]->()<-[:ACTED_IN]-(t:Person)\n        CALL (s, t) {\n        MERGE (s)-[:_COACTOR]->(t)\n        } IN TRANSACTIONS OF 100 ROWS',
 'MATCH (s:Person)-[:INVOLVED_IN]->()<-[:INVOLVED_IN]-(t:Person)\n        CALL (s, t) {\n        MERGE (s)-[:_COLLABORATOR]->(t)\n        } IN TRANSACTIONS OF 100 ROWS',
 'MATCH (n:Actor)\n        CALL (n) {\n        SET n:Person\n        } IN CONCURRENT TRANSACTI

In [12]:
def infer(rules, params=None, max_iterations=None):
    """Run a set of inference rules repeatedly until a full pass changes nothing.

    Every rule is executed as an auto-commit statement against the main
    database. Passes are repeated (similar to fixed-point reasoning over RDF
    data) until one complete pass reports no updates.

    Args:
        rules: iterable of Cypher statements. It is copied into a list first,
            so a generator can be passed and is still replayed on every pass.
        params: optional dict of query parameters passed to every rule.
        max_iterations: optional upper bound on the number of passes;
            ``None`` (the default) means "no limit".

    Returns:
        The number of passes executed, including the final pass that made
        no updates.

    Raises:
        RuntimeError: if ``max_iterations`` passes have run and the last one
            still made updates.
    """
    rules = list(rules)
    params = {} if params is None else params
    passes = 0
    # A single session is used for the whole run so that every statement is
    # executed in sequence on the same session.
    with driver_main.session(database=NEO4J_DB_NAME) as session:
        while True:
            passes += 1
            any_update = False
            for rule in rules:
                # consume() is called while the session is still open, and the
                # public `contains_updates` property is used instead of the
                # private `_contains_updates` attribute, which may be None.
                summary = session.run(rule, params).consume()
                if summary.counters.contains_updates:
                    any_update = True
            if not any_update:
                return passes
            if max_iterations is not None and passes >= max_iterations:
                raise RuntimeError(
                    f"inference did not converge after {passes} passes"
                )

In [13]:
def infer_once(rules, params=None):
    """Run every inference rule exactly once against the main database.

    Args:
        rules: iterable of Cypher statements, executed in order as
            auto-commit statements on a single session.
        params: optional dict of query parameters passed to every rule.
    """
    params = {} if params is None else params
    with driver_main.session(database=NEO4J_DB_NAME) as session:
        for rule in rules:
            # Consume each result before moving on so that an error raised by
            # a rule surfaces at that rule rather than at a later statement.
            session.run(rule, params).consume()

In [14]:
# Generate every inference rule from the ontology and apply the rules to the
# main database until a full pass makes no further updates.
infer(list(genOntologyInferenceQueries(inferenceRulesGenerators)))

In [None]:
import getopt
import json
import sys
import time
from threading import Thread

from neo4j import GraphDatabase


class CDCService:
    """Poll a database's change-data-capture (CDC) feed and re-run inference.

    The service keeps a CDC cursor (a change identifier). Each poll asks
    ``db.cdc.query`` for changes after that cursor; when any are returned,
    the inference rules are applied once and the cursor moves to the last
    change returned. When none are returned, the cursor jumps to the change
    identifier that was current just before the poll.
    """

    def __init__(self, driver, database, start_cursor=None, selectors=None):
        """Create the service.

        Args:
            driver: Neo4j driver used for all CDC calls.
            database: name of the database whose changes are polled.
            start_cursor: change identifier to start from; when ``None`` the
                current change identifier is fetched from the database.
            selectors: list passed as ``$selectors`` to ``db.cdc.query``;
                ``None`` is replaced by an empty list.
        """
        self.driver = driver
        self.database = database
        # Send an empty list rather than null when no selectors are given.
        self.selectors = [] if selectors is None else selectors
        self.cursor = start_cursor
        if self.cursor is None:
            self.cursor = self.current_change_id()

    def apply_change(self):
        """Run every ontology inference rule once.

        NOTE(review): this delegates to the module-level ``infer_once``
        helper and does not pass ``self.driver`` / ``self.database``, so the
        rules run against whichever database that helper targets.
        """
        infer_once(list(genOntologyInferenceQueries(inferenceRulesGenerators)))
        print("infer_once called")

    def query_changes_query(self, tx):
        """Transaction function: read the changes after the current cursor.

        Only reads are performed here, because the driver may retry a
        transaction function; writes and cursor updates are done by
        ``query_changes`` after the transaction has finished.

        Returns:
            The ``id`` of the last change returned, or ``None`` when there
            are no changes.
        """
        result = tx.run('CALL db.cdc.query($cursor, $selectors)',
                        cursor=self.cursor, selectors=self.selectors)
        last_id = None
        for record in result:
            last_id = record['id']
        return last_id

    def query_changes(self):
        """Poll once: apply inference if there are changes, then move the cursor."""
        # Snapshot taken before querying, used as the new cursor when the
        # query returns nothing.
        current = self.current_change_id()
        with self.driver.session(database=self.database) as session:
            last_id = session.execute_read(self.query_changes_query)

        if last_id is None:
            self.cursor = current
            return

        # Best effort: a failure is reported and the cursor still advances.
        try:
            self.apply_change()
        except Exception as e:
            print('Error whilst applying change', e)
        self.cursor = last_id

    def earliest_change_id(self):
        """Return the ``id`` of the first record from ``CALL db.cdc.earliest``."""
        records, _, _ = self.driver.execute_query(
            'CALL db.cdc.earliest', database_=self.database)
        return records[0]['id']

    def current_change_id(self):
        """Return the ``id`` of the first record from ``CALL db.cdc.current``."""
        records, _, _ = self.driver.execute_query(
            'CALL db.cdc.current', database_=self.database)
        return records[0]['id']

    def run(self, poll_interval=0.5):
        """Poll for changes forever, sleeping ``poll_interval`` seconds between polls."""
        while True:
            self.query_changes()
            time.sleep(poll_interval)


def main(argv):
    """Parse connection options and run a CDCService until its thread ends.

    Args:
        argv: list of command-line style arguments (without the program
            name). Recognised options: ``-a/--address``, ``-d/--database``,
            ``-u/--username``, ``-p/--password`` and ``-f/--from`` (the
            starting CDC cursor). Options that are not given fall back to the
            module-level NEO4J_* settings; the cursor defaults to ``None``.
    """
    # Setting name -> value, pre-filled with the defaults.
    settings = {
        'address': NEO4J_URI,
        'database': NEO4J_DB_NAME,
        'username': NEO4J_USERNAME,
        'password': NEO4J_PASSWORD,
        'from': None,
    }

    # Both spellings of every option point at the setting they override.
    option_targets = {}
    for short, name in (('-a', 'address'), ('-d', 'database'),
                        ('-u', 'username'), ('-p', 'password'),
                        ('-f', 'from')):
        option_targets[short] = name
        option_targets['--' + name] = name

    parsed, _ = getopt.getopt(
        argv, 'a:d:u:p:f:',
        ['address=', 'database=', 'username=', 'password=', 'from='])
    for flag, value in parsed:
        if flag in option_targets:
            settings[option_targets[flag]] = value

    selectors = [
        # {'select': 'n'}
    ]

    credentials = (settings['username'], settings['password'])
    with GraphDatabase.driver(settings['address'], auth=credentials) as driver:
        service = CDCService(driver, settings['database'], settings['from'], selectors)
        worker = Thread(target=service.run, daemon=True)
        worker.start()
        worker.join()


# Start the CDC service with the environment-derived defaults.
# An empty argument list is passed on purpose: in a notebook, sys.argv
# typically carries the kernel's own arguments rather than options meant for
# main(). Passing a colon-joined settings string (as was done before) is not
# parsed as options by getopt, so the defaults were used anyway, and building
# that string raised TypeError whenever one of the settings was None.
main([])