In [1]:
import pandas as pd
import numpy as np
import random
import time
from datetime import datetime, timedelta
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score, classification_report

In [None]:
# Load the simulated historical data
file_path = "transaction_dataset.csv"
df = pd.read_csv(file_path)

# Drop irrelevant columns (adjust column names if necessary)
df = df.drop(columns=['Unnamed: 0', 'Index', 'Address'])

In [None]:
# Define target and features
y = df['FLAG']  # Assuming 'FLAG' is the target indicating bot activity (1 = Bot, 0 = Legitimate)
X = df.drop(columns=['FLAG'])

##  Feature Engineering for Bot Detection

In [None]:


def feature_engineering_for_bot_detection(df):
    """
    Feature engineering to capture bot behavior characteristics:
    1. Transaction Timing Consistency
    2. Unusual Transaction Amounts
    3. Pattern Consistency in Transactions
    """
    # Feature 1: Transaction Timing Consistency
    # (Assumes the CSV has a column 'Time Diff between first and last (Mins)')
    if 'Time Diff between first and last (Mins)' in df.columns:
        df['Transaction Time Diff'] = df['Time Diff between first and last (Mins)']
    else:
        df['Transaction Time Diff'] = 0

    # Feature 2: Unusual Transaction Amounts
    # Bots typically send similar amounts.
    if set(['avg val sent', 'avg val received']).issubset(df.columns):
        df['Transaction Amount Variance'] = df[['avg val sent', 'avg val received']].std(axis=1)
    else:
        df['Transaction Amount Variance'] = 0

    # Feature 3: Pattern Consistency
    if 'Unique Sent To Addresses' in df.columns:
        df['Unique Sent Addresses'] = df['Unique Sent To Addresses']
    else:
        df['Unique Sent Addresses'] = 0

    # Bot Activity Indicator (for demo purposes)
    df['Bot Activity Indicator'] = (
        df['Transaction Time Diff'] * df['Transaction Amount Variance'] * df['Unique Sent Addresses']
    )
    
    return df

In [None]:

# Apply feature engineering for bot detection on historical data
df = feature_engineering_for_bot_detection(df)

## Build and Train the Model Pipeline

In [None]:
X = df.drop(columns=['FLAG'])


In [None]:
# Identify numerical and categorical columns from the training set
numerical_cols = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
categorical_cols = X.select_dtypes(include=['object']).columns.tolist()

In [None]:

# Build and store the list of training columns (order matters)
training_columns = X.columns.tolist()

In [None]:

# Preprocessing pipelines for numerical and categorical features
numerical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))
])
preprocessor = ColumnTransformer(transformers=[
    ('num', numerical_transformer, numerical_cols),
    ('cat', categorical_transformer, categorical_cols)
])

In [None]:
# Build the complete pipeline
pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(random_state=42))
])

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
pipeline.fit(X_train, y_train)

In [None]:
# Evaluate the model
y_pred = pipeline.predict(X_test)
test_accuracy = accuracy_score(y_test, y_pred)
print("\nTest Accuracy: {:.4f}".format(test_accuracy))
print("\nClassification Report:\n", classification_report(y_test, y_pred))

## Real-Time Bot Detection Simulation

In [None]:
def align_features_for_real_time(df_sim):
    """
    Ensure that the simulation DataFrame has exactly the same columns (and order)
    as in the training data. For any missing column, we add a default value:
      - For numerical features: 0
      - For categorical features: 'Unknown'
    """
    # Loop over all training columns
    for col in training_columns:
        if col not in df_sim.columns:
            if col in numerical_cols:
                df_sim[col] = 0
            else:
                df_sim[col] = 'Unknown'
    # Reorder columns to match training data
    df_sim = df_sim[training_columns]
    return df_sim


In [None]:
def simulate_real_time_transaction():
    """
    Simulate a single real-time transaction with raw data.
    Some raw columns are used in our historical feature engineering.
    """
    transaction_data = {
        'Sent tnx': random.randint(1, 10),
        'Received Tnx': random.randint(1, 10),
        'avg val sent': random.uniform(0.01, 5.0),
        'avg val received': random.uniform(0.01, 5.0),
        'Unique Sent To Addresses': random.randint(1, 20),
        'Unique Received From Addresses': random.randint(1, 20),
        'Time Diff between first and last (Mins)': random.uniform(1, 120),
        'Timestamp': datetime.now()
    }
    return transaction_data

