## Setup

In [33]:
import glob

from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, IntegerType

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Initialize Spark Session
spark = SparkSession.builder.appName("GDELT Analysis").getOrCreate()

## Reading and Dataprep

In [19]:
directory = "/home/ialvarenga/projects/personal/factored-datathon-2024-seed42/"

In [20]:
gkg_files = glob.glob(directory + 'files/**/*export.CSV', recursive=True)
print(f"Found {len(gkg_files)} GKG files")
csv_files = [file for file in gkg_files if file.lower().endswith('.csv')]
print(f"Found {len(csv_files)} CSV files")

Found 226 GKG files
Found 226 CSV files


In [21]:
gdelt_df = spark.read.option("header", "true").option("sep", "\t").csv(csv_files)

gdelt_df.show(5)

                                                                                

+-------------+--------+---------+----+------------+----------+----------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+----------+-------------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+-----------+---------+-------------+-------------+---------+--------------+-----------+----------+-----------+-----------------+--------------+------------------+---------------------+------------------+-------------+--------------+-------------------+--------------+--------------------+---------------------+------------------+-------------+--------------+-------------------+--------------+--------------------+---------------------+------------------+-------------+--------------+-------------------+---------+--------------------+
|GLOBALEVENTID| SQLDATE|MonthYear|Year|FractionDate|Actor1Code|Actor1Name|Act

In [22]:
# Convert SQLDATE to a proper date format
gdelt_df = gdelt_df.withColumn('Date', F.to_date(F.col('SQLDATE').cast('string'), 'yyyyMMdd'))

# Create year and month columns for easier analysis
gdelt_df = gdelt_df.withColumn("GoldsteinScale", F.col("GoldsteinScale").cast(FloatType())) \
                   .withColumn("NumMentions", F.col("NumMentions").cast(IntegerType())) \
                   .withColumn("NumSources", F.col("NumSources").cast(IntegerType())) \
                   .withColumn("NumArticles", F.col("NumArticles").cast(IntegerType()))\
                   .withColumn("AvgTone", F.col("AvgTone").cast(FloatType())) \
					.withColumn("EventRootCode", F.col("EventRootCode").cast((IntegerType())))

cameo_df = spark.read.option("header", "true").csv(f"{directory}/files/cameo.csv", sep="\t")

In [23]:
sanctions_df = gdelt_df.join(cameo_df, gdelt_df.EventCode == cameo_df.EventCode, "left") \
                    .select(gdelt_df["*"], cameo_df["EventDescription"])

del(gdelt_df)
del(cameo_df)
sanctions_df.show(5)

+-------------+--------+---------+----+------------+----------+----------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+----------+-------------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+-----------+---------+-------------+-------------+---------+--------------+-----------+----------+-----------+----------+--------------+------------------+---------------------+------------------+-------------+--------------+-------------------+--------------+--------------------+---------------------+------------------+-------------+--------------+-------------------+--------------+--------------------+---------------------+------------------+-------------+--------------+-------------------+---------+--------------------+----------+--------------------+
|GLOBALEVENTID| SQLDATE|MonthYear|Year|FractionDate|

## Loading data with sanction codes

171: Reduce or eliminate economic sanctions
172: Impose economic sanctions
173: Reduce or eliminate military aid
174: Impose embargo
175: Cut off or reduce aid
...

http://data.gdeltproject.org/documentation/CAMEO.Manual.1.1b3.pdf

In [24]:
sanction_codes = [
	'1312', # Threaten to boycott, embargo, or sanction
	'132',  # Threaten with administrative sanctions, not specified below
	'163',  # Impose embargo, boycott, or sanctions
	'172',  # Impose administrative sanctions, not specified below
	'1241', # Refuse to ease administrative sanctions
	'1244'  # Refuse to ease economic sanctions, boycott, or embargo
]
# Filter GDELT data for sanction-related events
sanctions_df = sanctions_df.filter(sanctions_df.EventCode.isin(sanction_codes))

sanctions_df.show(5)

+-------------+--------+---------+----+------------+----------+-------------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+----------+----------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+-----------+---------+-------------+-------------+---------+--------------+-----------+----------+-----------+----------+--------------+--------------------+---------------------+------------------+-------------+--------------+-------------------+--------------+--------------------+---------------------+------------------+-------------+--------------+-------------------+--------------+--------------------+---------------------+------------------+-------------+--------------+-------------------+---------+--------------------+----------+--------------------+
|GLOBALEVENTID| SQLDATE|MonthYear|Year|FractionDat

In [25]:
num_rows = sanctions_df.count()
num_columns = len(sanctions_df.columns)

print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_columns}")



Number of rows: 325686
Number of columns: 60


                                                                                

* Colunas que ser√£o selecionadas para o modelo:
	* IsRootEvent
	* QuadClass
	* GoldSteinScale (y)
	* NumMentions
	* NumMentions
	* NumSources
	* NumArticles
	* AvgTone
	* Actor1Geo_Type
	* Actor2Geo_Type

