In [1]:
# pip install pyspark

In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark =SparkSession.builder.appName('spam').getOrCreate()

In [5]:
data= spark.read.csv("C:\\Users\\dhine\\Downloads\\SMSSpamCollection",inferSchema=True,header=False,sep='\t')

In [6]:
data = data.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')

In [7]:
data.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [8]:
from pyspark.sql.functions import length

In [9]:
data = data.withColumn('length',length(data['text']))

In [10]:
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [11]:
#pretty clear difference
data.groupby('class').mean().show()
#this gives the average length

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



# Feature Transformations

In [21]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover,CountVectorizer,IDF,StringIndexer

tokenizer=Tokenizer(inputCol="text",outputCol="token_text")
stopremove=StopWordsRemover(inputCol="token_text",outputCol="stop_tokens")
count_vec=CountVectorizer(inputCol="stop_tokens",outputCol="c_vec")
idf=IDF(inputCol="c_vec",outputCol="tf_idf")
ham_spam_to_num=StringIndexer(inputCol="class",outputCol="label")

In [22]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

In [23]:
clean_up = VectorAssembler(inputCols=["tf_idf","length"],outputCol="features")

In [24]:
clean_up

VectorAssembler_d76dd21ae524

# The Model:

In [25]:
from pyspark.ml.classification import NaiveBayes

In [26]:
#use defaults
nb = NaiveBayes()

# Pipeline:

In [27]:
from pyspark.ml import Pipeline

In [28]:
#prepare the pipeline stages
data_prep_pipe = Pipeline(stages=[ham_spam_to_num,tokenizer,stopremove,count_vec,idf,clean_up])

In [29]:
#fit the data to the pipeline
cleaner = data_prep_pipe.fit(data)

In [30]:
#clean the data
clean_data=cleaner.transform(data)

# Training and Evaluation

In [31]:
clean_data=clean_data.select(['label','features'])

