# GOLD LAYER

## General Info
| Info | Details |
|------|---------|
| Table Name | TBD |
| From | Azure |

## Update timeline
|Date | Developed/altered by: | Comment |
|:------:|--------|-------|
|2025/01/23|Luca Ainstein|Project Data Ingestion|

Library Import

In [0]:
### Library imports

# Only current_timestamp is used in this notebook.
from pyspark.sql.functions import current_timestamp


Cluster Connection

In [0]:
### The connection could instead be configured when the cluster is created. However, because Databricks Community Edition deletes a cluster after only 1-2 hours of inactivity, it is more practical to set up the connection in code.

# Set up the configurations for Azure Data Lake Storage Gen2 using OAuth authentication
configs = {
    "fs.azure.account.auth.type.dlsprojetofixo.dfs.core.windows.net": "OAuth",
    "fs.azure.account.oauth.provider.type.dlsprojetofixo.dfs.core.windows.net": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id.dlsprojetofixo.dfs.core.windows.net": "238b3ff5-2a4b-49a0-838e-61aea5e55f0a",
    "fs.azure.account.oauth2.client.secret.dlsprojetofixo.dfs.core.windows.net": "~h.8Q~XwXAMBiXDY69nUxONcfoO49RweWIPELdkM",
    "fs.azure.account.oauth2.client.endpoint.dlsprojetofixo.dfs.core.windows.net": "https://login.microsoftonline.com/d16f0536-3c2f-4035-887f-8949bacfacfd/oauth2/token"
}

# Apply the configurations to Spark
for key, value in configs.items():
    spark.conf.set(key, value)

# Report that the settings were applied (this does not test the connection itself)
print("Configuration applied successfully.")

Configuration applied successfully.
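
Hard-coding the client secret in the notebook exposes it to anyone who can read the file. A minimal sketch of one alternative follows, assuming the credentials are supplied through environment variables. The variable names (`ADLS_CLIENT_ID`, `ADLS_CLIENT_SECRET`, `ADLS_TENANT_ID`) and the helpers `build_adls_oauth_configs` and `configs_from_env` are hypothetical and not part of the original notebook. The sketch only builds the same dictionary of settings; applying them with `spark.conf.set` stays as in the cell above.

```python
import os


def build_adls_oauth_configs(storage_account, client_id, client_secret, tenant_id):
    """Return the Spark settings for OAuth access to one ADLS Gen2 account."""
    suffix = f"{storage_account}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{suffix}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{suffix}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{suffix}": client_id,
        f"fs.azure.account.oauth2.client.secret.{suffix}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{suffix}":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }


def configs_from_env(storage_account, env=os.environ):
    """Read the (hypothetical) ADLS_* variables; raises KeyError if one is missing."""
    return build_adls_oauth_configs(
        storage_account,
        client_id=env["ADLS_CLIENT_ID"],
        client_secret=env["ADLS_CLIENT_SECRET"],
        tenant_id=env["ADLS_TENANT_ID"],
    )
```

In the notebook, `configs = configs_from_env("dlsprojetofixo")` would then feed the existing `spark.conf.set` loop unchanged.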


Star Schema: https://excalidraw.com/#json=i0qNHJZYtOEOmCDmN8p18,CaEi5WFnMMP_DpCn9NbHLA
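
The tables built in this notebook can be summarised as a small data structure, which makes the fact-to-dimension relationships easy to check. This is only a sketch: the column lists are copied from the queries in this notebook (without the added `load_time_GMT` column), while the rule that each dimension joins on its first column is an assumption, and `join_keys` is a hypothetical helper.

```python
# Columns selected into each gold table by this notebook.
FACT_ORDERS = ["OrderID", "CustomerID", "ItemID", "OrderDate",
               "ItemQuantity", "TotalAmount", "Status"]

DIMENSIONS = {
    "Customer_DIM": ["CustomerID", "Name", "PhoneNumber", "Age"],
    "Product_DIM": ["ItemID", "ItemPrice", "ItemProductName"],
    "Shipping_DIM": ["OrderID", "Street", "City", "State", "Country", "PostalCode"],
    "Date_DIM": ["OrderDate", "OrderYear", "OrderMonth", "OrderDay", "OrderWeekday"],
}


def join_keys(fact_columns, dimensions):
    """Assume each dimension joins on its first column; check the fact table has it."""
    keys = {}
    for dim_name, columns in dimensions.items():
        key = columns[0]
        if key not in fact_columns:
            raise ValueError(f"{dim_name}: key {key} missing from fact table")
        keys[dim_name] = key
    return keys


print(join_keys(FACT_ORDERS, DIMENSIONS))
```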

Table `fact_orders`

In [0]:
### GOLD Table creation

## Table Creation

df_fo = spark.sql(
"""
SELECT
  OrderID,
  CustomerID,
  ItemID,
  OrderDate,
  ItemQuantity,
  TotalAmount,
  Status
FROM
  silver.sales;
"""
)

df_fo = df_fo.withColumn("load_time_GMT", current_timestamp())
# load_time_GMT is recorded in GMT. See the bronze layer for further details.

## Var Naming
schema_layer = "gold"
table_name = "fact_orders"

## Path def
path_target = 'abfss://gold@dlsprojetofixo.dfs.core.windows.net/fact_orders_lucaainstein'

## Database Creation
spark.sql("CREATE DATABASE IF NOT EXISTS gold")
df_fo.write \
    .format('delta') \
    .mode('overwrite') \
    .option('path', path_target) \
    .option('overwriteSchema', 'true') \
    .saveAsTable(f'{schema_layer}.{table_name}')
print("Schema is created!")

spark.sql(f"OPTIMIZE {schema_layer}.{table_name}")
print(f"Optimization {schema_layer}.{table_name} completed!")

Schema is created!
Optimization gold.fact_orders completed!
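
Every table cell in this notebook repeats the same naming pattern: a `schema.table` name plus an `abfss://` path that ends in the table name and an owner suffix. A small sketch of that pattern as a pure-Python helper, assuming the container is named after the layer as it is here. `gold_target` and its parameters are hypothetical names introduced for illustration.

```python
def gold_target(table_name, owner_suffix,
                storage_account="dlsprojetofixo", schema_layer="gold"):
    """Return (qualified table name, abfss path) for one table in the layer."""
    qualified_name = f"{schema_layer}.{table_name}"
    path_target = (
        f"abfss://{schema_layer}@{storage_account}.dfs.core.windows.net/"
        f"{table_name}_{owner_suffix}"
    )
    return qualified_name, path_target


name, path = gold_target("fact_orders", "lucaainstein")
print(name)  # gold.fact_orders
print(path)  # abfss://gold@dlsprojetofixo.dfs.core.windows.net/fact_orders_lucaainstein
```

Deriving both values from one table name keeps the catalog name and the storage path from drifting apart when a cell is copied for a new table.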


Table `Customer_DIM`

In [0]:
### GOLD Table creation

## Table Creation

df = spark.sql(
"""
SELECT
  CustomerID,
  Name,
  PhoneNumber,
  Age
FROM
  silver.sales;
"""
)

df = df.withColumn("load_time_GMT", current_timestamp())
# load_time_GMT is recorded in GMT. See the bronze layer for further details.

## Var Naming
schema_layer = "gold"
table_name = "Customer_DIM"

## Path def
path_target = 'abfss://gold@dlsprojetofixo.dfs.core.windows.net/Customer_DIM_lucaainstein'

## Database Creation
spark.sql("CREATE DATABASE IF NOT EXISTS gold")
df.write \
    .format('delta') \
    .mode('overwrite') \
    .option('path', path_target) \
    .option('overwriteSchema', 'true') \
    .saveAsTable(f'{schema_layer}.{table_name}')
print("Schema is created!")

spark.sql(f"OPTIMIZE {schema_layer}.{table_name}")
print(f"Optimization {schema_layer}.{table_name} completed!")

Schema is created!
Optimization gold.Customer_DIM completed!
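
Because `silver.sales` is read directly, a customer who appears on several sales rows would also appear several times in `Customer_DIM` (assuming the silver table holds one row per order, which the notebook does not state). A hedged sketch of building a de-duplicated projection instead follows. `build_dim_query` is a hypothetical helper, and `SELECT DISTINCT` is one possible choice rather than the author's method.

```python
def build_dim_query(columns, source_table="silver.sales"):
    """Build a SELECT DISTINCT statement over the given dimension columns."""
    if not columns:
        raise ValueError("at least one column is required")
    column_list = ",\n  ".join(columns)
    return f"SELECT DISTINCT\n  {column_list}\nFROM\n  {source_table}"


query = build_dim_query(["CustomerID", "Name", "PhoneNumber", "Age"])
print(query)
```

The resulting string can be passed to `spark.sql(...)` in place of the literal query. `DISTINCT` only removes rows that are identical across all selected columns, so a customer whose phone number differs between orders would still appear once per version.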


Table `Product_DIM`

In [0]:
### GOLD Table creation

## Table Creation

df = spark.sql(
"""
SELECT
  ItemID,
  ItemPrice,
  ItemProductName
FROM
  silver.sales;
"""
)

df = df.withColumn("load_time_GMT", current_timestamp())
# load_time_GMT is recorded in GMT. See the bronze layer for further details.

## Var Naming
schema_layer = "gold"
table_name = "Product_DIM"

## Path def
path_target = 'abfss://gold@dlsprojetofixo.dfs.core.windows.net/Product_DIM_lucaainstein'

## Database Creation
spark.sql("CREATE DATABASE IF NOT EXISTS gold")
df.write \
    .format('delta') \
    .mode('overwrite') \
    .option('path', path_target) \
    .option('overwriteSchema', 'true') \
    .saveAsTable(f'{schema_layer}.{table_name}')
print("Schema is created!")

spark.sql(f"OPTIMIZE {schema_layer}.{table_name}")
print(f"Optimization {schema_layer}.{table_name} completed!")

Schema is created!
Optimization gold.Product_DIM completed!


Table `Shipping_DIM`

In [0]:
### GOLD Table creation

## Table Creation

df = spark.sql(
"""
SELECT
  OrderID,
  Street,
  City,
  State,
  Country,
  PostalCode
FROM
  silver.sales;
"""
)

df = df.withColumn("load_time_GMT", current_timestamp())
# load_time_GMT is recorded in GMT. See the bronze layer for further details.

## Var Naming
schema_layer = "gold"
table_name = "Shipping_DIM"

## Path def
path_target = 'abfss://gold@dlsprojetofixo.dfs.core.windows.net/Shipping_DIM_lucaainstein'

## Database Creation
spark.sql("CREATE DATABASE IF NOT EXISTS gold")
df.write \
    .format('delta') \
    .mode('overwrite') \
    .option('path', path_target) \
    .option('overwriteSchema', 'true') \
    .saveAsTable(f'{schema_layer}.{table_name}')
print("Schema is created!")

spark.sql(f"OPTIMIZE {schema_layer}.{table_name}")
print(f"Optimization {schema_layer}.{table_name} completed!")

Schema is created!
Optimization gold.Shipping_DIM completed!


Table `Date_DIM`

In [0]:
### GOLD Table creation

## Table Creation

df = spark.sql(
"""
SELECT
  OrderDate,
  EXTRACT(YEAR FROM OrderDate) AS OrderYear,
  EXTRACT(MONTH FROM OrderDate) AS OrderMonth,
  EXTRACT(DAY FROM OrderDate) AS OrderDay,
  WEEKDAY(OrderDate) AS OrderWeekday -- 0 = Monday ... 6 = Sunday
FROM
  silver.sales;
"""
)

df = df.withColumn("load_time_GMT", current_timestamp())
# load_time_GMT is recorded in GMT. See the bronze layer for further details.

## Var Naming
schema_layer = "gold"
table_name = "Date_DIM"

## Path def
path_target = 'abfss://gold@dlsprojetofixo.dfs.core.windows.net/Data_DIM_lucaainstein'

## Database Creation
spark.sql("CREATE DATABASE IF NOT EXISTS gold")
df.write \
    .format('delta') \
    .mode('overwrite') \
    .option('path', path_target) \
    .option('overwriteSchema', 'true') \
    .saveAsTable(f'{schema_layer}.{table_name}')
print("Schema is created!")

spark.sql(f"OPTIMIZE {schema_layer}.{table_name}")
print(f"Optimization {schema_layer}.{table_name} completed!")

Schema is created!
Optimization gold.Date_DIM completed!
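
Spark SQL's `WEEKDAY` numbers days from 0 (Monday) to 6 (Sunday), which matches Python's `date.weekday()`. The sketch below mirrors the four derived columns in plain Python so that the expected values for a given `OrderDate` can be checked without a cluster. `date_dim_row` is a hypothetical helper, not part of the notebook.

```python
from datetime import date


def date_dim_row(order_date):
    """Derive the Date_DIM attributes for one order date."""
    return {
        "OrderDate": order_date,
        "OrderYear": order_date.year,
        "OrderMonth": order_date.month,
        "OrderDay": order_date.day,
        "OrderWeekday": order_date.weekday(),  # 0 = Monday ... 6 = Sunday
    }


row = date_dim_row(date(2025, 1, 23))
print(row["OrderYear"], row["OrderMonth"], row["OrderDay"], row["OrderWeekday"])
# prints: 2025 1 23 3  (23 January 2025 is a Thursday)
```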
