In [0]:
# Read the raw emissions CSV files from the volume; the first line of each
# file is treated as the header row, and no schema is supplied here.
csv_reader = spark.read.format("csv").option("header", "true")
df = csv_reader.load("dbfs:/Volumes/methane/raw/emissions/")

# Render the raw table with the notebook's rich display.
df.display()


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType

# 1️⃣ Load the raw CSV (first line of each file is the header row)
df = spark.read.format("csv").option("header", "true").load(
    "dbfs:/Volumes/methane/raw/emissions/"
)

# 2️⃣ Names of the wide-format year columns: "1990" .. "2018"
year_cols = [str(y) for y in range(1990, 2019)]

# 3️⃣ One (Year, Emission) struct per year column. try_cast yields NULL
#    instead of raising when a cell cannot be parsed as a float.
exprs = []
for c in year_cols:
    year_field = F.lit(c).alias("Year")
    emission_field = F.expr(f"try_cast(`{c}` as float)").alias("Emission")
    exprs.append(F.struct(year_field, emission_field))

# 4️⃣ Melt wide -> long: explode the struct array into one row per year,
#    unpack the struct fields, replace NULL emissions with 0 and keep only
#    the columns used downstream.
df_long = (
    df.withColumn("Year_Emission", F.explode(F.array(*exprs)))
      .withColumn("Year", F.col("Year_Emission.Year").cast(IntegerType()))
      .withColumn("Emission", F.col("Year_Emission.Emission"))
      .drop("Year_Emission")
      .fillna({"Emission": 0})
      .select("Country", "Sector", "Year", "Emission")
)

# 5️⃣ Preview the final long-format dataset
df_long.show(5)



In [0]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, array, posexplode, lag, avg
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# ===============================
# Step 2: Filter for China and remove total sectors
# ===============================
# The two "Total ..." rows are dropped so that only the remaining,
# non-total sectors are kept for China.
is_china = col("Country") == "China"
is_total_sector = col("Sector").isin(["Total including LUCF", "Total excluding LUCF"])

df_china = df_long.filter(is_china & ~is_total_sector)
display(df_china)

In [0]:
# ===============================
# Step 4: Create decomposition-style features per sector
# ===============================
# Window partitioned by sector so each sector's time series is handled
# independently, ordered chronologically.
w = Window.partitionBy("Sector").orderBy("Year")

# lag_k is the emission k years before the current row.
# roll_mean_3 is the mean of the three *previous* years (rows -3..-1).
# It must not include the current row: "Emission" of the current row is the
# regression label, so a window ending at 0 would leak the target into the
# features. Using -3..-1 also matches how the forecasting step recomputes the
# rolling mean from lag_1..lag_3.
df_features = (
    df_china
    .withColumn("lag_1", lag("Emission", 1).over(w))
    .withColumn("lag_2", lag("Emission", 2).over(w))
    .withColumn("lag_3", lag("Emission", 3).over(w))
    .withColumn("roll_mean_3", avg("Emission").over(w.rowsBetween(-3, -1)))
)

# Drop rows containing nulls; this removes the first three years of each
# sector, where the lag features are not yet defined.
df_features = df_features.na.drop()
display(df_features)

In [0]:
# ===============================
# Step 5: Prepare features for MLlib
# ===============================
# MLlib estimators expect a single vector column, so the four numeric
# feature columns are packed into "features".
feature_cols = ["lag_1", "lag_2", "lag_3", "roll_mean_3"]
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
)

df_ml = assembler.transform(df_features)

display(df_ml)


In [0]:
# ===============================
# Step 6: Split into train/test sets
# ===============================
# Chronological split: everything up to and including TRAIN_END_YEAR is used
# for training, later years are held out for testing.
TRAIN_END_YEAR = 2016

# The column is created as "Year"; referencing it with the same casing keeps
# the split working even when spark.sql.caseSensitive is enabled.
train_df = df_ml.filter(col("Year") <= TRAIN_END_YEAR)
test_df  = df_ml.filter(col("Year") > TRAIN_END_YEAR)


In [0]:
# ===============================
# Step 7: Train Random Forest Regressor per sector
# ===============================
from functools import reduce
from pyspark.sql import DataFrame

# One independent model is trained for every sector present in the training set.
sectors = [row["Sector"] for row in train_df.select("Sector").distinct().collect()]
models = {}            # sector name -> fitted RandomForestRegressionModel
predictions_list = []  # per-sector test-set predictions

