In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler,VectorIndexer, OneHotEncoder, StringIndexer
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, CountVectorizer, HashingTF, IDF
from pyspark.ml.linalg import Vector
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import lower, length, size

In [2]:
spark

In [3]:
sc

In [4]:
# Prepare data
data = spark.read.csv("hdfs://devenv/user/spark/spark_mllib_101/spam_detection/data/sms_messages_with_labels.csv",
                      inferSchema=True,
                      header=True)

In [5]:
# Preprocessing and feature engineering
feature_prep = data.select(lower(data["message"]).alias("message"), length(data["message"]).alias("length"), "label")

In [6]:
feature_prep = RegexTokenizer(inputCol="message", outputCol="words", pattern="\\W+").transform(feature_prep)

In [7]:
feature_prep = StopWordsRemover(inputCol='words',outputCol='stop_words_removed').transform(feature_prep)

In [8]:
feature_prep = HashingTF(inputCol="stop_words_removed", outputCol="hashing_tf", numFeatures=4000).transform(feature_prep)

In [9]:
feature_prep = IDF(inputCol="hashing_tf", outputCol="tf_idf").fit(feature_prep).transform(feature_prep)

In [10]:
feature_prep = StringIndexer(inputCol='label',outputCol='label_indexed').fit(feature_prep).transform(feature_prep)

In [11]:
feature_prep = VectorAssembler(inputCols=["tf_idf", "length"],
                               outputCol="features").transform(feature_prep)

In [12]:
final_data = feature_prep.select("label_indexed", "features")

In [13]:
# Split data into train and test sets
train_data, test_data = final_data.randomSplit([0.7,0.3])

In [14]:
# Model training
classifier = RandomForestClassifier(featuresCol="features", labelCol="label_indexed", numTrees=100, maxDepth=25)
model = classifier.fit(train_data)

In [15]:
# Transform the test data using the model to get predictions
predicted_test_data = model.transform(test_data)

In [16]:
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol='label_indexed',
                                                           predictionCol='prediction', 
                                                           metricName='accuracy')
print("Accuracy: {}", evaluator_accuracy.evaluate(predicted_test_data))

Accuracy: {} 0.9580877537655533


In [17]:
# Save the model
model.save("hdfs://devenv/user/spark/spark_mllib_101/spam_detection/spam_classifier")

In [18]:
# Read the saved model
model = RandomForestClassificationModel.load("hdfs://devenv/user/spark/spark_mllib_101/spam_detection/spam_classifier") 

In [19]:
# Predict some new records
# In real case, use VectorAssembler to transform df for features column

data = spark.read.csv("hdfs://devenv/user/spark/spark_mllib_101/spam_detection/data/sms_messages.csv",
                      inferSchema=True,
                      header=True)

In [20]:
# Preprocessing and feature engineering
feature_prep = data.select(lower(data["message"]).alias("message"), length(data["message"]).alias("length"))

feature_prep = RegexTokenizer(inputCol="message", outputCol="words", pattern="\\W+").transform(feature_prep)

feature_prep = StopWordsRemover(inputCol='words',outputCol='stop_words_removed').transform(feature_prep)

feature_prep = HashingTF(inputCol="stop_words_removed", outputCol="hashing_tf", numFeatures=4000).transform(feature_prep)

feature_prep = IDF(inputCol="hashing_tf", outputCol="tf_idf").fit(feature_prep).transform(feature_prep)

unclassified_final_data = VectorAssembler(inputCols=["tf_idf", "length"],
                                              outputCol="features").transform(feature_prep)

In [21]:
# Prediction
predicted_final_data = model.transform(unclassified_final_data)

In [22]:
result = predicted_final_data.select("message", "prediction")
result.show(1000)

+--------------------+----------+
|             message|prediction|
+--------------------+----------+
|"speak only when ...|       0.0|
|&lt;#&gt;  great ...|       0.0|
|'wnevr i wana fal...|       0.0|
|1apple/day=no doc...|       0.0|
|7 lor... change 2...|       0.0|
|7 wonders in my w...|       0.0|
|:-( that's not v ...|       0.0|
|a boy loved a gal...|       0.0|
|a bit of ur smile...|       0.0|
|a gram usually ru...|       0.0|
|actually i decide...|       0.0|
|ah you see. you h...|       0.0|
|ahhh. work. i vag...|       0.0|
|aight fuck it, i'...|       0.0|
|aight ill get on ...|       0.0|
|aight no rush, i'...|       0.0|
|aight, i'll hit y...|       0.0|
|aight, call me on...|       0.0|
|aight, lemme know...|       0.0|
|aiyah e rain like...|       0.0|
|aiyar dun disturb...|       0.0|
|alex says he's no...|       0.0|
|all done? all han...|       0.0|
|already one guy l...|       0.0|
|alright, we're al...|       0.0|
|also remember the...|       0.0|
|am surfing on