In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, month, rank, max as spark_max

### create spark session in local


In [17]:
spark = SparkSession.builder \
    .appName("LocalSalesAnalysis") \
    .master("local[*]") \
    .getOrCreate()

# Vérifier la connexion
print("Spark Session Créée :", spark)

Spark Session Créée : <pyspark.sql.session.SparkSession object at 0x000001DA6D6CCCE0>


### Read csv File



In [18]:
df = spark.read.csv("./data/sales_data.csv", header=True, inferSchema=True)
df.show(5)

+--------------+---------+----------+----------+------+-----------+
|transaction_id|client_id|product_id|      date|amount|   category|
+--------------+---------+----------+----------+------+-----------+
|             1|      101|       201|2023-01-15|   150|Electronics|
|             2|      102|       202|2023-01-18|   200|       Home|
|             3|      103|       203|2023-01-20|    50|       Toys|
|             4|      104|       204|2023-02-05|   120|Electronics|
|             5|      105|       205|2023-02-08|   300|    Fashion|
+--------------+---------+----------+----------+------+-----------+
only showing top 5 rows



In [19]:
df.printSchema()
print("Nombre de lignes:", df.count())

root
 |-- transaction_id: integer (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: integer (nullable = true)
 |-- category: string (nullable = true)

Nombre de lignes: 27


### Filter the transactions with amount > 100

In [20]:
from pyspark.sql.functions import col

df_filtered = df.filter(col("amount") > 100)
df_filtered.show(5)

+--------------+---------+----------+----------+------+-----------+
|transaction_id|client_id|product_id|      date|amount|   category|
+--------------+---------+----------+----------+------+-----------+
|             1|      101|       201|2023-01-15|   150|Electronics|
|             2|      102|       202|2023-01-18|   200|       Home|
|             4|      104|       204|2023-02-05|   120|Electronics|
|             5|      105|       205|2023-02-08|   300|    Fashion|
|             7|      107|       207|2023-03-10|   220|Electronics|
+--------------+---------+----------+----------+------+-----------+
only showing top 5 rows



In [21]:
df = df.fillna({"amount": 0.0, "category": "Default"})
df.show(5)

+--------------+---------+----------+----------+------+-----------+
|transaction_id|client_id|product_id|      date|amount|   category|
+--------------+---------+----------+----------+------+-----------+
|             1|      101|       201|2023-01-15|   150|Electronics|
|             2|      102|       202|2023-01-18|   200|       Home|
|             3|      103|       203|2023-01-20|    50|       Toys|
|             4|      104|       204|2023-02-05|   120|Electronics|
|             5|      105|       205|2023-02-08|   300|    Fashion|
+--------------+---------+----------+----------+------+-----------+
only showing top 5 rows



### Convert the date column to date format for temp analysis

In [22]:
from pyspark.sql.types import DateType

df = df.withColumn("date", col("date").cast(DateType()))

In [23]:
# Calculer le montant total des ventes par catégorie de produit
from pyspark.sql.functions import sum

sales_by_category = df.groupBy("category").agg(sum("amount").alias("total_sales"))
sales_by_category.show()

+-----------+-----------+
|   category|total_sales|
+-----------+-----------+
|    Default|          0|
|       Home|        795|
|    Fashion|       1030|
|Electronics|       1150|
|       Toys|        295|
+-----------+-----------+



### Calculate the total amount of sales per month

In [24]:
sales_by_month = df.withColumn("month", month(col("date"))) \
                   .groupBy("month") \
                   .agg(sum("amount").alias("monthly_sales"))
sales_by_month.show()

+-----+-------------+
|month|monthly_sales|
+-----+-------------+
|    1|          400|
|    6|          230|
|    3|          400|
|    5|          340|
|    9|          260|
|    4|          390|
|    8|          110|
|    7|          350|
|   10|          155|
|   11|          130|
|    2|          505|
+-----+-------------+



### Identify the top 5 best-selling products in terms of total amount

In [25]:
top_5_products = df.groupBy("product_id") \
                   .agg(sum("amount").alias("total_sales")) \
                   .orderBy(col("total_sales").desc()) \
                   .limit(5)
top_5_products.show()

+----------+-----------+
|product_id|total_sales|
+----------+-----------+
|       205|        300|
|       212|        250|
|       207|        220|
|       217|        210|
|       211|        200|
+----------+-----------+

