In [8]:
from datetime import datetime
from functools import partial
import re
import os
import json

from pyspark import SparkContext

In [9]:
# Column positions (0-based, after splitting a line on tabs) of the event
# fields this notebook keeps, mapped to the names used for them downstream.
# The insertion order here is the key order of the generated records.
index2events_column = {
  0: "GlobalEventId",
  37: "Actor1Geo_CountryCode",
  60: "url",
  40: "lat",
  41: "long",
  59: "DateAdded"
}

# Same kind of position -> name mapping for the mentions files.
index2mentions_column = {
  0: "GlobalEventId",
  3: "MentionType",
  5: "MentionIdentifier",
  1: "EventTimeDate",
  2: "MentionTimeDate"
}

# Input directories: English and multilingual data, each with an events and a
# mentions subdirectory below the common sample base path.
basepath = "/home/ubuntu/data/project/gdelt/sample"
events_path_english = os.path.join(basepath, "english/events/")
mentions_path_english = os.path.join(basepath, "english/mentions/")
events_path_multi = os.path.join(basepath, "multilingual/events/")
mentions_path_multi = os.path.join(basepath, "multilingual/mentions/")

In [10]:
# getOrCreate() reuses the context that is already running when this cell is
# executed again; a bare SparkContext() raises in that situation because only
# one context may be active per process.
sc = SparkContext.getOrCreate()

In [11]:
# Read the English and the multilingual directory as lines of text and combine
# both sources into a single RDD per record type.
events_rdd = (
    sc.textFile(events_path_english)
    .union(sc.textFile(events_path_multi))
)
mentions_rdd = (
    sc.textFile(mentions_path_english)
    .union(sc.textFile(mentions_path_multi))
)

In [12]:
def tab_split_func(input_line):
    """Split one tab-separated line into its list of column strings.

    Args:
        input_line: A single line of text without its line terminator.

    Returns:
        The list of column values; empty columns are kept as empty strings.
    """
    return input_line.split("\t")

In [13]:
# Turn every raw event line into a list of column strings.
events_split_rdd = events_rdd.map(tab_split_func)

In [14]:
def transform_to_json(record, index2column):
    """Project a positional record onto a dict of named fields.

    Args:
        record: Indexable sequence of column values.
        index2column: Mapping from a position in ``record`` to the key that
            value gets in the result.

    Returns:
        A new dict with one entry per item of ``index2column``, in that
        mapping's iteration order.
    """
    return {name: record[position] for position, name in index2column.items()}

# Keep only the event columns listed in index2events_column, as dict records.
events_json_rdd = events_split_rdd.map(partial(transform_to_json, index2column=index2events_column))

In [15]:
# Show the first mapped event record.
events_json_rdd.take(1)

[{'GlobalEventId': '915090043',
  'Actor1Geo_CountryCode': 'US',
  'url': 'https://gazette.com/premium/editorial-give-oil-and-gas-a-break-from-new-rules/article_a73a1844-7092-11ea-86ab-837f0f45f6a9.html',
  'lat': '39.0646',
  'long': '-105.327',
  'DateAdded': '20200328134500'}]

In [16]:
def convert_dtstr_dt(dtstr):
    """Parse a ``YYYYMMDDHHMMSS`` timestamp string into a datetime.

    Args:
        dtstr: Timestamp text such as "20200310120000".

    Returns:
        The corresponding naive ``datetime``.

    Raises:
        ValueError: If ``dtstr`` does not match the expected format.
    """
    return datetime.strptime(dtstr, "%Y%m%d%H%M%S")

def convert_to_float(floatstr):
    """Parse a value as a float, returning None when it is not a number.

    Args:
        floatstr: Usually the text of a data column, which may be empty.

    Returns:
        The parsed float, or None if ``floatstr`` cannot be converted
        (non-numeric text, an empty string, or None).
    """
    try:
        return float(floatstr)
    except (TypeError, ValueError):
        # ValueError: non-numeric or empty text.
        # TypeError: None or another non-numeric object, e.g. a field that
        # has already been converted to None.
        return None

# Field name -> function that parses that field's raw string value.
# Fields that are not listed here are left unchanged by convert_types.
type_converters = {
    "DateAdded": convert_dtstr_dt,
    "lat": convert_to_float,
    "long": convert_to_float
}

def convert_types(record, converters):
    """Apply a converter to each listed field of a record, in place.

    Args:
        record: Dict holding at least every key of ``converters``.
        converters: Mapping from field name to a one-argument function whose
            result replaces the field's current value.

    Returns:
        The same ``record`` object, after modification.
    """
    for field in converters:
        parse = converters[field]
        record[field] = parse(record[field])
    return record

In [17]:
# Sanity check: the timestamp parser reads YYYYMMDDHHMMSS strings.
assert convert_dtstr_dt("20200310120000") == datetime(2020, 3, 10, 12, 0)

