In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder \
     .appName("Test") \
     .getOrCreate()

In [2]:
import urllib.request  # lib that handles URLs
import io
import pandas as pd
pd.options.display.max_rows=250
import numpy as np
from datetime import datetime
from datetime import timedelta

import matplotlib.pyplot as plt
plt.style.use('seaborn')
import matplotlib.ticker as ticker
from matplotlib.ticker import FormatStrFormatter, FuncFormatter

import pyspark.sql.functions as func

In [3]:
target_url="https://raw.githubusercontent.com/linwoodc3/gdelt2HeaderRows/master/schema_csvs/GDELT_2.0_Events_Column_Labels_Header_Row_Sep2016.csv"
data = urllib.request.urlopen(target_url).read().decode('utf8')
formats = pd.read_csv(io.StringIO(data))
colnames_events = list(formats["tableId"])

In [4]:
df_events = spark.read.format("csv") \
    .option("header", "false") \
    .option("delimiter","\t") \
    .option("inferSchema", "true") \
    .load(path= ["s3://gdelt-open-data/v2/events/2017*.csv",
                "s3://gdelt-open-data/v2/events/2018*.csv"])

In [5]:
df_events = df_events.toDF(*colnames_events)



In [6]:
df_events = df_events.na.drop("all")



In [8]:
df_events.columns

['GLOBALEVENTID',
 'SQLDATE',
 'MonthYear',
 'Year',
 'FractionDate',
 'Actor1Code',
 'Actor1Name',
 'Actor1CountryCode',
 'Actor1KnownGroupCode',
 'Actor1EthnicCode',
 'Actor1Religion1Code',
 'Actor1Religion2Code',
 'Actor1Type1Code',
 'Actor1Type2Code',
 'Actor1Type3Code',
 'Actor2Code',
 'Actor2Name',
 'Actor2CountryCode',
 'Actor2KnownGroupCode',
 'Actor2EthnicCode',
 'Actor2Religion1Code',
 'Actor2Religion2Code',
 'Actor2Type1Code',
 'Actor2Type2Code',
 'Actor2Type3Code',
 'IsRootEvent',
 'EventCode',
 'EventBaseCode',
 'EventRootCode',
 'QuadClass',
 'GoldsteinScale',
 'NumMentions',
 'NumSources',
 'NumArticles',
 'AvgTone',
 'Actor1Geo_Type',
 'Actor1Geo_FullName',
 'Actor1Geo_CountryCode',
 'Actor1Geo_ADM1Code',
 'Actor1Geo_ADM2Code',
 'Actor1Geo_Lat',
 'Actor1Geo_Long',
 'Actor1Geo_FeatureID',
 'Actor2Geo_Type',
 'Actor2Geo_FullName',
 'Actor2Geo_CountryCode',
 'Actor2Geo_ADM1Code',
 'Actor2Geo_ADM2Code',
 'Actor2Geo_Lat',
 'Actor2Geo_Long',
 'Actor2Geo_FeatureID',
 'ActionGe

In [12]:
df_events.select("EventRootCode", "EventBaseCode", "EventCode").show(10)

+-------------+-------------+---------+
|EventRootCode|EventBaseCode|EventCode|
+-------------+-------------+---------+
|           08|          080|      080|
|           06|          060|      060|
|           04|          040|      040|
|           17|          173|      173|
|           11|          112|      112|
|           17|          173|      173|
|           02|          020|      020|
|           01|          010|      010|
|           01|          013|      013|
|           02|          020|      020|
+-------------+-------------+---------+
only showing top 10 rows



In [7]:
df_events_edited = df_events.select("GlobalEventID", "SQLDATE", "MonthYear", "Actor1Name", "Actor2Name", "EventCode", 
                                    "GoldsteinScale", "NumMentions", "NumSources", "NumArticles", "AvgTone", "ActionGeo_CountryCode", 
                                    "ActionGeo_Fullname", "ActionGeo_FeatureID",  "SOURCEURL")

In [8]:
df_events_edited.cache()

DataFrame[GlobalEventID: int, SQLDATE: int, MonthYear: int, Actor1Name: string, Actor2Name: string, EventCode: string, GoldsteinScale: double, NumMentions: int, NumSources: int, NumArticles: int, AvgTone: double, ActionGeo_CountryCode: string, ActionGeo_Fullname: string, ActionGeo_FeatureID: string, SOURCEURL: string]

In [17]:
#find out how many distinct events are recorded
df_events_edited.select("GlobalEventID").distinct().count()

127869166

In [23]:
from pyspark.sql.functions import col

df_events_edited.where(col("Actor1Name").isNotNull()).count()

116055590

In [29]:
#find how many countries are represented in the events dataset
df_events_edited.select("ActionGeo_CountryCode").distinct().count()

262

In [15]:
df_events_edited.write.parquet("s3://bigdataproject-pr/filtered_events.parquet", mode = "overwrite")


In [None]:
df_events = spark.read.parquet("s3://bigdataproject-pr/filtered_events.parquet")
df_events.cache()

In [30]:
relevant_events = df_events_edited.where(df_events_edited.SOURCEURL.rlike('trump'))
relevant_events = relevant_events.orderBy(relevant_events.NumMentions.desc())

In [64]:
top_50_countries_by_mentions = relevant_events.take(50)

In [54]:
df1 = top_50_countries_by_mentions.groupBy("ActionGeo_CountryCode")\
    .agg(func.mean("AvgTone"))\
    .collect()

AttributeError: 'list' object has no attribute 'groupBy'

In [31]:
pip install pandas-bokeh

Collecting pandas-bokeh
  Downloading https://files.pythonhosted.org/packages/73/e5/2d2ff2d7d91277f5b80c3415bcfe961030f953d3057c9690b5460c3b6995/pandas_bokeh-0.2-py2.py3-none-any.whl
Installing collected packages: pandas-bokeh
Successfully installed pandas-bokeh-0.2
Note: you may need to restart the kernel to use updated packages.


In [32]:
import pandas as pd
import pandas_bokeh
pandas_bokeh.output_notebook()

In [63]:

top_50_countries_by_mentions= pd.DataFrame(top_50_countries_by_mentions)
top_50_countries_by_mentions.head()

Unnamed: 0,GlobalEventID,SQLDATE,MonthYear,Actor1Name,Actor2Name,EventCode,GoldsteinScale,NumMentions,NumSources,NumArticles,AvgTone,ActionGeo_CountryCode,ActionGeo_Fullname,ActionGeo_FeatureID,SOURCEURL
0,,,,,,,,,,,,,,,
1,,,,,,,,,,,,,,,
2,,,,,,,,,,,,,,,
3,,,,,,,,,,,,,,,
4,,,,,,,,,,,,,,,


In [41]:
df= pd.DataFrame(df)
df.plot_bokeh.bar(
    ylabel="AvgTone", 
    title="Sentiment By Country", 
    alpha=0.6)

In [33]:
relevant_events.printSchema

<bound method DataFrame.printSchema of DataFrame[GlobalEventID: int, SQLDATE: int, MonthYear: int, Actor1Name: string, Actor2Name: string, EventCode: string, GoldsteinScale: double, NumMentions: int, NumSources: int, NumArticles: int, AvgTone: double, ActionGeo_CountryCode: string, ActionGeo_Fullname: string, ActionGeo_FeatureID: string, SOURCEURL: string]>

In [11]:
relevant_events["SQLDATE"] = pd.to_datetime(relevant_events["SQLDATE"].astype(str),format='%Y%m%d', errors="coerce")

In [12]:
relevant_events = relevant_events.sort_values(by = "SQLDATE")

Object `plot_bokeh` not found.


In [26]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler

stringInputs = ["Actor1Name", "Actor2Name", "EventCode", "Actor1Geo_FullName", "Actor2Geo_FullName", "SOURCEURL"]

indexer = [StringIndexer(inputCol=column, outputCol=column + "_IX") for column in stringInputs]

In [11]:
from pyspark.ml.feature import RFormula
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=10)




