In [112]:
from pyspark.sql import SparkSession

In [113]:
spark = SparkSession.builder.appName("analye").getOrCreate()

In [114]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, FloatType, DateType

In [115]:
sch = [StructField('Identifiant de la ligne', StringType(), True),
       StructField('ID commande', StringType(), True),
       StructField('Date de commande', StringType(), True),
       StructField("Date d'expédition", StringType(), True),
       StructField("Mode d'expedition", StringType(), True),
       StructField("ID client", StringType(), True),
       StructField("Nom du client", StringType(), True),
       StructField("Segment", StringType(), True),
       StructField("Ville", StringType(), True),
       StructField("Etat", StringType(), True),
       StructField("Pays", StringType(), True),
       StructField("Code postal", StringType(), True),
       StructField("Marché", StringType(), True),
       StructField("Région", StringType(), True),
       StructField("ID produit", StringType(), True),
       StructField("Catégorie", StringType(), True),
       StructField("Sous-catégorie", StringType(), True),
       StructField("Nom du produit", StringType(), True),
       StructField("Ventes", StringType(), True),
       StructField("Quantité", StringType(), True),
       StructField("Réduction", StringType(), True),
       StructField("Bénéfices", StringType(), True),
       StructField("Frais de port", StringType(), True),
       StructField("Priorité de la commande", StringType(), True)]

In [116]:
struct = StructType(fields=sch)

In [117]:
df = spark.read.csv("global_superstore_fr.csv", sep=";", header=True, schema=struct)

In [118]:
df.show()

+-----------------------+--------------+----------------+-----------------+-----------------+---------+-------------------+-----------------+-------------+----------------+---------+-----------+---------------+------+---------------+--------------------+--------------------+--------------------+------+--------+---------+---------+-------------+-----------------------+
|Identifiant de la ligne|   ID commande|Date de commande|Date d'expédition|Mode d'expedition|ID client|      Nom du client|          Segment|        Ville|            Etat|     Pays|Code postal|         Marché|Région|     ID produit|           Catégorie|      Sous-catégorie|      Nom du produit|Ventes|Quantité|Réduction|Bénéfices|Frais de port|Priorité de la commande|
+-----------------------+--------------+----------------+-----------------+-----------------+---------+-------------------+-----------------+-------------+----------------+---------+-----------+---------------+------+---------------+--------------------+----

In [119]:
from pyspark.sql.functions import col, regexp_replace

In [120]:
df = df.withColumn("Ventes", regexp_replace(col("Ventes"), ",", "."))
df = df.withColumn("Bénéfices", regexp_replace(col("Bénéfices"), ",", "."))
df = df.withColumn("Frais de port", regexp_replace(col("Frais de port"), ",", "."))
df = df.withColumn("Réduction", regexp_replace(col("Réduction"), ",", "."))

In [None]:
# 2émé méthode : 
df = df.withColumn("Ventes", regexp_replace(col("Ventes"), ",", "."))\
.withColumn("Bénéfices", regexp_replace(col("Bénéfices"), ",", "."))\
.withColumn("Frais de port", regexp_replace(col("Frais de port"), ",", "."))\
.withColumn("Réduction", regexp_replace(col("Réduction"), ",", "."))

In [121]:
df.show()

+-----------------------+--------------+----------------+-----------------+-----------------+---------+-------------------+-----------------+-------------+----------------+---------+-----------+---------------+------+---------------+--------------------+--------------------+--------------------+------+--------+---------+---------+-------------+-----------------------+
|Identifiant de la ligne|   ID commande|Date de commande|Date d'expédition|Mode d'expedition|ID client|      Nom du client|          Segment|        Ville|            Etat|     Pays|Code postal|         Marché|Région|     ID produit|           Catégorie|      Sous-catégorie|      Nom du produit|Ventes|Quantité|Réduction|Bénéfices|Frais de port|Priorité de la commande|
+-----------------------+--------------+----------------+-----------------+-----------------+---------+-------------------+-----------------+-------------+----------------+---------+-----------+---------------+------+---------------+--------------------+----

In [122]:
from pyspark.sql.functions import to_date

