In [None]:
def setupSpark():
  # Spark needs to run with Java 8 ... 
  !pip install -q findspark
  !apt-get install openjdk-8-jdk-headless > /dev/null
  !echo 2 | update-alternatives --config java > /dev/null
  !java -version
  import os, findspark
  os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
  # !echo JAVA_HOME=$JAVA_HOME
  !pip install -q pyspark
  findspark.init(spark_home='/usr/local/lib/python3.7/dist-packages/pyspark')
  !pyspark --version

setupSpark()

openjdk version "1.8.0_292"
OpenJDK Runtime Environment (build 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10)
OpenJDK 64-Bit Server VM (build 25.292-b10, mixed mode)
[K     |████████████████████████████████| 281.3 MB 41 kB/s 
[K     |████████████████████████████████| 198 kB 53.7 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 1.8.0_292
Branch HEAD
Compiled by user ubuntu on 2021-10-06T12:46:30Z
Revision 5d45a415f3a29898d92380380cfd82bfc7f579ea
Url https://github.com/apache/spark
Type --help for more information.


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SpamClassifier').getOrCreate()

In [None]:
# Load data and rename column
data = spark.read.option("header", "false") \
    .option("delimiter", "\t") \
    .option("inferSchema", "true") \
    .csv("/content/SMSSpamCollection") \
    .withColumnRenamed("_c0", "class") \
    .withColumnRenamed("_c1", "text")

In [None]:
data.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import length
data = data.withColumn('length',length(data['text']))
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [None]:
data.groupby('class').mean().show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



In [None]:
# Import the required packages
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes

In [None]:
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")

In [None]:
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')


In [None]:
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')


In [None]:
idf = IDF(inputCol="c_vec", outputCol="tf_idf")


In [None]:
ham_spam_to_num = StringIndexer(inputCol='class',outputCol='label')


In [None]:
clean_up = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')

In [None]:
#pipeline
from pyspark.ml import Pipeline
data_prep_pipe = Pipeline(stages=[ham_spam_to_num,tokenizer,stopremove,count_vec,idf,clean_up])
cleaner = data_prep_pipe.fit(data)
clean_data = cleaner.transform(data)

In [None]:
clean_data = clean_data.select(['label','features'])
clean_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,297,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
|  1.0|(13424,[10,60,139...|
|  0.0|(13424,[10,53,103...|
|  0.0|(13424,[125,184,4...|
|  1.0|(13424,[1,47,118,...|
|  1.0|(13424,[0,1,13,27...|
|  0.0|(13424,[18,43,120...|
|  1.0|(13424,[8,17,37,8...|
|  1.0|(13424,[13,30,47,...|
|  0.0|(13424,[39,96,217...|
|  0.0|(13424,[552,1697,...|
|  1.0|(13424,[30,109,11...|
|  0.0|(13424,[82,214,47...|
|  0.0|(13424,[0,2,49,13...|
|  0.0|(13424,[0,74,105,...|
|  1.0|(13424,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows



In [None]:
#model building
#NAIVE BAYES CLASSIFICATION
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes()

In [None]:
(training,testing) = clean_data.randomSplit([0.7,0.3])
spam_predictor = nb.fit(training)
test_results = spam_predictor.transform(testing)
test_results.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,2,7,8...|[-806.94799466044...|[1.0,2.3479410130...|       0.0|
|  0.0|(13424,[0,1,5,15,...|[-1001.6408342948...|[1.0,9.3702119212...|       0.0|
|  0.0|(13424,[0,1,5,20,...|[-805.78872577995...|[1.0,1.4166581215...|       0.0|
|  0.0|(13424,[0,1,7,8,1...|[-876.71486473776...|[1.0,1.1602302466...|       0.0|
|  0.0|(13424,[0,1,7,8,1...|[-1156.6611689974...|[1.0,5.2461835890...|       0.0|
|  0.0|(13424,[0,1,7,15,...|[-660.46559620044...|[1.0,6.6279539962...|       0.0|
|  0.0|(13424,[0,1,12,33...|[-439.98325489427...|[1.0,2.1646020202...|       0.0|
|  0.0|(13424,[0,1,14,31...|[-216.55291369779...|[1.0,5.8368106453...|       0.0|
|  0.0|(13424,[0,1,30,12...|[-594.78841734486...|[1.0,4.1693998733...|       0.0|
|  0.0|(13424,[0

In [None]:
#model evaluation
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
accuracy = evaluator.evaluate(test_results)
print ("Model Accuracy: ", accuracy)

Model Accuracy:  0.9344879330138764


In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


In [None]:
# Create ParamGrid and Evaluator for Cross Validation
paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0, 1.5, 2.0]).build()
cvEvaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")

In [None]:
# Cross-validation
cv = CrossValidator(estimator=nb, estimatorParamMaps=paramGrid, evaluator=cvEvaluator)
cvModel = cv.fit(training)

In [22]:
# Making predictions on testData
cvPredictions = cvModel.transform(testing)

In [23]:
evaluator.evaluate(cvPredictions)

0.9388165119297196

In [30]:
#LOGISTIC REGRESSION
from pyspark.ml.classification import LogisticRegression
model_LR = LogisticRegression()
spam_predictor2 = model_LR.fit(training)
test_results2 = spam_predictor2.transform(testing)
test_results2.show()


+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,2,7,8...|[23.9109924693349...|[0.99999999995873...|       0.0|
|  0.0|(13424,[0,1,5,15,...|[21.1380224050058...|[0.99999999933949...|       0.0|
|  0.0|(13424,[0,1,5,20,...|[12.7586113244950...|[0.99999712257339...|       0.0|
|  0.0|(13424,[0,1,7,8,1...|[15.8444248117310...|[0.99999986852182...|       0.0|
|  0.0|(13424,[0,1,7,8,1...|[12.9184237154744...|[0.99999754755768...|       0.0|
|  0.0|(13424,[0,1,7,15,...|[20.9029208480573...|[0.99999999916444...|       0.0|
|  0.0|(13424,[0,1,12,33...|[20.1236195306672...|[0.99999999817852...|       0.0|
|  0.0|(13424,[0,1,14,31...|[20.5321123353411...|[0.99999999878935...|       0.0|
|  0.0|(13424,[0,1,30,12...|[14.0861932214231...|[0.99999923714204...|       0.0|
|  0.0|(13424,[0

In [31]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
accuracy = evaluator.evaluate(test_results2)
print ("Model Accuracy: ", accuracy)

Model Accuracy:  0.9383345188180093


In [26]:
#RANDOM FOREST CLASSIFICATION
from pyspark.ml.classification import RandomForestClassifier
model_RF = RandomForestClassifier(numTrees=100)
spam_predictor1 = model_RF.fit(training)
test_results1 = spam_predictor1.transform(testing)
test_results1.show()


+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,2,7,8...|[87.4441092587759...|[0.87444109258775...|       0.0|
|  0.0|(13424,[0,1,5,15,...|[86.3038622316144...|[0.86303862231614...|       0.0|
|  0.0|(13424,[0,1,5,20,...|[87.4363684576043...|[0.87436368457604...|       0.0|
|  0.0|(13424,[0,1,7,8,1...|[86.7312098575217...|[0.86731209857521...|       0.0|
|  0.0|(13424,[0,1,7,8,1...|[87.1489198922227...|[0.87148919892222...|       0.0|
|  0.0|(13424,[0,1,7,15,...|[87.7663452872763...|[0.87766345287276...|       0.0|
|  0.0|(13424,[0,1,12,33...|[87.7663452872763...|[0.87766345287276...|       0.0|
|  0.0|(13424,[0,1,14,31...|[87.7663452872763...|[0.87766345287276...|       0.0|
|  0.0|(13424,[0,1,30,12...|[84.5408326675586...|[0.84540832667558...|       0.0|
|  0.0|(13424,[0

In [29]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
accuracy = evaluator.evaluate(test_results1)
print ("Model Accuracy: ", accuracy)

Model Accuracy:  0.5022321428571429
