# 🛡️ Mosaic Protocol - Smart Contract Vulnerability Classifier

This notebook trains an XGBoost classifier to detect vulnerable smart contracts.

- **Dataset:** 37,000+ labeled contracts (safe vs exploit)
- **Model:** XGBoost with class imbalance handling
- **Output:** ONNX model for Node.js integration

---

## 1️⃣ Setup & Mount Google Drive

In [None]:
# Colab-only setup: mount Google Drive at /content/drive.
import os

from google.colab import drive

drive.mount('/content/drive')

# Folder on the mounted Drive that receives every artifact this notebook saves.
OUTPUT_DIR = '/content/drive/MyDrive/mosaic-ml'
if not os.path.isdir(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR, exist_ok=True)
print(f"‚úÖ Output directory: {OUTPUT_DIR}")

In [None]:
# Install dependencies (including onnxmltools for XGBoost ONNX export)
!pip install xgboost scikit-learn onnxmltools onnx onnxruntime matplotlib seaborn --quiet
print("‚úÖ Dependencies installed")

## 2️⃣ Load Training Data

In [None]:
import json
import gzip
import numpy as np
from pathlib import Path

# Find the training data file
DATA_DIR = '/content/drive/MyDrive/mosaic-ml'

# Look for .json.gz or .json files
data_files = list(Path(DATA_DIR).glob('training_data_*.json*'))
if not data_files:
    raise FileNotFoundError(f"No training data found in {DATA_DIR}. Please upload training_data_*.json.gz")

# sorted() orders by path name, so this picks the lexicographically last file.
# NOTE(review): that equals "most recent" only if the file names embed a
# sortable timestamp — confirm against the export script.
data_file = sorted(data_files)[-1]
print(f"üìÇ Loading: {data_file.name}")

# Load data (handle both gzip and plain JSON).
# Both branches read text as UTF-8 explicitly; relying on the platform default
# encoding for the plain-JSON case could decode the same file differently.
opener = gzip.open if str(data_file).endswith('.gz') else open
with opener(data_file, 'rt', encoding='utf-8') as f:
    data = json.load(f)

# Display metadata
print(f"""\nüìä Dataset Summary:
   Total samples: {data['metadata']['totalSamples']:,}
   Features: {data['metadata']['featureCount']}
   Safe: {data['metadata']['labelDistribution']['safe']:,}
   Exploit: {data['metadata']['labelDistribution']['exploit']:,}
""")

In [None]:
# Convert to numpy arrays
def to_arrays(samples):
    """Turn a list of sample records into a (features, labels) pair of arrays.

    Args:
        samples: sequence of mappings, each with a 'features' entry (numeric
            sequence) and a 'label' entry (integer class id).

    Returns:
        Tuple ``(X, y)``: ``X`` is a float32 array built from every record's
        'features', ``y`` is an int32 array built from every record's 'label'.
    """
    feature_rows = [sample['features'] for sample in samples]
    features = np.array(feature_rows, dtype=np.float32)
    label_values = [sample['label'] for sample in samples]
    labels = np.array(label_values, dtype=np.int32)
    return features, labels

# Convert every dataset split into a (features, labels) pair of arrays.
split_arrays = {
    split_name: to_arrays(data[split_name])
    for split_name in ('train', 'validation', 'test')
}
X_train, y_train = split_arrays['train']
X_val, y_val = split_arrays['validation']
X_test, y_test = split_arrays['test']

# Report the array shape and the label-0 / label-1 counts of each split.
print(f"""üìê Array shapes:
   Train: {X_train.shape} ({(y_train==0).sum()} safe, {(y_train==1).sum()} exploit)
   Val:   {X_val.shape} ({(y_val==0).sum()} safe, {(y_val==1).sum()} exploit)
   Test:  {X_test.shape} ({(y_test==0).sum()} safe, {(y_test==1).sum()} exploit)
""")

# Feature names come from the dataset metadata; only the first five are previewed.
feature_names = data['metadata']['featureNames']
print(f"Feature names ({len(feature_names)}): {feature_names[:5]}...")

