# 1 - Download Walmart Data

In [2]:
# Téléchargement des données à partir du repo git
!wget "https://github.com/ettouilebouael/pyspark_for_datascience/raw/refs/heads/main/data/walmart_data.zip"

--2024-10-02 09:55:47--  https://github.com/ettouilebouael/pyspark_for_datascience/raw/refs/heads/main/data/walmart_data.zip
Resolving github.com (github.com)... 140.82.112.4
Connecting to github.com (github.com)|140.82.112.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://media.githubusercontent.com/media/ettouilebouael/pyspark_for_datascience/refs/heads/main/data/walmart_data.zip [following]
--2024-10-02 09:55:47--  https://media.githubusercontent.com/media/ettouilebouael/pyspark_for_datascience/refs/heads/main/data/walmart_data.zip
Resolving media.githubusercontent.com (media.githubusercontent.com)... 185.199.110.133, 185.199.108.133, 185.199.111.133, ...
Connecting to media.githubusercontent.com (media.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 132754996 (127M) [application/zip]
Saving to: ‘walmart_data.zip’


2024-10-02 09:55:51 (269 MB/s) - ‘walmart_data.zip’ saved [132754

In [3]:
# Création d'un dossier pour les données walmart
!mkdir walmart_data

# Décompression du fichier zip contenant les données
!unzip walmart_data.zip -d walmart_data

# lister les fichiers
!ls walmart_data

Archive:  walmart_data.zip
  inflating: walmart_data/calendar.parquet  
  inflating: walmart_data/sell_prices.parquet  
  inflating: walmart_data/walmart_sales.parquet  
calendar.parquet  sell_prices.parquet  walmart_sales.parquet


In [4]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=deb7f54bab9112b2d93f3e228346428b12c204b5d43ce48c9c02705204d70cb2
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


# Chargement des données et affichage du dataframe

In [5]:
#Initialisation spark session
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()

#Lecture fichier walmart_sales.parquet et affichage des 10 premières lignes pour comprendre les variables
walmart_sales_df = spark.read.parquet("walmart_data/walmart_sales.parquet")
walmart_sales_df.show(10)

+-------------+---------+-------+--------+--------+--------------------+----------+-----+
|      item_id|  dept_id| cat_id|store_id|state_id|                  id|      date|sales|
+-------------+---------+-------+--------+--------+--------------------+----------+-----+
|HOBBIES_1_001|HOBBIES_1|HOBBIES|    CA_1|      CA|HOBBIES_1_001_CA_...|2011-01-29|    0|
|HOBBIES_1_002|HOBBIES_1|HOBBIES|    CA_1|      CA|HOBBIES_1_002_CA_...|2011-01-29|    0|
|HOBBIES_1_003|HOBBIES_1|HOBBIES|    CA_1|      CA|HOBBIES_1_003_CA_...|2011-01-29|    0|
|HOBBIES_1_004|HOBBIES_1|HOBBIES|    CA_1|      CA|HOBBIES_1_004_CA_...|2011-01-29|    0|
|HOBBIES_1_005|HOBBIES_1|HOBBIES|    CA_1|      CA|HOBBIES_1_005_CA_...|2011-01-29|    0|
|HOBBIES_1_006|HOBBIES_1|HOBBIES|    CA_1|      CA|HOBBIES_1_006_CA_...|2011-01-29|    0|
|HOBBIES_1_007|HOBBIES_1|HOBBIES|    CA_1|      CA|HOBBIES_1_007_CA_...|2011-01-29|    0|
|HOBBIES_1_008|HOBBIES_1|HOBBIES|    CA_1|      CA|HOBBIES_1_008_CA_...|2011-01-29|   12|
|HOBBIES_1

# 2 - Parsing des dates

In [6]:
walmart_sales_df = walmart_sales_df.withColumn("year", F.year("Date"))
walmart_sales_df = walmart_sales_df.withColumn("month", F.month("Date"))

#walmart_sales_df.show(10) # Permet de vérifier le parsing


# 3 - 4 - Filtrage des ventes négatives et retire les valeurs manquantes

In [7]:
walmart_sales_df = walmart_sales_df.na.drop()
walmart_sales_df = walmart_sales_df.filter(walmart_sales_df.sales >= 0)

# 5 - Agrégation des données à la vente Mois X produits X Magasins

In [8]:
walmart_sales_df = walmart_sales_df.groupBy("year", "month", "item_id", "store_id").agg(F.sum("sales").alias("sales"))

# 6 - Ajouter les mois sans transactions et les imputer avec 0.

In [9]:
# Extrait chaque année/mois/item_id/store_id
years = walmart_sales_df.select("year").distinct().rdd.flatMap(lambda x: x).collect()
months = walmart_sales_df.select("month").distinct().rdd.flatMap(lambda x: x).collect()
item_ids = walmart_sales_df.select("item_id").distinct().rdd.flatMap(lambda x: x).collect()
store_ids = walmart_sales_df.select("store_id").distinct().rdd.flatMap(lambda x: x).collect()

# Création de toutes les combinaisons possibles
combinaisons = []
for year in years:
  for month in months:
    for store_id in store_ids:
      for item_id in item_ids:
        combinaisons.append((year, month, item_id, store_id))

combinaisons_df = spark.createDataFrame(combinaisons, ["year", "month", "item_id", "store_id"])


# Jointure entre les combinaisons et walmart_sales
final_df = combinaisons_df.join(walmart_sales_df, ["year", "month", "item_id", "store_id"], "left")

# Remplace les valeurs manquantes de sales par des 0
final_df = final_df.fillna(0, subset=["sales"])


In [10]:
final_df.show(5)

+----+-----+---------------+--------+-----+
|year|month|        item_id|store_id|sales|
+----+-----+---------------+--------+-----+
|2013|   12|    FOODS_3_025|    WI_2|    6|
|2013|   12|  HOBBIES_1_021|    WI_2|    3|
|2013|   12|  HOBBIES_1_239|    WI_2|    5|
|2013|   12|HOUSEHOLD_1_073|    WI_2|    1|
|2013|   12|HOUSEHOLD_2_066|    WI_2|    0|
+----+-----+---------------+--------+-----+
only showing top 5 rows



# Exportation du dataframe

In [12]:
#exportation final_df en csv
final_df.write.csv("walmart_data/final_df")

In [None]:
# Compte nombre de ligne de final_df
#final_df.count()

2195280

In [None]:
# Compte nombre de lignes de walmart_sales_df
#walmart_sales_df.count()

1951360