In [0]:
#Import the required libraries
spark_install=0
if spark_install:
  !wget -q https://www-us.apache.org/dist/spark/spark-3.0.0-preview2/spark-3.0.0-preview2-bin-hadoop3.2.tgz
  !tar -xvf spark-3.0.0-preview2-bin-hadoop3.2.tgz
  !pip install -q findspark

In [0]:
# Optional one-time setup: install a headless OpenJDK 8 and select it as the default `java`.
# Set the flag to 1 to run the shell commands below; at 0 the cell does nothing.
java_install=0
if java_install:
  !apt-get install openjdk-8-jdk-headless -qq > /dev/null
  !update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
  # Print the active Java version as a sanity check.
  !java -version

In [0]:
# Point SPARK_HOME at the unpacked Spark distribution, then let findspark
# initialise itself from that environment variable.
import os
import findspark

os.environ["SPARK_HOME"] = "/content/spark-3.0.0-preview2-bin-hadoop3.2"
findspark.init()

In [0]:
# Create (or reuse) the Spark session used by every cell below.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('nb_spam')
    .getOrCreate()
)

In [0]:
# Path of the SMS CSV file (columns: Message, Class_Label) that is loaded into `df`.
file = "SMS-Messages2.csv"

In [0]:
# Load the CSV: the first line holds the column names and the column types are inferred.
df = spark.read.csv(
    path=file,
    header=True,
    inferSchema=True,
)

In [0]:
# Dataset shape as a (row count, column count) tuple.
shape = (df.count(), len(df.columns))
print(shape)

(200, 2)


In [0]:
# Show the column names and inferred types of the raw data.
df.printSchema()

root
 |-- Message: string (nullable = true)
 |-- Class_Label: string (nullable = true)



In [0]:
# Peek at the first five messages and their labels.
df.show(n=5)

+--------------------+-----------+
|             Message|Class_Label|
+--------------------+-----------+
|'Go until jurong ...|        ham|
|'Ok lar... Joking...|        ham|
|'Free entry in 2 ...|       spam|
|'U dun say so ear...|        ham|
|'Nah I don\'t thi...|        ham|
+--------------------+-----------+
only showing top 5 rows



In [0]:
# Per-column summary statistics (count, mean, stddev, min, max).
(
    df
    .describe()
    .show()
)

+-------+--------------------+-----------+
|summary|             Message|Class_Label|
+-------+--------------------+-----------+
|  count|                 200|        200|
|   mean|                null|       null|
| stddev|                null|       null|
|    min|"'A swt thought: ...|        ham|
|    max|'Ü predict wat ti...|       spam|
+-------+--------------------+-----------+



In [0]:
# Class balance: number of messages per Class_Label value.
(
    df
    .groupBy('Class_Label')
    .count()
    .show()
)

+-----------+-----+
|Class_Label|count|
+-----------+-----+
|        ham|  167|
|       spam|   33|
+-----------+-----+



In [0]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer

In [0]:
# Tokenizer stage: reads the Message column and writes the token list to `words`,
# using the non-word-character regex \W as the pattern.
regexTokenizer = RegexTokenizer(
    inputCol="Message",
    outputCol="words",
    pattern="\\W",
)

In [0]:
# Bag-of-words stage: turns `words` into a term-count vector in `features`.
# vocabSize caps the vocabulary at 10000 terms; minDF=5 is the minimum
# document frequency a term needs to be kept.
countVectors = CountVectorizer(
    inputCol="words",
    outputCol="features",
    vocabSize=10000,
    minDF=5,
)

In [0]:
# Label stage: map the string Class_Label column to a numeric `label` column.
from pyspark.ml.feature import OneHotEncoder, StringIndexer

label_stringIdx = StringIndexer(
    inputCol="Class_Label",
    outputCol="label",
)

In [0]:
from pyspark.ml import Pipeline

# Chain tokenizer -> count vectorizer -> label indexer, fit the chain on `df`,
# and apply it to `df` to obtain the `words`, `features` and `label` columns.
# NOTE(review): the pipeline is fitted on the full `df` and the train/test split
# happens only afterwards on the transformed data, so the vocabulary and label
# index are learned from rows that later land in the test set - confirm this is intended.
pipeline = Pipeline(
    stages=[
        regexTokenizer,
        countVectors,
        label_stringIdx,
    ]
)
pipelineFit = pipeline.fit(df)
dataset = pipelineFit.transform(df)
dataset.show(n=5)