## 3️⃣ Train XGBoost Model

In [None]:
from xgboost import XGBClassifier
from sklearn.metrics import classification_report, confusion_matrix, precision_recall_curve, roc_auc_score
import matplotlib.pyplot as plt
import seaborn as sns

# Calculate class weight for imbalance handling.
# Higher weight on the exploit class (label 1) to reduce false negatives.
n_safe = int((y_train == 0).sum())
n_exploit = int((y_train == 1).sum())
if n_exploit == 0:
    # Without this guard the ratio below would divide by zero.
    raise ValueError("Training split contains no exploit samples (label 1); cannot compute scale_pos_weight")
scale_pos_weight = n_safe / n_exploit
print(f"‚öñÔ∏è Class imbalance ratio: {scale_pos_weight:.2f}:1 (safe:exploit)")
print(f"   Using scale_pos_weight={scale_pos_weight:.2f} to boost exploit detection")

# Create and train model.
# `use_label_encoder` is intentionally not passed: it is deprecated in XGBoost
# and current releases ignore it with a warning.
model = XGBClassifier(
    n_estimators=200,
    max_depth=6,
    learning_rate=0.1,
    scale_pos_weight=scale_pos_weight,  # Handle class imbalance
    max_delta_step=1,  # Stabilize with imbalanced data
    eval_metric='aucpr',  # Precision-Recall AUC
    random_state=42,
    verbosity=1
)

print("\nüöÄ Training XGBoost classifier...")
model.fit(
    X_train, y_train,
    eval_set=[(X_val, y_val)],  # validation split is only monitored here
    verbose=20  # Print every 20 iterations
)
print("‚úÖ Training complete!")

## 4️⃣ Evaluate Model

In [None]:
# Score the test split: column 1 of predict_proba is the class-1 (exploit)
# probability; predict() gives the hard labels.
y_pred_proba = model.predict_proba(X_test)[:, 1]
y_pred = model.predict(X_test)

# Per-class precision / recall / F1 for the hard labels.
print("üìä Classification Report (default threshold=0.5):")
default_report = classification_report(y_test, y_pred, target_names=['Safe', 'Exploit'])
print(default_report)

# ROC-AUC is computed from the probabilities, not the hard labels.
roc_auc = roc_auc_score(y_test, y_pred_proba)
print(f"üéØ ROC-AUC Score: {roc_auc:.4f}")

In [None]:
# Confusion matrix of the test predictions, drawn as an annotated heatmap
# and saved next to the other artifacts.
cm = confusion_matrix(y_test, y_pred)
class_labels = ['Safe', 'Exploit']

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_labels,
    yticklabels=class_labels,
    ax=ax,
)
ax.set_title('Confusion Matrix')
ax.set_ylabel('Actual')
ax.set_xlabel('Predicted')
fig.tight_layout()
fig.savefig(f"{OUTPUT_DIR}/confusion_matrix.png")
plt.show()
print(f"üíæ Saved: {OUTPUT_DIR}/confusion_matrix.png")

## 5️⃣ Optimize Threshold for High Recall

In [None]:
# For security classifiers, we want HIGH RECALL on exploits.
# The decision threshold is a tuned parameter, so it is chosen on the
# validation split and only *reported* on the test split. Choosing it on the
# test data would leak test labels into the metrics printed below.
target_recall = 0.95

# Validation-set PR curve: used only to pick the threshold.
y_val_proba = model.predict_proba(X_val)[:, 1]
val_precision, val_recall, val_thresholds = precision_recall_curve(y_val, y_val_proba)

# Test-set PR curve: kept in precision / recall / thresholds for reporting
# (the later plot and metadata cells read these names).
precision, recall, thresholds = precision_recall_curve(y_test, y_pred_proba)

