In [75]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import col, expr, count, when, log1p, lit, year, month, dayofmonth, datediff, mean, first, to_timestamp
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.functions import vector_to_array
import matplotlib.pyplot as plt
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType
import pyspark.sql.functions as F
import os

In [36]:
%matplotlib inline

In [7]:
# spark = SparkSession.builder \
#     .appName("FYPPreprocessing") \
#     .config("spark.driver.memory", "8g") \
#     .config("spark.executor.memory", "16g") \
#     .getOrCreate()

spark = SparkSession.builder.appName("FYPPreprocessing").getOrCreate()

In [8]:
df = spark.read.csv("./combined.csv", header=True, inferSchema=True)

In [9]:
# Display the first 5 rows
df.show(5)

+---------+-----------+-----------+------------+-----------+--------------------+-------------------+-------------------+-----------------+-----------------+--------------+---+------------------+--------------------+----------+--------------+----------------+----------------------+----------+--------------+---------------+-------+----------+---------+----------+------+----------+------+----------+------+----------+------+----------+------+----------+------+--------------------+------+-------------------+-------+-----------+--------------------+--------------------+----------------+-------+----------------+---------+
|source_id|source_name|source_type|iso3_country|     sector|           subsector|         start_time|           end_time|              lat|              lon|  geometry_ref|gas|emissions_quantity|temporal_granularity|  activity|activity_units|emissions_factor|emissions_factor_units|  capacity|capacity_units|capacity_factor| other1|other1_def|   other2|other2_def|other3|other

In [16]:
# Show column data types
df.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_name: string (nullable = true)
 |-- source_type: string (nullable = true)
 |-- iso3_country: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- subsector: string (nullable = true)
 |-- start_time: string (nullable = true)
 |-- end_time: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lon: string (nullable = true)
 |-- geometry_ref: string (nullable = true)
 |-- gas: string (nullable = true)
 |-- emissions_quantity: string (nullable = true)
 |-- temporal_granularity: string (nullable = true)
 |-- activity: string (nullable = true)
 |-- activity_units: string (nullable = true)
 |-- emissions_factor: string (nullable = true)
 |-- emissions_factor_units: string (nullable = true)
 |-- capacity: string (nullable = true)
 |-- capacity_units: string (nullable = true)
 |-- capacity_factor: double (nullable = true)
 |-- other1: string (nullable = true)
 |-- other1_def: string (nullable = true)
 |-- other2

# Removing the 'other' columns

In [10]:
# Define the list of columns to remove
columns_to_remove = [f"other{i}" for i in range(1, 11)] + [f"other{i}_def" for i in range(1, 11)]

# Drop the columns from the DataFrame (PySpark equivalent of 'errors="ignore"')
existing_columns_to_remove = [col for col in columns_to_remove if col in df.columns]

df_cleaned = df.drop(*existing_columns_to_remove)

# Removing other useless columns

In [11]:
# List of columns to drop
columns_to_drop = ["source_type", "sector_id", "lat_lon", "temporal_granularity", "reporting_entity", "native_source_id"]

# Drop the columns (handling missing ones safely)
existing_columns_to_drop = [col for col in columns_to_drop if col in df_cleaned.columns]

df_cleaned = df_cleaned.drop(*existing_columns_to_drop)

# Handling empty useless rows

In [15]:
# Count missing values for each column
for column in df_cleaned.columns:
    missing_count = df_cleaned.select(count(when(col(column).isNull(), column)).alias("missing_count")).collect()[0]["missing_count"]
    print(f"{column}: {missing_count} missing values")

source_id: 0 missing values
source_name: 59616 missing values
iso3_country: 0 missing values
sector: 0 missing values
subsector: 0 missing values
start_time: 0 missing values
end_time: 0 missing values
lat: 7663152 missing values
lon: 7663152 missing values
geometry_ref: 0 missing values
gas: 0 missing values
emissions_quantity: 0 missing values
activity: 6048 missing values
activity_units: 0 missing values
emissions_factor: 6048 missing values
emissions_factor_units: 0 missing values
capacity: 6048 missing values
capacity_units: 0 missing values
capacity_factor: 9888 missing values
created_date: 864 missing values
modified_date: 3024 missing values


In [14]:
# Remove rows where iso3_country is NULL
df_cleaned = df_cleaned.filter(col("iso3_country").isNotNull())

# Typecasting the integers and floats to the correct data types

In [18]:
# List of numerical columns to convert
numeric_columns = ["source_id", "lat", "lon", "emissions_quantity", "activity", 
                   "emissions_factor", "capacity", "capacity_factor"]

# Convert each column to the correct type
for column in numeric_columns:
    dtype = "int" if column == "source_id" else "double"  # Convert source_id to int, others to double
    df_cleaned = df_cleaned.withColumn(column, col(column).cast(dtype))

