In [0]:
%%capture
%pip install neo4j

In [0]:
from neo4j import GraphDatabase

Neo4jConnectionInfo = {
    "URI": "neo4j+s://62d77af8.databases.neo4j.io", 
    "Username": dbutils.secrets.get(scope="neo4j", key="username"), 
    "Password": dbutils.secrets.get(scope="neo4j", key="password")
}

LoadSettings = {
    "csvDir": "file:/Workspace/Users/eric.monk@neo4j.com/movie_csvs/",
    "writeMode": "Overwrite",
    "partitionColName": "partitionCode"
}

driver = GraphDatabase.driver(Neo4jConnectionInfo["URI"], auth=(Neo4jConnectionInfo["Username"], Neo4jConnectionInfo["Password"]))

In [0]:
# helper function
def run(driver, query, params=None):
    with driver.session() as session:
        if params is not None:
            return [r for r in session.run(query, params)]
        else:
            return [r for r in session.run(query)]

run(driver,'CREATE CONSTRAINT movie_userId_unique IF NOT EXISTS FOR (user:MovieUser) REQUIRE user.userId  IS UNIQUE')

run(driver,'CREATE CONSTRAINT movieId_unique IF NOT EXISTS FOR (movie:Movie) REQUIRE movie.movieImdbId IS UNIQUE')


[]

In [0]:
def writeTableToNeo (tableName, cypherQuery):
    df = spark.read.table(tableName)
    result = (df.write
        .format("org.neo4j.spark.DataSource")
        .option("url", Neo4jConnectionInfo["URI"])
        .option("authentication.basic.username", Neo4jConnectionInfo["Username"])
        .option("authentication.basic.password", Neo4jConnectionInfo["Password"])
        .option("query", cypherQuery)
        .mode(LoadSettings["writeMode"])
        .save())
    
def writeTableToNeoSinglePartition (tableName, cypherQuery): 
    df = spark.read.table(tableName).repartition(1)
    result = (df.write
        .format("org.neo4j.spark.DataSource")
        .option("url", Neo4jConnectionInfo["URI"])
        .option("authentication.basic.username", Neo4jConnectionInfo["Username"])
        .option("authentication.basic.password", Neo4jConnectionInfo["Password"])
        .option("query", cypherQuery)
        .mode(LoadSettings["writeMode"])
        .save())    

def writeDfToNeo (df, cypherQuery):
    result = (df.write
        .format("org.neo4j.spark.DataSource")
        .option("url", Neo4jConnectionInfo["URI"])
        .option("authentication.basic.username", Neo4jConnectionInfo["Username"])
        .option("authentication.basic.password", Neo4jConnectionInfo["Password"])
        .option("batch.size", 25000)
        .option("query", cypherQuery)
        .mode(LoadSettings["writeMode"])
        .save())
    
# tableSize = 10 means a 10x10 table
def getPartitionsAndBatches (tableSize):
    batches = []
    for i in range(tableSize):
        partitions = []
        for j in range(tableSize):
            k = (i+j)%tableSize
            partitions.append(str(k) + '-' + str(j))
        batches.append(partitions)
    return batches

batches = getPartitionsAndBatches(10)    

def filter_by_partition_code(df, partitionColName, partitionSet):
    return df.filter(col(partitionColName).isin(partitionSet))

def writeRelTableToNeo(tableName, startNodeCol, endNodeCol, cypherQuery):
    df = spark.read.table(tableName)
    partitionColName = LoadSettings['partitionColName']
    newDf = df.withColumn(partitionColName, concat_ws('-', substring(df[startNodeCol], -1, 1), substring(df[endNodeCol], -1, 1)))
    # Create separate DataFrames for each batch
    dataframes = {}
    for index, partitionSet in enumerate(batches):
        dataframes[index] = filter_by_partition_code(newDf, partitionColName, partitionSet)
        parallelRelDf = dataframes[index].repartition(10, partitionColName)
        print('Num partitions (' + str(index) + '): ' + str(parallelRelDf.rdd.getNumPartitions()))

        writeDfToNeo(parallelRelDf, cypherQuery)

def saveCsvToTable (csvName, tableName):
    csvPath = LoadSettings["csvDir"] + csvName
    
    df = (spark.read.format('csv')
        .options(header='true', inferSchema='true')
        .load(csvPath))
    
    df.write.saveAsTable(tableName)        

