In [1]:
# import pyspark
# from pyspark.sql import SQLContext
# 
# # create spark contexts
# sc = pyspark.SparkContext()
# sqlContext = SQLContext(sc)

## Neccessary modules and python files
* **langid**
* **nltk**

### Install **langid**

```
pip install langid
```

### Download nltk

* install **nltk** module
```
pip install nltk
```

* download corpora
```
# enter python interactive environment
python
# type python script
from ntlk import download
download()
```


### Get `preproc.py`
* Reference: https://github.com/dreyco676/nlp_spark
* Get `preproc.py`: `wget https://raw.githubusercontent.com/dreyco676/nlp_spark/master/preproc.py`
* `preproc.py` has to be in the same directory with your *ipynb* file

### Get practice data

```
git clone https://github.com/dreyco676/nlp_spark.git
cd nlp_spark/
unzip data.zip
```

# User defined functions

In [2]:
# from pyspark.sql.functions import udf
# from pyspark.sql.types import StringType
# import preproc as pp
# # Register all the functions in Preproc with Spark Context
# check_lang_udf = udf(pp.check_lang, StringType())
# remove_stops_udf = udf(pp.remove_stops, StringType())
# remove_features_udf = udf(pp.remove_features, StringType())
# tag_and_remove_udf = udf(pp.tag_and_remove, StringType())
# lemmatize_udf = udf(pp.lemmatize, StringType())
# check_blanks_udf = udf(pp.check_blanks, StringType())

# Data Preprocessing

### Load data

In [3]:
raw_classified = spark.read.csv('nlp_spark/data/raw_classified.txt', inferSchema=True, sep='\t').toDF('text', 'id', 'label')

In [4]:
raw_classified.show(5)

+--------------------+------------------+-----+
|                text|                id|label|
+--------------------+------------------+-----+
|Fresh install of ...|        1018769417|  1.0|
|Well. Now I know ...|       10284216536|  1.0|
|"Literally six we...|       10298589026|  1.0|
|Mitsubishi i MiEV...|109017669432377344|  1.0|
|'Cheap Eats in SL...|109642968603963392|  1.0|
+--------------------+------------------+-----+
only showing top 5 rows



### Remove single/double quotes and space at the begining and end of string

In [5]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
trim_quotes_and_space_udf = udf(lambda s: s.strip().strip('"').strip("'"), StringType())
raw_classified_v1 = raw_classified.withColumn('text', trim_quotes_and_space_udf(raw_classified.text))

In [6]:
raw_classified_v1.show(5)
print('Total rows: {}'.format(raw_classified_v1.count()))

+--------------------+------------------+-----+
|                text|                id|label|
+--------------------+------------------+-----+
|Fresh install of ...|        1018769417|  1.0|
|Well. Now I know ...|       10284216536|  1.0|
|Literally six wee...|       10298589026|  1.0|
|Mitsubishi i MiEV...|109017669432377344|  1.0|
|Cheap Eats in SLP...|109642968603963392|  1.0|
+--------------------+------------------+-----+
only showing top 5 rows

Total rows: 115886


In [7]:
raw_classified_v1.select('text').toPandas()['text'][4]

  from pkg_resources import resource_stream


u"Cheap Eats in SLP' - http://t.co/4w8gRp7"

### Check string length in column 'text'

In [8]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
string_length_udf = udf(lambda s: len(s), IntegerType())
raw_classified_v2 = raw_classified_v1.withColumn('text_length', string_length_udf(raw_classified_v1.text))

In [26]:
raw_classified_v2.orderBy('text_length').show(5)
raw_classified_v2.orderBy('text_length', ascending=False).show(5)

+----------+------------------+-----+-----------+
|      text|                id|label|text_length|
+----------+------------------+-----+-----------+
|   awesome|         882098800|  1.0|          7|
|  City Wok|665255511273570305|  1.0|          8|
| #iknowhow|537734337655889921|  1.0|          9|
|It's cold.|159983261911760896|  1.0|         10|
|Boarded :)|413084478760685568|  1.0|         10|
+----------+------------------+-----+-----------+
only showing top 5 rows

