In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (classification_report, confusion_matrix,
                             precision_recall_curve, auc, f1_score,
                             precision_score, recall_score, roc_auc_score)
from imblearn.over_sampling import SMOTE
import xgboost as xgb
from collections import defaultdict
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Fix the global NumPy seed once so every stochastic step below is repeatable
RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)

# Three-line banner: rule, title, rule
print("=" * 80, "ETHEREUM ADDRESS POISONING DETECTION SYSTEM", "=" * 80, sep="\n")

ETHEREUM ADDRESS POISONING DETECTION SYSTEM


In [None]:
# ============================================================================
# PHASE A: FEATURE ENGINEERING
# ============================================================================

class EthereumFeatureEngineering:
    """
    Feature engineering pipeline for Ethereum transaction analysis
    Focuses on behavioral patterns indicative of address poisoning attacks

    State kept on the instance:
        to_address_seen   -- sender -> set of recipients already observed;
                             persists across engineer_features() calls
        address_freq_from -- sender -> transaction count (last engineered frame)
        address_freq_to   -- recipient -> transaction count (last engineered frame)
    """

    # Raw columns that engineer_features() reads (names compared after stripping
    # surrounding whitespace from the frame's headers).
    REQUIRED_COLUMNS = ('TimeStamp', 'From', 'To', 'Value',
                        'ContractAddress', 'Input', 'Class')

    def __init__(self):
        self.from_address_history = defaultdict(list)
        self.to_address_seen = defaultdict(set)
        self.address_freq_from = {}
        self.address_freq_to = {}

    @staticmethod
    def _normalize_columns(df):
        """Return a copy of df whose string column names have no surrounding whitespace."""
        return df.rename(
            columns=lambda name: name.strip() if isinstance(name, str) else name
        )

    def load_and_analyze_data(self, filepath):
        """
        Load CSV and perform initial analysis

        Args:
            filepath: Path of the CSV file to read.

        Returns:
            DataFrame with whitespace-stripped column names.

        Raises:
            FileNotFoundError: propagated from pandas when the file is missing.
            KeyError: if the file has no 'Class' column.
        """
        print("\n[1] Loading Dataset...")
        # The CSV header can carry stray spaces (e.g. ' TimeStamp'); strip them
        # so later lookups by the canonical name do not raise KeyError.
        df = self._normalize_columns(pd.read_csv(filepath))

        print(f"Dataset Shape: {df.shape}")
        print(f"\nColumn Names: {df.columns.tolist()}")
        print(f"\nData Types:\n{df.dtypes}")
        print(f"\nMissing Values:\n{df.isnull().sum()}")
        print(f"\nClass Distribution:\n{df['Class'].value_counts()}")
        print(f"\nClass Percentage:\n{df['Class'].value_counts(normalize=True) * 100}")

        return df

    def engineer_features(self, df):
        """
        Transform raw data into behavioral features for fraud detection

        Args:
            df: Raw transaction frame containing REQUIRED_COLUMNS (header
                whitespace is tolerated). The input frame is not modified.

        Returns:
            New DataFrame, sorted by TimeStamp with a fresh RangeIndex, holding
            the original columns plus the engineered ones. Rows whose 'Class'
            is missing are dropped.

        Raises:
            KeyError: if a required column is absent.
        """
        print("\n[2] Engineering Behavioral Features...")

        # rename() returns a new frame, so the caller's frame is left untouched
        df = self._normalize_columns(df)

        missing = [col for col in self.REQUIRED_COLUMNS if col not in df.columns]
        if missing:
            raise KeyError(f"Missing required column(s): {missing}")

        # --- Data Cleaning ---
        print("   ‚Üí Cleaning data...")
        # Unlabeled rows cannot be used for target encoding or stratified splitting
        unlabeled = df['Class'].isna()
        if unlabeled.any():
            print(f"      Dropping {int(unlabeled.sum())} row(s) with missing 'Class' label")
            df = df.loc[~unlabeled].copy()

        # Handle missing values in 'To' address (contract creation transactions)
        df['To'] = df['To'].fillna('CONTRACT_CREATION')

        # Handle missing 'Input' (simple ETH transfers have empty input)
        df['Input'] = df['Input'].fillna('0x')

        # Convert timestamp to datetime
        df['TimeStamp'] = pd.to_datetime(df['TimeStamp'], unit='s')

        # Sort by timestamp for temporal features; a stable sort keeps the
        # relative order of same-second transactions reproducible
        df = df.sort_values('TimeStamp', kind='stable').reset_index(drop=True)

        # --- Feature 1: Value Binning (Dust Detection) ---
        print("   ‚Üí Creating dust transaction flag...")
        df['is_dust'] = (df['Value'] < 0.001).astype(int)

        # Additional value features
        df['value_log'] = np.log1p(df['Value'])  # Log transform for skewed distribution
        df['is_zero_value'] = (df['Value'] == 0).astype(int)

        # --- Feature 2: Address Frequency (whole-frame counts) ---
        print("   ‚Üí Calculating address frequencies...")

        # Calculate transactions per address (frequency encoding)
        from_counts = df['From'].value_counts()
        to_counts = df['To'].value_counts()

        df['from_address_freq'] = df['From'].map(from_counts)
        df['to_address_freq'] = df['To'].map(to_counts)

        # Store for later use
        self.address_freq_from = from_counts.to_dict()
        self.address_freq_to = to_counts.to_dict()

        # --- Feature 3: Time Delta ---
        print("   ‚Üí Computing time deltas between transactions...")

        # Calculate time difference from previous transaction (same sender);
        # a sender's first transaction has no predecessor and gets 0
        df['time_delta_seconds'] = df.groupby('From')['TimeStamp'].diff().dt.total_seconds()
        df['time_delta_seconds'] = df['time_delta_seconds'].fillna(0)

        # Time-based features
        df['hour_of_day'] = df['TimeStamp'].dt.hour
        df['day_of_week'] = df['TimeStamp'].dt.dayofweek
        df['is_weekend'] = (df['day_of_week'] >= 5).astype(int)

        # --- Feature 4: Address Interaction (New vs Recurring) ---
        print("   ‚Üí Identifying new vs recurring address interactions...")

        # Only the first occurrence of a (From, To) pair in this frame can be
        # "new"; each of those is then checked against pairs remembered from
        # earlier calls on this instance.
        first_pair = ~df.duplicated(subset=['From', 'To'], keep='first')
        is_new = np.zeros(len(df), dtype=int)
        candidates = df.loc[first_pair, ['From', 'To']]
        candidate_positions = np.flatnonzero(first_pair.to_numpy())
        for pos, from_addr, to_addr in zip(candidate_positions,
                                           candidates['From'], candidates['To']):
            seen = self.to_address_seen[from_addr]
            if to_addr not in seen:
                is_new[pos] = 1
                seen.add(to_addr)
        df['is_new_recipient'] = is_new

        # --- Feature 5: Transaction Burst Detection ---
        print("   ‚Üí Detecting transaction bursts...")

        # Count transactions from the same address in the trailing one-hour
        # window (t - 1h, t], current transaction included. Rows are already
        # time-sorted, so each sender's timestamps are ascending and the window
        # start can be located with a binary search.
        ts_ns = df['TimeStamp'].to_numpy().astype('datetime64[ns]').astype(np.int64)
        window_ns = 3600 * 10 ** 9
        tx_count = np.zeros(len(df), dtype=np.int64)
        for positions in df.groupby('From', sort=False).indices.values():
            positions = np.sort(positions)
            sender_ts = ts_ns[positions]
            # index of the first transaction strictly newer than (t - 1h)
            window_start = np.searchsorted(sender_ts, sender_ts - window_ns, side='right')
            tx_count[positions] = np.arange(len(positions)) - window_start + 1
        df['tx_count_1h'] = tx_count

        # --- Feature 6: Contract Interaction Features ---
        print("   ‚Üí Analyzing contract interactions...")

        df['has_contract_address'] = (df['ContractAddress'].notna()).astype(int)
        df['has_input_data'] = (df['Input'] != '0x').astype(int)
        df['input_length'] = df['Input'].str.len()

        # --- Feature 7: Target Encoding for High Cardinality ---
        print("   ‚Üí Applying target encoding for addresses...")

        # NOTE(review): these rates are computed from the labels of every row in
        # the frame, including rows that a later train/test split will hold out,
        # so the label leaks into the features. Consider fitting the encoding on
        # the training split only.

        # Target encoding for 'From' addresses
        from_target_mean = df.groupby('From')['Class'].mean()
        df['from_fraud_rate'] = df['From'].map(from_target_mean)

        # Target encoding for 'To' addresses
        to_target_mean = df.groupby('To')['Class'].mean()
        df['to_fraud_rate'] = df['To'].map(to_target_mean)

        # Fill NaN for new addresses
        df['from_fraud_rate'] = df['from_fraud_rate'].fillna(df['Class'].mean())
        df['to_fraud_rate'] = df['to_fraud_rate'].fillna(df['Class'].mean())

        # --- Feature 8: Statistical Features ---
        print("   ‚Üí Creating statistical aggregations...")

        # Average value sent by this address
        from_value_mean = df.groupby('From')['Value'].transform('mean')
        df['from_avg_value'] = from_value_mean

        # Deviation from average
        df['value_deviation'] = abs(df['Value'] - df['from_avg_value'])

        print(f"\n   ‚úì Feature engineering complete! Total features: {df.shape[1]}")

        return df