# precision_recall_curve returns one more recall entry than thresholds (the
# final point has no threshold), so drop it before indexing val_thresholds.
idx = np.where(val_recall[:-1] >= target_recall)[0]
if len(idx) > 0:
    # Recall does not increase as the threshold rises, so the last qualifying
    # index is the highest threshold that still meets the target on validation.
    optimal_threshold = float(val_thresholds[idx[-1]])
    # Operating point on the test curve: index of the first test threshold that
    # is >= optimal_threshold, i.e. the point whose positive predictions match
    # `y_pred_proba >= optimal_threshold`.
    best_idx = int((thresholds < optimal_threshold).sum())
    print(f"üéØ For {target_recall:.0%} recall on exploits:")
    print(f"   Optimal threshold: {optimal_threshold:.3f}")
    print(f"   Precision at this threshold: {precision[best_idx]:.3f}")
    print(f"   Test recall at this threshold: {recall[best_idx]:.3f}")
else:
    # No validation threshold reaches the target recall: fall back to a fixed value.
    optimal_threshold = 0.3
    best_idx = 0
    print(f"‚ö†Ô∏è Using default threshold: {optimal_threshold}")

# Apply optimized threshold to the test probabilities
y_pred_optimized = (y_pred_proba >= optimal_threshold).astype(int)
print("\nüìä Classification Report (optimized threshold):")
print(classification_report(y_test, y_pred_optimized, target_names=['Safe', 'Exploit']))

In [None]:
# Draw the precision-recall curve, mark the target recall with a dashed
# vertical line, and save the figure with the other artifacts.
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(recall, precision, 'b-', linewidth=2)
ax.axvline(
    x=target_recall,
    color='r',
    linestyle='--',
    label=f'Target recall ({target_recall:.0%})',
)
ax.set_xlabel('Recall (Sensitivity)')
ax.set_ylabel('Precision')
ax.set_title('Precision-Recall Curve for Exploit Detection')
ax.legend()
ax.grid(True, alpha=0.3)
fig.tight_layout()
fig.savefig(f"{OUTPUT_DIR}/precision_recall_curve.png")
plt.show()

## 6️⃣ Feature Importance

In [None]:
# Plot feature importance.
# Show at most 15 bars: with fewer than 15 features a fixed range(15) would not
# match the number of importances and plt.bar would fail.
importances = model.feature_importances_
top_n = min(15, len(importances))
indices = np.argsort(importances)[::-1][:top_n]  # descending by importance

plt.figure(figsize=(12, 6))
plt.bar(range(top_n), importances[indices])
plt.xticks(range(top_n), [feature_names[i] for i in indices], rotation=45, ha='right')
plt.title(f'Top {top_n} Most Important Features')
plt.tight_layout()
plt.savefig(f"{OUTPUT_DIR}/feature_importance.png")
plt.show()

# Slicing keeps this loop safe when fewer than 10 features exist.
print("\nüîù Top 10 Features:")
for i, idx_feat in enumerate(indices[:10]):
    print(f"   {i+1}. {feature_names[idx_feat]}: {importances[idx_feat]:.4f}")

## 7️⃣ Export Model to ONNX

In [None]:
# Use onnxmltools for XGBoost (sklearn-onnx doesn't support XGBoost directly)
from onnxmltools import convert_xgboost
from onnxmltools.convert.common.data_types import FloatTensorType
import onnx

# Destination of the exported model inside the output folder.
onnx_path = f"{OUTPUT_DIR}/vulnerability_classifier.onnx"

# The converter needs the input signature: one float tensor named 'features'
# with num_features columns. The first dimension is None (presumably a
# variable batch size — confirm against the onnxmltools documentation).
num_features = X_train.shape[1]
initial_type = [('features', FloatTensorType([None, num_features]))]

# Convert the trained XGBoost model and write it to disk.
print("üîÑ Converting XGBoost model to ONNX format...")
onnx_model = convert_xgboost(model, initial_types=initial_type, target_opset=12)
onnx.save_model(onnx_model, onnx_path)

# Report the size of the written file in MB.
onnx_size = os.path.getsize(onnx_path) / (1024 * 1024)
print(f"‚úÖ Saved: {onnx_path} ({onnx_size:.2f} MB)")

