# PySpark Installation for Google Colab

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
version = "3.5.3"
!wget https://downloads.apache.org/spark/spark-{version}/spark-{version}-bin-hadoop3.tgz

In [None]:
!tar xzvf spark-{version}-bin-hadoop3.tgz

In [38]:
!pip install -q findspark

In [39]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/spark-{version}-bin-hadoop3"

In [40]:
import findspark
findspark.init()

# Common Imports

In [119]:
from itertools import product

from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Tokenizer, StopWordsRemover, HashingTF, IDF, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Preparing the data

In [None]:
spark = SparkSession.builder.getOrCreate()

In [47]:
# Reading the wiki edits file
edits_df = spark.read.json("data/edits.json")
print(edits_df.count())
edits_df.printSchema()

40000
root
 |-- $schema: string (nullable = true)
 |-- bot: double (nullable = true)
 |-- comment: string (nullable = true)
 |-- id: double (nullable = true)
 |-- length: struct (nullable = true)
 |    |-- new: long (nullable = true)
 |    |-- old: long (nullable = true)
 |-- log_action: string (nullable = true)
 |-- log_action_comment: string (nullable = true)
 |-- log_id: double (nullable = true)
 |-- log_params: string (nullable = true)
 |-- log_type: string (nullable = true)
 |-- meta: struct (nullable = true)
 |    |-- domain: string (nullable = true)
 |    |-- dt: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- offset: long (nullable = true)
 |    |-- partition: long (nullable = true)
 |    |-- request_id: string (nullable = true)
 |    |-- stream: string (nullable = true)
 |    |-- topic: string (nullable = true)
 |    |-- uri: string (nullable = true)
 |-- minor: double (nullable = true)
 |-- namespace: double (nullable = true)
 |-- notify_url: string 

In [48]:
@F.udf(returnType=IntegerType())
def length_diff_udf(length_col):
    """Helper UDF to compute length_diff feature"""
    if isinstance(length_col, dict):
        return length_col.get('new', 0) - length_col.get('old', 0)
    return 0

In [100]:
df = edits_df.select(
    F.col("user"),
    F.col("bot").cast("int").alias("bot"), # target that we try to predict
    F.col("type"),
    F.col("namespace").cast("int").alias("namespace"),
    F.col("comment"),
    F.length(F.col("comment")).alias("comment_length"),
    length_diff_udf(F.col("length")).alias("length_diff"),
).distinct()
print(df.count())
df.show(5)

