In [16]:
# Import the findspark library to locate and initialize Spark
import findspark

# Initialize Spark with the path to your Spark installation
findspark.init('/home/mina/python-spark/spark-3.4.0-bin-hadoop3/')

# Import the pyspark library
import pyspark

# Import the SparkSession class from pyspark.sql
from pyspark.sql import SparkSession

# Create a SparkSession with the specified application name
spark = SparkSession.builder.appName('NLP_Project').getOrCreate()

# Read a CSV file into a DataFrame named 'data' with schema inference and '\t' as the separator
data = spark.read.csv('smsspamcollection/SMSSpamCollection', inferSchema = True , sep ='\t')

# Show the contents of the DataFrame 'data'
data.show()

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
|spam|FreeMsg Hey there...|
| ham|Even my brother i...|
| ham|As per your reque...|
|spam|WINNER!! As a val...|
|spam|Had your mobile 1...|
| ham|I'm gonna be home...|
|spam|SIX chances to wi...|
|spam|URGENT! You have ...|
| ham|I've been searchi...|
| ham|I HAVE A DATE ON ...|
|spam|XXXMobileMovieClu...|
| ham|Oh k...i'm watchi...|
| ham|Eh u remember how...|
| ham|Fine if thats th...|
|spam|England v Macedon...|
+----+--------------------+
only showing top 20 rows



In [17]:
# Rename the columns '_c0' to 'class' and '_c1' to 'text' in the DataFrame 'data'
data = data.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')

# Show the contents of the DataFrame 'data' after renaming the columns
data.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [18]:
# Import the length function from pyspark.sql.functions
from pyspark.sql.functions import length

# Create a new DataFrame 'new_data' with an additional 'length' column
new_data = data.withColumn('lenght' , length(data['text']))

# Show the contents of the DataFrame 'new_data' with the added 'length' column
new_data.show()

+-----+--------------------+------+
|class|                text|lenght|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [19]:
# Group the 'new_data' DataFrame by the 'class' column and calculate the mean of the 'length' column for each group
mean_class = new_data.groupBy('class').mean('lenght')

# Show the mean length for each class
mean_class.show()

+-----+-----------------+
|class|      avg(lenght)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



In [20]:
# Import necessary classes from pyspark.ml.feature
from pyspark.ml.feature import (Tokenizer,StopWordsRemover,CountVectorizer,IDF,StringIndexer)

# Create a Tokenizer instance with input column 'text' and output column 'words'
tokenizer = Tokenizer(inputCol='text' , outputCol='words')

# Create a StopWordsRemover instance with input column 'words' and output column 'stopword'
stopword = StopWordsRemover(inputCol='words' , outputCol='stopword')

# Create a CountVectorizer instance with input column 'stopword' and output column 'cv_word'
cv = CountVectorizer(inputCol='stopword' , outputCol='cv_word')

# Create an IDF instance with input column 'cv_word' and output column 'idf'
idf = IDF(inputCol='cv_word' , outputCol='idf')

# Create a StringIndexer instance with input column 'class' and output column 'label'
ham_spam_convert = StringIndexer(inputCol='class', outputCol='label')

In [21]:
# Import the VectorAssembler class from pyspark.ml.feature
from pyspark.ml.feature import VectorAssembler

# Create a VectorAssembler instance with specified input columns 'idf' and 'length', and output column 'features'
assembler = VectorAssembler(inputCols=['idf', 'lenght'], outputCol='features')

In [22]:
# Import the Pipeline class from pyspark.ml
from pyspark.ml import Pipeline

# Create a Pipeline with a list of stages representing the data transformation steps
pipe = Pipeline(stages=[tokenizer, stopword, cv, idf,ham_spam_convert,assembler])

# Fit the pipeline on the 'new_data' DataFrame, which applies each stage's transformation to the data
data_pipe_fit = pipe.fit(new_data)

# Transform the 'new_data' DataFrame using the fitted pipeline to produce 'data_pipe_tranc'
data_pipe_tranc = data_pipe_fit.transform(new_data)

# Show the transformed DataFrame 'data_pipe_tranc'
data_pipe_tranc.show()