In [0]:
saveCsvToTable("users-same-5.csv", "movie_users_same_5")
saveCsvToTable("users.csv", "movie_users")
saveCsvToTable("movies.csv", "movies")
saveCsvToTable("users-rated-same-movie-5.csv", "movie_user_to_user_rel")
saveCsvToTable("user-rated-movie-5.csv", "movie_user_to_movie_rel")

In [0]:
writeTableToNeo("movie_users", "MERGE (user:MovieUser {userId: event.userId})")
writeTableToNeo("movie_users_same_5", "MERGE (user:MovieUser {userId: event.userId})")
writeTableToNeo("movies", "MERGE (movie:Movie {movieImdbId: event.movieImdbId})")


In [0]:
# writeTableToNeoSinglePartition("movie_user_to_movie_rel", """
#         MATCH (user:MovieUser {userId: event.userId})
#         MATCH (movie:Movie {movieImdbId: event.movieImdbId})
#         MERGE (user)-[:RATED_5]->(movie)
#     """)

writeTableToNeoSinglePartition("movie_user_to_user_rel", """
        MATCH (user1:MovieUser {userId: event.userId1})
        MATCH (user2:MovieUser {userId: event.userId2})
        MERGE (user1)-[:GAVE_SAME_RATING]->(user2) 
    """)


In [0]:
import math
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import pmod, col, count, substring, concat_ws, udf

# tableSize = 10 means a 10x10 table
def getPartitionsAndBatches (tableSize):
    batches = []
    for i in range(tableSize):
        partitions = []
        for j in range(tableSize):
            k = (i+j)%tableSize
            partitions.append(str(k) + '-' + str(j))
        batches.append(partitions)
    return batches

batches = getPartitionsAndBatches(10)

get_partition_code_1 = udf(lambda x, y: str(math.floor((x or 0) / 1)) + '-' + str(math.floor((y or 0) / 1)), StringType())
get_partition_code_10 = udf(lambda x, y: str(math.floor((x or 0) / 10)) + '-' + str(math.floor((y or 0) / 10)), StringType())
get_partition_code_100 = udf(lambda x, y: str(math.floor((x or 0) / 100)) + '-' + str(math.floor((y or 0) / 100)), StringType())

# Function to filter DataFrame by last digit of userId
def filter_by_partition_code(df, colName, partitionSet):
    # print(colName, partitionSet)
    return df.filter(col(partitionColName).isin(partitionSet))

partitionColName = 'partitionCode'
df = spark.read.table('movie_user_to_movie_rel')

# https://stackoverflow.com/questions/45512884/spark-dataframe-column-with-last-character-of-other-column

newDf1 = df.withColumn(partitionColName, \
                      get_partition_code_1(substring(df.userId, -1, 1).cast("int"), \
                                     substring(df.movieImdbId, -1, 1).cast("int")) \
                      ) 
# newDf.show()

newDf = newDf1

def writeDfToNeo (df, cypherQuery):
    result = (df.write
        .format("org.neo4j.spark.DataSource")
        .option("url", Neo4jConnectionInfo["URI"])
        .option("authentication.basic.username", Neo4jConnectionInfo["Username"])
        .option("authentication.basic.password", Neo4jConnectionInfo["Password"])
        .option("batch.size", 25000)
        .option("query", cypherQuery)
        .mode(LoadSettings["writeMode"])
        .save())

# Create separate DataFrames for each batch
dataframes = {}
for index, partitionSet in enumerate(batches):
    print(partitionSet)
    dataframes[index] = filter_by_partition_code(newDf, partitionColName, partitionSet)
    parallelRelDf = dataframes[index].repartition(10, partitionColName)
    print('Num partitions (' + str(index) + '): ' + str(parallelRelDf.rdd.getNumPartitions()))

    writeDfToNeo(parallelRelDf, """
        MATCH (user:MovieUser {userId: event.userId})
        MATCH (movie:Movie {movieImdbId: event.movieImdbId})
        MERGE (user)-[:RATED_5]->(movie)
    """)


