In [1]:
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark import StorageLevel
import datetime
import sklearn as skl
import sklearn.metrics as metrics
import math
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import GradientBoostedTrees

In [2]:
# Explicit schema for the raw comment JSON, handed to the JSON reader so that
# column types are fixed up front. Every column is declared nullable.
date_schema = StructType(list(
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("archived", BooleanType()),
        ("author", StringType()),
        ("author_flair_css_class", StringType()),
        ("author_flair_text", StringType()),
        ("body", StringType()),
        ("controversiality", LongType()),
        ("created_utc", StringType()),
        ("day", LongType()),
        ("distinguished", StringType()),
        ("downs", LongType()),
        ("edited", StringType()),
        ("gilded", LongType()),
        ("id", StringType()),
        ("link_id", StringType()),
        ("month", LongType()),
        ("name", StringType()),
        ("parent_id", StringType()),
        ("removal_reason", StringType()),
        ("retrieved_on", LongType()),
        ("score", LongType()),
        ("score_hidden", BooleanType()),
        ("subreddit", StringType()),
        ("subreddit_id", StringType()),
        ("ups", LongType()),
        ("year", LongType()),
    )
))

# Read in all comments stored under the 2009 prefix of the reddit-comments
# bucket, applying the explicit schema defined above.
rawcomments = sqlContext.read.json("s3n://reddit-comments/2009", schema=date_schema)

In [3]:
def add_date_columns(row):
    """Return a copy of ``row`` with ``year``, ``month`` and ``day`` fields set.

    ``created_utc`` is parsed as integer epoch seconds and broken down in UTC,
    so the derived calendar fields do not depend on the time zone configured
    on the machine that happens to run this function.

    :param row: a Row exposing ``asDict()`` whose ``created_utc`` entry can be
        parsed by ``int()``.
    :return: a new Row holding the original fields plus ``year``, ``month``
        and ``day``.
    """
    row_dict = row.asDict()
    # utcfromtimestamp, not fromtimestamp: the latter converts to the local
    # time zone, which can push a comment made near midnight UTC into the
    # neighbouring day (and, at month ends, into the wrong month).
    utc_dt = datetime.datetime.utcfromtimestamp(int(row_dict['created_utc']))
    row_dict['year'] = utc_dt.year
    row_dict['month'] = utc_dt.month
    row_dict['day'] = utc_dt.day

    return Row(**row_dict)

In [4]:
# Columns kept from the raw comments table; these are the only raw fields
# selected before the per-comment date fields are derived.
relevant_features = [
    "author", "controversiality", "created_utc", "distinguished", "downs",
    "edited", "gilded", "removal_reason", "score", "ups",
]

In [5]:
# Schema of the trimmed comments table: the selected raw columns plus the
# day/month/year fields, all nullable. Columns are listed alphabetically.
# NOTE(review): presumably this order has to match the field order of the
# Rows produced by add_date_columns -- confirm before reordering.
final_schema = StructType(list(
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("author", StringType()),
        ("controversiality", LongType()),
        ("created_utc", StringType()),
        ("day", LongType()),
        ("distinguished", StringType()),
        ("downs", LongType()),
        ("edited", StringType()),
        ("gilded", LongType()),
        ("month", LongType()),
        ("removal_reason", StringType()),
        ("score", LongType()),
        ("ups", LongType()),
        ("year", LongType()),
    )
))

In [6]:
# Keep only the columns used downstream, derive year/month/day for every
# comment, and rebuild a DataFrame with an explicit schema.
# The row-wise transform goes through `.rdd` explicitly, like the other RDD
# operations in this notebook; calling `.map` directly on a DataFrame is not
# supported by every Spark version.
comments = (rawcomments
            .select(relevant_features)
            .rdd
            .map(add_date_columns)
            .toDF(final_schema))

# Make the result queryable from Spark SQL under the name "comments".
comments.registerTempTable("comments")

