<a href="https://colab.research.google.com/github/VISH101-vue/SPAM-HAM-DETECTION/blob/main/Spam%20Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [20]:
pip install pyspark



# New Section

In [21]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a Spark session
spark = SparkSession.builder.appName("SpamCommentsAnalysis").getOrCreate()

# Load the dataset
file_loc = "/content/Youtube02-KatyPerry.csv"  # Update with the correct local path
dataf = spark.read.csv(file_loc, header=True, inferSchema=True)

dataf.printSchema()
dataf.show(5)

# Drop the unwanted columns
columns_to_drop = ["COMMENT_ID", "DATE"]
dataf = dataf.drop(*columns_to_drop)

dataf.printSchema()

# Separate the DataFrame into two based on "Spam Indicator" values
ham_dataf = dataf.filter(col("CLASS") == 0)
spam_dataf = dataf.filter(col("CLASS") == 1)

total_ham_count = ham_dataf.count()
total_spam_count = spam_dataf.count()
ham_dataf.show()
spam_dataf.show()



root
 |-- COMMENT_ID: string (nullable = true)
 |-- AUTHOR: string (nullable = true)
 |-- DATE: timestamp (nullable = true)
 |-- CONTENT: string (nullable = true)
 |-- CLASS: string (nullable = true)

+--------------------+------------+-------------------+--------------------+-----+
|          COMMENT_ID|      AUTHOR|               DATE|             CONTENT|CLASS|
+--------------------+------------+-------------------+--------------------+-----+
|z12pgdhovmrktzm3i...| lekanaVEVO1|2014-07-22 15:27:50|i love this so mu...|    1|
|z13yx345uxepetggz...|    Pyunghee|2014-07-27 01:57:16|http://www.billbo...|    1|
|z12lsjvi3wa5x1vwh...|  Erica Ross|2014-07-27 02:51:43|Hey guys! Please ...|    1|
|z13jcjuovxbwfr0ge...|Aviel Haimov|2014-08-01 12:27:48|http://psnboss.co...|    1|
|z13qybua2yfydzxzj...|  John Bello|2014-08-01 21:04:03|Hey everyone. Wat...|    1|
+--------------------+------------+-------------------+--------------------+-----+
only showing top 5 rows

