Imports: pyspark, ml and standard python libraries

In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

from pyspark.ml.classification import LinearSVC
from pyspark.ml.linalg import Vectors
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from collections import Counter
import re

Define the method that extracts a certain field from a row in the rdd eg. the post body or score. The method uses a regular expression that parses the xml text format strings within the rdd.

In [2]:
def get_field(line, field_name):
    found = re.search("{}=\"(.*?)\"".format(field_name), line)
    if found:
        return found.group(1)

Define the method that extracts all the relevant fields for this analysis and puts them in a list of elements.

In [3]:
def get_relevant_fields(line):
    return [get_field(line, "Body"),
            get_field(line, "Score")]

Define the method for the filter that eliminates rows that are missing any of the relevant fields.

In [4]:
def any_missing(row):
    return row[0] and row[1]

Define the method that preprocesses the score value. It converts the strings to 1 or 0 integers depending on whether the score is positive.

In [5]:
def preprocess_scores(score):
    score = int(score)
    return 1 if score > 0 else 0

Define the method that removes stop words from word counters. The stop words were taken from the default english language stop words list from https://www.ranks.nl/stopwords.

In [6]:
def remove_stop_words(counter):
    stop_words = set(["a", "about", "above", "after", "again", "against", "all", "am", "an", "and", "any", "are", "arent", 
                     "as", "at", "be", "because", "been", "before", "being", "below", "between", "both", "but", "by", 
                     "cant", "cannot", "could", "couldnt", "did", "didnt", "do", "does", "doesnt", "doing", "dont", "down", 
                     "during", "each", "few", "for", "from", "further", "had", "hadnt", "has", "hasnt", "have", "havent", 
                     "having", "he", "hed", "hell", "hes", "her", "here", "heres", "hers", "herself", "him", "himself", 
                     "his", "how", "hows", "i", "id", "ill", "im", "ive", "if", "in", "into", "is", "isnt", "it", "its", 
                     "its", "itself", "lets", "me", "more", "most", "mustnt", "my", "myself", "no", "nor", "not", "of", 
                     "off", "on", "once", "only", "or", "other", "ought", "our", "ours 	ourselves", "out", "over", "own", 
                     "same", "shant", "she", "shed", "shell", "shes", "should", "shouldnt", "so", "some", "such", "than", 
                     "that", "thats", "the", "their", "theirs", "them", "themselves", "then", "there", "theres", "these", 
                     "they", "theyd", "theyll", "theyre", "theyve", "this", "those", "through", "to", "too", "under", 
                     "until", "up", "very", "was", "wasnt", "we", "wed", "well", "were", "weve", "were", "werent", "what", 
                     "whats", "when", "whens", "where", "wheres", "which", "while", "who", "whos", "whom", "why", "whys", 
                     "with", "wont", "would", "wouldnt", "you", "youd", "youll", "youre", "youve", "your", "yours", 
                     "yourself", "yourselves"])
    words = set(counter.keys())
    for word in words:
        if word in stop_words:
            del counter[word]
    return counter

Define the method for preprocessing text and converting it to a python Counter object that counts the words in the text that will be used for a Bag of Words approach. The method has a number of steps each is described by a comment.

In [7]:
def preprocess_text(text):
    # remove code snippets
    text = re.sub("&lt;pre&gt;&lt;code&gt;.*?&lt;/pre&gt;&lt;/code&gt;", "", text)
    
    # remove html tags
    text = re.sub("&lt;.*?&gt;", "", text)
    
    #remove whatever this is
    text = re.sub("&#xA;", "", text)
    
    # remove links
    text = re.sub("https?://.*?\s|$", "", text)
    
    # remove symbols
    text = re.sub("[^a-zA-Z\s]", "", text)
    
    # normalize whitespace
    text = re.sub("\s+", " ", text)
    
    # lowercase everything
    text = text.lower()
    
    # split text into words
    split = text.split()

    # count words
    counter = Counter(split)
    
    # remove stop words
    counter = remove_stop_words(counter)
    
    count_tuple_list = []
    for key in counter.keys():
        count_tuple_list.append((key, counter[key]))
    
    return count_tuple_list

