In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, lit, when
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import json

In [None]:
# Create the Spark session used by the first (TF-IDF) part of the notebook,
# or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("SocialMediaBotDetection")
    .getOrCreate()
)

In [None]:
# Read the TwiBot-20 sample file and print the schema Spark inferred for it.
df_raw = spark.read.format("json").load("/content/TwiBot-20_sample.json")
df_raw.printSchema()


In [None]:
# Peek at the first five rows of the columns used below.
_preview_columns = ['ID', 'domain', 'profile', 'tweet', 'neighbor']
df_raw.select(*_preview_columns).show(n=5)

In [None]:
# Replace nulls in string columns with the empty string.
# NOTE(review): fields nested inside structs (e.g. profile.*) are presumably
# not touched by this fill — confirm if null descriptions matter downstream.
df = df_raw.na.fill("")

In [None]:
from pyspark.sql.functions import concat_ws, col

# Build one free-text column per account: description, display name and all
# tweets, separated by single spaces.
_text_parts = [
    col("profile.description"),
    col("profile.name"),
    concat_ws(" ", col("tweet")),  # flatten the array of tweet strings
]
df = df.withColumn("combined_text", concat_ws(" ", *_text_parts))

In [None]:
# List the fields available inside the nested `profile` struct.
df.select(col("profile.*")).printSchema()

In [None]:
# Expose the follower count as an integer column so it can be compared
# numerically when building the label.
df = df.withColumn(
    "followers_count_int",
    col("profile.followers_count").astype("int"),
)

In [None]:
# Heuristic label: 1 when the description contains the substring "bot" or the
# account has fewer than 50 followers, otherwise 0.
# NOTE(review): the substring match is case-sensitive, and the description is
# also part of `combined_text`, so a text model can learn this rule directly.
# NOTE(review): if the dataset already ships a `label` field, this overwrites
# it — confirm which label is intended.
_mentions_bot = col("profile.description").contains("bot")
_few_followers = col("followers_count_int") < 50
df = df.withColumn("label", when(_mentions_bot | _few_followers, 1).otherwise(0))

In [None]:
# Split the combined text into a `words` array column.
tokenizer = Tokenizer().setInputCol("combined_text").setOutputCol("words")
words_data = tokenizer.transform(df)

In [None]:
# Drop stop words from `words`, writing the result to `filtered`.
remover = StopWordsRemover().setInputCol("words").setOutputCol("filtered")
filtered_data = remover.transform(words_data)

In [None]:
# Hash the filtered tokens into a 5000-dimensional term-frequency vector.
hashingTF = HashingTF(
    numFeatures=5000,
    inputCol="filtered",
    outputCol="rawFeatures",
)
featurized_data = hashingTF.transform(filtered_data)

In [None]:
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)

In [None]:
final_data = rescaled_data.select("features", "label")

In [None]:
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Fit a logistic regression on the TF-IDF training features.
lr = LogisticRegression(
    labelCol="label",
    featuresCol="features",
)
lr_model = lr.fit(train_data)

In [None]:
# Score the held-out rows and show a few predictions next to the labels.
predictions = lr_model.transform(test_data)
_shown_columns = ["label", "prediction", "probability"]
predictions.select(*_shown_columns).show(n=5)

In [None]:
# Report plain accuracy of the logistic regression on the test split.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.4f}")

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, when, length
from pyspark.ml.feature import Tokenizer, StopWordsRemover, Word2Vec
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Initialize Spark Session (getOrCreate reuses a session if one is running)
spark = (
    SparkSession.builder
    .appName("AdvancedBotDetection")
    .getOrCreate()
)

# Load Data and blank out nulls in string columns
df = spark.read.format("json").load("/content/TwiBot-20_sample.json").na.fill("")

In [None]:
from pyspark.sql.functions import concat_ws, col

# Merge description, display name and all tweets into one text column.
_description = col("profile.description")
_display_name = col("profile.name")
_all_tweets = concat_ws(" ", col("tweet"))  # flatten the array of tweet strings
df = df.withColumn(
    "combined_text",
    concat_ws(" ", _description, _display_name, _all_tweets),
)

In [None]:
# Integer follower count, then the heuristic label built from it:
# 1 when the description contains "bot" or followers < 50, else 0.
df = df.withColumn(
    "followers_count_int",
    col("profile.followers_count").astype("int"),
)
_looks_like_bot = col("profile.description").contains("bot") | (col("followers_count_int") < 50)
df = df.withColumn("label", when(_looks_like_bot, 1).otherwise(0))

