In [None]:
# Create the Dataproc cluster used by this notebook (ANACONDA and JUPYTER
# optional components, two workers, --max-idle of 7200s).
gcloud beta dataproc clusters create cluster-ce8a \
    --enable-component-gateway \
    --bucket its_a_bukket \
    --region us-central1 \
    --subnet default \
    --zone us-central1-b \
    --master-machine-type n1-standard-2 \
    --master-boot-disk-size 500 \
    --num-workers 2 \
    --worker-machine-type n1-standard-2 \
    --worker-boot-disk-size 500 \
    --image-version 1.3-deb9 \
    --optional-components ANACONDA,JUPYTER \
    --max-idle 7200s \
    --scopes 'https://www.googleapis.com/auth/cloud-platform' \
    --project sapient-origin-267302

Helpful links:

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3923635548890252/1357850364289680/4930913221861820/latest.html

https://spark.apache.org/docs/latest/ml-features.html#tf-idf



In [11]:
from pyspark.sql import SparkSession

# Build (or reuse) a Hive-enabled session named 'reddit_pipeline' and keep a
# handle on its underlying SparkContext.
spark = (
    SparkSession.builder
    .enableHiveSupport()
    .appName('reddit_pipeline')
    .getOrCreate()
)
sc = spark.sparkContext

In [12]:
# Display the Spark version reported by the active SparkContext.
sc.version

'2.4.0-cdh6.3.0'

I will now load the data as a PySpark DataFrame, and confirm the load was successful by reviewing the first five rows of the text.

In [13]:
%time df_raw = spark.read.option("quote", "\"").option("escape", "\"").csv('politics_all_politics_all000000000000',header=True)

CPU times: user 2.28 ms, sys: 55 µs, total: 2.34 ms
Wall time: 375 ms


To ensure the load worked as intended, I will count missing values by column.

In [14]:
from pyspark.sql.functions import isnan, when, count, col

%time df_raw.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_raw.columns]).show()

+----+------------+--------+-------+-------+-----------------+-------+-----------+------------+-------+---------+-------+------------+----------------+-------+-------+---------+-------+-------------+----------------------+
|body|score_hidden|archived|   name| author|author_flair_text|  downs|created_utc|subreddit_id|link_id|parent_id|  score|retrieved_on|controversiality| gilded|     id|subreddit|    ups|distinguished|author_flair_css_class|
+----+------------+--------+-------+-------+-----------------+-------+-----------+------------+-------+---------+-------+------------+----------------+-------+-------+---------+-------+-------------+----------------------+
| 115|     1969152| 2057434|2414161|1142312|          2255571|2480318|    1197470|     1094818|1064760|  1052482|1046843|     1044667|         1043137|1042456|1042118|  1041933|2143207|      2505445|               2395523|
+----+------------+--------+-------+-------+-----------------+-------+-----------+------------+-------+-----

We see that there are a few rows with missing "body" information, so these will have to be dropped from the DataFrame before continuing.

In [15]:
#select only rows with non-null 'body' values
df_raw = df_raw.filter(df_raw.body.isNotNull())

#confirm process completed successfully
%time df_raw.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_raw.columns]).show()

+----+------------+--------+-------+-------+-----------------+-------+-----------+------------+-------+---------+-------+------------+----------------+-------+-------+---------+-------+-------------+----------------------+
|body|score_hidden|archived|   name| author|author_flair_text|  downs|created_utc|subreddit_id|link_id|parent_id|  score|retrieved_on|controversiality| gilded|     id|subreddit|    ups|distinguished|author_flair_css_class|
+----+------------+--------+-------+-------+-----------------+-------+-----------+------------+-------+---------+-------+------------+----------------+-------+-------+---------+-------+-------------+----------------------+
|   0|     1969076| 2057345|2414053|1142273|          2255500|2480205|    1197432|     1094788|1064729|  1052452|1046813|     1044637|         1043107|1042426|1042088|  1041902|2143116|      2505333|               2395436|
+----+------------+--------+-------+-------+-----------------+-------+-----------+------------+-------+-----

With this initial data cleaning step completed, I will review a sample of the comments to observe their format.

In [16]:
# Pull five rows of the raw comment text to inspect its format.
df_raw.select("body").take(5)

[Row(body='almost a third of registered voters in CA are republican, though...'),
 Row(body="So, you're saying netanyahu wants peace?"),
 Row(body='But the Saudis said that the refugees are too dangerous to take in and have taken in 0...'),
 Row(body='Yep, "opt-out" has been changed to "you are permitted to ask"'),
 Row(body="They can answer however they'd like")]

In order to process this text for machine learning analysis, we will have to remove punctuation as a first step.  This will be accomplished via a regex function that removes any character that is not a letter, number, or space.

In [17]:
import pyspark.sql.functions as f

#remove formatting characters
df_raw = df_raw.withColumn('body_vec', f.regexp_replace('body', "[^a-zA-Z0-9\\s]", ""))

#confirm output
%time df_raw.select("body_vec").take(5)

CPU times: user 1.27 ms, sys: 2.4 ms, total: 3.67 ms
Wall time: 184 ms


[Row(body_vec='almost a third of registered voters in CA are republican though'),
 Row(body_vec='So youre saying netanyahu wants peace'),
 Row(body_vec='But the Saudis said that the refugees are too dangerous to take in and have taken in 0'),
 Row(body_vec='Yep optout has been changed to you are permitted to ask'),
 Row(body_vec='They can answer however theyd like')]

Next, we will put all letters into lower-case so our hashing algorithm doesn't perceive "Clinton" and "clinton" as different words.

In [18]:
from pyspark.sql.functions import col, lower

df_raw = df_raw.withColumn('body_vec', lower(col('body_vec')))

%time df_raw.select("body_vec").take(5)

CPU times: user 3.1 ms, sys: 1.03 ms, total: 4.12 ms
Wall time: 197 ms


[Row(body_vec='almost a third of registered voters in ca are republican though'),
 Row(body_vec='so youre saying netanyahu wants peace'),
 Row(body_vec='but the saudis said that the refugees are too dangerous to take in and have taken in 0'),
 Row(body_vec='yep optout has been changed to you are permitted to ask'),
 Row(body_vec='they can answer however theyd like')]

Here, we establish a custom Pipeline function to stem all tokens in a column using the NLTK Porter Stemmer.

In [19]:
from pyspark import keyword_only
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
#import nltk
from nltk.stem.porter import PorterStemmer

#this code courtesy of http://michael-harmon.com/blog/SentimentAnalysisP2.html

class PorterStemming(Transformer, HasInputCol, HasOutputCol):
    """
    Pipeline stage that stems every token of an array-of-strings column with
    the NLTK Porter Stemmer.

    This comes from https://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml
    Adapted to work with the Porter Stemmer from NLTK.

    Params
    ------
    inputCol  : name of the column holding the token lists to stem.
    outputCol : name of the column that receives the stemmed token lists.
    min_size  : a stemmed token is kept only when len(token) > min_size
                (default 0, i.e. only empty strings are dropped).
    """

    @keyword_only
    def __init__(self,
                 inputCol  : str = None,
                 outputCol : str = None,
                 min_size  : int = None):
        """
        Constructor takes in the input column name, output column name,
        plus the minimum length of a token (min_size).
        """
        # Start the cooperative __init__ chain from this class so that every
        # base class in the MRO is initialised.
        super(PorterStemming, self).__init__()

        # Declare the min_size Param and give it a default of 0.
        self.min_size = Param(
            self,
            "min_size",
            "stemmed tokens are kept only if they are longer than this")
        self._setDefault(min_size=0)

        # @keyword_only stores the explicitly passed keyword arguments in
        # _input_kwargs; forward exactly those to setParams.
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

        # Stemmer instance that is shipped to the executors with the UDF.
        self.stemmer  = PorterStemmer()


    @keyword_only
    def setParams(self,
                  inputCol  : str = None,
                  outputCol : str = None,
                  min_size  : int = None
      ) -> "PorterStemming":
        """
        Set the params that were explicitly passed as keyword arguments.

        Returns this instance (the result of Params._set) so calls can be
        chained.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)


    def _build_stem_func(self):
        """
        Return a plain function ``list -> list`` that stems tokens.

        The stemmer's bound method and the current min_size value are
        captured in local variables so that the function serialised for the
        UDF does not drag the whole transformer along and does not have to
        resolve the Param for every row.
        """
        stem     = self.stemmer.stem
        min_size = self.getMinSize()

        def stem_tokens(words):
            # A null array stays null instead of raising inside the UDF.
            if words is None:
                return None
            # Stem each token, then keep those strictly longer than min_size.
            return [token for token in map(stem, words) if len(token) > min_size]

        return stem_tokens


    def _stem_func(self, words  : list) -> list:
        """
        Stem every token in ``words`` and return the list of stemmed tokens
        that meet the minimum length requirement (len > min_size).

        Returns None when ``words`` is None.
        """
        return self._build_stem_func()(words)

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        Method invoked by ``transform`` / ML Pipelines.

        Adds (or replaces) the output column with the stemmed version of the
        token lists found in the input column and returns the new DataFrame.
        """
        # Get the names of the input and output columns to use
        out_col       = self.getOutputCol()
        in_col        = self.getInputCol()

        # Wrap the stemming closure in a UDF returning array<string>.
        stem_func_udf = F.udf(self._build_stem_func(), ArrayType(StringType()))

        # Apply the UDF to the input column to produce the output column.
        return df.withColumn(out_col, stem_func_udf(df[in_col]))


    def setMinSize(self,value):
        """
        Set the minimum token size and return this instance.

        Goes through Params._set rather than writing to _paramMap directly so
        the value is handled like every other param.
        """
        return self._set(min_size=value)

    def getMinSize(self) -> int:
        """
        Return the minimum token size: the explicitly set value if there is
        one, otherwise the default (via getOrDefault).
        """
        return self.getOrDefault(self.min_size)


With this custom function in place, I will build out a full pipeline that completes the following steps in order:

* Tokenize each comment string
* Remove stop words
* Stem each token
* Hash the tokens
* Compute the IDF of those tokens

The pipeline will ultimately output a column named "body_vec_tfidf" that can be used for analysis.

In [None]:
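# NOTE: step-by-step version of the feature pipeline, kept for reference only; the Pipeline-based cells that follow supersede it.
# from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover, IDF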

# #tokenize body text
# tokenizer = Tokenizer(inputCol="body_vec", outputCol="body_vec_token")
# df_raw = tokenizer.transform(df_raw)
# df_raw.select('body_vec_token').take(5)

# #clean old column
# %time df_raw = df_raw.drop('body_vec')

# #remove stop words
# remover = StopWordsRemover(inputCol = "body_vec_token", outputCol = "body_vec_token_nosw")
# df_raw = remover.transform(df_raw)
# df_raw.select('body_vec_token_nosw').take(5)

# #clean old column
# %time df_raw = df_raw.drop('body_vec_token')

# #stem all tokens
# stemmer = PorterStemming(inputCol = "body_vec_token_nosw", outputCol = "body_vec_cleaned")
# df_raw = stemmer.transform(df_raw)
# df_raw.select('body_vec_cleaned').take(5)

# #clean old column
# %time df_raw = df_raw.drop('body_vec_token_nosw')

# #hashingTF vectorization
# hashingTF = HashingTF(inputCol="body_vec_cleaned", outputCol="body_vec_tf")
# df_raw = hashingTF.transform(df_raw)
# df_raw.select('body_vec_tf').take(5)

# #clean old column
# %time df_raw = df_raw.drop('body_vec_cleaned')

# #IDF vectorization
# idf = IDF(inputCol="body_vec_tf", outputCol="body_vec_tfidf")
# df_raw = idf.fit(df_raw).transform(df_raw)
# df_raw.select('body_vec_tfidf').take(5)

In [21]:
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover, IDF
from pyspark.ml import Pipeline

# Each stage reads the column written by the stage before it:
# body_vec -> body_vec_token -> body_vec_token_nosw -> body_vec_cleaned -> body_vec_tf
tokenizer = Tokenizer(
    inputCol="body_vec",
    outputCol="body_vec_token",
)
remover = StopWordsRemover(
    inputCol="body_vec_token",
    outputCol="body_vec_token_nosw",
)
stemmer = PorterStemming(
    inputCol="body_vec_token_nosw",
    outputCol="body_vec_cleaned",
)
hashingTF = HashingTF(
    inputCol="body_vec_cleaned",
    outputCol="body_vec_tf",
)

feature_stages = [tokenizer, remover, stemmer, hashingTF]
pipeline = Pipeline(stages=feature_stages)

In [22]:
# Fit the pipeline on df_raw, apply the fitted model to that same frame, and
# rebind df_raw to the transformed result; %time reports the elapsed time.
%time df_raw = pipeline.fit(df_raw).transform(df_raw)

CPU times: user 43.3 ms, sys: 8.26 ms, total: 51.6 ms
Wall time: 282 ms


In [23]:
# Fetch five rows of the HashingTF output column; %time reports how long the
# fetch takes.
%time df_raw.select("body_vec_tf").take(5)

CPU times: user 7.12 ms, sys: 4.87 ms, total: 12 ms
Wall time: 7.43 s


[Row(body_vec_tf=SparseVector(262144, {7761: 1.0, 12074: 1.0, 68641: 1.0, 74383: 1.0, 145449: 1.0, 197339: 1.0, 223821: 1.0})),
 Row(body_vec_tf=SparseVector(262144, {55039: 1.0, 65212: 1.0, 180893: 1.0, 190256: 1.0, 204137: 1.0})),
 Row(body_vec_tf=SparseVector(262144, {8227: 1.0, 55639: 1.0, 75173: 1.0, 82321: 1.0, 82657: 1.0, 168976: 1.0, 235413: 1.0})),
 Row(body_vec_tf=SparseVector(262144, {1007: 1.0, 109810: 1.0, 172796: 1.0, 221124: 1.0, 248891: 1.0})),
 Row(body_vec_tf=SparseVector(262144, {86850: 1.0, 181489: 1.0, 198829: 1.0, 208258: 1.0}))]

In [24]:
# The intermediate text/token columns are no longer needed once the
# term-frequency vectors exist, so remove them in a single call.
intermediate_cols = [
    'body_vec',
    'body_vec_token',
    'body_vec_token_nosw',
    'body_vec_cleaned',
]
df_raw = df_raw.drop(*intermediate_cols)

In [25]:
# Rescale the hashed term frequencies by inverse document frequency.
idf = IDF(inputCol="body_vec_tf", outputCol="body_vec_tfidf")

# Fit on the full frame first, then use the fitted model to add the new column.
idf_model = idf.fit(df_raw)
df_raw = idf_model.transform(df_raw)

# Inspect a handful of the resulting TF-IDF vectors.
df_raw.select('body_vec_tfidf').take(5)

[Row(body_vec_tfidf=SparseVector(262144, {7761: 4.7521, 12074: 3.7989, 68641: 6.758, 74383: 6.2482, 145449: 7.753, 197339: 4.5417, 223821: 5.294})),
 Row(body_vec_tfidf=SparseVector(262144, {55039: 3.8732, 65212: 3.241, 180893: 9.2539, 190256: 3.4849, 204137: 6.6828})),
 Row(body_vec_tfidf=SparseVector(262144, {8227: 7.181, 55639: 3.9531, 75173: 7.287, 82321: 6.0177, 82657: 6.2106, 168976: 3.9227, 235413: 6.9651})),
 Row(body_vec_tfidf=SparseVector(262144, {1007: 10.7641, 109810: 4.7971, 172796: 6.4791, 221124: 4.506, 248891: 7.9858})),
 Row(body_vec_tfidf=SparseVector(262144, {86850: 5.0907, 181489: 5.3979, 198829: 6.5195, 208258: 2.7532}))]

In [26]:
# Show one complete row to review every column now present on df_raw.
df_raw.take(1)

[Row(body='almost a third of registered voters in CA are republican, though...', score_hidden=None, archived=None, name=None, author='Starkravingmad7', author_flair_text=None, downs=None, created_utc='1502391062', subreddit_id='t5_2cneq', link_id='t3_6stdi4', parent_id='t1_dlfk7kb', score='2', retrieved_on='1503941308', controversiality='0', gilded='0', id='dlft5kl', subreddit='politics', ups=None, distinguished=None, author_flair_css_class=None, body_vec_tf=SparseVector(262144, {7761: 1.0, 12074: 1.0, 68641: 1.0, 74383: 1.0, 145449: 1.0, 197339: 1.0, 223821: 1.0}), body_vec_tfidf=SparseVector(262144, {7761: 4.7521, 12074: 3.7989, 68641: 6.758, 74383: 6.2482, 145449: 7.753, 197339: 4.5417, 223821: 5.294}))]