In [1]:
download_metadata = True # will create a list of all csv files in the s3 bucket
cache_http_calls = True # TTL for 1 hour 

In [None]:
# !pip install python-dotenv

In [2]:
import pyspark 
import requests
import os
import pandas as pd
import boto3
import json
import cachetools

from botocore import UNSIGNED
from botocore.config import Config

from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.conf import SparkConf


from copy import deepcopy
from datetime import datetime, timedelta
from dotenv import load_dotenv

load_dotenv()


os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:3.2.0" pyspark-shell'

from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

# !pip install boto3
# !pip install cachetools

sparkConf = SparkConf()
sparkConf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
sparkConf.set("spark.hadoop.fs.s3a.threads.max", 10)
sparkConf.set("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")

sc = pyspark.SparkContext("local[*]", conf = sparkConf, appName = "s_p_challenge")
spark = SparkSession(sc)

print(f"spark version = {spark.version}")
print(f"pyspark version = {pyspark.__version__}")
print(f"Hadoop version = {sc._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}")

Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0f9963ff-5e5c-4bbc-a922-e17bf04848be;1.0
	confs: [default]


:: loading settings :: url = jar:file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.hadoop#hadoop-aws;3.2.0 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.375 in central
:: resolution report :: resolve 137ms :: artifacts dl 4ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.375 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-0f9963ff-5e5c-4bbc-a922-e17bf04848be
	confs: [default]
	0 artifacts copied, 2 already retrieved (0kB/4ms)
21/10/03 14:59:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your pla

spark version = 3.1.2
pyspark version = 3.1.2
Hadoop version = 3.2.0


### Download data and metadata from gdelt-open-data S3 bucket

In [3]:
# v2 event headers: https://github.com/linwoodc3/gdelt2HeaderRows/blob/master/schema_csvs/GDELT_2.0_Events_Column_Labels_Header_Row_Sep2016.csv
headers = pd.read_csv('headers.csv')
headers.head(n=2)

Unnamed: 0,tableId,dataType,Empty,Description
0,GLOBALEVENTID,INTEGER,NULLABLE,Globally unique identifier assigned to each ev...
1,SQLDATE,INTEGER,NULLABLE,Date the event took place in YYYYMMDD format. ...


In [4]:
def download_metadata():
    '''
    Download metadata from https://s3.console.aws.amazon.com/s3/buckets/gdelt-open-data?region=us-east-1
    '''
    s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED), region_name='us-east-1')
    s3_events = s3.list_objects_v2(Bucket='gdelt-open-data', Prefix='v2/events/')
    s3_all_events = []

    is_truncated = True
    continuation_token = None

    while is_truncated: 
        if continuation_token:
            s3_events = s3_events = s3.list_objects_v2(Bucket='gdelt-open-data', Prefix='v2/events/', ContinuationToken=continuation_token)
        else: 
            s3_events = s3_events = s3.list_objects_v2(Bucket='gdelt-open-data', Prefix='v2/events/')
        s3_all_events.append(s3_events)
        is_truncated = s3_events['IsTruncated']
        if 'NextContinuationToken' in s3_events: 
            continuation_token = s3_events['NextContinuationToken']
    print('Total number of iterations to the S3 list objects = {:,}'.format(len(s3_all_events)))
    s3_actual_events = []
    for s3_events in s3_all_events:
        s3_actual_events.extend(s3_events['Contents'])
    print('Total number of files in the S3 bucket = {:,}'.format(len(s3_actual_events)))
    return s3_actual_events

In [5]:
if download_metadata:
    events_metadata = pd.DataFrame(download_metadata())
    events_metadata.sort_values(by='LastModified', inplace=True, ascending=False)
    events_metadata

Total number of iterations to the S3 list objects = 144
Total number of files in the S3 bucket = 143,462


In [6]:
# Example uses GDELT dataset found here: https://aws.amazon.com/public-datasets/gdelt/
events = spark.read.csv("s3a://gdelt-open-data/v2/events/20190416151500.export.csv", header=False, sep='\t', inferSchema=True)
print(f"Total number of events in current file: {events.count()}")

21/10/03 15:01:28 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

Total number of events in current file: 1772


In [7]:
assert len(events.columns) == len(headers['tableId'])
for idx in range(len(events.columns)):
    events = events.withColumnRenamed(f"_c{idx}", list(headers['tableId'])[idx])
