In [2]:
import numpy as np
import pyspark
import os
import scipy as sp
from pyspark.sql import *
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import *

%matplotlib inline

spark = SparkSession.builder.getOrCreate()



In [7]:
GKG_SCHEMA = StructType([
        StructField("GKGRECORDID",StringType(),True),
        StructField("DATE",StringType(),True),
        StructField("SourceCollectionIdentifier",StringType(),True),
        StructField("SourceCommonName",StringType(),True),
        StructField("DocumentIdentifier",StringType(),True),
        StructField("Counts",StringType(),True),
        StructField("V2Counts",StringType(),True),
        StructField("Themes",StringType(),True),
        StructField("V2Themes",StringType(),True),
        StructField("Locations",StringType(),True),
        StructField("V2Locations",StringType(),True),
        StructField("Persons",StringType(),True),
        StructField("V2Persons",StringType(),True),
        StructField("Organizations",StringType(),True),
        StructField("V2Organizations",StringType(),True),
        StructField("V2Tone",StringType(),True),
        StructField("Dates",StringType(),True),
        StructField("GCAM",StringType(),True),
        StructField("SharingImage",StringType(),True),
        StructField("RelatedImages",StringType(),True),
        StructField("SocialImageEmbeds",StringType(),True),
        StructField("SocialVideoEmbeds",StringType(),True),
        StructField("Quotations",StringType(),True),
        StructField("AllNames",StringType(),True),
        StructField("Amounts",StringType(),True),
        StructField("TranslationInfo",StringType(),True),
        StructField("Extras",StringType(),True)
        ])

EVENTS_SCHEMA = StructType([
    StructField("GLOBALEVENTID",LongType(),True),
    StructField("Day_DATE",StringType(),True),
    StructField("MonthYear_Date",StringType(),True),
    StructField("Year_Date",StringType(),True),
    StructField("FractionDate",FloatType(),True),
    StructField("Actor1Code",StringType(),True),
    StructField("Actor1Name",StringType(),True),
    StructField("Actor1CountryCode",StringType(),True),
    StructField("Actor1KnownGroupCode",StringType(),True),
    StructField("Actor1EthnicCode",StringType(),True),
    StructField("Actor1Religion1Code",StringType(),True),
    StructField("Actor1Religion2Code",StringType(),True),
    StructField("Actor1Type1Code",StringType(),True),
    StructField("Actor1Type2Code",StringType(),True),
    StructField("Actor1Type3Code",StringType(),True),
    StructField("Actor2Code",StringType(),True),
    StructField("Actor2Name",StringType(),True),
    StructField("Actor2CountryCode",StringType(),True),
    StructField("Actor2KnownGroupCode",StringType(),True),
    StructField("Actor2EthnicCode",StringType(),True),
    StructField("Actor2Religion1Code",StringType(),True),
    StructField("Actor2Religion2Code",StringType(),True),
    StructField("Actor2Type1Code",StringType(),True),
    StructField("Actor2Type2Code",StringType(),True),
    StructField("Actor2Type3Code",StringType(),True),
    StructField("IsRootEvent",LongType(),True),
    StructField("EventCode",StringType(),True),
    StructField("EventBaseCode",StringType(),True),
    StructField("EventRootCode",StringType(),True),
    StructField("QuadClass",LongType(),True),
    StructField("GoldsteinScale",FloatType(),True),
    StructField("NumMentions",LongType(),True),
    StructField("NumSources",LongType(),True),
    StructField("NumArticles",LongType(),True),
    StructField("AvgTone",FloatType(),True),
    StructField("Actor1Geo_Type",LongType(),True),
    StructField("Actor1Geo_FullName",StringType(),True),
    StructField("Actor1Geo_CountryCode",StringType(),True),
    StructField("Actor1Geo_ADM1Code",StringType(),True),
    StructField("Actor1Geo_ADM2Code",StringType(),True),
    StructField("Actor1Geo_Lat",FloatType(),True),
    StructField("Actor1Geo_Long",FloatType(),True),
    StructField("Actor1Geo_FeatureID",StringType(),True),
    StructField("Actor2Geo_Type",LongType(),True),
    StructField("Actor2Geo_FullName",StringType(),True),
    StructField("Actor2Geo_CountryCode",StringType(),True),
    StructField("Actor2Geo_ADM1Code",StringType(),True),
    StructField("Actor2Geo_ADM2Code",StringType(),True),
    StructField("Actor2Geo_Lat",FloatType(),True),
    StructField("Actor2Geo_Long",FloatType(),True),
    StructField("Actor2Geo_FeatureID",StringType(),True),
    StructField("ActionGeo_Type",LongType(),True),
    StructField("ActionGeo_FullName",StringType(),True),
    StructField("ActionGeo_CountryCode",StringType(),True),
    StructField("ActionGeo_ADM1Code",StringType(),True),
    StructField("ActionGeo_ADM2Code",StringType(),True),
    StructField("ActionGeo_Lat",FloatType(),True),
    StructField("ActionGeo_Long",FloatType(),True),
    StructField("ActionGeo_FeatureID",StringType(),True),
    StructField("DATEADDED",LongType(),True),
    StructField("SOURCEURL",StringType(),True)
    ])

