In [10]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, lit, to_date
from pyspark.sql.types import IntegerType, DoubleType, StringType, DateType, NumericType, LongType,  FloatType, ShortType, DecimalType

StatementMeta(, 02fe1b04-62d6-4e61-8705-89b1e9ed47dd, 12, Finished, Available, Finished)

In [2]:
# Load the bronze liquor-sales table (Delta format) from its relative path.
# NOTE(review): `spark` is not defined in this notebook; presumably the session
# is injected by the notebook runtime -- confirm.
bronze_df = (
    spark.read
    .format("delta")
    .load("Tables/bronze_liquor_sales")
)

StatementMeta(, 02fe1b04-62d6-4e61-8705-89b1e9ed47dd, 4, Finished, Available, Finished)

In [3]:
# Print each column's name, data type and nullability for the loaded bronze table.
bronze_df.printSchema()

StatementMeta(, 02fe1b04-62d6-4e61-8705-89b1e9ed47dd, 5, Finished, Available, Finished)

root
 |-- Invoice_Item_Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Store_Number: integer (nullable = true)
 |-- Store_Name: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zip_Code: integer (nullable = true)
 |-- Store_Location: string (nullable = true)
 |-- County_Number: integer (nullable = true)
 |-- County: string (nullable = true)
 |-- Category: integer (nullable = true)
 |-- Category_Name: string (nullable = true)
 |-- Vendor_Number: integer (nullable = true)
 |-- Vendor_Name: string (nullable = true)
 |-- Item_Number: double (nullable = true)
 |-- Item_Description: string (nullable = true)
 |-- Pack: integer (nullable = true)
 |-- Bottle_Volume__ml_: double (nullable = true)
 |-- State_Bottle_Cost: double (nullable = true)
 |-- State_Bottle_Retail: double (nullable = true)
 |-- Bottles_Sold: integer (nullable = true)
 |-- Sale__Dollars_: double (nullable = true)
 |-- Volume_Sold__Liters_: double (nullable = true)

In [4]:
# Count null values per column.
#
# All counts are computed in one aggregation (a single Spark job) instead of one
# filter().count() job per column. count() only counts non-null values, and
# when() without otherwise() yields null for rows that do not match, so each
# aggregate counts exactly the rows where that column is null.
#
# The loop variables are deliberately not called `col` / `count`: a module-level
# `for col, ... in ...` loop rebinds the `col` function imported from
# pyspark.sql.functions to a plain string for every later cell.
print("\n Null Values Count Per Column:")
null_counts = (
    bronze_df
    .select([
        F.count(F.when(bronze_df[column_name].isNull(), F.lit(1))).alias(column_name)
        for column_name in bronze_df.columns
    ])
    .first()
    .asDict()
)
for column_name, null_count in null_counts.items():
    print(f"{column_name}: {null_count}")

StatementMeta(, 02fe1b04-62d6-4e61-8705-89b1e9ed47dd, 6, Finished, Available, Finished)


 Null Values Count Per Column:
Invoice_Item_Number: 0
Date: 0
Store_Number: 0
Store_Name: 0
Address: 4348
City: 4348
Zip_Code: 4370
Store_Location: 796384
County_Number: 7901466
County: 4348
Category: 703
Category_Name: 703
Vendor_Number: 25
Vendor_Name: 25
Item_Number: 21
Item_Description: 21
Pack: 21
Bottle_Volume__ml_: 21
State_Bottle_Cost: 21
State_Bottle_Retail: 21
Bottles_Sold: 42
Sale__Dollars_: 42
Volume_Sold__Liters_: 42
Volume_Sold__Gallons_: 42


In [5]:
# Measure how many bronze rows are exact repeats of another row.
# dropDuplicates() is called without a subset, so all columns are compared.
deduplicated_bronze = bronze_df.dropDuplicates()
total_rows, unique_rows = bronze_df.count(), deduplicated_bronze.count()
duplicate_count = total_rows - unique_rows
print(f"\n Total Duplicate Rows: {duplicate_count}")

StatementMeta(, 02fe1b04-62d6-4e61-8705-89b1e9ed47dd, 7, Finished, Available, Finished)


🔁 Total Duplicate Rows: 2


**Transformations**

In [7]:
# Imported in this cell so the conversion still works if `col` was rebound to a
# plain string by a loop variable in an earlier cell.
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import StringType

# Convert Date column from string to DateType.
# The guard makes the cell safe to re-run: once Date is already a date, applying
# to_date() with the "MM/dd/yyyy" pattern again would try to re-parse a value that
# is no longer in that format.
# NOTE(review): values that do not match "MM/dd/yyyy" are expected to come back as
# null (or raise, depending on Spark's parser/ANSI settings) -- confirm that is
# acceptable for this dataset.
if isinstance(bronze_df.schema["Date"].dataType, StringType):
    bronze_df = bronze_df.withColumn("Date", to_date(col("Date"), "MM/dd/yyyy"))

StatementMeta(, 02fe1b04-62d6-4e61-8705-89b1e9ed47dd, 9, Finished, Available, Finished)

