In [1]:
# Load the packages needed for this part
# create spark and sparkcontext objects
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

import pyspark
from pyspark.ml import feature, regression, Pipeline, classification, pipeline, evaluation
from pyspark.sql import functions as fn, Row
from pyspark import sql

import matplotlib.pyplot as plt
import pandas as pd

In [2]:
sms_spam_df = spark.read.csv('/datasets/sms_spam.csv', header=True, inferSchema=True)


Encode the `type` column to be 1 for `spam` and 0 for `ham` and store the result in `sms_spam2_df`

In [1]:

sms_spam_df.createOrReplaceTempView('temp')
sms_spam2_df=spark.sql('select case type when "spam" then 1.0  else 0 end as type, text from temp')
sms_spam2_df.show(5,truncate=False)

#raise NotImplementedError()

NameError: name 'sms_spam_df' is not defined

In [4]:
sms_spam2_df.show()

+----+--------------------+
|type|                text|
+----+--------------------+
| 0.0|Go until jurong p...|
| 0.0|Ok lar... Joking ...|
| 1.0|Free entry in 2 a...|
| 0.0|U dun say so earl...|
| 0.0|Nah I don't think...|
| 1.0|FreeMsg Hey there...|
| 0.0|Even my brother i...|
| 0.0|As per your reque...|
| 1.0|WINNER!! As a val...|
| 1.0|Had your mobile 1...|
| 0.0|I'm gonna be home...|
| 1.0|SIX chances to wi...|
| 1.0|URGENT! You have ...|
| 0.0|I've been searchi...|
| 0.0|I HAVE A DATE ON ...|
| 1.0|XXXMobileMovieClu...|
| 0.0|Oh k...i'm watchi...|
| 0.0|Eh u remember how...|
| 0.0|Fine if that's th...|
| 1.0|England v Macedon...|
+----+--------------------+
only showing top 20 rows



In [5]:
sms_spam2_df = sms_spam_df.\
    select(
fn.when(fn.col('type')=='spam',1).otherwise(0).alias('type'),
fn.col('text')
)

In [6]:
sms_spam2_df.show()

+----+--------------------+
|type|                text|
+----+--------------------+
|   0|Go until jurong p...|
|   0|Ok lar... Joking ...|
|   1|Free entry in 2 a...|
|   0|U dun say so earl...|
|   0|Nah I don't think...|
|   1|FreeMsg Hey there...|
|   0|Even my brother i...|
|   0|As per your reque...|
|   1|WINNER!! As a val...|
|   1|Had your mobile 1...|
|   0|I'm gonna be home...|
|   1|SIX chances to wi...|
|   1|URGENT! You have ...|
|   0|I've been searchi...|
|   0|I HAVE A DATE ON ...|
|   1|XXXMobileMovieClu...|
|   0|Oh k...i'm watchi...|
|   0|Eh u remember how...|
|   0|Fine if that's th...|
|   1|England v Macedon...|
+----+--------------------+
only showing top 20 rows



In [7]:
sms_spam2_df.count()

5574

In [8]:
# (5 pts)
np.testing.assert_array_equal(
    sms_spam2_df.groupBy('type').count().orderBy('type').rdd.map(lambda x: x['count']).collect(),
    [4827, 747]
)


Create a pipeline that combines a `Tokenizer`, `CounterVectorizer`, and a `IDF` estimator to compute the tfidf vectors of each SMS. Fit this pipeline and assign the pipeline transformer to a variable `tfidf_pipeline`. The `Tokenizer` step should create a column `words`, the `CounterVectorizer` step should create a column `tf`, and the `IDF` step should create a column `tfidf`.

In [9]:
# create a Pipeline transformer and name it tfidf_pipeline
# YOUR CODE HERE
from pyspark.ml.feature import  Tokenizer
tokenizer = Tokenizer(inputCol="text", outputCol="words")

from pyspark.ml.feature import CountVectorizer
countvectorizer = CountVectorizer (inputCol="words", outputCol="tf")

from pyspark.ml.feature import  IDF
idf = IDF(inputCol="tf", outputCol="tfidf")

