In [None]:
import requests

# Download the sample CSV files from GitHub and land them in the Lakehouse
# "Files" area, where the following cells load them as bronze tables.
csv_base_url = "https://github.com/PowerBiDevCamp/Python-In-Fabric-Notebooks/raw/main/ProductSalesData/"

# A tuple (not a set) so the files are processed in a stable, reproducible order.
csv_files = ("Customers.csv", "Products.csv", "Invoices.csv", "InvoiceDetails.csv")

folder_path = "Files/landing_zone_sales/"

# Upper bound (seconds) on each HTTP request so a stalled connection cannot hang the notebook.
download_timeout_seconds = 60

for csv_file in csv_files:
    csv_file_path = csv_base_url + csv_file
    with requests.get(csv_file_path, timeout=download_timeout_seconds) as response:
        # Fail loudly on 4xx/5xx instead of writing an HTTP error page into the
        # lake as if it were CSV data.
        response.raise_for_status()
        # 'utf-8-sig' strips a leading byte-order mark if present, so the first
        # header name is not prefixed with '\ufeff'.
        csv_content = response.content.decode('utf-8-sig')
        # Last argument True = overwrite the file if it already exists.
        mssparkutils.fs.put(folder_path + csv_file, csv_content, True)
        print(csv_file + " copied to Lakehouse file in OneLake")

StatementMeta(, 2b68fd37-2eac-401a-916d-72047a7db4e0, 6, Finished, Available)

Customers.csv copied to Lakehouse file in OneLake
InvoiceDetails.csv copied to Lakehouse file in OneLake
Products.csv copied to Lakehouse file in OneLake
Invoices.csv copied to Lakehouse file in OneLake


In [None]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, FloatType

# Load the Products landing file into a Spark DataFrame. The schema is declared
# explicitly as (column name, Spark type) pairs so no type inference happens.
schema_products = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("ProductId", LongType()),
        ("Product", StringType()),
        ("Category", StringType()),
    )
])

df_products = (
    spark.read
         .format("csv")
         .schema(schema_products)
         .option("header", "true")
         .load("Files/landing_zone_sales/Products.csv")
)

df_products.printSchema()
df_products.show()

StatementMeta(, 2b68fd37-2eac-401a-916d-72047a7db4e0, 7, Finished, Available)

root
 |-- ProductId: long (nullable = true)
 |-- Product: string (nullable = true)
 |-- Category: string (nullable = true)

+---------+---------+----------+
|ProductId|  Product|  Category|
+---------+---------+----------+
|        1|   Apples|    Fruits|
|        2|  Bananas|    Fruits|
|        3|  Oranges|    Fruits|
|        4|  Carrots|Vegetables|
|        5|Cucumbers|Vegetables|
|        6| Potatoes|Vegetables|
|        7| Tomatoes|Vegetables|
|        8|     Milk|     Dairy|
|        9|   Butter|     Dairy|
|       10|   Cheese|     Dairy|
+---------+---------+----------+



In [None]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, FloatType, DateType

# Load the Customers landing file into a Spark DataFrame with an explicit schema.
# DOB is parsed as a date using the "M/d/yyyy" pattern below.
schema_customers = StructType([
    StructField("CustomerId", LongType() ),
    StructField("FirstName", StringType() ),
    StructField("LastName", StringType() ),
    StructField("Country", StringType() ),
    StructField("City", StringType() ),
    StructField("DOB", DateType() ),
])

# No inferSchema option: the explicit .schema() already fixes every column type,
# and Spark skips inference whenever a schema is supplied.
# NOTE(review): with a user-supplied schema (enforceSchema at its default), the
# CSV reader maps columns by position, not by header name — confirm the file's
# column order matches the schema above.
df_customers = (
    spark.read.format("csv")
         .option("header","true")
         .schema(schema_customers)
         .option("dateFormat", "M/d/yyyy")
         .load("Files/landing_zone_sales/Customers.csv")
)

df_customers.printSchema()
df_customers.show()

StatementMeta(, 2b68fd37-2eac-401a-916d-72047a7db4e0, 8, Finished, Available)

