In [1]:
import os

# Raw downloads are stored in ./data, the monthly parquet output in ./data/parquet_scaling
local_path = os.path.join(os.getcwd(), 'data')
parquet_path = os.path.join(os.getcwd(), 'data', 'parquet_scaling')

# Measurement results are written to ./test_results
result_folder_path = os.path.join(os.getcwd(), 'test_results')

# Application list endpoint of the spark rest api (queried later for stage and cache metrics)
spark_rest_api_url = 'http://localhost:4040/api/v1/applications'

In [2]:
from datetime import datetime, timedelta

# Time period (inclusive on both ends) for which the data is downloaded
start_date = datetime(2015, 7, 1)
end_date = datetime(2023, 12, 31)

# One datetime (at midnight) per calendar day from start_date through end_date
date_list = [
    start_date + timedelta(days=day_offset)
    for day_offset in range((end_date - start_date).days + 1)
]

In [3]:
import urllib.request

# Build the download urls of all 15-minute export files, grouped by month:
# url_list[k] holds every url belonging to the k-th month of the chosen period
base_url = 'http://data.gdeltproject.org/gdeltv2/'
url_list = [[]]
index = 0
month = date_list[0].month

for date in date_list:
    # A change of the month number opens a new (empty) month bucket
    if date.month != month:
        month = date.month
        index += 1
        url_list.append([])

    # 24 hours with four quarter-hour slots each
    for x in range(24):
        for y in (0, 15, 30, 45):
            date_tmp = date + timedelta(hours=x, minutes=y)
            url = f"{base_url}{date_tmp.strftime('%Y%m%d%H%M%S')}.export.CSV.zip"
            url_list[index].append(url)

In [4]:
# Create the local data directory and the parquet output directory if they don't exist yet.
# makedirs(..., exist_ok=True) also creates missing parent folders and avoids the
# check-then-create race of a separate isdir() test followed by mkdir().
os.makedirs(local_path, exist_ok=True)
os.makedirs(parquet_path, exist_ok=True)

In [5]:
from pyspark.sql import SparkSession

# Create the spark session with hive support, or reuse an already running one
# (see config folder for spark config)
spark = (
    SparkSession.builder
    .appName('Big Data Project')
    .enableHiveSupport()
    .getOrCreate()
)

In [6]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType

# Column layout of the original csv files as (column name, spark type) pairs, in file order
_csv_columns = [
    # Event id and date attributes
    ("GlobalEventID", IntegerType),
    ("Day", DateType),
    ("MonthYear", IntegerType),
    ("Year", IntegerType),
    ("FractionDate", FloatType),
    # Actor 1 attributes
    ("Actor1Code", StringType),
    ("Actor1Name", StringType),
    ("Actor1CountryCode", StringType),
    ("Actor1KnownGroupCode", StringType),
    ("Actor1EthnicCode", StringType),
    ("Actor1Religion1Code", StringType),
    ("Actor1Religion2Code", StringType),
    ("Actor1Type1Code", StringType),
    ("Actor1Type2Code", StringType),
    ("Actor1Type3Code", StringType),
    # Actor 2 attributes
    ("Actor2Code", StringType),
    ("Actor2Name", StringType),
    ("Actor2CountryCode", StringType),
    ("Actor2KnownGroupCode", StringType),
    ("Actor2EthnicCode", StringType),
    ("Actor2Religion1Code", StringType),
    ("Actor2Religion2Code", StringType),
    ("Actor2Type1Code", StringType),
    ("Actor2Type2Code", StringType),
    ("Actor2Type3Code", StringType),
    # Event action attributes
    ("IsRootEvent", IntegerType),
    ("EventCode", StringType),
    ("EventBaseCode", StringType),
    ("EventRootCode", StringType),
    ("QuadClass", IntegerType),
    ("GoldsteinScale", FloatType),
    ("NumMentions", IntegerType),
    ("NumSources", IntegerType),
    ("NumArticles", IntegerType),
    ("AvgTone", FloatType),
    # Actor 1 geography
    ("Actor1Geo_Type", IntegerType),
    ("Actor1Geo_FullName", StringType),
    ("Actor1Geo_CountryCode", StringType),
    ("Actor1Geo_ADM1Code", StringType),
    ("Actor1Geo_ADM2Code", StringType),
    ("Actor1Geo_Lat", FloatType),
    ("Actor1Geo_Long", FloatType),
    ("Actor1Geo_FeatureID", StringType),
    # Actor 2 geography
    ("Actor2Geo_Type", IntegerType),
    ("Actor2Geo_FullName", StringType),
    ("Actor2Geo_CountryCode", StringType),
    ("Actor2Geo_ADM1Code", StringType),
    ("Actor2Geo_ADM2Code", StringType),
    ("Actor2Geo_Lat", FloatType),
    ("Actor2Geo_Long", FloatType),
    ("Actor2Geo_FeatureID", StringType),
    # Action geography
    ("ActionGeo_Type", IntegerType),
    ("ActionGeo_FullName", StringType),
    ("ActionGeo_CountryCode", StringType),
    ("ActionGeo_ADM1Code", StringType),
    ("ActionGeo_ADM2Code", StringType),
    ("ActionGeo_Lat", FloatType),
    ("ActionGeo_Long", FloatType),
    ("ActionGeo_FeatureID", StringType),
    # Data management fields
    ("DATEADDED", StringType),
    ("SOURCEURL", StringType),
]