+--------------------+------------------+-----+-----------+
|                text|                id|label|text_length|
+--------------------+------------------+-----+-----------+
|A girlfriend that...|424763466897846272|  1.0|        256|
|Real boyfriends &...|429215572241285121|  1.0|        166|
|RT @julieklausner...|199565254542376960|  1.0|        164|
|Real boyfriends &...|418449856298889216|  1.0|        164|
|Damn. I want to s...|445063125696401409|  1.0|        159|
+--------------------+------------------+----

* The minimum length is 7 and maximum length is 256. Therefore, no empty strings or None values in column 'text'

In [27]:
raw_classified_v2.show(5)

+--------------------+------------------+-----+-----------+
|                text|                id|label|text_length|
+--------------------+------------------+-----+-----------+
|Fresh install of ...|        1018769417|  1.0|         61|
|Well. Now I know ...|       10284216536|  1.0|         85|
|Literally six wee...|       10298589026|  1.0|        134|
|Mitsubishi i MiEV...|109017669432377344|  1.0|         90|
|Cheap Eats in SLP...|109642968603963392|  1.0|         40|
+--------------------+------------------+-----+-----------+
only showing top 5 rows



* How many labels are there?

In [28]:
raw_classified.select('label').distinct().show()

+-----+
|label|
+-----+
|  0.0|
|  1.0|
+-----+



* Check values in column 'id'
    + From the id sorted results, no NA, None values exist in column 'id'

In [29]:
raw_classified_v2.orderBy('id').show(5)
raw_classified_v2.orderBy('id', ascending=False).show(5)

+--------------------+---------+-----+-----------+
|                text|       id|label|text_length|
+--------------------+---------+-----+-----------+
|Sorry! Account de...|797858706|  1.0|         59|
|Yo am I imagining...|798243247|  1.0|        127|
|Midnight coffee i...|798474877|  1.0|         48|
|I'm sad that Mike...|799151574|  1.0|         45|
|      Peter fixed it|799331338|  1.0|         14|
+--------------------+---------+-----+-----------+
only showing top 5 rows

+--------------------+------------------+-----+-----------+
|                text|                id|label|text_length|
+--------------------+------------------+-----+-----------+
|White Dwarf 100 "...|679856481798369282|  1.0|        123|
|White Dwarf 100 "...|679856481798369282|  1.0|        123|
|White Dwarf 100 �...|679851755815985153|  1.0|        135|
|RT @iwan0www: Pro...|679847620995579904|  1.0|        122|
|RT @mikeolson: 9 ...|679847263238250497|  0.0|        140|
+--------------------+--------------

### Identify language

In [36]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from langid import classify
identify_lang_code_udf = udf(lambda s: classify(s)[0])
identify_lang_prob_udf = udf(lambda s: classify(s)[1])
raw_classified_v3 = raw_classified_v2.\
                    withColumn('lang', identify_lang_code_udf(raw_classified_v2.text)).\
                    withColumn('lang_prob', identify_lang_prob_udf(raw_classified_v3.text))

In [37]:
raw_classified_v3.show(5)

+--------------------+------------------+-----+-----------+----+-------------------+
|                text|                id|label|text_length|lang|          lang_prob|
+--------------------+------------------+-----+-----------+----+-------------------+
|Fresh install of ...|        1018769417|  1.0|         61|  en| -29.55938959121704|
|Well. Now I know ...|       10284216536|  1.0|         85|  en|-159.12265014648438|
|Literally six wee...|       10298589026|  1.0|        134|  en| -183.2326889038086|
|Mitsubishi i MiEV...|109017669432377344|  1.0|         90|  en|-108.64056301116943|
|Cheap Eats in SLP...|109642968603963392|  1.0|         40|  en|-16.089602947235107|
+--------------------+------------------+-----+-----------+----+-------------------+
only showing top 5 rows



In [39]:
raw_classified_v3.describe('lang_prob').show()

+-------+--------------------+
|summary|           lang_prob|
+-------+--------------------+
|  count|              115886|
|   mean| -138.90801413837985|
| stddev|   78.35839492856009|
|    min|-0.05613517761230469|
|    max|  3.6121368408203125|
+-------+--------------------+



In [40]:
raw_classified_v3.orderBy('lang_prob').show(5)

