In [2]:
# Install required packages.
# `spark-nlp` is the official Spark NLP distribution; the similarly named
# `sparknlp` package on PyPI is only a shim that pulls `spark-nlp` in as a
# dependency, so install the real package directly.
# %pip (rather than !pip) installs into the environment of the running kernel.
# NOTE(review): versions are unpinned — pin them once the known-good set is confirmed.
%pip install pyspark nltk datasets spark-nlp

Collecting datasets
  Downloading datasets-3.2.0-py3-none-any.whl.metadata (20 kB)
Collecting sparknlp
  Downloading sparknlp-1.0.0-py3-none-any.whl.metadata (1.2 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.9.0,>=2023.1.0 (from fsspec[http]<=2024.9.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Collecting spark-nlp (from sparknlp)
  Downloading spark_nlp-5.5.2-py2.py3-none-any.whl.metadata (19 kB)
Downloading datasets-3.2.0-py3-none-any.whl (480 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m480.6/480.6 kB[0m [31m8.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sparknlp-1.0.0-py3-no

In [3]:
# Download NLTK data
# Fetches the 'punkt', 'wordnet' and 'stopwords' resources.
# NOTE(review): no visible cell calls into NLTK after this point (the pipeline
# lemmatizes with Spark NLP's LemmatizerModel and filters stop words with
# pyspark's StopWordsRemover) — confirm these downloads are still needed.
import nltk
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('stopwords')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [4]:
# Initialize Spark Session
# NOTE(review): SparkSession is imported but not referenced in the visible cells.
from pyspark.sql import SparkSession

import sparknlp
# Start Spark through Spark NLP's helper; the object it returns is what every
# later cell uses as `spark` (e.g. spark.createDataFrame).
spark = sparknlp.start()

In [10]:
# Import required libraries
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import lower, regexp_replace, udf
from pyspark.sql.types import ArrayType, StringType
from nltk.stem import WordNetLemmatizer
from datasets import load_dataset
from nltk.stem import PorterStemmer
from sparknlp.base import DocumentAssembler, TokenAssembler, Finisher
from sparknlp.annotator import Tokenizer, LemmatizerModel
from sparknlp.annotator import Normalizer
from pyspark.sql.functions import rand


In [12]:
# Split name -> CSV file name. NOTE(review): not read anywhere in the visible
# cells; load_dataset below selects the split by name instead.
splits = {'train': 'scam-dialogue_train.csv', 'test': 'scam-dialogue_test.csv'}

# Load the training split using Hugging Face's datasets library
dataset = load_dataset("BothBosu/scam-dialogue", split="train")

# Convert the Hugging Face dataset to a Spark DataFrame (via pandas)
train_df = spark.createDataFrame(dataset.to_pandas())

# Display 10 rows in random order (rand() is unseeded, so the sample differs per run)
train_df.orderBy(rand()).limit(10).show()

+--------------------+-------------+-----+
|            dialogue|         type|label|
+--------------------+-------------+-----+
|caller: Hi, is Jo...|        wrong|    0|
|caller: Hello, my...|       reward|    1|
|caller: Hello, my...|       reward|    1|
|caller: Hello, my...|    insurance|    0|
|caller: Hello, my...|       reward|    1|
|caller: Hello, my...|    insurance|    0|
|caller: Hi, I'm c...|     delivery|    0|
|caller: Hi, my na...|    insurance|    0|
|caller: Hello, co...|       reward|    1|
|caller: Hi, my na...|telemarketing|    0|
+--------------------+-------------+-----+



In [8]:
# Preprocess training data
# Lowercase the dialogue text, then drop every character that is neither an
# ASCII letter nor whitespace. Both steps overwrite the "dialogue" column.
# NOTE(review): only train_df goes through this step; test_df and the ad-hoc
# transcripts reach model.transform un-cleaned and rely on the pipeline's
# Normalizer stage instead — confirm the two paths tokenize the same way.
train_df = (
    train_df
    .withColumn("dialogue", lower("dialogue"))
    .withColumn("dialogue", regexp_replace("dialogue", "[^a-zA-Z\\s]", ""))
)


In [None]:
# Text preprocessing pipeline
# Stage definitions only — nothing is fitted in this cell. Each stage reads the
# column(s) written by the stage defined just before it.

# Spark NLP: "dialogue" -> "document"
document_assembler = (
    DocumentAssembler()
    .setInputCol("dialogue")
    .setOutputCol("document")
)

# Spark NLP: "document" -> "token"
spark_nlp_tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)

# Spark NLP: lowercase tokens and strip anything that is not an ASCII letter or whitespace
normalizer = (
    Normalizer()
    .setInputCols(["token"])
    .setOutputCol("normalized")
    .setLowercase(True)
    .setCleanupPatterns(["[^a-zA-Z\\s]"])
)

# Spark NLP: pretrained lemmatizer ("lemma_antbnc" is downloaded when this line runs)
lemmatizer = (
    LemmatizerModel.pretrained("lemma_antbnc")
    .setInputCols(["normalized"])
    .setOutputCol("lemmas")
)

# Spark NLP: expose the lemmas as an array column; the annotation columns are kept
finisher = (
    Finisher()
    .setInputCols(["lemmas"])
    .setOutputCols(["finished_lemmas"])
    .setOutputAsArray(True)
    .setCleanAnnotations(False)
)

# pyspark.ml feature stages: stop-word filtering, term counts, then IDF weighting
stopwords_remover = StopWordsRemover(
    inputCol="finished_lemmas",
    outputCol="filtered_words",
)
count_vectorizer = CountVectorizer(
    inputCol="filtered_words",
    outputCol="raw_features",
)
idf = IDF(
    inputCol="raw_features",
    outputCol="features",
)

# Classifier: 50-tree random forest with a fixed seed, trained on "features" against "label"
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=50,
    seed=42,
)

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [None]:
# Assemble all stages, in execution order, into one estimator.
pipeline = Pipeline().setStages([
    # Spark NLP text stages
    document_assembler, spark_nlp_tokenizer, normalizer, lemmatizer, finisher,
    # pyspark.ml feature stages
    stopwords_remover, count_vectorizer, idf,
    # classifier
    rf,
])

In [None]:
# Fit every pipeline stage on the training DataFrame; `model` is the fitted
# pipeline reused below for saving, test-set predictions and ad-hoc predictions.
model = pipeline.fit(train_df)

In [None]:
# Persist the fitted pipeline to the "random_forest_model" directory.
# Going through the overwrite-enabled writer keeps this cell re-runnable:
# plain model.save(path) raises once the target directory already exists.
model.write().overwrite().save("random_forest_model")

In [None]:
# Load the test split. No text cleaning is applied in this cell; the only
# normalization test rows receive is the one performed inside the fitted pipeline.
test_dataset = load_dataset("BothBosu/scam-dialogue", split="test")
# Convert the Hugging Face dataset to a Spark DataFrame (via pandas)
test_df = spark.createDataFrame(test_dataset.to_pandas())

In [None]:
# Make predictions: run the fitted pipeline over the test DataFrame. The result
# is used below through its "label" and "prediction" columns.
predictions = model.transform(test_df)

In [None]:
from pyspark.sql.functions import rand

# Shuffle the predictions and display 20 rows of true label vs. predicted label
predictions.orderBy(rand()).select("label", "prediction").show(20)

+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
+-----+----------+
only showing top 20 rows



In [None]:
# Evaluate model
# Accuracy evaluator comparing the "prediction" column against "label".
# It is only constructed here; the score is computed later via evaluator.evaluate(predictions).
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)

In [None]:
# Archive the saved model directory. The relative path is the same one passed
# to the model save call, so this cell no longer assumes the working directory
# is /content; archive entries are now rooted at random_forest_model/.
!zip -r random_forest_model.zip random_forest_model

  adding: content/random_forest_model/ (stored 0%)
  adding: content/random_forest_model/stages/ (stored 0%)
  adding: content/random_forest_model/stages/2_NORMALIZER_2a06bc737c29/ (stored 0%)
  adding: content/random_forest_model/stages/2_NORMALIZER_2a06bc737c29/metadata/ (stored 0%)
  adding: content/random_forest_model/stages/2_NORMALIZER_2a06bc737c29/metadata/part-00000 (deflated 37%)
  adding: content/random_forest_model/stages/2_NORMALIZER_2a06bc737c29/metadata/_SUCCESS (stored 0%)
  adding: content/random_forest_model/stages/2_NORMALIZER_2a06bc737c29/metadata/._SUCCESS.crc (stored 0%)
  adding: content/random_forest_model/stages/2_NORMALIZER_2a06bc737c29/metadata/.part-00000.crc (stored 0%)
  adding: content/random_forest_model/stages/2_NORMALIZER_2a06bc737c29/fields/ (stored 0%)
  adding: content/random_forest_model/stages/2_NORMALIZER_2a06bc737c29/fields/slangDict/ (stored 0%)
  adding: content/random_forest_model/stages/2_NORMALIZER_2a06bc737c29/fields/slangDict/part-00000 (d

In [None]:
# Classification Metrics
def compute_classification_metrics(predictions):
    """Score a predictions DataFrame and tabulate its confusion counts.

    Args:
        predictions: DataFrame holding a "label" and a "prediction" column.

    Returns:
        A 4-tuple ``(precision, recall, f1, confusion_matrix)``. The first two
        are the evaluator's "weightedPrecision" and "weightedRecall" metrics,
        ``f1`` is its "f1" metric, and ``confusion_matrix`` is a DataFrame with
        one row count per (label, prediction) pair, ordered by both columns.
        The confusion matrix is returned without being displayed.
    """
    def _score(metric_name):
        # One evaluator per metric, always comparing "prediction" against "label".
        scorer = MulticlassClassificationEvaluator(
            labelCol="label",
            predictionCol="prediction",
            metricName=metric_name,
        )
        return scorer.evaluate(predictions)

    precision, recall, f1 = [
        _score(metric_name)
        for metric_name in ("weightedPrecision", "weightedRecall", "f1")
    ]

    pair_counts = predictions.groupBy("label", "prediction").count()
    confusion_matrix = pair_counts.orderBy("label", "prediction")

    return precision, recall, f1, confusion_matrix

In [None]:
# Accuracy on the test-set predictions, using the evaluator defined earlier
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy: {accuracy}")

# "Precision" and "Recall" below are the weighted variants
# (weightedPrecision / weightedRecall) returned by compute_classification_metrics.
precision, recall, f1, confusion_matrix = compute_classification_metrics(predictions)
print("\nClassification Report:")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1 Score: {f1:.2f}")
print("\nConfusion Matrix:")
# One row per (label, prediction) pair that occurs, with its count
confusion_matrix.show()

Model Accuracy: 1.0

Classification Report:
Precision: 1.00
Recall: 1.00
F1 Score: 1.00

Confusion Matrix:
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    0|       0.0|  160|
|    1|       1.0|  160|
+-----+----------+-----+



In [None]:
# Function to predict on new text
def predict_scam_or_not(text):
    """Classify a single dialogue with the fitted pipeline.

    Relies on the notebook-level ``spark`` session and fitted ``model``.

    Args:
        text: The dialogue to classify; it becomes the only row of a
            one-column ("dialogue") DataFrame.

    Returns:
        "Scam" when the model's prediction for that row equals 1,
        otherwise "Non-Scam".
    """
    single_row_df = spark.createDataFrame([(text,)], ["dialogue"])

    # First (and only) row, first (and only) selected column
    predicted_label = model.transform(single_row_df).select("prediction").collect()[0][0]

    if predicted_label == 1:
        return "Scam"
    return "Non-Scam"

In [None]:
# Sample transcripts
# Hand-written dialogues for spot-checking the model. The comment above each
# entry records the verdict the author expects for it. Each string is passed to
# the model as-is, including the leading newline and indentation.
transcripts = [
    # Scam transcript
    """
    caller: Hello, this is Professor Armin from the University of Calgary. We've detected suspicious activity related to your course 550 project.
    receiver: Suspicious activity? What do you mean?
    caller: It appears that unauthorized access has been detected. We need your student ID and password to secure your account immediately.
    receiver: That sounds alarming. Shouldn't the IT department handle this?
    caller: I'm authorized to assist. Time is critical to prevent data loss. Please provide your credentials now.
    """,
    # Scam transcript
    """
    caller: Hello, this is Dr. Thompson from the University of Calgary IT Security Division. We've detected suspicious activity.
    receiver: Suspicious activity?
    caller: Yes, we've detected multiple unauthorized login attempts. We need your student ID and password to secure your account immediately.
    receiver: This is concerning. Should I contact my supervisor about this?
    caller: There's no time for that. I need your university login credentials and student ID immediately to block these attacks and secure your research files.
    """,
    # Non-Scam transcript
    """
    receiver: Hi Professor Armin, I wanted to discuss the upcoming demo for course 550.
    professor: Of course! I'm excited to see what you and your team have developed.
    receiver: Students have been working hard, and we're ready to present to you, the TAs.
    professor: Excellent. Let's schedule the presentation in Calgary next week.
    """,
    # Non-Scam transcript
    """
    receiver: Hello Professor Armin, we're ready to showcase our course 550 project.
    professor: That's wonderful! I'm looking forward to your demo.
    receiver: We'll be presenting to you, the TAs  in Calgary.
    professor: Sounds great. Make sure to prepare thoroughly.
    """
]

In [None]:


# Classify each sample transcript and print its verdict, numbered from 1.
# Every call to predict_scam_or_not builds a one-row DataFrame and runs the
# fitted pipeline on it.
for idx, transcript in enumerate(transcripts):
    result = predict_scam_or_not(transcript)
    print(f"Transcript {idx+1} Prediction: {result}")

Transcript 1 Prediction: Scam
Transcript 2 Prediction: Scam
Transcript 3 Prediction: Non-Scam
Transcript 4 Prediction: Non-Scam


In [None]:
# Rebuild the test split as a Spark DataFrame and put its rows in random order
test_df_2 = spark.createDataFrame(test_dataset.to_pandas()).orderBy(rand())

# Bring every (dialogue, label) pair back to the driver as a list of tuples
dialogue_array = test_df_2.rdd.map(lambda r: (r['dialogue'], r['label'])).collect()

# Classify the first 30 shuffled dialogues one at a time and print each
# prediction next to the true label
for idx, row in enumerate(dialogue_array[:30]):
    result = predict_scam_or_not(row[0])
    print(f"Transcript {idx+1} Prediction: {result}, Actual: {row[1]}")