In [None]:
# Verify ONNX model works: load it with onnxruntime, run a few test rows, and
# check that it predicts the same labels as the in-memory XGBoost model.
import onnxruntime as ort

print("üîç Verifying ONNX model...")
session = ort.InferenceSession(onnx_path)

# Get input name from model
input_name = session.get_inputs()[0].name
print(f"   Input name: {input_name}")

# Run the same rows through both runtimes
test_input = X_test[:5]
onnx_pred = session.run(None, {input_name: test_input})

# NOTE(review): assumes the first ONNX output holds the predicted class
# labels — confirm against the converted graph's outputs.
onnx_labels = np.array(onnx_pred[0]).ravel()
xgb_labels = np.array(model.predict(test_input)).ravel()

print(f"   Input shape: {test_input.shape}")
print(f"   Output predictions: {onnx_pred[0][:5]}")
print(f"   XGBoost predictions: {xgb_labels}")
print(f"   Expected labels: {y_test[:5]}")

# Only report success when the exported model reproduces the XGBoost
# predictions; previously "passed" was printed without any comparison.
labels_match = onnx_labels.shape == xgb_labels.shape and bool((onnx_labels == xgb_labels).all())
if not labels_match:
    raise RuntimeError(
        f"ONNX predictions {onnx_labels} do not match XGBoost predictions {xgb_labels}"
    )
print("‚úÖ ONNX model verification passed!")

## 8️⃣ Save Model Metadata

In [None]:
# Save model metadata for Node.js integration
# recall/precision are read at best_idx only when a threshold index was found;
# otherwise both are recorded as 0.
found_operating_point = len(idx) > 0

metrics_summary = {
    'roc_auc': float(roc_auc),
    # accuracy is computed from y_pred, not from the optimized-threshold labels
    'accuracy': float((y_pred == y_test).mean()),
    'recall_exploit': float(recall[best_idx]) if found_operating_point else 0,
    'precision_exploit': float(precision[best_idx]) if found_operating_point else 0,
}

train_class_counts = {
    'safe': int((y_train == 0).sum()),
    'exploit': int((y_train == 1).sum()),
}

# Values are cast to built-in types so json.dump can serialize them.
model_metadata = {
    'model_name': 'vulnerability_classifier',
    'model_type': 'XGBoost',
    'created_at': str(np.datetime64('now')),
    'num_features': int(num_features),
    'feature_names': feature_names,
    'classes': ['safe', 'exploit'],
    'optimal_threshold': float(optimal_threshold),
    'input_name': input_name,
    'metrics': metrics_summary,
    'training_samples': int(len(y_train)),
    'class_distribution': train_class_counts,
}

metadata_path = f"{OUTPUT_DIR}/model_metadata.json"
with open(metadata_path, 'w') as metadata_file:
    json.dump(model_metadata, metadata_file, indent=2)

print(f"‚úÖ Saved: {metadata_path}")
print("\nüìã Model Metadata:")
for key, value in model_metadata.items():
    # The feature-name list is left out of the printed summary.
    if key == 'feature_names':
        continue
    print(f"   {key}: {value}")

## ✅ Training Complete!

**Files saved to Google Drive:**
1. `vulnerability_classifier.onnx` - The trained model
2. `model_metadata.json` - Model configuration and metrics
3. `confusion_matrix.png` - Evaluation visualization
4. `precision_recall_curve.png` - Threshold analysis
5. `feature_importance.png` - Feature ranking

**Next Steps:**
1. Download `vulnerability_classifier.onnx` and `model_metadata.json`
2. Place in `backend/src/agents/defi-safety/ml/models/`
3. The inference service will load and use the model

In [None]:
# Print every entry of the output folder together with its size in KB.
print("\nüìÅ Output files in Google Drive:")
for entry_name in os.listdir(OUTPUT_DIR):
    size_kb = os.path.getsize(f"{OUTPUT_DIR}/{entry_name}") / 1024
    print(f"   {entry_name} ({size_kb:.1f} KB)")