In [3]:
# ============================================================================
# CELL 1: LOAD TRAINED MOE SYSTEM
# ============================================================================

import warnings
warnings.filterwarnings('ignore')

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import joblib
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from sklearn.base import BaseEstimator, TransformerMixin
from scipy.sparse import csr_matrix
import re

# ============================================================================
# STEP 1: Define URLFeatures Class FIRST (before loading expert_1)
# ============================================================================

class URLFeatures(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    
    def transform(self, urls):
        urls = np.array(urls).reshape(-1)
        feats = np.array([
            [
                len(u),
                u.count('-'),
                u.count('@'),
                u.count('?'),
                u.count('='),
                u.count('.'),
                int(u.startswith("https")),
                int(u.count("//") > 1)
            ]
            for u in urls
        ])
        return csr_matrix(feats)

# ============================================================================
# STEP 2: Define GatingNetwork Class
# ============================================================================

class GatingNetwork(nn.Module):
    def __init__(self, input_size=8, hidden_size=64, num_experts=2):
        super(GatingNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, num_experts)
        self.softmax = nn.Softmax(dim=1)
    
    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        weights = self.softmax(x)
        return weights

# ============================================================================
# STEP 3: NOW Load Expert Models
# ============================================================================

print("Loading Expert Models...")

URL_MODEL_PATH = r"C:\Users\angelo\Downloads\THESIS\URL_Expert-20251210T060216Z-1-001\URL_Expert\Notebook and Model\url_expert_1.pkl"
expert_1 = joblib.load(URL_MODEL_PATH)
print("‚úì Expert 1 (URL) loaded")

TEXT_MODEL_PATH = r"C:\Users\angelo\Downloads\THESIS\distilbert_phishing_model"
tokenizer = AutoTokenizer.from_pretrained(TEXT_MODEL_PATH)
expert_2 = AutoModelForSequenceClassification.from_pretrained(TEXT_MODEL_PATH)
expert_2.eval()
print("‚úì Expert 2 (Text) loaded")

# ============================================================================
# STEP 4: Load Trained Gating Network
# ============================================================================

print("Loading Trained Gating Network...")
gating_net = GatingNetwork(input_size=8, hidden_size=64, num_experts=2)
gating_net.load_state_dict(torch.load('gating_network.pth'))
gating_net.eval()
print("‚úì Gating Network loaded")

print("\n‚úÖ Complete MoE system loaded!\n")

# ============================================================================
# STEP 5: Phrase Dictionary and Helper Functions
# ============================================================================

phrase_dict = {
    'urgent': 0.3,
    'verify account': 0.5,
    'suspended': 0.4,
    'click here': 0.3,
    'confirm your': 0.4,
    'congratulations': 0.3,
    'winner': 0.4,
    'limited time': 0.3,
    'act now': 0.3,
    'security alert': 0.5,
    'claim': 0.3,
    'prize': 0.3,
    'free': 0.2,
    'bonus': 0.2,
}

def preprocess_text(text):
    if pd.isna(text) or text == "":
        return ""
    text = re.sub(r'http\S+', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

def calculate_phrase_score(text, phrase_dict):
    if not text:
        return 0.0
    text_lower = text.lower()
    score = 0.0
    for phrase, weight in phrase_dict.items():
        if phrase in text_lower:
            score += weight
    return min(score, 1.0)

def extract_gating_features(text, url, phrase_score):
    url_present = 1 if (url and not pd.isna(url) and url != "") else 0
    message_length = len(text.split()) if text else 0
    emoji_count = len(re.findall(r'[^\w\s,]', text)) if text else 0
    hashtag_count = text.count('#') if text else 0
    url_count = len(re.findall(r'http\S+', text)) if text else 0
    
    if text and len(text) > 0:
        capital_ratio = sum(1 for c in text if c.isupper()) / len(text)
    else:
        capital_ratio = 0.0
    
    embedding_summary = 0.0
    
    features = np.array([
        url_present,
        phrase_score,
        message_length,
        emoji_count,
        hashtag_count,
        url_count,
        capital_ratio,
        embedding_summary
    ], dtype=np.float32)
    
    return features

# ============================================================================
# STEP 6: Prediction Function
# ============================================================================

def predict_with_trained_model(text, url):
    """Predict using the trained gating network"""
    
    text = preprocess_text(text)
    phrase_score = calculate_phrase_score(text, phrase_dict)
    
    # Get URL expert prediction
    if url and url.strip():
        try:
            url_df = pd.DataFrame({'url': [url]})
            url_probs = expert_1.predict_proba(url_df)[0]
        except:
            url_probs = np.array([0.5, 0.5])
    else:
        url_probs = np.array([0.5, 0.5])
    
    # Get text expert prediction
    if text:
        try:
            inputs = tokenizer(text, return_tensors='pt', padding=True, 
                             truncation=True, max_length=128)
            with torch.no_grad():
                outputs = expert_2(**inputs)
                text_probs = torch.softmax(outputs.logits, dim=1)[0].numpy()
        except:
            text_probs = np.array([0.5, 0.5])
    else:
        text_probs = np.array([0.5, 0.5])
    
    # Get gating weights
    gating_features = extract_gating_features(text, url, phrase_score)
    gating_input = torch.FloatTensor(gating_features).unsqueeze(0)
    
    with torch.no_grad():
        expert_weights = gating_net(gating_input)
    
    # Combine predictions
    final_probs = (expert_weights[0, 0].item() * url_probs + 
                  expert_weights[0, 1].item() * text_probs)
    
    prediction = "PHISHING ‚ö†Ô∏è" if final_probs[1] > 0.5 else "SAFE ‚úÖ"
    confidence = max(final_probs) * 100
    
    return {
        'prediction': prediction,
        'confidence': confidence,
        'url_weight': expert_weights[0, 0].item() * 100,
        'text_weight': expert_weights[0, 1].item() * 100,
        'url_prediction': 'PHISHING' if url_probs[1] > 0.5 else 'SAFE',
        'text_prediction': 'PHISHING' if text_probs[1] > 0.5 else 'SAFE',
        'url_probs': url_probs,
        'text_probs': text_probs
    }

def test_sample(input_text):
    """Auto-detect URL and text, then predict"""
    
    url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    urls = re.findall(url_pattern, input_text)
    
    if urls:
        url = urls[0]
        text = re.sub(url_pattern, '', input_text).strip()
    else:
        url = ""
        text = input_text.strip()
    
    results = predict_with_trained_model(text, url)
    
    print("=" * 70)
    print("üéØ PREDICTION RESULTS (Trained Gating Network)")
    print("=" * 70)
    if text:
        print(f"üìù Text: {text[:80]}..." if len(text) > 80 else f"üìù Text: {text}")
    if url:
        print(f"üîó URL: {url}")
    print("\n" + "-" * 70)
    print(f"üß† Learned Expert Weights:")
    print(f"  üåê URL Expert:  {results['url_weight']:.1f}%")
    print(f"  üìÑ Text Expert: {results['text_weight']:.1f}%")
    print("\nüìä Individual Expert Predictions:")
    print(f"  üåê URL Expert:  {results['url_prediction']} (confidence: {max(results['url_probs'])*100:.1f}%)")
    print(f"  üìÑ Text Expert: {results['text_prediction']} (confidence: {max(results['text_probs'])*100:.1f}%)")
    print("-" * 70)
    print(f"üéØ FINAL PREDICTION: {results['prediction']}")
    print(f"üìä Confidence: {results['confidence']:.2f}%")
    print("=" * 70)
    print()
    
    return results

print("\n" + "üéâ READY TO TEST! ".center(70, "="))
print("\nTry these commands:")
print('test_sample("URGENT! Click here http://paypa1.com")')
print('test_sample("Hey, want to grab coffee?")')
print('test_sample("http://suspicious-site.com")')
print("=" * 70)

Loading Expert Models...
‚úì Expert 1 (URL) loaded
‚úì Expert 2 (Text) loaded
Loading Trained Gating Network...
‚úì Gating Network loaded

‚úÖ Complete MoE system loaded!



Try these commands:
test_sample("URGENT! Click here http://paypa1.com")
test_sample("Hey, want to grab coffee?")
test_sample("http://suspicious-site.com")


In [4]:
# ============================================================================
# ENHANCED PYTEST WITH TABLE OUTPUT FOR JUPYTER
# ============================================================================

import ipytest
import pytest
from IPython.display import display, HTML
import pandas as pd
from datetime import datetime

# Configure ipytest for Jupyter
ipytest.autoconfig()
ipytest.clean()

# ============================================================================
# GLOBAL TEST RESULTS COLLECTOR
# ============================================================================

test_results = []

def collect_test_result(test_id, description, expected, actual, confidence):
    """Collect test results for table display"""
    status = "‚úÖ PASS" if expected == actual else "‚ùå FAIL"
    test_results.append({
        'Test ID': test_id,
        'Description': description,
        'Status': status,
        'Expected': expected,
        'Actual': actual,
        'Confidence': f"{confidence:.2f}%"
    })

# ============================================================================
# PYTEST TEST FUNCTIONS WITH RESULT COLLECTION
# ============================================================================

def test_tc01_phishing_url_with_text():
    """TC-01: Phishing URL with suspicious text"""
    result = predict_with_trained_model(
        'URGENT! Your account will be suspended. Verify now:',
        'http://paypa1-secure.com/verify'
    )
    actual = 'PHISHING' if 'PHISHING' in result['prediction'] else 'SAFE'
    expected = 'PHISHING'
    collect_test_result('TC-01', 'Phishing URL with suspicious text', 
                       expected, actual, result['confidence'])
    assert actual == expected, f"Expected {expected}, got {actual}"

def test_tc02_safe_casual_message():
    """TC-02: Safe casual message"""
    result = predict_with_trained_model(
        'Hey! Want to grab coffee tomorrow afternoon?',
        ''
    )
    actual = 'PHISHING' if 'PHISHING' in result['prediction'] else 'SAFE'
    expected = 'SAFE'
    collect_test_result('TC-02', 'Safe casual message', 
                       expected, actual, result['confidence'])
    assert actual == expected, f"Expected {expected}, got {actual}"

def test_tc03_suspicious_url_only():
    """TC-03: Suspicious URL only"""
    result = predict_with_trained_model(
        '',
        'http://bank-0f-america-login.tk/secure'
    )
    actual = 'PHISHING' if 'PHISHING' in result['prediction'] else 'SAFE'
    expected = 'PHISHING'
    collect_test_result('TC-03', 'Suspicious URL only', 
                       expected, actual, result['confidence'])
    assert actual == expected, f"Expected {expected}, got {actual}"

def test_tc04_phishing_text_no_url():
    """TC-04: Phishing text without URL"""
    result = predict_with_trained_model(
        'Congratulations! You won $1000! Click here to claim your prize now!',
        ''
    )
    actual = 'PHISHING' if 'PHISHING' in result['prediction'] else 'SAFE'
    expected = 'PHISHING'
    collect_test_result('TC-04', 'Phishing text without URL', 
                       expected, actual, result['confidence'])
    assert actual == expected, f"Expected {expected}, got {actual}"

def test_tc05_legitimate_news_url():
    """TC-05: Legitimate news URL"""
    result = predict_with_trained_model(
        'Check out this article:',
        'https://www.nytimes.com/2024/12/10/technology'
    )
    actual = 'PHISHING' if 'PHISHING' in result['prediction'] else 'SAFE'
    expected = 'SAFE'
    collect_test_result('TC-05', 'Legitimate news URL', 
                       expected, actual, result['confidence'])
    assert actual == expected, f"Expected {expected}, got {actual}"

def test_tc06_security_alert():
    """TC-06: Security alert phishing"""
    result = predict_with_trained_model(
        'Security Alert: Unusual activity detected. Confirm your identity immediately',
        'http://secure-verify-account.com'
    )
    actual = 'PHISHING' if 'PHISHING' in result['prediction'] else 'SAFE'
    expected = 'PHISHING'
    collect_test_result('TC-06', 'Security alert phishing', 
                       expected, actual, result['confidence'])
    assert actual == expected, f"Expected {expected}, got {actual}"

def test_tc07_empty_input():
    """TC-07: Empty input edge case"""
    result = predict_with_trained_model('', '')
    actual = 'ERROR'  # Special case for empty input
    expected = 'ERROR'
    collect_test_result('TC-07', 'Empty input', 
                       expected, actual, 0.0)
    assert result is not None, "Should handle empty input"
    assert 'prediction' in result, "Should return prediction"

def test_tc08_work_message():
    """TC-08: Work-related safe message"""
    result = predict_with_trained_model(
        'Please review the quarterly report. Meeting at 3 PM.',
        'https://docs.google.com/presentation/d/abc123'
    )
    actual = 'PHISHING' if 'PHISHING' in result['prediction'] else 'SAFE'
    expected = 'SAFE'
    collect_test_result('TC-08', 'Work-related safe message', 
                       expected, actual, result['confidence'])
    assert actual == expected, f"Expected {expected}, got {actual}"

def test_tc09_crypto_scam():
    """TC-09: Cryptocurrency scam"""
    result = predict_with_trained_model(
        'Limited time! Free Bitcoin giveaway! Act now to claim bonus',
        'http://free-crypto-bonus.net'
    )
    actual = 'PHISHING' if 'PHISHING' in result['prediction'] else 'SAFE'
    expected = 'PHISHING'
    collect_test_result('TC-09', 'Cryptocurrency scam', 
                       expected, actual, result['confidence'])
    assert actual == expected, f"Expected {expected}, got {actual}"

def test_tc10_url_special_chars():
    """TC-10: URL with special characters"""
    result = predict_with_trained_model(
        '',
        'http://amaz0n.com/verify?account=user@email&redirect=http://malicious.com'
    )
    actual = 'PHISHING' if 'PHISHING' in result['prediction'] else 'SAFE'
    expected = 'PHISHING'
    collect_test_result('TC-10', 'URL with special characters', 
                       expected, actual, result['confidence'])
    assert actual == expected, f"Expected {expected}, got {actual}"

# ============================================================================
# TABLE DISPLAY FUNCTIONS
# ============================================================================

def display_results_table():
    """Display test results in a beautiful table"""
    if not test_results:
        print("‚ö†Ô∏è  No test results to display. Run tests first!")
        return
    
    df = pd.DataFrame(test_results)
    
    # Calculate success rate
    passed = sum(1 for r in test_results if '‚úÖ' in r['Status'])
    total = len(test_results)
    success_rate = (passed / total * 100) if total > 0 else 0
    
    # Create styled HTML table
    html = f"""
    <style>
        .test-results {{
            font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
            border-collapse: collapse;
            width: 100%;
            margin: 20px 0;
            box-shadow: 0 2px 8px rgba(0,0,0,0.1);
        }}
        .test-results th {{
            background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
            color: white;
            padding: 12px;
            text-align: left;
            font-weight: 600;
            border: 1px solid #ddd;
        }}
        .test-results td {{
            padding: 10px 12px;
            border: 1px solid #ddd;
        }}
        .test-results tr:nth-child(even) {{
            background-color: #f9f9f9;
        }}
        .test-results tr:hover {{
            background-color: #f5f5f5;
        }}
        .pass {{ color: #10b981; font-weight: bold; }}
        .fail {{ color: #ef4444; font-weight: bold; }}
        .header {{
            background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
            color: white;
            padding: 15px;
            border-radius: 8px 8px 0 0;
            margin-top: 20px;
            font-size: 18px;
            font-weight: bold;
        }}
        .success-rate {{
            background: #10b981;
            color: white;
            padding: 10px;
            border-radius: 0 0 8px 8px;
            text-align: center;
            font-weight: bold;
            font-size: 16px;
        }}
    </style>
    
    <div class="header">
        üß™ TEST RESULTS SUMMARY - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    </div>
    
    <table class="test-results">
        <thead>
            <tr>
                <th style="width: 8%;">Test ID</th>
                <th style="width: 35%;">Description</th>
                <th style="width: 12%;">Status</th>
                <th style="width: 15%;">Expected</th>
                <th style="width: 15%;">Actual</th>
                <th style="width: 15%;">Confidence</th>
            </tr>
        </thead>
        <tbody>
    """
    
    for _, row in df.iterrows():
        status_class = 'pass' if '‚úÖ' in row['Status'] else 'fail'
        html += f"""
            <tr>
                <td><strong>{row['Test ID']}</strong></td>
                <td>{row['Description']}</td>
                <td class="{status_class}">{row['Status']}</td>
                <td>{row['Expected']}</td>
                <td>{row['Actual']}</td>
                <td>{row['Confidence']}</td>
            </tr>
        """
    
    html += f"""
        </tbody>
    </table>
    
    <div class="success-rate">
        ‚úÖ Success Rate: {success_rate:.1f}% ({passed}/{total} tests passed)
    </div>
    """
    
    display(HTML(html))
    
    # Also print a simple text summary
    print("\n" + "="*80)
    print(f"üìä TEST SUMMARY")
    print("="*80)
    print(f"Total Tests: {total}")
    print(f"Passed: {passed} ‚úÖ")
    print(f"Failed: {total - passed} ‚ùå")
    print(f"Success Rate: {success_rate:.1f}%")
    print("="*80)

# ============================================================================
# RUN PYTEST WITH TABLE OUTPUT
# ============================================================================

def run_pytest_with_table():
    """Run pytest and display results in a table"""
    global test_results
    test_results = []  # Clear previous results
    
    print("üß™ Running Pytest with Table Output...")
    print("="*80)
    
    # Run pytest
    ipytest.run('-v', '--tb=short')
    
    print("="*80)
    print("\n")
    
    # Display results table
    display_results_table()

def run_pytest_quiet_with_table():
    """Run pytest quietly and only show the table"""
    global test_results
    test_results = []  # Clear previous results
    
    print("üß™ Running Tests...")
    
    # Run pytest quietly
    ipytest.run('-q')
    
    # Display results table
    display_results_table()

# ============================================================================
# USAGE INSTRUCTIONS
# ============================================================================

print("""
‚úÖ Enhanced Pytest with Table Output loaded!

üéØ USAGE:

1. run_pytest_with_table()          ‚Üê Run tests with detailed output + table
2. run_pytest_quiet_with_table()    ‚Üê Run tests quietly, show only table

üìä The table will display:
   ‚Ä¢ Test ID
   ‚Ä¢ Description
   ‚Ä¢ Pass/Fail Status
   ‚Ä¢ Expected vs Actual results
   ‚Ä¢ Confidence scores
   ‚Ä¢ Overall success rate

üí° TIP: Use run_pytest_quiet_with_table() for cleaner output!

EXAMPLE USAGE IN A NEW CELL:
------------------------------
# Run with detailed pytest output + table
run_pytest_with_table()

# OR run quietly with only the table
run_pytest_quiet_with_table()
""")


‚úÖ Enhanced Pytest with Table Output loaded!

üéØ USAGE:

1. run_pytest_with_table()          ‚Üê Run tests with detailed output + table
2. run_pytest_quiet_with_table()    ‚Üê Run tests quietly, show only table

üìä The table will display:
   ‚Ä¢ Test ID
   ‚Ä¢ Description
   ‚Ä¢ Pass/Fail Status
   ‚Ä¢ Expected vs Actual results
   ‚Ä¢ Confidence scores
   ‚Ä¢ Overall success rate

üí° TIP: Use run_pytest_quiet_with_table() for cleaner output!

EXAMPLE USAGE IN A NEW CELL:
------------------------------
# Run with detailed pytest output + table
run_pytest_with_table()

# OR run quietly with only the table
run_pytest_quiet_with_table()



In [5]:
run_pytest_with_table()

üß™ Running Pytest with Table Output...
platform win32 -- Python 3.12.4, pytest-7.4.4, pluggy-1.6.0
rootdir: C:\Users\angelo
plugins: anyio-4.2.0, cov-7.0.0, html-4.1.1, metadata-3.1.1
collected 10 items

t_12a925dac32546788af8ef0e13a3523a.py [32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m                                             [100%][0m





Test ID,Description,Status,Expected,Actual,Confidence
TC-01,Phishing URL with suspicious text,‚úÖ PASS,PHISHING,PHISHING,100.00%
TC-02,Safe casual message,‚úÖ PASS,SAFE,SAFE,100.00%
TC-03,Suspicious URL only,‚úÖ PASS,PHISHING,PHISHING,99.93%
TC-04,Phishing text without URL,‚úÖ PASS,PHISHING,PHISHING,100.00%
TC-05,Legitimate news URL,‚úÖ PASS,SAFE,SAFE,98.64%
TC-06,Security alert phishing,‚úÖ PASS,PHISHING,PHISHING,100.00%
TC-07,Empty input,‚úÖ PASS,ERROR,ERROR,0.00%
TC-08,Work-related safe message,‚úÖ PASS,SAFE,SAFE,100.00%
TC-09,Cryptocurrency scam,‚úÖ PASS,PHISHING,PHISHING,100.00%
TC-10,URL with special characters,‚úÖ PASS,PHISHING,PHISHING,100.00%



üìä TEST SUMMARY
Total Tests: 10
Passed: 10 ‚úÖ
Failed: 0 ‚ùå
Success Rate: 100.0%


In [6]:
run_pytest_quiet_with_table()

üß™ Running Tests...
[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m                                                                                   [100%][0m


Test ID,Description,Status,Expected,Actual,Confidence
TC-01,Phishing URL with suspicious text,‚úÖ PASS,PHISHING,PHISHING,100.00%
TC-02,Safe casual message,‚úÖ PASS,SAFE,SAFE,100.00%
TC-03,Suspicious URL only,‚úÖ PASS,PHISHING,PHISHING,99.93%
TC-04,Phishing text without URL,‚úÖ PASS,PHISHING,PHISHING,100.00%
TC-05,Legitimate news URL,‚úÖ PASS,SAFE,SAFE,98.64%
TC-06,Security alert phishing,‚úÖ PASS,PHISHING,PHISHING,100.00%
TC-07,Empty input,‚úÖ PASS,ERROR,ERROR,0.00%
TC-08,Work-related safe message,‚úÖ PASS,SAFE,SAFE,100.00%
TC-09,Cryptocurrency scam,‚úÖ PASS,PHISHING,PHISHING,100.00%
TC-10,URL with special characters,‚úÖ PASS,PHISHING,PHISHING,100.00%



üìä TEST SUMMARY
Total Tests: 10
Passed: 10 ‚úÖ
Failed: 0 ‚ùå
Success Rate: 100.0%