In [None]:
# ============================================================================
# PHASE B: PREPROCESSING
# ============================================================================

class DataPreprocessor:
    """
    Preprocessing pipeline including scaling and imbalance handling

    Attributes:
        scaler          -- StandardScaler fitted on the training split by
                           split_and_scale()
        feature_columns -- list of model input column names, set by
                           prepare_features()
    """

    def __init__(self):
        self.scaler = StandardScaler()
        self.feature_columns = None

    def prepare_features(self, df):
        """
        Select and prepare features for modeling

        Args:
            df: Engineered frame containing a 'Class' column.

        Returns:
            (X, y): X holds the numeric, non-excluded columns with NaN replaced
            by 0; y is the 'Class' column. Rows with a missing label are
            dropped from both, so X and y share the same index.
        """
        print("\n[3] Preparing Features for Modeling...")

        # Define feature columns (exclude non-numeric and target)
        exclude_cols = ['TxHash', 'BlockHeight', 'TimeStamp', 'From', 'To',
                       'ContractAddress', 'Input', 'Class']

        # A NaN label would form a one-member "class" and break stratification
        labeled_mask = df['Class'].notna()
        if not labeled_mask.all():
            print(f"   Dropping {int((~labeled_mask).sum())} row(s) with missing 'Class' label")
        labeled = df.loc[labeled_mask]

        # Compare stripped names so a header such as ' TimeStamp' is still
        # excluded, and keep only numeric columns because the scaler cannot
        # handle strings or datetimes.
        candidates = [col for col in labeled.columns
                      if str(col).strip() not in exclude_cols]
        self.feature_columns = [col for col in candidates
                                if pd.api.types.is_numeric_dtype(labeled[col])]
        skipped = [col for col in candidates if col not in self.feature_columns]
        if skipped:
            print(f"   Skipping non-numeric column(s): {skipped}")

        print(f"   Selected Features ({len(self.feature_columns)}):")
        for feat in self.feature_columns:
            print(f"      - {feat}")

        X = labeled[self.feature_columns]
        y = labeled['Class']

        # Handle any remaining NaN values
        X = X.fillna(0)

        return X, y

    def split_and_scale(self, X, y):
        """
        Split data and apply scaling

        Performs an 80/20 stratified split, fits the scaler on the training
        part only, and applies it to both parts.

        Returns:
            (X_train_scaled, X_test_scaled, y_train, y_test) where the scaled
            frames keep the row index of their split, so they stay aligned
            with y_train / y_test.
        """
        print("\n[4] Splitting Data (80/20 Stratified Split)...")

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=RANDOM_STATE, stratify=y
        )

        print(f"   Training set: {X_train.shape[0]} samples")
        print(f"   Test set: {X_test.shape[0]} samples")
        print(f"   Train class distribution:\n{y_train.value_counts()}")

        # Scale features
        print("\n[5] Scaling Features...")
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        # Convert back to DataFrame for easier handling. Reuse the split's own
        # labels; fall back to feature_columns when X was passed as an array.
        columns = getattr(X_train, 'columns', self.feature_columns)
        X_train_scaled = pd.DataFrame(X_train_scaled, columns=columns,
                                      index=getattr(X_train, 'index', None))
        X_test_scaled = pd.DataFrame(X_test_scaled, columns=columns,
                                     index=getattr(X_test, 'index', None))

        return X_train_scaled, X_test_scaled, y_train, y_test

    def handle_imbalance(self, X_train, y_train):
        """
        Apply SMOTE to handle class imbalance

        Returns:
            (X_train_balanced, y_train_balanced) as produced by
            SMOTE.fit_resample on the training split.
        """
        print("\n[6] Handling Class Imbalance with SMOTE...")

        smote = SMOTE(random_state=RANDOM_STATE)
        X_train_balanced, y_train_balanced = smote.fit_resample(X_train, y_train)

        print(f"   Original training samples: {len(y_train)}")
        print(f"   After SMOTE: {len(y_train_balanced)}")
        print(f"   Balanced class distribution:\n{pd.Series(y_train_balanced).value_counts()}")

        return X_train_balanced, y_train_balanced

