In [2]:
%matplotlib inline

from collections import Counter
import json
from glob import glob
import os
import shutil

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm import tqdm_notebook as tqdm

In [3]:

from pyspark import SparkContext, SparkConf, SQLContext
# getOrCreate() returns the already-running context when this cell is re-run,
# instead of raising a ValueError about multiple SparkContexts.
sc = SparkContext.getOrCreate()


In [4]:
from itertools import groupby

#### Input and output paths

In [5]:
# Input table of authors; its `author` column selects whose comments are kept.
AUTHOR_LABEL_STATE_PATH = "../data/processed/author-label-state.csv.bz2"
author_label_state = pd.read_csv(AUTHOR_LABEL_STATE_PATH)

In [None]:
# Monthly Reddit comment dumps for 2016.
# You can download these files from https://files.pushshift.io/reddit/
COMMENTS_DIR = '/data/big/reddit/comments/2016'
comments_path = COMMENTS_DIR + '/RC_2016-*.bz2'

In [8]:
authors = author_label_state.author.values
authors_set = set(authors)

In [9]:
# Ensure there is exactly one dump file per month of the year; report what was
# actually matched so a wrong/missing path is obvious.
monthly_dump_files = sorted(glob(comments_path))
assert len(monthly_dump_files) == 12, (
    "expected 12 monthly comment dumps matching {!r}, found {}: {}".format(
        comments_path, len(monthly_dump_files), monthly_dump_files)
)

In [10]:
# Subreddits whose comments are kept. Despite the name this is a set, used
# only for `in` membership tests.
subreddits_list = set(['politics'])

In [11]:
def popularity_measures(scores_iterable):
    """Summarise the scores of one author's comments.

    Args:
        scores_iterable: iterable of numeric comment scores (consumed once).

    Returns:
        A list of 9 values, in three groups of
        (count, mean, population std dev):
          - all comments,
          - positive comments (score > 0),
          - non-positive comments (score <= 0; zero-score comments count here).
        Mean and std dev are NaN for an empty group.
    """
    scores = list(scores_iterable)
    # Partition explicitly: itertools.groupby only merges *consecutive* items
    # with equal keys, so on unsorted scores a dict built from it kept just the
    # last run of each sign.
    pos = [s for s in scores if s > 0]
    neg = [s for s in scores if not (s > 0)]
    return [
        stat
        for group in (scores, pos, neg)
        for stat in _count_mean_std(group)
    ]


def _count_mean_std(values):
    """Return [len, mean, population std] of `values`; NaN stats when empty."""
    if not values:
        # Avoid numpy's "mean of empty slice" RuntimeWarning; same NaN result.
        return [0, float('nan'), float('nan')]
    return [len(values), np.mean(values), np.std(values)]

In [14]:
def _is_selected_comment(comment):
    """True if the comment's author is in authors_set and its subreddit is in subreddits_list."""
    # Same check order as before: author key, author membership,
    # subreddit key, subreddit membership.
    if 'author' not in comment.keys() or comment['author'] not in authors_set:
        return False
    return 'subreddit' in comment.keys() and comment['subreddit'] in subreddits_list


# Each line of the dumps is parsed with json.loads; the filter is lazy until an action runs.
comments_rdd = sc.textFile(comments_path).map(json.loads)
rdd_selected_comments = comments_rdd.filter(_is_selected_comment)

In [15]:
def _author_row(author_and_scores):
    """(author, scores) -> flat tuple (author, *popularity_measures(scores))."""
    author, scores = author_and_scores
    return (author,) + tuple(popularity_measures(scores))


# One row per author: the author name followed by the popularity measures.
author_popularity = (
    rdd_selected_comments
    .map(lambda comment: (comment['author'], float(comment['score'])))
    .groupByKey()  # author -> all of that author's comment scores
    .map(_author_row)
)

In [17]:
output_path = "../data/processed/scores-of-authors-in-politics-from-author-label-state.csv.bz2"

# saveAsTextFile refuses to write to an existing path, so clear the output of a
# previous run first: a directory of part files, or the single CSV it is later
# converted into. Assumes output_path is on the local filesystem.
if os.path.isdir(output_path):
    shutil.rmtree(output_path)
elif os.path.exists(output_path):
    os.remove(output_path)

# repartition(1) yields a single part file inside the output directory.
(author_popularity
    .map(lambda row: ','.join(str(value) for value in row))
    .repartition(1)
    .saveAsTextFile(
        output_path,
        compressionCodecClass='org.apache.hadoop.io.compress.BZip2Codec'))

In [None]:
# Column order of the rows written by saveAsTextFile: the author followed by
# the nine values returned by popularity_measures.
SCORE_COLUMNS = [
    'author', 'num_comm', 'avg_score', 'stddev_score',
    'num_comm_pos', 'avg_score_pos', 'stddev_score_pos',
    'num_comm_neg', 'avg_score_neg', 'stddev_score_neg',
]

if os.path.isdir(output_path):
    # saveAsTextFile produced a *directory* of header-less part files rather
    # than a single file: read the parts, then replace the directory with one
    # bz2-compressed CSV that has a header row.
    part_files = sorted(glob(os.path.join(output_path, 'part-*')))
    assert part_files, 'no part files found in {}'.format(output_path)
    df = pd.concat(
        # author kept as str so numeric-looking usernames are not parsed as numbers
        [pd.read_csv(part, names=SCORE_COLUMNS, dtype={'author': str})
         for part in part_files],
        ignore_index=True,
    )
    shutil.rmtree(output_path)  # os.remove cannot delete a directory
    df.to_csv(output_path, index=False, compression='bz2')
else:
    # Already converted by an earlier run of this cell.
    df = pd.read_csv(output_path, dtype={'author': str})