**1**. (100 points)

In this exercise you will use Spark to build and run a machine learning pipeline to separate 'ham' from 'spam' in SMS text messages. Then you will use the pipeline to classify SMS texts.

- Create a Pandas DataFraem form the data in the file`SMSSpamCollection` where each line is tab separated into (label, text). If you find that the read_xxx function in Pandas does not do the job correctly, read in the file line by line before converting to a DataFrame. Create an index column so that each row has a unique number id.
- Convert to a Spark DataFrame that has two columns (klass, SMS) and split into test and training data sets with proportions 0.8 and 0.2 respectively using a random seed of 123.
- Build a Spark ML pipeline consisting of the following 
    - StringIndexer: To convert `klass` into a numeric `labels` column
    - Tokenizer: To covert `SMS` into a list of tokens
    - StopWordsRemover: To remove "stop words" from the tokens
    - CountVectorizer: To count words (use a vocabular size of 100 and minimum number of occureences of 2)
    - LogisticRegression: Use `maxIter=10`, `regParam=0.001`

- Train the model on the test data.
- Evaluate the precision, recall and accuracy of this model on the test data.

In [1]:
import os
os.chdir('/Users/iuliia/bios-823-2019/homework/')

In [2]:
import pandas as pd
from pyspark.sql import SparkSession

with open('SMSSpamCollection') as f:
    content = f.readlines()
content = [x.strip() for x in content] #remove whitespace characters like `\n` at the end of each line
data=pd.DataFrame(content, columns=['text'])
data = pd.DataFrame(data.text.str.split('\t',1).tolist(), columns = ['klass','SMS']) #Separating each line by \t into two columns.

In [3]:
print ('Not spam:', len(data[data['klass']=='ham']))
print ('Spam:', len(data[data['klass']=='spam']))    

Not spam: 4827
Spam: 747


In [4]:
#Initializing Spark session.
spark = (
    SparkSession.builder 
    .master("local") 
    .appName("BIOS-823") 
    .config("spark.executor.cores", 4) 
    .getOrCreate()    
)

In [5]:
cols=["klass","SMS"]
df = spark.createDataFrame(data, cols)
train, test = df.randomSplit([0.8, 0.2], seed=123)

In [6]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [7]:
#The most frequent index gets 0, so ham = 0 and spam = 1.
indexer = StringIndexer(
    inputCol="klass", 
    outputCol="labels"
)

tokenizer = Tokenizer(
    inputCol="SMS", 
    outputCol="SMS_tokens")

remover = StopWordsRemover(
    inputCol="SMS_tokens", 
    outputCol="clean_tokens")

countvectorizer=CountVectorizer(
    inputCol="clean_tokens", 
    outputCol="SMS_count"
)

lr = LogisticRegression(
    featuresCol='SMS_count', 
    labelCol='labels',
    maxIter=10, 
    regParam=0.001
)

pipeline = Pipeline(stages=[indexer, tokenizer, remover, countvectorizer, lr])

In [8]:
model = pipeline.fit(train)
prediction = model.transform(test)
score = prediction.select(['labels', 'prediction'])

In [9]:
score.show(5)

+------+----------+
|labels|prediction|
+------+----------+
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
|   0.0|       0.0|
+------+----------+
only showing top 5 rows



In [10]:
accuracy = round(score.rdd.map(lambda x: x[0] == x[1]).sum() / float(score.count()),3)

#Values for identifying "not spam".
tp=score.rdd.map(lambda x: (x[0] == 0) and (x[0] == 0)).sum()
fp=score.rdd.map(lambda x: (x[0] == 1) and (x[1] == 0)).sum()
tn=score.rdd.map(lambda x: (x[0] == 1) and (x[1] == 1)).sum()

print ('Accuracy of the model:', accuracy)
print ('Precision (identifying not spam):', round(tp/(tp+fp),3))
print ('Recall (identifying not spam):', round(tp/(tp+tn),3))

#Values for identifying "Spam".
tp=score.rdd.map(lambda x: (x[0] == 1) and (x[0] == 1)).sum()
fp=score.rdd.map(lambda x: (x[0] == 0) and (x[1] == 1)).sum()
tn=score.rdd.map(lambda x: (x[0] == 0) and (x[1] == 0)).sum()

print ('\n')
print ('Accuracy of the model:', accuracy)
print ('Precision (identifying spam):', round(tp/(tp+fp),3))
print ('Recall (identifying spam):', round(tp/(tp+tn),3))

Accuracy of the model: 0.972
Precision (identifying not spam): 0.968
Recall (identifying not spam): 0.891


Accuracy of the model: 0.972
Precision (identifying spam): 1.0
Recall (identifying spam): 0.134


**2** (100 points)

In this exercise, you will simulate running a machine learning pipeline to classify steaming data.

- Convert the test DataFrame into a Pandas DataFrame
- Write each row of the DataFrame to a separate tab-delimited file in a folder called "incoming_sms"
- Create a Structured Streaming DataFrame using `readStream` with `option("maxFilesPerTrigger", 1)` to simulate streaming data
- Use the fitted pipeline created in Ex. 1 to transform the input stream
- Write the transformed stream to memory with name `sms_pred
- Sleep 30 seconds
- Use an SQL query to show the `index`, `label` and `prediction` columns
- Sleep 30 more seconds
- Use an SQL query to show the `index`, `label` and `prediction` columns

In [11]:
test = test.select("*").toPandas()

In [12]:
import csv
import os
os.chdir('/Users/iuliia/bios-823-2019/homework/incoming_sms')

for index, row in test.iterrows():
    row = index, row['klass'], row['SMS'] #each row is index, klass label, and SMS.
    row=pd.DataFrame(row).T
    row.to_csv("%d.csv" % index, header=False, sep='\t')  

In [13]:
import string
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import col, split
from time import sleep

In [14]:
df = (
    spark.
    readStream.
    format("text").
    option("maxFilesPerTrigger", 1).
    load('/Users/iuliia/bios-823-2019/homework/incoming_sms')
)

#Separating one column "value" into three new columns: index, klass, and SMS.
df=df.withColumn('temp', split('value', '\t')).select(*(col('temp').getItem(i).alias(f'col{i}') for i in range(4))).drop('col0')
df = df.select(col("col1").alias("index"), col("col2").alias("klass"), col("col3").alias("SMS"))


In [15]:
#Applying transformation using pipeline.
prediction = model.transform(df) 

In [16]:
query = (
    prediction.writeStream.
    queryName("sms_pred").
    format("memory").
    outputMode("append").
    start()
)

In [17]:
spark.streams.active

[<pyspark.sql.streaming.StreamingQuery at 0x1198e5860>]

In [21]:
sleep(30)
spark.sql('''SELECT index, labels, prediction FROM sms_pred''').show(3)
sleep(30)
spark.sql('''SELECT index, labels, prediction FROM sms_pred''').show(6)

+-----+------+----------+
|index|labels|prediction|
+-----+------+----------+
|    6|   0.0|       0.0|
|    7|   0.0|       0.0|
|    5|   0.0|       0.0|
+-----+------+----------+
only showing top 3 rows

+-----+------+----------+
|index|labels|prediction|
+-----+------+----------+
|    6|   0.0|       0.0|
|    7|   0.0|       0.0|
|    5|   0.0|       0.0|
|    4|   0.0|       0.0|
|    0|   0.0|       0.0|
|    1|   0.0|       0.0|
+-----+------+----------+
only showing top 6 rows

