# PySpark *INSIGHTS* Project

### Description

#### This project uses the following dataset:
#### 1- Retailrocket recommender system dataset
- The dataset consists of three files: behaviour data (events.csv), item properties (item_properties.csv, loaded in this notebook from two part files), and the category tree (category_tree.csv). The data was collected from a real-world e-commerce website. It is raw data, i.e. without any content transformations; however, all values are hashed for confidentiality reasons. The dataset was published to motivate research on recommender systems with implicit feedback.

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .appName("Pyspark insights") \
    .config("spark.driver.extraClassPath", './postgresql-42.3.9.jar') \
    .config("spark.executor.extraClassPath", "./postgresql-42.3.9.jar") \
    .getOrCreate()

25/03/28 16:02:07 WARN Utils: Your hostname, filo resolves to a loopback address: 127.0.1.1; using 192.168.1.11 instead (on interface wlp0s20f3)
25/03/28 16:02:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/28 16:02:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
events_df = spark.read.csv(path='./data/events.csv', header=True, inferSchema=True, samplingRatio=0.01)

                                                                                

In [4]:
events_df.printSchema()

root
 |-- timestamp: long (nullable = true)
 |-- visitorid: integer (nullable = true)
 |-- event: string (nullable = true)
 |-- itemid: integer (nullable = true)
 |-- transactionid: integer (nullable = true)



In [5]:
events_df.show(n=10, truncate=False)

+-------------+---------+-----+------+-------------+
|timestamp    |visitorid|event|itemid|transactionid|
+-------------+---------+-----+------+-------------+
|1433221332117|257597   |view |355908|NULL         |
|1433224214164|992329   |view |248676|NULL         |
|1433221999827|111016   |view |318965|NULL         |
|1433221955914|483717   |view |253185|NULL         |
|1433221337106|951259   |view |367447|NULL         |
|1433224086234|972639   |view |22556 |NULL         |
|1433221923240|810725   |view |443030|NULL         |
|1433223291897|794181   |view |439202|NULL         |
|1433220899221|824915   |view |428805|NULL         |
|1433221204592|339335   |view |82389 |NULL         |
+-------------+---------+-----+------+-------------+
only showing top 10 rows



In [6]:
category_tree_df = spark.read.csv(path='./data/category_tree.csv', header=True, inferSchema=True, samplingRatio=0.01)

In [7]:
category_tree_df.printSchema()

root
 |-- categoryid: integer (nullable = true)
 |-- parentid: integer (nullable = true)



In [8]:
category_tree_df.show(n=10, truncate=False)

+----------+--------+
|categoryid|parentid|
+----------+--------+
|1016      |213     |
|809       |169     |
|570       |9       |
|1691      |885     |
|536       |1691    |
|231       |NULL    |
|542       |378     |
|1146      |542     |
|1140      |542     |
|1479      |1537    |
+----------+--------+
only showing top 10 rows



In [9]:
item_properties_df1 = spark.read.csv("./data/item_properties_part1.csv", header=True, inferSchema=True, samplingRatio=0.01)
item_properties_df2 = spark.read.csv("./data/item_properties_part2.csv", header=True, inferSchema=True, samplingRatio=0.01)

item_properties_df = item_properties_df1.unionByName(item_properties_df2)

                                                                                

In [10]:
item_properties_df.printSchema()

root
 |-- timestamp: long (nullable = true)
 |-- itemid: integer (nullable = true)
 |-- property: string (nullable = true)
 |-- value: string (nullable = true)



In [11]:
item_properties_df.show(n=10, truncate=False)

+-------------+------+----------+-------------------------------+
|timestamp    |itemid|property  |value                          |
+-------------+------+----------+-------------------------------+
|1435460400000|460429|categoryid|1338                           |
|1441508400000|206783|888       |1116713 960601 n277.200        |
|1439089200000|395014|400       |n552.000 639502 n720.000 424566|
|1431226800000|59481 |790       |n15360.000                     |
|1431831600000|156781|917       |828513                         |
|1436065200000|285026|available |0                              |
|1434250800000|89534 |213       |1121373                        |
|1431831600000|264312|6         |319724                         |
|1433646000000|229370|202       |1330310                        |
|1434250800000|98113 |451       |1141052 n48.000                |
+-------------+------+----------+-------------------------------+
only showing top 10 rows



### Let's do some cleaning and preparation

In [12]:
from pyspark.sql.functions import from_unixtime, col

# Convert epoch milliseconds to a timestamp
events_df = events_df.withColumn("timestamp", from_unixtime(col("timestamp") / 1000).cast("timestamp"))
item_properties_df = item_properties_df.withColumn("timestamp", from_unixtime(col("timestamp") / 1000).cast("timestamp"))
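
`from_unixtime` returns a string formatted to whole seconds, so the conversion above discards the millisecond part of each epoch value. The sketch below works through the arithmetic for the first event in the sample (`1433221332117`) in plain Python, and shows one possible Spark variant that keeps the fraction by casting seconds-as-double to `timestamp`. `millis_to_utc` and `with_millis_timestamp` are hypothetical helper names.

```python
from datetime import datetime, timedelta, timezone


def millis_to_utc(ts_ms: int) -> datetime:
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    secs, ms = divmod(ts_ms, 1000)  # integer arithmetic avoids float rounding
    return datetime.fromtimestamp(secs, tz=timezone.utc) + timedelta(milliseconds=ms)


def with_millis_timestamp(df, column="timestamp"):
    """Spark variant (sketch): keep sub-second precision when converting."""
    from pyspark.sql.functions import col  # imported lazily; only needed for the Spark path

    return df.withColumn(column, (col(column) / 1000).cast("timestamp"))


print(millis_to_utc(1433221332117))  # 2015-06-02 05:02:12.117000+00:00
```

Spark displays timestamps in the session time zone, so the values shown by `show()` may differ from the UTC value printed here.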


In [13]:
events_df = events_df.fillna({"transactionid": -1})

events_df = events_df.dropna(subset=["itemid"])
item_properties_df = item_properties_df.dropna(subset=["itemid"])
category_tree_df = category_tree_df.dropna(subset=["categoryid"])


In [14]:
df_joined = events_df.join(item_properties_df, on="itemid", how="left")


In [15]:
df_joined = df_joined.join(category_tree_df, df_joined["value"] == category_tree_df["categoryid"], how="left") \
    .drop("value")  # Remove the old column as it has been replaced by categoryid from category_tree_df
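
The join in cell 14 matches every event against every property row of its item, and the join in cell 15 then compares `value` with `categoryid` for all of those rows, not only the ones whose `property` is `categoryid`. One possible alternative, sketched under the assumption that only the item's category is needed downstream, is to reduce `item_properties_df` to an item-to-category lookup first. `item_categories` and `join_events_with_categories` are hypothetical helpers; they use SQL expression strings so they need no extra imports.

```python
def item_categories(item_properties_df):
    """Return one row per (itemid, categoryid) pair (hypothetical helper)."""
    return (
        item_properties_df
        .filter("property = 'categoryid'")  # keep only category assignments
        .selectExpr("itemid", "CAST(value AS INT) AS categoryid")
        .dropDuplicates(["itemid", "categoryid"])  # the same pair may appear at several timestamps
    )


def join_events_with_categories(events_df, item_properties_df, category_tree_df):
    """Attach categoryid and parentid to each event (sketch, not the notebook's original join)."""
    return (
        events_df
        .join(item_categories(item_properties_df), on="itemid", how="left")
        .join(category_tree_df, on="categoryid", how="left")
    )
```

An item whose category changed over time still yields several rows per event; picking only the latest assignment would require keeping the `timestamp` column in the lookup.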


In [16]:
df_joined = df_joined.dropDuplicates()


In [17]:
df_joined = df_joined.fillna({
    "parentid": -1,        
    "property": "unknown"
})

In [18]:
df_joined.printSchema()

root
 |-- itemid: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- visitorid: integer (nullable = true)
 |-- event: string (nullable = true)
 |-- transactionid: integer (nullable = false)
 |-- timestamp: timestamp (nullable = true)
 |-- property: string (nullable = false)
 |-- categoryid: integer (nullable = true)
 |-- parentid: integer (nullable = false)



In [19]:
df_joined = df_joined.drop("timestamp")
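
After the joins, `df_joined` holds two columns named `timestamp` (the event time and the property snapshot time), and `drop("timestamp")` removes both, so the event time is lost. If it is needed later, one option is to rename the property-side column before joining so that only that column is dropped. A minimal sketch; `rename_property_timestamp` is a hypothetical helper and `property_timestamp` is an assumed column name:

```python
def rename_property_timestamp(item_properties_df, new_name="property_timestamp"):
    """Give the property snapshot time a distinct name before joining (hypothetical helper)."""
    return item_properties_df.withColumnRenamed("timestamp", new_name)


# Possible use in place of cell 14 (assumes events_df and item_properties_df from above):
# props = rename_property_timestamp(item_properties_df)
# df_joined = events_df.join(props, on="itemid", how="left")
# ... later: df_joined = df_joined.drop("property_timestamp")  # the event timestamp is kept
```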

In [20]:
df_joined.printSchema()

root
 |-- itemid: integer (nullable = true)
 |-- visitorid: integer (nullable = true)
 |-- event: string (nullable = true)
 |-- transactionid: integer (nullable = false)
 |-- property: string (nullable = false)
 |-- categoryid: integer (nullable = true)
 |-- parentid: integer (nullable = false)



In [None]:
df_joined_partition = df_joined.repartition(10)  

In [None]:
df_joined_partition.write \
  .format("jdbc") \
  .option("url", "jdbc:postgresql://127.0.0.1:5432/pyspark_insights") \
  .option("dbtable", "ecommerce_events") \
  .option("user", "postgres") \
  .option("password", "P@ssw0rd") \
  .option("batchsize", "10000") \
  .mode("overwrite") \
  .save()
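
The write above embeds the database password in the notebook. A minimal sketch of reading the credentials from environment variables instead; the variable names `PG_URL`, `PG_USER`, and `PG_PASSWORD` and both helper functions are assumptions, not part of the original notebook:

```python
import os


def jdbc_options(table="ecommerce_events", env=os.environ):
    """Build JDBC writer options from environment variables (variable names are assumptions)."""
    return {
        "url": env.get("PG_URL", "jdbc:postgresql://127.0.0.1:5432/pyspark_insights"),
        "dbtable": table,
        "user": env["PG_USER"],  # raises KeyError if unset rather than falling back silently
        "password": env["PG_PASSWORD"],
        "driver": "org.postgresql.Driver",
        "batchsize": "10000",
    }


def write_to_postgres(df, table="ecommerce_events"):
    """Write df over JDBC with the options above (sketch; overwrites the target table)."""
    df.write.format("jdbc").options(**jdbc_options(table)).mode("overwrite").save()
```

It would be called as `write_to_postgres(df_joined_partition)`. With `repartition(10)` as above, Spark may open up to ten JDBC connections, one per partition being written.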