In [1]:
import findspark
# my local spark install
findspark.init('/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/')

import pyspark
from pyspark.sql import SQLContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import preproc as pp


# Reuse the already-running SparkContext when this cell is executed again:
# calling pyspark.SparkContext() a second time in the same kernel raises
# "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate()
# SQLContext wraps the SparkContext and is used below to build DataFrames.
sqlContext = SQLContext(sc)

In [2]:
# Wrap every preproc helper as a Spark SQL UDF so it can be applied to DataFrame columns.
string_type = StringType()
double_type = DoubleType()

# Text-in / text-out cleaning steps (all declared as returning strings).
check_lang_udf = udf(pp.check_lang, string_type)
remove_stops_udf = udf(pp.remove_stops, string_type)
remove_features_udf = udf(pp.remove_features, string_type)
tag_and_remove_udf = udf(pp.tag_and_remove, string_type)
lemmatize_udf = udf(pp.lemmatize, string_type)
check_blanks_udf = udf(pp.check_blanks, string_type)

# Label conversion (declared as returning a double).
numeric_label_udf = udf(pp.numeric_label, double_type)

In [None]:
# UNCOMMENT IF LOADING FROM HIVE

In [5]:
# Read the raw file; each line is split on tabs into a list of fields.
data_rdd = sc.textFile("data/raw_classified.txt")
parts_rdd = data_rdd.map(lambda line: line.split("\t"))
# Keep only the rows that split into exactly three fields.
garantee_col_rdd = parts_rdd.filter(lambda fields: len(fields) == 3)
# Build the DataFrame, naming the three fields in file order.
column_names = ["text", "id", "text_label"]
data_df = sqlContext.createDataFrame(garantee_col_rdd, column_names)

In [None]:
# Tag each row's text with a language code via check_lang_udf and keep only
# the rows tagged "en".
# NOTE(review): the earlier comment mentioned a 90% English-confidence cutoff;
# any such threshold lives inside preproc.check_lang — confirm there.
# The source frame must be data_df (loaded from the raw file): training_df is
# not defined yet at this point of a top-to-bottom run.
lang_df = data_df.withColumn("lang", check_lang_udf(data_df["text"]))
en_df = lang_df.filter(lang_df["lang"] == "en")

In [None]:
# Strip stop words from the English text (fewer distinct tokens downstream);
# the result is stored in a new "stop_text" column.
stop_text_col = remove_stops_udf(en_df["text"])
rm_stops_df = en_df.withColumn("stop_text", stop_text_col)

In [None]:
# Drop further non-essential words (a personal stop-word list of sorts) from
# "stop_text"; the result is stored in a new "feat_text" column.
feat_text_col = remove_features_udf(rm_stops_df["stop_text"])
rm_features_df = rm_stops_df.withColumn("feat_text", feat_text_col)

In [None]:
# Tag the remaining words and keep only nouns, verbs and adjectives;
# the result is stored in a new "tagged_text" column.
tagged_text_col = tag_and_remove_udf(rm_features_df["feat_text"])
tagged_df = rm_features_df.withColumn("tagged_text", tagged_text_col)

In [None]:
# Lemmatize the surviving words (reduces dimensionality further);
# the result is stored in a new "lemm_text" column.
lemm_text_col = lemmatize_udf(tagged_df["tagged_text"])
lemm_df = tagged_df.withColumn("lemm_text", lemm_text_col)

In [None]:
# Flag rows by running check_blanks_udf on the lemmatized text, then keep only
# the rows flagged "False".
# check_blanks_udf is declared with a string return type, hence the comparison
# against the string "False" rather than a boolean.
is_blank_col = check_blanks_udf(lemm_df["lemm_text"])
check_blanks_df = lemm_df.withColumn("is_blank", is_blank_col)
no_blanks_df = check_blanks_df.filter(check_blanks_df["is_blank"] == "False")

In [None]:
# Derive the "label" column from "text_label" through numeric_label_udf
# (declared as returning a double).
label_col = numeric_label_udf(no_blanks_df['text_label'])
num_label_df = no_blanks_df.withColumn("label", label_col)

In [None]:
# rename columns
num_label_df.withColumnRenamed(num_label_df["lemm_text"], "text")

In [None]:
# select only the columns we care about
data_set = num_label_df.select(num_label_df['id'], num_label_df['text'], num_label_df['label'])

In [None]:
# Split into training (60%) and test (40%) sets; the fixed seed 1987 keeps the
# split reproducible. The weights used to read [0.6, .04], which randomSplit
# normalizes to roughly 94% / 6% instead of the intended 60% / 40%.
splits = data_set.randomSplit([0.6, 0.4], 1987)
training_df, test_df = splits

In [4]:
# UNCOMMENT TO SKIP PREPROCESSING AND START WITH CLEAN DATA!!!
def reload_checkpoint(data_rdd):
    """Parse an RDD of tab-separated lines into an (id, text, label) DataFrame.

    Each line is split on tabs; lines that do not yield exactly three fields
    are discarded. The third field is converted with float(), so a
    non-numeric value there raises ValueError when the RDD is evaluated.

    :param data_rdd: RDD of text lines, e.g. from sc.textFile(...)
    :return: DataFrame with columns "id", "text" and "label"
    """
    typed_rows = (
        data_rdd
        .map(lambda line: line.split("\t"))
        # keep only well-formed rows (exactly three fields)
        .filter(lambda fields: len(fields) == 3)
        # label arrives as text; cast it to float
        .map(lambda fields: (fields[0], fields[1], float(fields[2])))
    )
    return sqlContext.createDataFrame(typed_rows, ["id", "text", "label"])


# Read the pre-cleaned training and test checkpoints and parse each into a
# DataFrame. These assignments replace any training_df / test_df produced by
# the preprocessing cells.
training_path = "data/clean_training.txt"
test_path = "data/clean_test.txt"

training_rdd = sc.textFile(training_path)
test_rdd = sc.textFile(test_path)

training_df = reload_checkpoint(training_rdd)
test_df = reload_checkpoint(test_rdd)

In [5]:
# Assemble the three-stage ML pipeline: tokenizer -> hashingTF -> nb.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
# hashingTF consumes whatever column the tokenizer emits.
words_col = tokenizer.getOutputCol()
hashingTF = HashingTF(inputCol=words_col, outputCol="features")
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
stages = [tokenizer, hashingTF, nb]
pipeline = Pipeline(stages=stages)

In [6]:
# Fit the pipeline to training documents.
# Fits the stages configured in `pipeline` on training_df and keeps the
# fitted result in `model`, which is used for prediction below.
model = pipeline.fit(training_df)

In [None]:
# Run the fitted pipeline over the test documents.
result = model.transform(test_df)

# Keep only the columns of interest and print every row.
# Note: collect() brings the whole selection back to the driver.
columns_of_interest = ["id", "text", "prediction"]
selected = result.select(*columns_of_interest)
collected_rows = selected.collect()
for prediction_row in collected_rows:
    print(prediction_row)