In [6]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.sql import functions as fn

# Spark configuration for this notebook: application name and a local master.
conf = SparkConf().setAppName('sentiment_tweeter').setMaster('local[*]')
# getOrCreate() reuses an already-running context, so re-running this cell does
# not fail with "Cannot run multiple SparkContexts at once". `conf` is only
# applied when a new context actually has to be created.
sc = SparkContext.getOrCreate(conf=conf)
# SQL entry point used by every read below.
sqlContext = HiveContext(sc)

In [7]:
# Sentiment lexicon: one row per word with its polarity label.
df = sqlContext.read.format('parquet').load('sentiments.parquet')

In [11]:
# Preview the first five rows of the sentiment lexicon.
df.show(5)

+-------------+---------+
|         word|sentiment|
+-------------+---------+
|   gratefully|        1|
|gratification|        1|
|    gratified|        1|
|    gratifies|        1|
|      gratify|        1|
+-------------+---------+
only showing top 5 rows



In [20]:
# Class balance of the lexicon: number of words per sentiment label.
sentiment_counts = df.groupBy('sentiment').agg(fn.count('*'))
sentiment_counts.show()

+---------+--------+
|sentiment|count(1)|
+---------+--------+
|        1|    2006|
|       -1|    4783|
+---------+--------+



In [21]:
# Preprocessed IMDB reviews used as the labelled corpus.
df_imdb = sqlContext.read.format('parquet').load('imdb_reviews_preprocessed.parquet')

In [23]:
# Label balance of the review corpus: number of reviews per score.
score_counts = df_imdb.groupBy('score').agg(fn.count('*'))
score_counts.show()

+-----+--------+
|score|count(1)|
+-----+--------+
|  0.0|   12500|
|  1.0|   12500|
+-----+--------+



In [25]:
# Inspect one raw review row (id, review text, score).
df_imdb.head()