# Every column is nullable; each field gets its own type instance
schema = StructType([
    StructField(column_name, column_type(), True)
    for column_name, column_type in _csv_columns
])

In [7]:
import zipfile

def download_file(url):
    """Download one zipped export file and unpack it into its year/month folder.

    The file name (last path segment of ``url``) starts with ``YYYYMM``, which
    selects the target folder ``<local_path>/<YYYY>/<MM>``. The folder is
    created if necessary. Nothing is downloaded when the extracted file is
    already present.

    Failures (e.g. HTTP 404 for missing files, corrupt archives) are reported
    on stdout and otherwise ignored so that a single bad file does not abort a
    batch download. The zip archive is always removed afterwards, also on
    failure, so that no partial or corrupt archive is left next to the
    extracted csv files.

    Args:
        url: Download url of a ``*.zip`` archive.

    Returns:
        None
    """
    fname = url.split('/')[-1]
    folder_location = os.path.join(local_path, fname[:4], fname[4:6])
    zip_file_path = os.path.join(folder_location, fname)

    # Strip only the trailing ".zip" of the file name; a ".zip" somewhere in the
    # directory part of the path must not be touched
    extracted_name = fname[:-len('.zip')] if fname.endswith('.zip') else fname
    extracted_file_path = os.path.join(folder_location, extracted_name)

    if os.path.isfile(extracted_file_path):
        print('File ' + fname + ' already exists')
        return

    try:
        # exist_ok makes this safe when several threads create the same folder
        os.makedirs(folder_location, exist_ok=True)

        urllib.request.urlretrieve(url, zip_file_path)

        # Unzip zip file
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(folder_location)

    except Exception as e:
        # Deliberate best-effort: report the problem and continue with the next file
        print(f"An error occurred with file {fname}: {e}")

    finally:
        # Delete zip file (also after a failed download/extraction)
        if os.path.isfile(zip_file_path):
            try:
                os.remove(zip_file_path)
            except OSError as e:
                print(f"An error occurred with file {fname}: {e}")

In [0]:
import shutil
from concurrent.futures import ThreadPoolExecutor

# Download files and write them to parquet files in parallel for each month
# This is done in batches to allow simple addition of new months to already existing data
for i, month_list in enumerate(url_list):
    month_parquet_path = os.path.join(parquet_path, str(i) + ".parquet")

    # Skip month if parquet file already exists
    if os.path.exists(month_parquet_path):
        continue

    # The file names start with YYYYMM, which determines the year/month folder
    first_file_name = month_list[0].split('/')[-1]
    year_folder = os.path.join(local_path, first_file_name[:4])
    month_folder = os.path.join(year_folder, first_file_name[4:6])

    # Creates the year folder as well if it is missing
    os.makedirs(month_folder, exist_ok=True)

    # Download all files from the url list in parallel. The number of worker threads is
    # the ThreadPoolExecutor default, which depends on the Python version and CPU count.
    # Leaving the with-block waits until all downloads have finished.
    with ThreadPoolExecutor() as executor:
        executor.map(download_file, month_list)

    # Read all csv files of one month into a spark dataframe
    df = spark.read.csv(month_folder, sep='\t', header=False, schema=schema, dateFormat='yyyyMMdd')

    # Write the data of one month into a parquet file
    df.write.parquet(month_parquet_path, mode='overwrite')

    # Delete the csv files to free up disk space
    shutil.rmtree(month_folder)