In [12]:
from pyspark.ml import Pipeline, Model

pipeline = Pipeline(stages=[transformer, lr])
model_rf = pipeline.fit(relevant_events_2017)


KeyboardInterrupt: 

In [21]:
predictions = model_rf.transform(relevant_events_2018)

DataFrame[GLOBALEVENTID: int, SQLDATE: int, MonthYear: int, Year: int, FractionDate: double, Actor1Code: string, Actor1Name: string, Actor1CountryCode: string, Actor1KnownGroupCode: string, Actor1EthnicCode: string, Actor1Religion1Code: string, Actor1Religion2Code: string, Actor1Type1Code: string, Actor1Type2Code: string, Actor1Type3Code: string, Actor2Code: string, Actor2Name: string, Actor2CountryCode: string, Actor2KnownGroupCode: string, Actor2EthnicCode: string, Actor2Religion1Code: string, Actor2Religion2Code: string, Actor2Type1Code: string, Actor2Type2Code: string, Actor2Type3Code: string, IsRootEvent: int, EventCode: string, EventBaseCode: string, EventRootCode: string, QuadClass: int, GoldsteinScale: double, NumMentions: int, NumSources: int, NumArticles: int, AvgTone: double, Actor1Geo_Type: int, Actor1Geo_FullName: string, Actor1Geo_CountryCode: string, Actor1Geo_ADM1Code: string, Actor1Geo_ADM2Code: string, Actor1Geo_Lat: double, Actor1Geo_Long: string, Actor1Geo_FeatureID

In [22]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

DataFrame[GLOBALEVENTID: int, SQLDATE: int, MonthYear: int, Year: int, FractionDate: double, Actor1Code: string, Actor1Name: string, Actor1CountryCode: string, Actor1KnownGroupCode: string, Actor1EthnicCode: string, Actor1Religion1Code: string, Actor1Religion2Code: string, Actor1Type1Code: string, Actor1Type2Code: string, Actor1Type3Code: string, Actor2Code: string, Actor2Name: string, Actor2CountryCode: string, Actor2KnownGroupCode: string, Actor2EthnicCode: string, Actor2Religion1Code: string, Actor2Religion2Code: string, Actor2Type1Code: string, Actor2Type2Code: string, Actor2Type3Code: string, IsRootEvent: int, EventCode: string, EventBaseCode: string, EventRootCode: string, QuadClass: int, GoldsteinScale: double, NumMentions: int, NumSources: int, NumArticles: int, AvgTone: double, Actor1Geo_Type: int, Actor1Geo_FullName: string, Actor1Geo_CountryCode: string, Actor1Geo_ADM1Code: string, Actor1Geo_ADM2Code: string, Actor1Geo_Lat: double, Actor1Geo_Long: string, Actor1Geo_FeatureID

In [5]:
spark.stop()

NameError: name 'spark' is not defined