+--------------------+------------------+-----+-----------+----+--------------------+
|                text|                id|label|text_length|lang|           lang_prob|
+--------------------+------------------+-----+-----------+----+--------------------+
|Automating big-da...|654986512367398915|  0.0|         74|  en|-0.05613517761230469|
|2010 Home Remodel...|       12962035649|  1.0|         66|  en| -0.5808463096618652|
|Men Explain &lt;e...|678837202969751552|  1.0|         84|  en| -0.9656310081481934|
|Google Analytics ...|513002286629806080|  1.0|         85|  en| -0.9861445426940918|
|RT @albertpak: #E...|659205951313022977|  1.0|         93|  en| -1.0300307273864746|
+--------------------+------------------+-----+-----------+----+--------------------+
only showing top 5 rows



In [41]:
raw_classified_v3.orderBy('lang_prob').toPandas()[:10]

Unnamed: 0,text,id,label,text_length,lang,lang_prob
0,Automating big-data analysis http://t.co/oKmeA...,654986512367398915,0.0,74,en,-0.0561351776123046
1,2010 Home Remodeling Tour in St. Louis Park - ...,12962035649,1.0,66,en,-0.5808463096618652
2,Men Explain &lt;em&gt;Lolita &lt;/em&gt;to Me ...,678837202969751552,1.0,84,en,-0.9656310081481934
3,Google Analytics Adds AdWords Return on Ad Spe...,513002286629806080,1.0,85,en,-0.9861445426940918
4,RT @albertpak: #ES6 Overview in 350 Bullet Poi...,659205951313022977,1.0,93,en,-1.0300307273864746
5,RT @blattnerma: Modern Methods for Sentiment A...,583126282373865472,0.0,93,en,-1.1199603080749512
6,#TipOfTheDay Indexing in #SQL http://t.co/XM9K...,622269821862981632,1.0,52,en,-1.2104015350341797
7,BBC News: Donut-shaped 'compass' in fly brain ...,598903110258524160,1.0,70,en,-1.233856201171875
8,Tell me a story!,25346982865,1.0,16,en,-1.2732758522033691
9,I love his smile.,426575800360394752,1.0,17,en,-1.4227490425109863


* select 'english' rows

In [42]:
raw_classified_v4 = raw_classified_v3.filter(raw_classified_v3.lang == 'en')
raw_classified_v4.count()

115886

### Remove stopwords  to reduce dimensionality

    * reference: http://www.nltk.org/book/ch02.html

In [71]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from nltk.corpus import stopwords

def remove_stopwords(s):
    stopwords_arr = stopwords.words('English')
    words_arr = s.split()
    cleaned_words = []
    for word in words_arr:
        ## lowercase the word
        if word.lower() not in stopwords_arr:
            cleaned_words.append(word)
    return ' '.join(cleaned_words)

remove_stopwords_udf = udf(remove_stopwords, StringType())

from preproc import remove_features
remove_features_udf = udf(remove_features, StringType())

raw_classified_v5 = raw_classified_v4.\
                    withColumn('no_stopwords_text', remove_stopwords_udf(raw_classified_v4.text))
raw_classified_v6 = raw_classified_v5.\
                    withColumn('no_features_text', remove_features_udf(raw_classified_v5.no_stopwords_text))

In [73]:
raw_classified_v7 = raw_classified_v6.select('text','id','label','lang','lang_prob','no_features_text')
raw_classified_v7.show()

+--------------------+------------------+-----+----+-------------------+--------------------+
|                text|                id|label|lang|          lang_prob|    no_features_text|
+--------------------+------------------+-----+----+-------------------+--------------------+
|Fresh install of ...|        1018769417|  1.0|  en| -29.55938959121704|fresh install  ne...|
|Well. Now I know ...|       10284216536|  1.0|  en|-159.12265014648438|well know  want k...|
|Literally six wee...|       10298589026|  1.0|  en| -183.2326889038086|literally six wee...|
|Mitsubishi i MiEV...|109017669432377344|  1.0|  en|-108.64056301116943|mitsubishi miev w...|
|Cheap Eats in SLP...|109642968603963392|  1.0|  en|-16.089602947235107|      cheap eats slp|
|Teenage Mutant Ni...|       10995492579|  1.0|  en| -97.68339467048645|teenage mutant ni...|
|New demographic s...|       11713360136|  1.0|  en| -67.72868013381958|new demographic s...|
|hi all - i'm goin...|        1208319583|  1.0|  en|-223.625

### Tag and keep verbs, nouns, adjectives

