In [1]:
import os
import pyspark
import numpy as np
from datetime import datetime
import time
import csv
from operator import add
import json
from collections import Counter
from itertools import combinations

In [2]:
FILE_NAME = 'News_Final.csv'
# getOrCreate keeps this cell re-runnable: if the kernel already holds an
# active SparkContext it is returned instead of constructing a second one
# (in that case the configuration passed here is not applied again).
sc = pyspark.SparkContext.getOrCreate(
    pyspark.SparkConf().setAppName('hw1_code').setMaster('spark://spark-master:7077')
)
# RDD of raw text lines read from ../dataset/News_Final.csv.
text_file = sc.textFile(os.path.join('../', 'dataset', FILE_NAME))

In [3]:
# Show the first five raw lines of the file (the header line is the first one).
text_file.take(5)

['"IDLink","Title","Headline","Source","Topic","PublishDate","SentimentTitle","SentimentHeadline","Facebook","GooglePlus","LinkedIn"',
 '99248,"Obama Lays Wreath at Arlington National Cemetery","Obama Lays Wreath at Arlington National Cemetery. President Barack Obama has laid a wreath at the Tomb of the Unknowns to honor","USA TODAY","obama","2002-04-02 00:00:00",0,-0.0533001790889026,-1,-1,-1',
 '10423,"A Look at the Health of the Chinese Economy","Tim Haywood, investment director business-unit head for fixed income at Gam, discusses the China beige book and the state of the economy.","Bloomberg","economy","2008-09-20 00:00:00",0.208333333333333,-0.156385810542806,-1,-1,-1',
 '18828,"Nouriel Roubini: Global Economy Not Back to 2008","Nouriel Roubini, NYU professor and chairman at Roubini Global Economics, explains why the global economy isn\'t facing the same conditions","Bloomberg","economy","2012-01-28 00:00:00",-0.425210032135381,0.139754248593737,-1,-1,-1',
 '27788,"Finland GDP Ex

In [4]:
# Preview size used by .take() calls, and the number of top words to report.
take_size, top_n = 3, 100

# Map every column name found in the file's first line (the CSV header) to
# its position, so later cells can address columns by name.
attr_idx = dict(
    (attr_name, idx)
    for row in csv.reader([text_file.first()], delimiter=',')
    for idx, attr_name in enumerate(row)
)

# Broadcast the mapping; functions passed to Spark read it via bc_attr_idx.value.
bc_attr_idx = sc.broadcast(attr_idx)
(bc_attr_idx.value, sc.defaultParallelism)

({'IDLink': 0,
  'Title': 1,
  'Headline': 2,
  'Source': 3,
  'Topic': 4,
  'PublishDate': 5,
  'SentimentTitle': 6,
  'SentimentHeadline': 7,
  'Facebook': 8,
  'GooglePlus': 9,
  'LinkedIn': 10},
 4)

In [5]:
# First raw line of the file, i.e. the CSV header row of column names
# (not to be confused with the 'Headline' column).
headline = text_file.first()

def filter_headline(headline):
    """Build a predicate for ``RDD.filter`` that rejects the header line.

    Args:
        headline: The line to exclude.

    Returns:
        A one-argument function returning ``True`` for every row that is
        not equal to ``headline``.
    """
    return lambda row: row != headline

def get_attrs(row_gen):
    """Parse an iterator of comma-separated text lines into lists of fields.

    Args:
        row_gen: Iterator of raw CSV lines.

    Yields:
        One list of string fields per record produced by ``csv.reader``.
    """
    yield from csv.reader(row_gen, delimiter=',')

# Drop the header line, then parse the remaining lines into lists of fields.
# preservesPartitioning is left at its default (False): these records are
# not key/value pairs, so there is no key-based partitioner to keep.
attrs_rdd = text_file.filter(filter_headline(headline)).mapPartitions(get_attrs)
# Preview the RDD that was just defined rather than rebuilding the same pipeline.
attrs_rdd.take(take_size)

[['99248',
  'Obama Lays Wreath at Arlington National Cemetery',
  'Obama Lays Wreath at Arlington National Cemetery. President Barack Obama has laid a wreath at the Tomb of the Unknowns to honor',
  'USA TODAY',
  'obama',
  '2002-04-02 00:00:00',
  '0',
  '-0.0533001790889026',
  '-1',
  '-1',
  '-1'],
 ['10423',
  'A Look at the Health of the Chinese Economy',
  'Tim Haywood, investment director business-unit head for fixed income at Gam, discusses the China beige book and the state of the economy.',
  'Bloomberg',
  'economy',
  '2008-09-20 00:00:00',
  '0.208333333333333',
  '-0.156385810542806',
  '-1',
  '-1',
  '-1'],
 ['18828',
  'Nouriel Roubini: Global Economy Not Back to 2008',
  "Nouriel Roubini, NYU professor and chairman at Roubini Global Economics, explains why the global economy isn't facing the same conditions",
  'Bloomberg',
  'economy',
  '2012-01-28 00:00:00',
  '-0.425210032135381',
  '0.139754248593737',
  '-1',
  '-1',
  '-1']]

In [6]:
def tokenizer(sent):
    """Lower-case ``sent`` and split it on runs of whitespace.

    Punctuation is not stripped, so it stays attached to the neighbouring
    word (e.g. ``'Cemetery.'`` becomes ``'cemetery.'``).

    Args:
        sent: The text to tokenize.

    Returns:
        A list of lower-cased tokens.
    """
    lowered = sent.lower()
    return lowered.split()

def tokenize_title_headline(tokenizer):
    """Build a mapPartitions function that tokenizes the text columns.

    Args:
        tokenizer: Callable applied to the 'Title' and 'Headline' values.

    Returns:
        A function taking an iterator of rows. For each row it replaces the
        'Title' and 'Headline' entries (located through ``bc_attr_idx``) with
        the tokenizer's result, in place, and yields that same row.
    """
    def _tokenize_title_headline(row_gen):
        for record in row_gen:
            columns = bc_attr_idx.value
            # Resolve both positions before touching the record.
            positions = (columns['Title'], columns['Headline'])
            for pos in positions:
                record[pos] = tokenizer(record[pos])
            yield record
    return _tokenize_title_headline

# Rows with the 'Title' and 'Headline' columns replaced by token lists.
# preservesPartitioning is left at its default (False): these records are
# not key/value pairs, so there is no key-based partitioner to keep.
tokenized_attrs_rdd = attrs_rdd.mapPartitions(tokenize_title_headline(tokenizer))
# Preview the RDD that was just defined rather than rebuilding the same pipeline.
tokenized_attrs_rdd.take(take_size)

[['99248',
  ['obama', 'lays', 'wreath', 'at', 'arlington', 'national', 'cemetery'],
  ['obama',
   'lays',
   'wreath',
   'at',
   'arlington',
   'national',
   'cemetery.',
   'president',
   'barack',
   'obama',
   'has',
   'laid',
   'a',
   'wreath',
   'at',
   'the',
   'tomb',
   'of',
   'the',
   'unknowns',
   'to',
   'honor'],
  'USA TODAY',
  'obama',
  '2002-04-02 00:00:00',
  '0',
  '-0.0533001790889026',
  '-1',
  '-1',
  '-1'],
 ['10423',
  ['a', 'look', 'at', 'the', 'health', 'of', 'the', 'chinese', 'economy'],
  ['tim',
   'haywood,',
   'investment',
   'director',
   'business-unit',
   'head',
   'for',
   'fixed',
   'income',
   'at',
   'gam,',
   'discusses',
   'the',
   'china',
   'beige',
   'book',
   'and',
   'the',
   'state',
   'of',
   'the',
   'economy.'],
  'Bloomberg',
  'economy',
  '2008-09-20 00:00:00',
  '0.208333333333333',
  '-0.156385810542806',
  '-1',
  '-1',
  '-1'],
 ['18828',
  ['nouriel', 'roubini:', 'global', 'economy', 'not',

In [7]:
def word_count(row_gen):
    """Emit a ``(word, 1)`` pair for every word of every row.

    Args:
        row_gen: Iterator of rows, each an iterable of words.

    Yields:
        ``(word, 1)`` tuples in input order, ready to be summed per word.
    """
    for words in row_gen:
        yield from ((word, 1) for word in words)

def get_col_by_idx(idx):
    """Build a mapPartitions function that projects one column out of each row.

    Args:
        idx: Position of the column to keep.

    Returns:
        A function taking an iterator of rows and yielding ``row[idx]``
        for each of them.
    """
    def _get_col_by_idx(row_gen):
        yield from (record[idx] for record in row_gen)
    return _get_col_by_idx

def sort_idx(index):
    """Build a sort-key function that returns the element at ``index``.

    Args:
        index: Position within each row to sort by.

    Returns:
        A one-argument function mapping ``row`` to ``row[index]``.
    """
    return lambda row: row[index]

def trans_by_idx(index, fn):
    """Build a mapPartitions function that applies ``fn`` to one column.

    Args:
        index: Position of the column to transform in each row.
        fn: Callable applied to the value stored at ``index``.

    Returns:
        A function taking an iterator of rows. For each row it replaces
        ``row[index]`` with ``fn(row[index])`` in place and yields that
        same row object.
    """
    # Underscore-prefixed so the inner function does not shadow the
    # factory's own name.
    def _trans_by_idx(row_gen):
        for row in row_gen:
            row[index] = fn(row[index])
            yield row
    return _trans_by_idx

def string_to_day(string, _format="%Y-%m-%d %H:%M:%S"):
    """Parse a timestamp string and keep only its calendar date.

    Args:
        string: Timestamp text to parse.
        _format: ``strptime`` format describing ``string``.

    Returns:
        The ``datetime.date`` part of the parsed timestamp.

    Raises:
        ValueError: If ``string`` does not match ``_format``.
    """
    parsed = datetime.strptime(string, _format)
    return parsed.date()

def make_key_by_idx(index):
    """Build a mapPartitions function that turns rows into key/value pairs.

    Args:
        index: Position of the column whose value becomes the key.

    Returns:
        A function taking an iterator of rows and yielding
        ``(row[index], row)`` for each of them.
    """
    def _make_key_by_idx(row_gen):
        for record in row_gen:
            key = record[index]
            yield key, record
    return _make_key_by_idx

def remove_key(row_gen):
    """Drop the key from key/value pairs, keeping only the value.

    Args:
        row_gen: Iterator of pairs whose second element is the value.

    Yields:
        The element at position 1 of each pair.
    """
    yield from (pair[1] for pair in row_gen)

def merge_attrs(a, b):
    """Reducer that folds the word tallies of row ``b`` into row ``a``.

    The 'Title' and 'Headline' entries of ``a`` (located through
    ``bc_attr_idx``) are updated in place with the matching entries of
    ``b`` via ``update``; the pipelines in this notebook turn those entries
    into ``Counter`` objects first. All other columns of ``a`` are left as
    they are.

    Args:
        a: Row that accumulates the result; it is mutated.
        b: Row whose tallies are merged into ``a``.

    Returns:
        ``a``, after the update.
    """
    columns = bc_attr_idx.value
    # Resolve both positions before mutating anything.
    positions = (columns['Title'], columns['Headline'])
    for pos in positions:
        a[pos].update(b[pos])
    return a

def most_common(top_n):
    """Build a mapPartitions function reporting the most frequent words per key.

    Args:
        top_n: Number of entries requested from ``most_common`` for each of
            the 'Title' and 'Headline' columns.

    Returns:
        A function taking an iterator of ``(key, attrs)`` pairs and yielding
        ``(key, {'title': ..., 'headline': ...})``, where each value is the
        result of calling ``most_common(top_n)`` on the matching column of
        ``attrs``.
    """
    def _most_common(row_gen):
        for key, attrs in row_gen:
            columns = bc_attr_idx.value
            title_pos, headline_pos = columns['Title'], columns['Headline']
            summary = {
                'title': attrs[title_pos].most_common(top_n),
                'headline': attrs[headline_pos].most_common(top_n),
            }
            yield (key, summary)
    return _most_common

def get_word_freq_in_total(rdd, target_idx):
    """Count how often each word occurs in one tokenized column of ``rdd``.

    Args:
        rdd: RDD of rows in which ``row[target_idx]`` is an iterable of words.
        target_idx: Position of the tokenized column to count.

    Returns:
        An RDD of ``(word, count)`` pairs sorted by count, highest first.
    """
    # Both mapPartitions steps replace each record wholesale (row -> word
    # list -> (word, 1) pairs), so they do not claim to preserve the input's
    # partitioner. Claiming it on an already-partitioned input could let
    # reduceByKey skip its shuffle and leave one word's count split across
    # partitions.
    return rdd.mapPartitions(get_col_by_idx(target_idx)) \
              .mapPartitions(word_count) \
              .reduceByKey(add).sortBy(sort_idx(1), ascending=False)


In [8]:
# The take_size most frequent tokens over all tokenized titles, as (word, count) pairs.
get_word_freq_in_total(tokenized_attrs_rdd, target_idx=attr_idx['Title']).take(take_size)

[('to', 27417), ('...', 22629), ('economy', 22336)]

In [9]:
# The take_size most frequent tokens over all tokenized headlines, as (word, count) pairs.
get_word_freq_in_total(tokenized_attrs_rdd, target_idx=attr_idx['Headline']).take(take_size)

[('the', 153602), ('to', 69472), ('of', 61485)]

### Word frequency per day

In [10]:
# Per publication day: turn the timestamp into a date, turn the token lists
# into Counters, key each row by its date, merge the Counters of rows that
# share a date, and report the five most frequent words per column.
(
    tokenized_attrs_rdd
    .mapPartitions(trans_by_idx(attr_idx['PublishDate'], string_to_day))
    .mapPartitions(trans_by_idx(attr_idx['Title'], Counter))
    .mapPartitions(trans_by_idx(attr_idx['Headline'], Counter))
    .mapPartitions(make_key_by_idx(attr_idx['PublishDate']))
    .reduceByKey(merge_attrs)
    .mapPartitions(most_common(5))
    .take(take_size)
)

[(datetime.date(2016, 3, 5),
  {'title': [('to', 66),
    ('...', 55),
    ('obama', 52),
    ('economy', 51),
    ('in', 40)],
   'headline': [('the', 295),
    ('to', 135),
    ('a', 132),
    ('in', 113),
    ('and', 95)]}),
 (datetime.date(2016, 3, 6),
  {'title': [('...', 53),
    ('economy', 52),
    ('to', 48),
    ('the', 34),
    ('microsoft', 30)],
   'headline': [('the', 281),
    ('to', 133),
    ('of', 114),
    ('a', 110),
    ('and', 96)]}),
 (datetime.date(2016, 3, 8),
  {'title': [('...', 139),
    ('to', 120),
    ('obama', 110),
    ('the', 102),
    ('economy', 98)],
   'headline': [('the', 724),
    ('to', 313),
    ('in', 277),
    ('a', 269),
    ('and', 255)]})]

### Word frequency per topic

In [11]:
# Per topic: turn the token lists into Counters, key each row by its topic,
# merge the Counters of rows that share a topic, and keep the top_n most
# frequent words per column. top_n is the value set alongside take_size
# (100), used here instead of repeating the literal.
word_freq_topic = tokenized_attrs_rdd.mapPartitions(trans_by_idx(attr_idx['Title'], Counter)) \
                                     .mapPartitions(trans_by_idx(attr_idx['Headline'], Counter)) \
                                     .mapPartitions(make_key_by_idx(attr_idx['Topic'])) \
                                     .reduceByKey(merge_attrs) \
                                     .mapPartitions(most_common(top_n)) \
                                     .collectAsMap()
word_freq_topic

{'economy': {'title': [('economy', 22238),
   ('the', 8920),
   ('to', 8822),
   ('in', 6823),
   ('...', 6724),
   ('of', 4059),
   ('for', 3974),
   ('on', 3566),
   ('economic', 2749),
   ('a', 2679),
   ('is', 2512),
   ('as', 2326),
   ('and', 2188),
   ('us', 2183),
   ('global', 1942),
   ('says', 1723),
   ('growth', 1661),
   ('economy,', 1600),
   ('will', 1213),
   ('china', 1149),
   ('at', 1063),
   ("china's", 1063),
   ('economy:', 1032),
   ('new', 1023),
   ('about', 942),
   ('by', 938),
   ('with', 922),
   ('how', 896),
   ('-', 864),
   ('be', 842),
   ('uk', 807),
   ('brexit', 802),
   ('could', 801),
   ('boost', 800),
   ('more', 795),
   ('2016', 785),
   ('but', 776),
   ('not', 757),
   ('world', 719),
   ('are', 679),
   ('up', 675),
   ('oil', 673),
   ('than', 664),
   ('from', 658),
   ('u.s.', 638),
   ('bank', 620),
   ('rate', 600),
   ('economy?', 599),
   ('over', 592),
   ('quarter', 581),
   ('can', 576),
   ('minister', 570),
   ('why', 568),
   

### Total sentiment score per topic

In [12]:
def merge_senti(a, b):
    """Reducer that adds the sentiment values of row ``b`` onto row ``a``.

    The 'SentimentTitle' and 'SentimentHeadline' entries (located through
    ``bc_attr_idx``) are accumulated in place with ``+=``; all other
    columns of ``a`` are left as they are.

    Args:
        a: Row that accumulates the sums; it is mutated.
        b: Row whose sentiment values are added to ``a``.

    Returns:
        ``a``, after the addition.
    """
    columns = bc_attr_idx.value
    # Resolve both positions before mutating anything.
    positions = (columns['SentimentTitle'], columns['SentimentHeadline'])
    for pos in positions:
        a[pos] += b[pos]
    return a

def get_col_by_idxs(idxs):
    """Build a mapPartitions function that projects several columns per row.

    Args:
        idxs: Positions of the columns to keep, in output order.

    Returns:
        A function taking an iterator of rows and yielding, for each row,
        a new list holding ``row[idx]`` for every ``idx`` in ``idxs``.
    """
    def _get_col_by_idxs(row_gen):
        for record in row_gen:
            picked = []
            for pos in idxs:
                picked.append(record[pos])
            yield picked
    return _get_col_by_idxs

# Columns kept in the final result, in display order.
keep_index = [attr_idx[name] for name in ('Topic', 'SentimentTitle', 'SentimentHeadline')]
print('topic', 'SentimentTitle', 'SentimentHeadline')
# Per topic: convert both sentiment columns to float, key each row by its
# topic, add up the sentiment values of rows sharing a topic, then drop the
# key and keep only the columns listed in keep_index.
(
    attrs_rdd
    .mapPartitions(trans_by_idx(attr_idx['SentimentTitle'], float))
    .mapPartitions(trans_by_idx(attr_idx['SentimentHeadline'], float))
    .mapPartitions(make_key_by_idx(attr_idx['Topic']))
    .reduceByKey(merge_senti)
    .mapPartitions(remove_key)
    .mapPartitions(get_col_by_idxs(keep_index))
    .collect()
)

topic SentimentTitle SentimentHeadline


[['economy', -350.89723953455353, -1340.138817215566],
 ['microsoft', 51.56650733416532, -322.28267520542335],
 ['palestine', -177.61514431068508, -393.38238008336054],
 ['obama', -27.604435365771636, -507.62019742134936]]

### co-occurrence

dict_keys(['economy', 'microsoft', 'palestine', 'obama'])