In [18]:
# Parse DateAdded into a datetime and lat/long into floats (None when not numeric).
events_converted_rdd = events_json_rdd.map(partial(convert_types, converters=type_converters))

In [19]:
# Show the first record after type conversion.
events_converted_rdd.take(1)

[{'GlobalEventId': '915090043',
  'Actor1Geo_CountryCode': 'US',
  'url': 'https://gazette.com/premium/editorial-give-oil-and-gas-a-break-from-new-rules/article_a73a1844-7092-11ea-86ab-837f0f45f6a9.html',
  'lat': 39.0646,
  'long': -105.327,
  'DateAdded': datetime.datetime(2020, 3, 28, 13, 45)}]

In [20]:
# Matches a URL that either contains both "corona"/"korona" and "virus" (in
# any order) or contains "covid19"/"covid-19" in any letter case.
# NOTE(review): the corona/virus alternative only matches lower case while the
# covid alternative ignores case - confirm that this asymmetry is intended.
corona_regex = re.compile("(?=.*[ck]orona)(?=.*virus)|[Cc][Oo][Vv][Ii][Dd]-?19")


def contains_corona(record):
    """Tell whether a record's URL refers to the coronavirus / COVID-19.

    Args:
        record: Mapping with a "url" string entry.

    Returns:
        True if ``corona_regex`` matches anywhere in the URL, otherwise False.
    """
    return corona_regex.search(record["url"]) is not None

In [21]:
# Keep only the events whose URL matches the corona / COVID-19 pattern.
corona_events_rdd = events_converted_rdd.filter(contains_corona)

In [22]:
# Key every event by its GlobalEventId so it can be joined with the mention counts.
corona_forjoin_rdd = corona_events_rdd.map(lambda record: (record["GlobalEventId"], record))

In [23]:
# Show the first (GlobalEventId, record) pair.
corona_forjoin_rdd.take(1)

[('915090050',
  {'GlobalEventId': '915090050',
   'Actor1Geo_CountryCode': '',
   'url': 'https://www.inbrampton.com/federal-doctor-says-covid-19-battle-will-last-months-many-months-as-cases-soar',
   'lat': None,
   'long': None,
   'DateAdded': datetime.datetime(2020, 3, 28, 13, 45)})]

In [24]:
# Count the mentions of every event: parse each mention line, emit
# (GlobalEventId, 1) and add up the ones per key.
mentions_count_rdd = (
    mentions_rdd
    .map(tab_split_func)
    .map(partial(transform_to_json, index2column=index2mentions_column))
    .map(lambda mention: (mention["GlobalEventId"], 1))
    .reduceByKey(lambda total, increment: total + increment)
)

In [25]:
# Inner join on GlobalEventId: yields (event_id, (event_record, mention_count))
# for the corona events that have at least one mention.
joined_rdd = corona_forjoin_rdd.join(mentions_count_rdd)

In [26]:
def add_mentions(tpl):
    """Attach the mention count of a joined pair to its event record.

    Args:
        tpl: ``(event_id, (event_record, number_of_mentions))``.

    Returns:
        The same ``event_record`` dict, modified in place with a
        "NumberOfMentions" entry.
    """
    _, joined_values = tpl
    record, mention_count = joined_values
    record["NumberOfMentions"] = mention_count
    return record

In [27]:
# Flatten the joined pairs back into event records that carry NumberOfMentions.
event_mentions_rdd = joined_rdd.map(add_mentions)

In [28]:
def stringify_record(record):
    """Serialize an event record to a JSON string.

    The "DateAdded" datetime is written as a "%Y%m%d:%H%M%S" string; every
    other value must already be JSON-serializable.

    The timestamp is formatted on a shallow copy, so the caller's dict keeps
    its datetime and the function can be applied to the same record again.

    Args:
        record: Dict with a "DateAdded" datetime entry.

    Returns:
        The JSON text of the record, with keys in the record's own order.
    """
    serializable = dict(record)
    serializable["DateAdded"] = record["DateAdded"].strftime("%Y%m%d:%H%M%S")
    return json.dumps(serializable)

In [None]:
# Write the records as text, one JSON document per line.
# NOTE(review): Spark normally refuses to write into an existing directory, so
# re-running this cell probably requires removing the output first - confirm.
event_mentions_rdd.map(stringify_record).saveAsTextFile("/home/ubuntu/data/project/gdelt/sample/output/")

