In [2]:
import os
import json
import csv
import tld
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [3]:
from dask.distributed import Client
import dask.bag as db
import dask.dataframe as dd

client = Client(n_workers=4, memory_limit="4 GiB")

In [4]:
DATASET_PATH = "datasets"

filename = "raw_quotes/quotes-2020.json"

dataset = db.read_text(os.path.join(DATASET_PATH, filename)).map(json.loads)

In [5]:
def remove_probability_of_others_then_most(quote):
    del quote["probas"]
    return quote

In [6]:
def extract_probability_of_most_prob_speaker(quote):
    quote["speaker_prob"] = float(quote["probas"][0][1])
    return quote

In [7]:
def extract_only_one_qid(quote):
    quote["qids"] = quote["qids"] if len(quote["qids"]) == 1 else None
    return quote

In [8]:
with open('datasets/GDELTDOMAINSBYCOUNTRY-MAY2018.TXT') as csvfile:
    reader = csv.reader(csvfile, delimiter='\t')
    next(reader, None) # skip header
    domains_by_country_dict = {rows[0]:rows[1] for rows in reader}

In [9]:
def convert_domain_to_country(quote, domains_by_country_dict):
    #add countries list to data based on the GDELT dataset
    countries_set = set()
    for url in quote["urls"]:
        domain_name = tld.get_fld(url)
        if domain_name in domains_by_country_dict:
            countries_set.add(domains_by_country_dict[domain_name])
    quote["url_countries"] = list(countries_set) if len(countries_set) != 0  else None
    return quote


In [10]:
ds1 = dataset.map(extract_probability_of_most_prob_speaker)
ds2 = ds1.map(remove_probability_of_others_then_most)
ds3 = ds2.map(extract_only_one_qid)
ds4 = ds3.map(convert_domain_to_country, domains_by_country_dict)
dataset = ds4
dataset.filter(lambda quote: len(quote["url_countries"])>1 if quote["url_countries"] else 0 ).take(5)

({'quoteID': '2020-04-08-001814',
  'quotation': 'After 10 years of growth we will experience a recession this year,',
  'speaker': 'Peter Altmaier',
  'qids': ['Q65539'],
  'date': '2020-04-08 07:54:20',
  'numOccurrences': 5,
  'urls': ['http://www.breitbart.com/news/france-germany-face-historic-economic-declines/',
   'https://www.ibtimes.com/german-economy-shrink-nearly-10-q2-experts-2954923',
   'https://www.financialexpress.com/economy/europes-biggest-german-economy-to-shrink-by-nearly-10-in-q2-say-experts/1922698/',
   'http://www.courthousenews.com/german-economy-takes-a-staggering-hit/',
   'https://www.business-standard.com/article/pti-stories/france-germany-face-historic-economic-declines-120040801280_1.html'],
  'phase': 'E',
  'speaker_prob': 0.7357,
  'url_countries': ['IN', 'US']},
 {'quoteID': '2020-02-16-002091',
  'quotation': 'an expectation of the contact Aboriginal and Torres Strait Islander people and particularly young people will have with police',
  'speaker': 

In [11]:
def flatten(record):
    return {
        "quoteID": record["quoteID"],
        "quotation": record["quotation"],
        "speaker": record["speaker"],
        "qids": record["qids"],
        "date": record["date"],
        "numOccurrences": record["numOccurrences"],
        "urls": record["urls"],
        "phase": record["phase"],
        "speaker_prob": record["speaker_prob"],
        "url_countries": record["url_countries"],
    }

In [13]:
df = dataset.map(flatten).to_dataframe()#.compute()
df = df.repartition(npartitions=10000).compute()



KilledWorker: ("('bag-from-delayed-convert_domain_to_country-extract_only_one_qid-extract_probability_of_most_prob_speaker-file_to_bl--70f', 0)", <WorkerState 'tcp://127.0.0.1:52237', name: 0, status: closed, memory: 0, processing: 1>)

In [None]:
print(len(df))