In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # prefer F.<fn>: the star import below rebinds builtins such as round
from pyspark.sql.functions import *

In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col

# ADLS Gen2 location of the raw TechCarbon extract.
STORAGE_ACCOUNT = "techcarbondatalake"
CONTAINER = "techcarbonprojectcontainer"
RAW_PATH = "raw/techCarbonData.json"
# Environment variable holding the storage account access key.
ACCOUNT_KEY_ENV_VAR = "TECHCARBON_STORAGE_ACCOUNT_KEY"

# SparkSession
spark = SparkSession.builder.appName("TechCarbonELT").getOrCreate()

# Authentication settings. The key is read from the environment (or inject it
# from your platform's secret store) instead of being written into the notebook,
# where it would be committed and shared along with the file.
account_key = os.environ.get(ACCOUNT_KEY_ENV_VAR)
if not account_key:
    raise RuntimeError(
        f"Set the {ACCOUNT_KEY_ENV_VAR} environment variable to the access key "
        f"of storage account '{STORAGE_ACCOUNT}'."
    )
spark.conf.set(f"fs.azure.account.key.{STORAGE_ACCOUNT}.dfs.core.windows.net", account_key)

# Read JSON file
df = spark.read.json(f"abfss://{CONTAINER}@{STORAGE_ACCOUNT}.dfs.core.windows.net/{RAW_PATH}")

# Explode 'history' array: one row per history entry (rows whose array is
# empty or null are dropped by explode).
df_exploded = df.withColumn("history", explode("history"))

# Select and rename columns for analysis (excluding LEI number)
filtered_df = df_exploded.select(
    col("history.reporting_year").alias("reporting_year"),
    col("history.submission.values.total_scope_1_emissions_ghg").alias("scope_1_emissions"),
    col("history.submission.values.total_scope_2_lb_emissions_ghg").alias("scope_2_lb_emissions"),
    col("history.submission.values.total_scope_2_mb_emissions_ghg").alias("scope_2_mb_emissions"),
    col("history.submission.values.total_scope_3_emissions_ghg").alias("scope_3_emissions"),
    col("history.submission.values.scope_1_methodology").alias("scope_1_methodology"),
    col("history.submission.values.source").alias("source")
)

In [None]:
# 1. Total Emissions Analysis
# Total = Scope 1 + market-based Scope 2 + Scope 3. Only one Scope 2 figure is
# added: the lb/mb columns appear to be the location- and market-based
# accountings of the same Scope 2 emissions, so summing both would double count.
# If any component is null, the total (and every share below) is null.
total_emissions = filtered_df.withColumn(
    "total_emissions",
    F.col("scope_1_emissions") + F.col("scope_2_mb_emissions") + F.col("scope_3_emissions"),
)

# Share of each scope in the total, in percent rounded to 2 decimals.
# A zero total yields null rather than dividing by zero: with ANSI mode on
# (spark.sql.ansi.enabled, default from Spark 4.0) division by zero raises and
# aborts the query. F.round is used explicitly instead of the bare `round`
# that the star import happens to rebind over Python's builtin.
for share_column, scope_column in [
    ("scope_1_percentage", "scope_1_emissions"),
    ("scope_2_percentage", "scope_2_mb_emissions"),
    ("scope_3_percentage", "scope_3_emissions"),
]:
    total_emissions = total_emissions.withColumn(
        share_column,
        F.when(
            F.col("total_emissions") != 0,
            F.round(F.col(scope_column) / F.col("total_emissions") * 100, 2),
        ),
    )

total_emissions.select("total_emissions", "scope_1_percentage", "scope_2_percentage", "scope_3_percentage").show()


+---------------+------------------+------------------+------------------+
|total_emissions|scope_1_percentage|scope_2_percentage|scope_3_percentage|
+---------------+------------------+------------------+------------------+
|      2.31864E7|              0.24|              0.01|             99.75|
+---------------+------------------+------------------+------------------+



In [None]:
# 2. Scope Comparison
# Reshape the per-scope columns into long format: for every row of filtered_df,
# one (scope, emissions) row per scope, so the scopes can be ranked together.
scope_sources = [
    ("Scope 1", "scope_1_emissions"),
    ("Scope 2 (MB)", "scope_2_mb_emissions"),
    ("Scope 3", "scope_3_emissions"),
]

per_scope_frames = [
    filtered_df.select(lit(label).alias("scope"), col(source_column).alias("emissions"))
    for label, source_column in scope_sources
]

# Stack the frames in list order (positional union; the aliases line up).
scope_comparison = per_scope_frames[0]
for next_frame in per_scope_frames[1:]:
    scope_comparison = scope_comparison.union(next_frame)

largest_first = col("emissions").desc()
scope_comparison.orderBy(largest_first).show()

+------------+----------+
|       scope| emissions|
+------------+----------+
|     Scope 3|2.312842E7|
|     Scope 1|   55200.0|
|Scope 2 (MB)|    2780.0|
+------------+----------+