An error occurred with file 20150713230000.export.CSV.zip: HTTP Error 404: Not Found
An error occurred with file 20150804161500.export.CSV.zip: HTTP Error 404: Not Found
An error occurred with file 20150809034500.export.CSV.zip: HTTP Error 404: Not Found
An error occurred with file 20151020010000.export.CSV.zip: HTTP Error 404: Not Found
An error occurred with file 20151021041500.export.CSV.zip: HTTP Error 404: Not Found
An error occurred with file 20151021043000.export.CSV.zip: HTTP Error 404: Not Found
An error occurred with file 20151021044500.export.CSV.zip: HTTP Error 404: Not Found
An error occurred with file 20151021050000.export.CSV.zip: HTTP Error 404: Not Found
An error occurred with file 20151021051500.export.CSV.zip: HTTP Error 404: Not Found
An error occurred with file 20151021053000.export.CSV.zip: HTTP Error 404: Not Found
An error occurred with file 20151021060000.export.CSV.zip: HTTP Error 404: Not Found
An error occurred with file 20151021054500.export.CSV.zip: HTTP E

In [12]:
import pandas as pd
import requests

# Derive the csv and parquet file sizes from the input/output bytes of the stages reported by the spark rest api
result_file_path = os.path.join(result_folder_path, 'data_size.csv')

if not os.path.isdir(result_folder_path):
    os.mkdir(result_folder_path)

# The application id is taken from the first entry of the application list
apps_response = requests.get(spark_rest_api_url)
apps = apps_response.json()
app_id = apps[0]['id']

# Stage information of the application (1 stage per parquet file write)
stages_response = requests.get(f"{spark_rest_api_url}/{app_id}/stages")
stages_data = stages_response.json()

# Keep status and input/output bytes of each stage, keyed by the stage id
stage_result = {
    stage['stageId']: {
        'status': stage['status'],
        'input_data': stage['inputBytes'],
        'output_data': stage['outputBytes'],
    }
    for stage in stages_data
}

df_result = pd.DataFrame.from_dict(
    stage_result,
    orient='index',
    columns=['status', 'input_data', 'output_data'],
)

# Store the result in a csv file for later use; an already existing file is left untouched
if not os.path.isfile(result_file_path):
    df_result.to_csv(result_file_path, sep=';')

In [8]:
from pyspark.sql.functions import broadcast
from pyspark.sql import functions as F


# Method to run the non aggregated version of the program repeatedly in the test loop
def run_non_aggregated(df_base):
    """Map the country codes of ``df_base``, cache the result and publish it as view ``GDELT``.

    The column ``ActionGeo_CountryCode`` (FIPS 10-4) is replaced by the matching
    ISO 3166-1 code from ``util/country_code_mapping.csv`` (necessary for the
    superset heatmap). Rows without a match keep a null country code because a
    left outer join is used.

    Args:
        df_base: Spark dataframe containing at least ``ActionGeo_CountryCode``.

    Returns:
        The cached (and already materialized) dataframe with mapped country codes.
    """
    country_mapping_csv = os.path.join(os.getcwd(), 'util', 'country_code_mapping.csv')

    # The mapping is a small dataset, so it is broadcast in the join below
    country_mapping = (
        spark.read
        .csv(country_mapping_csv, sep=';', header=True, inferSchema=True)
        .select(F.col('FIPS 10-4'), F.col('ISO 3166-1'))
    )

    # Replace the FIPS code by the ISO code and remove the helper columns of the mapping
    fips_matches = df_base['ActionGeo_CountryCode'] == country_mapping['FIPS 10-4']
    mapped = (
        df_base
        .join(broadcast(country_mapping), fips_matches, 'left_outer')
        .withColumn('ActionGeo_CountryCode', F.col('ISO 3166-1'))
        .drop('ISO 3166-1')
        .drop('FIPS 10-4')
    )

    # count() forces the evaluation so the cache is filled before the view is queried
    # (the global temp view is what superset would read)
    mapped.cache()
    mapped.count()
    mapped.createOrReplaceGlobalTempView("GDELT")

    return mapped

