In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, isnan, when
import pandas as pd
import plotly.express as px

In [7]:
# Create (or reuse) the notebook's Spark session. The eager-eval option asks
# Spark to render a DataFrame when it is the last expression of a cell.
spark = (
    SparkSession.builder
    .appName("DataCo")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)
print(spark.version)


3.5.0


In [8]:
# Read the raw supply-chain CSV: the first line is the header and column
# types are inferred from the data.
path_data = '././data/raw/DataCoSupplyChainDataset.csv'
csv_reader = (
    spark.read.format("csv")
    .option('header', "True")
    .option("inferSchema", "True")
)
df = csv_reader.load(path_data)

# Report the size of the loaded frame, then list the inferred schema.
print(f"Données chargées : {df.count()} lignes, {len(df.columns)} colonnes.")
df.printSchema()

                                                                                

Données chargées : 180519 lignes, 53 colonnes.
root
 |-- Type: string (nullable = true)
 |-- Days for shipping (real): integer (nullable = true)
 |-- Days for shipment (scheduled): integer (nullable = true)
 |-- Benefit per order: double (nullable = true)
 |-- Sales per customer: double (nullable = true)
 |-- Delivery Status: string (nullable = true)
 |-- Late_delivery_risk: integer (nullable = true)
 |-- Category Id: integer (nullable = true)
 |-- Category Name: string (nullable = true)
 |-- Customer City: string (nullable = true)
 |-- Customer Country: string (nullable = true)
 |-- Customer Email: string (nullable = true)
 |-- Customer Fname: string (nullable = true)
 |-- Customer Id: integer (nullable = true)
 |-- Customer Lname: string (nullable = true)
 |-- Customer Password: string (nullable = true)
 |-- Customer Segment: string (nullable = true)
 |-- Customer State: string (nullable = true)
 |-- Customer Street: string (nullable = true)
 |-- Customer Zipcode: integer (nullable =

In [9]:
# Load the companion CSV that describes the dataset's fields. No schema
# inference is requested, so every column is read as a string.
desc_reader = spark.read.format('csv').option('header', 'True')
df_desc = desc_reader.load('././data/raw/DescriptionDataCoSupplyChain.csv')
df_desc.printSchema()


root
 |-- FIELDS: string (nullable = true)
 |-- DESCRIPTION: string (nullable = true)



In [10]:
# Quick look at the data: print the first rows, then the summary statistics
# that `describe()` computes over the frame.
preview_rows = 10
df.show(preview_rows)

summary_stats = df.describe()
summary_stats.show()

25/11/12 15:11:15 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------+------------------------+-----------------------------+-----------------+------------------+-----------------+------------------+-----------+--------------+-------------+----------------+--------------+--------------+-----------+--------------+-----------------+----------------+--------------+--------------------+----------------+-------------+---------------+-----------+------------+------------+----------+-------------+-----------------+-----------------------+--------+----------------------+-------------------+------------------------+-------------+------------------------+-----------------------+-------------------+------+----------------+----------------------+--------------+---------------+---------------+-------------+---------------+-------------------+-------------------+--------------------+------------+-------------+--------------+--------------------------+--------------+
|    Type|Days for shipping (real)|Days for shipment (scheduled)|Benefit per order|Sales per 

[Stage 9:>                                                          (0 + 1) / 1]

+-------+--------+------------------------+-----------------------------+------------------+------------------+----------------+-------------------+------------------+------------------+-------------+----------------+--------------+--------------+-----------------+--------------+-----------------+----------------+------------------+---------------+-----------------+------------------+---------------+-----------------+------------------+------+--------------+-------------+-----------------+-----------------------+------------------+----------------------+-------------------+------------------------+-----------------+------------------------+-----------------------+-------------------+------------------+------------------+----------------------+--------------+--------------+---------------+------------------+------------------+-------------------+-------------------+--------------------+--------------------+-----------------+--------------+--------------------------+--------------+
|summ

                                                                                

In [14]:
# Count NULL values per column: `when` yields a non-null marker only for NULL
# cells (and NULL otherwise), and `count` tallies the non-null markers.
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_counts).show()

