In [1]:
from pyspark.sql.functions import col, trim, upper

# Bronze -> Silver for CRM customer profiles, built as one chain:
#   * CustomerID is cast to int
#   * Name, Email, Region and Segment are trimmed
#   * Region is additionally upper-cased to standardize its casing
df_silver = (
    spark.table("lh_bronze.bronze_crm_customer_profiles")
    .withColumn("CustomerID", col("CustomerID").cast("int"))
    .withColumn("Name", trim(col("Name")))
    .withColumn("Email", trim(col("Email")))
    # Trim and upper-case Region in a single expression.
    .withColumn("Region", upper(trim(col("Region"))))
    .withColumn("Segment", trim(col("Segment")))
)

# Overwrite the Silver Delta table with the cleaned profiles.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("lh_silver.silver_crm_customer_profiles")
)



StatementMeta(, 459def2d-9a29-4449-9c3a-79fe17d5dbe9, 3, Finished, Available, Finished)

In [3]:
from pyspark.sql.types import IntegerType, DoubleType, TimestampType
from pyspark.sql.functions import col, to_timestamp
# Read the raw ERP transactions from the Bronze lakehouse.
df_bronze = spark.table("lh_bronze.bronze_erp_transactions")

# Cast each column to its Silver type and drop the ingestion metadata columns.
df_silver = (
    df_bronze
    .withColumn("TransactionID", col("TransactionID").cast(IntegerType()))
    .withColumn("MaterialID", col("MaterialID").cast(IntegerType()))
    .withColumn("TransactionDate", to_timestamp("TransactionDate"))
    .withColumn("Quantity", col("Quantity").cast(IntegerType()))
    .withColumn("Cost", col("Cost").cast(DoubleType()))
    .withColumn("SupplierID", col("SupplierID").cast(IntegerType()))
    .drop("_ingestion_timestamp", "_source_file")
)

# Save Silver table. The target is fully qualified and the format is explicit
# so the destination does not depend on whichever lakehouse happens to be the
# session default.
# NOTE(review): assumes the Silver tables live in lh_silver, as the CRM
# customer profiles target does -- confirm before running.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("lh_silver.silver_erp_transactions")
)

StatementMeta(, 53ba2273-5ffd-455a-af92-3aea4eb5d473, 5, Finished, Available, Finished)

In [5]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col

# Read the raw supplier data from the Bronze lakehouse.
df_bronze = spark.table("lh_bronze.bronze_supplier_data")

# Cast SupplierID to integer and drop the ingestion metadata columns.
df_silver = (
    df_bronze
    .withColumn("SupplierID", col("SupplierID").cast(IntegerType()))
    # The next three columns are re-projected under the same name; their
    # values are not modified.
    # NOTE(review): the CRM customer profiles step trims its string columns
    # and upper-cases Region -- confirm whether the same cleaning was
    # intended here.
    .withColumn("SupplierName", col("SupplierName"))
    .withColumn("ContactEmail", col("ContactEmail"))
    .withColumn("Region", col("Region"))
    .drop("_ingestion_timestamp", "_source_file")
)

# Save Silver table. The target is fully qualified and the format is explicit
# so the destination does not depend on whichever lakehouse happens to be the
# session default.
# NOTE(review): assumes the Silver tables live in lh_silver, as the CRM
# customer profiles target does -- confirm before running.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("lh_silver.silver_supplier_data")
)

StatementMeta(, 53ba2273-5ffd-455a-af92-3aea4eb5d473, 7, Finished, Available, Finished)

In [6]:
from pyspark.sql.types import IntegerType, DoubleType, TimestampType
from pyspark.sql.functions import col

# Read the raw market trend records from the Bronze lakehouse.
df_bronze = spark.table("lh_bronze.bronze_market_trends")

# Type casting + cleaning: cast the id, date and share columns and drop the
# ingestion metadata columns.
df_silver = (
    df_bronze
    .withColumn("RecordID", col("RecordID").cast(IntegerType()))
    .withColumn("Date", col("Date").cast(TimestampType()))
    .withColumn("MarketShare", col("MarketShare").cast(DoubleType()))
    # Competitor and Trend are re-projected under the same name; their values
    # are not modified. Replacing existing columns keeps their position, so
    # the column order is unaffected by where these calls sit in the chain.
    # NOTE(review): confirm whether string cleaning (e.g. trimming) was
    # intended for these two columns.
    .withColumn("Competitor", col("Competitor"))
    .withColumn("Trend", col("Trend"))
    .drop("_ingestion_timestamp", "_source_file")
)

# Save Silver table. The target is fully qualified and the format is explicit
# so the destination does not depend on whichever lakehouse happens to be the
# session default.
# NOTE(review): assumes the Silver tables live in lh_silver, as the CRM
# customer profiles target does -- confirm before running.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("lh_silver.silver_market_trends")
)


StatementMeta(, 53ba2273-5ffd-455a-af92-3aea4eb5d473, 8, Finished, Available, Finished)

In [7]:
from pyspark.sql.types import IntegerType, TimestampType, StringType
from pyspark.sql.functions import col

# Read the raw MES log records from the Bronze lakehouse.
df_bronze = spark.table("lh_bronze.bronze_mes_logs")

# Type casting & cleaning: ids to integer, Timestamp to a timestamp type,
# the descriptive columns to string; then drop the ingestion metadata columns.
df_silver = (
    df_bronze
    .withColumn("LogID", col("LogID").cast(IntegerType()))
    .withColumn("Timestamp", col("Timestamp").cast(TimestampType()))
    .withColumn("MachineID", col("MachineID").cast(IntegerType()))
    .withColumn("Operation", col("Operation").cast(StringType()))
    .withColumn("Status", col("Status").cast(StringType()))
    .withColumn("Detail", col("Detail").cast(StringType()))
    .drop("_ingestion_timestamp", "_source_file")
)

# Save to Silver. The target is fully qualified and the format is explicit
# so the destination does not depend on whichever lakehouse happens to be the
# session default.
# NOTE(review): assumes the Silver tables live in lh_silver, as the CRM
# customer profiles target does -- confirm before running.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("lh_silver.silver_mes_logs")
)

StatementMeta(, 53ba2273-5ffd-455a-af92-3aea4eb5d473, 9, Finished, Available, Finished)