In [7]:
# One row per (author, month) summarising that author's comments in the
# month: averages, sums, variances and a comment count. The grouping key does
# not include the year.
# SUM(distinguished) and SUM(edited) aggregate columns that final_schema
# declares as strings; neither result is used by datapoint_transform.
grouped_by_month = sqlContext.sql("""
    SELECT 
        author,
        AVG(controversiality) as controversiality,
        SUM(distinguished) as distinguished,
        SUM(downs) as downs,
        VARIANCE(downs) as var_downs,
        SUM(edited) as edited,
        SUM(gilded) as gilded,
        AVG(score) as score,
        VARIANCE(score) as var_score,
        SUM(ups) as ups,
        VARIANCE(ups) as var_ups,
        month,
        COUNT(*) as comment_count
    FROM comments
    GROUP BY author, month
""")

# The aggregate is read several times per model build, so mark it for caching
# (memory first, spilling to disk).
grouped_by_month = grouped_by_month.persist(StorageLevel.MEMORY_AND_DISK)

In [9]:
def datapoint_transform(row):
    """Convert one aggregated author/month row into a LabeledPoint.

    The label is ``row.is_churned``. The feature vector holds, in this order:
    controversiality, downs, var_downs, gilded, score, var_score, ups,
    var_ups, comment_count. ``distinguished`` and ``edited`` are not part of
    the feature vector.

    :param row: row exposing the attributes listed above plus ``is_churned``.
    :return: ``LabeledPoint(label, features)``.
    """
    feature_names = (
        "controversiality",
        "downs",
        "var_downs",
        "gilded",
        "score",
        "var_score",
        "ups",
        "var_ups",
        "comment_count",
    )
    features = [getattr(row, feature_name) for feature_name in feature_names]
    return LabeledPoint(row.is_churned, features)

In [16]:
def build_churn_model(start_month, end_month, seed=None):
    """Train a gradient-boosted-tree churn classifier for one month of authors.

    An author with a row for ``start_month`` in the notebook-level
    ``grouped_by_month`` table is labelled churned (1) when they have no row
    for ``end_month``, and 0 otherwise. The labelled rows are converted with
    ``datapoint_transform`` and split 70/30 into train and test sets; the
    classifier is fitted on the train set.

    :param start_month: month value whose authors form the dataset.
    :param end_month: month value checked for continued activity.
    :param seed: optional seed forwarded to ``randomSplit`` so the train/test
        split can be reproduced. ``None`` (the default) leaves the choice of
        seed to Spark, as before.
    :return: ``(model, train, test)``. ``train`` and ``test`` are RDDs of
        LabeledPoint and are not cached when returned, so every action run on
        them recomputes their lineage.
    :raises ValueError: if no author has a row for ``start_month``.
    """
    monthly_rows = grouped_by_month.rdd

    curr_month_data = monthly_rows.filter(lambda x: x.month == start_month)
    curr_month_data.persist(StorageLevel.MEMORY_AND_DISK)

    train = None
    try:
        curr_authors = set(curr_month_data.map(lambda x: x.author).collect())
        if not curr_authors:
            # Fail early with a clear message rather than training on nothing.
            raise ValueError(
                "no authors found for month {}".format(start_month))

        next_authors = set(
            monthly_rows
            .filter(lambda x: x.month == end_month)
            .map(lambda x: x.author)
            .collect())

        # Authors seen in start_month who have no row at all in end_month.
        churned = curr_authors - next_authors

        def add_target_column(row):
            """Return a copy of ``row`` with the 0/1 ``is_churned`` label."""
            row_dict = row.asDict()
            row_dict['is_churned'] = int(row_dict['author'] in churned)
            return Row(**row_dict)

        data = curr_month_data.map(add_target_column).map(datapoint_transform)
        train, test = data.randomSplit([0.7, 0.3], seed=seed)

        # Boosting makes repeated passes over the training data.
        train.persist(StorageLevel.MEMORY_AND_DISK)

        model = GradientBoostedTrees.trainClassifier(
            train, {}, numIterations=50, maxDepth=2, learningRate=.01)
    finally:
        # Release the caches even when collection or training fails.
        if train is not None:
            train.unpersist()
        curr_month_data.unpersist()

    return model, train, test

