In [20]:
import os

from neo4j import GraphDatabase
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, concat
from pyspark.sql.window import Window

In [2]:
# Start (or reuse) the Spark session that every later cell reads through.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

In [3]:
# Load the crawled product listings from PostgreSQL over JDBC.
# Every connection setting can be overridden through an environment variable,
# so the notebook does not have to be edited to point at another database.
# The fallbacks reproduce the values that used to be hard-coded here.
# TODO: drop the credential fallbacks once the environment is configured.
jdbc_url = os.environ.get(
    "BUKALAPAK_JDBC_URL", "jdbc:postgresql://192.168.15.205:5432/db_daun_gatal"
)
jdbc_table = os.environ.get("BUKALAPAK_JDBC_TABLE", "bukalapak_crawl")
jdbc_user = os.environ.get("BUKALAPAK_JDBC_USER", "postgres")
jdbc_password = os.environ.get("BUKALAPAK_JDBC_PASSWORD", "toor")

df = (
    spark.read
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", jdbc_table)
    .option("user", jdbc_user)
    .option("password", jdbc_password)
    .option("driver", "org.postgresql.Driver")
    .load()
)



In [4]:
# Print the column names and types of the frame loaded from the JDBC table.
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- price: long (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rating: integer (nullable = true)
 |-- url: string (nullable = true)
 |-- specs_brand: string (nullable = true)
 |-- stats_interest: integer (nullable = true)
 |-- stats_sold: integer (nullable = true)
 |-- stats_view: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- store_city: string (nullable = true)
 |-- store_province: string (nullable = true)
 |-- name: string (nullable = true)
 |-- active: boolean (nullable = true)
 |-- url_src: string (nullable = true)



In [5]:
# Peek at the identifying column plus the five dimension columns used below.
preview_columns = ["id","name","condition","category","subcategory","store_province","store_city"]
df.select(*preview_columns).show(5)

+-------+--------------------+---------+---------------+-----------------+--------------------+-------------+
|     id|                name|condition|       category|      subcategory|      store_province|   store_city|
+-------+--------------------+---------+---------------+-----------------+--------------------+-------------+
|1x2or6d|voucer depok fant...|     Baru|Tiket & Voucher| Travel & Hiburan|          Jawa Barat|        Depok|
| g2504l|BIOAQUA CHAMOMILE...|     Baru|Tiket & Voucher| Travel & Hiburan|          Jawa Barat| Kab. Bandung|
| t6a6du|Topi Army - Topi ...|     Baru|Tiket & Voucher| Travel & Hiburan|          Jawa Barat|      Bandung|
|1p24v8e|keris putut pamor...|    Bekas|Tiket & Voucher| Travel & Hiburan|Daerah Istimewa Y...|       Bantul|
|29ogehw|    vocher indomaret|     Baru|Tiket & Voucher|Makanan & Minuman|         DKI Jakarta|Jakarta Barat|
+-------+--------------------+---------+---------------+-----------------+--------------------+-------------+
only showing top 5 rows

In [6]:
# Build one lookup DataFrame per dimension column. Each holds the column's
# distinct values plus an "id_<column>" surrogate key produced by row_number()
# over the values in sorted order.
data = {}
for column in ["condition","category","subcategory","store_province","store_city"]:
    distinct_values = df.select(column).distinct()
    surrogate_key = row_number().over(Window.orderBy(column))
    data[column] = distinct_values.withColumn("id_"+column, surrogate_key)


In [7]:
# Fact table: the item id and name plus the raw text of each dimension column.
data_main = df.select(
    "id",
    "name",
    "condition",
    "category",
    "subcategory",
    "store_province",
    "store_city",
)

In [34]:
# Preview the fact table with every dimension value replaced by its surrogate key.
#
# Each lookup table is joined by column name instead of with
# `data_main.<col> == data[<col>].<col>`. Both sides of that predicate derive
# from `df`, and `data_main` is only an ancestor of the frame being joined after
# the first step, so a name-based equi-join avoids ambiguous column references.
# As with the original equality joins, rows whose dimension value is NULL do
# not match any lookup row and are dropped.
preview = data_main
for dimension in data:
    # Swap the raw text column for its "id_<dimension>" key.
    preview = preview.join(data[dimension], on=dimension, how="inner").drop(dimension)

preview.select("id", "name", *["id_" + dimension for dimension in data]).show(3)

+-------+--------------------+------------+-----------+--------------+-----------------+-------------+
|     id|                name|id_condition|id_category|id_subcategory|id_store_province|id_store_city|
+-------+--------------------+------------+-----------+--------------+-----------------+-------------+
|23ed11o|mesin hitung uang...|           1|         20|           482|                4|           88|
| 7jrgfd|Spirulina Pasific...|           2|         16|           327|                4|           88|
|1o261ht|madu kunyit teripang|           1|         16|           327|                4|           88|
+-------+--------------------+------------+-----------+--------------+-----------------+-------------+
only showing top 3 rows



In [73]:
# Neo4j connection shared by the loader functions defined below.
# The address and credentials can be overridden through environment variables;
# the fallbacks reproduce the values that used to be hard-coded here.
# TODO: drop the credential fallbacks once the environment is configured.
uri = os.environ.get("NEO4J_URI", "neo4j://192.168.10.73:7687")
neo4j_user = os.environ.get("NEO4J_USER", "neo4j")
neo4j_password = os.environ.get("NEO4J_PASSWORD", "IntelInside333")
driver = GraphDatabase.driver(uri, auth=(neo4j_user, neo4j_password))

def create_all_nodes(uri=uri,driver=driver,create_constraints=True):
    """Create the uniqueness constraints and one node per distinct dimension value.

    Reads the module-level ``data`` dict (one lookup DataFrame per dimension
    column) and writes a node for every row of every lookup table.

    Args:
        uri: Unused; kept so existing calls keep working.
        driver: neo4j driver used to open the session. It is left open on
            return because the same driver object is the default argument of
            ``create_all_relations``, which closes it when it finishes.
        create_constraints: When true (default), create the ``id`` uniqueness
            constraints before writing any node. Pass ``False`` to skip this
            step, e.g. when the constraints are already present.
            NOTE(review): whether the server rejects re-creating an existing
            constraint depends on the Neo4j version - confirm for yours.
    """
    # One statement per entry. These used to be concatenated into a single
    # string with nothing between them (and the helper was never called), so no
    # constraint was ever created.
    constraint_statements = [
        "CREATE CONSTRAINT ON (condition:Condition) ASSERT condition.id IS UNIQUE",
        "CREATE CONSTRAINT ON (category:Category) ASSERT category.id IS UNIQUE",
        "CREATE CONSTRAINT ON (subcategory:Subcategory) ASSERT subcategory.id IS UNIQUE",
        "CREATE CONSTRAINT ON (store_province:StoreProvince) ASSERT store_province.id IS UNIQUE",
        "CREATE CONSTRAINT ON (store_city:StoreCity) ASSERT store_city.id IS UNIQUE",
    ]

    # (lookup column, node label, property that stores the text value)
    node_specs = [
        ("condition", "Condition", "status"),
        ("category", "Category", "nama"),
        ("subcategory", "Subcategory", "nama"),
        ("store_province", "StoreProvince", "nama"),
        ("store_city", "StoreCity", "nama"),
    ]

    def create_uniq_node(tx):
        # Issue the constraints one statement at a time.
        for statement in constraint_statements:
            tx.run(statement)

    def create_node(tx, label, prop, value, id):
        # The label and property key are spliced in from the fixed node_specs
        # table above (they are not user input); the values are passed as
        # query parameters.
        tx.run("CREATE (:%s {%s: $value, id:$id})" % (label, prop), value=value, id=id)

    with driver.session() as session:
        if create_constraints:
            # Kept in its own transaction, separate from the node writes.
            session.write_transaction(create_uniq_node)
        for column, label, prop in node_specs:
            for row in data[column].collect():
                session.write_transaction(
                    create_node, label, prop, row[column], row["id_" + column]
                )

    # No driver.close() here: the notebook calls create_all_relations() right
    # after this function with the same driver object.

In [36]:
# Fact table with every dimension value replaced by its surrogate key; one row
# per item, ready to be turned into graph relationships.
#
# Each lookup table is joined by column name instead of with
# `data_main.<col> == data[<col>].<col>`. Both sides of that predicate derive
# from `df`, and `data_main` is only an ancestor of the frame being joined after
# the first step, so a name-based equi-join avoids ambiguous column references.
# As with the original equality joins, rows whose dimension value is NULL do
# not match any lookup row and are dropped.
joined = data_main
for dimension in data:
    # Swap the raw text column for its "id_<dimension>" key.
    joined = joined.join(data[dimension], on=dimension, how="inner").drop(dimension)

relation_data = joined.select("id", "name", *["id_" + dimension for dimension in data])

In [78]:
# Collect every row of relation_data into a local Python list; it becomes the
# default `data` argument of create_all_relations below.
data_test = relation_data.collect()

In [79]:
def create_all_relations(uri=uri,driver=driver,data=data_test):
    """Create one Barang node per row and link it to its five dimension nodes.

    Args:
        uri: Unused by this function.
        driver: neo4j driver used to open the session; closed before returning.
        data: Iterable of rows exposing ``id``, ``name``, ``id_condition``,
            ``id_category``, ``id_subcategory``, ``id_store_province`` and
            ``id_store_city`` as attributes.
    """
    # Look up the five dimension nodes by id, then create the item node and
    # one outgoing relationship to each of them.
    cypher = (
        "MATCH (condition:Condition {id: $id_condition}) "
        "MATCH (category:Category {id: $id_category}) "
        "MATCH (subcategory:Subcategory {id: $id_subcategory}) "
        "MATCH (store_province:StoreProvince {id: $id_store_province}) "
        "MATCH (store_city:StoreCity {id: $id_store_city}) "
        "CREATE (barang:Barang {name: $name ,id: $id_Barang}) "
        "CREATE (barang)-[:has_condition]->(condition) "
        "CREATE (barang)-[:has_category]->(category) "
        "CREATE (barang)-[:has_subcategory]->(subcategory) "
        "CREATE (barang)-[:has_store_province]->(store_province) "
        "CREATE (barang)-[:has_store_city]->(store_city) "
    )

    def link_item(tx, params):
        tx.run(cypher, **params)

    with driver.session() as session:
        for row in data:
            # One write transaction per item.
            params = {
                "id_condition": row.id_condition,
                "id_category": row.id_category,
                "id_subcategory": row.id_subcategory,
                "id_store_province": row.id_store_province,
                "id_store_city": row.id_store_city,
                "name": row.name,
                "id_Barang": row.id,
            }
            session.write_transaction(link_item, params)

    driver.close()

In [80]:
# Load the graph: dimension nodes first, because the relation query MATCHes
# them by id when it attaches each Barang node.
create_all_nodes()
create_all_relations()