In [13]:
import os
import sys

os.environ["JAVA_HOME"] = "C:\\Program Files\\Java\\jdk-21"
os.environ["SPARK_HOME"] = "C:\\Users\\makhl\\spark"
os.environ["HADOOP_HOME"] = "C:\\hadoop"

sys.path.append(os.path.join(os.environ["SPARK_HOME"], "python"))
sys.path.append(os.path.join(os.environ["SPARK_HOME"], "python", "lib", "py4j-src.zip"))

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Lire CSV") \
    .master("local[*]") \
    .getOrCreate()

df = spark.read.option("header", "true").csv("C:\\Users\\makhl\\Coffee Shop Sales.csv")

df.show()


+--------------+----------------+----------------+---------------+--------+---------------+----------+----------+------------------+--------------------+--------------------+
|transaction_id|transaction_date|transaction_time|transaction_qty|store_id| store_location|product_id|unit_price|  product_category|        product_type|      product_detail|
+--------------+----------------+----------------+---------------+--------+---------------+----------+----------+------------------+--------------------+--------------------+
|             1|      2023-01-01|        07:06:11|              2|       5|Lower Manhattan|        32|       3.0|            Coffee|Gourmet brewed co...|         Ethiopia Rg|
|             2|      2023-01-01|        07:08:56|              2|       5|Lower Manhattan|        57|       3.1|               Tea|     Brewed Chai tea|Spicy Eye Opener ...|
|             3|      2023-01-01|        07:14:04|              2|       5|Lower Manhattan|        59|       4.5|Drinking Cho

In [17]:
from pyspark.sql.functions import col, weekofyear, sum as _sum, format_number

df = df.withColumn("transaction_qty", col("transaction_qty").cast("int")) \
       .withColumn("unit_price", col("unit_price").cast("double")) \
       .withColumn("transaction_date", col("transaction_date").cast("date"))

df = df.withColumn("week", weekofyear(col("transaction_date"))) \
       .withColumn("revenue", col("transaction_qty") * col("unit_price"))

weekly_sales = df.groupBy("week") \
    .agg(_sum("revenue").alias("weekly_sales")) \
    .orderBy("week")
weekly_sales.select(
    "week",
    format_number("weekly_sales", 3).alias("weekly_sales")
).show()

+----+------------+
|week|weekly_sales|
+----+------------+
|   1|  17,139.330|
|   2|  19,129.530|
|   3|  19,818.510|
|   4|  18,271.630|
|   5|  17,231.590|
|   6|  18,333.350|
|   7|  19,856.620|
|   8|  20,063.070|
|   9|  20,389.750|
|  10|  22,146.340|
|  11|  23,594.590|
|  12|  23,395.300|
|  13|  22,055.550|
|  14|  26,079.890|
|  15|  28,981.300|
|  16|  29,217.510|
|  17|  27,386.630|
|  18|  32,110.100|
|  19|  36,056.880|
|  20|  38,476.550|
+----+------------+
only showing top 20 rows


In [27]:
from pyspark.sql.functions import avg, format_number

df_avg_price = df.groupBy("product_category") \
                 .agg(avg("unit_price").alias("average_price"))

df_avg_price.select("product_category", format_number("average_price", 3).alias("average_price")).show(truncate=False)


+------------------+-------------+
|product_category  |average_price|
+------------------+-------------+
|Bakery            |3.552        |
|Loose Tea         |9.267        |
|Drinking Chocolate|4.149        |
|Tea               |2.817        |
|Branded           |17.720       |
|Coffee            |3.024        |
|Coffee beans      |21.018       |
|Packaged Chocolate|9.051        |
|Flavours          |0.800        |
+------------------+-------------+



In [26]:
from pyspark.sql.functions import col, avg, format_number

df_basket = df.withColumn("basket_value", col("transaction_qty") * col("unit_price")) \
              .agg(avg("basket_value").alias("avg_basket_value"))

df_basket.select(format_number("avg_basket_value", 3).alias("avg_basket_value")).show()


+----------------+
|avg_basket_value|
+----------------+
|           4.686|
+----------------+



In [25]:

from pyspark.sql.functions import format_number