In [None]:
# ============================================================================
# PHASE C: MODEL TRAINING & EVALUATION
# ============================================================================

class FraudDetectionModel:
    """
    XGBoost-based fraud detection model with comprehensive evaluation

    Attributes:
        model              -- fitted xgb.XGBClassifier, or None before train_model()
        feature_importance -- DataFrame of (Feature, Importance) sorted
                              descending, set by plot_feature_importance()
    """

    def __init__(self):
        self.model = None
        self.feature_importance = None

    def _require_model(self):
        """Raise a clear error when a method that needs a fitted model is called too early."""
        if self.model is None:
            raise RuntimeError("Model has not been trained; call train_model() first")

    def train_model(self, X_train, y_train):
        """
        Train XGBoost classifier optimized for precision-recall

        Args:
            X_train: Training features.
            y_train: Training labels.

        Returns:
            The fitted classifier (also stored on self.model).
        """
        print("\n[7] Training XGBoost Model...")

        # XGBoost parameters optimized for fraud detection
        params = {
            'max_depth': 6,
            'learning_rate': 0.1,
            'n_estimators': 200,
            'objective': 'binary:logistic',
            'eval_metric': 'aucpr',  # Precision-Recall AUC
            'scale_pos_weight': 1,  # Using SMOTE, so set to 1
            'subsample': 0.8,
            'colsample_bytree': 0.8,
            'random_state': RANDOM_STATE,
            'tree_method': 'hist',
            'enable_categorical': False
        }

        self.model = xgb.XGBClassifier(**params)

        # Train model
        self.model.fit(
            X_train, y_train,
            verbose=False
        )

        print("   ‚úì Model training complete!")

        return self.model

    def evaluate_model(self, X_test, y_test):
        """
        Comprehensive model evaluation with focus on precision-recall

        Args:
            X_test: Held-out features.
            y_test: Held-out labels (0 = normal, 1 = phishing).

        Returns:
            Dict with keys 'precision', 'recall', 'f1', 'roc_auc', 'pr_auc',
            'y_pred' and 'y_pred_proba'.

        Raises:
            RuntimeError: if the model has not been trained yet.
        """
        self._require_model()
        print("\n[8] Evaluating Model Performance...")

        # Predictions
        y_pred = self.model.predict(X_test)
        y_pred_proba = self.model.predict_proba(X_test)[:, 1]

        # Calculate metrics
        precision = precision_score(y_test, y_pred)
        recall = recall_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred)
        roc_auc = roc_auc_score(y_test, y_pred_proba)

        # Precision-Recall AUC
        precision_curve, recall_curve, _ = precision_recall_curve(y_test, y_pred_proba)
        pr_auc = auc(recall_curve, precision_curve)

        print("\n" + "=" * 60)
        print("MODEL EVALUATION METRICS")
        print("=" * 60)
        print(f"Precision:           {precision:.4f}  (Minimize False Positives)")
        print(f"Recall:              {recall:.4f}  (Catch High-Value Thefts)")
        print(f"F1-Score:            {f1:.4f}  (Harmonic Mean)")
        print(f"ROC-AUC:             {roc_auc:.4f}")
        print(f"Precision-Recall AUC: {pr_auc:.4f}  ‚≠ê PRIMARY METRIC")
        print("=" * 60)

        # Detailed classification report; labels are pinned so the two
        # target names always line up with classes 0 and 1
        print("\nDetailed Classification Report:")
        print(classification_report(y_test, y_pred, labels=[0, 1],
                                   target_names=['Normal (0)', 'Phishing (1)']))

        # Confusion Matrix; pinned labels guarantee a 2x2 matrix for the
        # cm[i, j] lookups below even if a class is absent from y_test
        cm = confusion_matrix(y_test, y_pred, labels=[0, 1])
        print("\nConfusion Matrix:")
        print(f"                Predicted Normal  Predicted Phishing")
        print(f"Actual Normal        {cm[0,0]:6d}         {cm[0,1]:6d}")
        print(f"Actual Phishing      {cm[1,0]:6d}         {cm[1,1]:6d}")

        return {
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'roc_auc': roc_auc,
            'pr_auc': pr_auc,
            'y_pred': y_pred,
            'y_pred_proba': y_pred_proba
        }

    def plot_feature_importance(self, feature_names, top_n=15):
        """
        Visualize feature importance to identify fraud indicators

        Prints the ten most important features, plots the top_n as a
        horizontal bar chart and writes it to 'feature_importance.png'.

        Args:
            feature_names: Names matching the order of the model's inputs.
            top_n: Number of features shown in the plot.

        Returns:
            DataFrame with columns 'Feature' and 'Importance', sorted by
            importance descending (also stored on self.feature_importance).

        Raises:
            RuntimeError: if the model has not been trained yet.
        """
        self._require_model()
        print("\n[9] Analyzing Feature Importance...")

        # Get feature importance
        importance = self.model.feature_importances_
        feature_importance_df = pd.DataFrame({
            'Feature': feature_names,
            'Importance': importance
        }).sort_values('Importance', ascending=False)
        self.feature_importance = feature_importance_df

        print("\nTop 10 Most Important Features:")
        for idx, row in feature_importance_df.head(10).iterrows():
            print(f"   {row['Feature']:30s}: {row['Importance']:.4f}")

        # Plot
        plt.figure(figsize=(10, 8))
        top_features = feature_importance_df.head(top_n)
        plt.barh(range(len(top_features)), top_features['Importance'])
        plt.yticks(range(len(top_features)), top_features['Feature'])
        plt.xlabel('Importance Score', fontsize=12)
        plt.ylabel('Feature', fontsize=12)
        plt.title('Top Feature Importance for Fraud Detection', fontsize=14, fontweight='bold')
        plt.gca().invert_yaxis()  # most important feature at the top
        plt.tight_layout()
        plt.savefig('feature_importance.png', dpi=300, bbox_inches='tight')
        print("\n   ‚úì Feature importance plot saved as 'feature_importance.png'")

        return feature_importance_df

