In [None]:
pip install neo4j python-dotenv pandas pyvis

In [None]:
from neo4j import GraphDatabase

# URI examples: "neo4j://localhost", "neo4j+s://xxx.databases.neo4j.io"
URI = "bolt://localhost:7687"
AUTH = ("neo4j", "password")

with GraphDatabase.driver(URI, auth=AUTH) as driver:
    with driver.session(database="neo4j") as session: # explicit db allows the driver to work more efficiently
        driver.verify_connectivity()
        print("Connection established.")

Connection established.


In [None]:
# Get the name of all 42 year-olds
records, summary, keys = driver.execute_query(
    "MATCH (p:PLAYER) WHERE p.name = 'LeBron James' RETURN p",
    # auth_=("neo4j", "password"),  run query as someone else instead of creating a new driver
    database_="neo4j", # speciry the database to connect to here
)

# run query as a different user
driver.execute_query(
    "MATCH (p:Person) RETURN p.name",
    impersonated_user_="somebody_else",
    database_="neo4j",
)

driver.execute_query(
    "MATCH (p:Person) RETURN p.name",
    auth_=("somebody_else", "their_password"),
    database_="neo4j",
)

In [154]:
class Player:
    def __init__(self, name, age):
        self.name = name
        self.age = age

In [176]:
def formatter(records, summary=None, keys=None):
    # Loop through results and do something with them
    for record in records:
        print(record.data())  # obtain record as dict

    if summary and keys:
        # Summary information
        print("The query `{query}` returned {records_count} records in {time} ms.".format(
            query=summary.query, records_count=len(records),
            time=summary.result_available_after
        ))

In [157]:
def create(player: Player):
    try:
        records, summary, keys = driver.execute_query(
            """CREATE (player:PLAYER {name: $name, age: $age}) RETURN player""",
            name= player.name,
            age= player.age
            )
        return records, summary, keys
    except Exception as e:
        print(e)
        raise e

records, summary, keys = create(Player(name= "Ezra", age= 23))
formatter(records, summary, keys)