+--------------------+-----------+--------------------+--------------------+-----+
|             Message|Class_Label|               words|            features|label|
+--------------------+-----------+--------------------+--------------------+-----+
|'Go until jurong ...|        ham|[go, until, juron...|    (102,[51],[1.0])|  0.0|
|'Ok lar... Joking...|        ham|[ok, lar, joking,...|(102,[6,50],[1.0,...|  0.0|
|'Free entry in 2 ...|       spam|[free, entry, in,...|(102,[2,3,10,20,2...|  1.0|
|'U dun say so ear...|        ham|[u, dun, say, so,...|(102,[6,28,62,80,...|  0.0|
|'Nah I don\'t thi...|        ham|[nah, i, don, t, ...|(102,[0,2,26,34,9...|  0.0|
+--------------------+-----------+--------------------+--------------------+-----+
only showing top 5 rows



In [0]:
# Schema after the pipeline: the original columns plus words, features and label.
dataset.printSchema()

root
 |-- Message: string (nullable = true)
 |-- Class_Label: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)



In [0]:
# Show the first ten feature vectors with their labels, without truncating the cells.
(
    dataset
    .select(['features', 'label'])
    .show(n=10, truncate=False)
)

+--------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                            |label|
+--------------------------------------------------------------------------------------------------------------------+-----+
|(102,[51],[1.0])                                                                                                    |0.0  |
|(102,[6,50],[1.0,1.0])                                                                                              |0.0  |
|(102,[2,3,10,20,22,26,46,77,90,92],[3.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0])                                       |1.0  |
|(102,[6,28,62,80,90],[2.0,1.0,1.0,1.0,1.0])                                                                         |0.0  |
|(102,[0,2,26,34,98],[1.0,1.0,1.0,1.0,1.0])                                                                          |0.0  |


In [0]:
# Keep only the two columns the classifier needs.
model_columns = ['features', 'label']
model_df = dataset.select(model_columns)

In [0]:
# 75/25 train/test split. A fixed seed keeps the split - and therefore the fitted
# model and every metric computed from it - stable across re-runs on the same input
# instead of changing on each execution.
training_df, test_df = model_df.randomSplit([0.75, 0.25], seed=42)

In [0]:
# Number of rows in the training split.
print(training_df.count())

144


In [0]:
# Class balance inside the training split.
(
    training_df
    .groupBy('label')
    .count()
    .show()
)

+-----+-----+
|label|count|
+-----+-----+
|  0.0|  123|
|  1.0|   21|
+-----+-----+



In [0]:
# Number of rows in the held-out test split.
test_df.count()

56

In [0]:
# Class balance inside the test split.
(
    test_df
    .groupBy('label')
    .count()
    .show()
)

+-----+-----+
|label|count|
+-----+-----+
|  0.0|   44|
|  1.0|   12|
+-----+-----+



In [0]:
from pyspark.ml.classification import NaiveBayes
# Naive Bayes classifier with the smoothing parameter set to 1.
nb = NaiveBayes(smoothing=1)
# Fit on the training split only.
model = nb.fit(training_df)
# Score the same training rows; adds rawPrediction, probability and prediction columns.
train_results = model.transform(training_df)

In [0]:
# First five scored training rows.
train_results.show(n=5)