In [None]:
def feature_engineering_for_real_time_bot_detection(transaction_data, previous_data=None):
    """
    Generate features for real-time bot detection based on transaction data.
    This function computes a few derived features and then aligns the result
    with the training feature set.
    """
    # Compute derived features from the raw transaction data
    transaction_frequency = transaction_data['Sent tnx'] + transaction_data['Received Tnx']
    average_sent_amount = transaction_data['avg val sent']
    average_received_amount = transaction_data['avg val received']
    unique_sent_addresses = transaction_data['Unique Sent To Addresses']
    
    # Compute Transaction Time Consistency based on previous transaction timestamps
    if previous_data is not None:
        time_diff = (transaction_data['Timestamp'] - previous_data['Timestamp']).total_seconds() / 60.0
    else:
        time_diff = 0

    # Compute the amount variance for the current transaction (using sent and received amounts)
    transaction_amount_variance = np.std([average_sent_amount, average_received_amount])
    
    # Build a features dictionary.
    # Note: We include only a subset of features; all other features from training will be added as defaults.
    features = {
        'Transaction Frequency': transaction_frequency,
        'Average Sent Amount': average_sent_amount,
        'Average Received Amount': average_received_amount,
        'Unique Sent Addresses': unique_sent_addresses,
        # For this simulation, we use our computed time difference as "Transaction Time Consistency"
        'Transaction Time Consistency': time_diff,
        'Transaction Time Diff': time_diff,  # to mimic the historical feature
        'Transaction Amount Variance': transaction_amount_variance,
        'Bot Activity Indicator': transaction_frequency * transaction_amount_variance * unique_sent_addresses,
        # Optionally, you could add more simulated values here if desired.
    }
    
    features_df = pd.DataFrame([features])
    
    # Align simulated features with training columns
    features_df = align_features_for_real_time(features_df)
    
    return features_df

In [None]:
def real_time_bot_detection(num_transactions=10, delay=2):
    """
    Simulate real-time monitoring of incoming transactions for bot detection.
    """
    previous_data = None  # Used for computing time differences between transactions
    print("\n--- Starting Real-Time Bot Detection Simulation ---")
    
    for i in range(num_transactions):
        transaction_data = simulate_real_time_transaction()
        transaction_df = feature_engineering_for_real_time_bot_detection(transaction_data, previous_data)
        
        # Predict using the trained model
        # (Note: The pipeline expects exactly the same features as in training.)
        bot_prob = pipeline.predict_proba(transaction_df)[0, 1]
        prediction = pipeline.predict(transaction_df)[0]
        
        print(f"\nTransaction {i + 1}:")
        print(f"Predicted Bot Activity Probability: {bot_prob:.4f}")
        if prediction == 1:
            print("⚠️ Bot Detected! Stopping transaction.")
        else:
            print("Transaction appears legitimate. Proceeding with processing.")
        
        previous_data = transaction_data  # Update for next iteration
        time.sleep(delay)
    
    print("\n--- Simulation Completed ---")

In [None]:
# Run the real-time Bot detection monitoring simulation
real_time_bot_detection(num_transactions=10, delay=2)



Test Accuracy: 0.9888

Classification Report:
               precision    recall  f1-score   support

           0       0.99      1.00      0.99      1533
           1       1.00      0.95      0.97       436

    accuracy                           0.99      1969
   macro avg       0.99      0.98      0.98      1969
weighted avg       0.99      0.99      0.99      1969


--- Starting Real-Time Bot Detection Simulation ---

Transaction 1:
Predicted Bot Activity Probability: 0.3900
Transaction appears legitimate. Proceeding with processing.

Transaction 2:
Predicted Bot Activity Probability: 0.3600
Transaction appears legitimate. Proceeding with processing.

Transaction 3:
Predicted Bot Activity Probability: 0.3600
Transaction appears legitimate. Proceeding with processing.

Transaction 4:
Predicted Bot Activity Probability: 0.3900
Transaction appears legitimate. Proceeding with processing.

Transaction 5:
Predicted Bot Activity Probability: 0.3600
Transaction appears legitimate. Proce

In [None]:
import joblib 

#save the feature columns 

joblib.dump(training_columns, 'bot_detection_features.joblib')

['bot_detection_features.joblib']

In [None]:

# Save the trained model

joblib.dump(pipeline, 'bot_detection_model.joblib')

['bot_detection_model.joblib']

: 