events = events.withColumn("SQLDATE", to_date(col("SQLDATE").cast("string"), "yyyyMMdd"))
events.printSchema()
events.show(n=2)

root
 |-- GLOBALEVENTID: integer (nullable = true)
 |-- SQLDATE: date (nullable = true)
 |-- MonthYear: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- FractionDate: double (nullable = true)
 |-- Actor1Code: string (nullable = true)
 |-- Actor1Name: string (nullable = true)
 |-- Actor1CountryCode: string (nullable = true)
 |-- Actor1KnownGroupCode: string (nullable = true)
 |-- Actor1EthnicCode: string (nullable = true)
 |-- Actor1Religion1Code: string (nullable = true)
 |-- Actor1Religion2Code: string (nullable = true)
 |-- Actor1Type1Code: string (nullable = true)
 |-- Actor1Type2Code: string (nullable = true)
 |-- Actor1Type3Code: string (nullable = true)
 |-- Actor2Code: string (nullable = true)
 |-- Actor2Name: string (nullable = true)
 |-- Actor2CountryCode: string (nullable = true)
 |-- Actor2KnownGroupCode: string (nullable = true)
 |-- Actor2EthnicCode: string (nullable = true)
 |-- Actor2Religion1Code: string (nullable = true)
 |-- Actor2Religion2Code: str

21/10/03 15:01:41 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------------+----------+---------+----+------------+----------+----------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+----------+----------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+-----------+---------+-------------+-------------+---------+--------------+-----------+----------+-----------+----------------+--------------+------------------+---------------------+------------------+------------------+-------------+--------------+-------------------+--------------+------------------+---------------------+------------------+------------------+-------------+--------------+-------------------+--------------+------------------+---------------------+------------------+------------------+-------------+--------------+-------------------+--------------+--------------------+
|GLOBALEVENTID|   SQL

In [8]:
print(f"Total number of events before cleaning: {events.count()}")
events_clean = events.filter('Actor1Code is not Null and Actor2Code is not Null and Actor1Geo_Lat is not Null and Actor1Geo_Long is not Null and Actor2Geo_Lat is not Null and Actor2Geo_Long is not Null')
print(f"Total number of events after cleaning: {events_clean.count()}")
events_clean.show(n=2)

Total number of events before cleaning: 1772


                                                                                

Total number of events after cleaning: 1108
+-------------+----------+---------+----+------------+----------+----------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+----------+----------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+-----------+---------+-------------+-------------+---------+--------------+-----------+----------+-----------+-----------------+--------------+--------------------+---------------------+------------------+------------------+-------------+--------------+-------------------+--------------+--------------------+---------------------+------------------+------------------+-------------+--------------+-------------------+--------------+--------------------+---------------------+------------------+------------------+-------------+--------------+-------------------+-------

In [9]:
if not cache_http_calls:
    call_cache = call_cache = cachetools.TTLCache(10000, ttl=timedelta(seconds=1), timer=datetime.now)
else:
    call_cache = call_cache = cachetools.TTLCache(10000, ttl=timedelta(hours=1), timer=datetime.now)

def get_model_response(payload: dict[str, object], event_id=None):
    '''
    Sample payload: 
    payload = {
        "data": {
            "avg_tone": -2,
            "goldstein": 0.5,
            "actor_code": "GOV",
            "lat": 38,
            "lon": -78,
            "date": "2018-10-23 04:30:00"
            }
        }
    TODO: Need to check if we need to implement politeness while calling the API
    '''
    if event_id and event_id in call_cache:
        return call_cache['event_id']
    
    headers = {'Content-Type': 'application/json'}
    username = password = os.getenv('API_PASSWORD')

    res = requests.post(
        url=os.getenv('API_URL'),
        headers = headers,
        data = json.dumps(payload),
        auth=(username, password)
    )
    response = json.loads(res.content.decode('utf-8'))
    call_cache[event_id] = response
    return response

