In [1]:
from collections import defaultdict
import numpy as np
from pyspark.ml.linalg import SparseVector
from pyspark.sql.functions import explode
from pyspark import SparkFiles
from pyspark.sql import Row

import bz2
import json
import time
from pyspark.ml import Pipeline
from pyspark.ml.feature import * # CountVectorizer, Tokenizer, RegexTokenizer, HashingTF
from pyspark.ml.regression import * # RandomForestRegressor, LinearRegression, DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1607295858388_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
def timeit(method):
    '''
    Decorator to time functions.
    '''
    def timed(*args, **kw):
        ts = time.time()
        result = method(*args, **kw)
        te = time.time()

        print('%r took %2.2f sec\n' % (method.__name__, te-ts))
              
        return result
    return timed

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
import re
# DATAFILE_PATTERN = '^(.+),"(.+)",(.*),(.*),(.*)'
ID_PATTERN = '"id":(.*?(?=,|}))'
UPS_PATTERN = '"ups":(.*?(?=,|}))'
BODY_PATTERN = '"body":(.*?(?=,|}))'
# DOWNS_PATTERN = '"downs":(.*?(?=,|}))'
SCORE_PATTERN = '"score":(.*?(?=,|}))'
# CONTROVERSIALITY_PATTERN = '"controversiality":(.*?(?=,|}))'

def removeQuotes(s):
    """ Remove quotation marks from an input string
    Args:
        s (str): input string that might have the quote "" characters
    Returns:
        str: a string without the quote characters
    """
    return ''.join(i for i in s if i!='"')

