In [None]:
# Install necessary packages
!pip install torch torchvision torchaudio
!pip install torch-geometric
!pip install sentence-transformers
!pip install optuna

In [47]:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler, StandardScaler, LabelEncoder
import torch
from torch_geometric.data import Data, DataLoader
import networkx as nx
from torch_geometric.utils import to_networkx
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, GATConv, SAGEConv
import numpy as np
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sentence_transformers import SentenceTransformer
import optuna
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

In [3]:
# Read the raw bot-detection dataset into a DataFrame
csv_path = 'bot_detection_data.csv'
df = pd.read_csv(csv_path)

In [4]:
# Encode categorical features.
# Each column gets its own encoder so the classes fitted for one column are not
# overwritten by the next one and both integer codes can be mapped back.
username_encoder = LabelEncoder()
location_encoder = LabelEncoder()
df['Username'] = username_encoder.fit_transform(df['Username'])
df['Location'] = location_encoder.fit_transform(df['Location'])
# Backward compatibility: `label_encoder` used to end up fitted on 'Location'.
label_encoder = location_encoder

# Split the whitespace-separated hashtag string into a list of tags (missing -> []).
# Values that are already lists are copied unchanged, so re-running this cell
# does not fail on the second pass.
df['Hashtags'] = df['Hashtags'].fillna('').apply(
    lambda tags: tags.split() if isinstance(tags, str) else list(tags)
)

In [5]:
# Cast the count columns to float and the flag/label columns to int
column_dtypes = {
    'Retweet Count': float,
    'Mention Count': float,
    'Follower Count': float,
    'Verified': int,
    'Bot Label': int,
}
for column, dtype in column_dtypes.items():
    df[column] = df[column].astype(dtype)

In [6]:
# Pick the candidate input columns and the target column
feature_columns = ['Username', 'Retweet Count', 'Mention Count', 'Follower Count', 'Verified']
features = df[feature_columns]
labels = df['Bot Label']

In [7]:
# Pull the raw NumPy arrays out of the DataFrame
numeric_columns = ['Retweet Count', 'Mention Count', 'Follower Count']
numeric_features = df[numeric_columns].to_numpy()
non_numeric_features = df[['Verified']].to_numpy()
labels = df['Bot Label'].to_numpy()

In [8]:
# Min-max scale the numeric columns (fit on the full array, then transform it)
scaler = MinMaxScaler()
scaler.fit(numeric_features)
scaled_numeric_features = scaler.transform(numeric_features)

In [None]:
# Use SentenceTransformer for tweet embeddings.
# NOTE: `model` here is the text encoder; the name is rebound to the GNN further
# down in the notebook, so do not re-run this cell after training.
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

# Each row's hashtag list is joined back into one space-separated sentence.
hashtag_sentences = [' '.join(tags) for tags in df['Hashtags']]

# The embeddings are fed to scikit-learn next, which can only read host memory;
# `.cpu()` makes the result usable even when the encoder ran on a GPU.
tweet_embeddings = model.encode(hashtag_sentences, convert_to_tensor=True).cpu()

In [10]:
# Standardize the tweet embeddings and compress them with PCA.
# scikit-learn needs a host-side NumPy array, so a tensor (possibly living on a
# GPU) is detached and copied to the CPU first.
if torch.is_tensor(tweet_embeddings):
    tweet_embeddings_np = tweet_embeddings.detach().cpu().numpy()
else:
    tweet_embeddings_np = np.asarray(tweet_embeddings)

scaler_tweet = StandardScaler()
tweet_embeddings_scaled = scaler_tweet.fit_transform(tweet_embeddings_np)

# Reduce to 100 dimensions. PCA cannot return more components than
# min(n_samples, n_features), so the target is capped for small inputs.
n_pca_components = min(100, *tweet_embeddings_scaled.shape)
# A fixed random_state keeps the projection reproducible between runs.
pca = PCA(n_components=n_pca_components, random_state=42)
tweet_embeddings_reduced = pca.fit_transform(tweet_embeddings_scaled)