### Validation

In [14]:
def separate(model, data):
    """Collect the model's predictions and the true labels for ``data``.

    :param model: object whose ``predict`` accepts an RDD of feature vectors
        and returns an RDD of predictions.
    :param data: RDD of points exposing ``features`` and ``label``.
    :return: ``(predicted, actual)`` -- two lists of equal length in which
        position ``i`` of each list refers to the same data point.
    """
    labels = data.map(lambda x: x.label)
    predictions = model.predict(data.map(lambda x: x.features))

    # Pair each label with its prediction on the cluster and collect once.
    # Collecting the two RDDs in separate jobs would rely on both jobs
    # returning rows in exactly the same order, and would evaluate `data` in
    # two independent jobs.
    pairs = labels.zip(predictions).collect()

    actual = [label for label, _ in pairs]
    predicted = [prediction for _, prediction in pairs]
    return predicted, actual

def roc(model, data):
    """Return the ROC AUC of ``model``'s predictions on ``data``.

    Predictions and labels come from ``separate``; the score is computed by
    ``sklearn.metrics.roc_auc_score``.
    NOTE(review): if ``model.predict`` yields hard class labels rather than
    scores, the AUC reflects a single operating point -- confirm.
    """
    predictions, labels = separate(model, data)
    return metrics.roc_auc_score(y_true=labels, y_score=predictions)

def confusion(model, data):
    """Return the confusion matrix of ``model``'s predictions on ``data``.

    Predictions and labels come from ``separate``; the matrix of raw counts
    is built by ``sklearn.metrics.confusion_matrix`` with the true labels as
    the first argument.
    """
    predictions, labels = separate(model, data)
    return metrics.confusion_matrix(y_true=labels, y_pred=predictions)

In [17]:
# Train on authors active in month 1; churn means no activity in month 2.
model, train, test = build_churn_model(1, 2)

# build_churn_model returns both splits uncached, and the metrics below run
# several Spark actions on each split. Cache the splits for the duration of
# the evaluation so each is materialised once and every metric is computed
# from the same rows.
train.persist(StorageLevel.MEMORY_AND_DISK)
test.persist(StorageLevel.MEMORY_AND_DISK)
try:
    print("ROC: for training: {}, testing: {}".format(roc(model, train), roc(model, test)))
    # Confusion matrices are shown as fractions of the split size.
    print("Training confusion mx:")
    print((confusion(model, train) / float(train.count())))
    print("Testing confusion mx:")
    print((confusion(model, test) / float(test.count())))
finally:
    train.unpersist()
    test.unpersist()

ROC: for training: 0.756653957471, testing: 0.741993370391
Training confusion mx:
[[ 0.52005684  0.11746275]
 [ 0.10962947  0.25285095]]
Testing confusion mx:
[[ 0.50138744  0.12330905]
 [ 0.1195803   0.2557232 ]]


In [None]:
# Train on authors active in month 3; churn means no activity in month 4.
model, train, test = build_churn_model(3, 4)

# build_churn_model returns both splits uncached, and the metrics below run
# several Spark actions on each split. Cache the splits for the duration of
# the evaluation so each is materialised once and every metric is computed
# from the same rows.
train.persist(StorageLevel.MEMORY_AND_DISK)
test.persist(StorageLevel.MEMORY_AND_DISK)
try:
    print("ROC: for training: {}, testing: {}".format(roc(model, train), roc(model, test)))
    # Confusion matrices are shown as fractions of the split size.
    print("Training confusion mx:")
    print((confusion(model, train) / float(train.count())))
    print("Testing confusion mx:")
    print((confusion(model, test) / float(test.count())))
finally:
    train.unpersist()
    test.unpersist()