Imports

In [None]:
from transformers import pipeline
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import re
from urllib.parse import urlparse
from sklearn.model_selection import train_test_split

Load the Data

This notebook uses the Cresci-2017 dataset, available at https://botometer.osome.iu.edu/bot-repository/datasets/cresci-2017/cresci-2017.csv.zip and at http://mib.projects.iit.cnr.it/dataset.html

In [None]:
# Read the genuine-account tweets and discard exact duplicate rows in a single
# chained expression.
data = (
    pd.read_csv('genuine_accounts.csv/tweets.csv', encoding='latin1', low_memory=False)
    .drop_duplicates()
)

In [None]:
def replace_urls_with_hostnames(text):
    """Replace every http(s) URL found in ``text`` with just its hostname.

    The function is best-effort and never raises:

    * a URL whose hostname cannot be extracted (parse error or empty
      hostname) is left in the text unchanged;
    * if the substitution itself fails (for example ``text`` is not a
      string), ``text`` is returned untouched.

    Parameters
    ----------
    text : str
        Raw text that may contain URLs.

    Returns
    -------
    The text with each URL collapsed to its hostname, or the original
    ``text`` object when substitution is not possible.
    """
    def _hostname_or_url(found):
        # Fall back to the full matched URL whenever no hostname is available.
        link = found.group(0)
        try:
            return urlparse(link).hostname or link
        except Exception:
            return link

    try:
        return re.sub(r"http[s]?://[^\s]+", _hostname_or_url, text)
    except Exception:
        return text
# Fill missing tweets BEFORE casting to str: astype(str) turns NaN into the
# literal string 'nan', after which fillna('') has nothing left to fill and
# the token "nan" would leak into the text features.
data['text'] = data['text'].fillna('').astype(str)
# Collapse every URL to its hostname so links are compared by site, not by path.
data['text'] = data['text'].apply(replace_urls_with_hostnames)

Adds 'bot' label to dataset 'data'

In [None]:
# Draw a seeded sample of 400,000 genuine tweets, then tag every sampled row
# with bot = 0 ("not a bot"). The label is constant, so labelling after the
# draw gives the same frame as labelling before it.
data = data.sample(n=400000, random_state=42).assign(bot=0)

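Load the bot tweet sets ('traditional_spambots_1', 'fake_followers', and 'social_spambots_1' through 'social_spambots_3'), label them with bot = 1, and append a 100,000-row sample of them to dataset 'data'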

In [None]:
# Bot tweet dumps, listed in the order in which they are stacked.
BOT_TWEET_FILES = [
    'traditional_spambots_1.csv/tweets.csv',
    'fake_followers.csv/tweets.csv',
    'social_spambots_1.csv/tweets.csv',
    'social_spambots_2.csv/tweets.csv',
    'social_spambots_3.csv/tweets.csv',
]


def _load_tweets(path):
    """Read one tweets CSV, drop duplicate rows and clean its 'text' column.

    Missing text is replaced with '' before the cast to str; casting first
    would turn NaN into the literal string 'nan' and make fillna a no-op.
    URLs in the text are then collapsed to their hostnames.
    """
    tweets = pd.read_csv(path, encoding='latin1', low_memory=False).drop_duplicates()
    tweets['text'] = (
        tweets['text']
        .fillna('')
        .astype(str)
        .apply(replace_urls_with_hostnames)
    )
    return tweets


# Load every bot file through the same helper and stack them with one concat
# instead of repeating the load/clean/concat stanza once per file.
bot_data = pd.concat(
    [_load_tweets(path) for path in BOT_TWEET_FILES],
    ignore_index=True,
)

# Add the 'bot' column to the bot dataset with value 1
bot_data['bot'] = 1
# Keep a seeded sample of 100,000 bot tweets.
bot_data = bot_data.sample(n=100000, random_state=42)
# Concatenate the genuine and bot datasets; ignore_index gives the combined
# frame a fresh 0..n-1 index.
combined_data = pd.concat([data, bot_data], ignore_index=True)

# Verify the concatenation
print(combined_data['bot'].value_counts())  # Check the distribution of bots
# print(combined_data.head())                 # Preview the combined dataset

Check functionality of PyTorch

In [None]:
# Displayed as the cell output: whether PyTorch reports a usable CUDA device.
torch.cuda.is_available()

In [None]:
# Displayed as the cell output: the installed PyTorch version string.
torch.__version__

Optional Sentiment Analysis
Before enabling this section, make sure torch.cuda.is_available() returns True

In [None]:
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# sentiment_analysis = pipeline(model="finiteautomata/bertweet-base-sentiment-analysis", device=device)
# temp = sentiment_analysis(combined_data['text'].to_numpy().tolist())

In [None]:
# sentiment = [i['label'] for i in temp]
# combined_data['Sentiment'] = sentiment

Split data into X and y and train test

In [None]:
from sklearn.model_selection import train_test_split

# The label is the 'bot' flag; the features are every other column.
y = combined_data['bot']
X = combined_data.drop(columns=['bot'])

# Seeded 60/40 split; the 40% hold-out is divided into validation and test
# sets further down the notebook.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    random_state=42,
    test_size=0.4,
)

# Report how many rows landed in each split.
print(f"Training set size: {len(X_train)} samples")
print(f"Test set size: {len(X_test)} samples")

TF-IDF Vectorization

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

# Guarantee the 'text' column has no missing values in either split.
X_train = X_train.assign(text=X_train['text'].fillna(""))
X_test = X_test.assign(text=X_test['text'].fillna(""))

# Cap the vocabulary at max_features terms (adjust as needed).
tfidf_vectorizer = TfidfVectorizer(max_features=1000)

# Learn the vocabulary and IDF weights from the training text only, then
# reuse that fitted vectorizer for the held-out text.
train_text = X_train['text']
X_train_tfidf = tfidf_vectorizer.fit_transform(train_text)
X_test_tfidf = tfidf_vectorizer.transform(X_test['text'])

Preprocessing

In [None]:
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler
from scipy.sparse import hstack

# Column groups; columns not named here are dropped by the transformer.
# NOTE(review): user_id and the crawl/update timestamps may leak the label,
# since each class is read from separate source files - confirm before
# trusting the scores.
categorical_columns = [
    'source', 'in_reply_to_screen_name', 'place', 'created_at',
    'timestamp', 'crawled_at', 'updated',
]
numeric_columns = [
    'user_id', 'truncated', 'in_reply_to_status_id', 'in_reply_to_user_id',
    'retweeted_status_id', 'geo', 'contributors', 'retweet_count',
    'reply_count', 'favorite_count', 'favorited', 'retweeted',
    'possibly_sensitive', 'num_hashtags', 'num_urls', 'num_mentions',
]
text_column = 'text'

# Missing values: column mean for numerics, most frequent value for categoricals.
numeric_imputer = SimpleImputer(strategy='mean')
categorical_imputer = SimpleImputer(strategy='most_frequent')

# Categoricals: impute, then one-hot encode (categories unseen at fit time
# are ignored at transform time).
categorical_pipeline = Pipeline([
    ('impute', categorical_imputer),
    ('encode', OneHotEncoder(handle_unknown='ignore')),
])
# Numerics: impute, then standardise.
numeric_pipeline = Pipeline([
    ('impute', numeric_imputer),
    ('scale', StandardScaler()),
])

preprocessor = ColumnTransformer(
    transformers=[
        ('cat', categorical_pipeline, categorical_columns),
        ('num', numeric_pipeline, numeric_columns),
    ],
    remainder='drop',
)

# Fit the non-text preprocessing on the training split only.
X_train_non_text = preprocessor.fit_transform(X_train)
X_test_non_text = preprocessor.transform(X_test)

# Re-fit the TF-IDF vectorizer on the NaN-free training text and apply it to
# the held-out text.
train_text = X_train[text_column].fillna('')
test_text = X_test[text_column].fillna('')
X_train_tfidf = tfidf_vectorizer.fit_transform(train_text)
X_test_tfidf = tfidf_vectorizer.transform(test_text)

# Place the non-text features and the TF-IDF features side by side.
X_train_combined = hstack([X_train_non_text, X_train_tfidf])
X_test_combined = hstack([X_test_non_text, X_test_tfidf])

# Sanity check: count NaNs among the stored (non-zero) entries of each matrix.
train_nan_count = np.isnan(X_train_combined.data).sum()
test_nan_count = np.isnan(X_test_combined.data).sum()
print("Number of NaNs in X_train_combined after preprocessing:", train_nan_count)
print("Number of NaNs in X_test_combined after preprocessing:", test_nan_count)


In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.metrics import classification_report, confusion_matrix
from tqdm import tqdm
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression, RidgeClassifier

Further separate the validation set from the test set

In [None]:
# Split the hold-out features/labels 50/50 (seeded) into a validation half and
# a final test half.
# NOTE: this rebinds X_test_combined and y_test to the smaller test half, so
# running this cell again without re-running the cells above halves the test
# set a second time.
X_val_combined, X_test_combined, y_val, y_test = train_test_split(X_test_combined, y_test, test_size=0.5, random_state=42)

Train Model
To plot ROC-AUC and precision-recall curves for a model, run the Metrics cell at the bottom of the notebook after setting its variables to that model's labels and scores

Ridge Classifier

In [None]:
# Ridge classifier trained on variance-scaled features. with_mean=False skips
# centering, which StandardScaler requires for sparse input.
ridge = RidgeClassifier(class_weight='balanced', random_state=42, max_iter=10000)
scaler = StandardScaler(with_mean=False)
X_train_ridge = scaler.fit_transform(X_train_combined)
ridge.fit(X_train_ridge, y_train)

# Apply the scaling learned on the training split to the validation split.
X_val_ridge = scaler.transform(X_val_combined)
y_pred = ridge.predict(X_val_ridge)

print("Classification Report:")
print(classification_report(y_val, y_pred))

print("\nConfusion Matrix:")
print(confusion_matrix(y_val, y_pred))

# Continuous scores for the ROC / precision-recall plots. They must be
# computed on the SCALED validation features: the model was trained on scaled
# data, so scoring the raw matrix gives meaningless values.
yscores = ridge.decision_function(X_val_ridge)



In [None]:
# X_val_combined is a SciPy sparse matrix: it has no .iloc and no 'text'
# column. y_val still carries the row labels of combined_data, so map the
# misclassified validation positions back to the original tweets through them.
incorrect_ridge = np.where(y_pred != y_val.to_numpy())
incorrect_preds_ridge = combined_data.loc[y_val.index[incorrect_ridge[0]]]
# print(incorrect_preds_ridge['text'])

SVC Sigmoid

In [None]:
# Sigmoid-kernel SVM on variance-scaled (uncentered) features.
scaler = StandardScaler(with_mean=False)
X_train_svc = scaler.fit_transform(X_train_combined)

svc = SVC(
    kernel='sigmoid',
    class_weight='balanced',
    random_state=42,
    max_iter=10000,
    cache_size=4000,
    verbose=True,
)
svc.fit(X_train_svc, y_train)

# Score the validation split with the scaling learned on the training split.
X_val_svc = scaler.transform(X_val_combined)
y_pred = svc.predict(X_val_svc)

# Evaluate on the validation split.
print("Classification Report:", classification_report(y_val, y_pred), sep="\n")
print("\nConfusion Matrix:", confusion_matrix(y_val, y_pred), sep="\n")

In [None]:
# X_val_combined is a SciPy sparse matrix (no .iloc, no 'text' column), so
# recover the misclassified tweets from combined_data via y_val's row labels.
incorrect_svc = np.where(y_pred != y_val.to_numpy())
incorrect_preds_svc = combined_data.loc[y_val.index[incorrect_svc[0]]]
# print(incorrect_preds_svc['text'])

SVC Polynomial

In [None]:
# Polynomial-kernel SVM on variance-scaled (uncentered) features.
scaler = StandardScaler(with_mean=False)
X_train_svc = scaler.fit_transform(X_train_combined)

svc = SVC(
    kernel='poly',
    class_weight='balanced',
    random_state=42,
    max_iter=10000,
    cache_size=4000,
    verbose=True,
)
svc.fit(X_train_svc, y_train)

# Score the validation split with the scaling learned on the training split.
X_val_svc = scaler.transform(X_val_combined)
y_pred = svc.predict(X_val_svc)

# Evaluate on the validation split.
print("Classification Report:", classification_report(y_val, y_pred), sep="\n")
print("\nConfusion Matrix:", confusion_matrix(y_val, y_pred), sep="\n")

In [None]:
# X_val_combined is a SciPy sparse matrix (no .iloc, no 'text' column), so
# recover the misclassified tweets from combined_data via y_val's row labels.
incorrect_svc = np.where(y_pred != y_val.to_numpy())
incorrect_preds_svc2 = combined_data.loc[y_val.index[incorrect_svc[0]]]
# print(incorrect_preds_svc2['text'])

SVC RBF

In [None]:
# RBF-kernel SVM on variance-scaled (uncentered) features.
scaler = StandardScaler(with_mean=False)
X_train_svc = scaler.fit_transform(X_train_combined)

svc = SVC(
    kernel='rbf',
    class_weight='balanced',
    random_state=42,
    max_iter=10000,
    cache_size=4000,
    verbose=True,
)
svc.fit(X_train_svc, y_train)

# Score the validation split with the scaling learned on the training split.
X_val_svc = scaler.transform(X_val_combined)
y_pred = svc.predict(X_val_svc)

# Evaluate on the validation split.
print("Classification Report:", classification_report(y_val, y_pred), sep="\n")
print("\nConfusion Matrix:", confusion_matrix(y_val, y_pred), sep="\n")

In [None]:
# X_val_combined is a SciPy sparse matrix (no .iloc, no 'text' column), so
# recover the misclassified tweets from combined_data via y_val's row labels.
incorrect_svc = np.where(y_pred != y_val.to_numpy())
incorrect_preds_svc3 = combined_data.loc[y_val.index[incorrect_svc[0]]]
# print(incorrect_preds_svc3['text'])

Logistic Regression

In [None]:
# Logistic regression on variance-scaled (uncentered) features. The *_svc
# variable names are carried over from the SVM cells.
scaler = StandardScaler(with_mean=False)
X_train_svc = scaler.fit_transform(X_train_combined)

lr = LogisticRegression(
    class_weight='balanced',
    random_state=42,
    max_iter=10000,
    verbose=True,
)
lr.fit(X_train_svc, y_train)

# Score the validation split with the scaling learned on the training split.
X_val_svc = scaler.transform(X_val_combined)
y_pred = lr.predict(X_val_svc)

# Evaluate on the validation split.
print("Classification Report:", classification_report(y_val, y_pred), sep="\n")
print("\nConfusion Matrix:", confusion_matrix(y_val, y_pred), sep="\n")

In [None]:
# X_val_combined is a SciPy sparse matrix (no .iloc, no 'text' column), so
# recover the misclassified tweets from combined_data via y_val's row labels.
incorrect_lr = np.where(y_pred != y_val.to_numpy())
incorrect_preds_lr = combined_data.loc[y_val.index[incorrect_lr[0]]]
# print(incorrect_preds_lr['text'])

Random Forest

In [None]:
# Random forest trained directly on the unscaled combined features.
rf = RandomForestClassifier(
    warm_start=False,
    class_weight='balanced',
    random_state=42,
    n_jobs=-1,
    max_depth=100,
)
rf.fit(X_train_combined, y_train)

y_pred = rf.predict(X_val_combined)

# Evaluate on the validation split.
print("Classification Report:", classification_report(y_val, y_pred), sep="\n")
print("\nConfusion Matrix:", confusion_matrix(y_val, y_pred), sep="\n")

In [None]:
# X_val_combined is a SciPy sparse matrix (no .iloc, no 'text' column), so
# recover the misclassified tweets from combined_data via y_val's row labels.
incorrect = np.where(y_pred != y_val.to_numpy())
incorrect_preds_rf = combined_data.loc[y_val.index[incorrect[0]]]
# print(incorrect_preds_rf['text'])

Ensemble

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import VotingClassifier

# Fresh, unfitted base models for the voting ensemble.
ridge = RidgeClassifier(class_weight='balanced', random_state=42, max_iter=10000)
lr = LogisticRegression(class_weight='balanced', random_state=42, n_jobs=-1, max_iter=10000)
rf = RandomForestClassifier(
    warm_start=False,
    class_weight='balanced',
    random_state=42,
    n_jobs=-1,
    max_depth=100,
)

# Majority ('hard') vote; 'soft' would average predicted probabilities instead.
ensemble = VotingClassifier(
    estimators=[('rf', rf), ('ridge', ridge), ('lr', lr)],
    voting='hard',
)

# Reuse the scaler fitted in an earlier model cell for both splits.
X_train_svc = scaler.transform(X_train_combined)
X_val_svc = scaler.transform(X_val_combined)

ensemble.fit(X_train_svc, y_train)
y_pred_ensemble = ensemble.predict(X_val_svc)

print("Ensemble Classification Report:", classification_report(y_val, y_pred_ensemble), sep="\n")
print("\nConfusion Matrix:", confusion_matrix(y_val, y_pred_ensemble), sep="\n")


In [None]:
# Find the incorrect ensemble predictions.
# X_val_combined is a SciPy sparse matrix (no .iloc, no 'text' column), so
# recover the misclassified tweets from combined_data via y_val's row labels.
incorrect = np.where(y_pred_ensemble != y_val.to_numpy())
incorrect_preds = combined_data.loc[y_val.index[incorrect[0]]]
print(incorrect_preds['text'])

In [None]:
# Soft-voting ensemble of random forest and logistic regression, evaluated on
# the validation split.
lr = LogisticRegression(class_weight='balanced', random_state=42, n_jobs=-1, max_iter=10000)
rf = RandomForestClassifier(warm_start = False, class_weight='balanced', random_state=42, n_jobs=-1, max_depth=100)
# Create the ensemble
ensemble2 = VotingClassifier(estimators=[
    ('rf', rf), ('lr', lr)
], voting='soft')  # Use 'hard' for majority vote or 'soft' for averaged probabilities

# Train the ensemble on features scaled with the scaler fitted in an earlier cell
X_train_svc = scaler.transform(X_train_combined)
ensemble2.fit(X_train_svc, y_train)
# Predict
X_val_svc = scaler.transform(X_val_combined)
y_pred_ensemble2 = ensemble2.predict(X_val_svc)

print("Ensemble Classification Report:")
print(classification_report(y_val, y_pred_ensemble2))

# The matrix print was commented out, leaving a header with nothing under it.
print("\nConfusion Matrix:")
print(confusion_matrix(y_val, y_pred_ensemble2))

In [None]:
# Ensemble 1: hard vote between logistic regression and ridge, scored on the
# held-out test split.
lr = LogisticRegression(class_weight='balanced', random_state=42, n_jobs=-1, max_iter=10000)
ridge = RidgeClassifier(class_weight='balanced', random_state=42, max_iter=10000)
ensemble = VotingClassifier(
    estimators=[('lr', lr), ('ridge', ridge)],
    voting='hard',
)

# Reuse the scaler fitted in an earlier model cell for both splits.
X_train_svc = scaler.transform(X_train_combined)
X_test_svc = scaler.transform(X_test_combined)

ensemble.fit(X_train_svc, y_train)
y_pred_ensemble = ensemble.predict(X_test_svc)

print("Ensemble 1 Classification Report:", classification_report(y_test, y_pred_ensemble), sep="\n")
print("\nConfusion Matrix:", confusion_matrix(y_test, y_pred_ensemble), sep="\n")

In [None]:
# Ensemble 2: hard vote between random forest, RBF SVM and logistic
# regression, scored on the held-out test split.
lr = LogisticRegression(class_weight='balanced', random_state=42, n_jobs=-1, max_iter=10000)
rf = RandomForestClassifier(warm_start = False, class_weight='balanced', random_state=42, n_jobs=-1, max_depth=100)
svc = SVC(kernel='rbf', random_state=42, class_weight='balanced', verbose=True, cache_size=4000, max_iter=5000)
ensemble2 = VotingClassifier(estimators=[
    ('rf', rf), ('svc', svc), ('lr', lr)
], voting='hard')

# Train the ensemble on features scaled with the scaler fitted in an earlier cell
X_train_svc = scaler.transform(X_train_combined)
ensemble2.fit(X_train_svc, y_train)
# Predict with ensemble2 itself. Using `ensemble` here would silently report
# the scores of the previously trained ensemble instead of this one.
X_test_svc = scaler.transform(X_test_combined)
y_pred_ensemble2 = ensemble2.predict(X_test_svc)

print("Ensemble 2 Classification Report:")
print(classification_report(y_test, y_pred_ensemble2))

print("\nConfusion Matrix:")
print(confusion_matrix(y_test, y_pred_ensemble2))

In [None]:
# Ensemble 3: hard vote between random forest, ridge and logistic regression,
# scored on the held-out test split.
lr = LogisticRegression(class_weight='balanced', random_state=42, n_jobs=-1, max_iter=10000)
rf = RandomForestClassifier(warm_start = False, class_weight='balanced', random_state=42, n_jobs=-1, max_depth=100)
ridge = RidgeClassifier(class_weight='balanced', random_state=42, max_iter=10000)
ensemble3 = VotingClassifier(estimators=[
    ('rf', rf), ('ridge', ridge), ('lr', lr)
], voting='hard')

# Train the ensemble on features scaled with the scaler fitted in an earlier cell
X_train_svc = scaler.transform(X_train_combined)
ensemble3.fit(X_train_svc, y_train)
# Predict with ensemble3 itself. Using `ensemble` here would silently report
# the scores of a previously trained ensemble instead of this one.
X_test_svc = scaler.transform(X_test_combined)
y_pred_ensemble3 = ensemble3.predict(X_test_svc)

print("Ensemble 3 Classification Report:")
print(classification_report(y_test, y_pred_ensemble3))

print("\nConfusion Matrix:")
print(confusion_matrix(y_test, y_pred_ensemble3))

In [None]:
# Ensemble 4: hard vote between random forest and logistic regression, scored
# on the held-out test split.
lr = LogisticRegression(class_weight='balanced', random_state=42, n_jobs=-1, max_iter=10000)
rf = RandomForestClassifier(
    warm_start=False,
    class_weight='balanced',
    random_state=42,
    n_jobs=-1,
    max_depth=100,
)
ensemble4 = VotingClassifier(
    estimators=[('rf', rf), ('lr', lr)],
    voting='hard',
)

# Reuse the scaler fitted in an earlier model cell for both splits.
X_train_svc = scaler.transform(X_train_combined)
X_test_svc = scaler.transform(X_test_combined)

ensemble4.fit(X_train_svc, y_train)
y_pred_ensemble4 = ensemble4.predict(X_test_svc)

print("Ensemble 4 Classification Report:", classification_report(y_test, y_pred_ensemble4), sep="\n")
print("\nConfusion Matrix:", confusion_matrix(y_test, y_pred_ensemble4), sep="\n")

Metrics
Change variables according to the model being scored (swap y_val for y_test if scoring the test set, and yscores for y_pred, y_pred_proba, or y_pred_ensemble as needed)

In [None]:
from sklearn.metrics import auc, roc_curve, roc_auc_score, precision_recall_curve

# ROC curve and its AUC for the scores being evaluated.
fpr, tpr, _ = roc_curve(y_val, yscores)
roc_auc = roc_auc_score(y_val, yscores)

roc_fig, roc_ax = plt.subplots()
roc_ax.plot(fpr, tpr, label=f'ROC Curve (AUC = {roc_auc:.2f})')
# Diagonal reference line, labelled as the random-guess baseline.
roc_ax.plot([0, 1], [0, 1], 'k--', label='Random Guess')
roc_ax.set_xlabel('False Positive Rate')
roc_ax.set_ylabel('True Positive Rate')
roc_ax.set_title('Receiver Operating Characteristic (ROC) Curve')
roc_ax.legend(loc='lower right')
roc_ax.grid()
plt.show()

# Precision-recall curve and the area under it.
precision, recall, _ = precision_recall_curve(y_val, yscores)
pr_auc = auc(recall, precision)

pr_fig, pr_ax = plt.subplots()
pr_ax.plot(recall, precision, label=f'Precision-Recall Curve (AUC = {pr_auc:.2f})')
pr_ax.set_xlabel('Recall')
pr_ax.set_ylabel('Precision')
pr_ax.set_title('Precision-Recall Curve')
pr_ax.legend(loc='lower left')
pr_ax.grid()
plt.show()

Alternative Score Display Methods

In [None]:
#roc and auc curves
# from sklearn.metrics import roc_curve, roc_auc_score
# from sklearn.metrics import RocCurveDisplay
#
# # Plot ROC curve
# fig, ax = plt.subplots(figsize=(10, 8))
# fpr, tpr, _ = roc_curve(y_test, y_pred_ensemble)
# roc_auc = roc_auc_score(y_test, y_pred_ensemble)
# roc_display = RocCurveDisplay(fpr=fpr, tpr=tpr, roc_auc=roc_auc, estimator_name='Ensemble')
# roc_display.plot(ax=ax)

In [None]:
# #precision recall curve
# from sklearn.metrics import precision_recall_curve
# from sklearn.metrics import PrecisionRecallDisplay
#
# # Plot precision-recall curve
# fig, ax = plt.subplots(figsize=(10, 8))
# precision, recall, _ = precision_recall_curve(y_test, y_pred_ensemble)
# pr_display = PrecisionRecallDisplay(precision=precision, recall=recall, estimator_name='Ensemble')
# pr_display.plot(ax=ax)