In [45]:
from __future__ import print_function
import sys
from pyspark.sql import SparkSession
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.classification import LogisticRegressionWithSGD

In [46]:
#===============================================
# This function returns a LabeledPoint
#
# Parameters:
#   label: as 0 (for non-spam), 1 (for spam)
#   email: a String of words representing an email
#   tf: an instance of HashingTF
#
def createLabeledPoint(label, email, tf):
    print("createLabeledPoint() label = ", label)
    print("createLabeledPoint() email = ", email)
    print("createLabeledPoint() tf = ", tf)
    # featurize email
    features = tf.transform(email)
    # create a LabeledPoint
    return LabeledPoint(label, features)
#end-def
#===============================================

In [47]:
spark = SparkSession\
    .builder\
    .appName("logistic_regression_builder")\
    .getOrCreate()

In [48]:
# Step-1: define input and output paths
# define training data paths 
training_emails_nospam_path = "/user/arnavmoutl12edu/module7/training_emails_nospam.txt"
# "/.../training_emails_nospam.txt"
training_emails_spam_path = "/user/arnavmoutl12edu/module7/training_emails_spam.txt"
# "/.../training_emails_spam.txt"
# define output path for the built model
saved_model_path = "/user/arnavmoutl12edu/module7/model"
# "/.../model"

In [49]:
# check out the inputs
print("training_emails_nospam_path: {}".format(training_emails_nospam_path))
print("training_emails_spam_path: {}".format(training_emails_spam_path))
print("saved_model_path: {}".format(saved_model_path))

training_emails_nospam_path: /user/arnavmoutl12edu/module7/training_emails_nospam.txt
training_emails_spam_path: /user/arnavmoutl12edu/module7/training_emails_spam.txt
saved_model_path: /user/arnavmoutl12edu/module7/model


In [50]:
# spam_emails is an RDD[String]
spam_emails = spark.sparkContext.textFile(training_emails_spam_path)
print("spam_emails.collect()=", spam_emails.collect())
print("spam_emails.count()=", spam_emails.count())

