In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Create (or reuse, if one already exists) a Spark session named 'sparkTrial'
# that runs against the local master.
spark = (
    SparkSession.builder
    .appName('sparkTrial')
    .master('local')
    .getOrCreate()
)

# Handle to the SparkContext behind the session.
sc = spark.sparkContext

In [2]:
# Explicit column layout for the headerless, semicolon-separated SMS file.
schema = StructType([
    StructField('SR', IntegerType()),
    StructField('text', StringType()),
    StructField('label', IntegerType()),
])

# Read the file with the schema above; the literal string "NA" is loaded as null.
sms = spark.read.csv(
    'file:///D:/Ravi_Data/Pyspark/sms.csv',
    schema=schema,
    sep=";",
    header=False,
    nullValue="NA",
)

# Number of rows loaded.
print(sms.count())

# The 'SR' column is not used by the rest of the notebook, so discard it
# and preview the remaining columns without truncating long messages.
sms = sms.drop("SR")
sms.show(5, truncate=False)

5574
+---------------------------------------------------------------------------------------------------------------+-----+
|text                                                                                                           |label|
+---------------------------------------------------------------------------------------------------------------+-----+
|Sorry, I'll call later in meeting                                                                              |0    |
|Dont worry. I guess he's busy.                                                                                 |0    |
|Call FREEPHONE 0800 542 0578 now!                                                                              |1    |
|Win a 1000 cash prize or a prize worth 5000                                                                    |1    |
|Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...|0    |
+----------------------------------

In [3]:
# Show the (column name, type) pairs of `sms` as a sanity check on the schema.
sms.dtypes

[('text', 'string'), ('label', 'int')]

In [4]:
# Import the necessary functions
from pyspark.sql.functions import regexp_replace, trim
from pyspark.ml.feature import Tokenizer

# Build the cleaned text as a single column expression:
#   1. replace punctuation (REGEX provided) with a space,
#   2. replace digits with a space,
#   3. collapse runs of spaces into one,
#   4. trim leading/trailing spaces.
# Step 4 matters: Tokenizer splits on single whitespace characters, so a
# message that began with a digit or punctuation mark would otherwise start
# with a space and yield an empty-string token that is later hashed as if it
# were a real term.
cleaned_text = regexp_replace(sms.text, '[_():;,.!?\\-]', ' ')
cleaned_text = regexp_replace(cleaned_text, '[0-9]', ' ')
cleaned_text = regexp_replace(cleaned_text, ' +', ' ')
cleaned_text = trim(cleaned_text)

wrangled = sms.withColumn('text', cleaned_text)

# Split the text into lower-cased words (new array column 'words')
wrangled = Tokenizer(inputCol='text', outputCol='words').transform(wrangled)

wrangled.show(4, truncate=False)

+----------------------------------+-----+------------------------------------------+
|text                              |label|words                                     |
+----------------------------------+-----+------------------------------------------+
|Sorry I'll call later in meeting  |0    |[sorry, i'll, call, later, in, meeting]   |
|Dont worry I guess he's busy      |0    |[dont, worry, i, guess, he's, busy]       |
|Call FREEPHONE now                |1    |[call, freephone, now]                    |
|Win a cash prize or a prize worth |1    |[win, a, cash, prize, or, a, prize, worth]|
+----------------------------------+-----+------------------------------------------+
only showing top 4 rows



In [5]:
from pyspark.ml.feature import HashingTF, IDF, StopWordsRemover

# Stage 1: drop stop words from 'words', writing the survivors to 'terms'.
stop_word_filter = StopWordsRemover(inputCol='words', outputCol='terms')

# Stage 2: hashing trick - map each term to one of 5000 buckets and store the
# per-message term counts as a sparse vector in 'hash'.
term_hasher = HashingTF(inputCol='terms', outputCol='hash', numFeatures=5000)

# Apply both stages in order and preview the result.
wrangled = term_hasher.transform(stop_word_filter.transform(wrangled))
wrangled.show(5)

+--------------------+-----+--------------------+--------------------+--------------------+
|                text|label|               words|               terms|                hash|
+--------------------+-----+--------------------+--------------------+--------------------+
|Sorry I'll call l...|    0|[sorry, i'll, cal...|[sorry, call, lat...|(5000,[20,146,560...|
|Dont worry I gues...|    0|[dont, worry, i, ...|[dont, worry, gue...|(5000,[2977,3343,...|
| Call FREEPHONE now |    1|[call, freephone,...|   [call, freephone]|(5000,[146,1957],...|
|Win a cash prize ...|    1|[win, a, cash, pr...|[win, cash, prize...|(5000,[1863,2213,...|
|Go until jurong p...|    0|[go, until, juron...|[go, jurong, poin...|(5000,[98,740,750...|
+--------------------+-----+--------------------+--------------------+--------------------+
only showing top 5 rows



In [11]:
# Randomly split the data 80/20 into training and testing parts (fixed seed).
train_hashed, test_hashed = wrangled.randomSplit([0.8, 0.2], seed=111)

# Fit the IDF weights on the training part only.
tf_idf = IDF(inputCol='hash', outputCol='features').fit(train_hashed)

# Apply the same fitted weights to both parts and keep only the columns
# the classifier needs.
train, test = [
    tf_idf.transform(part).select('label', 'features')
    for part in (train_hashed, test_hashed)
]

train.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|        (5000,[],[])|
|    0|        (5000,[],[])|
|    1|(5000,[15,581,102...|
|    0|(5000,[146,1388,3...|
|    1|(5000,[592,3372,4...|
+-----+--------------------+
only showing top 5 rows



In [12]:
from pyspark.ml.classification import LogisticRegression

# Configure the estimator, then fit it on the training split.
classifier = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = classifier.fit(train)

# Score the testing split with the fitted model and preview the result.
prediction = lr_model.transform(test)
prediction.show(5)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|    0|        (5000,[],[])|[23.6961549531267...|[0.99999999994884...|       0.0|
|    0|(5000,[20,146,138...|[51.9045954068155...|[1.0,2.8715780792...|       0.0|
|    1|(5000,[139,188,22...|[37.4962669107522...|[1.0,5.1949119226...|       0.0|
|    1|(5000,[1233,1397,...|[-46.401831312161...|[7.04596872142492...|       1.0|
|    1|(5000,[122,146,25...|[35.0853059481375...|[0.99999999999999...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [13]:
from pyspark.ml import evaluation

# Accuracy evaluator comparing the 'prediction' column against 'label'.
# Both column names are spelled out so the dependency on the layout of
# `prediction` is visible.
evaluator = evaluation.MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)

# `prediction` was produced by the LogisticRegression model (`lr_model`),
# so the printed label names that model rather than a decision tree.
print("Accuracy logistic regression :", evaluator.evaluate(prediction))

Accuracy decision tree : 0.9709355131698456