In [26]:
# Create a column to verify if the country had a previous sanction
sanctions_df = sanctions_df.orderBy(['Actor2CountryCode', 'Date'])

# Create a window specification that defines how to look at previous rows within the same target country
window_spec = Window.partitionBy('Actor2CountryCode').orderBy('Date')

# Create a lagged column that checks if there was a previous sanction event for the target country
sanctions_df = sanctions_df.withColumn(
    'PreviousSanction',
    F.lag('EventCode').over(window_spec)
)

# Create a binary flag 'HadPreviousSanction' that indicates if the target country had been sanctioned before
sanctions_df = sanctions_df.withColumn(
    'HadPreviousSanction',
    F.when(F.col('PreviousSanction').isNotNull(), 1).otherwise(0)
)

# Optionally drop the 'PreviousSanction' column if only the flag is needed
sanctions_df = sanctions_df.drop('PreviousSanction')

In [27]:
# Select only countries
sanctions_df = sanctions_df.filter(F.col('Actor1CountryCode').isNotNull() & F.col('Actor2CountryCode').isNotNull())

## Predictive Modeling Considering Sanctions

In [28]:
# Select relevant columns
selected_columns = [
#	'Actor1CountryCode', 'Actor2CountryCode', #TODO add longitude and latitude
	'Date',
    'EventRootCode', 'QuadClass', 'NumMentions', 'NumSources',
    'NumArticles', 'AvgTone', 'EventCode', 'GoldsteinScale',
]

sanctions_df = sanctions_df.select(selected_columns).dropna(subset=['GoldsteinScale'])

In [29]:
# 1. Create a binary column 'SanctionAction' to indicate if the event is imposing or threatening sanctions
sanctions_df = sanctions_df.withColumn(
    'SanctionActionImpose', 
    F.when(sanctions_df['EventCode'].isin('163', '172'), 1)
     .otherwise(0)
)

# 1. Create a binary column 'SanctionAction' to indicate if the event is imposing or threatening sanctions
sanctions_df = sanctions_df.withColumn(
    'SanctionActionThreaten', 
    F.when(sanctions_df['EventCode'].isin('1312', '132'), 1)
     .otherwise(0)
)

# 3. Calculate the 'TotalMediaCoverage' by summing 'NumMentions', 'NumSources', and 'NumArticles'
sanctions_df = sanctions_df.withColumn(
    'TotalMediaCoverage',
    sanctions_df['NumMentions'] + sanctions_df['NumSources'] + sanctions_df['NumArticles']
)

# 3. Calculate the 'TotalMediaCoverage' by summing 'NumMentions', 'NumSources', and 'NumArticles'
sanctions_df = sanctions_df.withColumn(
    'VerbalConflict',
     F.when(sanctions_df['QuadClass'] == 3, 1).otherwise(0)
)

# 3. Calculate the 'TotalMediaCoverage' by summing 'NumMentions', 'NumSources', and 'NumArticles'
sanctions_df = sanctions_df.withColumn(
    'MaterialConflict',
    F.when(sanctions_df['QuadClass'] == 4, 1).otherwise(0)
)

In [31]:
sanctions_df = sanctions_df.drop("EventCode", "QuadClass")

In [32]:
# Assemble features
assembler = VectorAssembler(inputCols=[col for col in sanctions_df.columns if col != "GoldsteinScale" and col != "Date"], outputCol="features")
data = assembler.transform(sanctions_df)

# Select the necessary columns
data = data.select("features", "GoldsteinScale", "Date")

# Sort the data by the "Date" column
data = data.orderBy(F.col("Date"))

# Calculate the number of rows
total_rows = data.count()

# Define the split ratios
train_ratio = 0.7
validation_ratio = 0.15
test_ratio = 0.15

# Calculate the split indices
train_index = int(total_rows * train_ratio)
validation_index = int(total_rows * (train_ratio + validation_ratio))

# Split the data into training, validation, and test sets
train_data = data.limit(train_index)
remaining_data = data.subtract(train_data)
validation_data = remaining_data.limit(validation_index - train_index)
test_data = remaining_data.subtract(validation_data)

# Show the count of each split
print(f"Training set count: {train_data.count()}")
print(f"Validation set count: {validation_data.count()}")
print(f"Test set count: {test_data.count()}")

                                                                                

Training set count: 55118


                                                                                

Validation set count: 11811


                                                                                

Test set count: 7406


In [16]:
# Initialize the Linear Regression model
lr = LinearRegression(labelCol="GoldsteinScale", featuresCol="features")

# Train the model
lr_model = lr.fit(train_data)

# Make predictions on the test set
predictions = lr_model.transform(test_data)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="GoldsteinScale", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

# Show some predictions
predictions.select("prediction", "GoldsteinScale", "features").show(5)

24/08/25 00:12:35 WARN Instrumentation: [cbc4a142] regParam is zero, which might cause numerical instability and overfitting.
24/08/25 00:12:36 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/08/25 00:12:36 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
24/08/25 00:12:36 WARN Instrumentation: [cbc4a142] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
                                                                                

