In [1]:
import findspark
# my local spark install
findspark.init('/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/')

In [2]:
import pyspark
from pyspark.sql import SQLContext

# create spark contexts
sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)

In [15]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType
import preproc as pp
# Register all the functions in Preproc with Spark Context
check_lang_udf = udf(pp.check_lang, StringType())
remove_stops_udf = udf(pp.remove_stops, StringType())
remove_features_udf = udf(pp.remove_features, StringType())
tag_and_remove_udf = udf(pp.tag_and_remove, StringType())
lemmatize_udf = udf(pp.lemmatize, StringType())
check_blanks_udf = udf(pp.check_blanks, StringType())
numeric_label_udf = udf(pp.numeric_label, DoubleType())

In [4]:
# Load a text file and convert each line to a Row.
data_rdd = sc.textFile("data/toy_set.txt")
parts_rdd = data_rdd.map(lambda l: l.split("\t"))
# Filter bad rows out
garantee_col_rdd = parts_rdd.filter(lambda l: len(l) == 3)
# Create DataFrame
data_df = sqlContext.createDataFrame(garantee_col_rdd, ["text", "id", "text_label"])

In [5]:
# predict language and filter out those with less than 90% chance of being English
lang_df = data_df.withColumn("lang", check_lang_udf(data_df["text"]))
en_df = lang_df.filter(lang_df["lang"] == "en")

+--------------------+------------------+----------+
|                text|                id|text_label|
+--------------------+------------------+----------+
|What's my best op...|514511437985611776|    python|
|RT @AnthonyNystro...|492887601045467137|    python|
|RT @raymondh: #py...|464730495213768704|    python|
|Checked out https...|443215773168066560|    python|
|#openscience and ...|443004371425849344|    python|
+--------------------+------------------+----------+
only showing top 5 rows

+--------------------+------------------+----------+----+
|                text|                id|text_label|lang|
+--------------------+------------------+----------+----+
|What's my best op...|514511437985611776|    python|  en|
|RT @AnthonyNystro...|492887601045467137|    python|  en|
|RT @raymondh: #py...|464730495213768704|    python|  en|
|Checked out https...|443215773168066560|    python|  en|
|#openscience and ...|443004371425849344|    python|  en|
+--------------------+------------




In [9]:
# remove stop words to reduce dimensionality
rm_stops_df = en_df.withColumn("stop_text", remove_stops_udf(en_df["text"]))

+---------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                             |
+---------------------------------------------------------------------------------------------------------------------------------+
|What's my best option for doing a MANOVA in #python ? A brief google search leaves me thinking R is the way to go. #pydata #stats|
+---------------------------------------------------------------------------------------------------------------------------------+
only showing top 1 row

+--------------------------------------------------------------------------------------------------+
|stop_text                                                                                         |
+--------------------------------------------------------------------------------------------------+
|What's best 




In [11]:
# remove other non essential words, think of it as my personal stop word list
rm_features_df = rm_stops_df.withColumn("feat_text", remove_features_udf(rm_stops_df["stop_text"]))

+------------------------------------------------------------------------------------------------------------------------------------------+
|stop_text                                                                                                                                 |
+------------------------------------------------------------------------------------------------------------------------------------------+
|What's best option MANOVA #python ? A brief google search leaves thinking R way go. #pydata #stats                                        |
|RT @AnthonyNystrom: ML py -&gt; curated list awesome Machine Learning frameworks, libraries software -&gt; https://t.co/leY5PUi7Lh #python|
|RT @raymondh: #python data analysis commandment: Thou shalt access thine fields indexing. Instead, use #pandas dataframes nam…            |
|Checked https://t.co/Zj7Wry6vKh text-to-speech #python worked like champion box!                                                          |
|#openscience




In [13]:
# tag the words remaining and keep only Nouns, Verbs and Adjectives
tagged_df = rm_features_df.withColumn("tagged_text", tag_and_remove_udf(rm_features_df["feat_text"]))

+------------------------------------------------------------------------------------------------------------+
|feat_text                                                                                                   |
+------------------------------------------------------------------------------------------------------------+
|what  best option manova python  brief google search leaves thinking  way  pydata stats                     |
|     curated list awesome machine learning frameworks libraries software  python                            |
|  python data analysis commandment thou shalt access thine fields indexing instead use pandas dataframes    |
|checked text  speech python worked like champion box                                                        |
|openscience python peeps want service share research data looking want built  ipython notebook viewing ideas|
+------------------------------------------------------------------------------------------------------------+
o




In [18]:
# lemmatization of remaining words to reduce dimensionality & boost measures
lemm_df = tagged_df.withColumn("lemm_text", lemmatize_udf(tagged_df["tagged_text"]))

+-------------------------------------------------------------------------------------------------------------+
|tagged_text                                                                                                  |
+-------------------------------------------------------------------------------------------------------------+
| best option manova python brief google search leaves thinking way pydata stats                              |
| curated list awesome machine learning frameworks libraries software python                                  |
| python data analysis commandment thou shalt access thine fields indexing use pandas dataframes              |
| checked text speech python worked champion box                                                              |
| openscience python peeps want service share research data looking want built ipython notebook viewing ideas |
| thanks missed john hunter rip matplotlib wonderful important legacy positivepython                    