In [9]:
# Method to run the aggregated version of the program repeatedly in the test loop
def run_aggregated(df_base):
    """Aggregate ``df_base`` per day and country, cache the result and publish it as view ``GDELT_AGGR``.

    Only ``Day``, ``ActionGeo_CountryCode`` and ``GoldsteinScale`` are used. Rows
    with a null in any of these columns are dropped before the aggregation. The
    result holds one row per day and country with the sum of the Goldstein scale
    (``GoldsteinScaleSum``) and the number of events (``EventCount``). Afterwards
    the FIPS 10-4 country code is replaced by the ISO 3166-1 code from
    ``util/country_code_mapping.csv`` (necessary for the superset heatmap).

    Args:
        df_base: Spark dataframe containing the three columns named above.

    Returns:
        The cached (and already materialized) aggregated dataframe.
    """
    # One value per country per day; null rows would distort the aggregation results
    daily_country_totals = (
        df_base
        .select(
            F.col('Day'),
            F.col('ActionGeo_CountryCode'),
            F.col('GoldsteinScale'),
        )
        .na.drop()
        .groupBy('Day', 'ActionGeo_CountryCode')
        .agg(
            F.sum('GoldsteinScale').alias('GoldsteinScaleSum'),
            F.count('*').alias('EventCount'),
        )
    )

    country_mapping_csv = os.path.join(os.getcwd(), 'util', 'country_code_mapping.csv')

    # The mapping is a small dataset, so it is broadcast in the join below
    country_mapping = (
        spark.read
        .csv(country_mapping_csv, sep=';', header=True, inferSchema=True)
        .select(F.col('FIPS 10-4'), F.col('ISO 3166-1'))
    )

    # Replace the FIPS code by the ISO code and remove the helper columns of the mapping
    fips_matches = daily_country_totals['ActionGeo_CountryCode'] == country_mapping['FIPS 10-4']
    mapped = (
        daily_country_totals
        .join(broadcast(country_mapping), fips_matches, 'left_outer')
        .withColumn('ActionGeo_CountryCode', F.col('ISO 3166-1'))
        .drop('ISO 3166-1')
        .drop('FIPS 10-4')
    )

    # count() forces the evaluation so the cache is filled before the view is queried
    # (the global temp view is what superset would read)
    mapped.cache()
    mapped.count()
    mapped.createOrReplaceGlobalTempView("GDELT_AGGR")

    return mapped

In [10]:
import requests

# Method to get the current cache information from the spark rest api
def get_cache_information():
    """Return memory and disk usage of the first cached dataset of the spark application.

    The application id is read from the first entry of the application list of
    the spark rest api, then the storage information of that application is
    requested.

    Returns:
        Tuple ``(memoryUsed, diskUsed)`` of the first storage entry. Only one
        dataframe is cached at a time, so only the first entry is relevant.
    """
    application_list = requests.get(spark_rest_api_url).json()
    application_id = application_list[0]['id']

    storage_url = f"{spark_rest_api_url}/{application_id}/storage/rdd"
    cached_datasets = requests.get(storage_url).json()

    first_cached = cached_datasets[0]
    return first_cached['memoryUsed'], first_cached['diskUsed']

In [11]:
import time
from pyhive import hive
import pandas as pd

def average_sql_request_execution_time(sql, n=10):
    """Return the average execution time in seconds of ``sql`` over ``n`` runs.

    The query is sent ``n`` times over a single hive connection to
    ``localhost:10000`` and each result is fetched completely into a pandas
    dataframe. Establishing the connection is not part of the measurement.

    Args:
        sql: The sql statement to execute.
        n: Number of repetitions, must be positive.

    Returns:
        Total elapsed time divided by ``n``.

    Raises:
        ValueError: If ``n`` is not positive (no average can be computed).
    """
    if n <= 0:
        raise ValueError(f"n must be a positive number of repetitions, got {n}")

    with hive.connect(host='localhost', port=10000, username='spark') as connection:
        # perf_counter is monotonic and therefore not affected by system clock adjustments
        request_start_time = time.perf_counter()

        for _ in range(n):
            pd.read_sql(sql=sql, con=connection)

        request_end_time = time.perf_counter()

    return (request_end_time - request_start_time) / n

In [12]:
from py4j.java_gateway import java_import

# Start a HiveThriftServer2 through the py4j gateway of the running spark context.
# NOTE(review): presumably this is the server the pyhive connection used for the sql
# timing talks to - host/port are not configured in this cell, confirm in the spark config.

# Retrieve the spark context from the current spark session
sc = spark.sparkContext

# Import the HiveThriftServer2 class using the JVM instance of the spark context
java_import(sc._jvm, "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2")

# Dummy java arguments for main method: an empty java String array (length 0)
java_args = sc._gateway.new_array(sc._gateway.jvm.java.lang.String, 0)

# Start the thrift server by calling the main method of the imported class
sc._jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(java_args)

In [14]:
# Paths of all monthly parquet files ("0.parquet", "1.parquet", ...), one per entry
# found in the parquet directory
parquet_path_list = [
    os.path.join(parquet_path, str(file_number) + ".parquet")
    for file_number in range(len(os.listdir(parquet_path)))
]

In [None]:
import time

# Measurements of the non aggregated version, keyed by the number of loaded parquet files
result_non_aggregated = {}

