In [None]:
from pyspark import SparkConf
from pyspark.context import SparkContext
# Build the configuration first (master "local[*]"), then start a SparkContext
# with it -- or reuse the one that is already running in this kernel.
spark_conf = SparkConf().setMaster("local[*]")
sc = SparkContext.getOrCreate(spark_conf)

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the input files read by the cells below
# live under /content/drive/MyDrive.
drive.mount(mountpoint='/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:

# Create an RDD from a text file (one element per line)
text_file = sc.textFile("/content/drive/MyDrive/pg1342.txt")
# Word count in three steps:
#   1. flatMap     - split every line into words. str.split() with no argument
#                    splits on any run of whitespace and never yields empty
#                    strings, so blank lines, repeated spaces and tabs do not
#                    create a bogus "" word (which split(" ") did).
#   2. map         - pair every word with an initial count of 1
#   3. reduceByKey - add up the counts of identical words
counts = (
    text_file.flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)

In [None]:
# Write the word counts as text under /content/output.
# saveAsTextFile refuses to write into an existing directory, so the output of
# a previous run is removed first; this keeps the cell re-runnable
# (Restart & Run All) instead of failing on the second execution.
# NOTE(review): assumes the path resolves to the local filesystem, which is the
# usual case with the local[*] master used here -- confirm if fs.defaultFS differs.
import shutil

shutil.rmtree("/content/output", ignore_errors=True)
counts.saveAsTextFile("/content/output")

In [None]:
from pyspark.sql import SparkSession
# Name the application, then obtain the SparkSession (an already-running
# session is reused rather than recreated).
session_builder = SparkSession.builder.appName("SpamDetector")
spark = session_builder.getOrCreate()

In [None]:
# Import ml package since we will use ML models
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml import Pipeline
from pyspark.sql.functions import lit
# Training data: read the spam and the ham example files as text,
# producing one DataFrame per class.
spam_df, ham_df = (
    spark.read.text(path)
    for path in (
        "/content/drive/MyDrive/spam.txt",
        "/content/drive/MyDrive/ham.txt",
    )
)

In [None]:
# Attach the class label as a constant "label" column:
# every spam row gets 1, every ham row gets 0.
spam_df, ham_df = (
    spam_df.withColumn("label", lit(1)),
    ham_df.withColumn("label", lit(0)),
)

In [None]:
# Stack the labelled spam rows and the labelled ham rows into one DataFrame
# that serves as the full data set for the split below.
combined_df = spam_df.union(ham_df)

In [None]:
# Assemble the ML pipeline; each stage consumes the column the previous one emits:
#   "value" --Tokenizer--> "words" --HashingTF--> "features" --LogisticRegression
tokenizer = Tokenizer(
    inputCol="value",
    outputCol="words",
)
hashingTF = HashingTF(
    inputCol="words",
    outputCol="features",
)
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
)
pipeline = Pipeline(
    stages=[tokenizer, hashingTF, lr],
)

In [None]:
# Randomly split the data with weights 0.95 (training) / 0.05 (testing);
# the seed is fixed at 100. Then report the size of each part.
trainingData, testData = combined_df.randomSplit(weights=[0.95, 0.05], seed=100)
print(f"Training Data Count: {trainingData.count()}")
print(f"Testing Data Count: {testData.count()}")

Training Data Count: 1429
Testing Data Count: 71


In [None]:
# Train the model: fit every pipeline stage on the training split
model = pipeline.fit(trainingData)
# Make predictions on the test set
predictions = model.transform(testData)
# Evaluate the model.
# BinaryClassificationEvaluator scores the rawPrediction column; its default
# metric (made explicit here) is the area under the ROC curve, which is NOT
# accuracy, so it is reported under its real name.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC",
)
area_under_roc = evaluator.evaluate(predictions)
print(f"Area under ROC: {area_under_roc}")
# Accuracy proper: the fraction of test rows whose predicted class equals the label.
test_total = predictions.count()
test_correct = predictions.filter(
    predictions["label"] == predictions["prediction"]
).count()
# Guard against an empty test split so the cell does not die on a division by zero.
accuracy = test_correct / test_total if test_total else float("nan")
print(f"Accuracy: {accuracy}")
# Print up to 70 records
predictions.select("value", "label", "prediction").show(70, truncate=False)

Accuracy: 0.9912559618441971
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+----------+
|value                                                                                                                                                                                                                        |label|prediction|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+----------+
|18 days to Euro2004 kickoff! U will be kept informed of all the latest news and results daily. Unsubscribe send GET EURO STOP to 83222.                                                                                      |1    |1.0       |
|<Forwa