In [1]:
import os
import time
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, log1p

In [2]:
spark = SparkSession.builder.appName("GDP_Prediction_Preprocessing").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/02 20:56:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/02 20:56:20 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 33986)
Traceback (most recent call last):
  File "/uufs/chpc.utah.edu/sys/installdir/miniconda3/pyspark-3.5/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/uufs/chpc.utah.edu/sys/installdir/miniconda3/pyspark-3.5/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/uufs/chpc.utah.edu/sys/installdir/miniconda3/pyspark-3.5/lib/python3.11/socketserver.py", line 361, in fin

In [3]:
project_root = os.path.dirname(os.getcwd())
base_path = os.path.join(project_root, "data/raw")

In [4]:
# Define file paths
file_paths = {
    "Capital Stock": "CapitalStockData.csv",
    "Energy Use": "energy_use.csv",
    "Labor Force": "labor_force.csv",
    "Patents": "patents_res_nonres.csv",
    "R&D Expenditure": "R&D.csv",
    "Unemployment": "unemployed_ilo_estimate.csv"
}

# Load datasets into Pandas & Spark
dfs = {}
dfs_spark = {}

for name, file in file_paths.items():
    file_path = os.path.join(base_path, file)
    print(f"[INFO] Loading dataset: {name} from {file_path}")
    
    # Load Pandas DataFrame
    df = pd.read_csv(file_path)
    
    # Standardize column names
    df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]
    
    # Rename variations of "Country Code"
    rename_map = {"country": "country_code", "countrycode": "country_code", "Country Code": "country_code"}
    df.rename(columns=rename_map, inplace=True)
    
    # Identify columns that are years (e.g., "1960_[yr1960]")
    year_columns = [c for c in df.columns if "_[yr" in c]

    if year_columns:
        # Convert wide format to long format
        df_long = df.melt(id_vars=["country_code"], value_vars=year_columns,
                           var_name="year", value_name="value")

        # Extract numeric year from column name (e.g., "1960_[yr1960]" → "1960")
        df_long["year"] = df_long["year"].str.extract(r"(\d{4})").astype(int)
        
        # Replace original dataframe with long-format version
        df = df_long

    # Ensure the dataset has a "year" column
    if "year" not in df.columns:
        raise ValueError(f"Dataset {name} does not have a 'year' column after transformation.")

    # Convert numeric columns
    for col_name in df.columns:
        if col_name not in ["country_code", "year"]:
            df[col_name] = pd.to_numeric(df[col_name], errors='coerce')

    # Fill missing values
    df.fillna(df.mean(numeric_only=True), inplace=True)

    # Store preprocessed Pandas DataFrame
    dfs[name] = df

    # Convert to Spark DataFrame
    spark_df = spark.createDataFrame(df)

    # Rename columns in Spark
    for old_col, new_col in rename_map.items():
        if old_col in spark_df.columns:
            spark_df = spark_df.withColumnRenamed(old_col, new_col)

    # Store preprocessed Spark DataFrame
    dfs_spark[name] = spark_df

    print(f"[INFO] {name} dataset preprocessed successfully with {df.shape[0]} rows and {df.shape[1]} columns.")


[INFO] Loading dataset: Capital Stock from /uufs/chpc.utah.edu/common/home/u1463636/CS6964/GDP_Prediction_ALITE/data/raw/CapitalStockData.csv
[INFO] Capital Stock dataset preprocessed successfully with 11640 rows and 18 columns.
[INFO] Loading dataset: Energy Use from /uufs/chpc.utah.edu/common/home/u1463636/CS6964/GDP_Prediction_ALITE/data/raw/energy_use.csv
[INFO] Energy Use dataset preprocessed successfully with 17024 rows and 3 columns.
[INFO] Loading dataset: Labor Force from /uufs/chpc.utah.edu/common/home/u1463636/CS6964/GDP_Prediction_ALITE/data/raw/labor_force.csv
[INFO] Labor Force dataset preprocessed successfully with 17024 rows and 3 columns.
[INFO] Loading dataset: Patents from /uufs/chpc.utah.edu/common/home/u1463636/CS6964/GDP_Prediction_ALITE/data/raw/patents_res_nonres.csv
[INFO] Patents dataset preprocessed successfully with 34048 rows and 3 columns.
[INFO] Loading dataset: R&D Expenditure from /uufs/chpc.utah.edu/common/home/u1463636/CS6964/GDP_Prediction_ALITE/data

In [5]:
dfs_spark


{'Capital Stock': DataFrame[country_code: string, ifscode: bigint, countryname: double, year: bigint, igov_rppp: double, kgov_rppp: double, ipriv_rppp: double, kpriv_rppp: double, ippp_rppp: double, kppp_rppp: double, gdp_rppp: double, igov_n: double, kgov_n: double, ipriv_n: double, kpriv_n: double, kppp_n: double, gdp_n: double, income: double],
 'Energy Use': DataFrame[country_code: string, year: bigint, value: double],
 'Labor Force': DataFrame[country_code: string, year: bigint, value: double],
 'Patents': DataFrame[country_code: string, year: bigint, value: double],
 'R&D Expenditure': DataFrame[country_code: string, year: bigint, value: double],
 'Unemployment': DataFrame[country_code: string, year: bigint, value: double]}

