### Spark session configuration
This cell configures the Spark session to enable _Verti-Parquet_ and _Optimize on Write_, and sets the Optimize on Write bin size. See the tutorial document for more details about _Verti-Parquet_ and _Optimize on Write_.

In [1]:
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "1073741824")

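The bin size value in the cell above is easier to read as a power of two. The quick check below assumes the setting is interpreted as a byte count, which this notebook does not state explicitly; under that reading the value is exactly 1 GiB.

```python
# Value passed to spark.microsoft.delta.optimizeWrite.binSize in the cell above.
bin_size = int("1073741824")

# Assumption: the setting is a byte count. Under that reading it equals 1 GiB.
GIB = 1024 ** 3
bin_size_gib = bin_size / GIB

print(f"{bin_size} bytes = {bin_size_gib} GiB")
```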

#### Approach #1 - sale_by_date_city
In this cell, you create four Spark dataframes, each referencing an existing Delta table. The `dim_employee` dataframe is used later, in Approach #2.

In [None]:
# Rescue script: generates both aggregate (gold) tables in one go
from pyspark.sql.functions import col, sum as _sum

# --- PREPARATION: READ THE BASE (SILVER) TABLES ---
# Using the short table names we already fixed
fact_sale = spark.read.table("fact_sale")
dim_date = spark.read.table("dimension_date")
dim_city = spark.read.table("dimension_city")
dim_employee = spark.read.table("dimension_employee")

print("1. Base tables read successfully...")

In [2]:
# --- PART 1: AGGREGATE SALE BY CITY ---
# Join and group-by done directly in Python
df_city_agg = fact_sale.join(dim_date, fact_sale.InvoiceDateKey == dim_date.Date) \
                       .join(dim_city, fact_sale.CityKey == dim_city.CityKey) \
                       .groupBy(dim_date.Date, dim_date.CalendarMonthLabel, dim_date.CalendarYear, 
                                dim_city.City, dim_city.StateProvince, dim_city.SalesTerritory) \
                       .agg(
                           _sum(fact_sale.TotalExcludingTax).alias("SumOfTotalExcludingTax"),
                           _sum(fact_sale.TaxAmount).alias("SumOfTaxAmount"),
                           _sum(fact_sale.TotalIncludingTax).alias("SumOfTotalIncludingTax"),
                           _sum(fact_sale.Profit).alias("SumOfProfit")
                       )

# Save the final table
df_city_agg.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable("aggregate_sale_by_date_city")
print("Table 1 created: aggregate_sale_by_date_city")



In the cell above, you join these tables using the dataframes created earlier, group by to generate the aggregation, rename a few of the columns, and finally write the result as a Delta table in the _Tables_ section of the lakehouse.
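To see what the join and group-by produce without a Spark session, the same logic can be sketched in plain Python on a few made-up rows. Only the column names come from the notebook; the sample values are hypothetical, and this illustrates the semantics of an inner join followed by grouped sums, not how Spark executes the query.

```python
from collections import defaultdict

# Hypothetical sample rows; only the column names are taken from the notebook.
fact_sale = [
    {"InvoiceDateKey": "2020-01-01", "CityKey": 1, "TotalExcludingTax": 100.0,
     "TaxAmount": 15.0, "TotalIncludingTax": 115.0, "Profit": 40.0},
    {"InvoiceDateKey": "2020-01-01", "CityKey": 1, "TotalExcludingTax": 200.0,
     "TaxAmount": 30.0, "TotalIncludingTax": 230.0, "Profit": 80.0},
    {"InvoiceDateKey": "2020-01-02", "CityKey": 2, "TotalExcludingTax": 50.0,
     "TaxAmount": 7.5, "TotalIncludingTax": 57.5, "Profit": 20.0},
    # No matching date row below, so the inner join drops this sale.
    {"InvoiceDateKey": "2020-01-03", "CityKey": 1, "TotalExcludingTax": 10.0,
     "TaxAmount": 1.5, "TotalIncludingTax": 11.5, "Profit": 4.0},
]
dim_date = [
    {"Date": "2020-01-01", "CalendarMonthLabel": "CY2020-Jan", "CalendarYear": 2020},
    {"Date": "2020-01-02", "CalendarMonthLabel": "CY2020-Jan", "CalendarYear": 2020},
]
dim_city = [
    {"CityKey": 1, "City": "CityA", "StateProvince": "StateA", "SalesTerritory": "East"},
    {"CityKey": 2, "City": "CityB", "StateProvince": "StateB", "SalesTerritory": "West"},
]

# Index each dimension by its join key so lookups behave like an inner join.
date_by_key = {d["Date"]: d for d in dim_date}
city_by_key = {c["CityKey"]: c for c in dim_city}

group_cols = ["Date", "CalendarMonthLabel", "CalendarYear",
              "City", "StateProvince", "SalesTerritory"]
measures = ["TotalExcludingTax", "TaxAmount", "TotalIncludingTax", "Profit"]

# One running total per measure for every distinct group key.
totals = defaultdict(lambda: dict.fromkeys(measures, 0.0))

for sale in fact_sale:
    date = date_by_key.get(sale["InvoiceDateKey"])
    city = city_by_key.get(sale["CityKey"])
    if date is None or city is None:
        continue  # inner join: skip sales without a match in both dimensions
    merged = {**date, **city}
    group = tuple(merged[c] for c in group_cols)
    for m in measures:
        totals[group][m] += sale[m]

# Flatten to rows, using the same SumOf... aliases as the notebook cell.
aggregate = [
    {**dict(zip(group_cols, group)), **{f"SumOf{m}": v for m, v in sums.items()}}
    for group, sums in totals.items()
]

for row in aggregate:
    print(row)
```

The two sales on the same date and city collapse into one output row, while the sale with no matching date is excluded, which is the behavior to expect from the inner joins in the notebook cell.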

#### Approach #2 - sale_by_date_employee
In this cell, you join three tables (`fact_sale`, `dimension_date`, and `dimension_employee`) with the DataFrame API, group by to generate the aggregation, rename a few of the columns, and write the result as a Delta table.

In [None]:
# --- PART 2: AGGREGATE SALE BY EMPLOYEE ---
# Doing the join and group-by directly
df_emp_agg = fact_sale.join(dim_date, fact_sale.InvoiceDateKey == dim_date.Date) \
                      .join(dim_employee, fact_sale.SalespersonKey == dim_employee.EmployeeKey) \
                      .groupBy(dim_date.Date, dim_date.CalendarMonthLabel, dim_date.CalendarYear, 
                               dim_employee.PreferredName, dim_employee.Employee) \
                      .agg(
                           _sum(fact_sale.TotalExcludingTax).alias("SumOfTotalExcludingTax"),
                           _sum(fact_sale.TaxAmount).alias("SumOfTaxAmount"),
                           _sum(fact_sale.TotalIncludingTax).alias("SumOfTotalIncludingTax"),
                           _sum(fact_sale.Profit).alias("SumOfProfit")
                       )

# Save the final table
df_emp_agg.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable("aggregate_sale_by_date_employee")
print("Table 2 created: aggregate_sale_by_date_employee")

In [None]:
print("END OF THE STRESS.")


In Approach #2, the aggregated dataframe is written directly as a Delta table in the _Tables_ section of the lakehouse.