In [11]:
# Assemble the node-feature matrix column-wise:
# verified flag, scaled numeric counts, reduced tweet embeddings
feature_blocks = [non_numeric_features, scaled_numeric_features, tweet_embeddings_reduced]
features = np.concatenate(feature_blocks, axis=1)

In [12]:
# Create a graph: every account is linked to the k accounts whose follower count
# is closest to its own (absolute difference).
k = 5  # Number of neighbors

# Nodes are identified by row position (0..n-1) so they line up with the rows of
# the feature matrix regardless of what the DataFrame index looks like.
follower_counts = df['Follower Count'].to_numpy(dtype=float)
num_accounts = len(follower_counts)

G = nx.Graph()
G.add_nodes_from(range(num_accounts))

# Sort once instead of running a full argsort per row. In one dimension the k
# nearest values of an element always lie among the k elements on either side of
# it in sorted order, so only that small window has to be ranked.
sorted_nodes = np.argsort(follower_counts, kind='stable')
for rank, node in enumerate(sorted_nodes):
    window = sorted_nodes[max(0, rank - k):rank + k + 1]
    # Drop the node itself explicitly; with tied follower counts it is not
    # guaranteed to be ranked first, which could otherwise create a self-loop.
    candidates = window[window != node]
    gaps = np.abs(follower_counts[candidates] - follower_counts[node])
    nearest = candidates[np.argsort(gaps, kind='stable')[:k]]
    G.add_edges_from((int(node), int(neighbor)) for neighbor in nearest)

In [13]:
# Convert to PyTorch Geometric data format.
# networkx lists every undirected edge once, as a single (u, v) pair. The graph
# layers read edge_index as directed source -> target pairs, so the reversed
# pairs are appended to let messages flow both ways along each edge.
edge_index = torch.tensor(list(G.edges), dtype=torch.long).t().contiguous()
edge_index = torch.cat([edge_index, edge_index.flip(0)], dim=1)
# Remove duplicate columns (a self-loop would otherwise appear twice).
edge_index = torch.unique(edge_index, dim=1).contiguous()

x = torch.tensor(features, dtype=torch.float)
y = torch.tensor(labels, dtype=torch.long)

data = Data(x=x, edge_index=edge_index, y=y)

In [14]:
# Save the data
torch.save(data, 'bot_detection_data.pyg')
# The file holds a pickled graph object rather than a plain tensor/state dict, so
# it has to be loaded with weights_only=False (recent PyTorch releases default to
# weights_only=True and refuse such files). This is acceptable only because the
# file was written by this notebook one line above; never do this for untrusted files.
data = torch.load('bot_detection_data.pyg', weights_only=False)

In [15]:
# Split the data into training, validation, and testing sets
def split_data(data, train_ratio=0.8, val_ratio=0.1, seed=None):
    """Randomly partition the nodes of ``data`` into train/validation/test masks.

    Args:
        data: Object exposing ``num_nodes`` (the number of nodes to split).
        train_ratio: Fraction of nodes assigned to the training mask.
        val_ratio: Fraction of nodes assigned to the validation mask. The
            remaining nodes form the test mask.
        seed: Optional integer. When given, the permutation is drawn from a
            private generator seeded with it, so the split is reproducible and
            the global RNG state is left untouched. When ``None`` the global
            RNG is used.

    Returns:
        Tuple ``(train_mask, val_mask, test_mask)`` of boolean tensors of length
        ``num_nodes``; every node is set in exactly one of them.

    Raises:
        ValueError: If a ratio is negative or the two ratios sum to more than 1.
    """
    # Small tolerance so that sums such as 0.7 + 0.3 are not rejected by float noise.
    if train_ratio < 0 or val_ratio < 0 or train_ratio + val_ratio > 1 + 1e-9:
        raise ValueError(
            "train_ratio and val_ratio must be non-negative and sum to at most 1"
        )

    num_nodes = data.num_nodes
    if seed is None:
        indices = torch.randperm(num_nodes)
    else:
        generator = torch.Generator().manual_seed(seed)
        indices = torch.randperm(num_nodes, generator=generator)

    # Cut points inside the shuffled index list.
    train_split = int(train_ratio * num_nodes)
    val_split = int((train_ratio + val_ratio) * num_nodes)

    train_mask = torch.zeros(num_nodes, dtype=torch.bool)
    val_mask = torch.zeros(num_nodes, dtype=torch.bool)
    test_mask = torch.zeros(num_nodes, dtype=torch.bool)

    train_mask[indices[:train_split]] = True
    val_mask[indices[train_split:val_split]] = True
    test_mask[indices[val_split:]] = True

    return train_mask, val_mask, test_mask

