# Data301 Project
#### Student: jli328 (Junwei Liang)
#### Student Number: 91925811

# Background:
#### Period: 12-29-2019 - Now

In [1]:
# Boundary date (passed to pd.date_range further down) that separates the
# "before" and "after" download windows.
# NOTE(review): the Background heading gives the period start as 12-29-2019,
# but this value is 12 December 2019 - confirm which date is intended.
covid_start_date = '2019 DEC 12'

# Setup Libraries

In [None]:
# library and code setup
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# pyspark
!pip install -q pyspark
# newspaper3k for text analysis
!pip install newspaper3k
# gdelt for python
!pip install gdelt
# beautifulsoup4 for extracting text from the downloaded HTML
!pip install beautifulsoup4


import pyspark, os
from pyspark import SparkConf, SparkContext
os.environ["PYSPARK_PYTHON"]="python3"
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64/"

# Spark Setup and Debug function

In [3]:
#start spark local server
import sys, os
from operator import add
import time

os.environ["PYSPARK_PYTHON"]="python3"

import pyspark
from pyspark import SparkConf, SparkContext

# Connect the Python driver to a local Spark JVM running on the Google Colab
# virtual machine. getOrCreate() hands back the already-running context when
# this cell is re-run, so `sc` is always bound afterwards (the previous
# try/except silently left `sc` undefined when a context already existed).
conf = SparkConf().setMaster("local[*]").set("spark.executor.memory", "1g")
sc = SparkContext.getOrCreate(conf=conf)

# A debug function used to print the rdd
def dbg(x):
  """Print a sample of an RDD (at most 100 elements), or the value itself.

  Tuple elements are printed as (first, second) pairs; when the second item is
  a Spark ResultIterable it is expanded into a list so its content is visible.
  """
  if not isinstance(x, pyspark.RDD):
    print(x)
    return

  def _printable(item):
    # Non-tuples are shown unchanged.
    if not isinstance(item, tuple):
      return item
    second = item[1]
    if isinstance(second, pyspark.resultiterable.ResultIterable):
      second = list(second)
    return (item[0], second)

  print([_printable(item) for item in x.take(100)])

# Remove previous files

In [4]:
# !rm -r *

# Fetch gdelt data using gdeltPyR library

In [5]:
from concurrent.futures import ProcessPoolExecutor
from datetime import date, timedelta
import pandas as pd
import warnings
import gdelt
import os

# Silence FutureWarning and UserWarning messages for the rest of the notebook
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=UserWarning)

# gdeltPyR client configured for GDELT version 2; used by intofile() below
gd = gdelt.gdelt(version=2)

# Process pool (default worker count) used to run the per-day downloads in parallel
e = ProcessPoolExecutor()

# generic functions to pull and write data to disk based on date
def get_filename(x):
  """Return the CSV file name for day `x`, e.g. 20161101_gdeltdata.csv.

  `x` is any object with a strftime method (the caller passes the items of a
  pandas date range).
  """
  return "{}_gdeltdata.csv".format(x.strftime('%Y%m%d'))

# Write retrived data into files
def intofile(filename, out_dir="csvfiles_before"):
    """Download one day of GDELT events into ``out_dir/filename`` unless it is already there.

    Args:
        filename: name produced by get_filename(), i.e. ``<YYYYMMDD>_gdeltdata.csv``.
        out_dir: directory the CSV is stored in (created on demand).

    Returns:
        The relative path ``out_dir/filename`` on success, or None when the
        download or the write failed (the error is printed, not raised).
    """
    # Parsed up front so the error message below can always refer to it.
    date = filename.split("_")[0]
    target = out_dir + '/' + filename
    try:
        # exist_ok avoids FileExistsError when several worker processes try
        # to create the directory at the same time.
        os.makedirs(out_dir, exist_ok=True)
        # The cache check must look inside out_dir; checking the bare file
        # name never matched, so every day was downloaded again on each run.
        if not os.path.exists(target):
            # coverage=False: per the original note this is not the 15-minute
            # update feed - TODO confirm against the gdeltPyR documentation.
            d = gd.Search(date, table='events', coverage=False)
            d.to_csv(target, encoding='utf-8', index=False)
        return target
    except Exception as e:
        print(e)
        print("Error occurred while retriving the " + date + " events tables")
        return None