def parseDatafileLine(datafileLine):
    """ Parse a line of the data file using the specified regular expression pattern
    Args:
        datafileLine (str): input string that is a line from the data file
    Returns:
        tuple: a tuple including the parsed results using the given regular expression and without the quote characters
    """
    id_match = re.search(ID_PATTERN, datafileLine.decode('utf-8'))
    ups_match = re.search(UPS_PATTERN, datafileLine.decode('utf-8'))
    body_match = re.search(BODY_PATTERN, datafileLine.decode('utf-8'))
    score_match = re.search(SCORE_PATTERN, datafileLine.decode('utf-8'))
    
    if (id_match is None) or (ups_match is None) or (body_match is None) or (score_match is None):
        print('Invalid datafile line: %s' % datafileLine)
        return (datafileLine, -1)
    else:
        viralness = 0
        if int(score_match.group(1)) < -10 or int(score_match.group(1)) > 10:
            viralness = 1
        comment = (id_match.group(1), int(ups_match.group(1)), removeQuotes(body_match.group(1)), int(score_match.group(1)), viralness)
        return (comment, 1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
import sys
import os
from pyspark import SparkFiles

#RC_PATH = '/FileStore/shared_uploads/ddk1@andrew.cmu.edu/RC_2007_10'
RC_PATH = 's3://dsml-vasu-simar-daniel/RC_2015-0*'

def parseData(path):
    """ Parse a data file
    Args:
        filename (str): input file name of the data file
    Returns:
        RDD: a RDD of parsed lines
    """
#     sc.addFile(path)
    return (sc
            .textFile(path, 4, 0)
            .map(parseDatafileLine)
            .cache())
@timeit
def loadData(path):
    """ Load a data file
    Args:
        path (str): input file name of the data file
    Returns:
        RDD: a RDD of parsed valid lines
    """

    raw = parseData(path).cache()
    
    failed = (raw
              .filter(lambda s: s[1] == -1)
              .map(lambda s: s[0]))
    for line in failed.take(10):
        print('%s - Invalid datafile line: %s' % (path, line))
    
    deleted = (raw
             .filter(lambda s: s[0][2] == '[deleted]')
             .map(lambda s: s[0]))
    
    valid = (raw
             .filter(lambda s: s[1] == 1)
             .filter(lambda s: s[0][2] != '[deleted]')
             .map(lambda s: s[0])
             .cache())
   
    viral = (raw
             .filter(lambda s: s[1] == 1)
             .filter(lambda s: s[0][4] == 1)
             .map(lambda s: s[0])
             .cache())
    nonviral = (raw
               .filter(lambda s: s[1] == 1)
               .filter(lambda s: s[0][4] == 0)
               .map(lambda s: s[0])
               .cache())
    nonviral_cut = nonviral.sample(False, viral.count()/nonviral.count())
    viral_nonviral_cut = viral.union(nonviral_cut)
    print('%s - Read %d lines, successfully parsed %d lines, failed to parse %d lines, %d lines were deleted, %d lines were viral, %d lines were non-viral, %d viral and non-viral lines were returned' % (path,
                                                                                                                                                                                                 raw.count(),
                                                                                                                                                                                                 valid.count(),
                                                                                                                                                                                                 failed.count(),
                                                                                                                                                                                                 deleted.count(),
                                                                                                                                                                                                 viral.count(),
                                                                                                                                                                                                 nonviral.count(),
                                                                                                                                                                                                 viral_nonviral_cut.count()))                                                  
    return viral_nonviral_cut

reddit = loadData(RC_PATH)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://dsml-vasu-simar-daniel/RC_2015-0* - Read 156758730 lines, successfully parsed 147697364 lines, failed to parse 0 lines, 9061366 lines were deleted, 11674519 lines were viral, 145084211 lines were non-viral, 23346870 viral and non-viral lines were returned
'loadData' took 276.31 sec

In [17]:
sentenceDF.show(100)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+----+--------------------+-----+---------+
|       id| ups|                body|score|viralness|
+---------+----+--------------------+-----+---------+
|"cnas8zv"|  14|Most of us have s...|   14|        1|
|"cnas90g"|  17|&gt;&gt;If a woma...|   17|        1|
|"cnas91f"|  19|Made a multiscree...|   19|        1|
|"cnas91m"|  41|A lot of disrespe...|   41|        1|
|"cnas91w"|  14|You hear that SDS...|   14|        1|
|"cnas92j"|  18|   The greater good.|   18|        1|
|"cnas932"| -11|No. It's fucking ...|  -11|        1|
|"cnas938"|  14|And one pit. Poor...|   14|        1|
|"cnas93e"|  22|Oh man........\n\...|   22|        1|
|"cnas93l"|  24|lol if that happe...|   24|        1|
|"cnas93m"|  23|why are some play...|   23|        1|
|"cnas93o"|  49|Compulsive hugger...|   49|        1|
|"cnas944"|  14|Does this mean Ha...|   14|        1|
|"cnas94h"|  82|Output:\n\n    Ha...|   82|        1|
|"cnas951"|  59|I thought that co...|   59|        1|
|"cnas953"|  24|The League C

In [6]:
sentenceDF = reddit.toDF().selectExpr("_1 as id", "_2 as ups", "_3 as body", "_4 as score", "_5 as viralness")
sentenceDF.show(n=5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+---+--------------------+-----+---------+
|       id|ups|                body|score|viralness|
+---------+---+--------------------+-----+---------+
|"cnas8zv"| 14|Most of us have s...|   14|        1|
|"cnas90g"| 17|&gt;&gt;If a woma...|   17|        1|
|"cnas91f"| 19|Made a multiscree...|   19|        1|
|"cnas91m"| 41|A lot of disrespe...|   41|        1|
|"cnas91w"| 14|You hear that SDS...|   14|        1|
+---------+---+--------------------+-----+---------+
only showing top 5 rows

In [7]:
split_regex = r'\W+'
linebreak_regex = r'\\r\\n\\r\\n'

def simpleTokenize(string):
    """ A simple implementation of input string tokenization
    Args:
        string (str): input string
    Returns:
        list: a list of tokens
    """
    linebreak_removed_string = re.sub(linebreak_regex, " ", string)
    return list(filter(None, re.split(split_regex, linebreak_removed_string.lower())))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
stopwords = set(sc.textFile("s3://dsml-vasu-simar-daniel/stopwords.txt").collect())
print('These are the stopwords: %s' % stopwords)

def tokenize(string):
    """ An implementation of input string tokenization that excludes stopwords
    Args:
        string (str): input string
    Returns:
        list: a list of tokens without stopwords
    """
    return list(filter(lambda word: word not in stopwords,simpleTokenize(string)))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

These are the stopwords: {'out', 'we', 'was', 'how', 'myself', 'for', 'they', 'about', 'then', 'both', 'so', 'don', 'as', 'any', 'after', 'you', 'why', 'been', 'where', 'by', 'yourself', 'a', 'did', 'their', 'doing', 'be', 'further', 'ours', 'now', 'am', 'her', 'yourselves', 'that', 'what', 'my', 'to', 'not', 'own', 'there', 'this', 'each', 'all', 'more', 'me', 'which', 'himself', 'nor', 'other', 'who', 'same', 'at', 'such', 't', 'up', 'than', 'can', 'too', 'these', 'while', 'before', 'ourselves', 'he', 'i', 'our', 'its', 'but', 'with', 'because', 'those', 'the', 'it', 'hers', 'just', 'over', 'between', 'had', 'does', 'have', 'and', 'some', 'or', 'only', 'when', 'below', 'in', 'if', 'theirs', 'again', 'his', 'whom', 'above', 'should', 'itself', 'themselves', 'until', 'are', 'she', 'will', 'from', 'into', 'no', 'your', 'few', 'herself', 'of', 'has', 'down', 'were', 'once', 'having', 'them', 'under', 'him', 'do', 'on', 'an', 'yours', 'being', 'off', 'very', 'through', 'most', 'against', 

In [9]:
redditRecToToken = reddit.map(lambda line: (line[0], line[1], tokenize(line[2]),line[3], line[4]))

print(redditRecToToken.take(5))

def countTokens(vendorRDD):
    """ Count and return the number of tokens
    Args:
        vendorRDD (RDD of (recordId, tokenizedValue)): Pair tuple of record ID to tokenized output
    Returns:
        count: count of all tokens
    """
    # TODO: Uncomment the template below and replace <FILL IN> with appropriate code
    recordCount = vendorRDD.map(lambda line: len(line[0]))
    recordSum = recordCount.sum()
    return recordSum

totalTokens = countTokens(redditRecToToken)
print('There are %s tokens in the combined datasets' % totalTokens)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('"cnas8zv"', 14, ['us', 'family', 'members', 'like', 'family', 'like'], 14, 1), ('"cnas90g"', 17, ['gt', 'gt', 'woman', 'wants', 'give', 'child', 'adoption', 'named', 'father'], 17, 1), ('"cnas91f"', 19, ['made', 'multiscreen', 'video', 'sony', 'vegas', 'converted', 'webm'], 19, 1), ('"cnas91m"', 41, ['lot', 'disrespecting', 'league', 'today', 'think', 'll', 'eventually', 'need', 'see', 'garber', 'publicly', 'reprimanding', 'nycfc', 'ownership', 'group', 'doubt', 'would', 'happen', 'though'], 41, 1), ('"cnas91w"', 14, ['hear', 'sdss', 'j122952', '66', '112227', '8', 'heard', 'talking', 'shit', 're', 'coming', 'ya'], 14, 1)]
There are 210121830 tokens in the combined datasets

In [10]:
@timeit
def term_frequency(df, inputCol, outputCol, hashFeatures=None):
    '''
    Returns a DataFrame object containing a new row with the extracted features. 
    Passing hashed=True will return a Featured Hashed matrix.
    
    @params:
        df - DataFrame
        inputCol - name of input column from DataFrame to find features
        outputCol - name of the column to save the features
        hashFeatures - number of features for HashingTF, if None will perform 
            CountVectorization
    '''
    
    # since the number of features was not passed perform standard CountVectorization
    if hashFeatures is None:
        cv = CountVectorizer(inputCol=inputCol, outputCol=outputCol)
        feature_extractor = cv.fit(df)
    # otherwise perform a feature extractor with 
    else:
        feature_extractor = HashingTF(\
                              inputCol=inputCol, outputCol=outputCol, numFeatures=hashFeatures)
    
    # create a new DataFrame using either feature extraction method
    return feature_extractor.transform(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
wordsFilteredDF = spark.createDataFrame(redditRecToToken).toDF("id", "ups", "filtered_words", "score", "viralness")

# Feature Hash the comment content
# number of features for Feature Hash matrix, reccomended too use power of 2
hashDF = term_frequency(\
    df=wordsFilteredDF, inputCol="filtered_words", outputCol="features", hashFeatures=1024)

# Display snippet of new DataFrame
hashDF.select('filtered_words','features').show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'term_frequency' took 0.24 sec

+--------------------+--------------------+
|      filtered_words|            features|
+--------------------+--------------------+
|[us, family, memb...|(1024,[368,386,45...|
|[gt, gt, woman, w...|(1024,[101,159,31...|
|[made, multiscree...|(1024,[15,474,496...|
|[lot, disrespecti...|(1024,[85,88,159,...|
|[hear, sdss, j122...|(1024,[122,340,49...|
+--------------------+--------------------+
only showing top 5 rows

In [12]:
@timeit
def random_forest_regression(df, featuresCol, labelCol):
    '''
    Returns a DataFrame containing a column of predicted values of the labelCol.
    Predict the output of labelCol using values in featuresCol y = rf(x).
    
    @params:
        df - DataFrame
        featuresCol - input features, x
        labelCol - output variable, y
    '''
    # split the training and test data using the holdout method
    (trainingData, testData) = df.randomSplit([0.8, 0.2])
    
    # create the random forest regressor, limit number of trees to ten
    dtr = RandomForestRegressor(\
       featuresCol=featuresCol, labelCol=labelCol)
    
    # fit the training data to the regressor to create the model
    model = dtr.fit(trainingData)
    
    # create a DataFrame contained a column with predicted values of the labelCol
    predictions = model.transform(testData)
    
    return predictions

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# train random forest regression
rfpredictions = random_forest_regression(df=hashDF,featuresCol="features",labelCol="viralness")

# compute the error
evaluator = RegressionEvaluator(labelCol="viralness", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(rfpredictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'random_forest_regression' took 429.46 sec

Root Mean Squared Error (RMSE) on test data = 0.496549

In [15]:
rfpredictions.show(100)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+---+--------------------+-----+---------+--------------------+------------------+
|       id|ups|      filtered_words|score|viralness|            features|        prediction|
+---------+---+--------------------+-----+---------+--------------------+------------------+
|"cnas93l"| 24|[lol, happened, w...|   24|        1|(1024,[206,259,39...|0.5020501332664258|
|"cnas95n"| 25|[monocorp, sells, x]|   25|        1|(1024,[46,668,802...|0.5020501332664258|
|"cnas993"| 20|      [wasteland, 2]|   20|        1|(1024,[85,675],[1...|0.5020501332664258|
|"cnas9aa"| 16|   [missed, saxxies]|   16|        1|(1024,[649,877],[...|0.5020501332664258|
|"cnas9bl"| 13|[truth, doesn, ma...|   13|        1|(1024,[60,128,187...|0.5262861155417584|
|"cnas9d3"| 21|[would, even, pos...|   21|        1|(1024,[184,237,25...|0.5020501332664258|
|"cnas9ka"| 32|[get, belly, taylor]|   32|        1|(1024,[389,432,56...|0.5020501332664258|
|"cnas9kj"| 35|[let, goalie, bat...|   35|        1|(1024,[259,327,44.

In [18]:
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel, LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
@timeit
def logistic_regression_viral(df, featuresCol, labelCol):
    '''
    Returns a DataFrame containing a column of predicted values of the labelCol.
    Predict the output of labelCol using values in featuresCol y = rf(x).
    
    @params:
        df - DataFrame
        featuresCol - input features, x
        labelCol - output variable, y
    '''
    # split the training and test data using the holdout method
#     print(df.count())
#     df = df.filter((df.score >=0) & (df.score <10))
#     print(df.show(10))
#     print(df.count())
    (trainingData, testData) = df.randomSplit([0.8, 0.2])
    
    
    # TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
    # Given hyperparameters
    standardization = False
    elastic_net_param = 0.8
    reg_param = .3
    max_iter = 10

    lr = (LogisticRegression(featuresCol=featuresCol, labelCol=labelCol, regParam = reg_param, standardization = standardization, maxIter = max_iter,elasticNetParam = elastic_net_param))
  
    lr_model_basic = lr.fit(trainingData)

    trainingSummary = lr_model_basic.summary
    accuracy = trainingSummary.accuracy
    print(accuracy)
    # create a DataFrame contained a column with predicted values of the labelCol
    predictions = lr_model_basic.transform(testData)
    
    return predictions

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# train random forest regression
lrViralpredictions = logistic_regression_viral(df=hashDF,featuresCol="features",labelCol="viralness")

# compute the error
evaluator = RegressionEvaluator(labelCol="viralness", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(lrViralpredictions)
print ("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.5000322903531419
'logistic_regression_viral' took 116.50 sec

Root Mean Squared Error (RMSE) on test data = 0.707362

In [21]:
lrViralpredictions.show(100)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+---+--------------------+-----+---------+--------------------+--------------------+--------------------+----------+
|       id|ups|      filtered_words|score|viralness|            features|       rawPrediction|         probability|prediction|
+---------+---+--------------------+-----+---------+--------------------+--------------------+--------------------+----------+
|"cnas90g"| 17|[gt, gt, woman, w...|   17|        1|(1024,[101,159,31...|[1.29161412747016...|[0.50003229035314...|       0.0|
|"cnas91f"| 19|[made, multiscree...|   19|        1|(1024,[15,474,496...|[1.29161412747016...|[0.50003229035314...|       0.0|
|"cnas91m"| 41|[lot, disrespecti...|   41|        1|(1024,[85,88,159,...|[1.29161412747016...|[0.50003229035314...|       0.0|
|"cnas98y"| 13|          [fairness]|   13|        1|  (1024,[260],[1.0])|[1.29161412747016...|[0.50003229035314...|       0.0|
|"cnas9bl"| 13|[truth, doesn, ma...|   13|        1|(1024,[60,128,187...|[1.29161412747016...|[0.50003229035314