### BIBLIOTHÈQUES

In [1]:
import argparse
from datetime import datetime

import findspark
import pandas as pd
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as func
from pyspark.sql.window import Window

### PARAMÈTRES

In [2]:
# Time window of the export (inclusive bounds parsed from "YYYY-mm-dd HH:MM:SS" strings).
# NOTE(review): in the visible cells these dates only name the default export file;
# no filtering of event_time on this window appears here — confirm this is intended.
DATE_START, DATE_END = (
    datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S")
    for stamp in ("2019-10-01 00:00:00", "2019-10-01 23:00:00")
)
# Empty string means "derive the export path from the dates" (see the processing cell).
DESTINATION = ""

### INITIALISATION DU SPARKCONTEXT

In [3]:
findspark.init()  # Locate the Spark installation from SPARK_HOME and put pyspark on sys.path.
# NOTE(review): pyspark is already imported in the first cell, i.e. before this call, so
# findspark cannot help locate it here; consider calling findspark.init() before those imports.

# Reuse the running SparkContext when this cell is re-executed: constructing a second
# SparkContext in the same kernel raises an error, which breaks re-running the notebook.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
sql_c = SQLContext(sc)  # SQL entry point used below to read the CSV file.

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/20 17:47:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### TRAITEMENTS

In [4]:
# When no destination was configured, build an HDFS path named after the
# start and end days of the export window (hours are not part of the name).
if DESTINATION == "":
    DESTINATION = "hdfs://localhost:9000/ecom_data/export_{}_{}.csv".format(
        DATE_START.strftime("%Y%m%d"),
        DATE_END.strftime("%Y%m%d"),
    )

print(DESTINATION)

hdfs://localhost:9000/ecom_data/export_20191001_20191001.csv


#### Vérification que DATE_START < DATE_END

In [6]:
# Reject an empty or inverted time window: DATE_END must be strictly later than DATE_START.
# (The previous message said the opposite of what the condition enforces.)
if DATE_START >= DATE_END:
    raise ValueError("DATE_END doit être postérieure à DATE_START.")

#### Lecture du fichier csv

In [7]:
# Load the raw sample using its header row; with no schema given, every column is read as a string.
data = sql_c.read.csv("data/sample.csv", header=True)

In [8]:
# Preview the first rows of the raw DataFrame.
data.show()

+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-10-01 00:00:00|      view|  44600062|2103807459595387724|                null|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|
|2019-10-01 00:00:00|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|   33.2|554748717|9333dfbd-b87a-470...|
|2019-10-01 00:00:01|      view|  17200506|2053013559792632471|furniture.living_...|    null|  543.1|519107250|566511c2-e2e3-422...|
|2019-10-01 00:00:01|      view|   1307067|2053013558920217191|  computers.notebook|  lenovo| 251.74|550050854|7c90fc70-0e80-459...|
|2019-10-01 00:00:04|      view|   1004237|2053013555631882655|electr

#### Conversion des types des variables 

In [9]:
# Inspect the schema as read: no schema was supplied, so every column is a string at this point.
data.printSchema()

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_session: string (nullable = true)



In [10]:
# Convert the string columns to usable types. category_id stays a string because its
# values (e.g. 2103807459595387724) do not fit in a 32-bit int.
# NOTE(review): casting to "int" yields null for ids beyond the 32-bit range; "long" would be safer.
data = (
    data
    .withColumn("event_time", func.to_timestamp(func.col("event_time"), "yyyy-MM-dd HH:mm:ss"))
    .withColumn("product_id", func.col("product_id").astype("int"))
    .withColumn("price", func.col("price").astype("double"))
    .withColumn("user_id", func.col("user_id").astype("int"))
)

In [11]:
# Check the column types after the type conversions.
data.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



#### Agrégation à la maille (session utilisateur x article)