def file_exists(filename):
    """Tell whether `filename` names an existing path on disk."""
    found = os.path.exists(filename)
    return found

# Pull the GDELT data into one CSV file per day; this may take a long time.
# The window is limited to one month (12 Nov 2019 up to covid_start_date) so
# that the notebook runs in a reasonable time (about 30 mins).
files_before_covid = [get_filename(x) for x in pd.date_range('2019 Nov 12', covid_start_date)]
# Download every day in parallel; each entry is the value returned by
# intofile() for that day (the path of the CSV when it succeeded).
success_written_files = list(e.map(intofile, files_before_covid))

In [6]:
# Fresh process pool for the "after" downloads; rebinding `e` here does not
# shut down the pool created in the previous cell.
e = ProcessPoolExecutor()

# generic functions to pull and write data to disk based on date
def get_filename(x):
  """Build the per-day CSV name (``<YYYYMMDD>_gdeltdata.csv``) for date `x`."""
  day_stamp = x.strftime('%Y%m%d')
  csv_name = "{}_gdeltdata.csv".format(day_stamp)
  return csv_name

# Write retrived data into files
def intofile(filename, out_dir="csvfiles_after"):
    """Download one day of GDELT events into ``out_dir/filename`` unless it is already there.

    Args:
        filename: name produced by get_filename(), i.e. ``<YYYYMMDD>_gdeltdata.csv``.
        out_dir: directory the CSV is stored in (created on demand).

    Returns:
        The relative path ``out_dir/filename`` on success, or None when the
        download or the write failed (the error is printed, not raised).
    """
    # Parsed up front so the error message below can always refer to it.
    date = filename.split("_")[0]
    target = out_dir + '/' + filename
    try:
        # exist_ok avoids FileExistsError when several worker processes try
        # to create the directory at the same time.
        os.makedirs(out_dir, exist_ok=True)
        # The cache check must look inside out_dir; checking the bare file
        # name never matched, so every day was downloaded again on each run.
        if not os.path.exists(target):
            # coverage=False: per the original note this is not the 15-minute
            # update feed - TODO confirm against the gdeltPyR documentation.
            d = gd.Search(date, table='events', coverage=False)
            d.to_csv(target, encoding='utf-8', index=False)
        return target
    except Exception as e:
        print(e)
        print("Error occurred while retriving the " + date + " events tables")
        return None

def file_exists(filename):
    """Return True when `filename` exists in storage, False otherwise."""
    if os.path.exists(filename):
        return True
    return False

# Pull the GDELT data into one CSV file per day; this may take a long time.
# The window is limited to one month (covid_start_date up to 12 Jan 2020) so
# that the notebook runs in a reasonable time (about 30 mins).
# NOTE(review): covid_start_date is also the end of the "before" range above,
# so that day appears to be downloaded into both directories - confirm intended.
files_after_covid = [get_filename(x) for x in pd.date_range(covid_start_date,'2020 Jan 12')]
# Download every day in parallel. This overwrites the success_written_files
# list produced by the "before" cell.
success_written_files = list(e.map(intofile, files_after_covid))

# Read downloaded files into RDDs

In [7]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
# Load every CSV in each download directory into one DataFrame per period.
# The first line of each file is treated as the header. No schema/inferSchema
# option is set, so the columns are presumably loaded as strings (later code
# calls float() on AvgTone) - verify if numeric columns are needed.
data_before_covid = (sqlContext.read
                               .option("header", "true")
                               .csv("csvfiles_before"))
data_after_covid = (sqlContext.read
                              .option("header", "true")
                              .csv("csvfiles_after"))

# Category events into different types