In [19]:
# Verify the schema
df_cleaned.printSchema()

root
 |-- source_id: integer (nullable = true)
 |-- source_name: string (nullable = true)
 |-- iso3_country: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- subsector: string (nullable = true)
 |-- start_time: string (nullable = true)
 |-- end_time: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- geometry_ref: string (nullable = true)
 |-- gas: string (nullable = true)
 |-- emissions_quantity: double (nullable = true)
 |-- activity: double (nullable = true)
 |-- activity_units: string (nullable = true)
 |-- emissions_factor: double (nullable = true)
 |-- emissions_factor_units: string (nullable = true)
 |-- capacity: double (nullable = true)
 |-- capacity_units: string (nullable = true)
 |-- capacity_factor: double (nullable = true)
 |-- created_date: string (nullable = true)
 |-- modified_date: string (nullable = true)



# Handling NULL values

In [21]:
# Define columns to check for outliers
columns = ['lat', 'lon', 'activity', 'emissions_factor', 'capacity', 'capacity_factor']

# Compute IQR and detect outliers
for column in columns:
    # Compute Q1 and Q3
    Q1 = df_cleaned.approxQuantile(column, [0.25], 0.01)[0]  # Approximate quantile for performance
    Q3 = df_cleaned.approxQuantile(column, [0.75], 0.01)[0]
    IQR = Q3 - Q1
    
    # Define outlier conditions
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    
    # Filter outliers
    outliers = df_cleaned.filter((col(column) < lower_bound) | (col(column) > upper_bound))
    
    # Print outlier count
    print(f"{column}: Found {outliers.count()} outliers")

lat: Found 16800 outliers
lon: Found 864 outliers
activity: Found 2619538 outliers
emissions_factor: Found 4744879 outliers
capacity: Found 4095511 outliers
capacity_factor: Found 3234216 outliers


In [23]:
# Fill missing values for 'lat' and 'lon' using MEAN
lat_mean = df_cleaned.select(mean(col("lat"))).collect()[0][0]
df_cleaned = df_cleaned.fillna({"lat": lat_mean})

lon_mean = df_cleaned.select(mean(col("lon"))).collect()[0][0]
df_cleaned = df_cleaned.fillna({"lon": lon_mean})

In [24]:
# Fill missing values for other columns using MEDIAN
def get_median(df, column):
    return df.approxQuantile(column, [0.5], 0.01)[0]  # Approximate median

columns_to_fill = ["activity", "emissions_factor", "capacity", "capacity_factor"]

for col_name in columns_to_fill:
    median_value = get_median(df_cleaned, col_name)
    df_cleaned = df_cleaned.fillna({col_name: median_value})

In [34]:
# Count missing values for each column
for column in df_cleaned.columns:
    missing_count = df_cleaned.select(count(when(col(column).isNull(), column)).alias("missing_count")).collect()[0]["missing_count"]
    print(f"{column}: {missing_count} missing values")

iso3_country: 0 missing values
sector: 0 missing values
subsector: 0 missing values
source_id: 0 missing values
start_time: 0 missing values
end_time: 0 missing values
lat: 0 missing values
lon: 0 missing values
geometry_ref: 0 missing values
gas: 0 missing values
emissions_quantity: 0 missing values
activity: 0 missing values
activity_units: 0 missing values
emissions_factor: 0 missing values
emissions_factor_units: 0 missing values
capacity: 0 missing values
capacity_units: 0 missing values
capacity_factor: 0 missing values
created_date: 0 missing values
modified_date: 0 missing values


In [26]:
# Remove rows where emissions_quantity is NULL
df_cleaned = df_cleaned.filter(col("emissions_quantity").isNotNull())

### Handling the missing values of categorical data

In [33]:
# Step 1: Compute the most frequent source_name per group
mode_df = (
    df_cleaned.groupBy("iso3_country", "sector", "subsector", "source_name")
    .agg(count("*").alias("count"))  # Count occurrences
    .withColumn("rank", count("*").over(Window.partitionBy("iso3_country", "sector", "subsector").orderBy(col("count").desc())))
    .filter(col("rank") == 1)  # Keep only the most frequent value
    .drop("count", "rank")  # Drop extra columns
)

# Step 2: Join back to the original DataFrame to fill missing values
df_cleaned = (
    df_cleaned.alias("df1")
    .join(mode_df.alias("df2"), on=["iso3_country", "sector", "subsector"], how="left")
    .withColumn(
        "source_name",
        when(col("df1.source_name").isNull(), col("df2.source_name")).otherwise(col("df1.source_name"))
    )
    .select("df1.*")  # Keep original structure
)

# Capping Outliers

### Displaying the distribution of the columns that had outliers, to determine the method for capping