In [32]:
clean_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,301,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
|  1.0|(13424,[10,60,140...|
|  0.0|(13424,[10,53,102...|
|  0.0|(13424,[127,185,4...|
|  1.0|(13424,[1,47,121,...|
|  1.0|(13424,[0,1,13,27...|
|  0.0|(13424,[18,43,117...|
|  1.0|(13424,[8,16,37,8...|
|  1.0|(13424,[13,30,47,...|
|  0.0|(13424,[39,95,221...|
|  0.0|(13424,[555,1797,...|
|  1.0|(13424,[30,109,11...|
|  0.0|(13424,[82,214,44...|
|  0.0|(13424,[0,2,49,13...|
|  0.0|(13424,[0,74,105,...|
|  1.0|(13424,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows



In [33]:
(training,testing)=clean_data.randomSplit([0.7,0.3])

In [34]:
spam_predictor=nb.fit(training)

In [35]:
data.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)



In [36]:
test_results=spam_predictor.transform(testing)

In [37]:
test_results.show

<bound method DataFrame.show of DataFrame[label: double, features: vector, rawPrediction: vector, probability: vector, prediction: double]>

In [38]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [40]:
acc_eval=MulticlassClassificationEvaluator()
acc=acc_eval.evaluate(test_results)
print("Accuracy of model at predicting spam was : {}".format(acc))

Accuracy of model at predicting spam was : 0.9227684465925983


Not bad considering we're using straight math on text data! Try switching out the classification models! Or even try to come up with other engineered features!

# Logistic Regression:

In [41]:
from pyspark.ml.classification import LogisticRegression

In [42]:
log_reg = LogisticRegression(featuresCol='features',
                             labelCol='label')

In [43]:
spam_predictor=log_reg.fit(training)

In [44]:
data.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)



In [45]:
test_results=spam_predictor.transform(testing)

In [46]:
test_results.show

<bound method DataFrame.show of DataFrame[label: double, features: vector, rawPrediction: vector, probability: vector, prediction: double]>

In [47]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [48]:
acc_eval=MulticlassClassificationEvaluator()
acc=acc_eval.evaluate(test_results)
print("Accuracy of model at predicting spam was : {}".format(acc))

Accuracy of model at predicting spam was : 0.9857704279925621


# NLP:

In [49]:
example_corpus="The bike is driven on the road. The car is driven on the highway"

In [50]:
#tokenisation
from pprint import pprint

list_example=example_corpus.split('.')
print('raw text:')
print(list_example)

list_example_term=[str(document).lower().split() for document in list_example]
print('tokenized term:')
print(list_example_term)

raw text:
['The bike is driven on the road', ' The car is driven on the highway']
tokenized term:
[['the', 'bike', 'is', 'driven', 'on', 'the', 'road'], ['the', 'car', 'is', 'driven', 'on', 'the', 'highway']]


the parameters required for TFIDF calculations:

n=no of documents

d= document index

t = term

tc = term count in a specific document

tt = total terms in a specific document

df = number of documents containing a specific term

In [51]:
from collections import Counter
from pprint import pprint

example_n=len(list_example)
print('n:')
print(example_n)
print()
list_example_term_t=[str(document).lower().split() for document in list_example]
print('tokenized term (t):')
pprint(list_example_term_t)
print()
list_example_term_tc =[list(Counter(document).items()) for document in list_example_term_t]
print('term count (tc):')
pprint(list_example_term_tc)
print()
list_example_term_tt =[sum([term[1]  for term in document]) for document in list_example_term_tc]
print('total term (tt):')
pprint(list_example_term_tt)
print()
counter_example_term_df =Counter([term[0] for document in list_example_term_tc for term in document])
print('document frequency (df):')
pprint(list(counter_example_term_df.items()))
print()

n:
2

tokenized term (t):
[['the', 'bike', 'is', 'driven', 'on', 'the', 'road'],
 ['the', 'car', 'is', 'driven', 'on', 'the', 'highway']]

term count (tc):
[[('the', 2), ('bike', 1), ('is', 1), ('driven', 1), ('on', 1), ('road', 1)],
 [('the', 2), ('car', 1), ('is', 1), ('driven', 1), ('on', 1), ('highway', 1)]]

total term (tt):
[7, 7]

document frequency (df):
[('the', 2),
 ('bike', 1),
 ('is', 2),
 ('driven', 2),
 ('on', 2),
 ('road', 1),
 ('car', 1),
 ('highway', 1)]



In [52]:
import numpy as np

def calculate_example_tfidf(indx,t,tc,tt,n,df):
  tf=tc/tt
  idf=np.log10((n)/(df))
  tfidf=tf*idf
#weight associated with a word to recognised whether it is important(relavant) or not
  return [indx,t,tc,tt,tf,df,idf,tfidf]

for indx,document in enumerate(list_example_term_tc):
  for t,tc in document:
    print(calculate_example_tfidf(indx
                                  ,t
                                  ,tc
                                  ,list_example_term_tt[indx]
                                  ,example_n
                                  ,counter_example_term_df[t]))

[0, 'the', 2, 7, 0.2857142857142857, 2, 0.0, 0.0]
[0, 'bike', 1, 7, 0.14285714285714285, 1, 0.3010299956639812, 0.043004285094854454]
[0, 'is', 1, 7, 0.14285714285714285, 2, 0.0, 0.0]
[0, 'driven', 1, 7, 0.14285714285714285, 2, 0.0, 0.0]
[0, 'on', 1, 7, 0.14285714285714285, 2, 0.0, 0.0]
[0, 'road', 1, 7, 0.14285714285714285, 1, 0.3010299956639812, 0.043004285094854454]
[1, 'the', 2, 7, 0.2857142857142857, 2, 0.0, 0.0]
[1, 'car', 1, 7, 0.14285714285714285, 1, 0.3010299956639812, 0.043004285094854454]
[1, 'is', 1, 7, 0.14285714285714285, 2, 0.0, 0.0]
[1, 'driven', 1, 7, 0.14285714285714285, 2, 0.0, 0.0]
[1, 'on', 1, 7, 0.14285714285714285, 2, 0.0, 0.0]
[1, 'highway', 1, 7, 0.14285714285714285, 1, 0.3010299956639812, 0.043004285094854454]