df.withColumn("revenue", col("transaction_qty") * col("unit_price")) \
  .groupBy("store_id", "store_location") \
  .agg(_sum("revenue").alias("total_sales")) \
  .orderBy(col("total_sales").desc()) \
  .limit(5) \
 .select("store_id", "store_location", format_number("total_sales", 3).alias("total_sales")) \
  .show(truncate=False)
 


+--------+---------------+-----------+
|store_id|store_location |total_sales|
+--------+---------------+-----------+
|8       |Hell's Kitchen |236,511.170|
|3       |Astoria        |232,243.910|
|5       |Lower Manhattan|230,057.250|
+--------+---------------+-----------+



In [29]:
df.createOrReplaceTempView("sales")

spark.sql("""
    SELECT product_category AS category, 
           COUNT(*) AS nb_sales, 
           ROUND(SUM(revenue), 3) AS total_revenue
    FROM sales
    GROUP BY product_category
    ORDER BY total_revenue DESC
""").show(truncate=False)


+------------------+--------+-------------+
|category          |nb_sales|total_revenue|
+------------------+--------+-------------+
|Coffee            |58416   |269952.45    |
|Tea               |45449   |196405.95    |
|Bakery            |22796   |82315.64     |
|Drinking Chocolate|11468   |72416.0      |
|Coffee beans      |1753    |40085.25     |
|Branded           |747     |13607.0      |
|Loose Tea         |1210    |11213.6      |
|Flavours          |6790    |8408.8       |
|Packaged Chocolate|487     |4407.64      |
+------------------+--------+-------------+



In [34]:
from pyspark.sql.functions import format_number

assembler = VectorAssembler(
    inputCols=["transaction_qty", "unit_price"],
    outputCol="features"
)

data = assembler.transform(df.withColumn("label", col("revenue")))

# Régression
lr = LinearRegression(featuresCol="features", labelCol="label")
model = lr.fit(data)

predictions = model.transform(data)

predictions.select(
    "features",
    format_number("label", 3).alias("label"),
    format_number("prediction", 3).alias("prediction")
).show(5, truncate=False)


+---------+-----+----------+
|features |label|prediction|
+---------+-----+----------+
|[2.0,3.0]|6.000|6.196     |
|[2.0,3.1]|6.200|6.314     |
|[2.0,4.5]|9.000|7.963     |
|[1.0,2.0]|2.000|1.529     |
|[2.0,3.1]|6.200|6.314     |
+---------+-----+----------+
only showing top 5 rows


In [38]:
from pyspark.sql.functions import dayofweek, col, sum as _sum, format_number
from pyspark.sql.types import DoubleType

df_casted = df.withColumn("transaction_qty", col("transaction_qty").cast(DoubleType())) \
              .withColumn("unit_price", col("unit_price").cast(DoubleType()))

df_day = df_casted.withColumn("day_of_week", dayofweek("transaction_date"))

df_day.groupBy("day_of_week") \
      .agg(format_number(_sum(col("transaction_qty") * col("unit_price")), 3).alias("total_revenue")) \
      .orderBy("day_of_week") \
      .show()


+-----------+-------------+
|day_of_week|total_revenue|
+-----------+-------------+
|          1|   98,330.310|
|          2|  101,677.280|
|          3|   99,455.940|
|          4|  100,313.540|
|          5|  100,767.780|
|          6|  101,373.000|
|          7|   96,894.480|
+-----------+-------------+



In [55]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, weekofyear, col, round, sum as _sum
df = df.withColumn("week", weekofyear(col("transaction_date")))
weekly_revenue = df.groupBy("week").agg(_sum("revenue").alias("weekly_revenue"))
w = Window.orderBy("week")
weekly_revenue = weekly_revenue.withColumn("prev_week", lag("weekly_revenue", 1, 0).over(w))

weekly_revenue = weekly_revenue.withColumn("weekly_revenue", round(col("weekly_revenue"), 3)) \
                               .withColumn("prev_week", round(col("prev_week"), 3))
weekly_revenue.show()


