In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import datetime

# Start spark session
startTime = datetime.datetime.now()

spark = SparkSession \
    .builder\
    .appName("Anime importer") \
    .master("spark://spark-master:7077") \
    .config("spark.driver.memory", "15g") \
    .config("spark.jars", "/extra_jars/neo4j-connector-apache-spark_2.12-4.0.1_for_spark_3.jar") \
    .config("neo4j.url", "bolt://neo4j:7687")\
    .config("neo4j.authentication.type", "basic")\
    .config("neo4j.authentication.basic.username", "neo4j")\
    .config("neo4j.authentication.basic.password", "password")\
    .getOrCreate()

print(datetime.datetime.now()-startTime)

0:00:03.617098


In [2]:
# read users
startTime = datetime.datetime.now()

data_file = '/import/anime/users_cleaned.csv' 

users = spark.read.csv(data_file, header=True, sep=",", inferSchema=True).cache()
print('Csv users = {}'.format(users.count()))
users.printSchema()
users.show(1, vertical=True)

print(datetime.datetime.now()-startTime)

# write users to neo4j
startTime = datetime.datetime.now()

users.write.format("org.neo4j.spark.DataSource") \
    .option("node.keys", "username")\
    .option("schema.optimization.type", "INDEX")\
    .mode("Overwrite")\
    .option("labels", ":User") \
    .save()

print(datetime.datetime.now()-startTime)

Csv users = 108712
root
 |-- username: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_watching: integer (nullable = true)
 |-- user_completed: integer (nullable = true)
 |-- user_onhold: integer (nullable = true)
 |-- user_dropped: integer (nullable = true)
 |-- user_plantowatch: integer (nullable = true)
 |-- user_days_spent_watching: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- location: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- access_rank: string (nullable = true)
 |-- join_date: string (nullable = true)
 |-- last_online: string (nullable = true)
 |-- stats_mean_score: string (nullable = true)
 |-- stats_rewatched: string (nullable = true)
 |-- stats_episodes: double (nullable = true)

-RECORD 0---------------------------------------
 username                 | karthiga            
 user_id                  | 2255153             
 user_watching            | 3                   
 user_completed         

In [3]:
# read animes
startTime = datetime.datetime.now()

data_file = '/import/anime/anime_cleaned.csv' 

animes = spark.read.csv(data_file, header=True, sep=",", inferSchema=True).cache() 
print('Total Records = {}'.format(animes.count()))
animes.printSchema()
animes.show(1, vertical=True)

print(datetime.datetime.now()-startTime)

# write animes to neo4j
startTime = datetime.datetime.now()

animes.write.format("org.neo4j.spark.DataSource") \
    .option("node.keys", "anime_id")\
    .option("schema.optimization.type", "INDEX")\
    .mode("Overwrite")\
    .option("labels", ":Anime") \
    .save()

print(datetime.datetime.now()-startTime)

Total Records = 6668
root
 |-- anime_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- title_english: string (nullable = true)
 |-- title_japanese: string (nullable = true)
 |-- title_synonyms: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- type: string (nullable = true)
 |-- source: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- status: string (nullable = true)
 |-- airing: string (nullable = true)
 |-- aired_string: string (nullable = true)
 |-- aired: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- score: string (nullable = true)
 |-- scored_by: string (nullable = true)
 |-- rank: double (nullable = true)
 |-- popularity: double (nullable = true)
 |-- members: double (nullable = true)
 |-- favorites: integer (nullable = true)
 |-- background: string (nullable = true)
 |-- premiered: string (nullable = true)
 |-- broadcast: string (nullable = true)
 |-- rel

In [4]:
# get schema
sample_file = '/import/anime/animelists_cleaned_sample.csv' 

sample = (spark.read.csv(sample_file, 
                         header=True, 
                         sep=",", 
                         ignoreLeadingWhiteSpace=True, 
                         ignoreTrailingWhiteSpace=True, 
                         dateFormat="yyyy-MM-dd", 
                         timestampFormat="yyyy-MM-dd HH:mm:ss",
                         inferSchema=True
                         ).cache())
sample.printSchema()

# read relationships
startTime = datetime.datetime.now()

data_file = '/import/anime/animelists_cleaned.csv' 

