In [38]:
from pyspark.sql import SparkSession

## Check_connection

In [40]:
# Test connection
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Neo4jPySparkApp") \
    .config("spark.jars", "/home/son/Downloads/neo4j-connector-apache-spark_2.12-5.3.0_for_spark_3.jar") \
    .getOrCreate()

# Define Neo4j connection details
uri = "bolt://44.203.229.212:7687"
username = "neo4j"
password = "thickness-transfer-fiber"

# Check if SparkSession is created
if spark:
    print("SparkSession created successfully")

# Check if Neo4j connection is established
try:
    df = spark.read.format("org.neo4j.spark.DataSource") \
        .option("url", uri) \
        .option("authentication.basic.username", username) \
        .option("authentication.basic.password", password) \
        .option("query", "CREATE (n:Person {name: $name, age: $age})") \
        .load()
    print("Neo4j connection established successfully")
except Exception as e:
    print(f"Error connecting to Neo4j: {str(e)}")

# Check if data can be read from Neo4j
try:
    df = spark.read.format("org.neo4j.spark.DataSource") \
        .option("url", uri) \
        .option("authentication.basic.username", username) \
        .option("authentication.basic.password", password) \
        .option("query", "MATCH (n) RETURN n") \
        .load()
    df.show()
    print("Data read from Neo4j successfully")
except Exception as e:
    print(f"Error reading data from Neo4j: {str(e)}")

# Stop SparkSession
spark.stop()


SparkSession created successfully
Neo4j connection established successfully


[Stage 0:>                                                          (0 + 1) / 1]

+---+
|  n|
+---+
+---+

Data read from Neo4j successfully


                                                                                

## Set up Spark and Connect with neo4j by neo4j-connector

In [4]:
# Define Neo4j connection details
uri = "bolt://44.203.229.212:7687"
username = "neo4j"
password = "thickness-transfer-fiber"

In [5]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Neo4jPySparkApp") \
    .config("spark.jars", "/home/son/Downloads/neo4j-connector-apache-spark_2.12-5.3.0_for_spark_3.jar") \
    .getOrCreate()



24/05/30 17:29:28 WARN Utils: Your hostname, thaisonatk resolves to a loopback address: 127.0.1.1; using 192.168.247.128 instead (on interface ens33)
24/05/30 17:29:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/05/30 17:29:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Coding funtion read and write to neo4j database by Pyspark

In [52]:
# Function to execute a Cypher query via Spark
def execute_write_cypher_query(query):
    try :
        df = spark.createDataFrame([("dummy",)], ["col"])  # Create a dummy DataFrame
        df.write.format("org.neo4j.spark.DataSource") \
            .option("url", uri) \
            .option("authentication.basic.username", username) \
            .option("authentication.basic.password", password) \
            .option("query", query) \
            .mode("overwrite") \
            .save()
        df.show()
    except Exception as e:
        print(f"Error executing query: {e}")

In [66]:
def execute_read_cypher_query(query):
    try:
        result_df = spark.read.format("org.neo4j.spark.DataSource") \
            .option("url", uri) \
            .option("authentication.basic.username", username) \
            .option("authentication.basic.password", password) \
            .option("query", query) \
            .load()
        result_df.limit(10).show()
    except Exception as e:
        print(f"Error executing query: {e}")

## Initialize data with csv file from our github

In [45]:
# Because neo4j connector with spark does not support CONSTRAINT query so I run it on server
query = """
CREATE CONSTRAINT FOR (u:User) REQUIRE u.id IS UNIQUE
CREATE CONSTRAINT FOR (t:Team) REQUIRE t.id IS UNIQUE
CREATE CONSTRAINT FOR (c:TeamChatSession) REQUIRE c.id IS UNIQUE
CREATE CONSTRAINT FOR (i:ChatItem) REQUIRE i.id IS UNIQUE"""

In [9]:
# Load data from chat_create_team_chat.csv

chat_create_team_chat_queries = """LOAD CSV FROM "https://raw.githubusercontent.com/NinhDT22022522/Neo4j-Graph-Analytics-of-Catch-the-Pink-Flamingo-Chat-Data-Using-Neo4j-/main/data/chat_create_team_chat.csv" AS row
MERGE (u:User {id: toInteger(row[0])})
MERGE (t:Team {id: toInteger(row[1])})
MERGE (c:TeamChatSession {id: toInteger(row[2])})
MERGE (u)-[:CreatesSession {timeStamp: row[3]}]->(c)
MERGE (c)-[:OwnedBy {timeStamp: row[3]}]->(t)"""

