In [0]:
%load_ext autoreload
%autoreload 2
# Enables autoreload, so edits to imported workspace modules are picked up automatically. Learn more at https://docs.databricks.com/en/files/workspace-modules.html#autoreload-for-python-modules
# To disable autoreload, run %autoreload 0

In [0]:
import sys
# -----------------------------
# Add the repo to the Python path
# -----------------------------
# NOTE(review): this is an absolute, user-specific workspace path, so the notebook
# only runs where this exact folder exists. Consider deriving it from configuration.
sys.path.append("/Workspace/Users/mandu543@gmail.com/databricks-ecommerce/Pipelines/")

# NOTE(review): the star import hides where names come from. The cells below use
# F, IntegerType, SparkSession, SILVER_ZONE and GOLD_ZONE without importing them,
# so they presumably come from lib.utils - confirm before replacing with explicit imports.
from lib.utils import *

In [0]:
# Read the silver-layer order items table that every later cell builds on.
df_silver_orders = spark.read.table(f"{SILVER_ZONE}.slv_ecommerce_order_items")

# Preview a handful of rows to sanity-check the input.
display(df_silver_orders.limit(10))

In [0]:
# Derive the monetary measures, the date key and the coupon indicator.
# The steps are chained because each withColumn may reference a column added by
# the step before it (e.g. discount_amount needs gross_amount).
# Re-running this cell is safe: withColumn replaces a column that already exists.
df_silver_orders = (
    df_silver_orders
    # 1) Gross amount = quantity * unit price, before discount and tax.
    .withColumn("gross_amount", F.col("quantity") * F.col("unit_price"))
    # 2) Discount amount. discount_pct is already numeric (e.g. 21 means 21%).
    #    F.ceil rounds the discount UP to the next whole currency unit.
    #    NOTE(review): confirm that whole-unit rounding is the intended business rule.
    .withColumn(
        "discount_amount",
        F.ceil(F.col("gross_amount") * (F.col("discount_pct") / 100.0)),
    )
    # 3) Sale amount = gross - discount + tax.
    .withColumn(
        "sale_amount",
        F.col("gross_amount") - F.col("discount_amount") + F.col("tax_amount"),
    )
    # 4) Integer date key in yyyyMMdd form, built from the dt column.
    .withColumn(
        "date_id",
        F.date_format(F.col("dt"), "yyyyMMdd").cast(IntegerType()),
    )
    # 5) Coupon flag: 1 when a coupon code is present, 0 otherwise.
    #    The previous condition (coupon_code == '') was inverted: it flagged rows
    #    with an EMPTY code and returned 0 for real codes. Null and blank codes
    #    both count as "no coupon".
    .withColumn(
        "coupon_flag",
        F.when(
            F.col("coupon_code").isNotNull() & (F.trim(F.col("coupon_code")) != ""),
            F.lit(1),
        ).otherwise(F.lit(0)),
    )
)

df_silver_orders.limit(5).display()

In [0]:
import requests

spark = SparkSession.builder.getOrCreate()

# Exchange rates come from the Frankfurter API, requested with EUR as the base
# currency. No access key is needed.
url = "https://api.frankfurter.dev/v1/latest?base=EUR"

# Bound the wait and fail loudly on an HTTP error, so a hung or failed call
# cannot silently stall the pipeline or surface later as a confusing KeyError.
http_response = requests.get(url, timeout=30)
http_response.raise_for_status()
response = http_response.json()

# Frankfurter puts the rates in the "rates" field, keyed by currency code.
# Copy the dict so the parsed payload itself is left untouched.
rates = dict(response["rates"])

# The base currency is not part of the returned list, so add it (1 EUR = 1 EUR).
rates["EUR"] = 1.0
# AED is set to a fixed, manually maintained value.
# NOTE(review): presumably because the API does not publish AED - confirm, and
# review this constant periodically.
rates["AED"] = 4.34

# Build the lookup DataFrame with an explicit schema. Values are coerced to
# float because JSON may carry whole-number rates as ints, and a mix of int and
# float values can break Spark's schema inference.
df_rates = spark.createDataFrame(
    [(currency, float(rate)) for currency, rate in rates.items()],
    schema="unit_price_currency string, rate_eur double",
)

display(df_rates)

 

In [0]:
# Attach the exchange rate for each line's currency. The left join keeps every
# order line; a currency with no entry in df_rates gets a null rate_eur and
# therefore a null sale_amount_eur.
df_gold_order_items = df_silver_orders.join(df_rates, on="unit_price_currency", how="left")

# The rates were fetched with base=EUR, i.e. rate_eur is "units of the currency
# per 1 EUR". Converting a local-currency amount to EUR therefore DIVIDES by the
# rate; multiplying (as before) would convert in the wrong direction.
# F.ceil rounds the result up to the next whole euro, as the original did.
df_gold_order_items = df_gold_order_items.withColumn(
    "sale_amount_eur",
    F.ceil(F.col("sale_amount") / F.col("rate_eur")),
)


In [0]:
# Inspect the joined result, including the converted EUR amount.
df_gold_order_items.display()


In [0]:
# Project the final fact-table layout. Columns that keep their name are passed
# as plain strings; renamed columns carry an explicit alias.
df_gold_order_items = df_gold_order_items.select(
    # Keys
    "date_id",
    "customer_id",
    "product_id",
    F.col("order_id").alias("transaction_id"),
    F.col("dt").alias("transaction_date"),
    F.col("order_ts").alias("transaction_ts"),
    F.col("item_seq").alias("seq_no"),
    # Descriptive attributes
    "channel",
    "coupon_code",
    "coupon_flag",
    "unit_price_currency",
    # Measures
    "quantity",
    "unit_price",
    "gross_amount",
    F.col("discount_pct").alias("discount_percent"),
    "discount_amount",
    "tax_amount",
    F.col("sale_amount").alias("net_amount"),
    F.col("sale_amount_eur").alias("net_amount_eur"),
    # Audit columns
    "date_creation",
    "date_modification",
    "_source_file",
)

In [0]:
# Preview the first rows of the final projection before writing it.
display(df_gold_order_items.limit(10))

In [0]:
# Persist the fact table as Delta, replacing the existing rows on every run.
# The mergeSchema option is passed through to the writer unchanged.
(
    df_gold_order_items.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{GOLD_ZONE}.fact_ecommerce_order_items")
)

In [0]:
# Row-count check: print the number of rows in the gold fact table and in the
# silver source table so the two can be compared by eye.
nb_gold = spark.sql(
    f'SELECT count(*) as nb_ligne_fact_gold FROM {GOLD_ZONE}.fact_ecommerce_order_items'
)
nb_silver = spark.sql(
    f'SELECT count(*)as nb_ligne_fact_silver FROM {SILVER_ZONE}.slv_ecommerce_order_items'
)

nb_gold.show()
nb_silver.show()

