In [None]:
from pyspark.sql.functions import udf, when, col, cast, explode, row_number, sum as F_sum
from pyspark.sql.window import Window
from pyspark.sql.types import BooleanType, StringType, DoubleType
from pyspark.sql import SparkSession
from pathlib import Path
import os
import glob

# Build (or reuse) a local SparkSession; "local[*]" runs Spark in-process.
spark = (
    SparkSession.builder
    .appName("TutoDataFrame_PySpark")
    .master("local[*]")
    .getOrCreate()
)

# SparkContext handle, kept for the RDD comparison part.
sc = spark.sparkContext

In [None]:
# Load the input data files.
dir_path = Path.cwd().parent.parent / "data/march-input"

# Validate the directory BEFORE listing it: os.listdir raises FileNotFoundError on
# a missing directory, which previously fired before the explicit message below
# could ever be printed.
if not os.path.exists(dir_path):
    print("Pas de dossier data/march-input à ce chemin :", dir_path)
    raise FileNotFoundError(dir_path)
if os.listdir(dir_path) == []:
    print("Le dossier data/march-input est vide :", dir_path)

# os.listdir returns entries in arbitrary order; sorting makes the file order
# (and therefore orders_file_list[0]) reproducible from one run to the next.
orders_file_list = sorted(x for x in os.listdir(dir_path) if x.startswith('orders'))
orders_paths = [str(dir_path / x) for x in orders_file_list]

# Full paths (dir_path included) of the two CSV inputs.
customers_file = dir_path / "customers.csv"
refunds_file = dir_path / "refunds.csv"
print(orders_file_list)

# The orders files are read as multi-line JSON documents.
orders_df = spark.read.option("multiline", "true").json(orders_paths)
orders_df.show()

In [None]:
# String paths of the orders files, one per entry of orders_file_list.
orders_paths = [str(dir_path / file_name) for file_name in orders_file_list]

In [None]:
# refunds_file and customers_file already include dir_path, so they are passed
# as-is. Joining them onto dir_path a second time only worked because pathlib
# discards the left operand when the right-hand path is absolute.
refunds_df = spark.read.csv(str(refunds_file), header=True, inferSchema=True)
customers_df = spark.read.csv(str(customers_file), header=True, inferSchema=True)

In [None]:
# Load the first orders file with the CSV reader (header row, inferred schema).
# NOTE(review): the orders files are read with the JSON reader elsewhere in this
# notebook — confirm that parsing this one as CSV is intended.
first_orders_file = dir_path / orders_file_list[0]
print(first_orders_file)
df = spark.read.csv(str(first_orders_file), header=True, inferSchema=True)

In [None]:
def controle_bool(v):
    """Normalize a loosely-typed flag value to a strict boolean.

    Args:
        v: Raw value; may be a bool, a number, a string or None.

    Returns:
        bool: ``v`` itself when it is already a bool; False for None and for a
        NaN float (both treated as missing); the truthiness of any other number;
        otherwise True only when the trimmed, lower-cased string form of ``v``
        is one of "1", "true", "yes", "y", "t".
    """
    if isinstance(v, bool):
        return v
    if v is None:
        return False
    # NaN is the only float that differs from itself. bool(nan) is True, which
    # would flag a missing value as active, so it is handled like None.
    if isinstance(v, float) and v != v:
        return False
    if isinstance(v, (int, float)):
        return bool(v)
    return str(v).strip().lower() in ("1", "true", "yes", "y", "t")

# Spark UDF wrapper around controle_bool, declared as returning a boolean column.
controle_bool_udf = udf(f=controle_bool, returnType=BooleanType())

In [None]:
# Replace the raw is_active column with its boolean-normalized version.
customers_df = customers_df.withColumn(
    "is_active",
    controle_bool_udf(col("is_active")),
)

In [None]:
# Customers whose is_active flag is true.
customers_df_active = customers_df.where(col("is_active"))

In [None]:
# Normalize the refunds columns: amount cast to double with nulls replaced by 0.0,
# created_at cast to string.
refunds_df = (
    refunds_df
    .withColumn("amount", col("amount").cast("double"))
    .na.fill({"amount": 0.0})
    .withColumn("created_at", col("created_at").cast("string"))
)
                                                                                   

In [None]:
# Orders whose payment_status is exactly "paid".
orders_df_paid = orders_df.where(col("payment_status") == "paid")

In [None]:
# Explode from the paid-only frame: orders_df_paid was computed but never used,
# so unpaid orders were flowing into every downstream step.
# explode() yields one row per element of the items array.
orders_exploded = orders_df_paid.withColumn("items", explode("items"))

# Flatten: order-level columns plus every field of the exploded items struct.
orders_flat = orders_exploded.select(
    "order_id",
    "channel",
    "created_at",
    "customer_id",
    "payment_status",
    "items.*",
)

In [None]:
# Item lines whose unit price is negative.
orders_neg = orders_flat.where(col("unit_price") < 0)

In [None]:
# Item lines whose unit price is zero or positive (a null unit_price does not
# satisfy the comparison, so such rows are dropped here).
orders_pos = orders_flat.where(col("unit_price") >= 0)

In [None]:
# Row count of orders_pos, taken before the de-duplication step.
before = orders_pos.count()
# (The de-duplication itself is done with a window function, not a pandas sort.)

In [None]:
# Keep a single row per order_id: the first one when ordered by created_at.
# NOTE(review): orders_pos has one row per item line, so for an order with several
# item lines this retains only one of them — confirm that is intended.
# NOTE: this rebinds orders_df (previously the raw orders frame) to the flattened,
# de-duplicated rows.
w = Window.partitionBy("order_id").orderBy("created_at")
orders_df = (
    orders_pos
    .withColumn("rn", row_number().over(w))
    .where(col("rn") == 1)
    .drop("rn")
)


In [None]:
# Row count of orders_df once the de-duplication has been applied.
after = orders_df.count()

In [None]:
# Preview the current orders_df rows.
orders_df.show()

In [None]:
# Gross amount of each row: quantity times unit price.
orders_df = orders_df.withColumn("line_gross", col("qty") * col("unit_price"))

# Per-order totals: summed quantity and summed gross amount.
per_order_df = (
    orders_df
    .groupBy("order_id", "customer_id", "channel", "created_at")
    .agg(
        F_sum("qty").alias("items_sold"),
        F_sum("line_gross").alias("gross_revenue_eur"),
    )
)

In [None]:
# Join on the column name rather than on an equality expression: an expression
# join keeps customer_id from BOTH sides, which makes any later reference to
# "customer_id" ambiguous. Joining by name yields a single customer_id column.
# Left join: orders without a matching active customer are kept, with null
# customer columns.
df_orders_customers = per_order_df.join(
    customers_df_active,
    on="customer_id",
    how="left",
)

In [None]:
# Preview the joined orders/customers rows.
df_orders_customers.show()