['0-0', '1-1', '2-2', '3-3', '4-4', '5-5', '6-6', '7-7', '8-8', '9-9']
Num partitions (0): 10
['1-0', '2-1', '3-2', '4-3', '5-4', '6-5', '7-6', '8-7', '9-8', '0-9']
Num partitions (1): 10
['2-0', '3-1', '4-2', '5-3', '6-4', '7-5', '8-6', '9-7', '0-8', '1-9']
Num partitions (2): 10
['3-0', '4-1', '5-2', '6-3', '7-4', '8-5', '9-6', '0-7', '1-8', '2-9']
Num partitions (3): 10
['4-0', '5-1', '6-2', '7-3', '8-4', '9-5', '0-6', '1-7', '2-8', '3-9']
Num partitions (4): 10
['5-0', '6-1', '7-2', '8-3', '9-4', '0-5', '1-6', '2-7', '3-8', '4-9']
Num partitions (5): 10
['6-0', '7-1', '8-2', '9-3', '0-4', '1-5', '2-6', '3-7', '4-8', '5-9']
Num partitions (6): 10
['7-0', '8-1', '9-2', '0-3', '1-4', '2-5', '3-6', '4-7', '5-8', '6-9']
Num partitions (7): 10
['8-0', '9-1', '0-2', '1-3', '2-4', '3-5', '4-6', '5-7', '6-8', '7-9']
Num partitions (8): 10
['9-0', '0-1', '1-2', '2-3', '3-4', '4-5', '5-6', '6-7', '7-8', '8-9']
Num partitions (9): 10


In [0]:
import math
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import pmod, col, count, substring, concat_ws, udf
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

# tableSize = 10 means a 10x10 table
def getPartitionsAndBatches (tableSize):
    batches = []
    for i in range(tableSize):
        partitions = []
        for j in range(tableSize):
            k = (i+j)%tableSize
            partitions.append(str(k) + '-' + str(j))
        batches.append(partitions)
    return batches

batches = getPartitionsAndBatches(10)

get_partition_code_1 = udf(lambda x, y: str(math.floor((x or 0) / 1)) + '-' + str(math.floor((y or 0) / 1)), StringType())
get_partition_code_10 = udf(lambda x, y: str(math.floor((x or 0) / 10)) + '-' + str(math.floor((y or 0) / 10)), StringType())
get_partition_code_100 = udf(lambda x, y: str(math.floor((x or 0) / 100)) + '-' + str(math.floor((y or 0) / 100)), StringType())

# Function to filter DataFrame by last digit of userId
def filter_by_partition_code(df, colName, partitionSet):
    # print(colName, partitionSet)
    return df.filter(col(partitionColName).isin(partitionSet))

partitionColName = 'partitionCode'
df = spark.read.table('movie_user_to_movie_rel')

# https://stackoverflow.com/questions/45512884/spark-dataframe-column-with-last-character-of-other-column
newDf1 = df.withColumn(partitionColName, \
                      get_partition_code_1(substring(df.userId, -1, 1).cast("int"), \
                                     substring(df.movieImdbId, -1, 1).cast("int")) \
                      ) 

# newDf.show()

newDf = newDf1

def filterDf(queue, newDf, partitionColName, partitionSet):
    print(partitionSet)
    partitionedDf = filter_by_partition_code(newDf, partitionColName, partitionSet)
    parallelRelDf = partitionedDf.repartition(10, partitionColName)
    print('Num partitions (' + str(index) + '): ' + str(parallelRelDf.rdd.getNumPartitions()))

    # push data into the queue
    queue.put(parallelRelDf)

if __name__ == '__main__':
    # define the shared queue
    queue = Queue()
    # create the thread pool
    n_tasks = 10
    with ThreadPoolExecutor() as tpe:
        # issue tasks to the thread pool
        for index, partitionSet in enumerate(batches):
            tpe.submit(filterDf, queue, newDf, partitionColName, partitionSet)
        
        # consume results from the queue
        for i in range(n_tasks):
            # get parallelRelDf from the queue
            parallelRelDf = queue.get()

            # row_count = parallelRelDf.count()

            # print(f'The DataFrame has {row_count} rows.')
            print(f'Processing parallel data frame {i}')

            writeDfToNeo(parallelRelDf, """
                MATCH (user:MovieUser {userId: event.userId})
                MATCH (movie:Movie {movieImdbId: event.movieImdbId})
                MERGE (user)-[:RATED_5]->(movie)
            """)




