# Create the Silver Table (Transform Data)

- Import the PySpark SQL functions used below to reference and create columns, replace nulls, parse dates/timestamps, and round values

In [0]:
import builtins  # gives access to the built-in round(), which the pyspark `round` import below shadows

from pyspark.sql.functions import col, to_timestamp, coalesce, lit, to_date, round

- The Bronze and Silver paths to the tables

In [0]:
# Table names used throughout the notebook:
# bronze_path is read as the raw input, silver_path is where the cleaned output is written.
bronze_path = "workspace.default.bronze_global_mart_sales"
silver_path = "workspace.default.silver_global_mart_sales"

- Load Bronze Table (Raw data) using above path

In [0]:
# Load the raw bronze table into a DataFrame; every later transformation starts from it.
df_bronze = (
    spark.read
    .format("delta")
    .table(bronze_path)
)

- Show first 10 rows of bronze table

In [0]:
%sql
-- Preview ten rows of the raw bronze table
SELECT
  *
FROM
  workspace.default.bronze_global_mart_sales
LIMIT 10;

- Count all rows and distinct ordernumber to get total number of actual orders

In [0]:
%sql
-- Compare the number of distinct order numbers with the total number of rows
SELECT
  COUNT(DISTINCT ordernumber) AS total_orders,
  COUNT(*)                    AS total_rows
FROM
  workspace.default.bronze_global_mart_sales

- Look for any nulls in the bronze table

In [0]:
# Profile missing values in the bronze table: column_name | missing_count | missing_pct
table = bronze_path
df = spark.table(table)

# Back-tick quote the column names so unusual characters cannot break the SQL expressions.
quoted_columns = ["`" + c.replace("`", "``") + "`" for c in df.columns]

# A single aggregation returns the total row count followed by one NULL count per
# column, so the table is scanned once instead of once per column.
profile_row = df.selectExpr(
    "COUNT(*)",
    *[f"COUNT(CASE WHEN {q} IS NULL THEN 1 END)" for q in quoted_columns],
).first()

total_rows = profile_row[0]
missing_counts = profile_row[1:]

# `round` in this notebook is pyspark.sql.functions.round (it shadows the built-in)
# and only accepts Columns, so the built-in is called explicitly for Python floats.
# An empty table reports 0.0 % rather than dividing by zero.
results = [
    (
        c,
        missing_count,
        builtins.round(missing_count / total_rows * 100, 2) if total_rows else 0.0,
    )
    for c, missing_count in zip(df.columns, missing_counts)
]

# Convert to a Spark DataFrame (explicit schema, so no type inference is needed)
# so it renders as a table in Databricks.
result_df = spark.createDataFrame(
    results, "column_name string, missing_count long, missing_pct double"
)
display(result_df)

- change all column names to lower case for consistency
- replace nulls in state & addressline2 with unknown
- replace null postalcodes with N/A
- fix order date to be in date format and not string


In [0]:

# Lower-case every column name so later references use one consistent spelling.
lowercase_columns = [name.lower() for name in df_bronze.columns]

df_silver = (
    df_bronze.toDF(*lowercase_columns)
    # Replace missing text values with explicit placeholders.
    .withColumn("state", coalesce(col("state"), lit("Unknown")))
    .withColumn("addressline2", coalesce(col("addressline2"), lit("Unknown")))
    .withColumn("postalcode", coalesce(col("postalcode"), lit("N/A")))
    # Parse the string order date into a date using the given pattern.
    .withColumn("orderdate", to_date(col("orderdate"), "M/d/yyyy H:mm"))
)

- Print schema to check data types for each column

In [0]:
# Print column names and data types; orderdate should now be a date rather than a string.
df_silver.printSchema()


- Check for duplicates using python


In [0]:
# Group on every column: any group with more than one member is a fully duplicated row.
row_occurrences = df_silver.groupBy(*df_silver.columns).count()
duplicates = row_occurrences.where("count > 1")

display(duplicates)

- Check the distinct customer names for anything odd or out of place — for example, a name entered in lower case would effectively be treated as a separate customer

In [0]:
%sql
-- List each customer name once, in alphabetical order, for a visual sanity check
SELECT DISTINCT
  customername
FROM
  workspace.default.bronze_global_mart_sales
ORDER BY
  customername;

- The total in the sales column does not reconcile with price × quantity ordered, so a new column called "calculated_sales_total" is added to hold the recalculated amount (priceeach × quantityordered, rounded to 2 decimal places)

In [0]:
# Recompute the line total from unit price and quantity, rounded to 2 decimal places.
# `round` here is pyspark.sql.functions.round, which operates on Columns.
line_total = col("priceeach") * col("quantityordered")
df_silver = df_silver.withColumn("calculated_sales_total", round(line_total, 2))

In [0]:
# Render the transformed DataFrame, including the new calculated_sales_total column.
display(df_silver)

- Select only the columns needed for Gold Aggregation table


In [0]:

# Columns carried forward to the Gold aggregation; everything else is dropped here.
gold_input_columns = [
    "ordernumber",
    "quantityordered",
    "priceeach",
    "calculated_sales_total",
    "orderdate",
    "qtr_id",
    "month_id",
    "year_id",
    "productline",
    "msrp",
    "productcode",
    "customername",
    "city",
    "country",
    "dealsize",
]
df_silver = df_silver.select(*gold_input_columns)


- Write the adjusted table to Silver Path

In [0]:
# Save the silver DataFrame as a Delta table, replacing any existing contents.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(silver_path)
)

- check new table has correct rows

In [0]:
# Read the table back from the catalog so the check covers what was actually
# persisted, not the in-memory DataFrame that was just written.
df_silver_saved = spark.table(silver_path)

# The saved table must contain exactly the rows of the DataFrame that was written.
expected_rows = df_silver.count()
written_rows = df_silver_saved.count()
assert written_rows == expected_rows, (
    f"Row count mismatch for {silver_path}: "
    f"expected {expected_rows}, table has {written_rows}"
)

display(df_silver_saved)
