In [1]:
!pip install pyspark



In [2]:
!apt install openjdk-8-jdk-headless -qq > /dev/null





In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [4]:
spark = SparkSession.builder\
.appName("Mon premier application Spark")\
.config("spark.memory.offHeap.enabled","true")\
.config("spark.memory.offHeap.size","10g")\
.getOrCreate()

In [5]:
import pandas as pd
fichier = "/content/spam.csv"

df = pd.read_csv(fichier, encoding='latin-1')
print(df.head())
print(df.columns)

assert 'texte' in df.columns or 'v2' in df.columns, "La colonne 'texte' ou 'v2' est manquante !"
assert 'label' in df.columns or 'v1' in df.columns, "La colonne 'label' ou 'v1' est manquante !"


print(" Donn√©es charg√©es correctement !")

     v1                                                 v2 Unnamed: 2  \
0   ham  Go until jurong point, crazy.. Available only ...        NaN   
1   ham                      Ok lar... Joking wif u oni...        NaN   
2  spam  Free entry in 2 a wkly comp to win FA Cup fina...        NaN   
3   ham  U dun say so early hor... U c already then say...        NaN   
4   ham  Nah I don't think he goes to usf, he lives aro...        NaN   

  Unnamed: 3 Unnamed: 4  
0        NaN        NaN  
1        NaN        NaN  
2        NaN        NaN  
3        NaN        NaN  
4        NaN        NaN  
Index(['v1', 'v2', 'Unnamed: 2', 'Unnamed: 3', 'Unnamed: 4'], dtype='object')
 Donn√©es charg√©es correctement !


In [6]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession

spark_df = spark.createDataFrame(df)


tokenizer = Tokenizer(inputCol="v2", outputCol="tokens")

remover = StopWordsRemover(inputCol="tokens", outputCol="mots_utiles")

hashingTF = HashingTF(inputCol="mots_utiles", outputCol="rawFeatures", numFeatures=20000)

idf = IDF(inputCol="rawFeatures", outputCol="features")
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf])
model = pipeline.fit(spark_df)
df_prepared = model.transform(spark_df)
df_prepared.select("v2", "tokens", "mots_utiles", "features").show(5, truncate=True)