root
 |-- CustomerId: long (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- DOB: date (nullable = true)

+----------+----------+---------+--------+----------+----------+
|CustomerId| FirstName| LastName| Country|      City|       DOB|
+----------+----------+---------+--------+----------+----------+
|         1|   Yolanda|   Wagner| Belgium|  Brussels|1970-02-24|
|         2|    Arnold|   Harmon| England|    London|1943-11-01|
|         3|      Minh|    Casey| Ireland|  Limerick|1976-11-24|
|         4|   Russell| McIntyre|Portugal|     Braga|1993-08-05|
|         5|  Angelina| Santiago| England|    London|1978-08-22|
|         6|   Sabrina|   Conway| England|Birmingham|1963-06-25|
|         7|Jacqueline|Zimmerman| Belgium|  Brussels|2003-05-04|
|         8|     Dewey|  Francis| England| Liverpool|1957-01-10|
|         9|     Haley| McDowell| Belgium|  Brusse

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, FloatType, DateType

# Load the Invoices landing file into a Spark DataFrame with an explicit schema.
# Date is parsed as a date using the "MM/dd/yyyy" pattern below.
schema_invoices = StructType([
    StructField("InvoiceId", LongType() ),
    StructField("Date", DateType() ),
    StructField("TotalSalesAmount", FloatType() ),
    StructField("CustomerId", LongType() )
])

# No inferSchema option: the explicit .schema() already fixes every column type,
# and Spark skips inference whenever a schema is supplied.
# NOTE(review): with a user-supplied schema (enforceSchema at its default), the
# CSV reader maps columns by position, not by header name — confirm the file's
# column order matches the schema above.
df_invoices = (
    spark.read.format("csv")
         .option("header","true")
         .schema(schema_invoices)
         .option("dateFormat", "MM/dd/yyyy")
         .load("Files/landing_zone_sales/Invoices.csv")
)

df_invoices.printSchema()
df_invoices.show()

StatementMeta(, 2b68fd37-2eac-401a-916d-72047a7db4e0, 11, Finished, Available)

root
 |-- InvoiceId: long (nullable = true)
 |-- Date: date (nullable = true)
 |-- TotalSalesAmount: float (nullable = true)
 |-- CustomerId: long (nullable = true)

+---------+----------+----------------+----------+
|InvoiceId|      Date|TotalSalesAmount|CustomerId|
+---------+----------+----------------+----------+
|        1|2020-01-01|            72.0|         1|
|        2|2020-01-01|            35.0|         2|
|        3|2020-01-01|             5.0|         3|
|        4|2020-01-01|           58.75|         4|
|        5|2020-01-01|            39.1|         5|
|        6|2020-01-01|            37.5|         6|
|        7|2020-01-01|           22.15|         7|
|        8|2020-01-01|            58.1|         8|
|        9|2020-01-01|           51.75|         9|
|       10|2020-01-01|          120.75|        10|
|       11|2020-01-02|           78.95|        11|
|       12|2020-01-02|            54.1|        12|
|       13|2020-01-02|             7.5|        13|
|       14|2020-01

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, FloatType, DateType

# Load the InvoiceDetails landing file into a Spark DataFrame with an explicit schema.
schema_invoice_details = StructType([
    StructField("Id", LongType() ),
    StructField("Quantity", LongType() ),
    StructField("SalesAmount", FloatType() ),
    StructField("InvoiceId", LongType() ),
    StructField("ProductId", LongType() )
])

# Only the header option and the explicit schema are needed. This file has no
# date column, so a dateFormat option would have nothing to parse. Spark also
# skips schema inference when .schema() is supplied.
# NOTE(review): with a user-supplied schema (enforceSchema at its default), the
# CSV reader maps columns by position, not by header name — confirm the file's
# column order is Id, Quantity, SalesAmount, InvoiceId, ProductId.
df_invoice_details = (
    spark.read.format("csv")
         .option("header","true")
         .schema(schema_invoice_details)
         .load("Files/landing_zone_sales/InvoiceDetails.csv")
)

df_invoice_details.printSchema()
df_invoice_details.show()

StatementMeta(, 2b68fd37-2eac-401a-916d-72047a7db4e0, 12, Finished, Available)

root
 |-- Id: long (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- SalesAmount: float (nullable = true)
 |-- InvoiceId: long (nullable = true)
 |-- ProductId: long (nullable = true)

+---+--------+-----------+---------+---------+
| Id|Quantity|SalesAmount|InvoiceId|ProductId|
+---+--------+-----------+---------+---------+
|  2|       2|        4.5|        1|        5|
|  2|      30|       67.5|        1|        9|
|  3|      20|       35.0|        2|        7|
|  4|       4|        5.0|        3|        3|
|  5|      20|       25.0|        4|        6|
|  5|       6|       10.5|        4|        7|
|  5|      31|      23.25|        4|        1|
|  6|       3|       2.85|        5|        2|
|  6|      29|      36.25|        5|        3|
|  7|      15|       37.5|        6|        8|
|  8|      17|      16.15|        7|        4|
|  8|       8|        6.0|        7|        1|
|  9|      28|       26.6|        8|        2|
|  9|      18|       31.5|        8|        7|
| 10|

In [29]:
# Persist the four raw DataFrames as bronze Delta tables under Tables/.
# This cell needs the four CSV-loading cells above to have run in the same Spark
# session. Otherwise df_products etc. are undefined and it raises NameError.
# Building the mapping first means a missing DataFrame fails before any table
# is written.
# Mode "overwrite" + overwriteSchema lets a re-run replace each table, even when
# its column types have changed.
bronze_tables = {
    "bronze_products": df_products,
    "bronze_customers": df_customers,
    "bronze_invoices": df_invoices,
    "bronze_invoice_details": df_invoice_details,
}

for table_name, bronze_df in bronze_tables.items():
    (
        bronze_df.write
            .mode("overwrite")
            .option("overwriteSchema", "True")
            .format("delta")
            .save(f"Tables/{table_name}")
    )

StatementMeta(, 86c586d0-fd21-4305-99ae-bcbfd268cf34, 31, Finished, Available)

NameError: name 'df_products' is not defined

In [None]:
# Silver products: the bronze products table needs no transformation, so it is
# copied through unchanged into the "products" Delta table.
df_silver_products = (
    spark.read
         .format("delta")
         .load("Tables/bronze_products")
)

(
    df_silver_products.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "True")
        .save("Tables/products")
)

df_silver_products.show()

StatementMeta(, 2b68fd37-2eac-401a-916d-72047a7db4e0, 14, Finished, Available)

+---------+---------+----------+
|ProductId|  Product|  Category|
+---------+---------+----------+
|        1|   Apples|    Fruits|
|        2|  Bananas|    Fruits|
|        3|  Oranges|    Fruits|
|        4|  Carrots|Vegetables|
|        5|Cucumbers|Vegetables|
|        6| Potatoes|Vegetables|
|        7| Tomatoes|Vegetables|
|        8|     Milk|     Dairy|
|        9|   Butter|     Dairy|
|       10|   Cheese|     Dairy|
+---------+---------+----------+



In [30]:
from pyspark.sql.functions import concat_ws, floor, datediff, current_date, col, months_between

# Silver customers: merge first and last name into a single "Customer" column and
# derive "Age" in whole years from DOB, then drop the separate name columns.
df_silver_customers = (

    spark.read.format("delta").load("Tables/bronze_customers")
            .withColumn("Customer", concat_ws(' ', col('FirstName'), col('LastName')) )
            # Whole-year age as of the run date (current_date), so it is recomputed
            # each time this table is overwritten. months_between yields a whole
            # number when the day of month matches, so the age increments exactly
            # on the birthday. A datediff/365.25 approximation can under-count by
            # one year on or near a birthday: 1998-04-03 -> 2023-04-03 is
            # 9131 days, and 9131 / 365.25 = 24.999.
            .withColumn("Age", floor( months_between( current_date(), col("DOB") ) / 12 ))
            .drop('FirstName', 'LastName')
)

df_silver_customers.write.mode("overwrite").option("overwriteSchema", "True").format("delta").save("Tables/customers")

df_silver_customers.printSchema()
df_silver_customers.show()

StatementMeta(, 86c586d0-fd21-4305-99ae-bcbfd268cf34, 32, Finished, Available)

root
 |-- CustomerId: long (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- DOB: date (nullable = true)
 |-- Customer: string (nullable = false)
 |-- Age: long (nullable = true)

+----------+-------+-------+----------+-----------------+---+
|CustomerId|Country|   City|       DOB|         Customer|Age|
+----------+-------+-------+----------+-----------------+---+
|        16|Belgium|Antwerp|1998-04-03|       Lacy Cross| 25|
|        57|Belgium|Antwerp|1992-04-02|     Cheryl Morse| 31|
|        77|Belgium|Antwerp|1970-09-17|      Dolly Boyer| 53|
|       133|Belgium|Antwerp|1991-06-13| Alyssa Frederick| 32|
|       137|Belgium|Antwerp|1992-08-17|   Maxine Walters| 31|
|       141|Belgium|Antwerp|1951-06-06|    Burl McCarthy| 72|
|       147|Belgium|Antwerp|1983-11-20|     Eloise Bruce| 39|
|       184|Belgium|Antwerp|2004-10-23|   Gwen Whitehead| 18|
|       211|Belgium|Antwerp|1952-03-01|    Prince Franco| 71|
|       221|Belgium|Antwerp|

In [27]:
from pyspark.sql.functions import col, desc, concat, lit, floor, datediff
from pyspark.sql.functions import date_format, to_date, current_date, year, month, dayofmonth

# Silver sales fact: one row per invoice line. Each line is enriched with the
# invoice's Date and CustomerId and given an integer DateKey.
df_bronze_invoices = spark.read.format("delta").load("Tables/bronze_invoices")
df_bronze_invoice_details = spark.read.format("delta").load("Tables/bronze_invoice_details")

df_silver_sales = (
    df_bronze_invoice_details
            # Joining on the column name (inner, like the default) keeps a single
            # InvoiceId column. A column-equality expression would keep two
            # ambiguous ones.
            .join(df_bronze_invoices, on="InvoiceId", how="inner")
            .withColumnRenamed('SalesAmount', 'Sales')
            # Integer yyyymmdd key, e.g. 2020-02-24 -> 20200224.
            .withColumn("DateKey", (year(col('Date'))*10000) +
                               (month(col('Date'))*100) +
                               (dayofmonth(col('Date')))   )
            # The select fixes the output columns and their order. InvoiceId, Id
            # and TotalSalesAmount are left out here.
            .select('Date', "DateKey", "CustomerId", "ProductId", "Sales", "Quantity")
)

df_silver_sales.write.mode("overwrite").option("overwriteSchema", "True").format("delta").save("Tables/sales")

df_silver_sales.printSchema()
df_silver_sales.show()

StatementMeta(, 86c586d0-fd21-4305-99ae-bcbfd268cf34, 29, Finished, Available)

root
 |-- Date: date (nullable = true)
 |-- DateKey: integer (nullable = true)
 |-- CustomerId: long (nullable = true)
 |-- ProductId: long (nullable = true)
 |-- Sales: float (nullable = true)
 |-- Quantity: long (nullable = true)

+----------+--------+----------+---------+-----+--------+
|      Date| DateKey|CustomerId|ProductId|Sales|Quantity|
+----------+--------+----------+---------+-----+--------+
|2020-02-24|20200224|       606|       10| 90.0|      24|
|2020-03-06|20200306|       776|       10| 90.0|      24|
|2020-03-06|20200306|       789|       10| 90.0|      24|
|2020-03-07|20200307|       803|       10| 90.0|      24|
|2020-03-18|20200318|      1079|       10| 90.0|      24|
|2020-04-02|20200402|      1663|       10| 90.0|      24|
|2020-04-11|20200411|      1992|       10| 90.0|      24|
|2020-04-12|20200412|      2079|       10| 90.0|      24|
|2020-04-14|20200414|      2244|       10| 90.0|      24|
|2020-04-29|20200429|      3055|       10| 90.0|      24|
|2020-05-03|2

In [28]:
import pyspark.pandas as ps
from datetime import datetime, timedelta, date
import os
import pandas as pd

from pyspark.sql.functions import to_date, year, month, dayofmonth, quarter, dayofweek
# Imported here as well so this cell does not depend on imports made in other cells.
from pyspark.sql.functions import col, date_format, min as spark_min, max as spark_max

# Calendar dimension: one row per day, covering whole calendar years from the
# year of the first sale through the year of the last sale.
# Both bounds come from a single aggregation, i.e. one Spark job instead of two.
first_sales_date, last_sales_date = df_silver_sales.select(
    spark_min("Date"),
    spark_max("Date"),
).first()

# min/max are NULL when there are no non-null dates; fail with a clear message
# instead of an AttributeError on None.year.
if first_sales_date is None or last_sales_date is None:
    raise ValueError("Cannot build the calendar: df_silver_sales has no non-null Date values.")

start_date = date(first_sales_date.year, 1, 1)
end_date = date(last_sales_date.year, 12, 31)

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
# Plain pandas frame (despite the _ps suffix) with one datetime column named
# "timestamp", one row per day in [start_date, end_date].
df_calendar_ps = pd.date_range(start_date, end_date, freq='D').to_frame(index=False, name="timestamp")

df_calendar_spark = (
     spark.createDataFrame(df_calendar_ps)
       .withColumn("Date", to_date(col('timestamp')))
       # Integer yyyymmdd key, e.g. 2020-01-01 -> 20200101.
       .withColumn("DateKey", (year(col('timestamp'))*10000) +
                              (month(col('timestamp'))*100) +
                              (dayofmonth(col('timestamp')))   )
       .withColumn("Year", year(col('timestamp'))  )
       # NOTE(review): "yyyy-QQ" renders Q1 as "2020-01", the same text format as
       # Month below. Consider "yyyy-'Q'Q" ("2020-Q1") if a distinct label is wanted.
       .withColumn("Quarter", date_format(col('timestamp'),"yyyy-QQ")  )
       .withColumn("Month", date_format(col('timestamp'),'yyyy-MM')  )
       .withColumn("Day", dayofmonth(col('timestamp'))  )
       .withColumn("MonthInYear", date_format(col('timestamp'),'MMMM')  )
       .withColumn("MonthInYearSort", month(col('timestamp'))  )
       .withColumn("DayOfWeek", date_format(col('timestamp'),'EEEE')  )
       # 1 = Sunday ... 7 = Saturday.
       .withColumn("DayOfWeekSort", dayofweek(col('timestamp')))
       .drop('timestamp')
)

df_calendar_spark.write.mode("overwrite").option("overwriteSchema", "True").format("delta").save("Tables/calendar")
df_calendar_spark.show()

StatementMeta(, 86c586d0-fd21-4305-99ae-bcbfd268cf34, 30, Finished, Available)

+----------+--------+----+-------+-------+---+-----------+---------------+---------+-------------+
|      Date| DateKey|Year|Quarter|  Month|Day|MonthInYear|MonthInYearSort|DayOfWeek|DayOfWeekSort|
+----------+--------+----+-------+-------+---+-----------+---------------+---------+-------------+
|2020-01-01|20200101|2020|2020-01|2020-01|  1|    January|              1|Wednesday|            4|
|2020-01-02|20200102|2020|2020-01|2020-01|  2|    January|              1| Thursday|            5|
|2020-01-03|20200103|2020|2020-01|2020-01|  3|    January|              1|   Friday|            6|
|2020-01-04|20200104|2020|2020-01|2020-01|  4|    January|              1| Saturday|            7|
|2020-01-05|20200105|2020|2020-01|2020-01|  5|    January|              1|   Sunday|            1|
|2020-01-06|20200106|2020|2020-01|2020-01|  6|    January|              1|   Monday|            2|
|2020-01-07|20200107|2020|2020-01|2020-01|  7|    January|              1|  Tuesday|            3|
|2020-01-0