+----+------------------------+-----------------------------+-----------------+------------------+---------------+------------------+-----------+-------------+-------------+----------------+--------------+--------------+-----------+--------------+-----------------+----------------+--------------+---------------+----------------+-------------+---------------+--------+---------+------+----------+-------------+-----------------+-----------------------+--------+----------------------+-------------------+------------------------+-------------+------------------------+-----------------------+-------------------+-----+----------------+----------------------+------------+-----------+------------+-------------+---------------+-------------------+-------------------+-------------+------------+-------------+--------------+--------------------------+-------------+
|Type|Days for shipping (real)|Days for shipment (scheduled)|Benefit per order|Sales per customer|Delivery Status|Late_delivery_risk|Ca

In [16]:
# Columns kept for the rest of the notebook; every other column is discarded
# by the projection below.
cols_to_use = [
    'Type', 'Late_delivery_risk', 'Category Id', 'Customer Segment',
    'Latitude', 'Longitude',
    'Order Item Quantity', 'Sales', 'Order Profit Per Order',
    'Order Region', 'Product Price',
    'order date (DateOrders)',
    'Shipping Mode',
]
df = df.select(*cols_to_use)
print(len(df.columns))

13


In [17]:
from pyspark.sql.functions import month, col, to_date

# Replace the raw order timestamp string with a numeric `order_month` column.
#
# This cell reassigns `df` and drops its own input column, so a naive second
# execution would fail on the missing column. The guard below makes the cell
# safe to re-run: the transformation is applied only while the raw column is
# still present.
ORDER_DATE_COL = "order date (DateOrders)"
ORDER_DATE_FORMAT = "M/d/yyyy H:mm"  # pattern used to parse the raw date strings

if ORDER_DATE_COL in df.columns:
    # Parse the string to a date, keep only its month, and drop the raw column
    # in one chain (no temporary parsed-date column is left behind).
    df = (
        df.withColumn(
            "order_month",
            month(to_date(col(ORDER_DATE_COL), ORDER_DATE_FORMAT)),
        )
        .drop(ORDER_DATE_COL)
    )
elif "order_month" not in df.columns:
    # Neither the input nor the output column exists: fail loudly instead of
    # letting the `select` below raise a less helpful error.
    raise ValueError(
        f"Column {ORDER_DATE_COL!r} is missing and 'order_month' has not been "
        "derived yet; re-run the data loading and column selection cells first."
    )

df.select("order_month").show(15)


+-----------+
|order_month|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
+-----------+
only showing top 15 rows



In [24]:
from pyspark.sql.functions import col, count, when, isnan, trim, lit

# Map each column name to its Spark type name, as reported by `df.dtypes`.
dtype_dict = dict(df.dtypes)


def _is_missing(name, dtype):
    """Build a boolean Column that is true where column `name` holds a missing value.

    A value counts as missing when it is NULL, when it is a blank
    (empty or whitespace-only) string, or when it is NaN.

    Args:
        name: Name of the column to test.
        dtype: Spark type name of that column, as found in `df.dtypes`.

    Returns:
        A Column expression usable as a `when` condition.
    """
    missing = col(name).isNull()
    if dtype == "string":
        # The blank check only applies to text; running `trim` on numeric
        # columns would force a cast to string for a test that can never match.
        missing = missing | (trim(col(name)) == "")
    elif dtype in ("double", "float"):
        # NaN is only checked on floating-point columns.
        missing = missing | isnan(col(name))
    return missing


# One row, one column per input column: the number of missing values in each.
df_nulls = df.select([
    count(when(_is_missing(c, dtype_dict[c]), 1)).alias(c)
    for c in df.columns
])

df_nulls.show()

+----+------------------+-----------+----------------+--------+---------+-------------------+-----+----------------------+------------+-------------+-------------+-----------+
|Type|Late_delivery_risk|Category Id|Customer Segment|Latitude|Longitude|Order Item Quantity|Sales|Order Profit Per Order|Order Region|Product Price|Shipping Mode|order_month|
+----+------------------+-----------+----------------+--------+---------+-------------------+-----+----------------------+------------+-------------+-------------+-----------+
|   0|                 0|          0|               0|       0|        0|                  0|    0|                     0|           0|            0|            0|          0|
+----+------------------+-----------+----------------+--------+---------+-------------------+-----+----------------------+------------+-------------+-------------+-----------+