def flatten_model_response(actor: str, response: dict[str,object], event_id=None, debug=False):
    d = {}
    if event_id and debug: 
        print(event_id)
    try:
        d[f'{actor}_model_time_in_ms'] = response['model_time_in_ms']
        d[f'{actor}_release_harness_version'] = response['release']['harness_version']
        d[f'{actor}_release_model_version'] = response['release']['model_version']
        d[f'{actor}_release_model_version_number'] = response['release']['model_version_number']
        d[f'{actor}_request_id'] = response['request_id']
        d[f'{actor}_result_class1'] = response['result']['class1']
        d[f'{actor}_result_class2'] = response['result']['class2']
        d[f'{actor}_timing'] = response['timing']
    except Exception as e: 
        print(response)
    return d

In [10]:
def create_payload(avg_tone, goldstein, actor_code, lat, lon, date):
    data = {}
    data['avg_tone'] = avg_tone
    data['goldstein'] = goldstein
    data['actor_code'] = actor_code
    data['lat'] = lat
    data['lon'] = lon
    data['date'] = date.strftime('%Y-%m-%d %H:%M:%S')
    payload = {}
    payload['data'] = data
    return payload
    

def call_model_output(row):
    '''
        payload = {
        "data": {
            "avg_tone": -2,
            "goldstein": 0.5,
            "actor_code": "GOV",
            "lat": 38,
            "lon": -78,
            "date": "2018-10-23 04:30:00"
            }
        }
    '''
    # actor 1
    r = row.asDict(True)
    payload = create_payload(row['AvgTone'], row['GoldsteinScale'], row['Actor1Code'], row['Actor1Geo_Lat'], row['Actor1Geo_Long'], datetime.strptime(str(row['DATEADDED']),'%Y%m%d%H%M%S'))
    response = flatten_model_response('Actor1', get_model_response(payload), event_id=row['GLOBALEVENTID'])

    for k, v in response.items():
        r[k] = v    
    
    # actor 2
    payload = create_payload(row['AvgTone'], row['GoldsteinScale'], row['Actor2Code'], row['Actor2Geo_Lat'], row['Actor2Geo_Long'], datetime.strptime(str(row['DATEADDED']),'%Y%m%d%H%M%S'))
    response = flatten_model_response('Actor2', get_model_response(payload), event_id=row['GLOBALEVENTID'])
    
    for k, v in response.items():
        r[k] = v    
    
    return Row(**r)

def define_schema(events):
    schema = deepcopy(events.schema)
    print('Number of columns in schema before addition = {:,}'.format(len(schema)))
    # https://spark.apache.org/docs/latest/sql-ref-datatypes.html
    for actor in ['Actor1', 'Actor2']:
        schema.add(StructField(f'{actor}__model_time_in_ms', IntegerType(), True))
        schema.add(StructField(f'{actor}_release_harness_version', StringType(), True))
        schema.add(StructField(f'{actor}_release_model_version', StringType(), True))
        schema.add(StructField(f'{actor}_release_model_version_number', IntegerType(), True))
        schema.add(StructField(f'{actor}_request_id', StringType(), True))
        schema.add(StructField(f'{actor}_result_class1', BooleanType(), True))
        schema.add(StructField(f'{actor}_result_class2', IntegerType(), True))
        schema.add(StructField(f'{actor}_timing', DoubleType(), True))
    print('Number of columns in schema after addition = {:,}'.format(len(schema)))
    return schema

In [11]:
df = events_clean.rdd.map(call_model_output)
schema = define_schema(events_clean)
df = spark.createDataFrame(df, schema)
df.show(n=1, vertical=True)
df.write.parquet('model_output.parquet')

Number of columns in schema before addition = 61
Number of columns in schema after addition = 77


                                                                                

-RECORD 0---------------------------------------------------
 GLOBALEVENTID                       | 838788881            
 SQLDATE                             | 2018-04-16           
 MonthYear                           | 201804               
 Year                                | 2018                 
 FractionDate                        | 2018.2904            
 Actor1Code                          | EDU                  
 Actor1Name                          | ECONOMIST            
 Actor1CountryCode                   | null                 
 Actor1KnownGroupCode                | null                 
 Actor1EthnicCode                    | null                 
 Actor1Religion1Code                 | null                 
 Actor1Religion2Code                 | null                 
 Actor1Type1Code                     | EDU                  
 Actor1Type2Code                     | null                 
 Actor1Type3Code                     | null                 
 Actor2Code             

AnalysisException: path file:/home/jovyan/work/model_output.parquet already exists.

In [None]:
df = spark.read.parquet('model_output.parquet')
dfp = df.toPandas()
dfp.to_csv('model_output.csv')