['0-0', '1-1', '2-2', '3-3', '4-4', '5-5', '6-6', '7-7', '8-8', '9-9']
['1-0', '2-1', '3-2', '4-3', '5-4', '6-5', '7-6', '8-7', '9-8', '0-9']
['2-0', '3-1', '4-2', '5-3', '6-4', '7-5', '8-6', '9-7', '0-8', '1-9']
['3-0', '4-1', '5-2', '6-3', '7-4', '8-5', '9-6', '0-7', '1-8', '2-9']
['4-0', '5-1', '6-2', '7-3', '8-4', '9-5', '0-6', '1-7', '2-8', '3-9']
['5-0', '6-1', '7-2', '8-3', '9-4', '0-5', '1-6', '2-7', '3-8', '4-9']
['6-0', '7-1', '8-2', '9-3', '0-4', '1-5', '2-6', '3-7', '4-8', '5-9']
['7-0', '8-1', '9-2', '0-3', '1-4', '2-5', '3-6', '4-7', '5-8', '6-9']
['8-0', '9-1', '0-2', '1-3', '2-4', '3-5', '4-6', '5-7', '6-8', '7-9']
['9-0', '0-1', '1-2', '2-3', '3-4', '4-5', '5-6', '6-7', '7-8', '8-9']
Num partitions (9): 10
Processing parallel data frame 0
Num partitions (9): 10
Num partitions (9): 10
Num partitions (9): 10
Num partitions (9): 10
Num partitions (9): 10
Num partitions (9): 10
Num partitions (9): 10
Num partitions (9): 10
Num partitions (9): 10
Processing parallel data fr

In [0]:
import math
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import pmod, col, count, substring, concat_ws, udf

sameSameBatches = [["0-1","1-0","2-3","3-2","4-5","5-4","6-7","7-6","8-9","9-8"],["0-2","2-0","1-3","3-1","4-7","7-4","5-8","8-5","6-9","9-6"],["0-3","3-0","1-2","2-1","4-9","9-4","5-7","7-5","6-8","8-6"],["0-5","5-0","1-6","6-1","2-4","4-2","3-9","9-3","7-8","8-7"],["0-4","4-0","1-5","5-1","2-6","6-2","3-8","8-3","7-9","9-7"],["0-6","6-0","1-8","8-1","2-7","7-2","3-4","4-3","5-9","9-5"],["0-9","9-0","1-4","4-1","2-8","8-2","3-7","7-3","5-6","6-5"],["0-7","7-0","1-9","9-1","2-5","5-2","3-6","6-3","4-8","8-4"],["0-8","8-0","1-7","7-1","2-9","9-2","3-5","5-3","4-6","6-4"],["0-0","1-1","2-2","3-3","4-4","5-5","6-6","7-7","8-8","9-9"]]

batches = sameSameBatches

get_partition_code_1 = udf(lambda x, y: str(math.floor((x or 0) / 1)) + '-' + str(math.floor((y or 0) / 1)), StringType())
get_partition_code_10 = udf(lambda x, y: str(math.floor((x or 0) / 10)) + '-' + str(math.floor((y or 0) / 10)), StringType())
get_partition_code_100 = udf(lambda x, y: str(math.floor((x or 0) / 100)) + '-' + str(math.floor((y or 0) / 100)), StringType())

# Function to filter DataFrame by last digit of userId
def filter_by_partition_code(df, colName, partitionSet):
    # print(colName, partitionSet)
    return df.filter(col(partitionColName).isin(partitionSet))

partitionColName = 'partitionCode'
df = spark.read.table('movie_user_to_user_rel')

newDf1 = df.withColumn(partitionColName, \
                      get_partition_code_1(substring(df.userId1, -1, 1).cast("int"), \
                                     substring(df.userId2, -1, 1).cast("int")) \
                      ) 
# newDf.show()
newDf = newDf1

# Create separate DataFrames for each batch
dataframes = {}
for index, partitionSet in enumerate(batches):
    print(partitionSet)
    dataframes[index] = filter_by_partition_code(newDf, partitionColName, partitionSet)
    parallelRelDf = dataframes[index].repartition(10, partitionColName)
    print('Num partitions (' + str(index) + '): ' + str(parallelRelDf.rdd.getNumPartitions()))

    writeDfToNeo(parallelRelDf, """
        MATCH (user1:MovieUser {userId: event.userId1})
        MATCH (user2:MovieUser {userId: event.userId2})
        MERGE (user1)-[:GAVE_SAME_RATING]->(user2)
    """)