# Attach the three boolean node masks (default 80/10/10 split) to the graph object
data.train_mask, data.val_mask, data.test_mask = split_data(data)

In [None]:
# Create DataLoader objects.
# The dataset is a one-element list holding the full graph, so with batch_size=1
# each loader yields that single graph once per pass.
single_graph_dataset = [data]
train_loader = DataLoader(single_graph_dataset, batch_size=1, shuffle=True)
val_loader = DataLoader(single_graph_dataset, batch_size=1, shuffle=False)
test_loader = DataLoader(single_graph_dataset, batch_size=1, shuffle=False)

In [17]:
# Define Hybrid GNN model with GCN, GAT and GraphSAGE
class HybridGNN(torch.nn.Module):
    """Node classifier that stacks GCN -> GAT -> GraphSAGE -> GAT layers.

    Args:
        in_channels: Width of the input node features.
        hidden_channels: Width used by the intermediate layers.
        out_channels: Number of output classes.
        dropout_rate: Dropout probability, used both between layers and inside
            the two attention layers.
    """

    def __init__(self, in_channels, hidden_channels, out_channels, dropout_rate=0.5):
        super().__init__()
        gat_heads = 8
        self.gcn1 = GCNConv(in_channels, hidden_channels)
        self.gat1 = GATConv(hidden_channels, hidden_channels, heads=gat_heads, dropout=dropout_rate)
        # sage1 consumes hidden_channels * 8 features, i.e. one hidden_channels
        # slice per attention head of gat1.
        self.sage1 = SAGEConv(hidden_channels * gat_heads, hidden_channels)
        self.gat2 = GATConv(hidden_channels, out_channels, heads=1, concat=False, dropout=dropout_rate)
        self.dropout = torch.nn.Dropout(dropout_rate)

    def forward(self, x, edge_index):
        """Return per-node log-probabilities over the output classes."""
        hidden = x
        # The first three layers share the same pattern: conv -> ELU -> dropout.
        for conv in (self.gcn1, self.gat1, self.sage1):
            hidden = self.dropout(F.elu(conv(hidden, edge_index)))
        logits = self.gat2(hidden, edge_index)
        return F.log_softmax(logits, dim=1)

In [18]:
# Define training function
def train(model, data, optimizer, criterion, device):
    """Run one full-graph optimisation step and return the training loss.

    Args:
        model: Module called as ``model(x, edge_index)``.
        data: Graph object providing ``x``, ``edge_index``, ``y`` and ``train_mask``.
        optimizer: Optimizer holding the model's parameters.
        criterion: Loss callable taking ``(predictions, targets)``.
        device: Device the graph is moved to before the forward pass.

    Returns:
        The loss over the training nodes as a Python float.
    """
    model.train()
    graph = data.to(device)

    optimizer.zero_grad()
    predictions = model(graph.x, graph.edge_index)

    # Only the nodes selected by train_mask contribute to the loss.
    train_nodes = graph.train_mask
    batch_loss = criterion(predictions[train_nodes], graph.y[train_nodes])

    batch_loss.backward()
    optimizer.step()
    return batch_loss.item()