In [None]:
# ============================================================================
# PHASE D: REAL-TIME FRAUD DETECTION & ALERTING
# ============================================================================

class SocialEngineeringDetector:
    """
    Real-time detection system for address poisoning attacks

    Wraps a fitted model together with the preprocessor (for its scaler and
    feature_columns) and the feature engineer (for its stored address
    frequency maps).
    """

    def __init__(self, model, preprocessor, feature_engineer, alert_threshold=0.7):
        """
        Args:
            model: Object exposing predict() and predict_proba().
            preprocessor: Object exposing a fitted `scaler` and `feature_columns`.
            feature_engineer: Object exposing `address_freq_from` / `address_freq_to`.
            alert_threshold: Minimum fraud probability that triggers an alert.
        """
        self.model = model
        self.preprocessor = preprocessor
        self.feature_engineer = feature_engineer
        self.alert_threshold = alert_threshold  # Probability threshold for alerts

    @staticmethod
    def _format_timestamp(raw_ts):
        """Render an epoch-seconds number via datetime.fromtimestamp; return anything else unchanged."""
        try:
            return datetime.fromtimestamp(raw_ts)
        except (TypeError, ValueError, OverflowError, OSError):
            # e.g. a pandas Timestamp from an already-engineered frame, or NaN
            return raw_ts

    def predict_transaction(self, transaction_data):
        """
        Predict if a single transaction is fraudulent

        Args:
            transaction_data: Dict with keys matching original dataset columns

        Returns:
            (prediction, probability): the model's class prediction and its
            probability for the positive class.
        """
        # Convert to DataFrame
        tx_df = pd.DataFrame([transaction_data])

        # Apply same feature engineering
        tx_df['TimeStamp'] = pd.to_datetime(tx_df['TimeStamp'], unit='s')
        tx_df['is_dust'] = (tx_df['Value'] < 0.001).astype(int)
        tx_df['value_log'] = np.log1p(tx_df['Value'])
        tx_df['is_zero_value'] = (tx_df['Value'] == 0).astype(int)

        # Apply stored frequency encodings (unknown addresses count as 1)
        tx_df['from_address_freq'] = tx_df['From'].map(
            self.feature_engineer.address_freq_from).fillna(1)
        tx_df['to_address_freq'] = tx_df['To'].map(
            self.feature_engineer.address_freq_to).fillna(1)

        # Extract features that model expects, in training order; features
        # that cannot be derived for a single transaction default to 0
        columns = list(self.preprocessor.feature_columns)
        row = {col: (tx_df[col].iloc[0] if col in tx_df.columns else 0)
               for col in columns}
        # NaN -> 0 mirrors the fillna(0) applied to the training features
        features = pd.DataFrame([row], columns=columns).fillna(0).astype(float)

        # Scale features; keep column names so scaler and model receive the
        # same named layout they were fitted on
        features_scaled = pd.DataFrame(
            self.preprocessor.scaler.transform(features), columns=columns)

        # Predict
        prediction = self.model.predict(features_scaled)[0]
        probability = self.model.predict_proba(features_scaled)[0, 1]

        return prediction, probability

    def generate_alert(self, transaction_data, probability):
        """
        Generate detailed security alert for suspicious transactions

        Args:
            transaction_data: Dict describing the transaction; 'TimeStamp' may
                be epoch seconds or an already-parsed timestamp.
            probability: Fraud probability to display.
        """
        print("\n" + "üö®" * 40)
        print("‚ö†Ô∏è  SECURITY ALERT: POTENTIAL ADDRESS POISONING DETECTED")
        print("üö®" * 40)
        print(f"\nüìä Fraud Probability: {probability:.2%}")
        print(f"\nüìù Transaction Details:")
        print(f"   Transaction Hash: {transaction_data.get('TxHash', 'N/A')}")
        print(f"   From Address:     {transaction_data.get('From', 'N/A')}")
        print(f"   To Address:       {transaction_data.get('To', 'N/A')}")
        print(f"   Value:            {transaction_data.get('Value', 0)} ETH")
        print(f"   Timestamp:        {self._format_timestamp(transaction_data.get('TimeStamp', 0))}")

        print(f"\n‚ö†Ô∏è  WARNING INDICATORS:")

        if transaction_data.get('Value', 0) < 0.001:
            print(f"   üî∏ DUST TRANSACTION: Value ({transaction_data.get('Value', 0)} ETH) is extremely low")
            print(f"      ‚Üí This is a common tactic in address poisoning attacks")

        print(f"   üî∏ SUSPICIOUS PATTERN: Transaction matches known fraud signatures")
        print(f"   üî∏ RECOMMENDATION: Verify recipient address carefully before sending funds")

        print(f"\nüõ°Ô∏è  PROTECTION ADVICE:")
        print(f"   1. Always verify the FULL address, not just first/last characters")
        print(f"   2. Use address book features instead of copying from transaction history")
        print(f"   3. Consider this address as HIGH RISK")
        print(f"   4. Double-check destination address on your hardware wallet")

        print("\n" + "üö®" * 40 + "\n")

    def monitor_transactions(self, transactions_df, sample_size=5):
        """
        Demonstrate real-time monitoring on sample transactions

        Draws up to sample_size random rows, scores each one and prints either
        an alert or a one-line "safe" message, followed by a summary.
        """
        print("\n[10] Demonstrating Real-Time Transaction Monitoring...")
        print("=" * 80)

        # Sample random transactions
        sample = transactions_df.sample(n=min(sample_size, len(transactions_df)))

        alerts_triggered = 0

        for idx, row in sample.iterrows():
            tx_data = row.to_dict()
            prediction, probability = self.predict_transaction(tx_data)

            if probability >= self.alert_threshold:
                self.generate_alert(tx_data, probability)
                alerts_triggered += 1
            else:
                # str() guards against a missing (NaN/None) hash, which cannot be sliced
                print(f"‚úÖ Transaction {str(tx_data.get('TxHash', 'N/A'))[:10]}... appears SAFE "
                      f"(Fraud probability: {probability:.2%})")

        # Report against the number of rows actually scored, not the request
        print(f"\nüìà Monitoring Summary: {alerts_triggered}/{len(sample)} alerts triggered")