+-----------+-----+--------------------+--------------------+----------+
|   features|label|       rawPrediction|         probability|prediction|
+-----------+-----+--------------------+--------------------+----------+
|(102,[],[])|  0.0|[-0.1633250561032...|[0.84931506849315...|       0.0|
|(102,[],[])|  0.0|[-0.1633250561032...|[0.84931506849315...|       0.0|
|(102,[],[])|  0.0|[-0.1633250561032...|[0.84931506849315...|       0.0|
|(102,[],[])|  0.0|[-0.1633250561032...|[0.84931506849315...|       0.0|
|(102,[],[])|  0.0|[-0.1633250561032...|[0.84931506849315...|       0.0|
+-----------+-----+--------------------+--------------------+----------+
only showing top 5 rows



In [0]:
# Training rows that are labelled 1 and also predicted 1, shown with their
# class probabilities (first ten rows, cells not truncated).
(
    train_results
    .filter((train_results['label'] == 1) & (train_results['prediction'] == 1))
    .select(['label', 'prediction', 'probability'])
    .show(10, False)
)

+-----+----------+------------------------------------------+
|label|prediction|probability                               |
+-----+----------+------------------------------------------+
|1.0  |1.0       |[6.603264555190262E-8,0.9999999339673543] |
|1.0  |1.0       |[7.944205863553586E-6,0.9999920557941365] |
|1.0  |1.0       |[1.052983244950227E-5,0.9999894701675506] |
|1.0  |1.0       |[1.096867893550462E-6,0.9999989031321065] |
|1.0  |1.0       |[1.4399220477145962E-6,0.9999985600779523]|
|1.0  |1.0       |[3.4996955812823725E-4,0.9996500304418718]|
|1.0  |1.0       |[2.6942888208903474E-8,0.9999999730571117]|
|1.0  |1.0       |[0.4811568223326079,0.5188431776673921]   |
|1.0  |1.0       |[8.244706361631026E-4,0.9991755293638369] |
|1.0  |1.0       |[0.004431263635207707,0.9955687363647924] |
+-----+----------+------------------------------------------+
only showing top 10 rows



In [0]:
# Score the held-out test split with the model fitted on the training split.
test_results = model.transform(test_df)

In [0]:
# Columns of the scored test data (features, label and the three prediction columns).
test_results.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [0]:
# Compare actual labels with predictions for the first ten test rows.
(
    test_results
    .select(['label', 'prediction'])
    .show(n=10, truncate=False)
)

+-----+----------+
|label|prediction|
+-----+----------+
|0.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
+-----+----------+
only showing top 10 rows



In [0]:
# True positives: test rows labelled 1 and predicted 1.
tp = test_results[
    (test_results['label'] == 1) & (test_results['prediction'] == 1)
].count()

In [0]:
# True negatives: test rows labelled 0 and predicted 0.
tn = test_results[
    (test_results['label'] == 0) & (test_results['prediction'] == 0)
].count()

In [0]:
# False positives: test rows labelled 0 but predicted 1.
fp = test_results[
    (test_results['label'] == 0) & (test_results['prediction'] == 1)
].count()

In [0]:
# False negatives: test rows labelled 1 but predicted 0.
fn = test_results[
    (test_results['label'] == 1) & (test_results['prediction'] == 0)
].count()

In [0]:
# Accuracy = (TP + TN) / number of test rows.
# An empty test split makes the ratio undefined; report 0.0 instead of
# raising ZeroDivisionError.
total = test_results.count()
accuracy = float((tp + tn) / total) if total else 0.0

In [0]:
# Share of test rows whose prediction matches the label.
print(accuracy)

0.9464285714285714


In [0]:
# Recall = TP / (TP + FN): the share of rows labelled 1 that the model also predicted as 1.
# If the test split contains no rows labelled 1 the ratio is undefined; report 0.0
# instead of raising ZeroDivisionError.
actual_positives = tp + fn
recall = float(tp / actual_positives) if actual_positives else 0.0

In [0]:
# Share of rows labelled 1 in the test split that were predicted as 1.
print(recall)

0.9166666666666666


In [0]:
# Precision = TP / (TP + FP): the share of rows predicted as 1 that are really labelled 1.
# If the model predicts no row as 1 the ratio is undefined; report 0.0
# instead of raising ZeroDivisionError.
predicted_positives = tp + fp
precision = float(tp / predicted_positives) if predicted_positives else 0.0

In [0]:
# Share of rows predicted as 1 in the test split that are actually labelled 1.
print(precision)

0.8461538461538461


In [1]:
!jupyter nbconvert MODULE4_DAY4_SparkNB.ipynb

[NbConvertApp] Converting notebook MODULE4_DAY4_SparkNB.ipynb to html
[NbConvertApp] Writing 309803 bytes to MODULE4_DAY4_SparkNB.html