tfidf_pipeline = Pipeline(stages=[tokenizer,countvectorizer,idf]).fit(sms_spam2_df)

#raise NotImplementedError()

In [10]:
tfidf_pipeline.transform(sms_spam2_df).show()

+----+--------------------+--------------------+--------------------+--------------------+
|type|                text|               words|                  tf|               tfidf|
+----+--------------------+--------------------+--------------------+--------------------+
|   0|Go until jurong p...|[go, until, juron...|(13525,[8,42,51,6...|(13525,[8,42,51,6...|
|   0|Ok lar... Joking ...|[ok, lar..., joki...|(13525,[5,74,404,...|(13525,[5,74,404,...|
|   1|Free entry in 2 a...|[free, entry, in,...|(13525,[0,3,8,20,...|(13525,[0,3,8,20,...|
|   0|U dun say so earl...|[u, dun, say, so,...|(13525,[5,22,60,1...|(13525,[5,22,60,1...|
|   0|Nah I don't think...|[nah, i, don't, t...|(13525,[0,1,66,86...|(13525,[0,1,66,86...|
|   1|FreeMsg Hey there...|[freemsg, hey, th...|(13525,[0,2,6,10,...|(13525,[0,2,6,10,...|
|   0|Even my brother i...|[even, my, brothe...|(13525,[0,7,9,13,...|(13525,[0,7,9,13,...|
|   0|As per your reque...|[as, per, your, r...|(13525,[0,10,11,4...|(13525,[0,10,11,4...|

In [11]:
# (5 pts)
np.testing.assert_array_equal([type(s) for s in tfidf_pipeline.stages],
                              [feature.Tokenizer, feature.CountVectorizerModel, feature.IDFModel])



Typical spam messages contain words that are upper case. Create a dataframe `sms_spam3_df` where you add a new column `has_uppercase` which contains an integer `1` if the first sequence of uppercase letters is longer or equal to 3 and an integer `0` otherwise. We can extract sequence of 3 or more uppercase letters by using the regular expression `[A-Z]{3,}`. We will use the function `fn.regexp_extract` to find those sequences and extract the first one (e.g., with index 0) and then use `fn.length` to compute the length of such sequence.

In [12]:
# create sms_spam3_df below
# YOUR CODE HERE
from pyspark.sql.functions import regexp_extract, col,length
sms_spam3_df=sms_spam2_df.select('type','text', regexp_extract(col('text'), '([A-Z]{3,})', 1).alias('has_upppercase'))
sms_spam3_df = sms_spam3_df.select('type','text', fn.when(fn.length(fn.col('has_upppercase')) >= 3, 1).otherwise(0).alias('has_uppercase'))

#raise NotImplementedError()

The first three messages with `has_uppercase == 1` are as follows:

```python
sms_spam3_df.where('has_uppercase == 1').take(3)
```

```console
[Row(type=1, text='WINNER!! As a valued network customer you have been selected to receivea £900 prize reward! To claim call 09061701461. Claim code KL341. Valid 12 hours only.', has_uppercase=1),
 Row(type=1, text='Had your mobile 11 months or more? U R entitled to Update to the latest colour mobiles with camera for Free! Call The Mobile Update Co FREE on 08002986030', has_uppercase=1),
 Row(type=1, text='SIX chances to win CASH! From 100 to 20,000 pounds txt> CSH11 and send to 87575. Cost 150p/day, 6days, 16+ TsandCs apply Reply HL 4 info', has_uppercase=1)]
```

In [13]:
# try it here
sms_spam3_df.where('has_uppercase == 1').take(3)

[Row(type=1, text='WINNER!! As a valued network customer you have been selected to receivea £900 prize reward! To claim call 09061701461. Claim code KL341. Valid 12 hours only.', has_uppercase=1),
 Row(type=1, text='Had your mobile 11 months or more? U R entitled to Update to the latest colour mobiles with camera for Free! Call The Mobile Update Co FREE on 08002986030', has_uppercase=1),
 Row(type=1, text='SIX chances to win CASH! From 100 to 20,000 pounds txt> CSH11 and send to 87575. Cost 150p/day, 6days, 16+ TsandCs apply Reply HL 4 info', has_uppercase=1)]

In [14]:
# (5 pts)
np.testing.assert_equal(set(sms_spam3_df.columns), {'has_uppercase', 'text', 'type'})
np.testing.assert_equal(type(sms_spam3_df.schema['has_uppercase'].dataType), sql.types.IntegerType)
np.testing.assert_equal(sms_spam3_df.rdd.map(lambda x : x['has_uppercase']).sum(), 891)

#  Compare models

Using the following splits:

In [15]:
training_df, validation_df, testing_df = sms_spam2_df.randomSplit([0.6, 0.3, 0.1], seed=0)

In [16]:
[training_df.count(), validation_df.count(), testing_df.count()]

[3349, 1674, 551]

**(5 pts)** Create pipelines where the first stage is the `tfidf_pipeline` created above and the second stage is a `LogisticRegression` model with different regularization parameters ($\lambda$) and elastic net mixture ($\alpha$). Fit those pipelines to the appropriate data split.

1. Logistic regression with $\lambda=0$ and $\alpha=0$ (assign the fitted pipeline to `lr_pipeline1`)
2. Logistic regression with $\lambda=0.02$ and $\alpha=0.2$ (assign the fitted pipeline to `lr_pipeline2`)
3. Logistic regression with $\lambda=0.1$ and $\alpha=0.4$ (assign the fitted pipeline to `lr_pipeline3`)

In [17]:
# create lr_pipeline1, lr_pipeline2, and lr_pipeline3
# YOUR CODE HERE

lr1 = classification.LogisticRegression(regParam=0, elasticNetParam=0, labelCol = 'type', featuresCol = 'tfidf')
lr_pipeline1 = Pipeline(stages=[tfidf_pipeline, lr1]).fit(training_df)
lr2 = classification.LogisticRegression(regParam=0.02, elasticNetParam=0.2, labelCol = 'type', featuresCol = 'tfidf')
lr_pipeline2 = Pipeline(stages=[tfidf_pipeline, lr2]).fit(training_df)
lr3 = classification.LogisticRegression(regParam=0.1, elasticNetParam=0.4, labelCol = 'type', featuresCol = 'tfidf')
lr_pipeline3 = Pipeline(stages=[tfidf_pipeline, lr3]).fit(training_df)

#raise NotImplementedError()

In [18]:
# (10 pts)
np.testing.assert_equal(type(lr_pipeline1), pipeline.PipelineModel)
np.testing.assert_equal(type(lr_pipeline2), pipeline.PipelineModel)
np.testing.assert_equal(type(lr_pipeline3), pipeline.PipelineModel)
np.testing.assert_array_equal([type(s) for s in lr_pipeline1.stages],
                              [pipeline.PipelineModel, classification.LogisticRegressionModel])
np.testing.assert_array_equal([type(s) for s in lr_pipeline2.stages],
                              [pipeline.PipelineModel, classification.LogisticRegressionModel])
np.testing.assert_array_equal([type(s) for s in lr_pipeline3.stages],
                              [pipeline.PipelineModel, classification.LogisticRegressionModel])

**(5 pts)** Use the evaluator object defined below to compute the area under the curve of your predictors. For example, to compute the area under the curve of pipeline 1 for a dataframe `df`, you would run

```python
evaluator.evaluate(lr_pipeline1.transform(df))
```

Assign the AUC of the three models to the variables `AUC1`, `AUC2`, and `AUC3`, and and assign the pipeline with the best model to a variable `best_model`

In [19]:
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='type')

For example, the AUC on training of the first model is perfect:

```
evaluator.evaluate(lr_pipeline1.transform(training_df))
```

```console
1.0
```

In [20]:
# print the AUC for the three models as follows
# print("Model 1 AUC: ", evaluator.evaluate(....))
# etc
# finally, based on these, assign the best validated 
# model to a variable best_model
# YOUR CODE HERE
AUC1 = evaluator.evaluate(lr_pipeline1.transform(validation_df))
print("Model 1 AUC: ", AUC1)
AUC2 = evaluator.evaluate(lr_pipeline2.transform(validation_df))
print("Model 2 AUC: ", AUC2)
AUC3 = evaluator.evaluate(lr_pipeline3.transform(validation_df))
print("Model 3 AUC: ", AUC3)

#raise NotImplementedError()

Model 1 AUC:  0.9557218489415972
Model 2 AUC:  0.9871555611031791
Model 3 AUC:  0.9686667539402503


In [21]:
# (5 pts)
np.testing.assert_array_equal([type(AUC1), type(AUC2), type(AUC3)],
                             [float, float, float])
# AUC less than 1
np.testing.assert_array_less([AUC1, AUC2, AUC3], [1, 1, 1])
# AUC more than 0.5
np.testing.assert_array_less([.5, .5, .5],
                            [AUC1, AUC2, AUC3])

# Question 2.5: Choose best model

Using the right split and the best model selected before, compute the generalization performance and assign it to a variable `AUC_best`

In [22]:
# assign to AUC_best the AUC of the best model selected before
# YOUR CODE HERE
AUC_best = evaluator.evaluate(lr_pipeline2.transform(testing_df))
#raise NotImplementedError()

In [23]:
# (5 pts)
np.testing.assert_approx_equal(AUC_best, 
                               0.976126746201693, significant=2)

# Question 2.6: Inference

Use the pipeline 2 fitted above (`lr_pipeline2`) to create Pandas dataframes that contain the most negative words and the most positive words. In particular, create a dataframe `positive_words` with the columns `word` and `weight` with the top 20 positive words, sorted by descending coefficient. Similarly create a `negative_words` Pandas dataframe with the top 20 negative words where the coefficient are sorted in ascending order. **Hint: follow the `sentiment_analysis.ipynb` notebook in the repo**

In [24]:
# create positive_words and negative_words pandas dataframe below
# YOUR CODE HERE
vocabulary = lr_pipeline2.stages[0].stages[-2].vocabulary
weights = lr_pipeline2.stages[-1].coefficients.toArray()
coeffs_df = pd.DataFrame({'word': vocabulary, 'weight': weights})
negative_words = coeffs_df.sort_values('weight').head(20)
positive_words = coeffs_df.sort_values('weight', ascending=False).head(20)
#raise NotImplementedError()

In [25]:
# examine positive vocabulary
positive_words.head()

Unnamed: 0,word,weight
3555,widelive.com/index.,0.59087
12237,08714712388,0.533567
15,call,0.5171
81,txt,0.513278
9064,gbp/sms,0.468274


In [26]:
# examine solutions
negative_words.head()

Unnamed: 0,word,weight
1,i,-0.162493
2444,fighting,-0.060939
3221,dificult,-0.059061
3371,fightng,-0.059061
3332,lose.,-0.059061


The `positive_words` and `negative_words` dataframe should look like this:

```python
positive_words.head()
```

<table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th></th>
      <th>word</th>
      <th>weight</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <th>3555</th>
      <td>widelive.com/index.</td>
      <td>0.590870</td>
    </tr>
    <tr>
      <th>12237</th>
      <td>08714712388</td>
      <td>0.533567</td>
    </tr>
    <tr>
      <th>15</th>
      <td>call</td>
      <td>0.517100</td>
    </tr>
    <tr>
      <th>81</th>
      <td>txt</td>
      <td>0.513278</td>
    </tr>
    <tr>
      <th>9064</th>
      <td>gbp/sms</td>
      <td>0.468274</td>
    </tr>
  </tbody>
</table>

and 

```python
negative_words.head()
```

<table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th></th>
      <th>word</th>
      <th>weight</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <th>1</th>
      <td>i</td>
      <td>-0.162493</td>
    </tr>
    <tr>
      <th>2444</th>
      <td>fighting</td>
      <td>-0.060939</td>
    </tr>
    <tr>
      <th>3221</th>
      <td>dificult</td>
      <td>-0.059061</td>
    </tr>
    <tr>
      <th>3371</th>
      <td>fightng</td>
      <td>-0.059061</td>
    </tr>
    <tr>
      <th>3332</th>
      <td>lose.</td>
      <td>-0.059061</td>
    </tr>
  </tbody>
</table>

In [27]:
# (5 pts)
np.testing.assert_equal(set(positive_words.columns), {'weight', 'word'})
np.testing.assert_equal(set(negative_words.columns), {'weight', 'word'})
np.testing.assert_approx_equal(positive_words.weight.sum(), 8.3701485692317927, significant=2)
np.testing.assert_approx_equal(negative_words.weight.sum(), -0.6661952507442954, significant=2)
np.testing.assert_array_less(positive_words.weight.iloc[-1], positive_words.weight.iloc[0])
np.testing.assert_array_less(negative_words.weight.iloc[0], negative_words.weight.iloc[-1])

# Question 2.7
Use the dataframe `sms_spam3_df` to create a model where the first feature is `has_uppercase` and the next set of features are the tfidf of the text. Perform feature engineering in all features using a max absolute scaler ([`MaxAbsScaler`](https://spark.apache.org/docs/2.0.2/ml-features.html#maxabsscaler)). Do a logistic regression on the resulting scaled features with regularization parameter $\lambda = 0.2$ and elastic net mixture $\alpha=0.1$ for the entire data (all of `sms_spam3_df`). Since you have scaled all features to be within the same range, you can compare them. 

**(5 pts)** with code and comments, answer below

1. is `has_uppercase` a feature that is positively or negative related to an SMS being spam?
2. what is the ratio of the coefficient of `has_uppercase` to the biggest positive tfidf coefficient?

In [29]:
# your code and comments below
# YOUR CODE HERE
from pyspark.ml.feature import  Tokenizer
tokenizer_df3 = Tokenizer(inputCol="text", outputCol="words")
from pyspark.ml.feature import CountVectorizer,VectorAssembler
countvectorizer_df3 = CountVectorizer(inputCol="words", outputCol="tf")
from pyspark.ml.feature import  IDF
idf_3 = IDF(inputCol="tf", outputCol="tfidf")
assembler = VectorAssembler(inputCols=["has_uppercase", "tfidf"],outputCol="features")

tfidf_pipeline_upper = Pipeline(stages=[tokenizer_df3,countvectorizer_df3,idf_3,assembler]).fit(sms_spam3_df)
df_upper = tfidf_pipeline_upper.transform(sms_spam3_df)

!pip install git+https://github.com/daniel-acuna/pyspark_pipes.git
from pyspark_pipes import pipe


scaled_model=pipe(feature.VectorAssembler(inputCols=['has_uppercase','tfidf']),feature.MaxAbsScaler(),classification.LogisticRegression(regParam=0.2, elasticNetParam=0.1, labelCol = 'type'))
scaled_model_fitted = scaled_model.fit(df_upper)

my_coeff=scaled_model_fitted.stages[-1].coefficients
has_uppercase_coeff = my_coeff.toArray()[0]
print('has_uppercase feature is positively related to an SMS being spam with a coefficient of:',has_uppercase_coeff)

max_coeff=my_coeff.toArray().max()
my_ratio=has_uppercase_coeff/max_coeff

print('The ratio of the coefficient of has_uppercase to the biggest positive tfidf coefficient is :',my_ratio)

#raise NotImplementedError()

Collecting git+https://github.com/daniel-acuna/pyspark_pipes.git
  Cloning https://github.com/daniel-acuna/pyspark_pipes.git to /tmp/pip-req-build-_jwl5rq9
Building wheels for collected packages: pyspark-pipes
  Running setup.py bdist_wheel for pyspark-pipes ... [?25ldone
[?25h  Stored in directory: /tmp/pip-ephem-wheel-cache-79b03m_v/wheels/58/e0/91/9f974ba72a9ea731a32cba0cccc17064e7d959a8f37e00263f
Successfully built pyspark-pipes
has_uppercase feature is positively related to an SMS being spam with a coefficient of: 0.9289178747599827
The ratio of the coefficient of has_uppercase to the biggest positive tfidf coefficient is : 0.4617005798337622