In [39]:
columns = ['activity', 'emissions_factor', 'capacity', 'capacity_factor']

for col in columns:
    # Get histogram bins using approximate quantiles
    bins = df_cleaned.approxQuantile(col, [i / 30 for i in range(31)], 0.05)

    # Remove duplicate values (PySpark histogram needs unique bins)
    bins = sorted(set(bins))  

    # Compute histogram counts
    hist = df_cleaned.select(col).rdd.flatMap(lambda x: x).histogram(bins)

    # Extract bin edges and counts
    bin_edges, counts = hist[0], hist[1]

    # Plot histogram
    plt.bar(bin_edges[:-1], counts, width=(bin_edges[1] - bin_edges[0]), alpha=0.7, edgecolor='black')
    plt.title(f'Histogram of {col}')
    plt.xlabel(col)
    plt.ylabel('Frequency')
    plt.show()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 551.0 failed 1 times, most recent failure: Lost task 0.0 in stage 551.0 (TID 12518) (192.168.0.106 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:713)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:757)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:675)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:641)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:617)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:574)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:532)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:713)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:757)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:675)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:641)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:617)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:574)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:532)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more


## Will be using log transform

In [40]:
df_cleaned = df_cleaned.withColumn("activity", log1p("activity"))

In [56]:
df_cleaned = df_cleaned.withColumn(
    "emissions_factor",
    log1p(col("emissions_factor").cast("double") + lit(1e-6))
)

In [57]:
df_cleaned = df_cleaned.withColumn(
    "capacity", log1p(col("capacity").cast("double") + lit(1e-6))
)

In [58]:
df_cleaned = df_cleaned.withColumn(
    "capacity_factor", log1p(col("capacity_factor").cast("double") + lit(1e-6))
)

# Fixing the data types of the columns

In [77]:
df_cleaned.printSchema()