In [8]:
# Manually curated event codes, categorised into positive, neutral and
# negative events. Codes are strings because the leading zeros are significant.
# A code listed in more than one group (e.g. '150') is resolved by
# event_sign() in the order Positive, Neutral, Negative.
# Neutral events like: Decline comment, Appeal to yield..
neutral_events = ['011', '019', '020', '024', '0241', '0242', '025', '0253', 
                  '0341', '0342', '0343', '0344', '035', '040', '041', '042',
                  '043', '044', '045', '046', '080', '083', '0831', '0832', '0833',
                  '0834', '084', '0841', '0842', '090', '091', '092', '093',
                  '094', '100', '104', '1041', '1042', '1043', '1044', '105', 
                  '106', '107', '108', '110', '123', '1231', '1232', '1233', 
                  '1234', '124', '125', '126', '127', '128', '129', '140', 
                  '141', '1411', '1412', '1413', '1414', '150', '160', '166', '170']

# Negative events like: Make pessimistic comment, Make empathetic comment..
# The list previously contained '138114' and '190195': each was two codes
# fused by a missing comma ('138' + '114' and '190' + '195', all four of which
# were otherwise absent), so those events were reported as 'Unknown'.
negative_events = ['012', '016', '111', '112', '1121', '1122', '1123', '1124',
                   '1125', '113', '114', '115', '116', '120', '121', '1211', '1212', 
                   '122', '1221', '1222', '1223', '1224', '1241', '1242',
                   '1243', '1244', '1245', '1246', '130', '131', '1311', 
                   '1312', '1313', '132', '1321', '1322', '1323', '1324', 
                   '133', '134', '135', '136', '137', '138', '1381', 
                   '1382', '1383', '1384', '1385', '139', '142', '1421', 
                   '1422', '1423', '1424', '143', '1431', '1432', '1433', 
                   '1434', '144', '1441', '1442', '1443', '1444', '145', 
                   '1451', '1452', '1453', '1454', '161', '162', '1621', 
                   '1622', '1623', '163', '164', '165', '1661', '1662', 
                   '1663', '171', '1711', '1712', '172', '1721', '1722', 
                   '1723', '1724', '173', '174', '175', '176', '180', '181', 
                   '182', '1821', '1822', '1823', '183', '1831', '1832', 
                   '1833', '1834', '184', '185', '186', '190', '191', 
                   '192', '193', '194', '195', '1951', '1952', '196', '200', '201',
                   '202', '203', '204', '2041', '2042']

# Positive events like: Make optimistic comment, Consider policy option..
positive_events = ['013', '014', '015', '017', '018', '021', '0211', '0212',
                   '0213', '0214', '022', '022', '023', '0231', '0232', '0233',
                   '0234', '0243', '0244', '0251', '0252', '0254', '0255', 
                   '0256', '026', '027', '028', '030', '031', '0311', '0312',
                   '0313', '0314', '032', '032', '033', '0331', '0332', '0333', 
                   '0334', '034', '0351', '0352', '0353', '0354', '0355', '0356', 
                   '036', '037', '038', '039', '050', '051', '052', '053', 
                   '054', '055', '056', '057', '060', '061', '062', '063',
                   '064', '070', '071', '072', '073', '074', '075', '081', 
                   '0811', '0812', '0813', '0814', '082', '085', '086', '0861',
                   '0862', '0863', '087', '0871', '0872', '0873', '0874',
                   '101', '1011', '1012', '1013', '1014', '102', '103', '1031',
                   '1032', '1033', '1034', '1051', '1052', '1053', '1054', 
                   '1055', '1056', '150', '151', '152', '153', '154', '155']

# Lookup table built once so that event_sign() is a single dict access instead
# of up to three list scans per event row. Later updates overwrite earlier
# ones, so filling in the order Negative, Neutral, Positive reproduces the
# precedence Positive > Neutral > Negative for codes listed more than once.
_EVENT_SIGN_BY_CODE = {}
for _sign, _codes in (("Negative", negative_events),
                      ("Neutral", neutral_events),
                      ("Positive", positive_events)):
    _EVENT_SIGN_BY_CODE.update((_code, _sign) for _code in _codes)