execute_write_cypher_query(chat_create_team_chat_queries)

                                                                                

+-----+
|  col|
+-----+
|dummy|
+-----+



In [10]:
# Load data from chat_item_team_chat.csv

chat_item_team_chat_query = """LOAD CSV FROM "https://raw.githubusercontent.com/NinhDT22022522/Neo4j-Graph-Analytics-of-Catch-the-Pink-Flamingo-Chat-Data-Using-Neo4j-/main/data/chat_item_team_chat.csv" AS row
MERGE (u:User {id: toInteger(row[0])})
MERGE (t:TeamChatSession {id: toInteger(row[1])})
MERGE (c:ChatItem {id: toInteger(row[2])})
MERGE (u)-[:CreatesChat {timeStamp: row[3]}]->(c)
MERGE (c)-[:PartOf {timeStamp: row[3]}]->(t)"""

execute_write_cypher_query(chat_item_team_chat_query)

                                                                                

+-----+
|  col|
+-----+
|dummy|
+-----+



In [11]:
# load data from chat_join_team_chat.csv

chat_join_team_chat_query = """LOAD CSV FROM "https://raw.githubusercontent.com/NinhDT22022522/Neo4j-Graph-Analytics-of-Catch-the-Pink-Flamingo-Chat-Data-Using-Neo4j-/main/data/chat_join_team_chat.csv" AS row
MERGE (u:User {id: toInteger(row[0])})
MERGE (c:TeamChatSession {id: toInteger(row[1])})
MERGE (u)-[:Joins {timeStamp: row[2]}]->(c)"""

execute_write_cypher_query(chat_join_team_chat_query)

                                                                                

+-----+
|  col|
+-----+
|dummy|
+-----+



In [12]:
# Load data from chat_leave_team_chat.csv
chat_leave_team_chat_query = """LOAD CSV FROM "https://raw.githubusercontent.com/NinhDT22022522/Neo4j-Graph-Analytics-of-Catch-the-Pink-Flamingo-Chat-Data-Using-Neo4j-/main/data/chat_leave_team_chat.csv" AS row
MERGE (u:User {id: toInteger(row[0])})
MERGE (c:TeamChatSession {id: toInteger(row[1])})
MERGE (u)-[:Leaves {timeStamp: row[2]}]->(c)"""

execute_write_cypher_query(chat_leave_team_chat_query)

                                                                                

+-----+
|  col|
+-----+
|dummy|
+-----+



In [13]:
# Load data from chat_mention_team_chat.csv
chat_mention_team_chat_query = """LOAD CSV FROM "https://raw.githubusercontent.com/NinhDT22022522/Neo4j-Graph-Analytics-of-Catch-the-Pink-Flamingo-Chat-Data-Using-Neo4j-/main/data/chat_mention_team_chat.csv" AS row
MERGE (c:ChatItem {id: toInteger(row[0])})
MERGE (u:User {id: toInteger(row[1])})
MERGE (c)-[:Mentioned {timeStamp: row[2]}]->(u)"""

execute_write_cypher_query(chat_mention_team_chat_query)

                                                                                

+-----+
|  col|
+-----+
|dummy|
+-----+



In [14]:
# Load data from chat_respond_team_chat.csv
chat_respond_team_chat_query = """LOAD CSV FROM "https://raw.githubusercontent.com/NinhDT22022522/Neo4j-Graph-Analytics-of-Catch-the-Pink-Flamingo-Chat-Data-Using-Neo4j-/main/data/chat_respond_team_chat.csv" AS row
MERGE (c1:ChatItem {id: toInteger(row[0])})
MERGE (c2:ChatItem {id: toInteger(row[1])})
MERGE (c1)-[:ResponseTo {timeStamp: row[2]}]->(c2)"""

execute_write_cypher_query(chat_respond_team_chat_query)

                                                                                

+-----+
|  col|
+-----+
|dummy|
+-----+



## Check Establish (45463 nodes and 118502 egdes)

In [58]:
# Check node
check_node_query = "MATCH (n) RETURN COUNT(n) AS NumberOfNode"

execute_read_cypher_query(check_node_query)

[Stage 33:>                                                         (0 + 1) / 1]