In [19]:
# Define evaluation function
def evaluate(model, data, device, mask):
    """Score the model on the nodes selected by ``mask``.

    Args:
        model: Module called as ``model(x, edge_index)``.
        data: Graph object providing ``x``, ``edge_index`` and ``y``.
        device: Device the graph is moved to before the forward pass.
        mask: Node selector applied to both predictions and labels.

    Returns:
        Tuple ``(accuracy, precision, recall, f1)`` computed with the
        scikit-learn metric functions on the selected nodes.
    """
    model.eval()
    graph = data.to(device)
    with torch.no_grad():
        scores = model(graph.x, graph.edge_index)
    predicted_classes = scores.argmax(dim=1)

    y_pred = predicted_classes[mask].cpu().numpy()
    y_true = graph.y[mask].cpu().numpy()

    metric_fns = (accuracy_score, precision_score, recall_score, f1_score)
    accuracy, precision, recall, f1 = (metric(y_true, y_pred) for metric in metric_fns)
    return accuracy, precision, recall, f1


In [20]:
# Define hyperparameters for Optuna
def objective(trial):
    """Optuna objective: train a HybridGNN with sampled settings, return validation F1.

    Args:
        trial: Optuna trial used to sample ``hidden_channels``, ``learning_rate``,
            ``weight_decay`` and ``dropout_rate``.

    Returns:
        F1 score on the validation nodes of the notebook-level ``data`` graph.
    """
    # Search space
    hidden_channels = trial.suggest_int('hidden_channels', 32, 256)
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
    weight_decay = trial.suggest_float('weight_decay', 1e-6, 1e-3, log=True)
    dropout_rate = trial.suggest_float('dropout_rate', 0.1, 0.6)
    num_epochs = 200
    checkpoint_path = 'bot_detection_hybrid_gnn.pyg'

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    net = HybridGNN(
        in_channels=data.x.shape[1],
        hidden_channels=hidden_channels,
        out_channels=2,
        dropout_rate=dropout_rate,
    ).to(device)
    adam = torch.optim.Adam(net.parameters(), lr=learning_rate, weight_decay=weight_decay)
    # Halve the learning rate every 50 epochs
    lr_schedule = torch.optim.lr_scheduler.StepLR(adam, step_size=50, gamma=0.5)
    loss_fn = torch.nn.CrossEntropyLoss()

    # Training loop, logging the loss every 10th epoch
    for epoch in range(num_epochs):
        loss = train(net, data, adam, loss_fn, device)
        lr_schedule.step()
        if epoch % 10 != 0:
            continue
        print(f"Epoch {epoch}, Loss: {loss:.4f}")

    # Write the weights to disk and read them straight back before scoring
    torch.save(net.state_dict(), checkpoint_path)
    net.load_state_dict(torch.load(checkpoint_path))
    net.eval()

    # Only the validation F1 is reported to Optuna
    _, _, _, f1 = evaluate(net, data, device, data.val_mask)
    return f1

In [None]:
# Run the Optuna search, maximizing the value returned by `objective`
n_trials = 50
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=n_trials)

In [39]:
# Export the study's trial table to a CSV file
results_csv = 'study_results.csv'
study_results = study.trials_dataframe()
study_results.to_csv(results_csv, index=False)

In [None]:
# Show how the objective value evolved over the trials
history_fig = optuna.visualization.plot_optimization_history(study)
history_fig.show()

In [41]:
# Show the estimated importance of each hyperparameter
importance_fig = optuna.visualization.plot_param_importances(study)
importance_fig.show()

In [None]:
# Parameters of the best-scoring trial
best_params = study.best_trial.params
print("Best hyperparameters:", best_params)

In [31]:
# Train final model with best hyperparameters
hidden_channels = best_params['hidden_channels']
learning_rate = best_params['learning_rate']
weight_decay = best_params['weight_decay']
dropout_rate = best_params['dropout_rate']
num_epochs = 200

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# dropout_rate must be passed explicitly; otherwise the model falls back to the
# constructor default (0.5) and the tuned value is silently ignored.
model = HybridGNN(
    in_channels=data.x.shape[1],
    hidden_channels=hidden_channels,
    out_channels=2,
    dropout_rate=dropout_rate,
).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
# Halve the learning rate every 50 epochs
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)
criterion = torch.nn.CrossEntropyLoss()