In [None]:
# remove all rows containing only blank spaces
check_blanks_df = lemm_df.withColumn("is_blank", check_blanks_udf(lemm_df["lemm_text"]))
no_blanks_df = check_blanks_df.filter(check_blanks_df["is_blank"] == "False")

In [None]:
num_label_df = no_blanks_df.withColumn("label", numeric_label_udf(no_blanks_df['text_label']))

In [None]:
# rename columns
num_label_df.withColumnRenamed(num_label_df["lemm_text"], "text")

In [None]:
# dedupe important since alot of the tweets only differed by url's and RT mentions
dedup_df = num_label_df.dropDuplicates(['text', 'label'])

In [None]:
# select only the columns we care about
data_set = dedup_df.select(num_label_df['id'], num_label_df['text'], num_label_df['label'])

In [None]:
# split training & validation sets with 60% to training and use a seed value of 1987
splits = data_set.randomSplit([0.6, 0.4])
training_df = splits[0]
test_df = splits[1]

In [None]:
##################################################################
#
#   Spark ML Section
#   
#   Skip Preprocessing and use cleaned files by running next cell
#
##################################################################

In [3]:
# Load already cleaned data
def reload_checkpoint(data_rdd):
    parts_rdd = data_rdd.map(lambda l: l.split("\t"))
    # Filter bad rows out
    garantee_col_rdd = parts_rdd.filter(lambda l: len(l) == 3)
    typed_rdd = garantee_col_rdd.map(lambda p: (p[0], p[1], float(p[2])))
    # Create DataFrame
    df = sqlContext.createDataFrame(typed_rdd, ["id", "text", "label"])
    return df


# Load precleaned training set
training_rdd = sc.textFile("data/clean_training.txt")
training_df = reload_checkpoint(training_rdd)
# Load precleaned test set
test_rdd = sc.textFile("data/clean_test.txt")
test_df = reload_checkpoint(test_rdd)

In [4]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator

# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and nb.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
idf = IDF(minDocFreq=3, inputCol="features", outputCol="idf")
nb = NaiveBayes()
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, nb])


paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 1.0]).build()


cv = CrossValidator(estimator=pipeline, 
                    estimatorParamMaps=paramGrid, 
                    evaluator=MulticlassClassificationEvaluator(), 
                    numFolds=4)

cvModel = cv.fit(training_df)

In [5]:
result = cvModel.transform(test_df)
prediction_df = result.select("text", "label", "prediction")
prediction_df.show(truncate=False)

+----------------------------------------------------------------------------------------------------------------+-----+----------+
|text                                                                                                            |label|prediction|
+----------------------------------------------------------------------------------------------------------------+-----+----------+
|acolyte warmachine protectorateofmenoth                                                                         |1.0  |1.0       |
|alfred producthunt workflow product hunt workflow alfred python                                                 |1.0  |0.0       |
|alternative thanksgiving option inclined try something different year                                           |1.0  |1.0       |
|america greatest country world truth                                                                            |1.0  |1.0       |
|animatic hell lot better storyboards were le guesswork better camera angle 

In [6]:
datasci_df = prediction_df.filter(prediction_df['label']==0.0)
datasci_df.show(truncate=False)

+--------------------------------------------------------------------------------+-----+----------+
|text                                                                            |label|prediction|
+--------------------------------------------------------------------------------+-----+----------+
|big boy toy drone next big thing bigdata according story iot                    |0.0  |1.0       |
|big data result depend data look                                                |0.0  |1.0       |
|big data service say hadoop service inaccurate company                          |0.0  |0.0       |
|big data wild                                                                   |0.0  |0.0       |
|bigdata algorithm dominate life don consider datascience                        |0.0  |0.0       |
|bigdata analytics maturity continuum matrix http                                |0.0  |0.0       |
|cognitive technology microservices top analytics trend list bigdata cognitiveera|0.0  |0.0       |


In [7]:
ao_df = prediction_df.filter(prediction_df['label']==1.0)
ao_df.show(truncate=False)

+----------------------------------------------------------------------------------------------------------------+-----+----------+
|text                                                                                                            |label|prediction|
+----------------------------------------------------------------------------------------------------------------+-----+----------+
|acolyte warmachine protectorateofmenoth                                                                         |1.0  |1.0       |
|alfred producthunt workflow product hunt workflow alfred python                                                 |1.0  |0.0       |
|alternative thanksgiving option inclined try something different year                                           |1.0  |1.0       |
|america greatest country world truth                                                                            |1.0  |1.0       |
|animatic hell lot better storyboards were le guesswork better camera angle 

In [None]:
# TODO Add join back to original text
# TODO fix raw_classification labels
# TODO show accuracy measures

In [12]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(result, {evaluator.metricName: "precision"})

0.9228856806385485