+--------------------+--------------------+--------------------+--------------------+
|                  v2|              tokens|         mots_utiles|            features|
+--------------------+--------------------+--------------------+--------------------+
|Go until jurong p...|[go, until, juron...|[go, jurong, poin...|(20000,[740,750,1...|
|Ok lar... Joking ...|[ok, lar..., joki...|[ok, lar..., joki...|(20000,[2630,2645...|
|Free entry in 2 a...|[free, entry, in,...|[free, entry, 2, ...|(20000,[587,1169,...|
|U dun say so earl...|[u, dun, say, so,...|[u, dun, say, ear...|(20000,[3783,8419...|
|Nah I don't think...|[nah, i, don't, t...|[nah, think, goes...|(20000,[3163,3340...|
+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [7]:
train_df, test_df = df_prepared.randomSplit([0.8, 0.2], seed=42)

print("Taille du dataset d'entra√Ænement :", train_df.count())
print("Taille du dataset de test :", test_df.count())

Taille du dataset d'entra√Ænement : 4518
Taille du dataset de test : 1054


In [8]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import StringIndexer, IndexToString

indexer = StringIndexer(inputCol="v1", outputCol="label")
indexed_train_df = indexer.fit(train_df).transform(train_df)
indexed_test_df = indexer.fit(test_df).transform(test_df)


nb = NaiveBayes(featuresCol="features", labelCol="label", smoothing=1.0, modelType="multinomial")

nb_model = nb.fit(indexed_train_df)

print("‚úÖ Mod√®le Naive Bayes entra√Æn√© !")

‚úÖ Mod√®le Naive Bayes entra√Æn√© !


In [9]:
from pyspark.ml.classification import RandomForestClassifier


rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=50,
    maxDepth=10
)


rf_model = rf.fit(indexed_train_df)

print("‚úÖ Mod√®le Random Forest entra√Æn√© !")

‚úÖ Mod√®le Random Forest entra√Æn√© !


In [10]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=50,
    regParam=0.01
)

lr_model = lr.fit(indexed_train_df)

print("‚úÖ Mod√®le Logistic Regression entra√Æn√© !")

‚úÖ Mod√®le Logistic Regression entra√Æn√© !


In [11]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Assurez-vous que indexed_test_df est disponible (cr√©√© dans la cellule Naive Bayes)

# Evaluators
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")

evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedPrecision")

evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedRecall")

evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1")

# --- Predictions ---
# Effectuer les pr√©dictions sur le DataFrame de test index√© (qui contient la colonne 'label')
pred_nb = nb_model.transform(indexed_test_df)
pred_rf = rf_model.transform(indexed_test_df)
pred_lr = lr_model.transform(indexed_test_df)

# --- Calcul des m√©triques pour chaque mod√®le ---
def evaluate_model(pred):
    return {
        "Accuracy": evaluator_accuracy.evaluate(pred),
        "Precision": evaluator_precision.evaluate(pred),
        "Recall": evaluator_recall.evaluate(pred),
        "F1-score": evaluator_f1.evaluate(pred)
    }

metrics_nb = evaluate_model(pred_nb)
metrics_rf = evaluate_model(pred_rf)
metrics_lr = evaluate_model(pred_lr)

print("‚úÖ NAIVE BAYES")
print(metrics_nb)
print("\n‚úÖ RANDOM FOREST")
print(metrics_rf)
print("\n‚úÖ LOGISTIC REGRESSION")
print(metrics_lr)

‚úÖ NAIVE BAYES
{'Accuracy': 0.9383301707779886, 'Precision': 0.9548232465390606, 'Recall': 0.9383301707779886, 'F1-score': 0.9425531436838295}

‚úÖ RANDOM FOREST
{'Accuracy': 0.8709677419354839, 'Precision': 0.8876804915514593, 'Recall': 0.8709677419354839, 'F1-score': 0.8145033542839207}

‚úÖ LOGISTIC REGRESSION
{'Accuracy': 0.9724857685009488, 'Precision': 0.9733319113572292, 'Recall': 0.9724857685009488, 'F1-score': 0.9711111401239485}


In [12]:
new_emails = [
    ("Congratulations! You won a free iPhone!",),
    ("Bonjour, voici le rapport demand√© pour votre r√©union.",)
]

new_df = spark.createDataFrame(new_emails, ["v2"])

new_df_prepared = model.transform(new_df)

predictions = lr_model.transform(new_df_prepared)

predictions.select("v2", "prediction", "probability").show(truncate=False)

+-----------------------------------------------------+----------+-----------------------------------------+
|v2                                                   |prediction|probability                              |
+-----------------------------------------------------+----------+-----------------------------------------+
|Congratulations! You won a free iPhone!              |0.0       |[0.9831558705874133,0.016844129412586728]|
|Bonjour, voici le rapport demand√© pour votre r√©union.|0.0       |[0.9788631461216026,0.021136853878397366]|
+-----------------------------------------------------+----------+-----------------------------------------+



In [13]:
# Sauvegarde du pipeline et du mod√®le LR
model.write().overwrite().save("pipeline_model")
lr_model.write().overwrite().save("lr_model")

print("‚úÖ Pipeline et mod√®le sauvegard√©s avec succ√®s !")


‚úÖ Pipeline et mod√®le sauvegard√©s avec succ√®s !


In [14]:
!pip install gradio pyspark




In [15]:
# Load CSV into pandas first
df = pd.read_csv("/content/spam2.csv", encoding='latin-1')

# Check the columns
print(df.head())
print(df.columns)



                              COMMENT_ID  \
0    z13lgffb5w3ddx1ul22qy1wxspy5cpkz504   
1      z123dbgb0mqjfxbtz22ucjc5jvzcv3ykj   
2  z12quxxp2vutflkxv04cihggzt2azl34pms0k   
3      z12icv3ysqvlwth2c23eddlykyqut5z1h   
4      z133stly3kete3tly22petvwdpmghrlli   

                                              AUTHOR  \
0                                         dharma pal   
1                                      Tiza Arellano   
2  Pr√É¬¨√É¬±√É¬ße√Ö¬õ√Ö¬õ √É¬Çli√Ö¬õ √Ö¬Å√É¬∏v√É¬™ D√É¬∏m√É¬≠√É¬±√É¬∏ M√É¬¢√Ñ¬ëi...   
3                                      Eric Gonzalez   
4                                     Analena L√É¬≥pez   

                         DATE  \
0  2015-05-29T02:30:18.971000   
1  2015-05-29T00:14:48.748000   
2  2015-05-28T21:00:08.607000   
3  2015-05-28T20:47:12.193000   
4  2015-05-28T17:08:29.827000   

                                             CONTENT  CLASS  
0                                       Nice song√Ø¬ª¬ø      0  
1                                    I

In [16]:
text_col = "CONTENT"   # Change if needed
label_col = "CLASS"    # Change if needed

assert text_col in df.columns, f"'{text_col}' not found in CSV!"
assert label_col in df.columns, f"'{label_col}' not found in CSV!"

# Convert to Spark DataFrame
sdf = spark.createDataFrame(df)

# Check a few rows
sdf.select(text_col, label_col).show(5, truncate=False)


+----------------------------------------------------------------------------------------+-----+
|CONTENT                                                                                 |CLASS|
+----------------------------------------------------------------------------------------+-----+
|Nice song√Ø¬ª¬ø                                                                            |0    |
|I love song √Ø¬ª¬ø                                                                         |0    |
|I love song √Ø¬ª¬ø                                                                         |0    |
|860,000,000 lets make it first female to reach one billion!! Share it and replay it! √Ø¬ª¬ø|0    |
|shakira is best for worldcup√Ø¬ª¬ø                                                         |0    |
+----------------------------------------------------------------------------------------+-----+
only showing top 5 rows



In [17]:
tokenizer = Tokenizer(inputCol=text_col, outputCol="tokens")
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=20000)
idf = IDF(inputCol="rawFeatures", outputCol="features")
indexer = StringIndexer(inputCol=label_col, outputCol="label")


In [18]:
train_df, test_df = sdf.randomSplit([0.8, 0.2], seed=42)


In [19]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import (
    LogisticRegression,
    NaiveBayes,
    RandomForestClassifier,
    DecisionTreeClassifier
)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pandas as pd

spark = SparkSession.builder.appName("YouTube_Spam_MultiModel").getOrCreate()


In [20]:
models = {
    "Logistic Regression": LogisticRegression(featuresCol="features", labelCol="label", maxIter=50, regParam=0.01),
    "Naive Bayes": NaiveBayes(featuresCol="features", labelCol="label", modelType="multinomial"),
    "Random Forest": RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=50, maxDepth=10),
    "Decision Tree": DecisionTreeClassifier(featuresCol="features", labelCol="label", maxDepth=10)
}


In [21]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

results = []

for name, clf in models.items():
    pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, indexer, clf])
    model = pipeline.fit(train_df)
    preds = model.transform(test_df)

    acc = evaluator.evaluate(preds, {evaluator.metricName: "accuracy"})
    f1 = evaluator.evaluate(preds, {evaluator.metricName: "f1"})
    precision = evaluator.evaluate(preds, {evaluator.metricName: "weightedPrecision"})
    recall = evaluator.evaluate(preds, {evaluator.metricName: "weightedRecall"})

    results.append((name, acc, precision, recall, f1))
    print(f"‚úÖ {name} trained ‚Äî Accuracy: {acc:.4f}, F1: {f1:.4f}")

# Convert to Pandas for easy viewing
results_df = pd.DataFrame(results, columns=["Model", "Accuracy", "Precision", "Recall", "F1-score"])
results_df.sort_values(by="F1-score", ascending=False)


‚úÖ Logistic Regression trained ‚Äî Accuracy: 0.8571, F1: 0.8561
‚úÖ Naive Bayes trained ‚Äî Accuracy: 0.8254, F1: 0.8245
‚úÖ Random Forest trained ‚Äî Accuracy: 0.7460, F1: 0.7273
‚úÖ Decision Tree trained ‚Äî Accuracy: 0.9048, F1: 0.9043


Unnamed: 0,Model,Accuracy,Precision,Recall,F1-score
3,Decision Tree,0.904762,0.911229,0.904762,0.904279
0,Logistic Regression,0.857143,0.866053,0.857143,0.856053
1,Naive Bayes,0.825397,0.834215,0.825397,0.824513
2,Random Forest,0.746032,0.830688,0.746032,0.72726


In [22]:
best_model_name = results_df.sort_values(by="F1-score", ascending=False).iloc[0]["Model"]
print(f"üèÜ Best Model: {best_model_name}")

# Refit the best model pipeline and save
best_clf = models[best_model_name]
best_pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, indexer, best_clf])
best_model = best_pipeline.fit(sdf)

best_model.write().overwrite().save("/content/pipeline_model_youtube")
print("‚úÖ Saved best model to /content/pipeline_model_youtube")


üèÜ Best Model: Decision Tree
‚úÖ Saved best model to /content/pipeline_model_youtube


In [23]:
sample_comments = [
    ("Check my channel for free iPhone!",),
    ("Nice content bro!",),
    ("Click here for giveaway!",)
]

test_comments = spark.createDataFrame(sample_comments, [text_col])
preds = best_model.transform(test_comments)
preds.select(text_col, "prediction", "probability").show(truncate=False)


+---------------------------------+----------+----------------------------------------+
|CONTENT                          |prediction|probability                             |
+---------------------------------+----------+----------------------------------------+
|Check my channel for free iPhone!|1.0       |[0.0,1.0]                               |
|Nice content bro!                |0.0       |[0.9018691588785047,0.09813084112149532]|
|Click here for giveaway!         |0.0       |[0.9018691588785047,0.09813084112149532]|
+---------------------------------+----------+----------------------------------------+



In [29]:
import gradio as gr
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.ml.classification import (
    NaiveBayesModel,
    RandomForestClassificationModel,
    LogisticRegressionModel
)

# --- Initialize Spark ---
spark = SparkSession.builder.appName("Spam Detection Gradio").getOrCreate()

# --- Load all models ---
# Adjusting the model loading based on what was actually saved:
# For Email Spam Detection:
#   - pipeline_model: The feature engineering pipeline (Tokenizer, StopWordsRemover, HashingTF, IDF)
#   - lr_model: The Logistic Regression classifier model
# Other email models (NB, RF) were trained but not explicitly saved as separate models for reuse here.
#
# For YouTube Comment Spam Detection:
#   - pipeline_model_youtube: A *full* PipelineModel that includes feature engineering AND the best classifier (Decision Tree).
models = {
    "Email Spam Detection": {
        "feature_pipeline": PipelineModel.load("/content/pipeline_model"),
        "classifier": { # Store individual classifiers that were saved
            "Logistic Regression": LogisticRegressionModel.load("/content/lr_model")
            # Naive Bayes and Random Forest models for email were not explicitly saved separately
        },
        "input_col": "v2"
    },
    "YouTube Comment Spam Detection": {
        "full_pipeline": PipelineModel.load("/content/pipeline_model_youtube"), # This is the complete pipeline with the best classifier (Decision Tree)
        "input_col": "CONTENT"
    }
}

# --- Prediction function ---
def predict_text(text, detection_type, model_choice):
    if not text.strip():
        return "‚ö†Ô∏è Veuillez entrer un texte.", None

    try:
        model_set = models[detection_type]
    except KeyError:
        return "‚ùå Type de d√©tection non trouv√©.", None

    input_col = model_set["input_col"]

    try:
        # Create a Spark DataFrame from the input text
        new_df = spark.createDataFrame([(text,)], [input_col])
        preds = None

        if detection_type == "Email Spam Detection":
            # For email, first apply feature engineering pipeline
            feature_pipeline = model_set["feature_pipeline"]
            new_df_prepared = feature_pipeline.transform(new_df)

            # Select the appropriate classifier based on user choice
            classifier_to_use = model_set["classifier"].get(model_choice)
            if classifier_to_use is None:
                return f"‚ùå Mod√®le '{model_choice}' non disponible pour la d√©tection d'emails. Seul 'Logistic Regression' a √©t√© sauvegard√© s√©par√©ment.", None

            preds = classifier_to_use.transform(new_df_prepared)

        elif detection_type == "YouTube Comment Spam Detection":
            # For YouTube, use the pre-saved full pipeline which includes feature engineering and the best classifier (Decision Tree)
            # The 'model_choice' from the UI is effectively ignored here as only one full pipeline was saved for YouTube comments.
            full_pipeline_youtube = model_set["full_pipeline"]
            preds = full_pipeline_youtube.transform(new_df)
        else:
            return "‚ùå Type de d√©tection inconnu.", None

        if preds is None:
            return "‚ùå Erreur interne: Aucune pr√©diction g√©n√©r√©e.", None

        # Get the prediction result
        pred_row = preds.select("prediction", "probability").collect()[0]

        # Extract prediction and probability
        prob = pred_row["probability"]
        predicted_class_prob = float(prob[int(pred_row["prediction"])])

        label = "üö® Spam d√©tect√© !" if pred_row["prediction"] == 1.0 else "üì¨ Message normal"
        return label, f"Probabilit√© de la classe pr√©dite ({'Spam' if pred_row['prediction'] == 1.0 else 'Normal'}) : {predicted_class_prob:.3f}"

    except Exception as e:
        return f"‚ùå Erreur de pr√©diction : {str(e)}", None

# --- Interface Gradio ---
interface = gr.Interface(
    fn=predict_text,
    inputs=[
        gr.Textbox(
            label="üí¨ Entrez le texte √† analyser",
            lines=5,
            placeholder="Exemple : Check out my channel for free iPhones!"
        ),
        gr.Radio(
            ["Email Spam Detection", "YouTube Comment Spam Detection"],
            label="üìÇ Type de d√©tection",
            value="Email Spam Detection"
        ),
        gr.Radio(
            ["Naive Bayes", "Random Forest", "Logistic Regression"],
            label="üß† Choisissez le mod√®le" # Note: For YouTube, this choice is currently ignored as only one full pipeline is used.
        ),
    ],
    outputs=[
        gr.Textbox(label="R√©sultat de la pr√©diction"),
        gr.Textbox(label="D√©tails de la probabilit√©")
    ],
    title="üìßüé• D√©tection de Spam (Email & YouTube)",
    description="Utilisez des mod√®les PySpark pour d√©tecter les spams dans des emails ou des commentaires YouTube.",
    theme="soft"
)

# --- Launch app ---
interface.launch(share=True)


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://8640ad76ad488ffa06.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




In [25]:
!ls /content


lr_model	pipeline_model_youtube	spam2.csv
pipeline_model	sample_data		spam.csv