In [76]:
from preproc import tag_and_remove
tag_and_remove_udf = udf(tag_and_remove, StringType())
raw_classified_v8 = raw_classified_v7.\
                    withColumn('tagged_text', remove_features_udf(raw_classified_v7.no_features_text))

In [77]:
raw_classified_v8.show(5)

+--------------------+------------------+-----+----+-------------------+--------------------+--------------------+
|                text|                id|label|lang|          lang_prob|    no_features_text|         tagged_text|
+--------------------+------------------+-----+----+-------------------+--------------------+--------------------+
|Fresh install of ...|        1018769417|  1.0|  en| -29.55938959121704|fresh install  ne...|fresh install new...|
|Well. Now I know ...|       10284216536|  1.0|  en|-159.12265014648438|well know  want k...|well know want kn...|
|Literally six wee...|       10298589026|  1.0|  en| -183.2326889038086|literally six wee...|literally six wee...|
|Mitsubishi i MiEV...|109017669432377344|  1.0|  en|-108.64056301116943|mitsubishi miev w...|mitsubishi miev w...|
|Cheap Eats in SLP...|109642968603963392|  1.0|  en|-16.089602947235107|      cheap eats slp|      cheap eats slp|
+--------------------+------------------+-----+----+-------------------+--------

### Lemmitizing remaining words

In [79]:
from preproc import lemmatize
lemmatize_udf = udf(lemmatize, StringType())
lemm_df = raw_classified_v8 = raw_classified_v8.\
                              withColumn('lemm_text', remove_features_udf(raw_classified_v8.tagged_text))

In [80]:
lemm_df.orderBy('tagged_text').show()

+--------------------+------------------+-----+----+-------------------+--------------------+--------------------+--------------------+
|                text|                id|label|lang|          lang_prob|    no_features_text|         tagged_text|           lemm_text|
+--------------------+------------------+-----+----+-------------------+--------------------+--------------------+--------------------+
|KS is up! http://...|335930628354752512|  1.0|  en|  -31.7122220993042|                    |                    |                    |
|RT @AAA_Minneapol...|561370893637545987|  1.0|  en| -156.6396360397339|  aaa minneapolis...|aaa minneapolis k...|aaa minneapolis k...|
|Aaaa, don't ask m...|106045997377003521|  1.0|  en|-200.86466455459595|aaaa don  ask rea...|aaaa don ask read...|aaaa don ask read...|
|AAAAAAAAAAAAA Kin...|        2365823927|  1.0|  en|-25.333629608154297|aaaaaaaaaaaaa kin...|aaaaaaaaaaaaa kin...|aaaaaaaaaaaaa kin...|
|RT @Mike_Doughty_...|356241623140671490|  1.0| 

### remove all rows containing only blank spaces

In [81]:
from preproc import check_blanks
check_blanks_udf = udf(check_blanks, StringType())

check_blanks_df = lemm_df.withColumn("is_blank", check_blanks_udf(lemm_df["lemm_text"]))
no_blanks_df = check_blanks_df.filter(check_blanks_df["is_blank"] == "False")

In [86]:
no_blanks_df.show(5)

+--------------------+------------------+-----+----+-------------------+--------------------+--------------------+--------------------+--------+
|                text|                id|label|lang|          lang_prob|    no_features_text|         tagged_text|           lemm_text|is_blank|
+--------------------+------------------+-----+----+-------------------+--------------------+--------------------+--------------------+--------+
|fresh install new...|        1018769417|  1.0|  en| -29.55938959121704|fresh install  ne...|fresh install new...|fresh install new...|   False|
|well know want kn...|       10284216536|  1.0|  en|-159.12265014648438|well know  want k...|well know want kn...|well know want kn...|   False|
|literally six wee...|       10298589026|  1.0|  en| -183.2326889038086|literally six wee...|literally six wee...|literally six wee...|   False|
|mitsubishi miev w...|109017669432377344|  1.0|  en|-108.64056301116943|mitsubishi miev w...|mitsubishi miev w...|mitsubishi miev 

In [87]:
# rename columns
no_blanks_df = no_blanks_df.withColumn("text",no_blanks_df.lemm_text)

In [88]:
# dedupe important since alot of the tweets only differed by url's and RT mentions
dedup_df = no_blanks_df.dropDuplicates(['text', 'label'])

