In [12]:
!pip install pyspark==3.4.0 findspark pyarrow fastparquet



In [23]:
#import and initialise findspark
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder\
    .appName("UnemploymentRatePrediction")\
    .config("spark.sql.debug.maxToStringFields", 2000)\
    .config("spark.driver.memory", "2g")\
    .getOrCreate()

In [25]:
# Read in data from S3 Bucket
from pyspark import SparkFiles
unemployment_url = "https://lxproject4.s3.ap-southeast-2.amazonaws.com/project-4/unemployment_cleaned.csv"
spark.sparkContext.addFile(unemployment_url)
unemployment_df = spark.read.csv(SparkFiles.get("unemployment_cleaned.csv"), sep=",", header=True)

# Show the unemployment data
unemployment_df.show()

+-----------+------+---------+----+-----------------+
|    country|gender|age_group|year|unemployment_rate|
+-----------+------+---------+----+-----------------+
|Afghanistan|  Male|      15+|2021|              5.6|
|Afghanistan|  Male|    15-24|2021|             8.45|
|Afghanistan|  Male|      25+|2021|              4.6|
|Afghanistan|Female|      15+|2021|              5.5|
|Afghanistan|Female|    15-24|2021|             9.41|
|Afghanistan|Female|      25+|2021|             3.77|
|Afghanistan|  Male|      15+|2020|            10.45|
|Afghanistan|  Male|    15-24|2020|            14.54|
|Afghanistan|  Male|      25+|2020|             8.86|
|Afghanistan|Female|      15+|2020|            16.77|
|Afghanistan|Female|    15-24|2020|            21.11|
|Afghanistan|Female|      25+|2020|            14.12|
|Afghanistan|  Male|      15+|2017|            10.37|
|Afghanistan|  Male|    15-24|2017|            16.29|
|Afghanistan|  Male|      25+|2017|             7.86|
|Afghanistan|Female|      15

In [39]:
unemployment_df.dtypes

[('country', 'string'),
 ('gender', 'string'),
 ('age_group', 'string'),
 ('year', 'int'),
 ('unemployment_rate', 'float'),
 ('lag_1', 'float'),
 ('lag_2', 'float'),
 ('lag_3', 'float')]

In [46]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import IntegerType, FloatType

# Filter the DataFrame to only include female data
female_unemployment_df = unemployment_df.filter(unemployment_df['gender'] == 'Female')

# Create lag features for unemployment rate (e.g., unemployment rate of past five years)
for lag in range(1, 6):
    window = Window.partitionBy("country").orderBy("year")
    female_unemployment_df = female_unemployment_df.withColumn(f"lag_{lag}", F.lag("unemployment_rate", lag).over(window))

# Remove rows with null values (created due to lags)
female_unemployment_df = female_unemployment_df.dropna()


In [47]:
# Splitting the dataset into 80% training and 20% testing
train_df, test_df = unemployment_df.randomSplit([0.8, 0.2], seed=42)


In [48]:
# Using VectorAssembler to transform features into a single column
assembler = VectorAssembler(inputCols=["lag_1", "lag_2", "lag_3", "lag_4", "lag_5"], outputCol="features")

# Define the Random Forest Regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="unemployment_rate")

# Build the pipeline
pipeline = Pipeline(stages=[assembler, rf])

# Train the model
model = pipeline.fit(train_df)


In [49]:
# Predicting using the test set
predictions = model.transform(test_df)

# Evaluating model performance
evaluator = RegressionEvaluator(labelCol="unemployment_rate", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(f"R-squared on test data = {r2}")


R-squared on test data = 0.8775058243858875