In [None]:
# ============================================================================
# MAIN EXECUTION PIPELINE
# ============================================================================

def main(csv_file='/content/1st dataset - balanced.csv'):
    """
    Execute complete fraud detection pipeline

    Runs load -> feature engineering -> split/scale -> SMOTE -> training ->
    evaluation -> feature-importance plot -> monitoring demo. Errors are
    reported on stdout (with a traceback) rather than raised.

    Args:
        csv_file: Path of the transaction CSV. Defaults to the Colab location
            this notebook was written against.
    """

    CSV_FILE = csv_file

    try:
        # Initialize components
        feature_engineer = EthereumFeatureEngineering()
        preprocessor = DataPreprocessor()
        fraud_model = FraudDetectionModel()

        # Load and engineer features
        df = feature_engineer.load_and_analyze_data(CSV_FILE)
        df_engineered = feature_engineer.engineer_features(df)

        # Prepare features
        X, y = preprocessor.prepare_features(df_engineered)
        X_train, X_test, y_train, y_test = preprocessor.split_and_scale(X, y)

        # Handle imbalance (training split only; the test split stays untouched)
        X_train_balanced, y_train_balanced = preprocessor.handle_imbalance(X_train, y_train)

        # Train model
        model = fraud_model.train_model(X_train_balanced, y_train_balanced)

        # Evaluate
        metrics = fraud_model.evaluate_model(X_test, y_test)

        # Feature importance
        feature_importance = fraud_model.plot_feature_importance(preprocessor.feature_columns)

        # Real-time detection demo
        detector = SocialEngineeringDetector(model, preprocessor, feature_engineer)
        detector.monitor_transactions(df_engineered, sample_size=3)

        print("\n" + "=" * 80)
        print("‚úÖ PIPELINE EXECUTION COMPLETE")
        print("=" * 80)
        print(f"\nüìä Final Model Performance Summary:")
        print(f"   ‚Ä¢ Precision-Recall AUC: {metrics['pr_auc']:.4f}")
        print(f"   ‚Ä¢ F1-Score: {metrics['f1']:.4f}")
        print(f"   ‚Ä¢ Precision: {metrics['precision']:.4f}")
        print(f"   ‚Ä¢ Recall: {metrics['recall']:.4f}")
        # Only the feature-importance plot is written to disk; the model is not persisted
        print(f"\nüíæ Visualizations saved successfully!")

    except FileNotFoundError:
        print(f"\n‚ùå ERROR: Could not find '{CSV_FILE}'")
        print("Please ensure the CSV file is in the same directory as this script.")
        print("Or update the CSV_FILE variable with the correct path.")
    except Exception as e:
        # Deliberate best-effort: show the failure and traceback in the cell
        # output instead of aborting the notebook run
        print(f"\n‚ùå ERROR: {str(e)}")
        import traceback
        traceback.print_exc()