23053
+-----------+---+----------+---------+--------------------+--------------+-----------+
|       user|bot|      type|namespace|             comment|comment_length|length_diff|
+-----------+---+----------+---------+--------------------+--------------+-----------+
|Paul August|  0|      edit|        2|   /* Epic poetry */|            17|          0|
|        Xqt|  0|       log|        0|                    |             0|          0|
|  Holiday56|  0|categorize|       14|[[:File:Letters t...|            88|          0|
|     Ontzak|  0|      edit|        2|Añadiendo plantil...|            72|          0|
|      Sionk|  0|      edit|        6|removed [[Categor...|            86|          0|
+-----------+---+----------+---------+--------------------+--------------+-----------+
only showing top 5 rows



In [105]:
# Train / Test split, roughly 90/10 and not allowing the same user to appear in both parts
df = df.withColumn("is_test", F.lit((F.hash(F.col("user")) % 10 == 0))) 

train_df = df.filter(F.col("is_test") == 0)
test_df = df.filter(F.col("is_test") == 1)

train_df.show(3)
test_df.show(3)

+-----------+---+----------+---------+--------------------+--------------+-----------+-------+
|       user|bot|      type|namespace|             comment|comment_length|length_diff|is_test|
+-----------+---+----------+---------+--------------------+--------------+-----------+-------+
|Paul August|  0|      edit|        2|   /* Epic poetry */|            17|          0|  false|
|        Xqt|  0|       log|        0|                    |             0|          0|  false|
|  Holiday56|  0|categorize|       14|[[:File:Letters t...|            88|          0|  false|
+-----------+---+----------+---------+--------------------+--------------+-----------+-------+
only showing top 3 rows

+---------+---+----------+---------+--------------------+--------------+-----------+-------+
|     user|bot|      type|namespace|             comment|comment_length|length_diff|is_test|
+---------+---+----------+---------+--------------------+--------------+-----------+-------+
|     리듬|  0|       new|     11

In [106]:
# Validating that the same bot doesn't appear in both train and test
df.filter(F.col("bot") == 1).groupby("user").agg(F.size(F.collect_set(F.col("is_test"))).alias("cnt")).filter("cnt > 1").show()

+----+---+
|user|cnt|
+----+---+
+----+---+



In [110]:
print("Total distinct bot users:", df.filter(F.col("bot") == 1).select("user").distinct().count())
print("Total distinct human users:", df.filter(F.col("bot") == 0).select("user").distinct().count())

Total distinct bot users: 159
Total distinct human users: 4587


In [107]:
# Checking splits
df.groupBy("is_test").count().show() # overall train/test split counts
df.groupBy("bot", "is_test").count().show() # users train/test split counts

+-------+-----+
|is_test|count|
+-------+-----+
|   true| 2449|
|  false|20604|
+-------+-----+

+---+-------+-----+
|bot|is_test|count|
+---+-------+-----+
|  1|  false| 4269|
|  1|   true|  255|
|  0|  false|16335|
|  0|   true| 2194|
+---+-------+-----+



# Preparing the model

In [111]:
def create_pipeline(numFeatures, numTrees, maxDepth):
    """Returns a pipeline for bot classification on wiki edits, with customizable parameters"""
    # Encoding 'type' and 'namespace' columns
    type_indexer = StringIndexer(inputCol="type", outputCol="type_index", handleInvalid="keep")
    namespace_indexer = StringIndexer(inputCol="namespace", outputCol="namespace_index", handleInvalid="keep")
    type_encoder = OneHotEncoder(inputCol="type_index", outputCol="type_encoded")
    namespace_encoder = OneHotEncoder(inputCol="namespace_index", outputCol="namespace_encoded")
    
    # TF-IDF on 'comment'
    tokenizer = Tokenizer(inputCol="comment", outputCol="words")
    stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
    hashing_tf = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=numFeatures)
    idf = IDF(inputCol="raw_features", outputCol="tfidf_features")
    
    # Assembling all features into a single vector
    assembler = VectorAssembler(
      inputCols=["length_diff", "comment_length", "type_encoded", "namespace_encoded", "tfidf_features"],
      outputCol="features"
    )
    
    # Defining classifier model
    rf = RandomForestClassifier(labelCol="bot", featuresCol="features", numTrees=numTrees, maxDepth=maxDepth)
    
    # Building and returning the pipeline
    return Pipeline(stages=[
      type_indexer, namespace_indexer,
      type_encoder, namespace_encoder,
      tokenizer, stopwords_remover, hashing_tf, idf,
      assembler, rf
    ])

In [132]:
def fit_predict(train_df, test_df, numFeatures, numTrees, maxDepth, evaluate=True):
    """
        Fits the classifier model on train_df and runs prediction on test_df 
        returns the model, predictions dataframe evaluation metrics (optionally)
    """
    pipeline = create_pipeline(numFeatures, numTrees, maxDepth)
    
    model = pipeline.fit(train_df) # training the classifier
    predictions = model.transform(test_df) # predicting
    
    # Evaluating
    if not evaluate:
        return model, predictions, None
    
    evaluation_results = {
        "areaUnderROC": BinaryClassificationEvaluator(labelCol="bot", rawPredictionCol="prediction", metricName="areaUnderROC").evaluate(predictions),
        "areaUnderPR": BinaryClassificationEvaluator(labelCol="bot", rawPredictionCol="prediction", metricName="areaUnderPR").evaluate(predictions)
    }
    for metric in ("f1", "accuracy", "weightedPrecision", "weightedRecall"):
        evaluator = MulticlassClassificationEvaluator(labelCol="bot", predictionCol="prediction", metricName=metric)
        evaluation_results[metric] = evaluator.evaluate(predictions)
    
    return model, predictions, evaluation_results

In [None]:
!pip install tqdm

In [115]:
from tqdm import tqdm

In [123]:
# Define parameters grid
params = {
    "numFeatures": [100, 500, 1000],
    "numTrees": [10, 50, 100],
    "maxDepth": [5, 10, 20]
}

In [125]:
spark.sparkContext.setLogLevel("ERROR")

In [None]:
results = []
for numFeatures, numTrees, maxDepth in tqdm(product(params["numFeatures"], params["numTrees"], params["maxDepth"]), 
                                            total=len(params["numFeatures"]) * len(params["numTrees"]) * len(params["maxDepth"])):
        
      _, _, res = fit_predict(train_df, test_df, numFeatures, numTrees, maxDepth, evaluate=True)
      results.append(Row(
          numFeatures=numFeatures, numTrees=numTrees, maxDepth=maxDepth, 
          f1_score=round(res["f1"], 2), accuracy=round(res["accuracy"], 2), 
          precision=round(res["weightedPrecision"], 2), recall=round(res["weightedRecall"], 2),
          areaUnderROC=round(res["areaUnderROC"], 2), areaUnderPR=round(res["areaUnderPR"], 2)
      ))

In [131]:
results_df = spark.createDataFrame(results)
results_df.orderBy(F.desc("f1_score")).show()

+-----------+--------+--------+--------+--------+---------+------+------------+-----------+
|numFeatures|numTrees|maxDepth|f1_score|accuracy|precision|recall|areaUnderROC|areaUnderPR|
+-----------+--------+--------+--------+--------+---------+------+------------+-----------+
|       1000|      50|      20|    0.97|    0.97|     0.97|  0.97|        0.87|       0.84|
|       1000|     100|      20|    0.97|    0.97|     0.97|  0.97|        0.86|       0.85|
|       1000|      10|      20|    0.96|    0.96|     0.96|  0.96|        0.84|       0.78|
|        500|     100|      20|    0.95|    0.95|     0.95|  0.95|        0.81|       0.74|
|        500|      50|      20|    0.95|    0.95|     0.95|  0.95|        0.81|       0.75|
|        100|     100|      20|    0.93|    0.93|     0.93|  0.93|        0.76|       0.59|
|        100|      10|      20|    0.92|    0.92|     0.92|  0.92|         0.8|       0.53|
|        100|      50|      20|    0.91|    0.92|     0.91|  0.92|        0.72| 

In [133]:
best_result = results_df.orderBy(F.desc("f1_score")).first()
bestNumFeatures, bestNumTrees, bestMaxDepth = best_result.numFeatures, best_result.numTrees, best_result.maxDepth

In [136]:
best_model, best_preds_df, _ = fit_predict(train_df, test_df, bestNumFeatures, bestNumTrees, bestMaxDepth, evaluate=False)

                                                                                

In [149]:
from pyspark.ml.functions import vector_to_array
best_preds_df = best_preds_df.select(
    "user", "bot", "prediction",
    F.round(vector_to_array(F.col("probability"))[0], 3).alias(f"prob0"),
    F.round(vector_to_array(F.col("probability"))[1], 3).alias(f"prob1"),
)
best_preds_df.show(5, truncate=False)

+---------+---+----------+-----+-----+
|user     |bot|prediction|prob0|prob1|
+---------+---+----------+-----+-----+
|리듬     |0  |0.0       |0.925|0.075|
|Lohataona|0  |0.0       |0.911|0.089|
|Kevauto  |0  |0.0       |0.73 |0.27 |
|M2Ys4U   |0  |0.0       |0.953|0.047|
|Molgreen |0  |0.0       |0.729|0.271|
+---------+---+----------+-----+-----+
only showing top 5 rows



In [154]:
best_preds_df["bot"]

Column<'bot'>

In [158]:
from sklearn.metrics import classification_report
best_preds_pd_df = best_preds_df.toPandas()
print(classification_report(best_preds_pd_df["bot"].values, best_preds_pd_df["prediction"].values))

              precision    recall  f1-score   support

           0       0.97      1.00      0.98      2194
           1       0.95      0.75      0.84       255

    accuracy                           0.97      2449
   macro avg       0.96      0.87      0.91      2449
weighted avg       0.97      0.97      0.97      2449



In [171]:
(best_preds_pd_df["prediction"].values >= 0.9).astype(int)[:1000]

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,

In [162]:
from sklearn.metrics import classification_report

best_preds_pd_df = best_preds_df.toPandas()
print(classification_report(best_preds_pd_df["bot"].values, (best_preds_pd_df["prediction"].values >= 0.9).astype(int)))

              precision    recall  f1-score   support

           0       0.97      1.00      0.98      2194
           1       0.95      0.75      0.84       255

    accuracy                           0.97      2449
   macro avg       0.96      0.87      0.91      2449
weighted avg       0.97      0.97      0.97      2449



In [188]:
import altair as alt
import pandas as pd 
from sklearn.metrics import roc_curve, auc, f1_score, accuracy_score, precision_recall_curve


def evaluate_per_threshold(df):
    # Extract true labels and predicted probabilities
    pd_df = df.toPandas()
    y_true = pd_df["bot"].values
    y_score = pd_df["prob1"].values
    
    # Compute ROC curve and ROC area
    fpr, tpr, thresholds = roc_curve(y_true, y_score)
    roc_auc = auc(fpr, tpr)
    
    # Calculate accuracy and F1 score for each threshold
    accuracy = []
    f1 = []
    
    for threshold in thresholds:
        y_pred = (y_score >= threshold).astype(int)
        accuracy.append(accuracy_score(y_true, y_pred))
        f1.append(f1_score(y_true, y_pred, average="weighted"))
    
     # Create a DataFrame for Altair
    roc_df = pd.DataFrame({
        'False Positive Rate': fpr,
        'True Positive Rate': tpr,
        'Threshold': thresholds,
        'Accuracy': accuracy,
        'F1 Score': f1
    })
    
    # Create the ROC curve plot with tooltips
    roc_chart = alt.Chart(roc_df).mark_line().encode(
        x=alt.X('False Positive Rate', type="quantitative", title="False Positive Rate"),
        y=alt.Y('True Positive Rate', type="quantitative", title="True Positive Rate"),
        tooltip=[
            'False Positive Rate:Q',
            'True Positive Rate:Q',
            'Threshold:Q',
            'Accuracy:Q',
            'F1 Score:Q'
        ],
        strokeWidth=alt.value(2),
    ).properties(
        title=f'ROC of best model. AUC = {roc_auc:.2f})',
        width=400,
        height=400
    )
    
    # Add diagonal line
    diagonal = alt.Chart(pd.DataFrame({'x': [0, 1], 'y': [0, 1]})).mark_line(color='gray', strokeDash=[5, 5]).encode(
        x='x:Q',
        y='y:Q'
    )
    
    # Compute Precision-Recall curve and PR area
    precision, recall, pr_thresholds = precision_recall_curve(y_true, y_score)
    pr_auc = auc(recall, precision)
    
    # Compute F1 Score for each threshold in PR
    f1_scores_pr = [f1_score(y_true, (y_score >= t).astype(int), average="weighted") for t in pr_thresholds]
    # Compute accuracy for each threshold in PR
    accuracies_pr = [accuracy_score(y_true, (y_score >= t).astype(int)) for t in pr_thresholds]

    # Create a DataFrame for PR Curve
    pr_df = pd.DataFrame({
        'Precision': precision[:-1],  # Exclude last precision value (undefined)
        'Recall': recall[:-1],        # Exclude last recall value (undefined)
        'Threshold': pr_thresholds,
        'F1 Score': f1_scores_pr,
        'Accuracy': accuracies_pr
    })
    
    # Create PR curve plot with tooltips
    pr_chart = alt.Chart(pr_df).mark_line().encode(
        x=alt.X('Recall', type="quantitative", title="Recall"),
        y=alt.Y('Precision', type="quantitative", title="Precision", ),
        tooltip=[
            'Recall:Q',
            'Precision:Q',
            'Threshold:Q',
            'F1 Score:Q',
            'Accuracy:Q'
        ],
        strokeWidth=alt.value(2)
    ).properties(
        title=f'Precision-Recall Curve for best model. (AUC = {pr_auc:.2f})',
        width=400,
        height=400
    )
    
    return ((roc_chart + diagonal) | pr_chart).properties(title=alt.Title("test", anchor="middle"))

In [189]:
evaluate_per_threshold(best_preds_df)

In [183]:
bins = [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, 1]
best_preds_pd_df['prob0_bin'] = pd.cut(best_preds_pd_df['prob0'], bins=bins, right=False).astype(str)
best_preds_pd_df['prob1_bin'] = pd.cut(best_preds_pd_df['prob1'], bins=bins, right=False).astype(str)
best_preds_pd_df.head()[["prob0", "prob0_bin", "prob1", "prob1_bin"]]

Unnamed: 0,prob0,prob0_bin,prob1,prob1_bin
0,0.925,"[0.9, 0.95)",0.075,"[0.0, 0.1)"
1,0.911,"[0.9, 0.95)",0.089,"[0.0, 0.1)"
2,0.73,"[0.7, 0.75)",0.27,"[0.2, 0.3)"
3,0.953,"[0.95, 1.0)",0.047,"[0.0, 0.1)"
4,0.729,"[0.7, 0.75)",0.271,"[0.2, 0.3)"


In [187]:
prob0 = alt.Chart(best_preds_pd_df).mark_bar().encode(
    x=alt.X("prob0_bin", title="Accuracy", type="nominal", axis=alt.Axis(labelAngle=0)),
    y=alt.Y("count(*)", type="quantitative", title="Count")
).properties(
    title="Count of Probability = 0 Binned",
    width=600,
    height=300
)

prob1 = alt.Chart(best_preds_pd_df).mark_bar().encode(
    x=alt.X("prob1_bin", title="Accuracy", type="nominal", axis=alt.Axis(labelAngle=0)),
    y=alt.Y("count(*)", type="quantitative", title="Count")
).properties(
    title="Count of Probabilities = 1 Binned",
    width=600,
    height=300
)
(prob0 | prob1)

In [None]:
best_result = results_df.orderBy(F.desc("f1")).first()

In [42]:
print(f"Best parameters: numFeatures={best_result.numFeatures}, numTrees={best_result.numTrees}, maxDepth={best_result.maxDepth}")
print(f"F1 Score: {f1_score};\tAccuracy: {accuracy};\tPrecision: {precision};\tRecall: {recall}")

Best parameters: numFeatures=100, numTrees=100, maxDepth=20
F1 Score: 0.911711785200636;	Accuracy: 0.9146480659480025;	Precision: 0.9226667180813601;	Recall: 0.9146480659480025


In [43]:
model, f1_score, accuracy, precision, recall = train_and_evaluate(train, test, best_result.numFeatures, best_result.numTrees, best_result.maxDepth)

best_model_path = "bot_classifier"
model.write().overwrite().save(best_model_path)

In [44]:
from pyspark.ml import PipelineModel


loaded_model = PipelineModel.load(best_model_path)