root
 |-- AUTHOR: string (n

In [22]:
ham_dataf = ham_dataf.filter(ham_dataf.CONTENT.isNotNull())
spam_dataf = spam_dataf.filter(spam_dataf.CONTENT.isNotNull())


In [23]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import lower,col
from pyspark.ml import Pipeline

# Tokenization
tokenizer = Tokenizer(inputCol="CONTENT", outputCol="text")

# Remove stopwords
remove = StopWordsRemover(inputCol="text", outputCol="processed_text")

# Convert text to lowercase
spam_dataf = spam_dataf.withColumn("processed_message", lower(col("CONTENT")))

spam_pipeline = Pipeline(stages=[tokenizer, remove])
spam_temp = spam_pipeline.fit(spam_dataf)
preprocessed_spam_data = spam_temp.transform(spam_dataf)

In [24]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import lower,col
from pyspark.ml import Pipeline

# Tokenization
tokenizer = Tokenizer(inputCol="CONTENT", outputCol="text")

# Remove stopwords
remove = StopWordsRemover(inputCol="text", outputCol="processed_text")

# Convert text to lowercase
ham_dataf = ham_dataf.withColumn("processed_message", lower(col("CONTENT")))

# Create a preprocessing pipeline


ham_pipeline = Pipeline(stages=[tokenizer, remove])
ham_temp = ham_pipeline.fit(ham_dataf)
preprocessed_ham_data = ham_temp.transform(ham_dataf)

In [25]:
preprocessed_ham_data.show(5)

+---------------+--------------------+-----+--------------------+--------------------+--------------------+
|         AUTHOR|             CONTENT|CLASS|   processed_message|                text|      processed_text|
+---------------+--------------------+-----+--------------------+--------------------+--------------------+
|    Daniel Korp|katy perry does r...|    0|katy perry does r...|[katy, perry, doe...|[katy, perry, rem...|
|    Paul Hannam|In what South Ame...|    0|in what south ame...|[in, what, south,...|[south, american,...|
|Angie Sivrikozi|Its a good song a...|    0|its a good song a...|[its, a, good, so...|[good, song, like...|
|    Zain Hassan|Thanks to this vi...|    0|thanks to this vi...|[thanks, to, this...|[thanks, video, k...|
|      Sam Klein|She named the tig...|    0|she named the tig...|[she, named, the,...|[named, tiger, ki...|
+---------------+--------------------+-----+--------------------+--------------------+--------------------+
only showing top 5 rows



In [26]:
preprocessed_spam_data.show(5)

+------------+--------------------+-----+--------------------+--------------------+--------------------+
|      AUTHOR|             CONTENT|CLASS|   processed_message|                text|      processed_text|
+------------+--------------------+-----+--------------------+--------------------+--------------------+
| lekanaVEVO1|i love this so mu...|    1|i love this so mu...|[i, love, this, s...|[love, much., als...|
|    Pyunghee|http://www.billbo...|    1|http://www.billbo...|[http://www.billb...|[http://www.billb...|
|  Erica Ross|Hey guys! Please ...|    1|hey guys! please ...|[hey, guys!, plea...|[hey, guys!, plea...|
|Aviel Haimov|http://psnboss.co...|    1|http://psnboss.co...|[http://psnboss.c...|[http://psnboss.c...|
|  John Bello|Hey everyone. Wat...|    1|hey everyone. wat...|[hey, everyone., ...|[hey, everyone., ...|
+------------+--------------------+-----+--------------------+--------------------+--------------------+
only showing top 5 rows



In [27]:
spam_texts = preprocessed_spam_data.filter(col("CLASS") == "1").select("processed_text").rdd.flatMap(lambda x: x).collect()
ham_texts = preprocessed_ham_data.filter(col("CLASS") == "0").select("processed_text").rdd.flatMap(lambda x: x).collect()

In [28]:
spam_texts_flat = [i for l in spam_texts for i in l]
ham_texts_flat = [i for l in ham_texts for i in l]


In [29]:
import nltk
spam_freq = nltk.FreqDist(spam_texts_flat)
spam_freq.most_common(10)
ham_freq = nltk.FreqDist(ham_texts_flat)
ham_freq.most_common(10)


[('', 170),
 ('katy', 49),
 ('perry', 30),
 ('love', 30),
 ('song', 29),
 ('like', 24),
 ('video', 22),
 ('\ufeff', 18),
 ('good', 11),
 ('roar', 11)]

In [30]:
from pyspark.sql.functions import col, expr, when

# Assuming preprocessed_spam_data is your DataFrame
top_10_spam_words = [item[0].strip("',") for item in spam_freq.most_common(10)]
output_spam_dataf = preprocessed_spam_data

# Define the conditions for each word dynamically
conditions = [expr(f"array_contains(text, '{word}')") for word in top_10_spam_words]

# Create a "result" column indicating whether any of the top 10 spam words are present
output_spam_dataf = output_spam_dataf.withColumn("result", when(conditions[0], "Yes"))

# Loop through the remaining top 10 spam words and update the "result" column
for i in range(1, len(conditions)):
    output_spam_dataf = output_spam_dataf.withColumn("result", when(conditions[i], "Yes").otherwise(col("result")))

# Display the rows where any of the top 10 spam words are present
output_spam_dataf.filter(col("result") == "Yes").show(10)
spam_filtered_data = output_spam_dataf.filter(col("result") == "Yes")

# Select specific columns ("AUTHOR" and "CLASS")
selected_columns = spam_filtered_data.select("AUTHOR", "CLASS")

# Show the combined result
selected_columns.show(10)


+----------------+--------------------+-----+--------------------+--------------------+--------------------+------+
|          AUTHOR|             CONTENT|CLASS|   processed_message|                text|      processed_text|result|
+----------------+--------------------+-----+--------------------+--------------------+--------------------+------+
|     lekanaVEVO1|i love this so mu...|    1|i love this so mu...|[i, love, this, s...|[love, much., als...|   Yes|
|      Erica Ross|Hey guys! Please ...|    1|hey guys! please ...|[hey, guys!, plea...|[hey, guys!, plea...|   Yes|
|      John Bello|Hey everyone. Wat...|    1|hey everyone. wat...|[hey, everyone., ...|[hey, everyone., ...|   Yes|
|Nere Overstylish|check out my rapp...|    1|check out my rapp...|[check, out, my, ...|[check, rapping, ...|   Yes|
|         Jayki L|Subscribe pleaaaa...|    1|subscribe pleaaaa...|[subscribe, pleaa...|[subscribe, pleaa...|   Yes|
|          djh3mi|hey guys!! visit ...|    1|hey guys!! visit ...|[hey, 

In [31]:
from pyspark.sql.functions import col, expr

# Assuming preprocessed_spam_data is your DataFrame
top_10_ham_words = [item[0].strip("',") for item in ham_freq.most_common(10)]
output_ham_dataf = preprocessed_ham_data

# Define the conditions for each word dynamically
conditions = [expr(f"array_contains(text, '{word}')") for word in top_10_ham_words]

# Create a "result" column indicating whether any of the top 10 spam words are present
output_ham_dataf = output_ham_dataf.withColumn("result", when(conditions[0], "Yes"))

# Loop through the remaining top 10 spam words and update the "result" column
for i in range(1, len(conditions)):
    output_ham_dataf = output_ham_dataf.withColumn("result", when(conditions[i], "Yes").otherwise(col("result")))

# Display the rows where any of the top 10 spam words are present
output_ham_dataf.filter(col("result") == "Yes").show(10)

# Filter rows where "result" is "Yes"
ham_filtered_data = output_ham_dataf.filter(col("result") == "Yes")

# Select specific columns ("AUTHOR" and "CLASS")
selected_columns = ham_filtered_data.select("AUTHOR", "CLASS")

# Show the combined result
selected_columns.show(10)


+---------------+--------------------+-----+--------------------+--------------------+--------------------+------+
|         AUTHOR|             CONTENT|CLASS|   processed_message|                text|      processed_text|result|
+---------------+--------------------+-----+--------------------+--------------------+--------------------+------+
|    Daniel Korp|katy perry does r...|    0|katy perry does r...|[katy, perry, doe...|[katy, perry, rem...|   Yes|
|    Paul Hannam|In what South Ame...|    0|in what south ame...|[in, what, south,...|[south, american,...|   Yes|
|Angie Sivrikozi|Its a good song a...|    0|its a good song a...|[its, a, good, so...|[good, song, like...|   Yes|
|    Zain Hassan|Thanks to this vi...|    0|thanks to this vi...|[thanks, to, this...|[thanks, video, k...|   Yes|
|      Sam Klein|She named the tig...|    0|she named the tig...|[she, named, the,...|[named, tiger, ki...|   Yes|
|   Justin Chery|And after the vid...|    0|and after the vid...|[and, after, th

In [32]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, DoubleType
from math import log

# Create a DataFrame`
dataf = spam_filtered_data.select("AUTHOR", "CONTENT")

# Tokenize the text column
tokenizer_udf = udf(lambda text: text.split(), ArrayType(StringType()))
dataf = dataf.withColumn("text", tokenizer_udf(dataf["CONTENT"]))

# Calculate Term Frequencies (TF)
def calculate_tf(text_list):
    total_count = {}
    total_words = len(text_list)
    for word in text_list:
        total_count[word] = total_count.get(word, 0) + 1
    return {k: v / total_words for k, v in total_count.items()}

calculate_tf_udf = udf(calculate_tf, StringType())
dataf = dataf.withColumn("tf", calculate_tf_udf(dataf["text"]))

# Create a list of all unique words in the documents
unique_words = list(set(dataf.selectExpr("explode(text) as word").select("word").distinct().rdd.flatMap(lambda x: x).collect()))

# Calculate Inverse Document Frequencies (IDF)
total_documents = dataf.count()

# Calculate document frequency (DF)
document_frequency = dataf.select("AUTHOR", "text").rdd.flatMap(lambda x: [(word, 1) for word in set(x[1])]).reduceByKey(lambda x, y: x + y)

idf_values = document_frequency.map(lambda x: (x[0], log(total_documents / x[1])))

# Broadcast IDF values
idf_broadcast = spark.sparkContext.broadcast(dict(idf_values.collect()))

# Calculate TF-IDF
def calculate_tfidf(row):
    user_name, words = row
    tfidf_values = {word: word.count(word) * idf_broadcast.value.get(word, 0.0) for word in words}
    return user_name, words, tfidf_values

tfidf_data = dataf.select("AUTHOR", "text").rdd.map(calculate_tfidf)

# Display the result
tfidf_df = spark.createDataFrame(tfidf_data, ["AUTHOR", "text", "tfidf"])
tfidf_df.show(10)

+----------------+--------------------+--------------------+
|          AUTHOR|                text|               tfidf|
+----------------+--------------------+--------------------+
|     lekanaVEVO1|[i, love, this, s...|{Pilot -> 4.88280...|
|      Erica Ross|[Hey, guys!, Plea...|{All -> 4.8828019...|
|      John Bello|[Hey, everyone., ...|{http://believeme...|
|Nere Overstylish|[check, out, my, ...|{https://soundclo...|
|         Jayki L|[Subscribe, pleaa...|{♥ -> 4.189654742...|
|          djh3mi|[hey, guys!!, vis...|{a -> 1.356441397...|
|    Manuel Ortiz|[Nice!, http://ww...|{Nice! -> 4.88280...|
|       Lil Misme|[Hey, Guys, this,...|{youtube -> 3.496...|
|          Emilie|[Hey, guys!, My, ...|{no -> 3.09104245...|
|    Jennika Chua|[https://www.face...|{https://www.face...|
+----------------+--------------------+--------------------+
only showing top 10 rows



In [33]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, DoubleType
from math import log

# Create a DataFrame`
dataf = ham_filtered_data.select("AUTHOR", "CONTENT")

# Tokenize the text
tokenizer_udf = udf(lambda text: text.split(), ArrayType(StringType()))
dataf = dataf.withColumn("text", tokenizer_udf(dataf["CONTENT"]))

# Calculate Term Frequencies (TF)
def calculate_tf(text_list):
    total_count = {}
    total_words = len(text_list)
    for word in text_list:
        total_count[word] = total_count.get(word, 0) + 1
    return {k: v / total_words for k, v in total_count.items()}

calculate_tf_udf = udf(calculate_tf, StringType())
dataf = dataf.withColumn("tf", calculate_tf_udf(dataf["text"]))

# Create a list of all unique words
unique_words = list(set(dataf.selectExpr("explode(text) as word").select("word").distinct().rdd.flatMap(lambda x: x).collect()))

# Calculate Inverse Document Frequencies (IDF)
total_documents = dataf.count()

# Calculate document frequency (DF) for
document_frequency = dataf.select("AUTHOR", "text").rdd.flatMap(lambda x: [(word, 1) for word in set(x[1])]).reduceByKey(lambda x, y: x + y)

idf_values = document_frequency.map(lambda x: (x[0], log(total_documents / x[1])))

# Broadcast IDF values
idf_broadcast = spark.sparkContext.broadcast(dict(idf_values.collect()))

# Calculate TF-IDF
def calculate_tfidf(row):
    user_name, words = row
    tfidf_values = {word: word.count(word) * idf_broadcast.value.get(word, 0.0) for word in words}
    return user_name, words, tfidf_values

tfidf_data = dataf.select("AUTHOR", "text").rdd.map(calculate_tfidf)

# Display the result
tfidf_dataf = spark.createDataFrame(tfidf_data, ["AUTHOR", "text", "tfidf"])
tfidf_dataf.show(10)

+---------------+--------------------+--------------------+
|         AUTHOR|                text|               tfidf|
+---------------+--------------------+--------------------+
|    Daniel Korp|[katy, perry, doe...|{a -> 1.524444699...|
|    Paul Hannam|[In, what, South,...|{American -> 4.12...|
|Angie Sivrikozi|[Its, a, good, so...|{love -> 1.684787...|
|    Zain Hassan|[Thanks, to, this...|{plane -> 4.12713...|
|      Sam Klein|[She, named, the,...|{did, -> 4.820281...|
|   Justin Chery|[And, after, the,...|{ft. -> 4.8202815...|
|      xhonavsky|["I, love, this, ...|{love -> 1.684787...|
|    Ricky Smith|[This, song, make...|{song -> 1.642227...|
|     Robert Kim|[I'm, sorry, Katy...|{love -> 1.684787...|
|tombraiderxXx12|[I'm, not, a, big...|{but -> 2.8743714...|
+---------------+--------------------+--------------------+
only showing top 10 rows



In [34]:

# Stop the Spark session
# spark.stop()