# Data transformation with Spark

Until now we only analyzed a sample (which was already 1+Gb) of one day. In run we'll have to deal with several days of data. As a consequence we can't do the transformation with pandas. To deal with such data, we'll use a Big Data framework called Spark.

In [1]:
import os
import findspark
import matplotlib.pyplot as plt

from pyspark import SparkContext
from pyspark.sql import SQLContext


# Spécifie le chemin où est stocké Spark
os.environ["SPARK_HOME"] = "C:\\Users\\saman\\spark-3.1.2-bin-hadoop3.2"
#os.environ["PYTHONPATH"] = "C:\\Users\\saman\\spark-2.4.3-bin-hadoop2.7\\python;C:\\Users\\saman\\spark-2.4.3-bin-hadoop2.7\\python\\lib\\py4j-0.10.7-src.zip:%PYTHONPATH%"

findspark.init()  # Trouve les exécutables dans le dossier SPARK_HOME
sc = SparkContext(master="local[*]")  # Crée un SparkContext local
sql_c = SQLContext(sc)  # Instancie un SQLContext

In [2]:
d = os.getcwd()

data = sql_c.read.csv(
    d+"\\data\\sample.csv",
    header=True
)
data.printSchema()

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_session: string (nullable = true)



We'll do the same transformation we did in pandas but in spark.

In [3]:
from pyspark.sql import functions as func

data = data \
    .withColumn("event_time", func.col("event_time").cast("timestamp")) \
    .withColumn("product_id", func.col("product_id").cast("int")) \
    .withColumn("price", func.col("price").cast("float")) \
    .withColumn("user_id", func.col("user_id").cast("int"))

