# Data Engineering Final Project: GDELT
### Global Database of Events, Language, and Tone

"The GDELT Project is a realtime network diagram and database of global human society for open research. GDELT monitors the world's news media from nearly every corner of every country
in print, broadcast, and web formats, in over 100 languages,
every moment of every day."


### Events: 
The GDELT Event Database records over 300 categories of physical activities around the world, from riots and protests to peace appeals and diplomatic exchanges, georeferenced to the city or mountaintop, across the entire planet dating back to January 1, 1979 and updated every 15 minutes.

Essentially it takes a sentence like "The United States criticized Russia yesterday for deploying its troops in Crimea, in which a recent clash with its soldiers left 10 civilians injured" and transforms this blurb of unstructured text into three structured database entries, recording US CRITICIZES RUSSIA, RUSSIA TROOP-DEPLOY UKRAINE (CRIMEA), and RUSSIA MATERIAL-CONFLICT CIVILIANS (CRIMEA).
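To make the idea concrete, the three entries from the example sentence can be written as small structured records. This is only an illustrative sketch: `EventRecord` and its four fields are hypothetical simplifications, not GDELT's actual schema, which uses coded actors and many more attributes.

```python
from typing import NamedTuple, Optional


class EventRecord(NamedTuple):
    """Hypothetical, simplified stand-in for one row of the event table."""
    actor1: str
    action: str
    actor2: str
    location: Optional[str] = None


# The three entries named in the example sentence above
sentence_events = [
    EventRecord('US', 'CRITICIZES', 'RUSSIA'),
    EventRecord('RUSSIA', 'TROOP-DEPLOY', 'UKRAINE', location='CRIMEA'),
    EventRecord('RUSSIA', 'MATERIAL-CONFLICT', 'CIVILIANS', location='CRIMEA'),
]

for event in sentence_events:
    print(event)
```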

Nearly 60 attributes are captured for each event, including the approximate location of the action and those involved. This translates the textual descriptions of world events captured in the news media into codified entries in a grand "global spreadsheet."

### Global Knowledge Graph:
Much of the true insight captured in the world's news media lies not in what it says, but the context of how it says it. The GDELT Global Knowledge Graph (GKG) compiles a list of every person, organization, company, location and several million themes and thousands of emotions from every news report, using some of the most sophisticated named entity and geocoding algorithms in existence, designed specifically for the noisy and ungrammatical world that is the world's news media.

The resulting network diagram constructs a graph over the entire world, encoding not only what's happening, but what its context is, who's involved, and how the world is feeling about it, updated every single day.

In [1]:
from IPython.display import HTML
HTML('<iframe width="560" height="315" src="https://www.youtube.com/embed/GpCarC_I3Ao?list=PLlRVXVT7h9_gCGCOl_bNYHA7FXbSOIVbs" frameborder="0" allowfullscreen></iframe>')


# Proposed Architecture

!['gdelt_architecture'](images/gdelt_architecture.png)

## Why Cassandra and Neo4j?

I'll be doing a lot of column queries and will be working with network data.

For each desired property below, I ask two questions: how does my system have this property, and how does it fall short and how could it be improved?

### Robustness and Fault Tolerance

My system hinges on two EC2 instances and an EMR cluster. I may look into using Elastic Beanstalk to deploy fresh EC2 instances in the event that one fails.

### Low latency reads and updates

My system does not yet address read latency between my Flask app and my distributed data stores. However, I will serialize each CSV as an Apache Parquet file, which should make data processing in Spark faster. I do not yet know how to improve web app latency.

### Scalability

Again, I may need to look into Elastic Beanstalk to enable automated scaling.

### Generalization

I hope to use Airflow to coordinate my Spark DAGs. This would make it easier to do future projects that entail prediction and machine learning. It would be nice to enable queries in the web app that trigger a Spark job that returns an answer. Elasticsearch may be worth investigating.

### Extensibility

Saving my raw files onto S3 and using Airflow to construct Spark DAGs makes this system reasonably extensible.

### Ad Hoc Queries

Elasticsearch combined with Spark would be a great way to perform efficient ad hoc queries.

### Minimal Maintenance

Using two NoSQL databases is a liability, but I want to try them out for this project. I am not sure how to minimize maintenance within these data stores.

### Debuggability
Airflow will help with debuggability. I will want to store error logs on S3 for each process.



# Data Demo

In [5]:
!ls ../src/data

20150220183000.gkg.csv          __pycache__
20150220183000.gkg.csv.zip      download.sh
20150220184500.export.CSV.zip   download_weather.sh
20150220184500.mentions.CSV.zip ingest_data.py
20170221051500.gkg.csv          landsat_geotiff_to_rgb.sh
20170221051500.gkg.csv.zip      make_dataset.py
20170221070000.gkg.csv.zip


In [6]:
# Load the "autoreload" extension
%load_ext autoreload

# always reload modules marked with "%aimport"
%autoreload 1

# Import Dependencies
import os
import requests
import sys
import pandas as pd

# add the 'src' directory as one where we can import modules
PROJ_ROOT = os.pardir
src_dir = os.path.join(PROJ_ROOT, 'src')
sys.path.append(src_dir)

# import my ingestion method from the source code
%aimport data.ingest_data
import data.ingest_data as ingest

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [7]:
# Get a list of all the csv files available for download
url_list = ingest.get_list_of_urls()



<class 'requests.models.Response'>
22581606
210906
http://data.gdeltproject.org/gdeltv2/20150218230000.export.CSV.zip
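The source of `ingest.get_list_of_urls()` is not shown here, so the following is only a guess at the general shape of such a helper. It assumes the index lives at GDELT's `masterfilelist.txt` and that each line reads `size hash url`, as in the `lastupdate.txt` listing. The names `parse_file_list` and `fetch_url_list` are hypothetical, and the sketch uses only the standard library so the parsing step can be tried offline.

```python
from urllib.request import urlopen

# Assumption: GDELT 2.0 publishes its full file index at this address.
MASTER_FILE_LIST_URL = 'http://data.gdeltproject.org/gdeltv2/masterfilelist.txt'


def parse_file_list(text):
    """Extract the download URL from each 'size hash url' line, skipping blanks."""
    urls = []
    for line in text.splitlines():
        fields = line.split()
        if fields:
            urls.append(fields[-1])  # the URL is the last field on the line
    return urls


def fetch_url_list(index_url=MASTER_FILE_LIST_URL, timeout=60):
    """Download the index and return every URL it lists (hypothetical helper)."""
    with urlopen(index_url, timeout=timeout) as response:
        return parse_file_list(response.read().decode('utf-8'))


# Offline check on a one-line sample followed by a blank line
sample_index = (
    '208039 4bdeaf3495bba2eb92f432d6a93ce6db '
    'http://data.gdeltproject.org/gdeltv2/20170222180000.export.CSV.zip\n'
    '\n'
)
print(parse_file_list(sample_index))
```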


In [8]:
# File sample
print('Number of zipped CSV files: ',len(url_list))
print('\n10 most recent files:\n\n',url_list[-10:])

Number of zipped CSV files:  210906

10 most recent files:

 ['http://data.gdeltproject.org/gdeltv2/20170222171500.gkg.csv.zip', 'http://data.gdeltproject.org/gdeltv2/20170222173000.export.CSV.zip', 'http://data.gdeltproject.org/gdeltv2/20170222173000.mentions.CSV.zip', 'http://data.gdeltproject.org/gdeltv2/20170222173000.gkg.csv.zip', 'http://data.gdeltproject.org/gdeltv2/20170222174500.export.CSV.zip', 'http://data.gdeltproject.org/gdeltv2/20170222174500.mentions.CSV.zip', 'http://data.gdeltproject.org/gdeltv2/20170222174500.gkg.csv.zip', 'http://data.gdeltproject.org/gdeltv2/20170222180000.export.CSV.zip', 'http://data.gdeltproject.org/gdeltv2/20170222180000.mentions.CSV.zip', 'http://data.gdeltproject.org/gdeltv2/20170222180000.gkg.csv.zip']
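Each URL in the list encodes a 14-digit timestamp and a dataset name, which is handy for selecting a date range or a single table. A minimal sketch, assuming every file name follows the `YYYYMMDDHHMMSS.<dataset>.csv.zip` pattern seen above; `describe_url` is a hypothetical helper.

```python
import re
from datetime import datetime

# Matches e.g. '/20170222180000.gkg.csv.zip' (the extension's case varies by dataset)
FILENAME_PATTERN = re.compile(
    r'/(\d{14})\.(export|mentions|gkg)\.csv\.zip$', re.IGNORECASE
)


def describe_url(url):
    """Return (timestamp, dataset) parsed from a GDELT 2.0 file URL."""
    match = FILENAME_PATTERN.search(url)
    if match is None:
        raise ValueError('Unrecognized GDELT file name: {}'.format(url))
    stamp, dataset = match.groups()
    return datetime.strptime(stamp, '%Y%m%d%H%M%S'), dataset.lower()


when, dataset = describe_url(
    'http://data.gdeltproject.org/gdeltv2/20170222180000.gkg.csv.zip'
)
print(when.isoformat(), dataset)  # 2017-02-22T18:00:00 gkg
```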


In [9]:
# This is the file I will download every 15 minutes
gdelt_last_15 = requests.get('http://data.gdeltproject.org/gdeltv2/lastupdate.txt')
last_15_lines = gdelt_last_15.text.split('\n')
last_15_lines = [i.split() for i in last_15_lines]
last_15_lines

[['208039',
  '4bdeaf3495bba2eb92f432d6a93ce6db',
  'http://data.gdeltproject.org/gdeltv2/20170222180000.export.CSV.zip'],
 ['436448',
  '681a69f8b10617f47d305a2f1365c9e4',
  'http://data.gdeltproject.org/gdeltv2/20170222180000.mentions.CSV.zip'],
 ['19204158',
  '9f8f8dc5336f701022e6b01fde01d546',
  'http://data.gdeltproject.org/gdeltv2/20170222180000.gkg.csv.zip'],
 []]
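Each row appears to hold the file size in bytes, a 32-character hex digest, and the URL. Assuming the digest is an MD5 checksum of the zip file, a download could be validated before it is processed. The sketch below uses stand-in bytes rather than a real archive, and `verify_download` is a hypothetical helper.

```python
import hashlib


def verify_download(payload, expected_size, expected_md5):
    """Check a downloaded file's byte length and MD5 digest against the listing."""
    size_ok = len(payload) == int(expected_size)
    digest_ok = hashlib.md5(payload).hexdigest() == expected_md5.lower()
    return size_ok and digest_ok


# Stand-in bytes instead of a real GDELT archive, so the sketch runs offline
payload = b'example archive bytes'
listed_size = str(len(payload))
listed_md5 = hashlib.md5(payload).hexdigest()

print(verify_download(payload, listed_size, listed_md5))         # True
print(verify_download(payload + b'!', listed_size, listed_md5))  # False
```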

### Features found in the three datasets

In [10]:
gkg_columns = ['GKGRECORDID', 'DATE', 'SourceCollectionIdentifier',
          'SourceCommonName', 'DocumentIdentifier', 'Counts',
          'V2Counts', 'Themes', 'V2Themes', 'Locations',
          'V2Locations', 'Persons', 'V2Persons', 'Organizations',
          'V2Organizations', 'V2Tone', 'Dates', 'GCAM',
          'SharingImage', 'RelatedImages', 'SocialImageEmbeds',
          'SocialVideoEmbeds', 'Quotations', 'AllNames', 'Amounts',
          'TranslationInfo', 'Extras']

events_columns = [
    'GLOBALEVENTID', 'SQLDATE', 'MonthYear', 'Year', 'FractionDate',
    'Actor1Code', 'Actor1Name', 'Actor1CountryCode',
    'Actor1KnownGroupCode', 'Actor1EthnicCode', 'Actor1Religion1Code',
    'Actor1Religion2Code', 'Actor1Type1Code', 'Actor1Type2Code',
    'Actor1Type3Code', 'Actor2Code', 'Actor2Name', 'Actor2CountryCode',
    'Actor2KnownGroupCode', 'Actor2EthnicCode', 'Actor2Religion1Code',
    'Actor2Religion2Code', 'Actor2Type1Code', 'Actor2Type2Code',
    'Actor2Type3Code', 'IsRootEvent', 'EventCode', 'EventBaseCode',
    'EventRootCode', 'QuadClass', 'GoldsteinScale', 'NumMentions',
    'NumSources', 'NumArticles', 'AvgTone',
    'Actor1Geo_Type', 'Actor1Geo_FullName', 'Actor1Geo_CountryCode',
    'Actor1Geo_ADM1Code', 'Actor1Geo_ADM2Code',
    'Actor1Geo_Lat', 'Actor1Geo_Long', 'Actor1Geo_FeatureID',
    'Actor2Geo_Type', 'Actor2Geo_FullName', 'Actor2Geo_CountryCode',
    'Actor2Geo_ADM1Code', 'Actor2Geo_ADM2Code',
    'Actor2Geo_Lat', 'Actor2Geo_Long', 'Actor2Geo_FeatureID',
    'ActionGeo_Type', 'ActionGeo_FullName', 'ActionGeo_CountryCode',
    'ActionGeo_ADM1Code', 'ActionGeo_ADM2Code',
    'ActionGeo_Lat', 'ActionGeo_Long', 'ActionGeo_FeatureID',
    'DATEADDED', 'SOURCEURL']

mentions_columns = ['GLOBALEVENTID', 'EventTimeDate', 'MentionTimeDate',
                       'MentionType', 'MentionSourceName', 'MentionIdentifier',
                       'SentenceID', 'Actor1CharOffset', 'Actor2CharOffset',
                       'ActionCharOffset', 'InRawText', 'Confidence',
                       'MentionDocLen', 'MentionDocTone',
                       'MentionDocTranslationInfo', 'Extras']

### Load sample DataFrames

In [11]:
gkg = pd.read_csv('{}/data/raw/20150220183000.gkg.csv'.format(PROJ_ROOT), sep='\t')
gkg.columns = gkg_columns

events = pd.read_csv('{}/data/raw/20150220184500.export.CSV'.format(PROJ_ROOT), sep='\t')
events.columns = events_columns

mentions = pd.read_csv('{}/data/raw/20150220184500.mentions.CSV'.format(PROJ_ROOT), sep='\t')
mentions.columns = mentions_columns
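One thing to watch when loading these files: the raw GDELT exports seem to ship without a header row (an assumption, based on the column names being assigned by hand here). With default settings, `pd.read_csv` treats the first record as column labels, and that record disappears once `columns` is overwritten. A minimal sketch of the difference on a made-up, three-row, tab-separated sample:

```python
import io

import pandas as pd

# Made-up tab-separated sample with no header row
sample = '1\tA\n2\tB\n3\tC\n'
columns = ['id', 'label']

# Default behavior: the first record is consumed as the header, then overwritten
with_default = pd.read_csv(io.StringIO(sample), sep='\t')
with_default.columns = columns

# Explicit behavior: no header row, names supplied up front
explicit = pd.read_csv(io.StringIO(sample), sep='\t', header=None, names=columns)

print(len(with_default))  # 2
print(len(explicit))      # 3
```

If the first record matters, passing `header=None, names=gkg_columns` (and likewise for the other two tables) would keep it.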

## Events

In [12]:
events.T.iloc[:,:2]

| | 0 | 1 |
|---|---|---|
| GLOBALEVENTID | 410987301 | 410987302 |
| SQLDATE | 20140220 | 20140220 |
| MonthYear | 201402 | 201402 |
| Year | 2014 | 2014 |
| FractionDate | 2014.14 | 2014.14 |
| Actor1Code | | |
| Actor1Name | | |
| Actor1CountryCode | | |
| Actor1KnownGroupCode | | |
| Actor1EthnicCode | | |


## Global Knowledge Graph

In [13]:
gkg.T.iloc[:,:2]

| | 0 | 1 |
|---|---|---|
| GKGRECORDID | 20150220183000-1 | 20150220183000-2 |
| DATE | 20150220183000 | 20150220183000 |
| SourceCollectionIdentifier | 2 | 2 |
| SourceCommonName | BBC Monitoring | BBC Monitoring |
| DocumentIdentifier | Tolo TV, Kabul/BBC Monitoring/(c) BBC | Suna news agency website, Khartoum/BBC Monitor... |
| Counts | | |
| V2Counts | | |
| Themes | GENERAL_GOVERNMENT;TAX_TERROR_GROUP;TAX_TERROR... | LEADER;TAX_FNCACT;TAX_FNCACT_PRESIDENT;GENERAL... |
| V2Themes | GENERAL_GOVERNMENT,32;GENERAL_GOVERNMENT,143;G... | TAX_FNCACT,69;TAX_FNCACT,422;TAX_FNCACT,486;EC... |
| Locations | 4#Beijing, Beijing, China#AF#CH22#39.9289#116.... | 1#United Arab Emirates#AE#AE#24#54#AE;4#Kharto... |
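Several GKG columns pack multiple values into one delimited string. In the `Locations` sample above, entries look semicolon-separated, with seven `#`-separated fields each. The sketch below splits such a string into dictionaries. The field names in `LOCATION_FIELDS` are my own labels, inferred from the sample and best checked against the GKG codebook, and `parse_locations` is a hypothetical helper.

```python
# Assumed field order, inferred from '1#United Arab Emirates#AE#AE#24#54#AE'
LOCATION_FIELDS = (
    'geo_type', 'full_name', 'country_code', 'adm1_code',
    'lat', 'long', 'feature_id',
)


def parse_locations(raw):
    """Split a GKG Locations string into a list of dicts, one per location."""
    if not isinstance(raw, str) or not raw:
        return []  # missing values arrive as NaN (a float) or an empty string
    locations = []
    for block in raw.split(';'):
        parts = block.split('#')
        if len(parts) != len(LOCATION_FIELDS):
            continue  # skip malformed or truncated blocks
        record = dict(zip(LOCATION_FIELDS, parts))
        for key in ('lat', 'long'):
            record[key] = float(record[key]) if record[key] else None
        locations.append(record)
    return locations


parsed = parse_locations('1#United Arab Emirates#AE#AE#24#54#AE')
print(parsed[0]['full_name'], parsed[0]['lat'], parsed[0]['long'])
# United Arab Emirates 24.0 54.0
```

Applied to the DataFrame, this could be used as `gkg['Locations'].apply(parse_locations)`.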


## Mentions (Supplements Events Table)

In [14]:
mentions.T.iloc[:,:2]

| | 0 | 1 |
|---|---|---|
| GLOBALEVENTID | 410844811 | 410952023 |
| EventTimeDate | 20150220081500 | 20150220161500 |
| MentionTimeDate | 20150220184500 | 20150220184500 |
| MentionType | 1 | 1 |
| MentionSourceName | freepressjournal.in | ap.org |
| MentionIdentifier | http://www.freepressjournal.in/climate-change-... | http://hosted2.ap.org/CAANR/0260ea4c3e85456b80... |
| SentenceID | 2 | 25 |
| Actor1CharOffset | -1 | -1 |
| Actor2CharOffset | 918 | 7252 |
| ActionCharOffset | 830 | 7319 |
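Because both the events and mentions tables carry `GLOBALEVENTID`, they can be joined on that column. Note that in the samples above the mention rows have earlier `EventTimeDate` values than their `MentionTimeDate`, so a mention can point to an event recorded in an earlier 15-minute file; a left join with `indicator=True` makes such unmatched rows visible. The two small frames below are made up for illustration.

```python
import pandas as pd

# Made-up miniature versions of the two tables
events_sample = pd.DataFrame({
    'GLOBALEVENTID': [1, 2],
    'EventCode': ['010', '020'],
})
mentions_sample = pd.DataFrame({
    'GLOBALEVENTID': [1, 1, 3],
    'MentionSourceName': ['a.example', 'b.example', 'c.example'],
})

# Keep every mention; '_merge' marks whether a matching event was found
joined = mentions_sample.merge(
    events_sample, on='GLOBALEVENTID', how='left', indicator=True
)
print(joined)

unmatched = joined[joined['_merge'] == 'left_only']
print(len(unmatched))  # 1
```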


### Last Update file provides URLs to most recent datasets

In [15]:
gdelt_last_15 = requests.get('http://data.gdeltproject.org/gdeltv2/lastupdate.txt')

lines = gdelt_last_15.text.split('\n')

lines = [i.split() for i in lines]

In [16]:
lines

[['208039',
  '4bdeaf3495bba2eb92f432d6a93ce6db',
  'http://data.gdeltproject.org/gdeltv2/20170222180000.export.CSV.zip'],
 ['436448',
  '681a69f8b10617f47d305a2f1365c9e4',
  'http://data.gdeltproject.org/gdeltv2/20170222180000.mentions.CSV.zip'],
 ['19204158',
  '9f8f8dc5336f701022e6b01fde01d546',
  'http://data.gdeltproject.org/gdeltv2/20170222180000.gkg.csv.zip'],
 []]
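
The trailing empty list comes from the final newline in the file. For the 15-minute download job it may be more convenient to key the rows by dataset name and drop empty lines. A minimal sketch, assuming each non-empty line has exactly three whitespace-separated fields; `parse_last_update` is a hypothetical helper.

```python
def parse_last_update(text):
    """Map dataset name ('export', 'mentions', 'gkg') to its size, hash, and URL."""
    latest = {}
    for line in text.splitlines():
        fields = line.split()
        if len(fields) != 3:
            continue  # e.g. the trailing empty line
        size, checksum, url = fields
        # '20170222180000.export.CSV.zip' -> 'export'
        dataset = url.rsplit('/', 1)[-1].split('.')[1].lower()
        latest[dataset] = {'size': int(size), 'hash': checksum, 'url': url}
    return latest


sample_text = (
    '208039 4bdeaf3495bba2eb92f432d6a93ce6db '
    'http://data.gdeltproject.org/gdeltv2/20170222180000.export.CSV.zip\n'
    '436448 681a69f8b10617f47d305a2f1365c9e4 '
    'http://data.gdeltproject.org/gdeltv2/20170222180000.mentions.CSV.zip\n'
    '19204158 9f8f8dc5336f701022e6b01fde01d546 '
    'http://data.gdeltproject.org/gdeltv2/20170222180000.gkg.csv.zip\n'
)
latest = parse_last_update(sample_text)
print(sorted(latest))          # ['export', 'gkg', 'mentions']
print(latest['gkg']['size'])   # 19204158
```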