#Silver Layer Transformations

#Imports

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window


#Read Bronze Table

In [0]:
# Load the raw Bronze table. On failure, print the error into the notebook
# output and re-raise so the run stops rather than continuing without bronze_df.
try:
    bronze_df = spark.read.table("co2_bronze.co_2_emissions_raw")
except Exception as read_err:
    print("Error reading Bronze table:", read_err)
    raise


#Remove Duplicate Records

In [0]:
# Drop duplicate observations keyed on (country, region, year, sector, scenario).
# Country names are compared after the same initcap(lower(...)) normalisation
# used in the standardisation step. Rows that differ only in country casing
# (e.g. "CHINA" vs "China") therefore count as duplicates; deduplicating on the
# raw column would let them through.
# The temporary key column is dropped again, so the column set and order are
# unchanged. dropDuplicates specifies no ordering, so which duplicate survives
# is not guaranteed.
silver_df = (
    bronze_df
    .withColumn("_country_key", F.initcap(F.lower(F.col("country"))))
    .dropDuplicates([
        "_country_key",
        "region",
        "year",
        "sector",
        "scenario",
    ])
    .drop("_country_key")
)

#Standardize Country Names (Inconsistent Records)

In [0]:
# Normalise country casing (e.g. "SOUTH KOREA" / "south korea" -> "South Korea")
# so the groupBy/join on country further down sees one spelling per country.
silver_df = silver_df.withColumn("country", F.initcap(F.lower("country")))

#Standardize Year Format

In [0]:
# Force year to an integer type.
# NOTE(review): how unparseable values behave depends on Spark's ANSI setting.
# With ANSI off they become null; with ANSI on the cast raises. Confirm the
# cluster configuration.
silver_df = silver_df.withColumn("year", silver_df["year"].cast("int"))

# Quick visual check of the first few cleaned rows.
silver_df.show(5)

+-----------+------------+-------------+----+-------------+------------------+--------------------------+--------------------+
|     sector|     country|       region|year|   population|   gdp_billion_usd|co2_emissions_million_tons|            scenario|
+-----------+------------+-------------+----+-------------+------------------+--------------------------+--------------------+
|Residential|       China|         Asia|2008|  2.1200037E8| 2632937.509436383|                    280.48|    Policy_Reduction|
|Residential|      Mexico|North America|2015|  3.1981686E8| 362463.3469927282|                    265.47|Renewable_Transition|
|  Transport|South Africa|       Africa|2017| 5.23530291E8|        1017442.99|                    455.71|            Baseline|
|  Transport|    Pakistan|         Asia|2014|1.502512241E9|3924846.9457435603|                    954.99|         High_Growth|
|  Transport| South Korea|         Asia|2001|1.242222012E9| 1957102.691733397|                    711.07|    Po

#Handle Missing Values

In [0]:
# Per-country mean population and GDP, used in the next cell to impute gaps.
# The mean ignores nulls, so a country whose values are all null gets a null mean.
country_avg_df = silver_df.groupBy("country").agg(
    F.mean("population").alias("avg_population"),
    F.mean("gdp_billion_usd").alias("avg_gdp"),
)


In [0]:
# Impute missing population / GDP with the per-country mean:
# 1. attach the means;
# 2. keep the observed value when present, otherwise use the mean;
# 3. drop the helper columns.
silver_df = (
    silver_df
    .join(country_avg_df, on="country", how="left")
    .withColumn(
        "population",
        F.coalesce(F.col("population"), F.col("avg_population")),
    )
    .withColumn(
        "gdp_billion_usd",
        F.coalesce(F.col("gdp_billion_usd"), F.col("avg_gdp")),
    )
    .drop("avg_population", "avg_gdp")
)


In [0]:
# Per-sector mean CO2 emissions, used in the next cell to impute gaps.
sector_avg_df = silver_df.groupBy("sector").agg(
    F.mean("co2_emissions_million_tons").alias("avg_co2")
)


In [0]:
# Impute missing CO2 emissions with the per-sector mean, then drop the helper.
silver_df = (
    silver_df
    .join(sector_avg_df, on="sector", how="left")
    .withColumn(
        "co2_emissions_million_tons",
        F.coalesce(F.col("co2_emissions_million_tons"), F.col("avg_co2")),
    )
    .drop("avg_co2")
)