spam_emails.collect()= ["Samsung Galaxy End of YearPromo You have 1 week remaining to retrieve your won prize for the Samsung Galaxy Xmas Promo 'C' draw category winning prize of Seven Hundred and Fifty Thousand Euros each and a Samsung Galaxy S6 EDGE. Winning Ticket Number:WIN-707-COS.  We advise you to keep this winning notification confidential and away from public notice to avoid double claim/mistransfer or impersonation until after remittance/payment to you.", "We've picked out 10 new matches for you. Meet them now and then check out all the singles in your area! you might win a prize too", 'For claim fill in the attached claim application form completely and follow instructions carefully.', 'Dear sir, I am a Prince in a far kingdom you have not heard of.  I want to send you money via wire transfer so please ...', 'Get Viagra real cheap!  Send money right away to ...', 'Oh my gosh you can be really strong too with these drugs found in the rainforest. Get them cheap right now ...',

In [51]:
# nospam_emails is an RDD[String]
nospam_emails = spark.sparkContext.textFile(training_emails_nospam_path)
print("nospam_emails.collect()=", nospam_emails.collect())
print("nospam_emails.count()=", nospam_emails.count())

nospam_emails.collect()= ['Dear Spark Learner, Thanks so much for attending the Spark Summit 2014!  Check out videos of talks from the summit at ...', 'Hi Mom, Apologies for being late about emailing and forgetting to send you the package.  I hope you and bro have been ...', 'Wow, hey Fred, just heard about the Spark petabyte sort.  I think we need to take time to try it out immediately ...', 'Hi Spark user list, This is my first question to this list, so thanks in advance for your help!  I tried running ...', "Thanks Tom for your email.  I need to refer you to Alice for this one.  I haven't yet figured out that part either ...", 'Good job yesterday!  I was attending your talk, and really enjoyed it.  I want to try out GraphX ...', 'Summit demo got whoops from audience!  Had to let you know. --Joe', 'Hello Dr. Smith, thank you for great presentation, the whole class enjoyed your introduction to spark, hope you can give another presentation soon', 'Dear Jeff, We look forward to seeing y

In [52]:
# Step-3: create an HTF
# tf is an instance of HashingTF, which can hold up to 128 features
FEATURES_HTF = HashingTF(numFeatures=128)

In [53]:
# Step-4: create LabeledPoint per training email
# spam emails gets a "1" classification, 
# spam_labeled_points is an RDD[LabeledPoint]
spam_labeled_points = spam_emails.map(lambda email : createLabeledPoint(1, email, FEATURES_HTF))
print("spam_labeled_points.collect()=", spam_labeled_points.collect())
print("spam_labeled_points.count()=", spam_labeled_points.count())

spam_labeled_points.collect()= [LabeledPoint(1.0, (128,[5,7,8,12,14,19,20,23,24,30,32,38,41,42,47,57,61,67,71,73,74,80,82,85,87,89,90,93,95,99,100,107,112,119,122,127],[4.0,6.0,51.0,6.0,12.0,1.0,9.0,5.0,18.0,6.0,1.0,5.0,15.0,3.0,2.0,2.0,1.0,4.0,29.0,35.0,1.0,8.0,1.0,2.0,95.0,31.0,3.0,6.0,1.0,33.0,3.0,21.0,13.0,5.0,2.0,2.0])), LabeledPoint(1.0, (128,[5,8,12,14,20,24,38,41,42,57,67,71,73,74,80,82,87,89,90,93,99,107,112,114,119,127],[1.0,10.0,2.0,2.0,4.0,6.0,6.0,3.0,1.0,1.0,2.0,9.0,6.0,1.0,2.0,1.0,32.0,14.0,1.0,3.0,7.0,9.0,3.0,1.0,1.0,1.0])), LabeledPoint(1.0, (128,[5,8,12,14,20,24,38,41,42,71,73,87,89,93,99,107,112],[1.0,12.0,3.0,2.0,7.0,13.0,2.0,4.0,1.0,7.0,8.0,19.0,5.0,1.0,5.0,7.0,2.0])), LabeledPoint(1.0, (128,[8,12,14,20,23,24,38,41,42,47,67,71,73,80,87,89,93,99,107,112,119],[14.0,1.0,3.0,1.0,2.0,4.0,2.0,3.0,4.0,1.0,3.0,8.0,11.0,1.0,32.0,10.0,2.0,8.0,4.0,5.0,2.0])), LabeledPoint(1.0, (128,[8,12,14,20,24,30,38,41,42,59,71,73,80,87,89,93,99,107,114],[5.0,1.0,1.0,1.0,3.0,1.0,2.0,1.0,3.0

In [54]:
#
# nospam emails gets a "0" classification, 
# nospam_labeled_points is an RDD[LabeledPoint]
nospam_labeled_points = nospam_emails.map(lambda email : createLabeledPoint(0, email, FEATURES_HTF))
print("nospam_labeled_points.collect()=", nospam_labeled_points.collect())
print("nospam_labeled_points.count()=", nospam_labeled_points.count())
#

nospam_labeled_points.collect()= [LabeledPoint(0.0, (128,[7,8,12,14,20,23,24,30,38,41,42,67,68,71,73,74,80,82,85,87,89,99,107,112,114,119,122],[1.0,11.0,2.0,2.0,2.0,2.0,1.0,3.0,5.0,6.0,3.0,5.0,1.0,6.0,8.0,1.0,1.0,1.0,1.0,29.0,8.0,4.0,9.0,6.0,1.0,1.0,1.0])), LabeledPoint(0.0, (128,[5,8,12,14,20,23,24,38,41,42,61,67,71,73,80,87,89,99,100,107,112,119],[1.0,9.0,3.0,3.0,1.0,1.0,5.0,3.0,3.0,4.0,1.0,2.0,11.0,8.0,6.0,28.0,12.0,7.0,4.0,6.0,2.0,1.0])), LabeledPoint(0.0, (128,[5,8,12,14,23,24,30,38,41,42,67,71,73,87,89,90,93,99,100,106,107,112],[1.0,10.0,2.0,4.0,2.0,5.0,1.0,4.0,3.0,4.0,4.0,6.0,6.0,26.0,13.0,1.0,2.0,2.0,2.0,1.0,15.0,2.0])), LabeledPoint(0.0, (128,[8,12,14,20,23,24,30,38,41,42,67,71,73,80,85,87,89,99,107,112,114,119],[18.0,2.0,2.0,1.0,2.0,5.0,1.0,4.0,2.0,3.0,3.0,5.0,4.0,1.0,1.0,29.0,5.0,8.0,8.0,10.0,1.0,1.0])), LabeledPoint(0.0, (128,[8,12,14,20,24,38,41,42,61,67,71,73,80,85,87,89,99,107,112,119,127],[13.0,1.0,2.0,1.0,5.0,5.0,2.0,5.0,1.0,3.0,9.0,5.0,1.0,2.0,33.0,12.0,4.0,10.0,2.0,1

In [55]:
# Step-5: Create a final training dataset
# which includes spam and nonspam emails
# Since all training data is classified, 
# now we create a single RDD as training_data,
# which is an RDD[LabeledPoint]  
training_data = spam_labeled_points.union(nospam_labeled_points)
print("training_data.count()=", training_data.count())
print("training_data.collect()=", training_data.collect())
training_data.cache()

training_data.count()= 40
training_data.collect()= [LabeledPoint(1.0, (128,[5,7,8,12,14,19,20,23,24,30,32,38,41,42,47,57,61,67,71,73,74,80,82,85,87,89,90,93,95,99,100,107,112,119,122,127],[4.0,6.0,51.0,6.0,12.0,1.0,9.0,5.0,18.0,6.0,1.0,5.0,15.0,3.0,2.0,2.0,1.0,4.0,29.0,35.0,1.0,8.0,1.0,2.0,95.0,31.0,3.0,6.0,1.0,33.0,3.0,21.0,13.0,5.0,2.0,2.0])), LabeledPoint(1.0, (128,[5,8,12,14,20,24,38,41,42,57,67,71,73,74,80,82,87,89,90,93,99,107,112,114,119,127],[1.0,10.0,2.0,2.0,4.0,6.0,6.0,3.0,1.0,1.0,2.0,9.0,6.0,1.0,2.0,1.0,32.0,14.0,1.0,3.0,7.0,9.0,3.0,1.0,1.0,1.0])), LabeledPoint(1.0, (128,[5,8,12,14,20,24,38,41,42,71,73,87,89,93,99,107,112],[1.0,12.0,3.0,2.0,7.0,13.0,2.0,4.0,1.0,7.0,8.0,19.0,5.0,1.0,5.0,7.0,2.0])), LabeledPoint(1.0, (128,[8,12,14,20,23,24,38,41,42,47,67,71,73,80,87,89,93,99,107,112,119],[14.0,1.0,3.0,1.0,2.0,4.0,2.0,3.0,4.0,1.0,3.0,8.0,11.0,1.0,32.0,10.0,2.0,8.0,4.0,5.0,2.0])), LabeledPoint(1.0, (128,[8,12,14,20,24,30,38,41,42,59,71,73,80,87,89,93,99,107,114],[5.0,1.0,1.0,1.0

UnionRDD[445] at union at NativeMethodAccessorImpl.java:0

In [59]:
#Step-7:  Save The Built LR Model
# To save the built model, we use `LogisticRegressionModel.save()` 
# method as:
LR_model.save(spark.sparkContext, saved_model_path)

#====================================
# Calculate the accuracy of the model. 
#====================================
# create test data for checking accuracy
training_70_percent, test_30_percent = training_data.randomSplit((0.7, 0.3))

In [None]:
# Step-6: Use LogisticRegressionWithSGD to create an LR model.
# Train a classification model for Binary Logistic Regression 
# using Stochastic Gradient Descent (SGD). By default L2 
# regularization is used, which can be changed via 
# LogisticRegressionWithSGD.optimizer.
# Labels used in Logistic Regression should be 
# `{0, 1, ..., K - 1}` for K classes multi-label 
# classification problem. So if we have `K = 2`
# classes, then the labels will be `{0, 1}`.
#
# The following code snippet shows how to create an LR model:
# You should note that the LR_model is an instance of 
# pyspark.mllib.classification.LogisticRegressionModel 
#   
# build an LR model (`LogisticRegressionModel`) using  
# `LogisticRegressionWithSGD`
LR_model = LogisticRegressionWithSGD.train(training_70_percent)

In [60]:
print("training_70_percent=", training_70_percent)
print("training_70_percent.count()=", training_70_percent.count())
print("training_70_percent.collect()=", training_70_percent.collect())
#
print("test_30_percent=", test_30_percent)
print("test_30_percent.count()=", test_30_percent.count())
print("test_30_percent.collect()=", test_30_percent.collect())

training_70_percent= PythonRDD[664] at RDD at PythonRDD.scala:53
training_70_percent.count()= 28
training_70_percent.collect()= [LabeledPoint(1.0, (128,[5,7,8,12,14,19,20,23,24,30,32,38,41,42,47,57,61,67,71,73,74,80,82,85,87,89,90,93,95,99,100,107,112,119,122,127],[4.0,6.0,51.0,6.0,12.0,1.0,9.0,5.0,18.0,6.0,1.0,5.0,15.0,3.0,2.0,2.0,1.0,4.0,29.0,35.0,1.0,8.0,1.0,2.0,95.0,31.0,3.0,6.0,1.0,33.0,3.0,21.0,13.0,5.0,2.0,2.0])), LabeledPoint(1.0, (128,[5,8,12,14,20,24,38,41,42,71,73,87,89,93,99,107,112],[1.0,12.0,3.0,2.0,7.0,13.0,2.0,4.0,1.0,7.0,8.0,19.0,5.0,1.0,5.0,7.0,2.0])), LabeledPoint(1.0, (128,[8,12,14,20,23,24,38,41,42,47,67,71,73,80,87,89,93,99,107,112,119],[14.0,1.0,3.0,1.0,2.0,4.0,2.0,3.0,4.0,1.0,3.0,8.0,11.0,1.0,32.0,10.0,2.0,8.0,4.0,5.0,2.0])), LabeledPoint(1.0, (128,[5,7,8,12,14,23,24,30,32,38,41,42,47,49,53,61,67,71,73,85,87,89,90,93,98,99,107,112,114,122],[3.0,3.0,5.0,2.0,2.0,9.0,4.0,7.0,5.0,2.0,2.0,4.0,2.0,1.0,5.0,2.0,1.0,3.0,3.0,4.0,22.0,2.0,1.0,2.0,6.0,1.0,3.0,6.0,1.0,2.0]))

In [61]:
prediction_label = test_30_percent.map(lambda x : (LR_model.predict(x.features), x.label))
print("prediction_label.count()=", prediction_label.count())
print("prediction_label.collect()=", prediction_label.collect())

prediction_label.count()= 12
prediction_label.collect()= [(0, 1.0), (0, 1.0), (0, 1.0), (0, 1.0), (1, 1.0), (1, 1.0), (0, 1.0), (0, 0.0), (0, 0.0), (0, 0.0), (0, 0.0), (0, 0.0)]


In [62]:
# Accuracy can be calculated by 
# taking the matching terms from both the training and test data. This 
# can be done as follows:
accuracy = 1.0 * prediction_label.filter(lambda x : float(x[0]) == float(x[1])).count() / test_30_percent.count()
print("accuracy=", accuracy)

accuracy= 0.5833333333333334


In [63]:
# done!
spark.stop()