def event_sign(event_code):
  """Return the event type a code belongs to.

  Args:
      event_code: event code as a string (e.g. '0231'); None is accepted.

  Returns:
      'Positive', 'Neutral' or 'Negative', or 'Unknown' when the code is in
      none of the three lists.
  """
  return _EVENT_SIGN_BY_CODE.get(event_code, "Unknown")

# Target Countries

In [9]:
# Countries to analyse; these values are compared with the Actor1CountryCode
# column of the events table.
target_countries = ['USA', 'CAN', 'MEX', 'GBR', 
                    'DEU', 'NZL', 'CHN', 'RUS']

In [None]:
from newspaper import Article
import urllib
from urllib.request import Request, urlopen, urlretrieve

# Retrieve the url from row
def get_urls(country, row):
    """Pair `country` with the article URL held in the row's SOURCEURL field."""
    source_url = row['SOURCEURL']
    return country, source_url

# Get the web page(html) and write it into files
def write_wget(withid):
    """Download one article into ``<country>_articles/<id>.html`` (best effort).

    Args:
        withid: ``((country, url), id)`` - the shape produced by mapping
            get_urls over the rows and then calling zipWithUniqueId.

    Returns:
        The id from `withid`, whether or not the download succeeded. Failures
        are printed and skipped, never raised.
    """
    country = withid[0][0]
    url = withid[0][1]
    article_id = withid[1]
    # exist_ok: several worker processes may try to create the directory at once.
    os.makedirs(f'{country}_articles', exist_ok=True)
    s = f'{country}_articles/{str(article_id)}.html'
    dst = os.getcwd() + '/' + s
    # Nothing to do when the article was already fetched.
    if os.path.exists(dst):
        return article_id
    try:
        # Pull the url and write it into the destination file
        urlretrieve(url, dst)
        print(s, "completed")
    except urllib.error.HTTPError as e:
        if e.code == 403:
            # 403 usually means the default urllib client was blocked, so
            # retry once while identifying as a browser.
            try:
                req = Request(url, headers={'User-Agent': 'Mozilla/5.0'})
                with urlopen(req) as response:
                    webpage = response.read().decode('utf-8')
                # "w" plus a context manager: the handle used to be left open.
                with open(dst, "w", encoding='utf-8') as f:
                    f.write(webpage)
                print(s, "completed")
            except Exception as retry_error:
                # Unknown errors on the retry are reported and skipped.
                print("failed due to ", retry_error, ", skipping ", s)
        else:
            # Other HTTP errors could get their own handling, like 403 above.
            print(e.code, url, article_id, s)
    except Exception:
        # str() is required here: the id is an int (and the url may be None),
        # so plain concatenation raised TypeError inside this handler.
        print("Non http error occured while retrieving " + str(url) + " id: " + str(article_id))
    return article_id

# Return the filename if the file exist, otherwise return None
def test_file_exist(x):
    if os.path.exists(f'{os.getcwd()}/USA_articles/{x[1]}.html'):
      return x
    else:
      return None

# Per-country RDDs of ((country, url), id) records whose article file exists.
countries_allids_before_covid = dict()
countries_allids_after_covid = dict()
# Retrieve urls from rows
for country in target_countries:
    # The default argument binds the current country into each lambda. The
    # RDDs are lazy and are evaluated again in later cells, by which time the
    # global `country` may hold a different value.
    all_urls_withids_before_covid = (data_before_covid.rdd
                                    .filter(lambda row, c=country: row['Actor1CountryCode']==c)
                                    .map(lambda row, c=country: get_urls(c, row))
                                    .zipWithUniqueId())
    all_urls_withids_after_covid = (data_after_covid.rdd
                                    .filter(lambda row, c=country: row['Actor1CountryCode']==c)
                                    .map(lambda row, c=country: get_urls(c, row))
                                    .zipWithUniqueId())
    # NOTE(review): the ids of the two RDDs are generated independently and
    # write_wget names files by id only, so a "before" and an "after" article
    # can map to the same file (the second one is then skipped) - confirm.
    # Leaving the with-block waits for every download and releases the worker
    # processes; previously the pool was neither awaited nor shut down.
    with ProcessPoolExecutor(16) as e:
        # Retrieve the url and write it into files
        e.map(write_wget, all_urls_withids_before_covid.collect()+all_urls_withids_after_covid.collect())
    # Revalidate the files and keep those we successfully pulled
    countries_allids_before_covid[country] = all_urls_withids_before_covid.map(test_file_exist).filter(lambda x: x is not None)
    countries_allids_after_covid[country] = all_urls_withids_after_covid.map(test_file_exist).filter(lambda x: x is not None)