+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+
|class|                text|lenght|               words|            stopword|             cv_word|                 idf|label|            features|
+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+
|  ham|Go until jurong p...|   111|[go, until, juron...|[go, jurong, poin...|(13423,[7,11,31,6...|(13423,[7,11,31,6...|  0.0|(13424,[7,11,31,6...|
|  ham|Ok lar... Joking ...|    29|[ok, lar..., joki...|[ok, lar..., joki...|(13423,[0,24,301,...|(13423,[0,24,301,...|  0.0|(13424,[0,24,301,...|
| spam|Free entry in 2 a...|   155|[free, entry, in,...|[free, entry, 2, ...|(13423,[2,13,19,3...|(13423,[2,13,19,3...|  1.0|(13424,[2,13,19,3...|
|  ham|U dun say so earl...|    49|[u, dun, say, so,...|[u, dun, say, ear...|(13423,[0,70,80,1...|(13423,[0,70,80,1...

In [23]:
# Retrieve the column names of the 'data_pipe_tranc' DataFrame
data_pipe_tranc.columns

['class',
 'text',
 'lenght',
 'words',
 'stopword',
 'cv_word',
 'idf',
 'label',
 'features']

In [24]:
# Select the 'features' and 'label' columns from the 'data_pipe_tranc' DataFrame and create a new DataFrame 'final_data'
final_data = data_pipe_tranc.select('features','label')

# Show the contents of the 'final_data' DataFrame
final_data.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(13424,[7,11,31,6...|  0.0|
|(13424,[0,24,301,...|  0.0|
|(13424,[2,13,19,3...|  1.0|
|(13424,[0,70,80,1...|  0.0|
|(13424,[36,134,31...|  0.0|
|(13424,[10,60,140...|  1.0|
|(13424,[10,53,102...|  0.0|
|(13424,[127,185,4...|  0.0|
|(13424,[1,47,121,...|  1.0|
|(13424,[0,1,13,27...|  1.0|
|(13424,[18,43,117...|  0.0|
|(13424,[8,16,37,8...|  1.0|
|(13424,[13,30,47,...|  1.0|
|(13424,[39,95,221...|  0.0|
|(13424,[555,1797,...|  0.0|
|(13424,[30,109,11...|  1.0|
|(13424,[82,214,44...|  0.0|
|(13424,[0,2,49,13...|  0.0|
|(13424,[0,74,105,...|  0.0|
|(13424,[4,30,33,5...|  1.0|
+--------------------+-----+
only showing top 20 rows



In [25]:
# Import the NaiveBayes class from pyspark.ml.classification
from pyspark.ml.classification import NaiveBayes

# Create a NaiveBayes model instance
model_1 = NaiveBayes()

# Split the 'final_data' DataFrame into training and testing sets with a 70-30 ratio
train, test = final_data.randomSplit([0.7,0.3])

# Fit the NaiveBayes model on the training data
model_fit_1 = model_1.fit(train)

# Transform the test data using the fitted model to make predictions
model_test_1 = model_fit_1.transform(test)

# Show the results, including predictions, in the 'model_test_1' DataFrame
model_test_1.show()

23/09/19 09:58:39 WARN DAGScheduler: Broadcasting large task binary with size 1131.3 KiB
23/09/19 09:58:40 WARN DAGScheduler: Broadcasting large task binary with size 1105.2 KiB
23/09/19 09:58:40 WARN DAGScheduler: Broadcasting large task binary with size 1336.7 KiB


+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(13424,[0,1,2,12,...|  1.0|[-1144.2264606083...|[1.46430636370738...|       1.0|
|(13424,[0,1,2,12,...|  1.0|[-1142.1225033729...|[1.43771372374487...|       1.0|
|(13424,[0,1,2,13,...|  0.0|[-608.43874039574...|[1.0,6.2032321227...|       0.0|
|(13424,[0,1,2,15,...|  1.0|[-1154.1128942288...|[6.03435503490663...|       1.0|
|(13424,[0,1,4,13,...|  1.0|[-1428.2838348287...|[4.76835746901457...|       1.0|
|(13424,[0,1,4,50,...|  0.0|[-827.19824673355...|[1.0,2.1485954144...|       0.0|
|(13424,[0,1,4,137...|  1.0|[-1792.3271693142...|[1.68951245805226...|       1.0|
|(13424,[0,1,7,8,1...|  0.0|[-1175.2086573754...|[1.0,9.7601529288...|       0.0|
|(13424,[0,1,7,15,...|  0.0|[-664.19069718438...|[1.0,2.2988465945...|       0.0|
|(13424,[0,1,9,1

In [26]:
# Import the MulticlassClassificationEvaluator class from pyspark.ml.evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create an instance of the MulticlassClassificationEvaluator
eval_model_1 = MulticlassClassificationEvaluator()

# Evaluate the model's performance on the test data and calculate the accuracy
acc_1 = eval_model_1.evaluate(model_test_1)

# Display the accuracy of the model
acc_1

23/09/19 09:58:40 WARN DAGScheduler: Broadcasting large task binary with size 1341.3 KiB


0.9238406554058418

In [27]:
# Import the RandomForestClassifier class from pyspark.ml.classification
from pyspark.ml.classification import RandomForestClassifier

# Create a RandomForestClassifier model instance with 100 trees
model_2 = RandomForestClassifier(numTrees=100)

# Split the 'final_data' DataFrame into training and testing sets with a 70-30 ratio
train, test = final_data.randomSplit([0.7,0.3])

# Fit the RandomForestClassifier model on the training data
model_fit_2 = model_2.fit(train)

# Transform the test data using the fitted model to make predictions
model_test_2 = model_fit_2.transform(test)

# Show the results, including predictions, in the 'model_test_2' DataFrame
model_test_2.show()

23/09/19 09:58:41 WARN DAGScheduler: Broadcasting large task binary with size 1128.2 KiB
23/09/19 09:58:41 WARN DAGScheduler: Broadcasting large task binary with size 1128.2 KiB
23/09/19 09:58:41 WARN DAGScheduler: Broadcasting large task binary with size 1267.1 KiB
23/09/19 09:58:44 WARN DAGScheduler: Broadcasting large task binary with size 1494.1 KiB
23/09/19 09:58:45 WARN DAGScheduler: Broadcasting large task binary with size 1541.4 KiB
23/09/19 09:58:45 WARN DAGScheduler: Broadcasting large task binary with size 1588.3 KiB
23/09/19 09:58:45 WARN DAGScheduler: Broadcasting large task binary with size 1634.4 KiB
23/09/19 09:58:46 WARN DAGScheduler: Broadcasting large task binary with size 1682.7 KiB
23/09/19 09:58:46 WARN DAGScheduler: Broadcasting large task binary with size 1430.4 KiB


+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(13424,[0,1,2,12,...|  1.0|[73.9955868850982...|[0.73995586885098...|       0.0|
|(13424,[0,1,2,13,...|  0.0|[82.1020981056429...|[0.82102098105642...|       0.0|
|(13424,[0,1,2,15,...|  1.0|[67.9045675664677...|[0.67904567566467...|       0.0|
|(13424,[0,1,2,15,...|  1.0|[69.5083779286195...|[0.69508377928619...|       0.0|
|(13424,[0,1,2,41,...|  0.0|[83.2912386785097...|[0.83291238678509...|       0.0|
|(13424,[0,1,4,137...|  1.0|[81.1750539098457...|[0.81175053909845...|       0.0|
|(13424,[0,1,5,20,...|  0.0|[86.2891199955098...|[0.86289119995509...|       0.0|
|(13424,[0,1,7,8,1...|  0.0|[83.0186019698717...|[0.83018601969871...|       0.0|
|(13424,[0,1,9,14,...|  0.0|[86.6973852356783...|[0.86697385235678...|       0.0|
|(13424,[0,1,13,

In [28]:
# Import the MulticlassClassificationEvaluator class from pyspark.ml.evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create an instance of the MulticlassClassificationEvaluator
eval_model_2 = MulticlassClassificationEvaluator()

# Evaluate the performance of the model_2 (Random Forest) on the test data and calculate the accuracy
acc_2 = eval_model_2.evaluate(model_test_2)

# Display the accuracy of the model
acc_2

23/09/19 09:58:47 WARN DAGScheduler: Broadcasting large task binary with size 1435.0 KiB


0.8048182251628536

In [29]:
# Import the GBTClassifier class from pyspark.ml.classification
from pyspark.ml.classification import GBTClassifier

# Create a GBTClassifier model instance
model_3 = GBTClassifier()

# Split the 'final_data' DataFrame into training and testing sets with a 70-30 ratio
train, test = final_data.randomSplit([0.7,0.3])

# Fit the GBTClassifier model on the training data
model_fit_3 = model_3.fit(train)

# Transform the test data using the fitted model to make predictions
model_test_3 = model_fit_3.transform(test)

# Show the results, including predictions, in the 'model_test_3' DataFrame
model_test_3.show()

23/09/19 09:58:47 WARN DAGScheduler: Broadcasting large task binary with size 1127.7 KiB
23/09/19 09:58:47 WARN DAGScheduler: Broadcasting large task binary with size 1127.8 KiB
23/09/19 09:58:48 WARN DAGScheduler: Broadcasting large task binary with size 1266.7 KiB
23/09/19 09:58:50 WARN DAGScheduler: Broadcasting large task binary with size 1443.5 KiB
23/09/19 09:58:51 WARN DAGScheduler: Broadcasting large task binary with size 1444.3 KiB
23/09/19 09:58:51 WARN DAGScheduler: Broadcasting large task binary with size 1444.8 KiB
23/09/19 09:58:51 WARN DAGScheduler: Broadcasting large task binary with size 1445.9 KiB
23/09/19 09:58:51 WARN DAGScheduler: Broadcasting large task binary with size 1447.5 KiB
23/09/19 09:58:52 WARN DAGScheduler: Broadcasting large task binary with size 1452.7 KiB
23/09/19 09:58:52 WARN DAGScheduler: Broadcasting large task binary with size 1453.2 KiB
23/09/19 09:58:52 WARN DAGScheduler: Broadcasting large task binary with size 1453.7 KiB
23/09/19 09:58:53 WAR

23/09/19 09:59:17 WARN DAGScheduler: Broadcasting large task binary with size 1539.8 KiB
23/09/19 09:59:17 WARN DAGScheduler: Broadcasting large task binary with size 1540.3 KiB
23/09/19 09:59:17 WARN DAGScheduler: Broadcasting large task binary with size 1540.9 KiB
23/09/19 09:59:17 WARN DAGScheduler: Broadcasting large task binary with size 1541.8 KiB
23/09/19 09:59:18 WARN DAGScheduler: Broadcasting large task binary with size 1542.9 KiB
23/09/19 09:59:18 WARN DAGScheduler: Broadcasting large task binary with size 1544.5 KiB
23/09/19 09:59:18 WARN DAGScheduler: Broadcasting large task binary with size 1545.0 KiB
23/09/19 09:59:18 WARN DAGScheduler: Broadcasting large task binary with size 1545.6 KiB
23/09/19 09:59:19 WARN DAGScheduler: Broadcasting large task binary with size 1546.5 KiB
23/09/19 09:59:19 WARN DAGScheduler: Broadcasting large task binary with size 1547.6 KiB
23/09/19 09:59:19 WARN DAGScheduler: Broadcasting large task binary with size 1230.5 KiB


+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(13424,[0,1,2,4,3...|  1.0|[-0.2826313667944...|[0.36233064133634...|       1.0|
|(13424,[0,1,2,12,...|  1.0|[-1.0362358508548...|[0.11180135848404...|       1.0|
|(13424,[0,1,2,13,...|  0.0|[1.06730480541037...|[0.89422182052853...|       0.0|
|(13424,[0,1,2,20,...|  1.0|[-1.3671619444239...|[0.06097810741251...|       1.0|
|(13424,[0,1,2,41,...|  0.0|[-0.0131523174510...|[0.49342422043712...|       1.0|
|(13424,[0,1,2,91,...|  1.0|[-0.8181686294204...|[0.16296406874331...|       1.0|
|(13424,[0,1,3,9,1...|  0.0|[1.49047297190227...|[0.95170586715115...|       0.0|
|(13424,[0,1,4,137...|  1.0|[-1.0677598556183...|[0.10569212472182...|       1.0|
|(13424,[0,1,5,15,...|  0.0|[0.90289253309279...|[0.85885168938253...|       0.0|
|(13424,[0,1,9,1

In [30]:
# Import the MulticlassClassificationEvaluator class from pyspark.ml.evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create an instance of the MulticlassClassificationEvaluator
eval_model_3 = MulticlassClassificationEvaluator()

# Evaluate the performance of the model_3 (Gradient-Boosted Tree) on the test data and calculate the accuracy
acc_3 = eval_model_3.evaluate(model_test_3)


# Display the accuracy of the model
acc_3

23/09/19 09:59:20 WARN DAGScheduler: Broadcasting large task binary with size 1235.1 KiB


0.9447222142282418