**1**. (100 points)

In this exercise you will use Spark to build and run a machine learning pipeline to separate 'ham' from 'spam' in SMS text messages. Then you will use the pipeline to classify SMS texts.

- Create a Pandas DataFrame from the data in the file `SMSSpamCollection`, where each line is tab separated into (label, text). If you find that the read_xxx function in Pandas does not do the job correctly, read in the file line by line before converting to a DataFrame. Create an index column so that each row has a unique numeric id.
- Convert to a Spark DataFrame that has two columns (klass, SMS) and split into training and test data sets with proportions 0.8 and 0.2 respectively using a random seed of 123.
- Build a Spark ML pipeline consisting of the following 
    - StringIndexer: To convert `klass` into a numeric `labels` column
    - Tokenizer: To convert `SMS` into a list of tokens
    - StopWordsRemover: To remove "stop words" from the tokens
    - CountVectorizer: To count words (use a vocabulary size of 100 and a minimum number of occurrences of 2)
    - LogisticRegression: Use `maxIter=10`, `regParam=0.001`

- Train the model on the training data.
- Evaluate the precision, recall and accuracy of this model on the test data.

In [1]:
from pyspark.sql import SparkSession
spark_0 = SparkSession.builder.getOrCreate()

In [2]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import PCA
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.clustering import GaussianMixture

from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.regression import LabeledPoint

import pandas as pd

Load the dataset and create a Spark DataFrame

In [38]:
url="https://raw.githubusercontent.com/cliburn/bios-823-2019/master/homework/SMSSpamCollection"
# Pass the separator by keyword; recent pandas versions reject it as a positional argument
data = pd.read_csv(url, sep="\t", header=None, names=["label", "text"])
data.reset_index(inplace=True)
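
The exercise suggests reading the file line by line if the Pandas reader misparses it. A minimal sketch of that fallback is shown here, assuming every non-empty line has the form `label<TAB>text`. The helper name `parse_sms_lines` and the sample messages are made up for illustration and are not part of the original solution.

```python
import io

def parse_sms_lines(lines):
    """Split each line on the first tab into (label, text) and number the rows."""
    rows = []
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue  # skip blank lines
        # Split only on the first tab so tabs or quotes inside the text are kept
        label, text = line.split("\t", 1)
        rows.append({"index": len(rows), "label": label, "text": text})
    return rows

# Made-up sample: the first message has an unbalanced double quote, the kind
# of character that can confuse a CSV parser's quote handling.
sample = io.StringIO('ham\tOk lar... see you "soon\nspam\tWINNER!! Call now\tto claim\n')
rows = parse_sms_lines(sample)
for row in rows:
    print(row)
```

Passing `rows` to `pd.DataFrame(rows)` would then give a frame with `index`, `label` and `text` columns. Another option that may work is to keep `pd.read_csv` and disable quote handling with `quoting=csv.QUOTE_NONE`; note that either change can alter the number of rows read, so the counts shown later in this notebook would not necessarily match.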

In [19]:
cols = ['klass', 'SMS']
df = spark_0.createDataFrame(data[['label', 'text']], cols)
df

DataFrame[klass: string, SMS: string]

Split the data into training and test sets:

In [20]:
train, test = df.randomSplit([0.8, 0.2], seed=123)
train.cache()

DataFrame[klass: string, SMS: string]

In [22]:
train.show(5)

+-----+--------------------+
|klass|                 SMS|
+-----+--------------------+
|  ham| said kiss, kiss,...|
|  ham|4 oclock at mine....|
|  ham|7 at esplanade.. ...|
|  ham|8 at the latest, ...|
|  ham|A famous quote : ...|
+-----+--------------------+
only showing top 5 rows



Build an ML pipeline

In [39]:
indexer = StringIndexer(
    inputCol="klass", 
    outputCol="label"
)

tokenizer = Tokenizer(
    inputCol="SMS",
    outputCol="tokens"
)

remover = StopWordsRemover(
    inputCol="tokens",
    outputCol="filtered"
)

# Named `vectorizer` to avoid confusion with Spark's CountVectorizerModel class
vectorizer = CountVectorizer(
    inputCol="filtered",
    outputCol="features",
    vocabSize=100,
    minDF=2
)

lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.001
)

pipeline = Pipeline(stages=[indexer, tokenizer, remover, vectorizer, lr])
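
To make the feature stages concrete, here is a plain-Python approximation of what the tokenizer, stop-word remover and count vectorizer do to a message. It is a sketch under stated assumptions, not Spark's implementation: the stop-word list is a tiny made-up one, ties in term frequency are broken alphabetically (an arbitrary choice made here), and the function names are hypothetical.

```python
from collections import Counter

STOP_WORDS = {"the", "a", "to", "is", "at"}  # hypothetical tiny list

def tokenize(text):
    """Lowercase and split on whitespace, roughly like Spark's Tokenizer."""
    return text.lower().split()

def remove_stop_words(tokens):
    """Drop tokens found in the stop-word list."""
    return [t for t in tokens if t not in STOP_WORDS]

def fit_vocabulary(docs, vocab_size, min_df):
    """Keep terms seen in at least min_df documents, then take the
    vocab_size most frequent ones across the corpus."""
    term_freq = Counter()
    doc_freq = Counter()
    for tokens in docs:
        term_freq.update(tokens)
        doc_freq.update(set(tokens))
    eligible = [t for t in term_freq if doc_freq[t] >= min_df]
    eligible.sort(key=lambda t: (-term_freq[t], t))  # assumed tie-break
    return eligible[:vocab_size]

def vectorize(tokens, vocabulary):
    """Count how often each vocabulary term occurs in one document."""
    counts = Counter(tokens)
    return [counts[term] for term in vocabulary]

messages = [
    "Free prize call now",
    "Call me at the office",
    "Free entry to win a prize",
]
docs = [remove_stop_words(tokenize(m)) for m in messages]
vocabulary = fit_vocabulary(docs, vocab_size=3, min_df=2)
vectors = [vectorize(d, vocabulary) for d in docs]
print(vocabulary)
print(vectors)
```

In the pipeline above, the same idea applies with `vocabSize=100` and `minDF=2`, and Spark stores the resulting counts as sparse vectors in the `features` column that the logistic regression consumes.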

Let's train and evaluate the model

In [40]:
model = pipeline.fit(train)

In [41]:
import warnings

with warnings.catch_warnings():
    warnings.simplefilter('ignore')
    prediction = model.transform(test)

In [42]:
score = prediction.select(['label', 'prediction'])

In [44]:
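# Each row is (label, prediction); summing the booleans counts the matching rows.
# Label 1.0 is the less frequent class (spam) under StringIndexer's default ordering.
tp = float(score.rdd.map(lambda x: x[0]==1 and x[1]==1).sum())
tn = float(score.rdd.map(lambda x: x[0]==0 and x[1]==0).sum())
fp = float(score.rdd.map(lambda x: x[0]==0 and x[1]==1).sum())
fn = float(score.rdd.map(lambda x: x[0]==1 and x[1]==0).sum())
# p is the total number of test rows
p = float(score.count())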

In [45]:
print('Accuracy = %s' % ((tp+tn)/p))
print('Recall = %s' % (tp/(tp+fn)))
print('Precision = %s' % (tp/(tp+fp)))

Accuracy = 0.9617058311575283
Recall = 0.7972972972972973
Precision = 0.8939393939393939
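
The three metrics follow directly from the confusion counts. The sketch below recomputes them in plain Python on a made-up list of (label, prediction) pairs, assuming spam is the positive class encoded as 1.0. The function name `binary_metrics` is hypothetical and the numbers are illustrative only, not results from the SMS data.

```python
def binary_metrics(pairs, positive=1.0):
    """Compute accuracy, precision and recall from (label, prediction) pairs."""
    tp = sum(1 for y, yhat in pairs if y == positive and yhat == positive)
    tn = sum(1 for y, yhat in pairs if y != positive and yhat != positive)
    fp = sum(1 for y, yhat in pairs if y != positive and yhat == positive)
    fn = sum(1 for y, yhat in pairs if y == positive and yhat != positive)

    def ratio(num, den):
        # Return None instead of dividing by zero when a metric is undefined
        return num / den if den else None

    return {
        "accuracy": ratio(tp + tn, len(pairs)),
        "precision": ratio(tp, tp + fp),  # of predicted spam, how much is spam
        "recall": ratio(tp, tp + fn),     # of actual spam, how much was caught
    }

# Made-up example: 3 spam and 3 ham messages
pairs = [(1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 1.0), (1.0, 1.0)]
metrics = binary_metrics(pairs)
print(metrics)
```

Read this way, the results above say that most messages flagged as spam really are spam (precision), while a noticeably larger share of actual spam is missed (recall); accuracy looks high partly because ham dominates the data.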


**2** (100 points)

In this exercise, you will simulate running a machine learning pipeline to classify streaming data.

- Convert the test DataFrame into a Pandas DataFrame
- Write each row of the DataFrame to a separate tab-delimited file in a folder called "incoming_sms"
- Create a Structured Streaming DataFrame using `readStream` with `option("maxFilesPerTrigger", 1)` to simulate streaming data
- Use the fitted pipeline created in Ex. 1 to transform the input stream
- Write the transformed stream to memory with name `sms_pred`
- Sleep 30 seconds
- Use an SQL query to show the `index`, `label` and `prediction` columns
- Sleep 30 more seconds
- Use an SQL query to show the `index`, `label` and `prediction` columns

In [46]:
import string
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
spark_1 = (
    SparkSession.builder 
    .master("local") 
    .appName("BIOS-823") 
    .config("spark.executor.cores", 4) 
    .getOrCreate()    
)

Convert to a Pandas DataFrame and create the folder "incoming_sms"

In [47]:
test_df = test.toPandas()
test_df.shape

(1149, 2)

In [48]:
import os
import shutil

dir_path = 'incoming_sms/'
if os.path.exists(dir_path):
    shutil.rmtree(dir_path)
os.makedirs(dir_path)

for i in range(test_df.shape[0]):
    filename = 'file{}.csv'.format(i)
    full_name = dir_path+filename
    test_df.iloc[[i]].to_csv(full_name ,sep = "\t", encoding = 'utf-8', index=False)

Structured Streaming DataFrame

In [49]:
schema = test.schema
dir_path = 'incoming_sms/'

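streamingDF = (
  spark_1
    .readStream
    .schema(schema)
    .option("sep", "\t")        # the files are tab-delimited, not comma-delimited
    .option("header", True)     # each file starts with a header row
    .option("maxFilesPerTrigger", 1)
    .csv(dir_path)
)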
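
As a rough analogy for what `maxFilesPerTrigger` simulates, the plain-Python sketch below writes each row to its own tab-delimited file and then consumes the folder one file per "trigger". It is not how Spark schedules micro-batches internally; the function names, the temporary folder, the alphabetical file order and the sample rows are all assumptions made for illustration.

```python
import csv
import os
import tempfile

def write_rows_as_files(rows, folder):
    """Write each (klass, sms) row to its own tab-delimited file with a header."""
    for i, (klass, sms) in enumerate(rows):
        path = os.path.join(folder, "file{}.csv".format(i))
        with open(path, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f, delimiter="\t")
            writer.writerow(["klass", "SMS"])
            writer.writerow([klass, sms])

def micro_batches(folder, max_files_per_trigger=1):
    """Yield lists of rows, reading at most max_files_per_trigger files per batch."""
    names = sorted(os.listdir(folder))
    for start in range(0, len(names), max_files_per_trigger):
        batch = []
        for name in names[start:start + max_files_per_trigger]:
            with open(os.path.join(folder, name), newline="", encoding="utf-8") as f:
                batch.extend(csv.DictReader(f, delimiter="\t"))
        yield batch

# Made-up rows standing in for the test set
rows = [("ham", "See you at 8"), ("spam", "WIN a prize now"), ("ham", "ok")]
with tempfile.TemporaryDirectory() as folder:
    write_rows_as_files(rows, folder)
    batches = list(micro_batches(folder, max_files_per_trigger=1))

for number, batch in enumerate(batches):
    print(number, batch)
```

With one file per trigger, every batch carries a single message, which is why the in-memory result table grows gradually rather than appearing all at once.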

Apply the fitted pipeline to the stream

In [50]:
stream_pipeline = (
    model
    .transform(streamingDF)
)

Write query stream to memory

In [51]:
from time import sleep

query = (
    stream_pipeline.writeStream.
    queryName("sms_pred").
    format("memory").
    start()
)

In [53]:
for i in range(2):
    sleep(30)
    # Call .show() explicitly: a bare expression inside a loop displays nothing
    spark_1.sql('''
    SELECT label, prediction 
    FROM sms_pred
    ''').show()