# Calculate similarity

In [None]:
import numpy as np

def get_country_pos_neg_summary(data, target_countries):
    """Count events per (country, event sign) for the countries of interest.

    Args:
        data: DataFrame whose rows expose Actor1CountryCode and EventCode.
        target_countries: collection of country codes to keep.

    Returns:
        dict mapping (country, event_sign) -> number of events. Combinations
        without any event are absent from the dict.
    """
    counts = (data.rdd
                  # Drop rows without a country and countries we do not report
                  # on before the shuffle, so reduceByKey only moves the
                  # records that end up in the result.
                  .filter(lambda row: row['Actor1CountryCode'] is not None
                                      and row['Actor1CountryCode'] in target_countries)
                  # ((Country, EventSign), 1)
                  .map(lambda row: ((row['Actor1CountryCode'], event_sign(row['EventCode'])), 1))
                  # Sum up the occurrences of every (Country, EventSign) pair
                  .reduceByKey(lambda a, b: a+b))
    return dict(counts.collect())

# Get tone for each country
def get_country_tone(data, target_countries):
    """Compute the average AvgTone per target country.

    Args:
        data: DataFrame whose rows expose Actor1CountryCode and AvgTone.
        target_countries: collection of country codes to keep.

    Returns:
        list of (country, average tone) tuples, one per country that has at
        least one row with a tone value.
    """
    country_tone = (data.rdd
                        # Filter before converting: float() was previously
                        # applied to every row, so a single row with a missing
                        # AvgTone made the whole job fail.
                        .filter(lambda row: row['Actor1CountryCode'] is not None
                                            and row['Actor1CountryCode'] in target_countries
                                            and row['AvgTone'] is not None)
                        .map(lambda row: (row['Actor1CountryCode'], float(row['AvgTone'])))
                        .groupByKey()
                        .map(lambda x: (x[0], get_average_tone(x[1]))))
    return country_tone.collect()

# Get the average tone
def get_average_tone(data):
    """Return the arithmetic mean of the tone values in `data`.

    Args:
        data: any iterable of numbers. It is consumed in a single pass and
            does not need to support len(), so generators work too.

    Raises:
        ZeroDivisionError: if `data` is empty.
    """
    total = 0
    count = 0
    for average_tone in data:
        total += average_tone
        count += 1
    return total / count

def get_vector(result, country):
    """Build the event-count vector [Positive, Negative, Neutral, Unknown] of a country.

    Args:
        result: dict mapping (country, event sign) -> count.
        country: country code to extract.

    Returns:
        numpy array with the four counts. A sign with no recorded events
        counts as 0 instead of raising KeyError.
    """
    signs = ('Positive', 'Negative', 'Neutral', 'Unknown')
    return np.array([result.get((country, sign), 0) for sign in signs])

def cos_similarity(v1, v2):
    """Cosine similarity of two numpy vectors: their dot product over the product of their norms."""
    numerator = v1.dot(v2)
    denominator = np.linalg.norm(v1) * np.linalg.norm(v2)
    return numerator / denominator