In [None]:
# Training loop: one full-graph step per epoch, loss logged every 10th epoch
log_every = 10
for epoch in range(num_epochs):
    loss = train(model, data, optimizer, criterion, device)
    scheduler.step()  # advance the learning-rate schedule
    if epoch % log_every != 0:
        continue
    print(f"Epoch {epoch}, Loss: {loss:.4f}")

In [33]:
# Persist the final model's weights
best_model_path = 'bot_detection_hybrid_gnn_best.pyg'
torch.save(model.state_dict(), best_model_path)

In [None]:
# Restore the saved weights and switch the model to evaluation mode
saved_state = torch.load('bot_detection_hybrid_gnn_best.pyg')
model.load_state_dict(saved_state)
model.eval()

In [35]:
# Score the final model on the held-out test nodes
test_metrics = evaluate(model, data, device, data.test_mask)
accuracy, precision, recall, f1 = test_metrics


In [None]:
# Express the test metrics as percentages
accuracy_pct, precision_pct, recall_pct, f1_pct = (
    metric * 100 for metric in (accuracy, precision, recall, f1)
)

# Report each metric with two decimals
report_lines = (
    f"Test Accuracy: {accuracy_pct:.2f}%",
    f"Test Precision: {precision_pct:.2f}%",
    f"Test Recall: {recall_pct:.2f}%",
    f"Test F1 Score: {f1_pct:.2f}%",
)
for line in report_lines:
    print(line)

In [None]:
# Metric names and their percentage values, in plotting order
metrics = ['Test Accuracy', 'Test Precision', 'Test Recall', 'Test F1 Score']
values = [accuracy_pct, precision_pct, recall_pct, f1_pct]

# Bar chart of the four test metrics
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(metrics, values, color=['blue', 'orange', 'green', 'red'])
ax.set_title('Model Performance Metrics')
ax.set_xlabel('Metrics')
ax.set_ylabel('Values (%)')
ax.set_ylim(0, 100)
ax.grid(axis='y')

# Write each value just above its bar
for position, value in enumerate(values):
    ax.text(position, value + 1, f'{value:.2f}%', ha='center')

fig.savefig('performance_metrics.png')
plt.show()

In [48]:
def get_predictions(model, data, device):
    """Return predicted and true class labels for every node as NumPy arrays.

    Args:
        model: Module called as ``model(x, edge_index)``.
        data: Graph object providing ``x``, ``edge_index`` and ``y``.
        device: Device the inputs are moved to for the forward pass.

    Returns:
        Tuple ``(pred, true)``: the arg-max class per node and the labels ``data.y``.
    """
    model.eval()
    features_on_device = data.x.to(device)
    edges_on_device = data.edge_index.to(device)
    with torch.no_grad():
        scores = model(features_on_device, edges_on_device)
        # Index of the highest score per row is the predicted class.
        pred = scores.max(dim=1).indices
    return pred.cpu().numpy(), data.y.cpu().numpy()

In [None]:
# Get predictions and true labels
pred, true = get_predictions(model, data, device)

# `pred` and `true` are NumPy arrays; index them with a host-side boolean array,
# because NumPy cannot read a mask tensor that may have been moved to the GPU.
test_mask = data.test_mask.cpu().numpy()

# Compute the confusion matrix; labels=[0, 1] pins the row/column order
cm = confusion_matrix(true[test_mask], pred[test_mask], labels=[0, 1])

# Tick labels follow the [0, 1] order above.
# NOTE(review): assumes 'Bot Label' == 1 marks a bot and 0 a non-bot — confirm
# against the dataset description.
class_names = ['Non Bot', 'Bot']

# Plot the confusion matrix
plt.figure(figsize=(10, 7))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')

# Save before showing: once the figure has been shown, savefig would write an
# empty image.
plt.savefig('confusion_matrix.png')
plt.show()