for sector in sectors:
    df_train_sec = train_df.filter(col("Sector") == sector)
    df_test_sec  = test_df.filter(col("Sector") == sector)

    # A sector needs rows on both sides of the split to be trained and scored.
    if df_train_sec.count() == 0 or df_test_sec.count() == 0:
        continue

    # Fixed seed so the bootstrapped forest is reproducible across runs.
    rf = RandomForestRegressor(
        featuresCol="features",
        labelCol="Emission",
        numTrees=100,
        seed=42,
    )
    model = rf.fit(df_train_sec)
    models[sector] = model

    preds = model.transform(df_test_sec)
    predictions_list.append(preds)

# Combine all predictions. Fail with a clear message instead of a NameError on
# `predictions_all` when no sector could be trained.
if not predictions_list:
    raise ValueError(
        "No sector has both training and test rows; no predictions were produced."
    )

predictions_all = reduce(DataFrame.unionByName, predictions_list)
display(predictions_all.select("Sector", "Year", "Emission", "prediction"))


In [0]:
# ===============================
# Step 8: Evaluate RMSE per sector
# ===============================
evaluator = RegressionEvaluator(labelCol="Emission", predictionCol="prediction", metricName="rmse")

# Only sectors that actually received a model have rows in predictions_all.
# Sectors skipped during training are left out so the evaluator is never run
# on an empty DataFrame.
for sector in sectors:
    if sector not in models:
        continue
    preds_sec = predictions_all.filter(col("Sector") == sector)
    rmse = evaluator.evaluate(preds_sec)
    print(f"Sector: {sector}, RMSE: {rmse:.2f}")


In [0]:
# ===============================
# Step 9: Forecast 5 years ahead per sector
# ===============================
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

n_years = 5

# Most recent observed row per sector.
last_known = df_features.groupBy("Sector").agg({"Year": "max"}).withColumnRenamed("max(Year)", "Year")
last_data = df_features.join(last_known, on=["Sector", "Year"], how="inner")

# Initial recursive-forecast state per sector.
# The features for the first *future* year must be built from the last
# observed emission: lag_1 is that row's Emission and the older lags shift by
# one. Reusing the last row's own lag_1..lag_3 would describe the last observed
# year again and shift the whole forecast by one year.
sector_dict = {}
for row in last_data.collect():
    sector = row["Sector"]
    if sector not in models:
        # No model was trained for this sector, so nothing to forecast with.
        continue
    lag_1, lag_2, lag_3 = row["Emission"], row["lag_1"], row["lag_2"]
    sector_dict[sector] = {
        "lag_1": lag_1,
        "lag_2": lag_2,
        "lag_3": lag_3,
        # Same definition as the update inside the loop below.
        "roll_mean_3": (lag_1 + lag_2 + lag_3) / 3,
        "last_year": row["Year"]
    }

forecast_rows = []

for _ in range(n_years):
    for sector, data in sector_dict.items():
        # Build a one-row DataFrame holding the current feature values.
        future_df = spark.createDataFrame([Row(
            lag_1=data["lag_1"], lag_2=data["lag_2"], lag_3=data["lag_3"], roll_mean_3=data["roll_mean_3"]
        )])
        future_vec = assembler.transform(future_df)
        pred = models[sector].transform(future_vec).collect()[0]["prediction"]

        # Save prediction; field order matches the explicit schema below.
        forecast_rows.append(Row(Sector=sector, Year=data["last_year"] + 1, Emission=None, prediction=pred))

        # Roll the state forward: the prediction becomes the newest lag.
        data["lag_3"] = data["lag_2"]
        data["lag_2"] = data["lag_1"]
        data["lag_1"] = pred
        data["roll_mean_3"] = (data["lag_1"] + data["lag_2"] + data["lag_3"]) / 3
        data["last_year"] += 1

# Define schema explicitly (Emission is all-null, so its type cannot be inferred).
schema = StructType([
    StructField("Sector", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Emission", DoubleType(), True),  # historical value, can be null
    StructField("prediction", DoubleType(), True)
])

# Create DataFrame using explicit schema
forecast_df = spark.createDataFrame(forecast_rows, schema=schema).orderBy("Sector", "Year")
display(forecast_df)