root
 |-- sector: string (nullable = true)
 |-- subsector: string (nullable = true)
 |-- source_id: integer (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- end_time: timestamp (nullable = true)
 |-- lat: double (nullable = false)
 |-- lon: double (nullable = false)
 |-- geometry_ref: string (nullable = true)
 |-- gas: string (nullable = true)
 |-- emissions_quantity: double (nullable = true)
 |-- activity: double (nullable = true)
 |-- activity_units: string (nullable = true)
 |-- emissions_factor: double (nullable = true)
 |-- emissions_factor_units: string (nullable = true)
 |-- capacity: double (nullable = true)
 |-- capacity_units: string (nullable = true)
 |-- capacity_factor: double (nullable = true)
 |-- created_date: timestamp (nullable = true)
 |-- modified_date: timestamp (nullable = true)
 |-- start_year: integer (nullable = true)
 |-- start_month: integer (nullable = true)
 |-- start_day: integer (nullable = true)
 |-- end_year: integer (nullable = true)

In [62]:
df_cleaned = df_cleaned.withColumn("start_time", to_timestamp("start_time"))
df_cleaned = df_cleaned.withColumn("end_time", to_timestamp("end_time"))
df_cleaned = df_cleaned.withColumn("created_date", to_timestamp("created_date"))
df_cleaned = df_cleaned.withColumn("modified_date", to_timestamp("modified_date"))

## Feature creation based on recording dates

### Extracting start year, month and day

In [64]:
df_cleaned = df_cleaned.withColumn("start_year", year("start_time"))
df_cleaned = df_cleaned.withColumn("start_month", month("start_time"))
df_cleaned = df_cleaned.withColumn("start_day", dayofmonth("start_time"))

## Extracting end year, month and day

In [65]:
df_cleaned = df_cleaned.withColumn("end_year", year("end_time"))
df_cleaned = df_cleaned.withColumn("end_month", month("end_time"))
df_cleaned = df_cleaned.withColumn("end_day", dayofmonth("end_time"))

## Creating a new feature, duration

In [66]:
df_cleaned = df_cleaned.withColumn("duration", datediff("end_time", "start_time"))

## ONE HOT ENCODING

### OHE-ing the country column

In [None]:
# Step 1: Convert categorical column into numerical indices
indexer = StringIndexer(inputCol="iso3_country", outputCol="iso3_country_index", handleInvalid="keep")
indexer_model = indexer.fit(df_cleaned)
df_cleaned = indexer_model.transform(df_cleaned)

# Get category labels (ordered by index)
category_names = indexer_model.labels

# Step 2: Apply OneHotEncoding to the indexed column
encoder = OneHotEncoder(inputCol="iso3_country_index", outputCol="iso3_country_ohe")
df_cleaned = encoder.fit(df_cleaned).transform(df_cleaned)

# Step 3: Convert the vector column into an array
df_cleaned = df_cleaned.withColumn("iso3_country_ohe_array", vector_to_array("iso3_country_ohe"))

# Step 4: Split array into separate columns with category names
for i, category in enumerate(category_names):
    df_cleaned = df_cleaned.withColumn(f"iso3_country_{category}", col("iso3_country_ohe_array")[i])

# Step 5: Drop the unnecessary columns
df_cleaned = df_cleaned.drop("iso3_country", "iso3_country_index", "iso3_country_ohe", "iso3_country_ohe_array")


### OHE-ing the sector column

In [80]:
# Step 1: Convert categorical column into numerical indices
indexer = StringIndexer(inputCol="sector", outputCol="sector_index", handleInvalid="keep")
indexer_model = indexer.fit(df_cleaned)
df_cleaned = indexer_model.transform(df_cleaned)

# Get category labels (ordered by index)
category_names = indexer_model.labels

# Step 2: Apply OneHotEncoding to the indexed column
encoder = OneHotEncoder(inputCol="sector_index", outputCol="sector_ohe")
df_cleaned = encoder.fit(df_cleaned).transform(df_cleaned)

# Step 3: Convert the vector column into an array
df_cleaned = df_cleaned.withColumn("sector_ohe_array", vector_to_array("sector_ohe"))

# Step 4: Split array into separate columns with category names
for i, category in enumerate(category_names):
    df_cleaned = df_cleaned.withColumn(f"sector_{category}", col("sector_ohe_array")[i])

# Step 5: Drop the unnecessary columns
df_cleaned = df_cleaned.drop("sector", "sector_index", "sector_ohe", "sector_ohe_array")


### OHE-ing the sub-sector column

In [83]:
# Step 1: Convert categorical column into numerical indices
indexer = StringIndexer(inputCol="subsector", outputCol="subsector_index", handleInvalid="keep")
indexer_model = indexer.fit(df_cleaned)
df_cleaned = indexer_model.transform(df_cleaned)

# Get category labels (ordered by index)
category_names = indexer_model.labels

# Step 2: Apply OneHotEncoding to the indexed column
encoder = OneHotEncoder(inputCol="subsector_index", outputCol="subsector_ohe")
df_cleaned = encoder.fit(df_cleaned).transform(df_cleaned)

# Step 3: Convert the vector column into an array
df_cleaned = df_cleaned.withColumn("subsector_ohe_array", vector_to_array("subsector_ohe"))

# Step 4: Split array into separate columns with category names
for i, category in enumerate(category_names):
    df_cleaned = df_cleaned.withColumn(f"subsector_{category}", col("subsector_ohe_array")[i])

# Step 5: Drop the unnecessary columns
df_cleaned = df_cleaned.drop("subsector", "subsector_index", "subsector_ohe", "subsector_ohe_array")

### OHE-ing the gas column

In [84]:
# Step 1: Convert categorical column into numerical indices
indexer = StringIndexer(inputCol="gas", outputCol="gas_index", handleInvalid="keep")
indexer_model = indexer.fit(df_cleaned)
df_cleaned = indexer_model.transform(df_cleaned)

# Get category labels (ordered by index)
category_names = indexer_model.labels

# Step 2: Apply OneHotEncoding to the indexed column
encoder = OneHotEncoder(inputCol="gas_index", outputCol="gas_ohe")
df_cleaned = encoder.fit(df_cleaned).transform(df_cleaned)

# Step 3: Convert the vector column into an array
df_cleaned = df_cleaned.withColumn("gas_ohe_array", vector_to_array("gas_ohe"))

# Step 4: Split array into separate columns with category names
for i, category in enumerate(category_names):
    df_cleaned = df_cleaned.withColumn(f"gas_{category}", col("gas_ohe_array")[i])

# Step 5: Drop the unnecessary columns
df_cleaned = df_cleaned.drop("gas", "gas_index", "gas_ohe", "gas_ohe_array")

In [86]:
df_cleaned.show(5)

+---------+-------------------+-------------------+-----------------+-----------------+--------------+------------------+--------+--------------+--------------------+----------------------+--------------------+--------------+--------------------+--------------------+--------------------+----------+-----------+---------+--------+---------+-------+--------+------------------+----------------------------+----------------+---------------------+--------------------+------------+------------+-----------------------------+-------------------------+--------------------------+-----------------------------+----------------------------------+------------------------------------------+---------------------------------------+---------------------------------------------+---------------------------------------+-------------------------------------------+------------------+------------------------+------------------------+-------------------------+---------------------+------------------------------+

## Saving the preprocessed data

In [None]:
# Disable Hadoop NativeIO on Windows to prevent file permission errors while writing Parquet files
spark.conf.set("spark.hadoop.io.native.lib.available", "false")

In [90]:
df_cleaned.write.mode("overwrite").parquet("./data/preprocessed.parquet")

Py4JJavaError: An error occurred while calling o2175.parquet.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:420)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:802)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:842)
