## DATA AND TABLE MODELING

Note: The following variables are part of the dataset structure, but all corresponding cells and operations have been intentionally removed or cancelled due to the confidential nature of the data

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    col,
    when,
    dayofmonth,
    month,
    year,
    dayofweek,
    sum    as spark_sum,
    max    as spark_max,
    avg    as spark_avg,
    row_number,
    coalesce
)
from pyspark.sql.window import Window

In [0]:
# 1. Chargement des tables Spark
items            = spark.table("items_state")
orders           = spark.table("order")
sales_store      = spark.table("store")
products         = spark.table("product")
product_variants = spark.table("variant")

# 2. Lecture du CSV local en pandas puis conversion en Spark DataFrame
import pandas as pd
local_path       = "Recap 30-09-2024(Feuil1).csv"
product_info_pd  = pd.read_csv(local_path, sep=";", encoding="latin1")
product_info_pd  = product_info_pd.rename(columns={"code - Copier": "Code Gold"})
product_info     = spark.createDataFrame(product_info_pd)

# 3. Nettoyage générique de la colonne 'code' : on prend tout ce qui suit le dernier '-'
from pyspark.sql.functions import substring_index, col

products = products.withColumn(
    "code_cleaned",
    substring_index(col("code"), "-", -1)
)

# 4. Enchaînement des jointures
df_join = (
    items
    .join(product_variants, items["variant_id"] == product_variants["id"])
    .join(orders,          col("order_number") == col("number"))
    .join(sales_store,     orders["store_id"] == sales_store["id"], how="left")
    .join(products,        product_variants["product_id"] == products["id"], how="left")
    .join(
        product_info,
        products["code_cleaned"] == product_info["CODE_INT"],
        how="left"
    )
)



In [0]:
# Préparation des données
final_df = df_join.select(
    "variant_id",
    "lstore_id",
    "Order_Number",
    "quantity_delivery",
    "quantity_sales",
    "STATUS",
    "checkout_completed_at",
    "state",
    "LIB_ETAT_ARTICLE",
    "GRP",
    "DEP",
    "RAY",
    "SAISONNALITE_ARTICLE"
)




com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:132)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:132)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
# Ajout de la colonne stockout
final_df = final_df.withColumn(
    "stockout",
    when(col("STATUS") == "unfound", 1).otherwise(0)
)


In [0]:
# Si ta colonne magasin s’appelle simplement 'store_id' :
df_store25 = final_df.filter(col("store_id") == 25)



com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:132)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:132)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
from pyspark.sql.functions import col, to_date, lit

# ------------------------------------------------------------------
# 1) Indique tes dates ici ⬇︎ (format AAAA-MM-JJ)
# ------------------------------------------------------------------
date_debut = "2024-01-01"      # ← remplace par ta date de début
date_fin   = "2025-07-20"      # ← remplace par ta date de fin
# ------------------------------------------------------------------

# 2) Conversion en type date pour éviter les surprises
date_debut = to_date(lit(date_debut))
date_fin   = to_date(lit(date_fin))

# 3) Filtre
df_filtre = df_store25.filter(
    (col("checkout_completed_at").cast("date") >= date_debut) &
    (col("checkout_completed_at").cast("date") <= date_fin)
)




[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-8890019767672582>, line 15[0m
[1;32m     12[0m date_fin   [38;5;241m=[39m to_date(lit(date_fin))
[1;32m     14[0m [38;5;66;03m# 3) Filtre[39;00m
[0;32m---> 15[0m df_filtre [38;5;241m=[39m df_store25[38;5;241m.[39mfilter(
[1;32m     16[0m     (col([38;5;124m"[39m[38;5;124mcheckout_completed_at[39m[38;5;124m"[39m)[38;5;241m.[39mcast([38;5;124m"[39m[38;5;124mdate[39m[38;5;124m"[39m) [38;5;241m>[39m[38;5;241m=[39m date_debut) [38;5;241m&[39m
[1;32m     17[0m     (col([38;5;124m"[39m[38;5;124mcheckout_completed_at[39m[38;5;124m"[39m)[38;5;241m.[39mcast([38;5;124m"[39m[38;5;124mdate[39m[38;5;124m"[39m) [38;5;241m<[39m[38;5;241m=[39m date_fin)
[1;32m     18[0m )

[0;31mNameError[0m: name 'df_store25' is not defined

In [0]:
# ⚠️ 1. Active Arrow (copie en mémoire plus rapide)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# 2. Si ton DataFrame s’appelle daily_data :
pdf = df_store25.toPandas()      # PySpark classique
# pdf = daily_data.to_pandas()   # Spark Connect (nouvelle API)

# 3. Vérifie
print(pdf.shape)
pdf.head()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-6620639893672549>, line 2[0m
[1;32m      1[0m [38;5;66;03m# ⚠️ 1. Active Arrow (copie en mémoire plus rapide)[39;00m
[0;32m----> 2[0m spark[38;5;241m.[39mconf[38;5;241m.[39mset([38;5;124m"[39m[38;5;124mspark.sql.execution.arrow.pyspark.enabled[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m)
[1;32m      4[0m [38;5;66;03m# 2. Si ton DataFrame s’appelle daily_data :[39;00m
[1;32m      5[0m pdf [38;5;241m=[39m df_store25[38;5;241m.[39mtoPandas()      [38;5;66;03m# PySpark classique[39;00m

File [0;32m/databricks/python/lib/python3.11/site-packages/pyspark/sql/connect/conf.py:46[0m, in [0;36mRuntimeConf.set[0;34m(self, key, value)[0m
[1;32m     44[0m op_set [38;5;241m=[39m proto[38;5;241m.[39mConfigRequest[38;5;241m.[39mSet(pairs