In [12]:
# One row per (session, product, user) with product-level features for that session.
session_product_data = (
    data
    .groupby("user_session", "product_id", "user_id")
    .agg(
        # Mean price of the product over the session's events.
        func.mean("price").alias("price"),

        # Category and brand are assumed constant for a product within a session.
        # ignorenulls=True so that a null on whichever row Spark sees first does not
        # hide a value present on another row of the group. first() remains
        # order-dependent if the values actually differ inside a group.
        func.first("category_code", ignorenulls=True).alias("category_code"),
        func.first("brand", ignorenulls=True).alias("brand"),

        # Number of 'view' events for this product in the session.
        func.sum(func.when(func.col("event_type") == "view", 1).otherwise(0)).alias("num_views_product"),

        # True when at least one 'purchase' event occurred for this product in the session.
        # The inner sum is never null (every row contributes 0 or 1), so the comparison
        # is already a non-null boolean.
        (func.sum(func.when(func.col("event_type") == "purchase", 1).otherwise(0)) >= 1).alias("purchased"),
    )
)

24/01/20 17:47:45 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [13]:
# Preview the (session, product, user) aggregate.
session_product_data.show()

[Stage 2:>                                                          (0 + 3) / 3]

+--------------------+----------+---------+------+--------------------+--------+-----------------+---------+
|        user_session|product_id|  user_id| price|       category_code|   brand|num_views_product|purchased|
+--------------------+----------+---------+------+--------------------+--------+-----------------+---------+
|0000c738-9fbb-466...|   2600500|529430576|218.77|                null|  gefest|                1|    false|
|0000c738-9fbb-466...|   2601627|529430576|218.54|                null|    null|                1|    false|
|0000c738-9fbb-466...|   2602106|529430576|221.34|                null|dauscher|                1|    false|
|00016584-60b8-4d6...|   5100570|536959548|460.96|  electronics.clocks|   apple|                1|    false|
|00016bb9-b50d-4bc...|  12300393|523769618|  39.1|construction.tool...|    null|                2|    false|
|0001d713-f9c4-4d9...|   4803977|513196971| 107.8|electronics.audio...| samsung|                1|    false|
|0001d713-f9c4-4d9.

                                                                                

#### Agrégation à la maille (session utilisateur)

In [14]:
# One row per (session, user) with session-level features.
session_data = (
    data
    .groupby("user_session", "user_id")
    .agg(
        # Timestamp of the first event of the session.
        func.min("event_time").alias("start_time"),

        # Timestamp of the last event of the session.
        func.max("event_time").alias("end_time"),

        # Number of DISTINCT products seen in the session: despite the column name,
        # repeated views of the same product count once.
        func.countDistinct("product_id").alias("num_views_session"),
    )
    # Session duration in seconds. Subtracting epoch seconds avoids depending on how a given
    # Spark version types (and casts) the difference of two timestamps.
    .withColumn(
        "duration",
        (func.col("end_time").cast("long") - func.col("start_time").cast("long")).cast("int"),
    )
    # Hour at which the session started.
    .withColumn("start_hour", func.hour("start_time"))
    # Minute at which the session started.
    .withColumn("start_minute", func.minute("start_time"))
    # Day of week of the session start (Spark convention: 1 = Sunday ... 7 = Saturday).
    .withColumn("start_dayofweek", func.dayofweek("start_time"))
)



In [15]:
# Window over each user's sessions, ordered by session start time (start_time, not event_time).
windowSpec = Window.partitionBy("user_id").orderBy("start_time")
session_data = session_data \
    .withColumn("sessions_precedentes",
                # Number of this user's sessions that started strictly earlier.
                # rank() gives sessions with the same start_time the same value, so the result
                # is deterministic; row_number() would break such ties arbitrarily.
                func.rank().over(windowSpec) - 1
               )

In [16]:
# Preview the session-level table.
session_data.show()

[Stage 10:>                                                         (0 + 3) / 3]

+--------------------+---------+-------------------+-------------------+-----------------+--------+----------+------------+---------------+--------------------+
|        user_session|  user_id|         start_time|           end_time|num_views_session|duration|start_hour|start_minute|start_dayofweek|sessions_precedentes|
+--------------------+---------+-------------------+-------------------+-----------------+--------+----------+------------+---------------+--------------------+
|91769fdf-461b-4e4...|244951053|2019-10-01 08:47:35|2019-10-01 08:48:28|                1|      53|         8|          47|              3|                   0|
|0051531b-c007-442...|292071852|2019-10-01 17:06:51|2019-10-01 17:06:51|                1|       0|        17|           6|              3|                   0|
|d146126f-ce44-48d...|293291933|2019-10-01 18:58:32|2019-10-01 19:03:11|                4|     279|        18|          58|              3|                   0|
|85e3fda6-c15d-488...|293291933|20

                                                                                