MENTIONS_SCHEMA = StructType([
    StructField("GLOBALEVENTID",LongType(),True),
    StructField("EventTimeDate",LongType(),True),
    StructField("MentionTimeDate",LongType(),True),
    StructField("MentionType",LongType(),True),
    StructField("MentionSourceName",StringType(),True),
    StructField("MentionIdentifier",StringType(),True),
    StructField("SentenceID",LongType(),True),
    StructField("Actor1CharOffset",LongType(),True),
    StructField("Actor2CharOffset",LongType(),True),
    StructField("ActionCharOffset",LongType(),True),
    StructField("InRawText",LongType(),True),
    StructField("Confidence",LongType(),True),
    StructField("MentionDocLen",LongType(),True),
    StructField("MentionDocTone",FloatType(),True),
    StructField("MentionDocTranslationInfo",StringType(),True),
    StructField("Extras",StringType(),True)
    ])

In [13]:
## Change this in the cluster

#DATA_DIR = 'hdfs:///datasets/gdeltv2'
DATA_DIR = 'data/'

# directory for local files (ex.: UrlToCountry)
DATA_LOCAL = 'data/'

In [11]:
# open GDELT data
gkg_df = spark.read.option("sep", "\t").csv(os.path.join(DATA_DIR, "*.gkg.csv"),schema=GKG_SCHEMA)
events_df = spark.read.option("sep", "\t").csv(os.path.join(DATA_DIR, "*.export.CSV"),schema=EVENTS_SCHEMA)
mentions_df = spark.read.option("sep", "\t").csv(os.path.join(DATA_DIR, "*.mentions.CSV"),schema=MENTIONS_SCHEMA)

In [14]:
# open helper datasets
UrlToCountry = spark.read.format("csv").option("header", "true").load(DATA_LOCAL + "UrlToCountry.csv")
CountryToCapital = spark.read.format("csv").option("header", "true").load(DATA_LOCAL + "country-capitals.csv")

In [15]:
# select the required data from Mentions Dataset
mentions_q1_df = mentions_df.select("GLOBALEVENTID", "EventTimeDate", "MentionType", "MentionSourceName") \
                .filter(mentions_df["MentionType"] == '1')

# join the dataframe url to country
UrlToCountry = UrlToCountry.select(UrlToCountry['Country name'].alias('country_source'), UrlToCountry['Clean URL'].alias('url')) 
mentions_q1_country = mentions_q1_df.join(UrlToCountry, UrlToCountry['url'] == mentions_q1_df['MentionSourceName'], "left_outer") 

# print the number of urls that have no country
print('number of unknown urls: ', mentions_q1_country.filter("country_source is null").select('MentionSourceName').distinct().count())

# filter out urls that are unknown
mentions_filter_df = mentions_q1_country.filter(mentions_q1_country.country_source.isNotNull())

# print the number of urls associated to a country
print('number of known urls: ', mentions_filter_df.select('country_source').distinct().count())

number of unknown urls:  128
number of known urls:  60


In [16]:
# join the file with the countries and capitals
mentions_coord_df = mentions_filter_df.join(CountryToCapital, CountryToCapital['CountryName'] == mentions_filter_df['country_source'], "left_outer") 
print('Total number of countries: ', mentions_coord_df.count())
print('Number of countries with no coordinates: ', mentions_coord_df.filter("CapitalLatitude is null").count())

# filter out rows with no geographic coordinates
mentions_filter_coord_df = mentions_coord_df.filter(mentions_coord_df.CapitalLatitude.isNotNull())

# select relevant columns 
mentions_clean_df = mentions_filter_coord_df.select('GLOBALEVENTID', 'EventTimeDate','CountryCode', 'CountryName', mentions_filter_coord_df['CapitalLatitude'].alias('Source_Lat'),
                                                   mentions_filter_coord_df['CapitalLongitude'].alias('Source_Long'))