In [0]:
from pyspark.sql.functions import count, when, col

# Null audit, one aggregate per column. when() with no otherwise() yields null
# for rows that are not null, and count() skips nulls. Each aggregate therefore
# equals the number of null cells in its column.
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in silver_df.columns
]
display(silver_df.select(*null_counts))


sector,country,region,year,population,gdp_billion_usd,co2_emissions_million_tons,scenario
0,0,0,0,0,0,0,0


#Create Derived Metric: Emissions Per Capita

In [0]:
# Derived metric: CO2 emitted per person, in tonnes
# (co2_emissions_million_tons * 1_000_000 converts to tonnes, then / population).
# The population > 0 guard makes rows with a null, zero or negative population
# get a null per-capita value. Without it, division by zero returns null only
# when ANSI mode is off and raises when it is on, and a negative population
# would produce a meaningless figure.
silver_df = silver_df.withColumn(
    "co2_per_capita",
    F.when(
        F.col("population") > 0,
        F.col("co2_emissions_million_tons") * 1_000_000 / F.col("population"),
    ),
)

#Validate Scenario Values (Filter Inconsistent Records)

In [0]:
# Allowed scenario labels. The match is exact and case-sensitive. Rows with
# any other value, including null (isin yields null, which filters out), are
# dropped.
valid_scenarios = [
    "Baseline",
    "High_Growth",
    "Policy_Reduction",
    "Renewable_Transition",
]

silver_df = silver_df.where(F.col("scenario").isin(*valid_scenarios))


In [0]:
# Validate the row count after all transformations and before the overwrite
# write. An empty result would otherwise replace the Silver table with no data.
# The label reports the Silver DataFrame being counted, not the Bronze table.
total_records = silver_df.count()
print(f"Total records in Silver DataFrame: {total_records}")
if total_records == 0:
    raise ValueError(
        "Silver DataFrame is empty after transformations; refusing to write it."
    )

Total records in Bronze table: 18000


#Create co2_silver table schema

In [0]:
%python
spark.sql("CREATE SCHEMA IF NOT EXISTS co2_silver")

(
    silver_df.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .saveAsTable("co2_silver.co2_emissions_clean")
)

#Write Silver Layer as Delta

In [0]:

try:
    (
        silver_df.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable("co2_silver.co2_emissions_clean")
    )
except Exception as e:
    print("Error writing Silver table:", e)
    raise


In [0]:
# Row count of the persisted Silver table; expected to equal total_records.
spark.read.table("co2_silver.co2_emissions_clean").count()


18000

In [0]:
# Preview the first 10 rows of the persisted Silver table.
silver_preview_source = "co2_silver.co2_emissions_clean"
display(spark.read.table(silver_preview_source).limit(10))


sector,country,region,year,population,gdp_billion_usd,co2_emissions_million_tons,scenario,co2_per_capita
Residential,China,Asia,2008,212000370.0,2632937.509436383,280.48,Policy_Reduction,1.3230165588861944
Residential,Mexico,North America,2015,319816860.0,362463.3469927282,265.47,Renewable_Transition,0.8300688087551108
Transport,South Africa,Africa,2017,523530291.0,1017442.99,455.71,Baseline,0.8704558414939929
Transport,Pakistan,Asia,2014,1502512241.0,3924846.94574356,954.99,High_Growth,0.6355954873049184
Transport,South Korea,Asia,2001,1242222012.0,1957102.691733397,711.07,Policy_Reduction,0.5724178070674858
Agriculture,Argentina,South America,2018,725658382.0,8145921.734477208,222.24,Policy_Reduction,0.3062598124857049
Industry,Argentina,South America,1998,583910474.0,2771757.2689869357,1521.97,High_Growth,2.606512586722327
Transport,Italy,Europe,2016,1038347097.0,1681175.4330652845,2776.96,Policy_Reduction,2.674404356715797
Transport,Germany,Europe,2016,293872598.0,6936214.669392974,2056.55,Policy_Reduction,6.998100585070542
Transport,United Kingdom,Europe,2019,107644084.0,2329724.655665925,2085.47,Policy_Reduction,19.37375397239666
