# Load State of the Union (SOU) speeches as json

In [1]:
import json
from pprint import pprint
import re
from pyspark.sql import SparkSession

spark  = SparkSession.builder.master('local').appName('sou').getOrCreate()
speeches = spark.read.json('/project/cmsc25025/sou/speeches.json')
speeches = spark.SparkContext.parallelize(speeches.collect())

AttributeError: 'SparkSession' object has no attribute 'SparkContext'

# (a) compute TF-IDF vectors

In [27]:
import string
from math import log10

num_speeches = speeches.rdd.count()

def sou_word_count(x):
    x = x.replace('Mr.','')\
         .replace('Mrs.','')\
         .replace('Miss.','')\
         .replace('Ms.', '')\
         .replace('Dr.', '')\
         .replace('Prof.','')

    x = x.lower().translate({ord(c): None for c in string.punctuation})
        
    wrds = x.strip().split(' ')
    
    wrd_cnts = {}
    
    for w in wrds:
        wrd_cnts[w] = wrd_cnts.get(w, 0) + 1
    
    return wrd_cnts

def sou_doc_count(*args):
    doc_counts = {}
    
    for d in args:
        for word in d.keys():
            doc_counts[word] = doc_counts.get(word, 0) + 1
        
    return doc_counts

def sou_word_across_doc_count(*args):
    counts = {}
    
    for d in args:
        for word in d.keys():
            counts[word] = counts.get(word, 0) + d[word]
    
    return counts

def compute_weights(wrd_cnts, doc_cnts, all_doc_cnts, top20):
    weights = {}
    
    for word in doc_cnts.keys():
        if all_doc_cnts[word] >= 50 and word not in top20:
            weights[word] = wrd_cnts.get(word, 0) * log10(num_speeches/doc_cnts[word])

    return weights.values()
        
sou_wrd_cnt = speeches.rdd.map(lambda x: (x['year'], x['president'], sou_word_count(x['text'])))

stripped_wrd_cnt = sou_wrd_cnt.map(lambda x: x[2])

sou_doc_cnt = stripped_wrd_cnt.reduce(sou_doc_count)
sou_wrd_acrss_doc_cnt = stripped_wrd_cnt.reduce(sou_word_across_doc_count)
sou_wrd_tpl_lst = sou_wrd_acrss_doc_cnt.items()

top20 = sorted(sou_wrd_tpl_lst, key=lambda x: x[1])[len(sou_wrd_tpl_lst)-20:]

tf_idf_weights = sou_wrd_cnt.map(lambda d: (d[0], d[1], compute_weights(d[2], sou_doc_cnt, sou_wrd_acrss_doc_cnt, top20)))

print tf_idf_weights.collect()[1]


(u'1897', u'William McKinley', [6.15923533045026, 4.10615688696684, 10.2653922174171, 2.05307844348342, 4.7082168782948015, 2.3541084391474008, 6.15923533045026, 4.7082168782948015, 4.10615688696684, 2.05307844348342, 61.592353304502595, 2.05307844348342, 2.05307844348342, 4.7082168782948015, 4.10615688696684, 12.31847066090052, 36.95541198270156, 14.124650634884404, 7.062325317442202, 2.3541084391474008, 4.10615688696684, 2.05307844348342, 8.21231377393368, 2.3541084391474008, 20.5307844348342, 2.05307844348342, 2.05307844348342, 9.416433756589603, 2.05307844348342, 8.21231377393368, 6.15923533045026, 4.10615688696684, 2.3541084391474008, 2.05307844348342, 2.05307844348342, 8.21231377393368, 4.10615688696684, 4.10615688696684, 4.10615688696684, 2.05307844348342, 2.05307844348342, 2.3541084391474008, 4.10615688696684, 12.31847066090052, 2.05307844348342, 2.3541084391474008, 2.05307844348342, 14.37154910438394, 2.05307844348342, 4.10615688696684, 8.21231377393368, 4.10615688696684, 14.1

# (b) similarity between speeches

In [18]:
import numpy as np
from itertools import combinations

def sim(d,dp):
    norm_d = np.linalg.norm(d)
    norm_dp = np.linalg.norm(dp)
    return np.dot(d,dp) / (norm_d * norm_dp)

all_speeches_only_weights = tf_idf_weights.map(lambda x: x[2])

def same_pres_sim_list(weights):
    #each element is a tuple of [(year, pres), (year, pres), sim]
    sim_list = []
    for c in combinations(weights, 2):
        if c[0][1] == c[1][1]:
            sim_list.append((c[0][:2], c[1][:2], sim(c[0][2],c[1][2])))
    
    return spark.sparkContext.parallelize(sim_list)

def diff_pres_sim_list(weights):
    #each element is a tuple of [(year, pres), (year, pres), sim]
    sim_list = []
    for c in combinations(weights, 2):
        if c[0][1] != c[1][1]:
            sim_list.append((c[0][:2], c[1][:2], sim(c[0][2],c[1][2])))
    
    return spark.sparkContext.parallelize(sim_list)


diff_pres_sim = diff_pres_sim_list(tf_idf_weights.collect()).sortBy(lambda x: x[2])
same_pres_sim = same_pres_sim_list(tf_idf_weights.collect()).sortBy(lambda x: x[2])

print 'Similar pairs given by diff President'
print 2 * ' ' + '  year1' + '   pres1' +'    year2' + '     pres2' + '     sim\n' + 55 * '='

for data in diff_pres_sim.collect()[:50]:
    print '%s\n%s ' % (data, 55 * '-')


Similar pairs given by diff President
    year1   pres1    year2     pres2     sim
((u'1883', u'Chester A. Arthur'), (u'2001', u'George W. Bush'), 0.70580282652757687)
------------------------------------------------------- 
((u'2001', u'George W. Bush'), (u'1882', u'Chester A. Arthur'), 0.7118514263234853)
------------------------------------------------------- 
((u'1827', u'John Quincy Adams'), (u'2001', u'George W. Bush'), 0.71464769264806038)
------------------------------------------------------- 
((u'1828', u'John Quincy Adams'), (u'2001', u'George W. Bush'), 0.72205350729033468)
------------------------------------------------------- 
((u'2001', u'George W. Bush'), (u'1872', u'Ulysses S. Grant'), 0.73070441620277948)
------------------------------------------------------- 
((u'2001', u'George W. Bush'), (u'1884', u'Chester A. Arthur'), 0.7345036885452868)
------------------------------------------------------- 
((u'2001', u'George W. Bush'), (u'1815', u'James Madison'), 0.736399