In [89]:
# select only the columns we care about
data_set = dedup_df.select('id', 'text','label')

In [90]:
data_set.show(4)

+------------------+--------------------+-----+
|                id|                text|label|
+------------------+--------------------+-----+
|563146734163865600|annual dinner tha...|  1.0|
|163934632260288512|new blog post arm...|  1.0|
|416823132796637184|help what area pa...|  1.0|
|615340297178992640|much panic would ...|  1.0|
+------------------+--------------------+-----+
only showing top 4 rows



In [91]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data_set.randomSplit([0.6, 0.4])

In [25]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier 
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

#### Sting tokenizer

In [92]:
from pyspark.ml.feature import Tokenizer
tokenizer_mod = Tokenizer(inputCol='text', outputCol='tokens')
tokenizer_df = tokenizer_mod.transform(data_set)
tokenizer_df.show(5)

+------------------+--------------------+-----+--------------------+
|                id|                text|label|              tokens|
+------------------+--------------------+-----+--------------------+
|563146734163865600|annual dinner tha...|  1.0|[annual, dinner, ...|
|163934632260288512|new blog post arm...|  1.0|[new, blog, post,...|
|416823132796637184|help what area pa...|  1.0|[help, what, area...|
|615340297178992640|much panic would ...|  1.0|[much, panic, wou...|
|679049430213988352|improve dataliter...|  1.0|[improve, datalit...|
+------------------+--------------------+-----+--------------------+
only showing top 5 rows



#### Hashing token frequency

In [93]:
from pyspark.ml.feature import HashingTF
hashingTF_mod = HashingTF(inputCol='tokens', outputCol='features')
hashingTF_df = hashingTF_mod.transform(tokenizer_df)
hashingTF_df.show(5)

+------------------+--------------------+-----+--------------------+--------------------+
|                id|                text|label|              tokens|            features|
+------------------+--------------------+-----+--------------------+--------------------+
|563146734163865600|annual dinner tha...|  1.0|[annual, dinner, ...|(262144,[12336,24...|
|163934632260288512|new blog post arm...|  1.0|[new, blog, post,...|(262144,[20785,29...|
|416823132796637184|help what area pa...|  1.0|[help, what, area...|(262144,[2025,880...|
|615340297178992640|much panic would ...|  1.0|[much, panic, wou...|(262144,[11910,29...|
|679049430213988352|improve dataliter...|  1.0|[improve, datalit...|(262144,[48482,99...|
+------------------+--------------------+-----+--------------------+--------------------+
only showing top 5 rows



In [97]:
hashingTF_df.select('features').head()

Row(features=SparseVector(262144, {12336: 1.0, 24113: 1.0, 124634: 1.0, 132458: 1.0, 149413: 1.0, 156567: 1.0, 192114: 1.0, 194253: 1.0, 216199: 1.0, 251358: 1.0}))

In [26]:
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and nb.

# A tokenizer that converts the input string to lowercase and then splits it by white spaces.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
idf = IDF(minDocFreq=3, inputCol="features", outputCol="idf")

In [27]:
# 
nb = NaiveBayes()

In [28]:
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, nb])

In [29]:
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

In [30]:
# Make predictions.
predictions = model.transform(testData)

In [31]:
# Select example rows to display.
predictions.select("text", "label", "prediction").show(5)

+--------------------+-----+----------+
|                text|label|prediction|
+--------------------+-----+----------+
|           hurt much|  1.0|       1.0|
|teforia use machi...|  1.0|       1.0|
|              finish|  1.0|       1.0|
|future blase vice...|  1.0|       1.0|
|              divine|  1.0|       1.0|
+--------------------+-----+----------+
only showing top 5 rows



In [32]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.912655971479501

### Cross Validation

In [33]:
#paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 1.0]).build()
# paramGrid = ParamGridBuilder().addGrid(rf.maxDepth,[4,8,10]).\
#                     addGrid(rf.impurity, ['entropy','gini']).build()


# cv = CrossValidator(estimator=pipeline, 
#                     estimatorParamMaps=paramGrid, 
#                     evaluator=MulticlassClassificationEvaluator(), 
#                     numFolds=4)
                    

# #training_df.show(5)  
# cvModel = cv.fit(training_df)

In [34]:
#prediction = cvModel.transform(test_df)