In [123]:
df = df.withColumn("Identifiant de la ligne", col("Identifiant de la ligne").cast(IntegerType()))
df = df.withColumn("Date de commande", to_date(col("Date de commande"), "yyyy-MM-dd"))
df = df.withColumn("Date d'expédition", to_date(col("Date d'expédition"), "yyyy-MM-dd"))
df = df.withColumn("Ventes", col("Ventes").cast(FloatType()))
df = df.withColumn("Quantité", col("Quantité").cast(IntegerType()))
df = df.withColumn("Réduction", col("Réduction").cast(FloatType()))
df = df.withColumn("Bénéfices", col("Bénéfices").cast(FloatType()))
df = df.withColumn("Frais de port", col("Frais de port").cast(FloatType()))

In [124]:
df.printSchema()

root
 |-- Identifiant de la ligne: integer (nullable = true)
 |-- ID commande: string (nullable = true)
 |-- Date de commande: date (nullable = true)
 |-- Date d'expédition: date (nullable = true)
 |-- Mode d'expedition: string (nullable = true)
 |-- ID client: string (nullable = true)
 |-- Nom du client: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Ville: string (nullable = true)
 |-- Etat: string (nullable = true)
 |-- Pays: string (nullable = true)
 |-- Code postal: string (nullable = true)
 |-- Marché: string (nullable = true)
 |-- Région: string (nullable = true)
 |-- ID produit: string (nullable = true)
 |-- Catégorie: string (nullable = true)
 |-- Sous-catégorie: string (nullable = true)
 |-- Nom du produit: string (nullable = true)
 |-- Ventes: float (nullable = true)
 |-- Quantité: integer (nullable = true)
 |-- Réduction: float (nullable = true)
 |-- Bénéfices: float (nullable = true)
 |-- Frais de port: float (nullable = true)
 |-- Priorité de la comm

In [125]:
df.describe().show()

+-------+-----------------------+------------+-----------------+---------+-----------------+-----------------+--------------------+------+-----------+----------------+---------------+-------+----------------+--------------------+--------------+--------------------+------------------+------------------+-------------------+------------------+-----------------+-----------------------+
|summary|Identifiant de la ligne| ID commande|Mode d'expedition|ID client|    Nom du client|          Segment|               Ville|  Etat|       Pays|     Code postal|         Marché| Région|      ID produit|           Catégorie|Sous-catégorie|      Nom du produit|            Ventes|          Quantité|          Réduction|         Bénéfices|    Frais de port|Priorité de la commande|
+-------+-----------------------+------------+-----------------+---------+-----------------+-----------------+--------------------+------+-----------+----------------+---------------+-------+----------------+--------------------+-

## Création des table dimensions

In [61]:
# Dimension Produit
dim_produit_df = df.select("ID produit", "Nom du produit", "Sous-catégorie").distinct()

In [62]:
dim_produit_df.show()

+---------------+--------------------+--------------------+
|     ID produit|      Nom du produit|      Sous-catégorie|
+---------------+--------------------+--------------------+
|OFF-LA-10001038|Smead Legal Exhib...|          Étiquettes|
|FUR-TA-10004840|Hon Conference Ta...|              Tables|
|OFF-LA-10002015|Hon Round Labels,...|          Étiquettes|
|TEC-CO-10004521|HP Personal Copie...|       Photocopieurs|
|OFF-ST-10002632|Smead Shelving, I...|            Stockage|
|FUR-TA-10003597|Bevis Wood Table,...|              Tables|
|TEC-AC-10004752|Memorex Keyboard,...|         Accessoires|
|OFF-ST-10000560|Fellowes Trays, I...|            Stockage|
|OFF-SU-10004662|Elite Scissors, S...|         Fournitures|
|OFF-AP-10004245| Hoover Stove, Black|           Appareils|
|TEC-AC-10001355|Memorex Router, P...|         Accessoires|
|OFF-SU-10000153|Acme Shears, High...|         Fournitures|
|TEC-AC-10000271|Enermax Router, E...|         Accessoires|
|FUR-BO-10002574|Bush Corner Shelv...|  

In [86]:
dim_produit_df.count()

10768

In [65]:
# Dimension client
dim_client_df = df.select("ID client", "Nom du client", "Pays", "Région", "Segment", "Ville").distinct()