relationships = (spark.read.csv(data_file, 
                               header=True, 
                               sep=",", 
                               ignoreLeadingWhiteSpace=True, 
                               ignoreTrailingWhiteSpace=True, 
                               dateFormat="yyyy-MM-dd",
                               timestampFormat="yyyy-MM-dd HH:mm:ss",
                               schema=sample.schema, 
                               mode="DROPMALFORMED"
                               ).cache())
relationships.show(2)

print('Records read = {}'.format(relationships.count()))
print(datetime.datetime.now()-startTime)

previousDbSize=(spark.read.format("org.neo4j.spark.DataSource")
 .option("url", "bolt://neo4j:7687")
 .option("query", "MATCH (n) RETURN n")
 .load().count())

root
 |-- username: string (nullable = true)
 |-- anime_id: integer (nullable = true)
 |-- my_watched_episodes: integer (nullable = true)
 |-- my_start_date: string (nullable = true)
 |-- my_finish_date: string (nullable = true)
 |-- my_score: integer (nullable = true)
 |-- my_status: integer (nullable = true)
 |-- my_rewatching: double (nullable = true)
 |-- my_rewatching_ep: integer (nullable = true)
 |-- my_last_updated: timestamp (nullable = true)
 |-- my_tags: string (nullable = true)

+--------+--------+-------------------+-------------+--------------+--------+---------+-------------+----------------+-------------------+-------+
|username|anime_id|my_watched_episodes|my_start_date|my_finish_date|my_score|my_status|my_rewatching|my_rewatching_ep|    my_last_updated|my_tags|
+--------+--------+-------------------+-------------+--------------+--------+---------+-------------+----------------+-------------------+-------+
|karthiga|      21|                586|   0000-00-00|    0000-0

In [11]:
# repartition
startTime = datetime.datetime.now()

# https://neo4j.com/developer/spark/faq/#_my_writes_are_failing_due_to_deadlock_exceptions
partitioned = relationships.repartition(1, "username", "anime_id") 
print(partitioned.rdd.getNumPartitions())
partitioned.show()


print('Repartitioning took {}'.format(datetime.datetime.now()-startTime))

# write relationships to neo4j
startTime = datetime.datetime.now()

(partitioned.write.format("org.neo4j.spark.DataSource") 
    .mode("Overwrite")
#     .option("batch.size", 40000) # bigger size is less transactional overhead == faster, but watch for out of memory errors
    .option("relationship.properties", "my_watched_episodes,my_start_date,my_finish_date,my_score,my_status,my_rewatching,my_rewatching_ep,my_last_updated,my_tags")
    .option("relationship", "WATCHED") 
    .option("relationship.save.strategy", "keys") 
    .option("relationship.source.save.mode", "Match")
    .option("relationship.source.labels", ":User")
    .option("relationship.source.node.keys", "username")
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.labels", ":Anime")
    .option("relationship.target.node.keys", "anime_id")
    .save())

currentDbSize=(spark.read.format("org.neo4j.spark.DataSource")
 .option("url", "bolt://neo4j:7687")
 .option("query", "MATCH (n) RETURN n")
 .load().count())

insertCount = previousDbSize-currentDbSize
print('Inserted Records = {}'.format(insertCount))
print(datetime.datetime.now()-startTime)

1
+--------+--------+-------------------+-------------+--------------+--------+---------+-------------+----------------+-------------------+-------+
|username|anime_id|my_watched_episodes|my_start_date|my_finish_date|my_score|my_status|my_rewatching|my_rewatching_ep|    my_last_updated|my_tags|
+--------+--------+-------------------+-------------+--------------+--------+---------+-------------+----------------+-------------------+-------+
|karthiga|      21|                586|   0000-00-00|    0000-00-00|       9|        1|         null|               0|2013-03-03 10:52:53|   null|
|karthiga|      59|                 26|   0000-00-00|    0000-00-00|       7|        2|         null|               0|2013-03-10 13:54:51|   null|
|karthiga|      74|                 26|   0000-00-00|    0000-00-00|       7|        2|         null|               0|2013-04-27 16:43:35|   null|
|karthiga|     120|                 26|   0000-00-00|    0000-00-00|       7|        2|         null|               