In [6]:
dfs_spark

{'Capital Stock': DataFrame[country_code: string, ifscode: bigint, countryname: double, year: bigint, igov_rppp: double, kgov_rppp: double, ipriv_rppp: double, kpriv_rppp: double, ippp_rppp: double, kppp_rppp: double, gdp_rppp: double, igov_n: double, kgov_n: double, ipriv_n: double, kpriv_n: double, kppp_n: double, gdp_n: double, income: double],
 'Energy Use': DataFrame[country_code: string, year: bigint, value: double],
 'Labor Force': DataFrame[country_code: string, year: bigint, value: double],
 'Patents': DataFrame[country_code: string, year: bigint, value: double],
 'R&D Expenditure': DataFrame[country_code: string, year: bigint, value: double],
 'Unemployment': DataFrame[country_code: string, year: bigint, value: double]}

In [7]:
# Define output directory
output_path = os.path.join(project_root, "data/processed")
os.makedirs(output_path, exist_ok=True)

# Load individual datasets
df_cap = dfs_spark["Capital Stock"].alias("cap")
df_energy = dfs_spark["Energy Use"].alias("energy")
df_labor = dfs_spark["Labor Force"].alias("labor")
df_patents = dfs_spark["Patents"].alias("patents")
df_rd = dfs_spark["R&D Expenditure"].alias("rd")
df_unemployment = dfs_spark["Unemployment"].alias("unemployment")

# Print the initial schema before joining
print("\n[INFO] Initial Schema of Capital Stock:")
df_cap.printSchema()
df_energy.printSchema()


[INFO] Initial Schema of Capital Stock:
root
 |-- country_code: string (nullable = true)
 |-- ifscode: long (nullable = true)
 |-- countryname: double (nullable = true)
 |-- year: long (nullable = true)
 |-- igov_rppp: double (nullable = true)
 |-- kgov_rppp: double (nullable = true)
 |-- ipriv_rppp: double (nullable = true)
 |-- kpriv_rppp: double (nullable = true)
 |-- ippp_rppp: double (nullable = true)
 |-- kppp_rppp: double (nullable = true)
 |-- gdp_rppp: double (nullable = true)
 |-- igov_n: double (nullable = true)
 |-- kgov_n: double (nullable = true)
 |-- ipriv_n: double (nullable = true)
 |-- kpriv_n: double (nullable = true)
 |-- kppp_n: double (nullable = true)
 |-- gdp_n: double (nullable = true)
 |-- income: double (nullable = true)

root
 |-- country_code: string (nullable = true)
 |-- year: long (nullable = true)
 |-- value: double (nullable = true)



In [8]:
df_energy = df_energy.withColumnRenamed("value", "energy_use")
df_labor = df_labor.withColumnRenamed("value", "labor_force")
df_patents = df_patents.withColumnRenamed("value", "patents")
df_rd = df_rd.withColumnRenamed("value", "rd_expenditure")
df_unemployment = df_unemployment.withColumnRenamed("value", "unemployment")


In [9]:
print("\n[INFO] Joining Energy Use dataset...")
df_merged = df_cap.join(df_energy, df_cap.country_code == df_energy.country_code, "outer").drop(df_energy.country_code)
print("[INFO] Schema after joining Energy Use:")
df_merged.printSchema()


[INFO] Joining Energy Use dataset...
[INFO] Schema after joining Energy Use:
root
 |-- country_code: string (nullable = true)
 |-- ifscode: long (nullable = true)
 |-- countryname: double (nullable = true)
 |-- year: long (nullable = true)
 |-- igov_rppp: double (nullable = true)
 |-- kgov_rppp: double (nullable = true)
 |-- ipriv_rppp: double (nullable = true)
 |-- kpriv_rppp: double (nullable = true)
 |-- ippp_rppp: double (nullable = true)
 |-- kppp_rppp: double (nullable = true)
 |-- gdp_rppp: double (nullable = true)
 |-- igov_n: double (nullable = true)
 |-- kgov_n: double (nullable = true)
 |-- ipriv_n: double (nullable = true)
 |-- kpriv_n: double (nullable = true)
 |-- kppp_n: double (nullable = true)
 |-- gdp_n: double (nullable = true)
 |-- income: double (nullable = true)
 |-- year: long (nullable = true)
 |-- energy_use: double (nullable = true)



In [10]:
# Join Labor Force dataset and drop duplicate `country_code`
print("\n[INFO] Joining Labor Force dataset...")
df_merged = df_merged.join(df_labor, df_merged.country_code == df_labor.country_code, "outer").drop(df_labor.country_code)
print("[INFO] Schema after joining Labor Force:")
# df_merged.printSchema()

# Join Patents dataset and drop duplicate `country_code`
print("\n[INFO] Joining Patents dataset...")
df_merged = df_merged.join(df_patents, df_merged.country_code == df_patents.country_code, "outer").drop(df_patents.country_code)
print("[INFO] Schema after joining Patents:")
# df_merged.printSchema()