data.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: float (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



In [4]:
data_sessions = data.filter(func.col("user_session").isNotNull())
data_sessions_2 = data_sessions

event_per_session = data_sessions_2 \
    .withColumn("purchase", func.when(func.col("event_type") == "purchase", 1).otherwise(0)) \
    .withColumn("view", func.when(func.col("event_type") == "view", 1).otherwise(0)) \
    .groupBy(["user_session", "product_id"]) \
    .agg(
        func.max("purchase").alias("purchase"),
        func.sum("view").alias("nb_view_product")
    )

In [5]:
print(event_per_session.count())
event_per_session.show()

836767
+--------------------+----------+--------+---------------+
|        user_session|product_id|purchase|nb_view_product|
+--------------------+----------+--------+---------------+
|18ea1924-2c5a-4ac...|  15700204|       0|              1|
|7a1f9af2-07e7-49f...|  26010220|       0|              1|
|b634401f-45aa-43d...|  28101606|       0|              1|
|0e863e87-5e2a-4f9...|  15100371|       0|              2|
|702fc6ea-7275-4b8...|   1306776|       0|              1|
|acf68be7-4fc8-49a...|   1005135|       0|              1|
|b538e155-8163-460...|   9200581|       0|              1|
|00d8be22-7bdc-46d...|  26400264|       0|              1|
|07ba36a1-9af4-44b...|  17300768|       0|              1|
|ce885079-4d92-4fe...|   1004750|       1|              2|
|781f851e-1bf3-4f0...|  26200214|       0|              1|
|104dc3e4-d6ff-4f0...|   1306650|       0|              1|
|92532f93-7d84-413...|  17300351|       0|              1|
|e2c32504-2d40-419...|   1801581|       0|       

In [6]:
view_per_session = data \
    .withColumn("view", func.when(func.col("event_type") == "view", 1).otherwise(0)) \
    .groupBy(["user_session"]) \
    .agg(func.sum("view").alias("nb_view_session"))

event_per_session = event_per_session.join(
    view_per_session,
    on = ["user_session"],
    how = "inner"
)

event_per_session.show()

+--------------------+----------+--------+---------------+---------------+
|        user_session|product_id|purchase|nb_view_product|nb_view_session|
+--------------------+----------+--------+---------------+---------------+
|0043d905-2c15-49e...|   1004665|       0|              1|              2|
|0043d905-2c15-49e...|   8902408|       0|              1|              2|
|0056e55b-b2fe-4b9...|   1004173|       1|              2|              3|
|0056e55b-b2fe-4b9...|   1004038|       0|              1|              3|
|00a1d70d-62aa-427...|  28717980|       0|              1|              2|
|00a1d70d-62aa-427...|  28716931|       0|              1|              2|
|00f8309d-abe7-46e...|   5100816|       0|              1|              5|
|00f8309d-abe7-46e...|  21401211|       0|              1|              5|
|00f8309d-abe7-46e...|  26021018|       0|              1|              5|
|00f8309d-abe7-46e...|   9002822|       0|              1|              5|
|00f8309d-abe7-46e...|   

In [7]:
data_events = data_sessions.join(event_per_session, on = ["user_session", "product_id"], how = "inner")

data_events.select("user_session", "user_id", "product_id", "brand", "purchase", "nb_view_product", "nb_view_session").show()

+--------------------+---------+----------+-------+--------+---------------+---------------+
|        user_session|  user_id|product_id|  brand|purchase|nb_view_product|nb_view_session|
+--------------------+---------+----------+-------+--------+---------------+---------------+
|0009154d-ba7e-442...|544269600|  22200061|   null|       0|              1|             12|
|0024ad93-f41f-470...|513383328|  10301351|bburago|       0|              1|             14|
|0028a4b2-72b7-4a7...|515260165|   4803977|samsung|       0|              1|              1|
|004b1c27-3f71-43a...|519188740|   7004619|lorelli|       0|              1|              3|
|007d6fd2-6c82-49e...|553206247|   5100503| xiaomi|       0|              1|              3|
|00887f2f-3d45-4f8...|550813827|  18000928|samsung|       0|              1|              5|
|00ba64fb-20bf-4fd...|548939215|   1005022|   oppo|       0|              4|             16|
|00ba64fb-20bf-4fd...|548939215|   1005022|   oppo|       0|          

In [8]:
data_events = data_events \
    .withColumn("hour", func.hour("event_time")) \
    .withColumn("minute", func.minute("event_time")) \
    .withColumn("weekday", func.dayofweek("event_time")-2) \
    .withColumn("category", func.split(func.col("category_code"), r"\.").getItem(0)) \
    .withColumn("sub_category", func.split(func.col("category_code"), r"\.").getItem(1)) \

# Attention pour le dayofweek : Spark considère Dimanche à 1, alors que Pandas considère Lundi à 0

data_events = data_events \
    .withColumn("weekday", func.when(func.col("weekday") == -1, 6).otherwise(func.col("weekday")))


data_events.select("event_time", "hour", "minute", "weekday", "category", "sub_category").show()

+-------------------+----+------+-------+-----------+------------+
|         event_time|hour|minute|weekday|   category|sub_category|
+-------------------+----+------+-------+-----------+------------+
|2019-10-01 12:35:26|  12|    35|      1|       null|        null|
|2019-10-01 17:00:01|  17|     0|      1|       null|        null|
|2019-10-01 20:42:51|  20|    42|      1|electronics|       audio|
|2019-10-01 18:51:27|  18|    51|      1|       kids|    carriage|
|2019-10-01 14:48:47|  14|    48|      1|       null|        null|
|2019-10-01 19:59:32|  19|    59|      1|       null|        null|
|2019-10-01 18:41:06|  18|    41|      1|electronics|  smartphone|
|2019-10-01 18:41:50|  18|    41|      1|electronics|  smartphone|
|2019-10-01 18:42:27|  18|    42|      1|electronics|  smartphone|
|2019-10-01 18:45:46|  18|    45|      1|electronics|  smartphone|
|2019-10-01 11:48:13|  11|    48|      1|       null|        null|
|2019-10-01 11:50:57|  11|    50|      1|       null|        n

In [9]:
sessions_duration = data_events \
    .groupBy(["user_session"]) \
    .agg(
        func.min('event_time').alias('amin'),
        func.max('event_time').alias('amax')
    ) \
    .withColumn(
        "duration",
        func.col("amax").cast("int") - func.col("amin").cast("int")
    )
sessions_duration.show()

+--------------------+-------------------+-------------------+--------+
|        user_session|               amin|               amax|duration|
+--------------------+-------------------+-------------------+--------+
|0361d9eb-993c-465...|2019-10-01 05:47:12|2019-10-01 05:52:48|     336|
|08bf8fbd-6234-4f9...|2019-10-01 13:13:07|2019-10-01 13:33:24|    1217|
|18d2bcd8-538e-4a7...|2019-10-01 15:34:02|2019-10-01 15:40:22|     380|
|1af2ed7b-5441-4dc...|2019-10-01 18:07:26|2019-10-01 18:10:00|     154|
|1afe870d-8e38-46b...|2019-10-01 16:38:31|2019-10-01 16:42:32|     241|
|23acc4df-4fb9-4a3...|2019-10-01 04:36:04|2019-10-01 04:36:04|       0|
|2b63f75c-96aa-407...|2019-10-01 18:29:51|2019-10-01 18:32:37|     166|
|2bfca724-500e-43b...|2019-10-01 20:33:12|2019-10-01 20:40:27|     435|
|305af5ae-f35f-4b6...|2019-10-01 17:31:25|2019-10-01 17:45:12|     827|
|3aa1073f-6ea2-4d3...|2019-10-01 10:19:45|2019-10-01 10:19:45|       0|
|443f0871-e125-45c...|2019-10-01 12:24:43|2019-10-01 12:27:23|  

In [10]:
dataset = data_events \
    .sort("event_time") \
    .dropDuplicates(["event_type", "product_id", "user_session", "user_id"]) \
    .filter(func.col("event_type").isin(["cart", "purchase"])) \
    .join(
        sessions_duration,
        on = ["user_session"],
        how = "inner"
    )
    
    
dataset.show()

+--------------------+----------+-------------------+----------+-------------------+--------------------+---------+-------+---------+--------+---------------+---------------+----+------+-------+-----------+------------+-------------------+-------------------+--------+
|        user_session|product_id|         event_time|event_type|        category_id|       category_code|    brand|  price|  user_id|purchase|nb_view_product|nb_view_session|hour|minute|weekday|   category|sub_category|               amin|               amax|duration|
+--------------------+----------+-------------------+----------+-------------------+--------------------+---------+-------+---------+--------+---------------+---------------+----+------+-------+-----------+------------+-------------------+-------------------+--------+
|0056e55b-b2fe-4b9...|   1004173|2019-10-01 13:19:04|  purchase|2053013555631882655|electronics.smart...|   xiaomi| 146.46|551801399|       1|              2|              3|  13|    19|      1

In [11]:
from pyspark.sql.window import Window

count_prev_sessions = dataset \
    .select(["user_id", "user_session", "event_time"]) \
    .withColumn( # Spark ne préserve pas l'ordre après le groupBy
        "event_time", # On affecte pour chaque événement de la session le timestamp du début de session
        func.first("event_time").over(Window.partitionBy(["user_id", "user_session"]).orderBy("event_time"))
    ) \
    .groupBy(["user_id", "user_session"]) \
    .agg(
        func.first("event_time").alias("event_time")
    ) \
    .withColumn(
        "num_prev_sessions",
        func.row_number().over(Window.partitionBy(["user_id"]).orderBy("event_time")) - 1
    )        

dataset = dataset.join(
    count_prev_sessions.select(["user_session", "num_prev_sessions"]),
    on =["user_session"],
    how = "inner"
)

In [12]:
view_prev_sessions = dataset \
    .select(["user_id", "user_session", "event_time", "product_id"]) \
    .withColumn(
        "event_time",
        func.first("event_time").over(Window.partitionBy(["user_id", "user_session", "product_id"]).orderBy("event_time"))
    ) \
    .groupBy(["user_id", "user_session", "product_id"]) \
    .agg(
        func.first("event_time").alias("event_time")
    ) \
    .withColumn(
        "num_prev_product_view",
        func.row_number().over(Window.partitionBy(["user_id", "product_id"]).orderBy("event_time")) - 1
    )        

dataset = dataset.join(
    view_prev_sessions.select(["user_session", "product_id", "num_prev_product_view"]),
    on =["user_session", "product_id"],
    how = "inner"
)

In [14]:
dataset = dataset \
    .sort("event_time") \
    .dropDuplicates(["user_session", "product_id", "purchase"]) \
    .select([c for c in dataset.columns if c not in \
             {"event_time", "event_type", "category_code", "category_id"}
    ])

dataset.select(
    "user_session", "nb_view_product", "nb_view_session",
    "duration", "num_prev_sessions", "num_prev_product_view"
).show()

print(dataset.count())

+--------------------+---------------+---------------+--------+-----------------+---------------------+
|        user_session|nb_view_product|nb_view_session|duration|num_prev_sessions|num_prev_product_view|
+--------------------+---------------+---------------+--------+-----------------+---------------------+
|ce885079-4d92-4fe...|              2|              4|     485|                0|                    0|
|176b4ba0-aac7-440...|              2|             12|     838|                0|                    0|
|89926d13-f182-41a...|              2|              2|      63|                0|                    0|
|2a6df6d5-792c-426...|              1|              1|      57|                0|                    0|
|0182b386-f17e-481...|              6|             30|    1492|                0|                    0|
|91b11bcd-13e8-4e3...|              1|              1|     153|                0|                    0|
|aa039047-fe42-48c...|              1|              1|     130| 

In [15]:
import pandas as pd

dataset_spark = dataset.toPandas()
dataset_spark = dataset_spark.drop(["amin", "amax"], axis=1)

In [75]:
dataset_pandas = pd.read_csv(d+"\\data\\primary.csv")

In [76]:
import numpy as np

dataset_pandas.rename(columns = {"event_hour" : "hour", 
                                 "event_minute" : "minute", 
                                 "event_weekday" : "weekday",
                                 "view_session" : "nb_view_session",
                                 "view_product_session" : "nb_view_product",
                                 "number_of_previous_sess" : "num_prev_sessions",
                                 "number_of_previous_sess_product" : "num_prev_product_view"
                                }, inplace = True
                     )

dataset_pandas_compare = dataset_pandas \
    .set_index(["user_session", "user_id", "product_id"]).sort_index()

dataset_spark_compare = dataset_spark[dataset_pandas.columns] \
    .set_index(["user_session", "user_id", "product_id"]).sort_index()

# Attention au NaN vs None !
dataset_pandas_compare = dataset_pandas_compare.fillna(0)
dataset_spark_compare = dataset_spark_compare.fillna(0)

dataset_pandas_compare["price"] = np.floor(dataset_pandas_compare["price"]).astype(float)
dataset_spark_compare["price"] = np.floor(dataset_spark_compare["price"]).astype(float)

# Puisque le format CSV n'est pas typé, il faut forcer la conversion !
for col in dataset_spark_compare.columns:
    if col not in ["price", "brand", "category", "sub_category"]:
        dataset_pandas_compare[col] = dataset_pandas_compare[col].astype(int)
        dataset_spark_compare[col] = dataset_spark_compare[col].astype(int)

In [77]:
dataset_pandas_compare.equals(dataset_spark_compare)

False

In [1]:
dataset_spark

NameError: name 'dataset_spark' is not defined