# **Lecture des fichiers**


In [2]:
import pandas as pd 
staging_sales = pd.read_csv("/lakehouse/default/Files/Retail_sales.csv")
staging_sales.columns

StatementMeta(, 98146d3d-918e-4809-b64a-0de5e449033a, 4, Finished, Available, Finished)

Index(['Unnamed: 0', 'Store ID', 'Product ID', 'Date', 'Units Sold',
       'Sales Revenue (USD)', 'Discount Percentage', 'Marketing Spend (USD)',
       'Store Location', 'Product Category', 'Day of the Week',
       'Holiday Effect'],
      dtype='object')

In [3]:
# creer la vue de staging_sales
from pyspark.sql import SparkSession

spark_df = spark.createDataFrame(staging_sales)
spark_df.createOrReplaceTempView("staging_sales")


StatementMeta(, 98146d3d-918e-4809-b64a-0de5e449033a, 5, Finished, Available, Finished)

# **Création des hubs**


## Hub produit : Création de table et chargement

In [37]:
%%sql 
CREATE TABLE Hub_Product (
    hub_product_hashkey STRING,
    product_id STRING,
    load_dts TIMESTAMP,
    record_src STRING
)
USING DELTA


StatementMeta(, 33d319e2-2865-40a8-81b3-408c07357f16, 39, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [41]:
spark.sql("""
INSERT INTO Hub_Product (hub_product_hashkey, product_id, load_dts, record_src)
SELECT DISTINCT
    md5(cast(s.`Product ID` AS STRING)) AS hub_product_hashkey,
    s.`Product ID` AS product_id,
    current_timestamp() AS load_dts,
    'CRM' AS record_src
FROM staging_sales s
LEFT ANTI JOIN hub_product h  
ON md5(cast(s.`Product ID` AS STRING)) = h.hub_product_hashkey
WHERE s.`Product ID` IS NOT NULL

""")

StatementMeta(, 33d319e2-2865-40a8-81b3-408c07357f16, 43, Finished, Available, Finished)

DataFrame[]

## Hub Store : Création de table et chargement

In [1]:
%%sql 
CREATE TABLE Hub_store (
    hub_store_hashkey STRING,
    store_id STRING,
    load_dts TIMESTAMP,
    record_src STRING
)
USING DELTA

StatementMeta(, 98146d3d-918e-4809-b64a-0de5e449033a, 2, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [4]:
spark.sql("""
INSERT INTO Hub_store (hub_store_hashkey, store_id, load_dts, record_src)
SELECT DISTINCT
    md5(s.`Store Location`) AS hub_store_hashkey,
    s.`Store Location` store_id,
    current_timestamp() AS load_dts,
    "CRM" record_src
FROM staging_sales s
LEFT ANTI JOIN hub_store h  
ON md5(s.`Store Location`) = h.hub_store_hashkey
WHERE s.`Store Location` IS NOT NULL
""")

StatementMeta(, 98146d3d-918e-4809-b64a-0de5e449033a, 6, Finished, Available, Finished)

DataFrame[]

## Hub date : Création de table et chargement

In [43]:
%%sql 
CREATE TABLE Hub_Date (
    hub_date_hashkey STRING,
    date date,
    load_dts TIMESTAMP,
    record_src STRING
)
USING DELTA


StatementMeta(, 33d319e2-2865-40a8-81b3-408c07357f16, 45, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [45]:
spark.sql("""
INSERT INTO Hub_date (hub_date_hashkey, date, load_dts, record_src)
SELECT DISTINCT
    md5(s.`date`) AS hub_store_hashkey,
    s.`date` store_id,
    current_timestamp() AS load_dts,
    "CRM" record_src
FROM staging_sales s
LEFT ANTI JOIN hub_store h  
ON md5(s.`date`) = h.hub_store_hashkey
WHERE s.`date` IS NOT NULL
""")

StatementMeta(, 33d319e2-2865-40a8-81b3-408c07357f16, 47, Finished, Available, Finished)

DataFrame[]

## Link date : Création de table et chargement

In [10]:
%%sql 
CREATE TABLE link_sale (
    link_sale_hashkey STRING,         -- MD5(product_id || store || date)
    hub_product_hashkey STRING,
    hub_store_hashkey STRING,
    hub_date_hashkey STRING,
    load_dts TIMESTAMP ,
    record_src STRING
)
USING DELTA

StatementMeta(, 98146d3d-918e-4809-b64a-0de5e449033a, 12, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [12]:
spark.sql("""INSERT INTO Link_Sale (
    link_sale_hashkey,
    hub_product_hashkey,
    hub_store_hashkey,
    hub_date_hashkey,
    load_dts,
    record_src
)
SELECT DISTINCT
    md5(concat_ws('|', cast(s.`Product ID` AS STRING), cast(s.`Store Location` AS STRING), cast(s.`Date` AS STRING))) AS link_sale_hashkey,
    md5(cast(s.`Product ID` AS STRING)) AS hub_product_hashkey,
    md5(cast(s.`Store Location` AS STRING)) AS hub_store_hashkey,
    md5(cast(s.`Date` AS STRING)) AS hub_date_hashkey,
    current_timestamp() AS load_dts,
    "CRM" record_src
FROM staging_sales s
LEFT ANTI JOIN Link_Sale l
ON md5(concat_ws('|', cast(s.`Product ID` AS STRING), cast(s.`Store Location`AS STRING), cast(s.`Date` AS STRING))) = l.link_sale_hashkey
WHERE s.`Product ID` IS NOT NULL
  AND s.`Store Location` IS NOT NULL
  AND s.`Date` IS NOT NULL
""")

StatementMeta(, 98146d3d-918e-4809-b64a-0de5e449033a, 14, Finished, Available, Finished)

DataFrame[]

## Sat_Sale : Création de table et chargement

In [16]:
%%sql 
CREATE TABLE Sat_Sale (
    link_sale_hashkey STRING,         -- Clé de lien vers Link_Sale
    units_sold INT,
    sales_revenue DOUBLE,
    promo_pct DOUBLE,
    marketing_spend DOUBLE,
    holiday_flag BOOLEAN,
    hash_diff STRING,                 -- MD5 de tous les attributs ci-dessus
    load_dts TIMESTAMP,
    record_src STRING
)
USING DELTA


StatementMeta(, 98146d3d-918e-4809-b64a-0de5e449033a, 18, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [17]:
%%sql
INSERT INTO Sat_Sale (
    link_sale_hashkey,
    units_sold,
    sales_revenue,
    promo_pct,
    marketing_spend,
    holiday_flag,
    hash_diff,
    load_dts,
    record_src
)
SELECT
    md5(concat_ws('|',
        cast(s.`Product ID` AS STRING),
        cast(s.`Store Location`  AS STRING),
        cast(s.`Date` AS STRING)
    )) AS link_sale_hashkey,
    s.`Units Sold`,
    s.`Sales Revenue (USD)`,
    s.`Discount Percentage`,
    s.`Marketing Spend (USD)`,
    s.`Holiday Effect`,
    md5(concat_ws('',
        cast(coalesce(s.`Units Sold`, 0) AS STRING),
        cast(coalesce(s.`Sales Revenue (USD)`, 0.0) AS STRING),
        cast(coalesce(s.`Discount Percentage`, 0.0) AS STRING),
        cast(coalesce(s.`Marketing Spend (USD)`, 0.0) AS STRING),
        cast(coalesce(s.`Holiday Effect`, false) AS STRING)
    )) AS hash_diff,
    current_timestamp() AS load_dts,
    'CRM' AS record_src
FROM staging_sales s
WHERE s.`Product ID` IS NOT NULL
  AND s.`Store Location`  IS NOT NULL
  AND s.`Date` IS NOT NULL


StatementMeta(, 98146d3d-918e-4809-b64a-0de5e449033a, 19, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>