In [8]:
import os
os.environ['PYARROW_IGNORE_TIMEZONE'] = '1'
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col,min,max,mean
import time
from pyspark.sql.functions import col, when, count,sum,split
import matplotlib.pyplot as plt
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType, TimestampType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, count, sum, min, max, mean,countDistinct
from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql.functions import year, month, dayofmonth, hour, datediff, lit
from pyspark.sql import SparkSession
from pyspark.sql.functions import unix_timestamp, max as spark_max, sum as spark_sum, count as spark_count, col, udf
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType
import datetime
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline

In [2]:

# Explicit read schema shared by every monthly event CSV; all fields are nullable.
schema = (
    StructType()
    .add("event_time", TimestampType(), True)
    .add("event_type", StringType(), True)
    .add("product_id", IntegerType(), True)
    .add("category_id", LongType(), True)
    .add("category_code", StringType(), True)
    .add("brand", StringType(), True)
    .add("price", DoubleType(), True)
    .add("user_id", IntegerType(), True)
    .add("user_session", StringType(), True)
)


In [3]:


# Spark session used by every cell of the notebook (reused if one already exists).
# NOTE(review): no master URL is set here; if this runs in local mode, the executor.*
# settings probably have no effect and spark.driver.memory is what bounds the job — confirm.
spark = (
    SparkSession.builder
    .appName("msprv3")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "5g")
    .config("spark.executor.cores", "4")
    .config("spark.driver.maxResultSize", "2g")
    .getOrCreate()
)

24/06/20 19:54:43 WARN Utils: Your hostname, ghani-IdeaPad-3-14ADA05 resolves to a loopback address: 127.0.1.1; using 10.188.25.103 instead (on interface wlp2s0)
24/06/20 19:54:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/20 19:54:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/06/20 19:54:45 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 55136)
Traceback (most recent call last):
  File "/usr/lib/python3.12/socketserver.py", line 318, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.12/socketserver.py", line 349, in process_request
    self.finish_requ

In [5]:
# Directory holding the monthly event exports (Oct 2019 - Apr 2020). Change it here
# rather than in each read call.
DATA_DIR = "/home/ghani/Bureau/Data"


def read_month_csv(file_name: str):
    """Read one monthly event CSV from DATA_DIR using the explicit `schema`.

    Args:
        file_name: File name inside DATA_DIR, e.g. "2019-Oct.csv".

    Returns:
        A Spark DataFrame whose columns are typed according to `schema`.
    """
    return spark.read.csv(os.path.join(DATA_DIR, file_name), header=True, schema=schema)


octobre_df = read_month_csv("2019-Oct.csv")
novembre_df = read_month_csv("2019-Nov.csv")
decembre_df = read_month_csv("2019-Dec.csv")
janvier_df = read_month_csv("2020-Jan.csv")
fevrier_df = read_month_csv("2020-Feb.csv")
mars_df = read_month_csv("2020-Mar.csv")
avril_df = read_month_csv("2020-Apr.csv")



In [6]:
# Stack the seven months into one DataFrame. union() matches columns by position,
# which is safe here because every month was read with the same explicit schema.
combined_df = octobre_df
for _month_df in (novembre_df, decembre_df, janvier_df, fevrier_df, mars_df, avril_df):
    combined_df = combined_df.union(_month_df)


In [7]:
# Columns not used by the per-user features built below.
combined_df = combined_df.drop(*['brand', 'category_id', 'product_id', 'user_session'])

In [10]:
from pyspark.sql import DataFrame


def extract_primary_category(
    df: DataFrame,
    column_name: str,
    output_column: str = "primary_category",
    unknown_label: str = "unknown",
) -> DataFrame:
    """Add a column holding the top-level segment of a dotted category code.

    For a value such as ``"computers.notebook"`` the new column gets ``"computers"``
    (everything before the first ``.``); a value without a dot is copied unchanged.
    Null values become ``unknown_label``.

    Args:
        df: Input DataFrame.
        column_name: Name of the string column holding the dotted category code.
        output_column: Name of the column to add (replaced if it already exists).
            Defaults to ``"primary_category"``.
        unknown_label: Value written when ``column_name`` is null. Defaults to
            ``"unknown"``.

    Returns:
        A new DataFrame with ``output_column`` added; ``df`` itself is not modified.
    """
    code = col(column_name)
    # split() takes a regular expression, so the dot must be escaped.
    top_level = split(code, "\\.").getItem(0)
    return df.withColumn(
        output_column,
        when(code.isNull(), unknown_label).otherwise(top_level),
    )




In [12]:
# Derive the top-level category from category_code, then preview the first rows.
combined_df = extract_primary_category(df=combined_df, column_name="category_code")
combined_df.show(n=20)

+-------------------+----------+--------------------+-------+---------+----------------+
|         event_time|event_type|       category_code|  price|  user_id|primary_category|
+-------------------+----------+--------------------+-------+---------+----------------+
|2019-10-01 02:00:00|      view|                NULL|  35.79|541312140|         unknown|
|2019-10-01 02:00:00|      view|appliances.enviro...|   33.2|554748717|      appliances|
|2019-10-01 02:00:01|      view|furniture.living_...|  543.1|519107250|       furniture|
|2019-10-01 02:00:01|      view|  computers.notebook| 251.74|550050854|       computers|
|2019-10-01 02:00:04|      view|electronics.smart...|1081.98|535871217|     electronics|
|2019-10-01 02:00:05|      view|   computers.desktop| 908.62|512742880|       computers|
|2019-10-01 02:00:08|      view|                NULL| 380.96|555447699|         unknown|
|2019-10-01 02:00:08|      view|                NULL|  41.16|550978835|         unknown|
|2019-10-01 02:00:10|