# Scalability test of the non aggregated main logic (loading & pre-processing):
# every run loads and processes 6 more parquet files (each representing a month) than the previous one
for i in range(6, len(os.listdir(parquet_path)) + 1, 6):

    # Time the reading of the first i parquet files plus the non aggregated main logic
    start_time = time.time()
    df = spark.read.parquet(*parquet_path_list[:i])
    df_non_aggregated = run_non_aggregated(df)
    end_time = time.time()
    duration = end_time - start_time

    # Memory and disk usage of the cached dataframe
    memory_usage, disk_usage = get_cache_information()

    # Average execution time of the sql query used by superset to display the heatmap
    avg_sql_time = average_sql_request_execution_time("""
        SELECT ActionGeo_CountryCode AS ActionGeo_CountryCode,
               AVG(GoldsteinScale) AS GoldsteinScaleAvg
        FROM
          (SELECT *
           FROM global_temp.GDELT
           WHERE ActionGeo_CountryCode IS NOT NULL
             AND GoldsteinScale IS NOT NULL) AS virtual_table
        GROUP BY ActionGeo_CountryCode
    """)

    result_non_aggregated[i] = dict(
        duration=duration,
        memory_usage=memory_usage,
        disk_usage=disk_usage,
        avg_sql_time=avg_sql_time,
    )

    # Show the intermediate results and release the cache before the next run
    print(result_non_aggregated)
    df_non_aggregated.unpersist()

  pd.read_sql(sql=sql, con=connection)


{6: {'duration': 170.5504674911499, 'memory_usage': 5993606440, 'disk_usage': 1042432428, 'avg_sql_time': 3.129914879798889}}
{6: {'duration': 170.5504674911499, 'memory_usage': 5993606440, 'disk_usage': 1042432428, 'avg_sql_time': 3.129914879798889}, 12: {'duration': 311.4626443386078, 'memory_usage': 5876227744, 'disk_usage': 6974715762, 'avg_sql_time': 6.637240314483643}}


In [ ]:
result_file_path = os.path.join(result_folder_path, 'test_result_non_aggregated.csv')

# Make sure the result folder exists even if this cell is run on its own
os.makedirs(result_folder_path, exist_ok=True)

# The columns argument selects which measured values are kept, so every metric
# collected in the test loop (including avg_sql_time) has to be listed here
df_result_non_aggregated = pd.DataFrame.from_dict(
    result_non_aggregated,
    orient='index',
    columns=['duration', 'memory_usage', 'disk_usage', 'avg_sql_time'],
)

# Write the test result of the non aggregated version to a csv file
# (an already existing result file is left untouched)
if not os.path.isfile(result_file_path):
    df_result_non_aggregated.to_csv(result_file_path, sep=';')

In [ ]:
import time

# Measurements of the aggregated version, keyed by the number of loaded parquet files
result_aggregated = {}

# Same test run with the aggregated version of the loading & preprocessing logic:
# every run loads and processes 6 more parquet files than the previous one
for i in range(6, len(os.listdir(parquet_path)) + 1, 6):

    # Time the reading of the first i parquet files plus the aggregated main logic
    start_time = time.time()
    df = spark.read.parquet(*parquet_path_list[:i])
    df_aggregated = run_aggregated(df)
    end_time = time.time()
    duration = end_time - start_time

    # Memory and disk usage of the cached dataframe
    memory_usage, disk_usage = get_cache_information()

    # Average execution time of the sql query used by superset to display the heatmap
    avg_sql_time = average_sql_request_execution_time("""
        SELECT ActionGeo_CountryCode AS ActionGeo_CountryCode,
               SUM(GoldsteinScaleSum)/SUM(EventCount) AS GoldsteinScaleAvg
        FROM
          (SELECT *
           FROM global_temp.GDELT_AGGR) AS virtual_table
        GROUP BY ActionGeo_CountryCode
    """)

    result_aggregated[i] = dict(
        duration=duration,
        memory_usage=memory_usage,
        disk_usage=disk_usage,
        avg_sql_time=avg_sql_time,
    )

    # Release the cache before the next run
    df_aggregated.unpersist()

In [ ]:
result_file_path = os.path.join(result_folder_path, 'test_result_aggregated.csv')

# Make sure the result folder exists even if this cell is run on its own
os.makedirs(result_folder_path, exist_ok=True)

# The columns argument selects which measured values are kept, so every metric
# collected in the test loop (including avg_sql_time) has to be listed here
df_result_aggregated = pd.DataFrame.from_dict(
    result_aggregated,
    orient='index',
    columns=['duration', 'memory_usage', 'disk_usage', 'avg_sql_time'],
)

# Write the test result of the aggregated version to a csv file
# (an already existing result file is left untouched)
if not os.path.isfile(result_file_path):
    df_result_aggregated.to_csv(result_file_path, sep=';')