In [11]:
# Handle null values.
#
# Replacement values by column type:
#   string  -> "Unknown"
#   numeric -> 0
#   date    -> 1000-01-01 (sentinel date)
# Columns of any other type are passed through unchanged.
#
# Each fill literal is cast to the column's own data type. Without the cast, Spark
# has to reconcile the literal's type with the column's type: a string literal in a
# date column turns the whole column back into a string under the default
# (non-ANSI) coercion rules, and an integer literal widens ShortType columns to
# IntegerType.
# NOTE(review): if a silver table was already written while Date was being coerced
# to string, writing a DateType column over it may need the Delta schema to be
# overwritten -- confirm against the existing table.
#
# NumericType covers every numeric type (including ByteType), so the types do not
# have to be listed one by one. All replacements are applied in a single select()
# rather than one withColumn() call per column, which keeps the query plan flat.
#
# NOTE(review): numeric identifier-like columns (e.g. Zip_Code, County_Number) also
# receive 0 -- confirm downstream consumers treat 0 as "missing".
filled_columns = []
for field in bronze_df.schema.fields:
    if isinstance(field.dataType, StringType):
        fill_value = lit("Unknown")
    elif isinstance(field.dataType, NumericType):
        fill_value = lit(0)
    elif isinstance(field.dataType, DateType):
        fill_value = lit("1000-01-01")
    else:
        fill_value = None

    column = col(field.name)
    if fill_value is not None:
        column = (
            when(column.isNull(), fill_value.cast(field.dataType))
            .otherwise(column)
            .alias(field.name)
        )
    filled_columns.append(column)

# select() keeps the original column order, matching in-place withColumn() updates.
bronze_df = bronze_df.select(filled_columns)

StatementMeta(, 02fe1b04-62d6-4e61-8705-89b1e9ed47dd, 13, Finished, Available, Finished)

In [12]:
# 3. Build the silver frame by removing rows that are exact duplicates
#    (no subset is given, so every column takes part in the comparison).
silver_df = bronze_df.dropDuplicates()

# 4. Report how many rows the de-duplication removed.
rows_before = bronze_df.count()
rows_after = silver_df.count()
duplicate_count = rows_before - rows_after
print(f"Number of duplicate rows removed: {duplicate_count}")

StatementMeta(, 02fe1b04-62d6-4e61-8705-89b1e9ed47dd, 14, Finished, Available, Finished)

Number of duplicate rows removed: 2


In [13]:
# Re-check null counts on the silver frame after the null-handling step.
#
# All counts come from one aggregation (a single Spark job): count() ignores nulls
# and when() without otherwise() is null for non-matching rows, so each aggregate
# counts the rows where that column is null.
#
# The loop variables avoid the names `col` / `count` so that the `col` function
# imported from pyspark.sql.functions is not rebound to a string at module level.
print("\n Verifying Null Values Count Per Column:")
null_counts = (
    silver_df
    .select([
        F.count(F.when(silver_df[column_name].isNull(), F.lit(1))).alias(column_name)
        for column_name in silver_df.columns
    ])
    .first()
    .asDict()
)
for column_name, null_count in null_counts.items():
    print(f"{column_name}: {null_count}")

StatementMeta(, 02fe1b04-62d6-4e61-8705-89b1e9ed47dd, 15, Finished, Available, Finished)


 Verifying Null Values Count Per Column:
Invoice_Item_Number: 0
Date: 0
Store_Number: 0
Store_Name: 0
Address: 0
City: 0
Zip_Code: 0
Store_Location: 0
County_Number: 0
County: 0
Category: 0
Category_Name: 0
Vendor_Number: 0
Vendor_Name: 0
Item_Number: 0
Item_Description: 0
Pack: 0
Bottle_Volume__ml_: 0
State_Bottle_Cost: 0
State_Bottle_Retail: 0
Bottles_Sold: 0
Sale__Dollars_: 0
Volume_Sold__Liters_: 0
Volume_Sold__Gallons_: 0


In [14]:
# Confirm the silver frame no longer contains exact duplicate rows.
deduplicated_silver = silver_df.dropDuplicates()
total_rows, unique_rows = silver_df.count(), deduplicated_silver.count()
duplicate_count = total_rows - unique_rows
print(f"\n Total Duplicate Rows: {duplicate_count}")

StatementMeta(, 02fe1b04-62d6-4e61-8705-89b1e9ed47dd, 16, Finished, Available, Finished)


 Total Duplicate Rows: 0


In [15]:
# Saving to silver layer.
# mode("overwrite") replaces the table contents on every run. overwriteSchema lets
# this full refresh replace the stored schema as well; without it Delta rejects the
# write when a column's type differs from what an earlier run stored.
# NOTE(review): this relaxes schema enforcement for this table -- confirm no
# downstream consumer depends on the previously stored column types.
(
    silver_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("Tables/silver_liquor_sales")
)

StatementMeta(, 02fe1b04-62d6-4e61-8705-89b1e9ed47dd, 17, Finished, Available, Finished)