In [13]:
# Number of distinct top-level categories (including the "unknown" bucket).
combined_df.agg(countDistinct('primary_category')).first()[0]

                                                                                

14

In [22]:
# List the distinct top-level categories (used for the pivot values further down).
combined_df.select('primary_category').dropDuplicates().show()



+----------------+
|primary_category|
+----------------+
|        medicine|
|       computers|
|            auto|
|         unknown|
|      stationery|
|           sport|
|         apparel|
|      appliances|
|    country_yard|
|       furniture|
|     accessories|
|            kids|
|     electronics|
|    construction|
+----------------+



                                                                                

In [14]:
# category_code is no longer needed once primary_category has been derived from it.
combined_df = combined_df.select([c for c in combined_df.columns if c != 'category_code'])

In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max as spark_max, sum as spark_sum, count as spark_count, unix_timestamp
from pyspark.sql.types import IntegerType
import datetime
from pyspark.sql.functions import udf




# NOTE(review): event_time is already declared TimestampType in the read schema, so the
# format string is not used here; the round-trip through epoch seconds keeps the
# timestamp type and only drops sub-second precision.
data = combined_df.withColumn("event_time", unix_timestamp(col("event_time"), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))

# Reference date for recency: one day after the latest event in the whole dataset,
# so every user's recency is at least 1 day.
latest_event = data.agg(spark_max("event_time")).collect()[0][0]
if latest_event is None:
    # max() over an empty (or all-null) column is None; fail with a clear message
    # instead of a TypeError on `None + timedelta`.
    raise ValueError("No event has a valid event_time; cannot compute recency.")
last_date = latest_event + datetime.timedelta(days=1)

# Whole days (timedelta.days) between the reference date and a user's last event.
# Returns None rather than raising when the user's last event_time is null.
recency_udf = udf(
    lambda last_seen: None if last_seen is None else (last_date - last_seen).days,
    IntegerType(),
)

# Per-event columns feeding the pivot: `view` is 1 for view events and 0 otherwise;
# `purchase_value` is the price for purchase events and 0 otherwise.
intermediate_df = data.withColumn(
    "view", when(col("event_type") == "view", 1).otherwise(0)
).withColumn(
    "purchase_value", when(col("event_type") == "purchase", col("price")).otherwise(0)
)

# Pivot values are listed explicitly so Spark does not have to compute them with an
# extra pass; they are the 14 distinct primary_category values listed earlier.
categories = [
    "medicine", "computers", "auto", "unknown", "stationery",
    "sport", "apparel", "appliances", "country_yard", "furniture",
    "accessories", "kids", "electronics", "construction"
]

# One `<category>_views` and one `<category>_total_purchase_value` column per category.
# `view` is a 0/1 flag that is never null, so count("view") would return the number of
# ALL events in the category; summing the flag gives the number of view events.
pivot_df = intermediate_df.groupBy("user_id").pivot(
    "primary_category", categories
).agg(
    spark_sum("view").alias("views"),
    spark_sum("purchase_value").alias("total_purchase_value")
)

# Overall recency per user, computed from that user's latest event_time.
global_recency = data.groupBy("user_id").agg(
    recency_udf(spark_max("event_time")).alias("global_recency")
)

final_df = pivot_df.join(global_recency, "user_id")

# Preview the resulting feature table.
final_df.show()


24/06/20 21:46:55 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 36:>                                                         (0 + 1) / 1]

+---------+--------------+-----------------------------+---------------+------------------------------+----------+-------------------------+-------------+----------------------------+----------------+-------------------------------+-----------+--------------------------+-------------+----------------------------+----------------+-------------------------------+------------------+---------------------------------+---------------+------------------------------+-----------------+--------------------------------+----------+-------------------------+-----------------+--------------------------------+------------------+---------------------------------+--------------+
|  user_id|medicine_views|medicine_total_purchase_value|computers_views|computers_total_purchase_value|auto_views|auto_total_purchase_value|unknown_views|unknown_total_purchase_value|stationery_views|stationery_total_purchase_value|sport_views|sport_total_purchase_value|apparel_views|apparel_total_purchase_value|appliances_view

                                                                                

In [24]:
# The pivot leaves nulls for user/category pairs with no matching events; replace them
# with 0 in a single fillna call. Calling fillna once per column (about 29 columns) nests
# one projection per call into the query plan. global_recency is deliberately left out
# of the fill.
final_df = final_df.fillna(
    {column: 0 for column in final_df.columns if column != "global_recency"}
)


In [28]:
# Preview the null-filled feature table.
final_df.show(n=20, truncate=True)

ConnectionRefusedError: [Errno 111] Connection refused

In [27]:
# Persist the per-user feature table. The default save mode is "errorifexists", so
# without mode("overwrite") any re-run fails on the existing directory. That includes a
# retry after a failed, partially written job.
final_df.write.mode("overwrite").parquet('/home/ghani/Bureau/Data/final_df.parquet')

ERROR:root:Exception while sending command.                        (8 + 6) / 14]
Traceback (most recent call last):
  File "/home/ghani/.local/lib/python3.12/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ghani/.local/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ghani/.local/lib/python3.12/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


Py4JError: An error occurred while calling o488.parquet