In [None]:
# 3. Scope 2 Emissions Analysis
# Compare the location-based (lb) and market-based (mb) Scope 2 figures.
# A positive difference means the market-based figure is the lower one.
scope_2_analysis = filtered_df.withColumn(
    "scope_2_difference", F.col("scope_2_lb_emissions") - F.col("scope_2_mb_emissions")
).withColumn(
    # Difference as a percentage of the lb figure, reusing the column above.
    # A zero lb baseline yields null: with ANSI mode on, division by zero would
    # raise and abort the whole query instead of returning null.
    "scope_2_reduction_percentage",
    F.when(
        F.col("scope_2_lb_emissions") != 0,
        F.round(F.col("scope_2_difference") / F.col("scope_2_lb_emissions") * 100, 2),
    ),
)

scope_2_analysis.select("scope_2_lb_emissions", "scope_2_mb_emissions", "scope_2_difference", "scope_2_reduction_percentage").show()


+--------------------+--------------------+------------------+----------------------------+
|scope_2_lb_emissions|scope_2_mb_emissions|scope_2_difference|scope_2_reduction_percentage|
+--------------------+--------------------+------------------+----------------------------+
|           1003246.0|              2780.0|         1000466.0|                       99.72|
+--------------------+--------------------+------------------+----------------------------+



In [None]:
# 6. Emission Reduction Potential
# Example reduction fractions per scope (illustrative, not adopted targets):
# output column -> (source emissions column, fraction to cut).
SCOPE_REDUCTION_FRACTIONS = {
    "scope_1_reduction_target": ("scope_1_emissions", 0.05),     # 5%
    "scope_2_reduction_target": ("scope_2_mb_emissions", 0.10),  # 10%, market-based Scope 2
    "scope_3_reduction_target": ("scope_3_emissions", 0.15),     # 15%
}

# Each *_reduction_target is the amount to cut (emissions x fraction), not the
# residual emissions level after the cut.
emission_reduction = total_emissions
for target_column, (source_column, fraction) in SCOPE_REDUCTION_FRACTIONS.items():
    emission_reduction = emission_reduction.withColumn(target_column, F.col(source_column) * fraction)

emission_reduction.select(list(SCOPE_REDUCTION_FRACTIONS)).show()


+------------------------+------------------------+------------------------+
|scope_1_reduction_target|scope_2_reduction_target|scope_3_reduction_target|
+------------------------+------------------------+------------------------+
|                  2760.0|                   278.0|               3469263.0|
+------------------------+------------------------+------------------------+



In [None]:
# 7. Decarbonization Path
net_zero_target_year = 2050  # Example target year

# Years left until the target, computed per row from that row's own reporting
# year. Taking one year from an arbitrary first() row would apply the wrong
# horizon to every other reporting year once the data spans several years.
years_left_col = F.lit(net_zero_target_year) - F.col("reporting_year")

# Straight-line path: spread the total evenly over the remaining years.
# Rows at or past the target year get null instead of a divide-by-zero or a
# negative "reduction".
decarbonization_path = total_emissions.withColumn(
    "annual_reduction_needed",
    F.when(
        years_left_col > 0,
        F.round(F.col("total_emissions") / years_left_col, 2),
    ),
)

decarbonization_path.select("total_emissions", "annual_reduction_needed").show()


+---------------+-----------------------+
|total_emissions|annual_reduction_needed|
+---------------+-----------------------+
|      2.31864E7|              799531.03|
+---------------+-----------------------+



In [None]:
from pyspark.sql.functions import round, col, lit, when

# Net zero target year (example value)
net_zero_target_year = 2050

# Years left to the target, per row from that row's own reporting year (not from
# an arbitrary first() row), so multi-year data gets the correct horizon.
years_left = lit(net_zero_target_year) - col("reporting_year")

# Decarbonization Path calculation: straight-line annual reduction. Null when the
# reporting year is at or past the target (avoids divide-by-zero / negatives).
annual_reduction_needed = when(years_left > 0, round(col("total_emissions") / years_left, 2))
decarbonization_path = total_emissions.withColumn("annual_reduction_needed", annual_reduction_needed)

# Emission Reduction Potential calculation: amount to cut per scope using the
# same example fractions as section 6 (keep the two in sync).
reduction_target_columns = {
    "scope_1_reduction_target": col("scope_1_emissions") * 0.05,     # 5%
    "scope_2_reduction_target": col("scope_2_mb_emissions") * 0.10,  # 10%
    "scope_3_reduction_target": col("scope_3_emissions") * 0.15,     # 15%
}
emission_reduction = total_emissions
for target_name, target_expr in reduction_target_columns.items():
    emission_reduction = emission_reduction.withColumn(target_name, target_expr)

# Combining all data. Both results are column-wise extensions of the same
# total_emissions rows, so the path column is added to the same frame instead of
# joining the two frames back on their 11 shared columns. Such an inner
# equi-join silently drops every row with a null in any key column (null never
# equals null, e.g. a missing scope_1_methodology or source) and multiplies
# duplicate rows.
final_df = emission_reduction.withColumn(
    "annual_reduction_needed", annual_reduction_needed
).select(
    "reporting_year", "total_emissions", "annual_reduction_needed", "scope_1_reduction_target", "scope_2_reduction_target", "scope_3_reduction_target"
)

# Display the table
display(final_df)

reporting_year,total_emissions,annual_reduction_needed,scope_1_reduction_target,scope_2_reduction_target,scope_3_reduction_target
2021,23186400.0,799531.03,2760.0,278.0,3469263.0