def print_similarity(country, target_countries, before_dict, after_dict):
    """Print how similar `country` is to each target country before and after.

    For every entry of `target_countries` two lines are printed: the cosine
    similarity of the count vectors in both periods, and the change between
    them (after minus before).
    """
    for c in target_countries:
        similarity_then = cos_similarity(before_dict[country], before_dict[c])
        similarity_now = cos_similarity(after_dict[country], after_dict[c])
        change = similarity_now - similarity_then
        print(country, '->', c, "Similarity before -> after ", similarity_then, "->", similarity_now)
        print(country, '->', c, "Reaction similarity ", change)

# Event counts per (country, EventSign) for each period; the signs are
# Positive, Negative, Neutral and Unknown.
summary_before_covid = get_country_pos_neg_summary(data_before_covid, target_countries)
summary_after_covid = get_country_pos_neg_summary(data_after_covid, target_countries)
# Average tone per country for each period, as lists of (country, tone)
country_tone_before_covid = get_country_tone(data_before_covid, target_countries)
country_tone_after_covid = get_country_tone(data_after_covid, target_countries)
# One count vector per country and period
before = {country : get_vector(summary_before_covid, country) for country in target_countries}
after = {country : get_vector(summary_after_covid, country) for country in target_countries}
# Average tone as country -> tone dictionaries
average_tone_before = {country : average_tone for country, average_tone in country_tone_before_covid}
average_tone_after = {country : average_tone for country, average_tone in country_tone_after_covid}
# Print the report: for each country its vectors, its similarity to every
# target country (itself included) and its average tone in both periods.
for country in target_countries:
    print(country)
    print("Vectors before -> after ")
    print("         [Positive, Negative, Neutral, Unknown]")
    print("Before", before[country])
    print("After", after[country])
    print_similarity(country, target_countries, before, after)
    print("AverageTone before -> after")
    # These lookups raise KeyError for a country that get_country_tone did not return.
    print("Before", average_tone_before[country])
    print("After", average_tone_after[country])
    print()


# Keywords

In [None]:
import time
import csv
from bs4 import BeautifulSoup

# Caution: This process will take you about 40mins
# all keywords before
def output_keywords(htmlfilepath):
  """Extract the visible text of a downloaded article into a text file.

  Args:
      htmlfilepath: relative path of the form ``<country>_articles/<id>.html``.

  Returns:
      ``<country>_keywords_before/<id>.txt`` when the text file exists after
      the call (freshly written or already present from an earlier run), or
      "" when anything went wrong; the error is printed, not raised.
  """
  try:
    country = htmlfilepath.split('_')[0]
    article_id = htmlfilepath.split('/')[1].split('.')[0]
    out_dir = f'{country}_keywords_before'
    out_path = f"{out_dir}/{article_id}.txt"
    # exist_ok: several worker processes may create the directory at once.
    os.makedirs(out_dir, exist_ok=True)
    # Already extracted on a previous run: return the path (this case used to
    # fall through and return None).
    if os.path.exists(out_path):
      return out_path
    with open(htmlfilepath) as f:
      soup = BeautifulSoup(f.read(), features="html.parser")
    # kill all script and style elements
    for script in soup(["script", "style"]):
      script.extract()
    # get text
    text = soup.get_text()
    # break into lines and remove leading and trailing space on each
    lines = (line.strip() for line in text.splitlines())
    # break multi-headlines into a line each
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    # drop blank lines
    text = '\n'.join(chunk for chunk in chunks if chunk)
    # Write the extracted text into the file
    with open(out_path, 'w') as keywords_file:
      keywords_file.write(text)
    print(country, article_id)
    return out_path
  except Exception as e:
    print("Exception throught while on ", htmlfilepath)
    print("Exception: " + str(e))
    return ""

# Return the filepath of the article
def get_article_filepath(country, id):
    """Build the relative path of the HTML file stored for article `id` of `country`."""
    relative_path = f'{country}_articles/{str(id)}.html'
    return relative_path

# For each country, turn the ids of the successfully downloaded "before"
# articles into HTML file paths and extract their text in parallel.
for country in target_countries:
  all_article_filepath_before = (countries_allids_before_covid[country].map(lambda x:x[1])
                                  .map(lambda id: get_article_filepath(country, id)))
  # Create a pool of 16 worker processes for this country
  e = ProcessPoolExecutor(16)
  # The values returned by output_keywords are not collected here
  e.map(output_keywords, all_article_filepath_before.collect())
  # Wait until all finish
  e.shutdown(wait=True)