In [66]:
dim_client_df.show()

+---------+--------------------+--------------------+-------+-----------------+--------------------+
|ID client|       Nom du client|                Pays| Région|          Segment|               Ville|
+---------+--------------------+--------------------+-------+-----------------+--------------------+
| AG-10300|    Albertine Garcia|             Mexique|   Nord|       Entreprise|              Juárez|
| CT-11995|      Bruno Courtois|            Colombie|    Sud|     Consommateur|              Bogotá|
| JG-15310|  Guillemine LaGarde|                Cuba|Caraïbe|       Entreprise|         Santa Clara|
| DC-13285|       David Arcouet|              Brésil|    Sud|     Consommateur|              Jurema|
| EB-13975|   Étiamble Chicoine|République Domini...|Caraïbe|       Entreprise|       Santo Domingo|
| SS-20410|     Sidney Routhier|            Équateur|    Sud|     Consommateur|Santo Domingo de ...|
| LH-16750|        Lydie Durand|         El Salvador| Centre|     Consommateur|        San 

In [84]:
dim_client_df.count()

24956

In [73]:
df.printSchema()

root
 |-- Identifiant de la ligne: integer (nullable = true)
 |-- ID commande: string (nullable = true)
 |-- Date de commande: date (nullable = true)
 |-- Date d'expédition: date (nullable = true)
 |-- Mode d'expedition: string (nullable = true)
 |-- ID client: string (nullable = true)
 |-- Nom du client: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Ville: string (nullable = true)
 |-- Etat: string (nullable = true)
 |-- Pays: string (nullable = true)
 |-- Code postal: string (nullable = true)
 |-- Marché: string (nullable = true)
 |-- Région: string (nullable = true)
 |-- ID produit: string (nullable = true)
 |-- Catégorie: string (nullable = true)
 |-- Sous-catégorie: string (nullable = true)
 |-- Nom du produit: string (nullable = true)
 |-- Ventes: float (nullable = true)
 |-- Quantité: integer (nullable = true)
 |-- Réduction: float (nullable = true)
 |-- Bénéfices: float (nullable = true)
 |-- Frais de port: float (nullable = true)
 |-- Priorité de la comm

In [80]:
from pyspark.sql import functions as F

In [81]:
# Dimensions Achats
dim_achats_df = df.groupBy("ID client", "ID commande", 'Identifiant de la ligne', "ID produit", "Priorité de la commande") \
    .agg(
        F.sum("Ventes").alias("Montant des ventes"),
        F.sum("Bénéfices").alias("Profit"),
        F.sum("Quantité").alias("Quantité"),
        F.sum("Réduction").alias("Remise")
    )

In [82]:
dim_achats_df.show()

+---------+--------------+-----------------------+---------------+-----------------------+------------------+-------------------+--------+------+
|ID client|   ID commande|Identifiant de la ligne|     ID produit|Priorité de la commande|Montant des ventes|             Profit|Quantité|Remise|
+---------+--------------+-----------------------+---------------+-----------------------+------------------+-------------------+--------+------+
| PB-19150|MX-2014-130666|                    306|OFF-EN-10002725|                Moyenne| 65.83999633789062| 15.119999885559082|       4|   0.0|
| BG-11740|US-2012-110716|                    451|OFF-BI-10000517|                Moyenne| 33.29999923706055|   16.6200008392334|       3|   0.0|
| CB-12535|MX-2012-140473|                    988|OFF-BI-10004177|                  Haute| 78.63999938964844|   33.7599983215332|       4|   0.0|
| TC-21295|MX-2014-102995|                   1021|OFF-PA-10003708|                Moyenne| 59.52000045776367| 0.540000021457

In [87]:
dim_achats_df.count()

51290

In [None]:
from pyspark.sql.functions import year, month, weekofyear

In [None]:
dim_calendrier_commande_df = df.select(year(df['Date de commande']).alias("Année"),
                                       "Date de commande",
                                       month(df['Date de commande']).alias("Mois"),
                                       weekofyear(df['Date de commande']).alias("Semaine de l'année"))

In [96]:
from pyspark.sql.functions import min, max