# Run the pipeline when this cell/script is executed directly (not on import)
if __name__ == "__main__":
    main()


[1] Loading Dataset...
Dataset Shape: (95300, 9)

Column Names: ['TxHash', 'BlockHeight', ' TimeStamp', 'From', 'To', 'Value', 'ContractAddress', 'Input', 'Class']

Data Types:
TxHash              object
BlockHeight          int64
 TimeStamp           int64
From                object
To                  object
Value              float64
ContractAddress     object
Input               object
Class              float64
dtype: object

Missing Values:
TxHash                 0
BlockHeight            0
 TimeStamp             0
From                   0
To                   110
Value                  0
ContractAddress    95190
Input               3643
Class                  1
dtype: int64

Class Distribution:
Class
0.0    79216
1.0    16083
Name: count, dtype: int64

Class Percentage:
Class
0.0    83.123642
1.0    16.876358
Name: proportion, dtype: float64

[2] Engineering Behavioral Features...
   ‚Üí Cleaning data...

‚ùå ERROR: 'TimeStamp'


Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/pandas/core/indexes/base.py", line 3805, in get_loc
    return self._engine.get_loc(casted_key)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "index.pyx", line 167, in pandas._libs.index.IndexEngine.get_loc
  File "index.pyx", line 196, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 7081, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas/_libs/hashtable_class_helper.pxi", line 7089, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'TimeStamp'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/tmp/ipython-input-3701241975.py", line 20, in main
    df_engineered = feature_engineer.engineer_features(df)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipython-input-2617071249.py", line 49, in engineer_features
    df['TimeStamp'] = pd.t