In [None]:
# all keywords after
def output_keywords(htmlfilepath):
  """Extract the visible text of a downloaded article into a text file.

  Args:
      htmlfilepath: relative path of the form ``<country>_articles/<id>.html``.

  Returns:
      ``<country>_keywords_after/<id>.txt`` when the text file exists after
      the call (freshly written or already present from an earlier run), or
      "" when anything went wrong; the error is printed, not raised.
  """
  try:
      country = htmlfilepath.split('_')[0]
      article_id = htmlfilepath.split('/')[1].split('.')[0]
      out_dir = f'{country}_keywords_after'
      out_path = f"{out_dir}/{article_id}.txt"
      # exist_ok: several worker processes may create the directory at once.
      os.makedirs(out_dir, exist_ok=True)
      # Already extracted on a previous run: skip the HTML parsing, which was
      # previously repeated only for its result to be thrown away.
      if os.path.exists(out_path):
          return out_path
      print(country, article_id)
      with open(htmlfilepath) as f:
          soup = BeautifulSoup(f.read(), features="html.parser")
      # kill all script and style elements
      for script in soup(["script", "style"]):
          script.extract()    # rip it out
      # get text
      text = soup.get_text()
      # break into lines and remove leading and trailing space on each
      lines = (line.strip() for line in text.splitlines())
      # break multi-headlines into a line each
      chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
      # drop blank lines
      text = '\n'.join(chunk for chunk in chunks if chunk)
      with open(out_path, 'w') as keywords_file:
          keywords_file.write(text)
      return out_path
  except Exception as e:
    print(htmlfilepath)
    print("Exception: " + str(e))
    return ""

def get_article_filepath(country, id):
    """Return ``<country>_articles/<id>.html``, the path the article HTML is stored under."""
    html_path = f'{country}_articles/{str(id)}.html'
    return html_path

# For each country, turn the ids of the successfully downloaded "after"
# articles into HTML file paths and extract their text in parallel.
for country in target_countries:
    all_article_filepath_after = (countries_allids_after_covid[country].map(lambda x:x[1])
                                   .map(lambda id: get_article_filepath(country, id)))
    # Create a pool of 16 worker processes for this country
    e = ProcessPoolExecutor(16)
    # The values returned by output_keywords are not collected here
    e.map(output_keywords, all_article_filepath_after.collect())
    # Wait until every extraction task has finished
    e.shutdown(wait=True)

In [14]:
def readAndSplitFileIntoRdd(dirpath):
  """Count word occurrences over all text files in a directory.

  Args:
      dirpath: directory passed to sc.wholeTextFiles.

  Returns:
      RDD of (lower-cased word, count) pairs sorted by count, highest first.
  """
  # one record per file; the second item of each record is the file's text
  file = sc.wholeTextFiles(dirpath)
  # Split each file's text into (word, 1) tuples. The text spans many lines,
  # so split on any whitespace: splitting on " " alone glued the last word of
  # a line to the first word of the next one and produced empty-string tokens.
  words = (file
           .map(lambda x: x[1])
           .flatMap(lambda text: [(word.lower(), 1) for word in text.split()]))
  # reduce by key (the word) the counts and sort descending
  result = (words
            .reduceByKey(lambda a, b: a + b)
            .sortBy(lambda x: x[1], False))
  return result

In [None]:
# sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")
from math import log

