# Incremental data loading into the Silver zone from the Bronze zone

![OneLake medallion lakehouse architecture example](https://learn.microsoft.com/en-us/fabric/onelake/media/onelake-medallion-lakehouse-architecture/onelake-medallion-lakehouse-architecture-example.png)

Using the PySpark libraries, we will read the raw incremental data, stored in CSV format, from the Bronze zone. We will then add computed columns (Year, Quarter, and Month) and create a temporary Spark view.

In [None]:
from pyspark.sql.functions import col, year, month, quarter

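# Read the raw incremental CSV files from the Bronze zone.
# No schema is inferred, so every column is loaded as a string.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("Files/wwi/incremental/fact_sale_1y_incremental")
)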

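# Derive the calendar columns from the invoice date.
df = df.withColumn("Year", year(col("InvoiceDateKey")))
df = df.withColumn("Quarter", quarter(col("InvoiceDateKey")))
df = df.withColumn("Month", month(col("InvoiceDateKey")))

# createOrReplaceTempView returns None, so its result must not be assigned back to df.
df.createOrReplaceTempView("view_sales_incr")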
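
As a rough illustration of what the three derived columns contain, the plain-Python sketch below mirrors the `year`, `quarter`, and `month` functions for a single date. It does not use Spark. The helper name `calendar_parts` and the sample date are hypothetical, and the sketch assumes `InvoiceDateKey` values start with an ISO-formatted date (`yyyy-MM-dd`), which the CSV files are not guaranteed to satisfy.

```python
from datetime import date


def calendar_parts(invoice_date_key: str) -> dict:
    """Return the Year, Quarter and Month of an ISO date string (hypothetical helper)."""
    d = date.fromisoformat(invoice_date_key[:10])  # keep only the yyyy-MM-dd part
    return {
        "Year": d.year,
        "Quarter": (d.month - 1) // 3 + 1,  # months 1-3 -> 1, 4-6 -> 2, and so on
        "Month": d.month,
    }


print(calendar_parts("2021-05-17"))
```

Checking a few dates this way can help confirm that the grouped counts in the next cell land in the quarters you expect.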

In [None]:
%%sql
SELECT Year, Quarter, Month, count(1) AS Sales
FROM view_sales_incr 
GROUP BY Year, Quarter, Month

Insert or update data in the Sales fact table

In [None]:
%%sql
MERGE INTO lakehouse_silver.sales target
USING view_sales_incr source
ON source.SaleKey = target.SaleKey AND source.InvoiceDateKey = target.InvoiceDateKey
  WHEN MATCHED THEN
    UPDATE SET 
      target.CityKey = source.CityKey
      , target.CustomerKey = source.CustomerKey
      , target.BillToCustomerKey = source.BillToCustomerKey
      , target.StockItemKey = source.StockItemKey
      , target.DeliveryDateKey = source.DeliveryDateKey
      , target.SalespersonKey = source.SalespersonKey
      , target.InvoiceID = source.WWIInvoiceID
      , target.Description = source.Description
      , target.Package = source.Package
      , target.Quantity = source.Quantity
      , target.UnitPrice = source.UnitPrice
      , target.TaxRate = source.TaxRate
      , target.TotalExcludingTax = source.TotalExcludingTax
      , target.TaxAmount = source.TaxAmount
      , target.Profit = source.Profit
      , target.TotalIncludingTax = source.TotalIncludingTax
      , target.TotalDryItems = source.TotalDryItems
      , target.TotalChillerItems = source.TotalChillerItems
      , target.LineageKey = source.LineageKey
  WHEN NOT MATCHED
    THEN INSERT (
      target.SaleKey, target.CityKey, target.CustomerKey, target.BillToCustomerKey, target.StockItemKey, target.InvoiceDateKey,
      target.DeliveryDateKey, target.SalespersonKey, target.InvoiceID, target.Description, target.Package, 
      target.Quantity, target.UnitPrice, target.TaxRate, target.TotalExcludingTax, target.TaxAmount, target.Profit, 
      target.TotalIncludingTax, target.TotalDryItems, target.TotalChillerItems, target.LineageKey, 
      target.Year, target.Quarter, target.Month)
    VALUES (
      source.SaleKey, source.CityKey, source.CustomerKey, source.BillToCustomerKey, source.StockItemKey, source.InvoiceDateKey,
      source.DeliveryDateKey, source.SalespersonKey, source.WWIInvoiceID, source.Description, source.Package,
      source.Quantity, source.UnitPrice, source.TaxRate, source.TotalExcludingTax, source.TaxAmount, source.Profit,
      source.TotalIncludingTax, source.TotalDryItems, source.TotalChillerItems, source.LineageKey, 
      source.Year, source.Quarter, source.Month)
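
The matched / not-matched behavior of the MERGE statement can be sketched in plain Python to make it concrete. This is a toy model under stated assumptions, not how Delta Lake executes the statement: rows are dictionaries, the function name `merge_rows` and the sample values are hypothetical, and only the two key columns from the `ON` clause decide whether a row is updated or inserted.

```python
def merge_rows(target: list, source: list) -> list:
    """Upsert source rows into target rows, keyed on (SaleKey, InvoiceDateKey)."""
    # Index the existing rows by the same key the ON clause uses.
    merged = {(r["SaleKey"], r["InvoiceDateKey"]): dict(r) for r in target}
    for row in source:
        key = (row["SaleKey"], row["InvoiceDateKey"])
        if key in merged:
            merged[key].update(row)  # WHEN MATCHED: overwrite with source values
        else:
            merged[key] = dict(row)  # WHEN NOT MATCHED: insert the new row
    return list(merged.values())


# Made-up rows: one sale that already exists and one that is new.
target = [{"SaleKey": 1, "InvoiceDateKey": "2021-01-05", "Quantity": 10}]
source = [
    {"SaleKey": 1, "InvoiceDateKey": "2021-01-05", "Quantity": 12},
    {"SaleKey": 2, "InvoiceDateKey": "2021-01-06", "Quantity": 3},
]
result = merge_rows(target, source)
print(result)
```

Because `InvoiceDateKey` is part of the key, a source row with a known `SaleKey` but a different invoice date does not match and is inserted as a new row rather than updating the existing one.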

In [None]:
%%sql
SELECT Year, Quarter, Month, count(1) AS Sales
FROM lakehouse_silver.sales
GROUP BY Year, Quarter, Month

In [None]:
%%sql
DESCRIBE HISTORY lakehouse_silver.sales

Checking the update history of a single sale

In [None]:
# State of the sale at table version 0 (Delta time travel).
df_sales_v0 = (
    spark.read.format("delta")
    .option("versionAsOf", 0)
    .load("Tables/sales")
    .filter("SaleKey = 45655215")
)
display(df_sales_v0)

# State of the same sale at table version 1.
df_sales_v1 = (
    spark.read.format("delta")
    .option("versionAsOf", 1)
    .load("Tables/sales")
    .filter("SaleKey = 45655215")
)
display(df_sales_v1)
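
To see which columns differ between the two versions, the two results can be compared column by column. The sketch below does this in plain Python for rows represented as dictionaries, for example what `df_sales_v0.first().asDict()` would return, assuming the filter matches a row. The function name `changed_columns` and the sample values are hypothetical.

```python
def changed_columns(before: dict, after: dict) -> dict:
    """Map each column whose value differs to its (before, after) pair."""
    return {
        name: (before.get(name), after.get(name))
        for name in sorted(before.keys() | after.keys())
        if before.get(name) != after.get(name)
    }


# Made-up values standing in for the same sale at version 0 and version 1.
row_v0 = {"SaleKey": 45655215, "Quantity": 10, "Profit": "85.00"}
row_v1 = {"SaleKey": 45655215, "Quantity": 12, "Profit": "102.00"}
print(changed_columns(row_v0, row_v1))
```

An empty result would mean the sale was untouched between the two versions.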