In [29]:
def _load_fips2iso(path):
    """Read the FIPS -> ISO country-code table from a tab-separated file.

    Every non-blank line must consist of exactly three tab-separated columns:
    the first is ignored, the second is the FIPS code and the third the ISO
    code. Blank lines are skipped.

    Args:
        path: Location of the TSV file.

    Returns:
        Dict mapping each FIPS code to its ISO code.

    Raises:
        ValueError: If a non-blank line does not have exactly three columns;
            the message names the file and the line number.
    """
    mapping = dict()
    with open(path, "r") as fin:
        for line_number, line in enumerate(fin, start=1):
            stripped = line.strip()
            if not stripped:
                # A blank line (typically a trailing one) carries no mapping.
                continue
            fields = stripped.split("\t")
            if len(fields) != 3:
                raise ValueError(
                    "{}:{}: expected 3 tab-separated columns, found {}".format(
                        path, line_number, len(fields)
                    )
                )
            _, fips_code, iso_code = fields
            mapping[fips_code] = iso_code
    return mapping


fips2iso_file = "/home/ubuntu/data/project/gdelt/fips2iso_country_codes.tsv"
fips2iso = _load_fips2iso(fips2iso_file)

In [31]:
# Broadcast the lookup table; functions read it through broadcast_fips2iso.value.
broadcast_fips2iso = sc.broadcast(fips2iso)

In [32]:
def convert_fips2iso(record):
    """Replace a record's FIPS country code with its ISO code, in place.

    Codes that are missing from the broadcast lookup table are left as they
    are.

    Args:
        record: Dict with an "Actor1Geo_CountryCode" entry.

    Returns:
        The same ``record`` object, after modification.
    """
    fips_code = record["Actor1Geo_CountryCode"]
    record["Actor1Geo_CountryCode"] = broadcast_fips2iso.value.get(fips_code, fips_code)
    return record

In [34]:
# Translate the country code of every record through the FIPS -> ISO table.
event_mentions_iso_rdd = event_mentions_rdd.map(convert_fips2iso)

In [35]:
# Show the first record after country-code conversion.
event_mentions_iso_rdd.take(1)

[{'GlobalEventId': '915090594',
  'Actor1Geo_CountryCode': 'NG',
  'url': 'https://news2.onlinenigeria.com/news/general/822000-president-buhari-s-43-ministers-donate-50-of-march-salary-to-fight-coronavirus.html',
  'lat': 6.45306,
  'long': 3.39583,
  'DateAdded': datetime.datetime(2020, 3, 28, 13, 45),
  'NumberOfMentions': 1}]

In [40]:
import mysql.connector

def _open_connection(user, password, host, database):
    """Open a MySQL connection and create a cursor on it.

    Args:
        user: Database user name.
        password: Password of ``user``.
        host: Host name of the MySQL server.
        database: Name of the database to use.

    Returns:
        A ``(connection, cursor)`` tuple; the caller is responsible for
        closing the connection.
    """
    cnx = mysql.connector.connect(
        user=user,
        password=password,
        host=host,
        database=database
    )
    cursor = cnx.cursor()
    return cnx, cursor


# Zero-argument factory with the connection settings bound in. Each setting
# can be overridden through an environment variable so that real credentials
# do not have to be stored in the notebook; the fallbacks are the values this
# notebook used originally.
# NOTE(review): the fallback password is still in the source - drop it once
# every environment sets CORONA_DB_PASSWORD.
connection_factory = partial(
    _open_connection,
    user=os.environ.get("CORONA_DB_USER", "root"),
    password=os.environ.get("CORONA_DB_PASSWORD", "testtest"),
    host=os.environ.get("CORONA_DB_HOST", "localhost"),
    database=os.environ.get("CORONA_DB_NAME", "Corona"),
)

In [49]:
def store_records(records, connection_factory):
    """Insert a batch of event records into the GeoEventMentions table.

    All rows are sent with a single executemany() call and committed
    together. An empty batch returns without opening a connection. The
    connection is closed even when the insert or the commit raises; in that
    case commit() is not reached and the exception propagates.

    Args:
        records: Iterable of dicts with the keys "GlobalEventId",
            "DateAdded", "Actor1Geo_CountryCode", "lat", "long" and
            "NumberOfMentions".
        connection_factory: Zero-argument callable returning a
            ``(connection, cursor)`` pair.
    """
    insert_statement_str = "insert into GeoEventMentions (GlobalEventId,DateAdded,CountryCode,Latitude,Longitude,NumberOfMentions) VALUES (%s, %s, %s, %s, %s, %s);"

    record_list = [
        (
            record["GlobalEventId"],
            record["DateAdded"],
            record["Actor1Geo_CountryCode"],
            record["lat"],
            record["long"],
            record["NumberOfMentions"],
        )
        for record in records
    ]
    if not record_list:
        # Nothing to insert, so do not spend a database connection on it.
        return

    cnx, cursor = connection_factory()
    try:
        cursor.executemany(insert_statement_str, record_list)
        cnx.commit()
    finally:
        # Always release the connection, including on failure.
        cnx.close()

In [50]:
# Write the final records to MySQL; store_records is called once per partition.
event_mentions_iso_rdd.foreachPartition(partial(store_records, connection_factory=connection_factory))