Define a method that preprocesses all of the elements of the rdd. Converting score strings to 0 or 1 and the text body to word counters.

In [8]:
def preprocess_all(row):
    return (preprocess_scores(row[1]), preprocess_text(row[0]))

Assert the correctness of the large preprocessing method.

In [9]:
assert preprocess_text("&lt;pre&gt;&lt;code&gt;cprintf('hello world!');&lt;/pre&gt;&lt;/code&gt;"
                       "https://google.com "
                       "boulder leave leave it's it it?&lt;pre&gt;&lt;/pre&gt;") == [("boulder", 1), ("leave", 2)]

Define a method that converts the counter to a vector of relevant words. The arbitrary cutoff value is the number of repetition of the word that have to occur throughout the data for it to be added to the word vector.

In [10]:
ARBITRATRY_CUTOFF_VALUE = 10


def get_vector_words(reduced_count_tuple_list):
    vector = []
    for item in reduced_count_tuple_list:
        if item[1] < ARBITRATRY_CUTOFF_VALUE:
            vector.append(item[0])
    return vector

Initialize the spark context and load the text file to the context, it is an xml file containing post data. Each row of the rdd contains data about one post such as the text body, score and tags.

In [11]:
sc = SparkContext("local[*]")
rdd = sc.textFile("Posts.xml")

Apply all of the filtering and preprocessing methods to the rdd.

In [13]:
rdd_all = rdd.map(get_relevant_fields)
rdd_all = rdd_all.filter(any_missing)
rdd_all = rdd_all.map(preprocess_all)

Split the dataset between positive and negative scored posts.

In [14]:
rdd_pos = rdd_all.filter(lambda x: x[0] == 1)
rdd_neg = rdd_all.filter(lambda x: x[0] == 0)

Sample the positive posts to balance out the data.

In [15]:
rdd_pos = rdd_pos.sample(False, 0.1, 1)
rdd_all = rdd_pos.union(rdd_neg)

Reduce the rdd in order to get the word count for the entire dataset.

In [16]:
reduced = rdd_all.flatMap(lambda x: x[1]).reduceByKey(lambda x, y: x + y).collect()

Extract the word vector from the reduced word counts as defined in the get_vector_words method.

In [17]:
VECTOR_WORDS = get_vector_words(reduced)

N = len(VECTOR_WORDS)

Define a method that converts rdd rows to tuples containing float type category flags of either 1.0 or 0.0 and SparseVectors that rescribe the rows text in the VECTOR_WORDS space. Apply the method to the rdd.

In [18]:
def create_vector(row):
    vector = []
    index = []
    length = 0
    counter = {}
    for word, count in row[1]:
        counter[word] = count
    cat = float(row[0])
    for i in range(N):
        word = VECTOR_WORDS[i]
        if word in counter.keys():
            vector.append(float(counter[word]))
            index.append(i)
            length += 1
    return (cat, Vectors.sparse(N, index, vector))

Map the rdd data to vectors and then convert it to a spark DataFrame. Having the data in a DataFrame will enable the use of the state of the art dataframe based machine learning library that comes with spark.

In [19]:
rdd_points = rdd_all.map(create_vector)

sql = SQLContext(sc)

df_points = sql.createDataFrame(rdd_points, ["label", "features"])

Split the dataframe into a training and test set for modeling.

In [20]:
df_train, df_test = df_points.randomSplit(weights=[0.7, 0.3])

Train a support vector machine on the training data set.

In [21]:
lsvc = LinearSVC(maxIter = 10, regParam = 0.1)
lsvcModel = lsvc.fit(df_train)

Use the support vector model to predict values from the test data set.

In [22]:
predictions = lsvcModel.transform(df_test)

Calculate binary classification metrics to evaulate the model.

In [23]:
evaluation = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

evaluation.evaluate(predictions)

0.6404455099390671

The default metric returned by the evaluate method is the area under the ROC curve.