In [0]:
# Service-principal credentials used to authenticate against ADLS Gen2.
# Replace the empty placeholders below with your real values.
# NOTE(security): do not commit real secrets in this cell. On Databricks,
# prefer loading them from a secret scope via dbutils.secrets.get(...).

client_id      = ""     # Application (client) ID
tenant_id      = ""      # Directory (tenant) ID
client_secret  = ""          # Client Secret (Value)

storage_account = "azuresupply2605"

# Fail fast with a clear message: blank credentials would otherwise only
# surface later as an authentication error on the first read.
_missing_credentials = [
    label
    for label, value in (
        ("client_id", client_id),
        ("tenant_id", tenant_id),
        ("client_secret", client_secret),
    )
    if not value
]
if _missing_credentials:
    raise ValueError(
        "Missing service principal setting(s): " + ", ".join(_missing_credentials)
    )

# Every OAuth setting is scoped to this storage account's DFS endpoint.
account_host = f"{storage_account}.dfs.core.windows.net"

spark.conf.set(f"fs.azure.account.auth.type.{account_host}", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{account_host}",
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")

spark.conf.set(f"fs.azure.account.oauth2.client.id.{account_host}", client_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{account_host}", client_secret)

# Token endpoint of the tenant that owns the service principal.
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{account_host}",
               f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")


In [0]:
# Build the bronze paths from `storage_account` so the reads always target the
# same account the OAuth settings were registered for.
bronze_root = f"abfss://bronze@{storage_account}.dfs.core.windows.net"

# Each bronze dataset lives in its own folder of the container.
external_bronze = spark.read.parquet(f"{bronze_root}/external/")

feature_bronze = spark.read.parquet(f"{bronze_root}/feature/")

demand_bronze = spark.read.parquet(f"{bronze_root}/demand/")


In [0]:
def clean_columns(df):
    """Return ``df`` with normalised column names.

    Each column name is lower-cased and its spaces are replaced with
    underscores. All columns are renamed in a single ``toDF`` call instead of
    one ``withColumnRenamed`` call per column, which avoids stacking a rename
    step on the DataFrame for every column.

    Args:
        df: DataFrame whose columns should be renamed.

    Returns:
        A DataFrame with the same columns, in the same order, under the
        cleaned names.
    """
    cleaned_names = [name.lower().replace(" ", "_") for name in df.columns]
    return df.toDF(*cleaned_names)

# Normalise the column names of every bronze frame to start the silver frames.
external_silver, feature_silver, demand_silver = map(
    clean_columns,
    (external_bronze, feature_bronze, demand_bronze),
)


In [0]:
# Remove fully duplicated rows from each silver frame.
external_silver, feature_silver, demand_silver = (
    frame.dropDuplicates()
    for frame in (external_silver, feature_silver, demand_silver)
)


In [0]:
from pyspark.sql.functions import to_date, col

def convert_date_columns(df):
    """Return ``df`` with date-like columns converted through ``to_date``.

    A column is converted when its name contains "date" or "month" and its
    type is string or timestamp. Columns that are already dates are left
    untouched. Other types (for example a numeric month number) are skipped;
    presumably ``to_date`` cannot cast those - confirm against the real data.

    Args:
        df: DataFrame to convert.

    Returns:
        A DataFrame with the matching columns replaced by their date values.
    """
    for name, dtype in df.dtypes:
        looks_temporal = "date" in name or "month" in name
        needs_cast = dtype == "string" or dtype.startswith("timestamp")
        if looks_temporal and needs_cast:
            df = df.withColumn(name, to_date(col(name)))
    return df


# withColumn returns a new DataFrame, so rebinding a loop variable would leave
# the silver frames unchanged; assign each converted frame back explicitly.
external_silver = convert_date_columns(external_silver)
feature_silver = convert_date_columns(feature_silver)
demand_silver = convert_date_columns(demand_silver)


In [0]:
from pyspark.sql.types import StringType, NumericType

def fill_missing(df):
    """Return ``df`` with nulls replaced by a per-type default.

    Numeric columns are filled with ``0`` and string columns with
    ``"Unknown"``; columns of any other type are left as they are. The
    replacements are collected first and applied in a single ``fillna`` call
    rather than one call per column.

    Args:
        df: DataFrame whose null values should be filled.

    Returns:
        A DataFrame with the defaults applied, or ``df`` itself when it has
        no numeric or string columns.
    """
    replacements = {}
    for field in df.schema.fields:
        if isinstance(field.dataType, NumericType):
            replacements[field.name] = 0
        elif isinstance(field.dataType, StringType):
            replacements[field.name] = "Unknown"
    # Nothing to fill: hand back the frame untouched.
    return df.fillna(replacements) if replacements else df

# Apply the per-type null defaults to every silver frame.
external_silver, feature_silver, demand_silver = map(
    fill_missing,
    (external_silver, feature_silver, demand_silver),
)


In [0]:
# Build the silver paths from `storage_account` so the writes always target
# the same account the OAuth settings were registered for.
silver_root = f"abfss://silver@{storage_account}.dfs.core.windows.net"

# Each dataset is written to its own folder, replacing any previous output.
for dataset_name, silver_frame in (
    ("external", external_silver),
    ("feature", feature_silver),
    ("demand", demand_silver),
):
    silver_frame.write.mode("overwrite").parquet(f"{silver_root}/{dataset_name}/")


In [0]:
# Print the schema of each silver frame: external, feature, then demand.
for silver_frame in (external_silver, feature_silver, demand_silver):
    silver_frame.printSchema()


root
 |-- date: date (nullable = true)
 |-- cloud_demand_index: integer (nullable = false)
 |-- gdp_growth: double (nullable = false)
 |-- inflation: double (nullable = false)
 |-- competitor_price_index: integer (nullable = false)

root
 |-- date: date (nullable = true)
 |-- region: string (nullable = false)
 |-- service: string (nullable = false)
 |-- daily_usage_units: integer (nullable = false)
 |-- peak_usage_units: integer (nullable = false)
 |-- vm_count: integer (nullable = false)
 |-- storage_tb: integer (nullable = false)
 |-- season: string (nullable = false)
 |-- econ_index: integer (nullable = false)
 |-- downtime_min: integer (nullable = false)
 |-- usage_lag_1: double (nullable = false)
 |-- usage_lag_7: double (nullable = false)
 |-- week_over_week_growth: double (nullable = false)
 |-- seasonality_factor: double (nullable = false)

root
 |-- daily_usage_units: integer (nullable = false)
 |-- date: date (nullable = true)
 |-- downtime_min: integer (nullable = false)
 |-