In [None]:
# Show how many rows fall into each label value (class balance check).
_label_counts = df.select("label").groupBy("label").count()
_label_counts.show()

# Character length of the combined text, stored as an extra column.
df = df.withColumn("text_length", length(col("combined_text")))

In [None]:
# Text-preparation stages for the pipeline: tokenize, then drop stop words.
tokenizer = Tokenizer().setInputCol("combined_text").setOutputCol("tokens")
remover = StopWordsRemover().setInputCol("tokens").setOutputCol("filtered_tokens")

In [None]:
# Embed each account's filtered tokens as a 100-dimensional averaged vector.
# The seed is fixed because Word2Vec training is stochastic and the default
# seed is not guaranteed to be stable between Python sessions.
word2vec = Word2Vec(
    vectorSize=100,
    minCount=2,
    seed=42,
    inputCol="filtered_tokens",
    outputCol="features",
)

In [None]:
# Candidate classifiers, all reading `features` and predicting `label`.
lr = LogisticRegression(featuresCol="features", labelCol="label")
# The tree ensembles sample rows/features randomly; fix their seeds so the
# model comparison is repeatable across kernel restarts.
rf = RandomForestClassifier(featuresCol="features", labelCol="label", seed=42)
gbt = GBTClassifier(featuresCol="features", labelCol="label", seed=42)

In [None]:
# End-to-end pipeline: text preparation -> embeddings -> logistic regression.
pipeline = Pipeline(
    stages=[
        tokenizer,
        remover,
        word2vec,
        lr,
    ]
)

In [None]:
# 80/20 train/test split with a fixed seed.
train, test = df.randomSplit(weights=[0.8, 0.2], seed=42)

In [None]:
# Search grid: two embedding sizes x two regularisation strengths.
paramGrid = (
    ParamGridBuilder()
    .addGrid(word2vec.vectorSize, [50, 100])
    .addGrid(lr.regParam, [0.01, 0.1])
    .build()
)

In [None]:
# F1 evaluator shared by cross-validation and the model comparison below.
evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    labelCol="label",
    predictionCol="prediction",
)

In [None]:
# 3-fold cross-validation over the parameter grid, scored with `evaluator`.
# The seed pins the fold assignment so the selected model is reproducible.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=42)

In [None]:
# Run the grid search on the training split, then score the held-out split
# with the best model found.
cv_model = crossval.fit(dataset=train)
predictions = cv_model.transform(dataset=test)

In [None]:
# Note: `accuracy` holds whatever metric `evaluator` is configured for; the
# message below reports it as an F1 score.
accuracy = evaluator.evaluate(predictions)
print(f"Optimized Model F1 Score: {accuracy:.4f}")

# Label/prediction pair counts (a confusion matrix in long form).
_pair_counts = predictions.select("label", "prediction").groupBy("label", "prediction").count()
_pair_counts.show()

In [None]:
def evaluate_model(classifier, name):
    """Train the shared text pipeline with `classifier` and print its score.

    The notebook-level `tokenizer`, `remover` and `word2vec` stages are
    combined with `classifier`, fitted on the global `train` split and scored
    on the global `test` split using the global `evaluator`.

    Args:
        classifier: final pipeline stage to fit on the `features` column.
        name: label used in the printed "<name> F1 Score: ..." line.

    Returns:
        None. The score is only printed, formatted to four decimals.
    """
    stages = [tokenizer, remover, word2vec]
    stages.append(classifier)
    fitted = Pipeline(stages=stages).fit(train)
    score = evaluator.evaluate(fitted.transform(test))
    print(f"{name} F1 Score: {score:.4f}")

In [None]:
# Compare the tree-based classifiers using the same pipeline and splits.
for _candidate, _title in ((rf, "Random Forest"), (gbt, "Gradient Boosted Trees")):
    evaluate_model(_candidate, _title)

In [None]:
# Persist the best cross-validated pipeline, replacing any previous save.
_best_pipeline = cv_model.bestModel
_best_pipeline.write().overwrite().save("best_bot_detector_model")

In [None]:
# PipelineModel is not imported anywhere earlier in the notebook, so import
# it here; without it this cell fails with a NameError.
from pyspark.ml import PipelineModel

# Reload the saved pipeline to check that the persisted model is readable.
model = PipelineModel.load("best_bot_detector_model")

In [None]:
!zip -r best_bot_detector_model.zip /content/best_bot_detector_model


In [45]:
# Colab-only helper: this import is only available inside Google Colab.
from google.colab import files
# Hand the zipped model archive to the Colab download helper.
files.download("best_bot_detector_model.zip")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>