In [1]:
# Import PySpark and other necessary libraries
# Install Java
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install PySpark
#!pip install pyspark
#import os
#os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, year, month
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize Spark session
spark = SparkSession.builder.appName('Trade Flow Prediction').getOrCreate()

24/05/04 02:37:40 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
#import pandas as pd

big_df = spark.read.csv("gs://acumberl_data_for_gcp_labs/Trade Flow/Trade_Flow_Big_Dataset.csv", header=True, inferSchema=True)
small_df = spark.read.csv("gs://acumberl_data_for_gcp_labs/Trade Flow/Trade_Flow_Small_Dataset.csv", header=True, inferSchema=True)


                                                                                

In [3]:
# DECIDE HERE WHETER TO RUN THE BIG OR SMALL DATASET

spark_df = big_df

In [4]:
# Data Exploration and Cleaning
# Show the first few rows of the dataframe
spark_df.show(5)

# Checking for missing values in each column
spark_df.select([count(when(col(c).isNull(), c)).alias(c) for c in spark_df.columns]).show()

# Removing rows with missing values
spark_df = spark_df.na.drop()

+----------+-----------------+----------------------+----------------+-----------+------------+
|      Date|Country_of_Origin|Country_of_Destination|Product_Category|Trade_Value|Trade_Volume|
+----------+-----------------+----------------------+----------------+-----------+------------+
|2012-05-10|            Japan|                 Italy|        Textiles|    1652.53|       265.0|
|2020-04-30|            Italy|                Russia|            Food|    3718.66|        81.0|
|2018-06-20|           Russia|                Canada|       Machinery|    7414.49|      1708.0|
|2011-04-12|          Germany|                France|       Machinery|    4939.41|      1045.0|
|2022-02-13|           Canada|                France|        Textiles|    1878.51|       569.0|
+----------+-----------------+----------------------+----------------+-----------+------------+
only showing top 5 rows





+----+-----------------+----------------------+----------------+-----------+------------+
|Date|Country_of_Origin|Country_of_Destination|Product_Category|Trade_Value|Trade_Volume|
+----+-----------------+----------------------+----------------+-----------+------------+
|   0|                0|                     0|               0|          0|           0|
+----+-----------------+----------------------+----------------+-----------+------------+



                                                                                

In [5]:
# Feature Engineering
# Indexing categorical columns
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(spark_df) for column in ['Country_of_Origin', 'Country_of_Destination', 'Product_Category']]

# Assemble vectors
assembler = VectorAssembler(inputCols=['Country_of_Origin_index', 'Country_of_Destination_index', 'Product_Category_index', 'Trade_Value'],
                            outputCol='features')

# Random Forest Model

rf = RandomForestRegressor(featuresCol='features', labelCol='Trade_Volume')

# Set up the pipeline
pipeline = Pipeline(stages=indexers + [assembler, rf])

                                                                                

In [6]:
# Model Training

# Split the data into training and testing sets
train_data, test_data = spark_df.randomSplit([0.8, 0.2], seed=42)

# Fit the model
rf_model = pipeline.fit(train_data)

# Make predictions
rf_predictions = rf_model.transform(test_data)

# Show sample predictions
rf_predictions.select('features', 'Trade_Volume', 'prediction').show(5)

[Stage 31:>                                                         (0 + 1) / 1]

+--------------------+------------+------------------+
|            features|Trade_Volume|        prediction|
+--------------------+------------+------------------+
|[5.0,6.0,5.0,6451...|      1218.0|499.53811668271226|
|[5.0,3.0,5.0,1131...|       488.0|500.68440834565956|
|[5.0,1.0,1.0,2291...|      1714.0|509.02961986769344|
|[5.0,7.0,6.0,3983...|       964.0| 506.7589940831146|
|[7.0,0.0,4.0,1594...|       207.0| 500.4471280953273|
+--------------------+------------+------------------+
only showing top 5 rows



                                                                                

In [7]:
# Random Forest Model Evaluation

evaluator = RegressionEvaluator(labelCol='Trade_Volume', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(rf_predictions)
print(f'Root Mean Squared Error (RMSE) on test data = {rmse}')


[Stage 32:>                                                         (0 + 2) / 2]

Root Mean Squared Error (RMSE) on test data = 498.92875435783685


                                                                                

In [8]:
# Linear Regression Model

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='features', labelCol='Trade_Volume')

lr_pipeline = Pipeline(stages=indexers + [assembler, lr])

# Model Training
# Fit the model
lr_model = lr_pipeline.fit(train_data)

# Make predictions
lr_predictions = lr_model.transform(test_data)

# Show sample predictions
lr_predictions.select('features', 'Trade_Volume', 'prediction').show(5)

24/05/04 02:39:18 WARN Instrumentation: [7c022ede] regParam is zero, which might cause numerical instability and overfitting.
[Stage 35:>                                                         (0 + 1) / 1]

+--------------------+------------+------------------+
|            features|Trade_Volume|        prediction|
+--------------------+------------+------------------+
|[5.0,6.0,5.0,6451...|      1218.0|499.32118113547574|
|[5.0,3.0,5.0,1131...|       488.0|499.42589174232256|
|[5.0,1.0,1.0,2291...|      1714.0| 499.0535805583509|
|[5.0,7.0,6.0,3983...|       964.0|500.84440735301274|
|[7.0,0.0,4.0,1594...|       207.0| 499.5986357808394|
+--------------------+------------+------------------+
only showing top 5 rows



                                                                                

In [9]:
# Linear Regression Model Evaluation

lr_rmse = evaluator.evaluate(lr_predictions)
print(f'Root Mean Squared Error (RMSE) on test data = {rmse}')



Root Mean Squared Error (RMSE) on test data = 498.92875435783685


                                                                                

In [10]:
num_rows = spark_df.count()
print(f"Number of rows: {num_rows}")



Number of rows: 1000000


                                                                                