# IST 718: Big Data Analytics

- Professor: Daniel Acuna <deacuna@syr.edu>

## General instructions:

- You are welcome to discuss the problems with your classmates but __you are not allowed to copy any part of your answers either from your classmates or from the internet__
- You can put the homework files anywhere you want in your http://notebook.acuna.io workspace but _do not change_ the file names. The TAs and the professor use these names to grade your homework.
- Remove or comment out code that contains `raise NotImplementedError`. This is mainly to make the `assert` statement fail if nothing is submitted.
- The tests shown in some cells (i.e., `assert` and `np.testing.` statements) are used to grade your answers. **However, the professor and TAs will use __additional__ tests for your answers. Think about cases your code should handle even if it passes all the tests you see.**
- Before downloading and submitting your work through Blackboard, remember to save and press `Validate` (or go to `Kernel`$\rightarrow$`Restart and Run All`).
- Good luck!

In [1]:
# Load the packages needed for this part
# create spark and sparkcontext objects
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

import pyspark
from pyspark.ml import feature, regression, Pipeline, classification, pipeline, evaluation
from pyspark.sql import functions as fn, Row
from pyspark import sql

import matplotlib.pyplot as plt
import pandas as pd

# Part 2

In this section, you are going to develop an SMS spam detector based on logistic regression. This is the same idea behind sentiment analysis, but instead of predicting positive sentiment vs negative sentiment, you are going to predict whether an SMS text is spam or not.

The dataset will be in `sms_spam_df`

In [2]:
sms_spam_df = spark.read.csv('sms_spam.csv', header=True, inferSchema=True)
sms_spam_df.show()

+----+--------------------+
|type|                text|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
|spam|FreeMsg Hey there...|
| ham|Even my brother i...|
| ham|As per your reque...|
|spam|WINNER!! As a val...|
|spam|Had your mobile 1...|
| ham|I'm gonna be home...|
|spam|SIX chances to wi...|
|spam|URGENT! You have ...|
| ham|I've been searchi...|
| ham|I HAVE A DATE ON ...|
|spam|XXXMobileMovieClu...|
| ham|Oh k...i'm watchi...|
| ham|Eh u remember how...|
| ham|Fine if that's th...|
|spam|England v Macedon...|
+----+--------------------+
only showing top 20 rows



# Question 2.1

Encode the `type` column to be 1 for `spam` and 0 for `ham` and store the result in `sms_spam2_df`

In [3]:
# create sms_spam2_df below
sms_spam2_df = sms_spam_df.select(fn.when(fn.col('type') == 'spam', 1).otherwise(0).alias('type'), fn.col('text')) # Created sms_spam2_df with new 'type' column assigning 1 to values that equal 'spam' and 0 to all other values

In [4]:
# check here
sms_spam2_df.show() # Viewed resulting dataframe

+----+--------------------+
|type|                text|
+----+--------------------+
|   0|Go until jurong p...|
|   0|Ok lar... Joking ...|
|   1|Free entry in 2 a...|
|   0|U dun say so earl...|
|   0|Nah I don't think...|
|   1|FreeMsg Hey there...|
|   0|Even my brother i...|
|   0|As per your reque...|
|   1|WINNER!! As a val...|
|   1|Had your mobile 1...|
|   0|I'm gonna be home...|
|   1|SIX chances to wi...|
|   1|URGENT! You have ...|
|   0|I've been searchi...|
|   0|I HAVE A DATE ON ...|
|   1|XXXMobileMovieClu...|
|   0|Oh k...i'm watchi...|
|   0|Eh u remember how...|
|   0|Fine if that's th...|
|   1|England v Macedon...|
+----+--------------------+
only showing top 20 rows



In [5]:
# (5 pts)
np.testing.assert_array_equal(
    sms_spam2_df.groupBy('type').count().orderBy('type').rdd.map(lambda x: x['count']).collect(),
    [4827, 747]
)

# Question 2.2: tfidf feature engineering
Create a pipeline that combines a `Tokenizer`, a `CountVectorizer`, and an `IDF` estimator to compute the tfidf vectors of each SMS. Fit this pipeline and assign the pipeline transformer to a variable `tfidf_pipeline`. The `Tokenizer` step should create a column `words`, the `CountVectorizer` step should create a column `tf`, and the `IDF` step should create a column `tfidf`.
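
To make the three stages concrete, here is a minimal pure-Python sketch of what they compute on a toy corpus. It is not the Spark implementation: it assumes lower-casing and whitespace splitting (roughly what `Tokenizer` does) and the smoothed IDF that Spark ML documents, $\log\frac{m+1}{df+1}$ for $m$ documents. The names `tokenize` and `tfidf_vectors` and the toy sentences are illustrative only.

```python
import math
from collections import Counter

def tokenize(text):
    # Lower-case the text and split on whitespace (assumed Tokenizer-like behavior)
    return text.lower().split()

def tfidf_vectors(docs):
    tokenized = [tokenize(d) for d in docs]
    m = len(tokenized)
    # Document frequency: in how many documents each word appears
    df = Counter()
    for words in tokenized:
        df.update(set(words))
    # Smoothed inverse document frequency
    idf = {w: math.log((m + 1) / (df[w] + 1)) for w in df}
    vectors = []
    for words in tokenized:
        tf = Counter(words)  # raw term counts, like the `tf` column
        vectors.append({w: tf[w] * idf[w] for w in tf})  # like the `tfidf` column
    return vectors

docs = ["Free entry free prize", "ok see you", "free call now"]
vectors = tfidf_vectors(docs)
print(vectors[0])
```

Words that occur in many messages get a small IDF and therefore a small weight, while rare words are weighted up.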

In [6]:
# create a Pipeline transformer and name it tfidf_pipeline
pipe = Pipeline(stages=[ # Created new pipeline with:
    feature.Tokenizer().setInputCol('text').setOutputCol('words'), # Tokenizer, accepting 'text' column as input and outputting 'words' column
    feature.CountVectorizer().setInputCol('words').setOutputCol('tf'), # CountVectorizer, accepting 'words' column as input and outputting 'tf' column
    feature.IDF().setInputCol('tf').setOutputCol('tfidf')]) # IDF, accepting 'tf' column as input and outputting 'tfidf' column
tfidf_pipeline = pipe.fit(sms_spam2_df) # Fit pipeline to sms_spam2_df data as tfidf_pipeline

In [7]:
tfidf_pipeline.transform(sms_spam2_df).show() # Viewed resulting dataframe

+----+--------------------+--------------------+--------------------+--------------------+
|type|                text|               words|                  tf|               tfidf|
+----+--------------------+--------------------+--------------------+--------------------+
|   0|Go until jurong p...|[go, until, juron...|(13525,[8,42,51,6...|(13525,[8,42,51,6...|
|   0|Ok lar... Joking ...|[ok, lar..., joki...|(13525,[5,74,404,...|(13525,[5,74,404,...|
|   1|Free entry in 2 a...|[free, entry, in,...|(13525,[0,3,8,20,...|(13525,[0,3,8,20,...|
|   0|U dun say so earl...|[u, dun, say, so,...|(13525,[5,22,60,1...|(13525,[5,22,60,1...|
|   0|Nah I don't think...|[nah, i, don't, t...|(13525,[0,1,66,86...|(13525,[0,1,66,86...|
|   1|FreeMsg Hey there...|[freemsg, hey, th...|(13525,[0,2,6,10,...|(13525,[0,2,6,10,...|
|   0|Even my brother i...|[even, my, brothe...|(13525,[0,7,9,13,...|(13525,[0,7,9,13,...|
|   0|As per your reque...|[as, per, your, r...|(13525,[0,10,11,4...|(13525,[0,10,11,4...|

In [8]:
# (5 pts)
np.testing.assert_array_equal([type(s) for s in tfidf_pipeline.stages],
                              [feature.Tokenizer, feature.CountVectorizerModel, feature.IDFModel])

# Question 2.3: uppercase feature

Typical spam messages contain words that are upper case. Create a dataframe `sms_spam3_df` where you add a new column `has_uppercase` which contains an integer `1` if the first sequence of uppercase letters has length 3 or more and an integer `0` otherwise. You can extract sequences of 3 or more uppercase letters by using the regular expression `[A-Z]{3,}`. You will use the function `fn.regexp_extract` to find those sequences and extract the first one (i.e., with index 0) and then use `fn.length` to compute the length of that sequence.
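
The behavior of the regular expression can be checked outside Spark first. The sketch below uses Python's `re` module on a few sample strings; the helper `has_uppercase` is a hypothetical stand-in that mimics extracting the first match (or an empty string when there is none) and testing its length.

```python
import re

UPPER_RUN = re.compile(r"[A-Z]{3,}")

def has_uppercase(text):
    # Take the first run of 3+ uppercase letters, or "" if there is none
    match = UPPER_RUN.search(text)
    first_run = match.group(0) if match else ""
    return 1 if len(first_run) >= 3 else 0

examples = [
    "WINNER!! As a valued network customer",
    "Ok lar... Joking",
    "U R entitled to Update",
]
flags = [has_uppercase(t) for t in examples]
print(flags)
```

Single capital letters such as `U` and `R`, or a capitalized word such as `Update`, do not count because no run reaches three consecutive uppercase letters.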

In [9]:
# create sms_spam3_df below
sms_spam3_df = sms_spam2_df.withColumn('has_uppercase', fn.regexp_extract(fn.col('text'), '[A-Z]{3,}', 0)) # Used regexp_extract() function to only return text values if the first sequence of uppercase letters is longer or equal to 3, and an empty string otherwise 
sms_spam3_df = sms_spam3_df.select(fn.col('type'), fn.col('text'), fn.when(fn.col('has_uppercase') == '', 0).otherwise(1).alias('has_uppercase')) # Replaced empty string values with 0 and 1 otherwise, assigned to has_uppercase feature
sms_spam3_df.show() # Viewed resulting dataframe

+----+--------------------+-------------+
|type|                text|has_uppercase|
+----+--------------------+-------------+
|   0|Go until jurong p...|            0|
|   0|Ok lar... Joking ...|            0|
|   1|Free entry in 2 a...|            0|
|   0|U dun say so earl...|            0|
|   0|Nah I don't think...|            0|
|   1|FreeMsg Hey there...|            0|
|   0|Even my brother i...|            0|
|   0|As per your reque...|            0|
|   1|WINNER!! As a val...|            1|
|   1|Had your mobile 1...|            1|
|   0|I'm gonna be home...|            0|
|   1|SIX chances to wi...|            1|
|   1|URGENT! You have ...|            1|
|   0|I've been searchi...|            0|
|   0|I HAVE A DATE ON ...|            1|
|   1|XXXMobileMovieClu...|            1|
|   0|Oh k...i'm watchi...|            0|
|   0|Eh u remember how...|            0|
|   0|Fine if that's th...|            0|
|   1|England v Macedon...|            1|
+----+--------------------+-------------+

The first three messages with `has_uppercase == 1` are as follows:

```python
sms_spam3_df.where('has_uppercase == 1').take(3)
```

```console
[Row(type=1, text='WINNER!! As a valued network customer you have been selected to receivea £900 prize reward! To claim call 09061701461. Claim code KL341. Valid 12 hours only.', has_uppercase=1),
 Row(type=1, text='Had your mobile 11 months or more? U R entitled to Update to the latest colour mobiles with camera for Free! Call The Mobile Update Co FREE on 08002986030', has_uppercase=1),
 Row(type=1, text='SIX chances to win CASH! From 100 to 20,000 pounds txt> CSH11 and send to 87575. Cost 150p/day, 6days, 16+ TsandCs apply Reply HL 4 info', has_uppercase=1)]
```

In [10]:
# try it here
sms_spam3_df.where('has_uppercase == 1').take(3) # Tested first three instances of 1 in the 'has_uppercase' feature column

[Row(type=1, text='WINNER!! As a valued network customer you have been selected to receivea £900 prize reward! To claim call 09061701461. Claim code KL341. Valid 12 hours only.', has_uppercase=1),
 Row(type=1, text='Had your mobile 11 months or more? U R entitled to Update to the latest colour mobiles with camera for Free! Call The Mobile Update Co FREE on 08002986030', has_uppercase=1),
 Row(type=1, text='SIX chances to win CASH! From 100 to 20,000 pounds txt> CSH11 and send to 87575. Cost 150p/day, 6days, 16+ TsandCs apply Reply HL 4 info', has_uppercase=1)]

In [11]:
# (5 pts)
np.testing.assert_equal(set(sms_spam3_df.columns), {'has_uppercase', 'text', 'type'})
np.testing.assert_equal(type(sms_spam3_df.schema['has_uppercase'].dataType), sql.types.IntegerType)
np.testing.assert_equal(sms_spam3_df.rdd.map(lambda x : x['has_uppercase']).sum(), 891)

# Question 2.4: Compare models

Using the following splits:

In [12]:
training_df, validation_df, testing_df = sms_spam2_df.randomSplit([0.6, 0.3, 0.1], seed=0)

In [13]:
[training_df.count(), validation_df.count(), testing_df.count()]

[3311, 1709, 554]

**(5 pts)** Create pipelines where the first stage is the `tfidf_pipeline` created above and the second stage is a `LogisticRegression` model with different regularization parameters ($\lambda$) and elastic net mixture ($\alpha$). Fit those pipelines to the appropriate data split.

1. Logistic regression with $\lambda=0$ and $\alpha=0$ (assign the fitted pipeline to `lr_pipeline1`)
2. Logistic regression with $\lambda=0.02$ and $\alpha=0.2$ (assign the fitted pipeline to `lr_pipeline2`)
3. Logistic regression with $\lambda=0.1$ and $\alpha=0.4$ (assign the fitted pipeline to `lr_pipeline3`)
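
As a reminder of what the two parameters control, Spark ML documents the elastic net penalty as $\alpha\,\lambda\lVert w\rVert_1 + (1-\alpha)\,\frac{\lambda}{2}\lVert w\rVert_2^2$, with $\lambda$ passed as `regParam` and $\alpha$ as `elasticNetParam`. The sketch below evaluates that penalty for the three settings on a made-up weight vector; the function name and the weights are assumptions for illustration.

```python
def elastic_net_penalty(weights, lam, alpha):
    # L1 part encourages sparsity, L2 part shrinks all weights
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return lam * (alpha * l1 + (1.0 - alpha) * 0.5 * l2_squared)

weights = [0.5, -1.0, 0.0, 2.0]  # hypothetical coefficients
settings = [(0.0, 0.0), (0.02, 0.2), (0.1, 0.4)]  # (lambda, alpha) pairs from the list above
penalties = [elastic_net_penalty(weights, lam, alpha) for lam, alpha in settings]
print(penalties)
```

With $\lambda=0$ the penalty vanishes, so the first model is unregularized regardless of $\alpha$.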

In [14]:
# create lr_pipeline1, lr_pipeline2, and lr_pipeline3
lr1 = classification.LogisticRegression(featuresCol = 'tfidf', labelCol = 'type', maxIter=100, regParam=0, elasticNetParam=0) # Created logistic regression stage with lambda = 0 and alpha = 0
pipe1 = Pipeline(stages=[tfidf_pipeline, lr1]) # Created pipeline with previously defined tfidf_pipeline and the new logistic regression stage
lr_pipeline1 = pipe1.fit(training_df) # Fit pipe to training data, assigned to lr_pipeline1
lr2 = classification.LogisticRegression(featuresCol = 'tfidf', labelCol = 'type', maxIter=100, regParam=0.02, elasticNetParam=0.2) # Created logistic regression stage with lambda = 0.02 and alpha = 0.2
pipe2 = Pipeline(stages=[tfidf_pipeline, lr2]) # Created pipeline with previously defined tfidf_pipeline and the new logistic regression stage
lr_pipeline2 = pipe2.fit(training_df) # Fit pipe to training data, assigned to lr_pipeline2
lr3 = classification.LogisticRegression(featuresCol = 'tfidf', labelCol = 'type', maxIter=100, regParam=0.1, elasticNetParam=0.4) # Created logistic regression stage with lambda = 0.1 and alpha = 0.4
pipe3 = Pipeline(stages=[tfidf_pipeline, lr3]) # Created pipeline with previously defined tfidf_pipeline and the new logistic regression stage
lr_pipeline3 = pipe3.fit(training_df) # Fit pipe to training data, assigned to lr_pipeline3

In [15]:
# (10 pts)
np.testing.assert_equal(type(lr_pipeline1), pipeline.PipelineModel)
np.testing.assert_equal(type(lr_pipeline2), pipeline.PipelineModel)
np.testing.assert_equal(type(lr_pipeline3), pipeline.PipelineModel)
np.testing.assert_array_equal([type(s) for s in lr_pipeline1.stages],
                              [pipeline.PipelineModel, classification.LogisticRegressionModel])
np.testing.assert_array_equal([type(s) for s in lr_pipeline2.stages],
                              [pipeline.PipelineModel, classification.LogisticRegressionModel])
np.testing.assert_array_equal([type(s) for s in lr_pipeline3.stages],
                              [pipeline.PipelineModel, classification.LogisticRegressionModel])

**(5 pts)** Use the evaluator object defined below to compute the area under the curve of your predictors. For example, to compute the area under the curve of pipeline 1 for a dataframe `df`, you would run

```python
evaluator.evaluate(lr_pipeline1.transform(df))
```

Assign the AUC of the three models to the variables `AUC1`, `AUC2`, and `AUC3`, and assign the pipeline with the best model to a variable `best_model`.
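
For intuition, the area under the ROC curve (the default metric of `BinaryClassificationEvaluator`) equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes it by brute-force pairwise comparison on toy data; it illustrates the definition and is not how Spark computes the value internally. The function name and the scores are made up.

```python
def auc_from_scores(labels, scores):
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0   # positive ranked above negative
            elif p == n:
                wins += 0.5   # ties count as half
    return wins / (len(positives) * len(negatives))

labels = [1, 1, 0, 0, 0]
scores = [0.9, 0.4, 0.5, 0.2, 0.1]
auc = auc_from_scores(labels, scores)
print(auc)
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect separation of the two classes.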

In [16]:
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='type')

For example, the AUC on training of the first model is perfect:

```python
evaluator.evaluate(lr_pipeline1.transform(training_df))
```

```console
1.0
```

In [17]:
# print the AUC for the three models as follows
AUC1 = evaluator.evaluate(lr_pipeline1.transform(validation_df)) # Calculated AUC for lr_pipeline1 using the predefined evaluator on the validation data
AUC2 = evaluator.evaluate(lr_pipeline2.transform(validation_df)) # Calculated AUC for lr_pipeline2 using the predefined evaluator on the validation data
AUC3 = evaluator.evaluate(lr_pipeline3.transform(validation_df)) # Calculated AUC for lr_pipeline3 using the predefined evaluator on the validation data
print("Model 1 AUC: ", AUC1) # Printed AUC1
print("Model 2 AUC: ", AUC2) # Printed AUC2
print("Model 3 AUC: ", AUC3) # Printed AUC3
best_model = lr_pipeline2 # Assigned best_model to lr_pipeline2 because it had the best AUC score

Model 1 AUC:  0.9606171229900157
Model 2 AUC:  0.9900130378096487
Model 3 AUC:  0.9543473851948426


In [18]:
# (5 pts)
np.testing.assert_array_equal([type(AUC1), type(AUC2), type(AUC3)],
                             [float, float, float])
# AUC less than 1
np.testing.assert_array_less([AUC1, AUC2, AUC3], [1, 1, 1])
# AUC more than 0.5
np.testing.assert_array_less([.5, .5, .5],
                            [AUC1, AUC2, AUC3])

# Question 2.5: Choose best model

Using the right split and the best model selected before, compute the generalization performance and assign it to a variable `AUC_best`

In [19]:
# assign to AUC_best the AUC of the best model selected before
AUC_best = evaluator.evaluate(best_model.transform(testing_df)) # Assigned AUC_best to AUC score for best_model using the predefined evaluator on the testing data
AUC_best # Returned AUC_best score

0.982324286708421

In [20]:
# (5 pts)
np.testing.assert_approx_equal(AUC_best, 
                               0.976126746201693, significant=2)

Using the same split and the best model, compute `precision`, `recall` and `f1_score`. You should first count the numbers in the confusion matrix, and then compute these metrics from their formulas.
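
The formulas in question are $\text{precision} = \frac{TP}{TP+FP}$, $\text{recall} = \frac{TP}{TP+FN}$, and $F_1 = \frac{2\cdot\text{precision}\cdot\text{recall}}{\text{precision}+\text{recall}}$. A minimal sketch that counts the confusion matrix cells from label and prediction lists, rather than typing the counts by hand, could look like this; the helper names and the toy data are assumptions for illustration.

```python
def confusion_counts(labels, predictions):
    # Count the four cells of the confusion matrix, with 1 = spam as the positive class
    tp = fp = fn = tn = 0
    for y, p in zip(labels, predictions):
        if y == 1 and p == 1:
            tp += 1
        elif y == 0 and p == 1:
            fp += 1
        elif y == 1 and p == 0:
            fn += 1
        else:
            tn += 1
    return tp, fp, fn, tn

def precision_recall_f1(tp, fp, fn):
    # Guard against empty denominators
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    total = precision + recall
    f1 = 2 * precision * recall / total if total else 0.0
    return precision, recall, f1

labels =      [1, 1, 1, 0, 0, 0, 0, 1]
predictions = [1, 1, 0, 0, 0, 1, 0, 1]
tp, fp, fn, tn = confusion_counts(labels, predictions)
print(precision_recall_f1(tp, fp, fn))
```

Deriving the counts programmatically keeps the metrics in sync with the predictions if the model or the split changes.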

In [76]:
predictions = best_model.transform(testing_df).select('type', 'prediction').toPandas() # Returned pandas dataframe for predictions and labels 
ham = predictions.loc[predictions['type']==0] # Returned all rows labeled as 0 (ham)
spam = predictions.loc[predictions['type']==1] # Returned all rows labeled as 1 (spam)
print(ham['prediction'].value_counts()) # For values where True = 0, how many predictions were 0.0 and 1.0?
print(spam['prediction'].value_counts()) # For values where True = 1, how many predictions were 0.0 and 1.0?

print(""" Confusion Matrix
         _______
        |60 |15 |
        |___|___|
        |0  |479|
        |___|___|
        """) 
precision = 60/(60+0) # Precision = TP/(TP+FP)
recall = 60/(60+15) # Recall = TP/(TP+FN)
f1_score = (2*precision*recall)/(precision+recall) # F1 = harmonic mean of precision and recall
print("precision: ",precision)
print("recall: ",recall)
print("f1_score: ",f1_score)

0.0    479
Name: prediction, dtype: int64
1.0    60
0.0    15
Name: prediction, dtype: int64
 Confusion Matrix
         _______
        |60 |15 |
        |___|___|
        |0  |479|
        |___|___|
        
precision:  1.0
recall:  0.8
f1_score:  0.888888888888889


In [26]:
# (5 pts)
np.testing.assert_array_almost_equal([precision, recall, f1_score],
    [1.0, 0.7976190476190477, 0.8874172185430463],
                                     decimal=2)

# Question 2.6: Inference

Use pipeline 2 fitted above (`lr_pipeline2`) to create Pandas dataframes that contain the most negative words and the most positive words. In particular, create a dataframe `positive_words` with the columns `word` and `weight` with the top 20 positive words, sorted by descending coefficient. Similarly, create a `negative_words` Pandas dataframe with the top 20 negative words where the coefficients are sorted in ascending order. **Hint: follow the `sentiment_analysis.ipynb` notebook in the repo**

In [None]:
# create positive_words and negative_words pandas dataframe below
vocabulary = lr_pipeline2.stages[0].stages[1].vocabulary # Assigned vocabulary to vocabulary values found in the second stage of the tfidf_pipeline
weights = lr_pipeline2.stages[-1].coefficients.toArray() # Assigned weights to weights values found in the logistic regression stage of the lr_pipeline2
coeffs_df = pd.DataFrame({'word': vocabulary, 'weight': weights}) # Created pandas dataframe with word and weight features, assigned to coeffs_df
negative_words = coeffs_df.sort_values('weight').head(20) # Returned negative words sorted by weight
positive_words = coeffs_df.sort_values('weight', ascending=False).head(20) # Returned positive words sorted by weight

In [None]:
# examine positive vocabulary
positive_words.head() # Viewed positive vocabulary

In [None]:
# examine solutions
negative_words.head() # Viewed negative vocabulary

The `positive_words` and `negative_words` dataframe should look like this:

```python
positive_words.head()
```

<table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th></th>
      <th>word</th>
      <th>weight</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <th>3555</th>
      <td>widelive.com/index.</td>
      <td>0.590870</td>
    </tr>
    <tr>
      <th>12237</th>
      <td>08714712388</td>
      <td>0.533567</td>
    </tr>
    <tr>
      <th>15</th>
      <td>call</td>
      <td>0.517100</td>
    </tr>
    <tr>
      <th>81</th>
      <td>txt</td>
      <td>0.513278</td>
    </tr>
    <tr>
      <th>9064</th>
      <td>gbp/sms</td>
      <td>0.468274</td>
    </tr>
  </tbody>
</table>

and 

```python
negative_words.head()
```

<table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th></th>
      <th>word</th>
      <th>weight</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <th>1</th>
      <td>i</td>
      <td>-0.162493</td>
    </tr>
    <tr>
      <th>2444</th>
      <td>fighting</td>
      <td>-0.060939</td>
    </tr>
    <tr>
      <th>3221</th>
      <td>dificult</td>
      <td>-0.059061</td>
    </tr>
    <tr>
      <th>3371</th>
      <td>fightng</td>
      <td>-0.059061</td>
    </tr>
    <tr>
      <th>3332</th>
      <td>lose.</td>
      <td>-0.059061</td>
    </tr>
  </tbody>
</table>

In [None]:
# (5 pts)
np.testing.assert_equal(set(positive_words.columns), {'weight', 'word'})
np.testing.assert_equal(set(negative_words.columns), {'weight', 'word'})
np.testing.assert_approx_equal(positive_words.weight.sum(), 8.3701485692317927, significant=2)
np.testing.assert_approx_equal(negative_words.weight.sum(), -0.6661952507442954, significant=2)
np.testing.assert_array_less(positive_words.weight.iloc[-1], positive_words.weight.iloc[0])
np.testing.assert_array_less(negative_words.weight.iloc[0], negative_words.weight.iloc[-1])

# Question 2.7
Use the dataframe `sms_spam3_df` to create a model where the first feature is `has_uppercase` and the next set of features are the tfidf of the text. Perform feature engineering in all features using a max absolute scaler ([`MaxAbsScaler`](https://spark.apache.org/docs/2.0.2/ml-features.html#maxabsscaler)). Do a logistic regression on the resulting scaled features with regularization parameter $\lambda = 0.2$ and elastic net mixture $\alpha=0.1$ for the entire data (all of `sms_spam3_df`). Since you have scaled all features to be within the same range, you can compare them. 
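
As a reference for what max absolute scaling does, the sketch below divides each feature (column) by the largest absolute value observed for that feature, so every scaled value lies in $[-1, 1]$ and zeros stay zero. It is a plain-Python illustration on a made-up matrix, not the Spark `MaxAbsScaler` itself; `max_abs_scale` is a hypothetical helper.

```python
def max_abs_scale(rows):
    n_features = len(rows[0])
    # Largest absolute value per column
    max_abs = [max(abs(row[j]) for row in rows) for j in range(n_features)]
    # Divide each entry by its column maximum; leave all-zero columns at 0.0
    return [
        [(row[j] / max_abs[j]) if max_abs[j] else 0.0 for j in range(n_features)]
        for row in rows
    ]

rows = [
    [1.0, 4.0, -2.0],
    [0.0, 8.0, 1.0],
    [1.0, 2.0, 0.5],
]
scaled = max_abs_scale(rows)
print(scaled)
```

Because no centering is applied, a binary feature such as `has_uppercase` keeps its 0/1 values, while tfidf columns are squeezed into the same range.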

**(5 pts)** with code and comments, answer below

1. Is `has_uppercase` a feature that is positively or negatively related to an SMS being spam?
2. What is the ratio of the coefficient of `has_uppercase` to the biggest positive tfidf coefficient?

In [None]:
# your code and comments below
va = feature.VectorAssembler(inputCols=['has_uppercase', 'tfidf'], outputCol='features') # Created a model where the first feature is 'has_uppercase' and the next set of features are the tfidf of the text
ms = feature.MaxAbsScaler(inputCol='features',outputCol='scaledFeatures') # Used a max absolute scaler to perform feature engineering on all features
lr = classification.LogisticRegression(featuresCol = 'scaledFeatures', labelCol = 'type', maxIter=100, regParam=0.2, elasticNetParam=0.1) # Performed logistic regression on the resulting scaled features with regularization parameter = 0.2 and elastic net mixture = 0.1
pipe = Pipeline(stages=[tfidf_pipeline, va, ms, lr]) # Created pipe with existing tfidf_pipeline, vector assembler, max absolute scaler, and logistic regression
pipe_model_scaled = pipe.fit(sms_spam3_df) # Fit pipe to sms_spam3_df
pipe_model_scaled.stages[-1].coefficients # Returned model coefficients

# 1. has_uppercase is positively related to an SMS being spam (coefficient 0.9289)
# 2. The ratio of the has_uppercase coefficient to the biggest positive tfidf coefficient is 0.9289 / 2.0119 ≈ 0.4617