+----+--------------+---------+
|week|weekly_revenue|prev_week|
+----+--------------+---------+
|   1|      17139.33|      0.0|
|   2|      19129.53| 17139.33|
|   3|      19818.51| 19129.53|
|   4|      18271.63| 19818.51|
|   5|      17231.59| 18271.63|
|   6|      18333.35| 17231.59|
|   7|      19856.62| 18333.35|
|   8|      20063.07| 19856.62|
|   9|      20389.75| 20063.07|
|  10|      22146.34| 20389.75|
|  11|      23594.59| 22146.34|
|  12|       23395.3| 23594.59|
|  13|      22055.55|  23395.3|
|  14|      26079.89| 22055.55|
|  15|       28981.3| 26079.89|
|  16|      29217.51|  28981.3|
|  17|      27386.63| 29217.51|
|  18|       32110.1| 27386.63|
|  19|      36056.88|  32110.1|
|  20|      38476.55| 36056.88|
+----+--------------+---------+
only showing top 20 rows


In [53]:
from pyspark.sql.window import Window
from pyspark.sql.functions import weekofyear, col, lag, round, sum as _sum

df = df.withColumn("week", weekofyear("transaction_date"))

product_week = df.groupBy("product_id", "week").agg(_sum("revenue").alias("weekly_revenue"))

w = Window.partitionBy("product_id").orderBy("week")

product_week = product_week.withColumn("prev_revenue", lag("weekly_revenue", 1).over(w))

product_week = product_week.withColumn(
    "weekly_revenue", round(col("weekly_revenue"), 3)
).withColumn(
    "prev_revenue", round(col("prev_revenue"), 3)
).withColumn(
    "growth", round(((col("weekly_revenue") - col("prev_revenue")) / col("prev_revenue")) * 100, 3)
)
product_week.filter(col("growth") > 30).show()


+----------+----+--------------+------------+-------+
|product_id|week|weekly_revenue|prev_revenue| growth|
+----------+----+--------------+------------+-------+
|         1|   5|          54.0|        36.0|   50.0|
|         1|   6|         180.0|        54.0|233.333|
|         1|  10|         234.0|        18.0| 1200.0|
|         1|  14|         180.0|        36.0|  400.0|
|         1|  19|         396.0|        18.0| 2100.0|
|         1|  23|         360.0|        90.0|  300.0|
|        10|   2|          40.0|        20.0|  100.0|
|        10|   3|         110.0|        40.0|  175.0|
|        10|   6|          30.0|        20.0|   50.0|
|        10|   7|         100.0|        30.0|233.333|
|        10|  10|          60.0|        40.0|   50.0|
|        10|  11|          80.0|        60.0| 33.333|
|        10|  16|          90.0|        40.0|  125.0|
|        10|  19|          50.0|        30.0| 66.667|
|        10|  20|         150.0|        50.0|  200.0|
|        10|  23|          7

In [49]:
from pyspark.sql.functions import col, datediff, current_date, countDistinct, max, sum as _sum, round
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
df = df.withColumn("revenue", col("transaction_qty") * col("unit_price"))

rfm = df.groupBy("store_id").agg(
    datediff(current_date(), max("transaction_date")).alias("recency"),
    countDistinct("transaction_id").alias("frequency"),
    round(_sum("revenue"), 3).alias("monetary")  
)

assembler = VectorAssembler(inputCols=["recency", "frequency", "monetary"], outputCol="features")
rfm_vector = assembler.transform(rfm)

kmeans = KMeans(k=4, seed=1)
model = kmeans.fit(rfm_vector)

model.transform(rfm_vector).select(
    "store_id", "recency", "frequency", "monetary", "prediction"
).show()


+--------+-------+---------+---------+----------+
|store_id|recency|frequency| monetary|prediction|
+--------+-------+---------+---------+----------+
|       3|    736|    50599|232243.91|         1|
|       8|    736|    50735|236511.17|         2|
|       5|    736|    47782|230057.25|         0|
+--------+-------+---------+---------+----------+



In [47]:
from pyspark.ml.stat import Correlation

assembler = VectorAssembler(inputCols=["transaction_qty", "unit_price", "revenue"], outputCol="features")
vector_df = assembler.transform(df)

corr = Correlation.corr(vector_df, "features").head()
print("Matrice de corrélation :")
print(corr[0])


Matrice de corrélation :
DenseMatrix([[ 1.        , -0.12354566,  0.35623085],
             [-0.12354566,  1.        ,  0.68554956],
             [ 0.35623085,  0.68554956,  1.        ]])