Total number of countries:  5938
Number of countries with no coordinates:  323


In [17]:
# select Data from Events Dataset
events_q1_df= events_df.select("GLOBALEVENTID", "ActionGeo_Lat", "ActionGeo_Long", "NumMentions","NumSources","NumArticles","AvgTone")

# filter out events that have no geographic coordinates
print('Total number of events: ', events_q1_df.count())
events_clean_df = events_q1_df.filter(events_q1_df.ActionGeo_Lat.isNotNull())
print('Number of events with geographic coordinates: ', events_clean_df.count())

# merge the clean events and mentions dfs
event_mentions_df = events_clean_df.join(mentions_clean_df, 'GLOBALEVENTID') 

Total number of events:  3639
Number of events with geographic coordinates:  3527


In [80]:
events_df.select("SOURCEURL").show(20,False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|SOURCEURL                                                                                                                                                                    |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|http://www.wsws.org/en/articles/2017/11/23/gree-n23.html                                                                                                                     |
|http://www.gdnonline.com/Details/295252/Man-who-raped,-killed-eight-year-old-Jordanian-boy-executed                                                                          |
|http://www.wsws.org/en/articles/2017/11/23/gree-n23.html                                                               

In [15]:
print(gkg_df.count(), events_df.count())

4169 3639


In [22]:
# save the DF as a Parquet file
events_df.write.parquet(DATA_DIR + 'events.parquet')
gkg_df.write.parquet(DATA_DIR + 'gkg.parquet')

In [24]:
# load the data from Parquet file
events_df = spark.read.parquet(DATA_DIR + 'events.parquet')
gkg_df = spark.read.parquet(DATA_DIR + 'gkg.parquet')

In [29]:
gkg_df.select("GKGRECORDID", "DATE", "SourceCollectionIdentifier", "SourceCommonName", "Locations") \
                .filter(gkg_df["SourceCollectionIdentifier"] == '1').show(2,False)

+----------------+--------------+--------------------------+----------------+---------------------------------------------------------------------------------------------------+
|GKGRECORDID     |DATE          |SourceCollectionIdentifier|SourceCommonName|Locations                                                                                          |
+----------------+--------------+--------------------------+----------------+---------------------------------------------------------------------------------------------------+
|20171123073000-0|20171123073000|1                         |nrl.com         |null                                                                                               |
|20171123073000-1|20171123073000|1                         |360nobs.com     |1#United States#US#US#39.828175#-98.5795#US;1#Nigeria#NI#NI#10#8#NI;1#United Kingdom#UK#UK#54#-4#UK|
+----------------+--------------+--------------------------+----------------+---------------------------------

In [31]:
# select the gkg data for Q1
gkg_Q1 = gkg_df.select("GKGRECORDID", "DATE", "SourceCollectionIdentifier", "SourceCommonName", "Locations") \
                .filter(gkg_df["SourceCollectionIdentifier"] == '1')

In [45]:
# open the file url to country
UrlToCountry = spark.read.format("csv").option("header", "true").load(DATA_LOCAL + "UrlToCountry.csv")

In [46]:
# select and rename columns of interest
UrlToCountry = UrlToCountry.select(UrlToCountry['Country name'].alias('country_source'), UrlToCountry['Clean URL'].alias('url')) 
UrlToCountry.show()

+--------------+-----------------+
|country_source|              url|
+--------------+-----------------+
|   Afghanistan|       1tvnews.af|
|   Afghanistan|       1tvnews.af|
|   Afghanistan|       1tvnews.af|
|   Afghanistan|    ariananews.af|
|   Afghanistan|    ariananews.af|
|   Afghanistan|    ariananews.af|
|   Afghanistan|da.azadiradio.com|
|   Afghanistan|pa.azadiradio.com|
|   Afghanistan|          bbc.com|
|   Afghanistan|          bbc.com|
|   Afghanistan|           dw.com|
|   Afghanistan|           dw.com|
|   Afghanistan|     dari.irib.ir|
|   Afghanistan|   pashto.irib.ir|
|   Afghanistan|        negaah.tv|
|   Afghanistan|        nooraf.tv|
|   Afghanistan|        rferl.org|
|   Afghanistan|           tkg.af|
|   Afghanistan|           tkg.af|
|   Afghanistan|           tkg.af|
+--------------+-----------------+
only showing top 20 rows



In [47]:
# merge the two dataframes to get the country of the source 
gkg_Q1_country = gkg_Q1.join(UrlToCountry, UrlToCountry['url'] == gkg_Q1['SourceCommonName'], "left_outer") 
gkg_Q1_country.show()

+-----------------+--------------+--------------------------+--------------------+--------------------+--------------+--------------------+
|      GKGRECORDID|          DATE|SourceCollectionIdentifier|    SourceCommonName|           Locations|country_source|                 url|
+-----------------+--------------+--------------------------+--------------------+--------------------+--------------+--------------------+
| 20171123073000-0|20171123073000|                         1|             nrl.com|                null|          null|                null|
| 20171123073000-1|20171123073000|                         1|         360nobs.com|1#United States#U...|          null|                null|
| 20171123073000-2|20171123073000|                         1|            ann7.com|4#Durban, Kwazulu...|  South Africa|            ann7.com|
| 20171123073000-3|20171123073000|                         1|          openpr.com|1#Japan#JA#JA#36#...|          null|                null|
| 20171123073000-4|2

In [75]:
# extract and save urls that have no country 
unknown_urls = gkg_Q1_country.filter("country_source is null")\
                             .select('SourceCommonName').distinct()
unknown_urls.coalesce(1).write.csv(DATA_LOCAL+ "unknown_urls")

In [73]:
# filter out countries with unknown url
gkg_Q1_country_clean = gkg_Q1_country.filter(gkg_Q1_country.country_source.isNotNull())

In [74]:
print('unknown urls ', unknown_urls.count())
print('known urls', gkg_Q1_country_clean.select('country_source').distinct().count())

unknown urls  438
known urls 80


In [77]:
# get location of event
gkg_df.select("Locations").show(5,False)

+---------------------------------------------------------------------------------------------------------------------------+
|Locations                                                                                                                  |
+---------------------------------------------------------------------------------------------------------------------------+
|null                                                                                                                       |
|1#United States#US#US#39.828175#-98.5795#US;1#Nigeria#NI#NI#10#8#NI;1#United Kingdom#UK#UK#54#-4#UK                        |
|4#Durban, Kwazulu-Natal, South Africa#SF#SF02#-29.85#31.0167#-1224926                                                      |
|1#Japan#JA#JA#36#138#JA;1#United States#US#US#39.828175#-98.5795#US;1#Netherlands#NL#NL#52.5#5.75#NL;1#Spain#SP#SP#40#-4#SP|
|null                                                                                                                 

### Are we emotionally biased? 
**Do the number of conflicts or their distance from our home define our emotions? Is there an underlying trend of a more positive or negative news perception over time?**

Fetch the following:

- get the time of the event
- get the country of the source ("SourceCollectionIdentifier" = 1 (only web), "SourceCommonName", GKG)
- get the country of the event ('Locations', GKG -> longitude & latitude)
- get the average tone of the event ("AvgTone", EXPORT)
- get the ethnicity ('Actor1EthnicCode', Actor1Religion1Code, Actor1Religion2Code, Actor2EthnicCode', Actor2Religion1Code, Actor2Religion2Code, EXPORT) 
- Goldstein scale (GoldsteinScale, EXPORT)

Analysis:

- calculate the distance between the source and the country (get longitude & latitude of every country of the source -> compute euclidean distance)
- dependence of average tone and distance
- aggregate per country and average over average tone -> dependence between events per country and average tone
- aggregate per religion/ethnicity and average over average tone (and/ or Goldstein) -> look at dependence
- repeat the analysis in time bins

Visualization:

Discussion Thursday:
- open all the files within 1 hour/ 1 day
- count number of mentions an event gets and compare it to the number of mentions given
- get the country from the url in mentions
- calculate distance between country_source (from capital of country) and event_country ("ActionGeo_Lat, and Long from EVENTS)
- aggregate mentions over countries_source and event_country
- MentionDocTone for each source might/will allow to get the emotion of the source country


### Are some countries ignored in the news? 
**Is the number of conflicts taking place in a country in relation with the number of mentions in the media depending on where the conflict has happened?**

Fetch the following:

- country of the event (see above)
- number of mentions (NumMentions, NumSources, NumArticles, EXPORT)

Analysis:

- aggregate per country and average over the number of mentions (which one?, or maybe all and then see) -> dependence between country and number of records

### Are we emotionally predictable? 
**Can we observe patterns of emotions with respect to a country, religion or an ethnical group? Can we derive a model predicting emotions in case of a new conflict based on its specific features?**

Fetch the following:

- 

- group events by location, religion, ethnicity
- average over average tone for this group