#### Jointure des tables (session x utilisateur x article) et (session x utilisateur)

In [17]:
# Attach the session-level features to every (session, product) row.
# Inner equi-join: a row whose user_id or user_session is null (if any) matches nothing and is dropped.
session_product_data = session_product_data.join(
    session_data,
    on=["user_id", "user_session"],
    how="inner",
)

In [18]:
# Preview the joined (session x user x product) table.
session_product_data.show()

[Stage 27:>                                                         (0 + 3) / 3]

+---------+--------------------+----------+------+--------------------+-------+-----------------+---------+-------------------+-------------------+-----------------+--------+----------+------------+---------------+--------------------+
|  user_id|        user_session|product_id| price|       category_code|  brand|num_views_product|purchased|         start_time|           end_time|num_views_session|duration|start_hour|start_minute|start_dayofweek|sessions_precedentes|
+---------+--------------------+----------+------+--------------------+-------+-----------------+---------+-------------------+-------------------+-----------------+--------+----------+------------+---------------+--------------------+
|293291933|d146126f-ce44-48d...|   6902377| 36.09|furniture.living_...|bambola|                1|    false|2019-10-01 18:58:32|2019-10-01 19:03:11|                4|     279|        18|          58|              3|                   0|
|293291933|d146126f-ce44-48d...|   7005811|489.05|      

                                                                                

#### Calcul du nombre de fois où l'article a déjà été vu dans les sessions précédentes

In [23]:
# Per-(user, product) window ordered by session start time.
windowSpec = Window.partitionBy(["user_id", "product_id"]).orderBy(func.col("start_time"))

In [24]:
# Per-(user, product) window whose frame holds only the sessions that started strictly before
# the current one. Ordering by epoch seconds allows a value-based frame ending at -1 second.
# A plain orderBy("start_time") window uses the default frame RANGE UNBOUNDED PRECEDING ..
# CURRENT ROW. That frame also includes other sessions with the same start_time, which were
# therefore wrongly counted as "previous".
prev_sessions_window = (
    Window.partitionBy("user_id", "product_id")
    .orderBy(func.col("start_time").cast("long"))
    .rangeBetween(Window.unboundedPreceding, -1)
)

session_product_data = session_product_data \
    .withColumn("num_prev_and_current_product_views",
                # Views of this product in strictly earlier sessions (0 when there are none)
                # plus its views in the current session.
                func.coalesce(func.sum("num_views_product").over(prev_sessions_window), func.lit(0))
                + func.col("num_views_product")
               ) \
    .withColumn("num_prev_product_views",
                # Views of this product in strictly earlier sessions only (current session excluded).
                func.col("num_prev_and_current_product_views") - func.col("num_views_product")
               )

In [25]:
# Preview the table with the per-product view counters across sessions.
session_product_data.show()

[Stage 110:>                                                        (0 + 3) / 3]

+---------+--------------------+----------+------+--------------------+--------+-----------------+---------+-------------------+-------------------+-----------------+--------+----------+------------+---------------+--------------------+----------------------------------+----------------------+
|  user_id|        user_session|product_id| price|       category_code|   brand|num_views_product|purchased|         start_time|           end_time|num_views_session|duration|start_hour|start_minute|start_dayofweek|sessions_precedentes|num_prev_and_current_product_views|num_prev_product_views|
+---------+--------------------+----------+------+--------------------+--------+-----------------+---------+-------------------+-------------------+-----------------+--------+----------+------------+---------------+--------------------+----------------------------------+----------------------+
|244951053|91769fdf-461b-4e4...|   1003535| 460.5|electronics.smart...| samsung|                2|    false|2019-10

                                                                                