+------------+
|NumberOfNode|
+------------+
|       45463|
+------------+



                                                                                

In [19]:
# Check edge
check_edge_query = "MATCH()-[r]->() RETURN COUNT(r) AS numberOfEdges"

execute_read_cypher_query(check_edge_query)

[Stage 20:>                                                         (0 + 1) / 1]

+-------------+
|numberOfEdges|
+-------------+
|       118502|
+-------------+



                                                                                

## Run Query

In [37]:
#Q1-a

query_Q1_a = """MATCH p=(start:ChatItem)-[:ResponseTo*]->(end:ChatItem)
WITH p, length(p) AS length
ORDER BY length DESC
WITH nodes(p) AS longestChain, length AS longestChainLength
LIMIT 1
RETURN longestChain, longestChainLength
"""

execute_read_cypher_query(query=query_Q1_a)

[Stage 22:>                                                         (0 + 1) / 1]

+--------------------+------------------+
|        longestChain|longestChainLength|
+--------------------+------------------+
|[{35255, [ChatIte...|                 9|
+--------------------+------------------+



                                                                                

In [39]:
#Q1-b

query_Q1_b = """MATCH p=(start:ChatItem)-[:ResponseTo*]->(end:ChatItem)
WITH length(p) AS longestChainLength
ORDER BY longestChainLength DESC
LIMIT 1

// Find all paths with this length
MATCH p=(start:ChatItem)-[:ResponseTo*]->(end:ChatItem)
WHERE length(p) = longestChainLength
WITH p

// Extract users involved in these paths
MATCH (u:User)-[:CreatesChat]->(ci:ChatItem)
WHERE ci IN nodes(p)
RETURN count(distinct u) AS numberOfUsers"""

execute_read_cypher_query(query_Q1_b)

[Stage 23:>                                                         (0 + 1) / 1]

+-------------+
|numberOfUsers|
+-------------+
|            5|
+-------------+



                                                                                

In [67]:
#Q2-a

query_Q2_a = """MATCH (u:User)-[:CreatesChat]->(c:ChatItem)
RETURN u.id AS userId, count(c) AS chatCount
ORDER BY chatCount DESC
LIMIT 10
"""

execute_read_cypher_query(query_Q2_a)

[Stage 36:>                                                         (0 + 1) / 1]

+------+---------+
|userId|chatCount|
+------+---------+
|   394|      115|
|  2067|      111|
|  1087|      109|
|   209|      109|
|   554|      107|
|  1627|      105|
|   516|      105|
|   999|      105|
|   668|      104|
|   461|      104|
+------+---------+



                                                                                

In [68]:
#Q2-b

query_Q2_b = """MATCH (ci:ChatItem)-[:PartOf]->(tcs:TeamChatSession)-[:OwnedBy]->(t:Team)
RETURN t.id AS teamId, count(ci) AS chatCount
ORDER BY chatCount DESC
LIMIT10
"""

execute_read_cypher_query(query_Q2_b)

[Stage 37:>                                                         (0 + 1) / 1]

+------+---------+
|teamId|chatCount|
+------+---------+
|    82|     1324|
|   185|     1036|
|   112|      957|
|    18|      844|
|   194|      836|
|   129|      814|
|    52|      788|
|   136|      783|
|   146|      746|
|    81|      736|
+------+---------+



                                                                                

In [46]:
#Q2-c

query_Q2_c = """// Step 1: Identify top 10 chattiest users
WITH [394, 2067, 1087, 209, 554, 1627, 516, 999, 668, 461] AS topChattiestUsers

// Step 2: Identify top 10 chattiest teams
WITH topChattiestUsers, [82, 185, 112, 18, 194, 129, 52, 136, 146, 81] AS topChattiestTeams

// Step 3: Find users who belong to the top chattiest teams
MATCH (u:User)-[:CreatesChat]->(:ChatItem)-[:PartOf]->(:TeamChatSession)-[:OwnedBy]->(t:Team)
WHERE u.id IN topChattiestUsers
AND t.id IN topChattiestTeams
RETURN DISTINCT u.id AS User, t.id AS Team"""

execute_read_cypher_query(query_Q2_c)

[Stage 27:>                                                         (0 + 1) / 1]

+----+----+
|User|Team|
+----+----+
| 999|  52|
+----+----+



                                                                                

In [54]:
#Q3-a

query_Q3_a = """MATCH (u1:User)-[:CreatesChat]->(:ChatItem)-[:Mentioned]->(u2:User)
CREATE (u1)-[:InteractsWith]->(u2)
UNION
MATCH (u1:User)-[:CreatesChat]->(:ChatItem)-[:ResponseTo]->(:ChatItem)<-[:CreatesChat]-(u2:User)
CREATE (u1)-[:InteractsWith]->(u2)
UNION
MATCH (u:User)-[r:InteractsWith]->(u)
DELETE r;
"""
execute_write_cypher_query(query_Q3_a)

                                                                                

+-----+
|  col|
+-----+
|dummy|
+-----+



In [55]:
#Q3-b

query_Q3_b = """MATCH (u:User)-[c:CreatesChat]->()
WITH u, COUNT(c) as Chats
ORDER BY Chats DESC LIMIT 10 WITH [u] as ChattiestUsers
//Getting the neighbours of all Users and the count
MATCH (u1:User)-[:InteractsWith]->(u2:User)
WHERE u1 in ChattiestUsers
WITH u1.id AS UserID, COLLECT(DISTINCT u2.id) AS Neighbours RETURN UserID, Neighbours, SIZE(Neighbours) AS k"""

execute_read_cypher_query(query_Q3_b)

[Stage 31:>                                                         (0 + 1) / 1]

+------+--------------------+---+
|UserID|          Neighbours|  k|
+------+--------------------+---+
|   394|[2011, 1012, 1782...|  4|
|  2067|[209, 63, 516, 12...|  6|
|  1087|[1311, 929, 426, ...|  6|
|   209|[2067, 516, 1672,...|  5|
|   554|[1687, 2018, 1959...|  5|
|  1627|[2067, 209, 516, ...|  6|
|   516|[1627, 63, 2067, ...|  6|
|   999|[1554, 1606, 1839...|  8|
|   668|    [698, 2034, 648]|  3|
|   461|   [1675, 1482, 482]|  3|
+------+--------------------+---+



                                                                                

In [56]:
#Q3-c

query_Q3_c = """// Getting TOP 10 Chattiest Users
MATCH (u:User)-[c:CreatesChat]->()
WITH u, COUNT(c) AS Chats
ORDER BY Chats DESC
LIMIT 10
WITH COLLECT(u) AS ChattiestUsers

// Getting the neighbours of TOP 10 Users and the count
MATCH (u1:User)-[:InteractsWith]->(u2:User)
WHERE u1 IN ChattiestUsers
WITH u1.id AS UserID, COLLECT(DISTINCT u2.id) AS Neighbours, SIZE(COLLECT(DISTINCT u2.id)) AS k

// Find Intersecting Users
MATCH (u1:User)-[:InteractsWith]->(u2:User)
// Such that both belong in Neighbours list
WHERE u1.id IN Neighbours AND u2.id IN Neighbours
// For every valid combination of a User and its two neighbours,
// Value is 1 if neighbours have interacted at least once, k is no. of Neighbours
WITH DISTINCT UserID, u1.id AS N1, u2.id AS N2, CASE WHEN (u1)-[:InteractsWith]->(u2) THEN 1 ELSE 0 END AS VALUE, k
WITH UserID, SUM(VALUE) AS NUM, k, k*(k-1) AS DENUM
RETURN UserID, NUM, DENUM, NUM/(DENUM*1.0) AS ClusteringCoefficient
ORDER BY ClusteringCoefficient DESC"""

execute_read_cypher_query(query_Q3_c)

[Stage 32:>                                                         (0 + 1) / 1]

+------+---+-----+---------------------+
|UserID|NUM|DENUM|ClusteringCoefficient|
+------+---+-----+---------------------+
|   668|  6|    6|                  1.0|
|   209| 20|   20|                  1.0|
|   999| 53|   56|   0.9464285714285714|
|   516| 28|   30|   0.9333333333333333|
|  1627| 28|   30|   0.9333333333333333|
|  2067| 28|   30|   0.9333333333333333|
|   461|  5|    6|   0.8333333333333334|
|   554| 16|   20|                  0.8|
|   394|  9|   12|                 0.75|
|  1087| 22|   30|   0.7333333333333333|
+------+---+-----+---------------------+



                                                                                

## Conclusion  
Running neo4j and Pyspark combined will be faster than running neo4j normally  
It takes me 150 seconds to set up data with neo4j and only 60 seconds if I run the combination