# Join R&D Expenditure dataset and drop duplicate `country_code`
print("\n[INFO] Joining R&D Expenditure dataset...")
df_merged = df_merged.join(df_rd, df_merged.country_code == df_rd.country_code, "outer").drop(df_rd.country_code)
print("[INFO] Schema after joining R&D Expenditure:")
# df_merged.printSchema()

# Join Unemployment dataset and drop duplicate `country_code`
print("\n[INFO] Joining Unemployment dataset...")
df_merged = df_merged.join(df_unemployment, df_merged.country_code == df_unemployment.country_code, "outer").drop(df_unemployment.country_code)
print("[INFO] Schema after joining Unemployment:")
df_merged.printSchema()


[INFO] Joining Labor Force dataset...
[INFO] Schema after joining Labor Force:

[INFO] Joining Patents dataset...
[INFO] Schema after joining Patents:

[INFO] Joining R&D Expenditure dataset...
[INFO] Schema after joining R&D Expenditure:

[INFO] Joining Unemployment dataset...
[INFO] Schema after joining Unemployment:
root
 |-- country_code: string (nullable = true)
 |-- ifscode: long (nullable = true)
 |-- countryname: double (nullable = true)
 |-- year: long (nullable = true)
 |-- igov_rppp: double (nullable = true)
 |-- kgov_rppp: double (nullable = true)
 |-- ipriv_rppp: double (nullable = true)
 |-- kpriv_rppp: double (nullable = true)
 |-- ippp_rppp: double (nullable = true)
 |-- kppp_rppp: double (nullable = true)
 |-- gdp_rppp: double (nullable = true)
 |-- igov_n: double (nullable = true)
 |-- kgov_n: double (nullable = true)
 |-- ipriv_n: double (nullable = true)
 |-- kpriv_n: double (nullable = true)
 |-- kppp_n: double (nullable = true)
 |-- gdp_n: double (nullable = true

In [11]:
df_cap.show(10)
df_merged.limit(10).collect()

                                                                                

+------------+-------+-----------+----+---------+---------+----------+----------+------------------+-----------------+-----------------+------------------+-------------------+-----------------+--------------------+-----------------+----------------+------+
|country_code|ifscode|countryname|year|igov_rppp|kgov_rppp|ipriv_rppp|kpriv_rppp|         ippp_rppp|        kppp_rppp|         gdp_rppp|            igov_n|             kgov_n|          ipriv_n|             kpriv_n|           kppp_n|           gdp_n|income|
+------------+-------+-----------+----+---------+---------+----------+----------+------------------+-----------------+-----------------+------------------+-------------------+-----------------+--------------------+-----------------+----------------+------+
|         AFG|    512|        NaN|1960|      3.0|     50.0|       1.0|      15.0|0.7427164546573656|6.190049958368027|319.5912096460521|313764.81190356606|2.757517812525994E7|53660.26305876444|1.6597856587033639E7|32856.050166527

25/03/02 20:56:32 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/03/02 20:57:39 ERROR Executor: Exception in task 0.0 in stage 21.0 (TID 16)7]
java.lang.OutOfMemoryError: GC overhead limit exceeded
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:469)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.smj_consumeFullOuterJoinRow_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at

ERROR StatusConsoleListener An exception occurred processing Appender console
 org.apache.logging.log4j.core.appender.AppenderLoggingException: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at org.apache.logging.log4j.core.config.AppenderControl.tryCallAppender(AppenderControl.java:165)
	at org.apache.logging.log4j.core.config.AppenderControl.callAppender0(AppenderControl.java:134)
	at org.apache.logging.log4j.core.config.AppenderControl.callAppenderPreventRecursion(AppenderControl.java:125)
	at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:89)
	at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:683)
	at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:641)
	at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:624)
	at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:560)
	at org.apache.logging.log4j.core.config.AwaitComple

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/uufs/chpc.utah.edu/sys/installdir/miniconda3/pyspark-3.5/lib/python3.11/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/uufs/chpc.utah.edu/sys/installdir/miniconda3/pyspark-3.5/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/uufs/chpc.utah.edu/sys/installdir/miniconda3/pyspark-3.5/lib/python3.11/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent cal

Py4JError: org does not exist in the JVM

In [None]:
# Save the final merged dataset
df_merged.write.csv(output_path, header=True, mode="overwrite")

print(f"\n[INFO] Data merging complete. Output saved at: {output_path}")


In [None]:
xxxxx

In [None]:
!pip3 install alite

In [None]:

from alite import Alite

# Initialize ALITE
print("\n[INFO] Performing ALITE Integration...")
alite = Alite()

# Add datasets to ALITE
for name, df in dfs.items():
    alite.add_dataframe(name, df, key=["country_code", "year"])

# Measure time for ALITE Integration
start_time = time.time()
alite_integrated = alite.integrate()
alite_time = time.time() - start_time

# Save ALITE merged dataset
alite_integrated.to_csv("data/alite_merged.csv", index=False)
print(f"[INFO] ALITE Integration Complete in {alite_time:.4f} seconds")


In [None]:
df_merged.show(10)