for country in target_countries:
    keywords_before = readAndSplitFileIntoRdd(f'{country}_keywords_before')
    # Normalise by the largest count actually present. Looking up "the"
    # assumed that word exists and is the most frequent one, which raised
    # IndexError (or skewed the scale) for corpora where it is not.
    hightest_count_before = keywords_before.map(lambda term : term[1]).max()
    TF_before = (keywords_before.map(lambda term : (term[0], term[1] / hightest_count_before)).cache())

    keywords_after = readAndSplitFileIntoRdd(f'{country}_keywords_after')
    hightest_count_after = keywords_after.map(lambda term : term[1]).max()
    # NOTE(review): TF_after is prepared but not used below; only the "before"
    # ranking is printed - confirm whether an "after" ranking is wanted too.
    TF_after = (keywords_after.map(lambda term : (term[0], term[1] / hightest_count_after)).cache())

    # Per term: the number of periods (1 or 2) it occurs in, turned into an IDF weight.
    # NOTE(review): the numerator 4 is kept from the original; with two
    # periods (before/after) a numerator of 2 might be expected - confirm.
    IDFi = (keywords_before
            .union(keywords_after)
            .groupByKey()
            .mapValues(len)
            .map(lambda a : (a[0], log(4/a[1], 2)))
            .cache())
    # Attention. Dont broadcast this if the value is too large
    broadcast_IDFi = sc.broadcast(dict(IDFi.collect()))
    TF_IDF_before = (TF_before
                    .map(lambda term : (term[0], term[1]*broadcast_IDFi.value.get(term[0])))
                    .sortBy(lambda term : -term[1])
                    .cache())
    # Top 100 terms of the "before" period for this country
    print(TF_IDF_before.take(100))

In [None]:
# # Save all resources
# !zip -o CAN_keywords_after.zip CAN_keywords_after/*
# !zip -o CHN_keywords_after.zip CHN_keywords_after/*
# !zip -o DEU_keywords_after.zip DEU_keywords_after/*
# !zip -o GBR_keywords_after.zip GBR_keywords_after/*
# !zip -o MEX_keywords_after.zip MEX_keywords_after/*
# !zip -o NZL_keywords_after.zip NZL_keywords_after/*
# !zip -o RUS_keywords_after.zip RUS_keywords_after/*
# !zip -o USA_keywords_after.zip USA_keywords_after/*
# # Save all resources
# !zip -o CAN_keywords_before.zip CAN_keywords_before/*
# !zip -o CHN_keywords_before.zip CHN_keywords_before/*
# !zip -o DEU_keywords_before.zip DEU_keywords_before/*
# !zip -o GBR_keywords_before.zip GBR_keywords_before/*
# !zip -o MEX_keywords_before.zip MEX_keywords_before/*
# !zip -o NZL_keywords_before.zip NZL_keywords_before/*
# !zip -o RUS_keywords_before.zip RUS_keywords_before/*
# !zip -o USA_keywords_before.zip USA_keywords_before/*
# # Save all resources
# !zip -o CAN_articles.zip CAN_articles/*
# !zip -o CHN_articles.zip CHN_articles/*
# !zip -o DEU_articles.zip DEU_articles/*
# !zip -o GBR_articles.zip GBR_articles/*
# !zip -o MEX_articles.zip MEX_articles/*
# !zip -o NZL_articles.zip NZL_articles/*
# !zip -o RUS_articles.zip RUS_articles/*
# !zip -o USA_articles.zip USA_articles/*
# !zip -o csvfiles_after.zip csvfiles_after/*
# !zip -o csvfiles_before.zip csvfiles_before/*

In [None]:
# !unzip CAN_keywords_before.zip
# !unzip CAN_keywords_before.zip
# !unzip CAN_keywords_after.zip
# !unzip CHN_keywords_before.zip
# !unzip CHN_keywords_after.zip
# !unzip DEU_keywords_before.zip
# !unzip DEU_keywords_after.zip
# !unzip GBR_keywords_before.zip
# !unzip GBR_keywords_after.zip
# !unzip MEX_keywords_before.zip
# !unzip MEX_keywords_after.zip
# !unzip NZL_keywords_before.zip 
# !unzip NZL_keywords_after.zip
# !unzip RUS_keywords_before.zip 
# !unzip RUS_keywords_after.zip
# # Special unzip
# !sudo apt-get install fastjar
# !jar xvf USA_keywords_before.zip
# !jar xvf USA_keywords_after.zip