Row(id='pos_10006', review='In this "critically acclaimed psychological thriller based on true events, Gabriel (Robin Williams), a celebrated writer and late-night talk show host, becomes captivated by the harrowing story of a young listener and his adoptive mother (Toni Collette). When troubling questions arise about this boy\'s (story), however, Gabriel finds himself drawn into a widening mystery that hides a deadly secret\x85" according to film\'s official synopsis.<br /><br />You really should STOP reading these comments, and watch the film NOW...<br /><br />The "How did he lose his leg?" ending, with Ms. Collette planning her new life, should be chopped off, and sent to "deleted scenes" land. It\'s overkill. The true nature of her physical and mental ailments should be obvious, by the time Mr. Williams returns to New York. Possibly, her blindness could be in question - but a revelation could have be made certain in either the "highway" or "video tape" scenes. The film would benefi

In [31]:
from pyspark.ml.feature import RegexTokenizer
# Tokenizer that extracts runs of Unicode letters from `review` into `words`.
# gaps=False means the pattern matches the tokens themselves, not the separators.
tk = RegexTokenizer(
    gaps=False,
    pattern='\\p{L}+',
    inputCol='review',
    outputCol='words',
)

In [32]:
# Tokenize every review; adds the `words` column configured on `tk`.
df_words = tk.transform(df_imdb)

In [39]:
# Preview the tokenized reviews.
df_words.show(5)

+---------+--------------------+-----+--------------------+
|       id|              review|score|               words|
+---------+--------------------+-----+--------------------+
|pos_10006|In this "critical...|  1.0|[in, this, critic...|
|pos_10013|Like one of the p...|  1.0|[like, one, of, t...|
|pos_10022|Aro Tolbukhin bur...|  1.0|[aro, tolbukhin, ...|
|pos_10033|The movie Titanic...|  1.0|[the, movie, tita...|
| pos_1003|Another Aussie ma...|  1.0|[another, aussie,...|
+---------+--------------------+-----+--------------------+
only showing top 5 rows



## Average word sentiment (lexicon baseline)

In [47]:
# One row per (review id, word), then keep only the words present in the lexicon.
exploded_words = df_words.select('id', fn.explode('words').alias('word'))
exploded_words.join(df, 'word').show(5)

+----------+---------+---------+
|      word|       id|sentiment|
+----------+---------+---------+
| acclaimed|pos_10006|        1|
|celebrated|pos_10006|        1|
| troubling|pos_10006|       -1|
|   mystery|pos_10006|       -1|
|    deadly|pos_10006|       -1|
+----------+---------+---------+
only showing top 5 rows



In [51]:
# Explode each review into (id, word) rows and attach the lexicon polarity.
# The join is an inner join, so words missing from the lexicon are dropped.
review_words = df_words.select('id', fn.explode('words').alias('word'))
imdb_words_score = review_words.join(df, 'word')

# Average polarity of the matched words, per review.
simple_sentiment = (
    imdb_words_score
    .groupBy('id')
    .agg(fn.avg('sentiment').alias('avg_sent'))
)

In [52]:
# Preview the per-review average sentiment.
simple_sentiment.show(5)

+---------+--------------------+
|       id|            avg_sent|
+---------+--------------------+
|pos_10149| 0.42857142857142855|
|pos_10377|  0.5384615384615384|
| pos_1299| 0.09090909090909091|
| pos_2228|-0.14285714285714285|
| pos_5052|  0.7777777777777778|
+---------+--------------------+
only showing top 5 rows



In [53]:
# Threshold the average at zero: strictly positive -> 1.0, everything else
# (including an average of exactly 0) -> 0.0.
is_positive = fn.col('avg_sent') > 0
simple_sentiment_prd = simple_sentiment.withColumn(
    'prediction',
    fn.when(is_positive, 1.0).otherwise(0.),
)

In [55]:
# Preview the thresholded predictions next to the average sentiment.
simple_sentiment_prd.show(5)

+---------+--------------------+----------+
|       id|            avg_sent|prediction|
+---------+--------------------+----------+
|pos_10149| 0.42857142857142855|       1.0|
|pos_10377|  0.5384615384615384|       1.0|
| pos_1299| 0.09090909090909091|       1.0|
| pos_2228|-0.14285714285714285|       0.0|
| pos_5052|  0.7777777777777778|       1.0|
+---------+--------------------+----------+
only showing top 5 rows



## TF-IDF

In [60]:
from nltk.corpus import stopwords
# English stop-word list from NLTK; passed to StopWordsRemover below.
try:
    stop_words = list(stopwords.words('english'))
except LookupError:
    # The stopwords corpus is not bundled with NLTK. Fetch it once so the
    # notebook survives Restart & Run All on a fresh environment.
    import nltk
    nltk.download('stopwords')
    stop_words = list(stopwords.words('english'))
# Alternative list (not used): http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words

In [65]:
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# Drop stop words (case-insensitively) from the tokenized reviews.
sw_filter = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
    stopWords=stop_words,
    caseSensitive=False,
)
# Term counts over the filtered tokens, with the minTF / minDF / vocabSize
# limits given below.
cv = CountVectorizer(
    minTF=1.,
    minDF=5.,
    vocabSize=2**17,
    inputCol='filtered',
    outputCol='tf',
)
# Re-weight the raw term counts by inverse document frequency.
idf = IDF(inputCol='tf', outputCol='tfidf')

In [68]:
from pyspark.ml import Pipeline

# Tokenize -> remove stop words -> count terms -> apply IDF.
# NOTE(review): the pipeline is fitted on the full `df_imdb`, so the vocabulary
# and IDF weights also see rows that are later split off for validation and
# testing. Consider fitting on the training split only.
tfidf_estimator = Pipeline(stages=[tk, sw_filter, cv, idf])
pipeline_tfidf = tfidf_estimator.fit(df_imdb)
tfidf = pipeline_tfidf.transform(df_imdb)

In [69]:
# Preview the feature pipeline output.
tfidf.show(5)

+---------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+
|       id|              review|score|               words|            filtered|                  tf|               tfidf|
+---------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+
|pos_10006|In this "critical...|  1.0|[in, this, critic...|[critically, accl...|(26832,[0,2,4,5,6...|(26832,[0,2,4,5,6...|
|pos_10013|Like one of the p...|  1.0|[like, one, of, t...|[like, one, previ...|(26832,[1,2,3,4,8...|(26832,[1,2,3,4,8...|
|pos_10022|Aro Tolbukhin bur...|  1.0|[aro, tolbukhin, ...|[aro, tolbukhin, ...|(26832,[0,1,16,17...|(26832,[0,1,16,17...|
|pos_10033|The movie Titanic...|  1.0|[the, movie, tita...|[movie, titanic, ...|(26832,[0,1,2,3,4...|(26832,[0,1,2,3,4...|
| pos_1003|Another Aussie ma...|  1.0|[another, aussie,...|[another, aussie,...|(26832,[4,9,12,33...|(26832,[4,9,12,33...|
+---------+-----

## Train Test Split

In [71]:
# 60 / 30 / 10 split into training, validation and testing sets; the seed is
# fixed so the split is repeatable.
split_weights = [0.6, 0.3, 0.1]
training_df, validation_df, testing_df = df_imdb.randomSplit(split_weights, seed=0)
# Row count of each split, in the same order.
[part.count() for part in (training_df, validation_df, testing_df)]

[15108, 7370, 2522]

## Logistic regression

In [74]:
from pyspark.ml.classification import LogisticRegression

# Unregularized logistic regression on the TF-IDF features.
lr = LogisticRegression(
    labelCol='score',
    featuresCol='tfidf',
    regParam=0.0,
    maxIter=100,
    elasticNetParam=0.,
)

# `pipeline_tfidf` is already fitted, so this fit only trains `lr`.
lr_estimator = Pipeline(stages=[pipeline_tfidf, lr])
lr_pipeline = lr_estimator.fit(training_df)

In [75]:
# Validation accuracy: mean of a 0/1 flag marking prediction == score.
validation_predictions = lr_pipeline.transform(validation_df)
validation_correct = fn.expr('float(prediction = score)').alias('correct')
validation_predictions.select(validation_correct).select(fn.avg('correct')).show()

+------------------+
|      avg(correct)|
+------------------+
|0.8508819538670285|
+------------------+



In [77]:
# Test accuracy, computed the same way as the validation accuracy.
testing_predictions = lr_pipeline.transform(testing_df)
testing_correct = fn.expr('float(prediction = score)').alias('correct')
testing_predictions.select(testing_correct).select(fn.avg('correct')).show()

+------------------+
|      avg(correct)|
+------------------+
|0.8528945281522601|
+------------------+



In [83]:
import pandas as pd
# Stage 2 of the feature pipeline is the fitted CountVectorizer; its vocabulary
# is paired positionally with the fitted logistic regression coefficients.
count_vectorizer_model = pipeline_tfidf.stages[2]
vocabulary = count_vectorizer_model.vocabulary
# The last stage of the fitted pipeline is the logistic regression model.
lr_model = lr_pipeline.stages[-1]
weights = lr_model.coefficients.toArray()
coeffs_df = pd.DataFrame(dict(word=vocabulary, weight=weights))

In [84]:
# Five words with the most negative weights.
coeffs_df.sort_values(by='weight', ascending=True).head(n=5)

Unnamed: 0,word,weight
22237,abrahams,-4.415622
26517,octane,-4.086977
20468,baptism,-3.906266
26042,fizzled,-3.868855
17473,risible,-3.77465


In [85]:
# Five words with the most positive weights.
coeffs_df.sort_values(by='weight', ascending=False).head(n=5)

Unnamed: 0,word,weight
26064,manifests,5.238295
19714,priscilla,4.658085
24097,ether,3.991256
26829,goriest,3.843036
24872,perversity,3.482327


## Regularization (Elastic Net)

In [87]:
# Elastic-net hyperparameters: overall regularization strength and the
# elastic-net mixing parameter.
lambda_par = 0.02
alpha_par = 0.3
en_lr = LogisticRegression(
    labelCol='score',
    featuresCol='tfidf',
    regParam=lambda_par,
    maxIter=100,
    elasticNetParam=alpha_par,
)

In [88]:
# Fit the elastic-net model on the training split; `pipeline_tfidf` is the
# feature pipeline fitted earlier in the notebook.
en_lr_pipeline = Pipeline(stages=[pipeline_tfidf, en_lr]).fit(training_df)

In [89]:
# Validation accuracy of the elastic-net model.
en_validation_predictions = en_lr_pipeline.transform(validation_df)
en_validation_predictions.select(fn.avg(fn.expr('float(prediction = score)'))).show()

+--------------------------------+
|avg(float((prediction = score)))|
+--------------------------------+
|              0.8648575305291724|
+--------------------------------+



In [90]:
# Coefficients of the fitted elastic-net model, paired with the vocabulary.
en_lr_model = en_lr_pipeline.stages[-1]
en_weights = en_lr_model.coefficients.toArray()
en_coeffs_df = pd.DataFrame(dict(word=vocabulary, weight=en_weights))

In [91]:
# Five words with the most negative weights under elastic-net regularization.
en_coeffs_df.sort_values(by='weight', ascending=True).head(n=5)

Unnamed: 0,word,weight
137,worst,-0.352431
310,waste,-0.327989
249,awful,-0.257681
15,bad,-0.244838
712,poorly,-0.206732


In [92]:
# Sample of words whose weight is exactly zero.
en_coeffs_df[en_coeffs_df['weight'] == 0.0].head(5)

Unnamed: 0,word,weight
2,film,0.0
3,one,0.0
4,like,0.0
6,time,0.0
9,story,0.0


In [93]:
# Fraction of the vocabulary whose weight is exactly zero.
len(en_coeffs_df.query('weight == 0.0')) / len(en_coeffs_df)

0.9584451401311866

## Grid Search

In [95]:
from pyspark.ml.tuning import ParamGridBuilder

# Unfitted pipeline used as the estimator for every grid point.
en_lr_estimator = Pipeline(stages=[pipeline_tfidf, en_lr])
# 3 x 3 grid over regularization strength and elastic-net mixing.
grid = (
    ParamGridBuilder()
    .addGrid(en_lr.regParam, [0., 0.01, 0.02])
    .addGrid(en_lr.elasticNetParam, [0., 0.2, 0.4])
    .build()
)

In [96]:
# Fit one model per parameter map, keeping the same order as `grid`.
all_models = []
for model_number, param_map in enumerate(grid, start=1):
    print("Fitting model {}".format(model_number))
    all_models.append(en_lr_estimator.fit(training_df, param_map))

Fitting model 1
Fitting model 2
Fitting model 3
Fitting model 4
Fitting model 5
Fitting model 6
Fitting model 7
Fitting model 8
Fitting model 9


In [97]:
# Validation accuracy of every fitted grid model, in grid order.
accuracy_expr = fn.avg(fn.expr('float(score = prediction)')).alias('accuracy')
accuracies = []
for fitted_model in all_models:
    accuracy_row = fitted_model.transform(validation_df).select(accuracy_expr).first()
    accuracies.append(accuracy_row.accuracy)

In [98]:
# Validation accuracy of each grid model, in the same order as `all_models`.
accuracies

[0.8508819538670285,
 0.8508819538670285,
 0.8508819538670285,
 0.8636363636363636,
 0.8774762550881954,
 0.8697421981004071,
 0.8662143826322931,
 0.8727272727272727,
 0.8597014925373134]

In [99]:
import numpy as np
# Index of the grid point with the highest validation accuracy.
best_model_idx = np.argmax(accuracies)
# Show the parameter map for that grid point.
grid[best_model_idx]

{Param(parent='LogisticRegression_5559e17965a9', name='regParam', doc='regularization parameter (>= 0).'): 0.01,
 Param(parent='LogisticRegression_5559e17965a9', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.2}