Root Mean Squared Error (RMSE) on test data: 7.1452826568068515e-09




+------------------+--------------+--------------------+
|        prediction|GoldsteinScale|            features|
+------------------+--------------+--------------------+
| -4.00000001141229|          -4.0|[12.0,1.0,1.0,1.0...|
|-4.000000009507408|          -4.0|[12.0,1.0,1.0,1.0...|
|-4.000000009368371|          -4.0|[12.0,1.0,1.0,1.0...|
|-4.000000009014439|          -4.0|[12.0,1.0,1.0,1.0...|
|-4.000000008714878|          -4.0|[12.0,1.0,1.0,1.0...|
+------------------+--------------+--------------------+
only showing top 5 rows



                                                                                

In [34]:
gbt = GBTRegressor(labelCol="GoldsteinScale", featuresCol="features")

# Define the parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [5, 10]) \
    .addGrid(gbt.maxIter, [50, 100]) \
    .addGrid(gbt.stepSize, [0.05, 0.1]) \
    .build()

# Define the evaluator
evaluator = RegressionEvaluator(labelCol="GoldsteinScale", predictionCol="prediction", metricName="rmse")

# Set up CrossValidator
crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)  # 5-fold cross-validation

# Fit the CrossValidator to find the best model
cv_model = crossval.fit(validation_data)

# Get the best model
best_model = cv_model.bestModel

# Evaluate the best model on the test data
test_predictions = best_model.transform(test_data)

rmse_test = evaluator.evaluate(test_predictions)
print(f"Best Model RMSE on test data: {rmse_test}")

# Calculate MAPE on the test data
test_predictions = test_predictions.withColumn("absolute_percentage_error", abs((F.col("GoldsteinScale") - F.col("prediction")) / F.col("GoldsteinScale")) * 100)
mape = test_predictions.selectExpr("avg(absolute_percentage_error) as MAPE").collect()[0]["MAPE"]
print(f"Best Model MAPE on test data: {mape}")

# Show sample predictions
test_predictions.select("prediction", "GoldsteinScale", "features").show(5)

                                                                                

Best Model RMSE on test data: 0.0


                                                                                

Best Model MAPE on test data: 0.0


                                                                                

+----------+--------------+--------------------+
|prediction|GoldsteinScale|            features|
+----------+--------------+--------------------+
|      -5.0|          -5.0|[17.0,1.0,1.0,1.0...|
|      -5.0|          -5.0|[17.0,2.0,1.0,2.0...|
|      -5.0|          -5.0|[17.0,7.0,1.0,7.0...|
|      -5.0|          -5.0|[17.0,5.0,1.0,5.0...|
|      -5.0|          -5.0|[17.0,2.0,1.0,2.0...|
+----------+--------------+--------------------+
only showing top 5 rows



In [35]:
# Extract the best hyperparameters
best_maxDepth = best_model._java_obj.getMaxDepth()
best_maxIter = best_model._java_obj.getMaxIter()
best_stepSize = best_model._java_obj.getStepSize()

# Train a new model using the training dataset with the best hyperparameters
final_gbt = GBTRegressor(labelCol="GoldsteinScale", featuresCol="features", maxDepth=best_maxDepth, maxIter=best_maxIter, stepSize=best_stepSize)
final_model = final_gbt.fit(train_data)

# Evaluate the final model on the test data
test_predictions = final_model.transform(test_data)

rmse_test = evaluator.evaluate(test_predictions)
print(f"Final Model RMSE on test data: {rmse_test}")

# Calculate MAPE on the test data
test_predictions = test_predictions.withColumn("absolute_percentage_error", abs((F.col("GoldsteinScale") - F.col("prediction")) / F.col("GoldsteinScale")) * 100)
mape = test_predictions.selectExpr("avg(absolute_percentage_error) as MAPE").collect()[0]["MAPE"]
print(f"Final Model MAPE on test data: {mape}")

# Show sample predictions
test_predictions.select("prediction", "GoldsteinScale", "features").show(5)

                                                                                

Final Model RMSE on test data: 0.0


                                                                                

Final Model MAPE on test data: 0.0


                                                                                

+----------+--------------+--------------------+
|prediction|GoldsteinScale|            features|
+----------+--------------+--------------------+
|      -5.0|          -5.0|[17.0,1.0,1.0,1.0...|
|      -5.0|          -5.0|[17.0,2.0,1.0,2.0...|
|      -5.0|          -5.0|[17.0,7.0,1.0,7.0...|
|      -5.0|          -5.0|[17.0,5.0,1.0,5.0...|
|      -5.0|          -5.0|[17.0,2.0,1.0,2.0...|
+----------+--------------+--------------------+
only showing top 5 rows



In [37]:
import mlflow
import mlflow.spark

mlflow.spark.log_model(final_model, "model")



<mlflow.models.model.ModelInfo at 0x7f7573027f40>

In [44]:
final_model_path = f"{directory}/app/models/final_model"

final_model.save(final_model_path)