{'player': {'name': 'Ezra', 'age': 23}}
The query `CREATE (player:PLAYER {name: $name, age: $age}) RETURN player` returned 1 records in 1 ms.


  records, summary, keys = driver.execute_query(


In [188]:
# create if player with given properties does not exist
def create_if_no_exist(player: Player):
    try:
        records, summary, keys = driver.execute_query(
            """MERGE (player:PLAYER {name: $name, age: $age}) RETURN player""",
            name= player.name,
            age= player.age
            )
        return records, summary, keys
    except Exception as e:
        print(e)
        raise e

records, summary, keys = create_if_no_exist(Player(name= "Ezra", age= 23)) 
formatter(records, summary, keys)

{'player': {'name': 'Ezra', 'age': 23}}
The query `MERGE (player:PLAYER {name: $name, age: $age}) RETURN player` returned 1 records in 1 ms.


  records, summary, keys = driver.execute_query(


In [189]:
def read():
    try:
        records, summary, keys = driver.execute_query("""
            MATCH (p:PLAYER) RETURN p""")
        return records, summary, keys
    except Exception as e:
        print(e)
        raise e

records, summary, keys = read()
formatter(records, summary, keys)

{'p': {'name': 'Carolynn', 'age': 23}}
{'p': {'name': 'Ezra', 'age': '25'}}
{'p': {'name': 'Ezra', 'age': 23}}
The query `
            MATCH (p:PLAYER) RETURN p` returned 3 records in 2 ms.


  records, summary, keys = driver.execute_query("""


In [159]:
def update(name, player:Player):
    try:
        records, summary, keys = driver.execute_query(f"""
            MATCH (player: PLAYER)
            WHERE player.name = "{name}"
            SET player.name = $name, player.age=$age
            RETURN player
            """,
            parameters_ = player.__dict__
            )
        return records, summary, keys
    except Exception as e:
        print(e)
        raise e

records, summary, keys = update("Ezra",Player(name="Ezra", age="25"))
formatter(records, summary, keys)

{'player': {'name': 'Ezra', 'age': '25'}}
The query `
            MATCH (player: PLAYER)
            WHERE player.name = "Ezra"
            SET player.name = $name, player.age=$age
            RETURN player
            ` returned 1 records in 18 ms.


  records, summary, keys = driver.execute_query(f"""


In [160]:
def add_relationship():
    driver.execute_query("""
        MATCH (ezra: PLAYER {name: "Ezra"})
        MATCH (carol: PLAYER {name: "Carolynn"})
        CREATE (ezra)-[:TEAMMATES]->(carol)
    """)
add_relationship()

  driver.execute_query("""


In [150]:
driver.execute_query("MATCH (n) DETACH DELETE n")

  driver.execute_query("MATCH (n) DETACH DELETE n")


EagerResult(records=[], summary=<neo4j._work.summary.ResultSummary object at 0x10f069e10>, keys=[])

In [161]:
import neo4j

In [162]:
# transform to pandas dataframe
pandas_df = driver.execute_query("""
                MATCH (p:PLAYER) RETURN p""", 
                result_transformer_=neo4j.Result.to_df
            )
pandas_df.head()

  pandas_df = driver.execute_query("""


Unnamed: 0,p
0,"(name, age)"
1,"(name, age)"


In [163]:
# transform to graph
graph_result = driver.execute_query("""
                MATCH (p:PLAYER) RETURN p""", 
                result_transformer_=neo4j.Result.graph
            )
graph_result

  graph_result = driver.execute_query("""


<neo4j.graph.Graph at 0x10f08a320>

In [164]:
import pyvis

def visualize_result(query_graph, nodes_text_properties):
    visual_graph = pyvis.network.Network()

    for node in query_graph.nodes:
        node_label = list(node.labels)[0]
        node_text = node[nodes_text_properties[node_label]]
        visual_graph.add_node(node.element_id, node_text, group=node_label)

    for relationship in query_graph.relationships:
        visual_graph.add_edge(
            relationship.start_node.element_id,
            relationship.end_node.element_id,
            title=relationship.type
        )

    visual_graph.show('network.html', notebook=False)

In [165]:
nodes_text_properties = {  # what property to use as text for each node
            "PLAYER": "name",
        }
visualize_result(graph_result, nodes_text_properties)

network.html


# Running a managed transaction

In [183]:
# transaction callback function
def match_player(tx, filter):
    result = tx.run(
        """
            MATCH (p:PLAYER)
            WHERE p.name  STARTS WITH $filter
            RETURN p.name AS name, p.age AS age
            ORDER BY p.name
        """, 
        filter=filter
    )
    return list (result) # always cast the Result obj into a list

# sessions are lightweight so can be created multiple times compared to drivers
with driver.session(database="neo4j") as session:
    players = session.execute_read(match_player, "Car")
    session.close()
formatter(players)


{'name': 'Carolynn', 'age': 23}


  with driver.session(database="neo4j") as session:


In [193]:
# configure a transaction behaviour using decorators
from neo4j import unit_of_work
@unit_of_work(timeout=5, metadata={"app_name": "Player tracker"})
def count_player(tx):
    result = tx.run("""
            MATCH (player:PLAYER)
            RETURN COUNT(player) as no_players
        """)
    record = result.single()
    return record["no_players"]

with driver.session(database="neo4j") as session:
    player_n  = session.execute_read(count_player)
player_n

  with driver.session(database="neo4j") as session:


3

In [213]:
with driver.session(
    database="neo4j", 
    # auth=() # switching user at sesion level is cheap than creating new Diver object
    # impersonated_user="somebody" # will be useful if need to alter something so you are given permission
    ) as session:
    with session.begin_transaction() as tx: # create a transaction and run multimple queries within it 
        tx.run("MERGE (player: PLAYER {name: 'Tarus', age:25, status: 'single asf'})")
        result = tx.run("MATCH (n) RETURN n")  
        print(result.data())
        # tx.commit()     

[{'n': {'name': 'Carolynn', 'age': 23}}, {'n': {'name': 'Ezra', 'age': '25'}}, {'n': {'name': 'Ezra', 'age': 23}}, {'n': {'name': 'Tarus', 'age': 25, 'status': 'single asf'}}]


  with driver.session(database="neo4j") as session:


### An end to end transaction very good

In [None]:
import neo4j


URI = "<URI for Neo4j database>"
AUTH = ("<Username>", "<Password>")


def main():
    with neo4j.GraphDatabase.driver(URI, auth=AUTH) as driver:
        customer_id = create_customer(driver)
        other_bank_id = 42
        transfer_to_other_bank(driver, customer_id, other_bank_id, 999)


def create_customer(driver):
    result, _, _ = driver.execute_query("""
        MERGE (c:Customer {id: rand()})
        RETURN c.id AS id
    """, database_ = "neo4j")
    return result[0]["id"]


def transfer_to_other_bank(driver, customer_id, other_bank_id, amount):
    with driver.session(database="neo4j") as session:
        with session.begin_transaction() as tx:
            if not customer_balance_check(tx, customer_id, amount):
                # give up
                return

            other_bank_transfer_api(customer_id, other_bank_id, amount)
            # Now the money has been transferred => can't rollback anymore
            # (cannot rollback external services interactions)

            try:
                decrease_customer_balance(tx, customer_id, amount)
                tx.commit()
            except Exception as e:
                request_inspection(customer_id, other_bank_id, amount, e)
                raise  # roll back


def customer_balance_check(tx, customer_id, amount):
    query = ("""
        MATCH (c:Customer {id: $id})
        RETURN c.balance >= $amount AS sufficient
    """)
    result = tx.run(query, id=customer_id, amount=amount)
    record = result.single(strict=True)
    return record["sufficient"]


def other_bank_transfer_api(customer_id, other_bank_id, amount):
    # make some API call to other bank
    pass


def decrease_customer_balance(tx, customer_id, amount):
    query = ("""
        MATCH (c:Customer {id: $id})
        SET c.balance = c.balance - $amount
    """)
    result = tx.run(query, id=customer_id, amount=amount)
    result.consume()


def request_inspection(customer_id, other_bank_id, amount, e):
    # manual cleanup required; log this or similar
    print("WARNING: transaction rolled back due to exception:", repr(e))
    print("customer_id:", customer_id, "other_bank_id:", other_bank_id,
          "amount:", amount)


if __name__ == "__main__":
    main()

In [None]:
from neo4j import GraphDatabase, Bookmarks


URI = "<URI for Neo4j database>"
AUTH = ("<Username>", "<Password>")

def main():
    with GraphDatabase.driver(URI, auth=AUTH) as driver:
        driver.verify_connectivity()
        create_some_friends(driver)


def create_some_friends(driver):
    saved_bookmarks = Bookmarks()  # To collect the sessions' bookmarks

    # Create the first person and employment relationship
    with driver.session(database="neo4j") as session_a:
        session_a.execute_write(create_person, "Alice")
        session_a.execute_write(employ, "Alice", "Wayne Enterprises")
        saved_bookmarks += session_a.last_bookmarks()

    # Create the second person and employment relationship
    with driver.session(database="neo4j") as session_b:
        session_b.execute_write(create_person, "Bob")
        session_b.execute_write(employ, "Bob", "LexCorp")
        saved_bookmarks += session_b.last_bookmarks()

    # Create a friendship between the two people created above
    with driver.session(
        database="neo4j", bookmarks=saved_bookmarks
    ) as session_c:
        session_c.execute_write(create_friendship, "Alice", "Bob")
        session_c.execute_read(print_friendships)


# Create a person node
def create_person(tx, name):
    tx.run("MERGE (:Person {name: $name})", name=name)


# Create an employment relationship to a pre-existing company node
# This relies on the person first having been created.
def employ(tx, person_name, company_name):
    tx.run("""
        MATCH (person:Person {name: $person_name})
        MATCH (company:Company {name: $company_name})
        CREATE (person)-[:WORKS_FOR]->(company)
        """, person_name=person_name, company_name=company_name
    )


# Create a friendship between two people
def create_friendship(tx, name_a, name_b):
    tx.run("""
        MATCH (a:Person {name: $name_a})
        MATCH (b:Person {name: $name_b})
        MERGE (a)-[:KNOWS]->(b)
        """, name_a=name_a, name_b=name_b
    )


# Retrieve and display all friendships
def print_friendships(tx):
    result = tx.run("MATCH (a)-[:KNOWS]->(b) RETURN a.name, b.name")
    for record in result:
        print("{} knows {}".format(record["a.name"], record["b.name"]))


if __name__ == "__main__":
    main()

### asynchronouse execution

In [None]:
import asyncio
from neo4j import AsyncGraphDatabase


URI = "<URI for Neo4j database>"
AUTH = ("<Username>", "<Password>")


async def main():
    async with AsyncGraphDatabase.driver(URI, auth=AUTH) as driver:
        records, summary, keys = await driver.execute_query(
            "MATCH (a:Person) RETURN a.name AS name",
            database_="neo4j"
        )
        names = [record["name"] for record in records]
        print(names)


if __name__ == "__main__":
    asyncio.run(main())

Implicit runs session.run() useful in executions that might timeout

In [214]:
with driver.session(database="neo4j") as session:
    result = session.run("""
        LOAD CSV FROM 'https://data.neo4j.com/bands/artists.csv' AS line
        CALL {
            WITH line
            MERGE (:Artist {name: line[1], age: toInteger(line[2])})
        } IN TRANSACTIONS OF 2 ROWS
    """)
    print(result.consume().counters)

  with driver.session(database="neo4j") as session:


{'_contains_updates': True, 'labels_added': 4, 'nodes_created': 4, 'properties_set': 8}


`property keys, so MATCH (n) WHERE n.$param = 'something' is invalid;`

`relationship types, so MATCH (n)-[:$param]→(m) is invalid;`

`labels, so MATCH (n:$param) is invalid.`

In [None]:
# escape labels when you have to cocnatenate in the query to prevent cypher injection
label = "Person\\u0060n"
# convert \u0060 to literal backtick and then escape backticks
escaped_label = label.replace("\\u0060", "`").replace("`", "``")

driver.execute_query(
    f"MATCH (p:`{escaped_label}` {{name: $name}}) RETURN p.name",
    name="Alice",
    database_="neo4j"
)

In [None]:
property_key = "name"
label = "Person"

driver.execute_query(
    "CALL apoc.merge.node($labels, $properties)",
    labels=[label], properties={property_key: "Alice"},
    database_="neo4j"
)

Lazy loading and eager loading

In [216]:
import neo4j
from time import sleep, time
import tracemalloc



URI = "bolt://localhost:7687"
AUTH = ("neo4j", "password")

# Returns 250 records, each with properties
# - `output` (an expensive computation, to slow down retrieval)
# - `dummyData` (a list of 10000 ints, about 8 KB).
slow_query = '''
UNWIND range(1, 250) AS s
RETURN reduce(s=s, x in range(1,1000000) | s + sin(toFloat(x))+cos(toFloat(x))) AS output,
       range(1, 10000) AS dummyData
'''
# Delay for each processed record
sleep_time = 0.5


def main():
    with neo4j.GraphDatabase.driver(URI, auth=AUTH) as driver:
        driver.verify_connectivity()

        start_time = time()
        log('LAZY LOADING (execute_read)')
        tracemalloc.start()
        lazy_loading(driver)
        log(f'Peak memory usage: {tracemalloc.get_traced_memory()[1]} bytes')
        tracemalloc.stop()
        log('--- %s seconds ---' % (time() - start_time))

        start_time = time()
        log('EAGER LOADING (execute_query)')
        tracemalloc.start()
        eager_loading(driver)
        log(f'Peak memory usage: {tracemalloc.get_traced_memory()[1]} bytes')
        tracemalloc.stop()
        log('--- %s seconds ---' % (time() - start_time))


def lazy_loading(driver):

    def process_records(tx):
        log('Submit query')
        result = tx.run(slow_query)

        for record in result:
            log(f'Processing record {int(record.get("output"))}')
            sleep(sleep_time)  # proxy for some expensive operation

    with driver.session(database='neo4j') as session:
        processed_result = session.execute_read(process_records)


def eager_loading(driver):
    log('Submit query')
    records, _, _ = driver.execute_query(slow_query, database_='neo4j')

    for record in records:
        log(f'Processing record {int(record.get("output"))}')
        sleep(sleep_time)  # proxy for some expensive operation


def log(msg):
    print(f'[{round(time(), 2)}] {msg}')


if __name__ == '__main__':
    main()

[1734946846.32] LAZY LOADING (execute_read)
[1734946846.33] Submit query
[1734946846.76] Processing record 0
[1734946847.32] Processing record 1
[1734946847.9] Processing record 2
[1734946848.46] Processing record 3
[1734946849.01] Processing record 4
[1734946849.57] Processing record 5
[1734946850.13] Processing record 6
[1734946850.7] Processing record 7
[1734946851.26] Processing record 8
[1734946851.82] Processing record 9
[1734946852.38] Processing record 10
[1734946852.93] Processing record 11
[1734946853.49] Processing record 12
[1734946854.05] Processing record 13
[1734946854.6] Processing record 14
[1734946855.16] Processing record 15
[1734946855.72] Processing record 16
[1734946856.27] Processing record 17
[1734946856.82] Processing record 18
[1734946857.37] Processing record 19
[1734946857.92] Processing record 20
[1734946858.47] Processing record 21
[1734946859.02] Processing record 22
[1734946859.56] Processing record 23
[1734946860.11] Processing record 24
[1734946860.66]

In [None]:
# good practing routing to read node
driver.execute_query("MATCH (p:Person) RETURN p", routing_="r")
session.execute_read(lambda tx: tx.run("MATCH (p:Person) RETURN p"))

# bad practice 
driver.execute_query("MATCH (p:Person) RETURN p")
# defaults to routing = writers
session.execute_write(lambda tx: tx.run("MATCH (p:Person) RETURN p"))
# don't ask to write on a read-only operation

In [None]:
# Create an index on Person.name
driver.execute_query("CREATE INDEX person_name FOR (n:Person) ON (n.name)")

profile queries to find areas to improve

In [218]:
_, summary, _ = driver.execute_query("PROFILE MATCH (p:PLAYER {name: $name}) RETURN p", name="Ezra")
print(summary.profile['args']['string-representation'])

Cypher 5

Planner COST

Runtime PIPELINED

Runtime version 5.24

Batch size 128

+------------------+----+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+
| Operator         | Id | Details        | Estimated Rows | Rows | DB Hits | Memory (Bytes) | Page Cache Hits/Misses | Time (ms) | Pipeline            |
+------------------+----+----------------+----------------+------+---------+----------------+------------------------+-----------+---------------------+
| +ProduceResults  |  0 | p              |              1 |    2 |       6 |              0 |                        |           |                     |
| |                +----+----------------+----------------+------+---------+----------------+                        |           |                     |
| +Filter          |  1 | p.name = $name |              1 |    2 |       8 |                |                        |           |                     |
|

  _, summary, _ = driver.execute_query("PROFILE MATCH (p:PLAYER {name: $name}) RETURN p", name="Ezra")


BATCH DATA creatioong using WITH AND UNWIND clauses

In [None]:
numbers = [{"value": random()} for _ in range(10000)]
driver.execute_query("""
    WITH $numbers AS batch
    UNWIND batch AS node
    MERGE (n:Number)
    SET n.value = node.value
    """, numbers=numbers,
)

# bad pracitice
for _ in range(10000):
    driver.execute_query("MERGE (:Number {value: $value})", value=random())