In [None]:
!pip install fastapi uvicorn pyngrok scikit-learn pandas numpy nest-asyncio

Collecting pyngrok
  Downloading pyngrok-7.4.0-py3-none-any.whl.metadata (8.1 kB)
Downloading pyngrok-7.4.0-py3-none-any.whl (25 kB)
Installing collected packages: pyngrok
Successfully installed pyngrok-7.4.0


In [None]:
import re
from urllib.parse import urlparse
import numpy as np

In [None]:
# Colab-only step: prompt for a file upload so that dataset.csv is available in
# the runtime's working directory for the training cell below.
from google.colab import files
# `uploaded` keeps whatever files.upload() returns; it is not read again in this
# notebook (presumably a mapping of filename -> content — TODO confirm).
uploaded = files.upload()

Saving dataset.csv to dataset.csv


In [None]:
class URLFeatureExtractor:
    """Compute the 21 model input features from a URL string alone.

    Each URL-based check returns -1 when the pattern it looks for is present
    and 1 otherwise; ``url_length`` and ``having_sub_domain`` additionally use
    0 for their middle bucket.  The remaining features are hard-coded to -1
    (nothing in this class fetches the page, so they cannot be measured here).
    """

    # Hosts treated as URL shorteners. They are matched exactly or as a parent
    # domain -- never as a raw substring, otherwise 't.co' would match
    # 'microsoft.com'.
    SHORTENER_HOSTS = ('bit.ly', 'goo.gl', 'tinyurl.com', 't.co', 'ow.ly', 'is.gd')

    # Dotted-quad pattern, compiled once instead of on every call.
    _IPV4_PATTERN = re.compile(r'(\d{1,3}\.){3}\d{1,3}')

    def extract_features(self, url):
        """Extract all 21 features from a URL.

        Args:
            url: URL string. If it has no http/https scheme, 'http://' is
                prepended before parsing.

        Returns:
            dict mapping feature name -> -1, 0 or 1.
        """
        features = {}

        # Compare the scheme case-insensitively: 'HTTP://host' already has a
        # scheme and must not get a second one prepended.
        if not url.lower().startswith(('http://', 'https://')):
            url = 'http://' + url

        parsed = urlparse(url)
        domain = parsed.netloc
        path = parsed.path

        features['having_IPhaving_IP_Address'] = self.having_ip(domain)
        features['URLURL_Length'] = self.url_length(url)
        features['Shortining_Service'] = self.shortening_service(domain)
        features['having_At_Symbol'] = self.having_at_symbol(url)
        features['double_slash_redirecting'] = self.double_slash_redirect(url)
        features['Prefix_Suffix'] = self.prefix_suffix(domain)
        features['having_Sub_Domain'] = self.having_sub_domain(domain)
        features['Favicon'] = -1
        features['port'] = self.port(parsed)
        features['HTTPS_token'] = self.https_token(domain)
        features['Request_URL'] = -1
        features['URL_of_Anchor'] = -1
        features['Links_in_tags'] = -1
        features['SFH'] = -1
        features['Submitting_to_email'] = -1
        features['Abnormal_URL'] = -1
        features['Redirect'] = self.redirect(path)
        features['on_mouseover'] = -1
        features['RightClick'] = -1
        features['popUpWidnow'] = -1
        features['Iframe'] = -1

        return features

    def having_ip(self, domain):
        """Return -1 if `domain` contains a dotted-quad IPv4 pattern, else 1."""
        return -1 if self._IPV4_PATTERN.search(domain) else 1

    def url_length(self, url):
        """Bucket the URL length: <54 -> 1, 54..75 -> 0, >75 -> -1."""
        if len(url) < 54: return 1
        elif len(url) <= 75: return 0
        return -1

    def shortening_service(self, domain):
        """Return -1 if `domain` is (a sub-domain of) a known shortener, else 1.

        `domain` may be a raw netloc, so any 'user@' prefix and ':port' suffix
        are dropped and the comparison is case-insensitive.
        """
        host = domain.lower().rsplit('@', 1)[-1].split(':', 1)[0]
        for service in self.SHORTENER_HOSTS:
            if host == service or host.endswith('.' + service):
                return -1
        return 1

    def having_at_symbol(self, url):
        """Return -1 if the URL contains '@', else 1."""
        return -1 if '@' in url else 1

    def double_slash_redirect(self, url):
        """Return -1 if '//' occurs anywhere from index 7 onwards, else 1."""
        return -1 if url[7:].count('//') > 0 else 1

    def prefix_suffix(self, domain):
        """Return -1 if `domain` contains a hyphen, else 1."""
        return -1 if '-' in domain else 1

    def having_sub_domain(self, domain):
        """Bucket by number of dots in `domain`: 1 -> 1, 2 -> 0, otherwise -1."""
        dots = domain.count('.')
        if dots == 1: return 1
        elif dots == 2: return 0
        return -1

    def port(self, parsed):
        """Return -1 if the parsed URL carries an explicit port, else 1.

        A port that cannot be parsed (non-numeric or out of range) is also
        reported as -1 instead of letting the ValueError escape.
        """
        try:
            explicit_port = parsed.port
        except ValueError:
            # The `port` property raises for malformed / out-of-range ports.
            return -1
        return -1 if explicit_port else 1

    def https_token(self, domain):
        """Return -1 if 'https' appears inside `domain` itself, else 1."""
        return -1 if 'https' in domain.lower() and 'https://' not in domain.lower() else 1

    def redirect(self, path):
        """Return -1 if `path` contains more than one '//' occurrence, else 1."""
        return -1 if path.count('//') > 1 else 1

def extract_simple_features(url):
    """Return the feature dict for `url` using a freshly built URLFeatureExtractor."""
    return URLFeatureExtractor().extract_features(url)

print("✅ Feature extraction code loaded!")

✅ Feature extraction code loaded!


In [None]:
# CELL 4: Train Model
# ============================================================
import pandas as pd
import pickle
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, roc_auc_score

# Model input columns. The order is significant: later cells build inference
# rows by iterating over this list.
# NOTE(review): URLFeatureExtractor hard-codes several of these to -1 at
# inference time, while training reads them from dataset.csv — confirm the two
# are consistent.
INTERNAL_FEATURES = [
    'having_IPhaving_IP_Address',
    'URLURL_Length',
    'Shortining_Service',
    'having_At_Symbol',
    'double_slash_redirecting',
    'Prefix_Suffix',
    'having_Sub_Domain',
    'Favicon',
    'port',
    'HTTPS_token',
    'Request_URL',
    'URL_of_Anchor',
    'Links_in_tags',
    'SFH',
    'Submitting_to_email',
    'Abnormal_URL',
    'Redirect',
    'on_mouseover',
    'RightClick',
    'popUpWidnow',
    'Iframe',
]

print("Loading dataset...")
df = pd.read_csv('dataset.csv')

# Feature matrix and target; label -1 is remapped to 0 so that later cells can
# test `prediction == 0` for the phishing class.
X = df[INTERNAL_FEATURES]
y = df['Result'].replace(-1, 0)

# 80/20 hold-out split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

print("Scaling features...")
# Fit the scaler on the training rows only, then apply it to both splits.
scaler = StandardScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)

print("Training Random Forest...")
model = RandomForestClassifier(
    n_estimators=200,
    random_state=42,
    class_weight="balanced",
).fit(X_train_scaled, y_train)

# Hard predictions for accuracy, class-1 probabilities for ROC AUC.
y_pred = model.predict(X_test_scaled)
y_pred_proba = model.predict_proba(X_test_scaled)[:, 1]

print(f"\n{'='*60}")
print(f"✅ MODEL TRAINED!")
print(f"Accuracy: {accuracy_score(y_test, y_pred):.4f}")
print(f"ROC AUC: {roc_auc_score(y_test, y_pred_proba):.4f}")
print(f"{'='*60}\n")

# Save model files: one pickle per artifact.
for artifact_path, artifact in (
    ('phishing_model.pkl', model),
    ('scaler.pkl', scaler),
    ('feature_names.pkl', INTERNAL_FEATURES),
):
    with open(artifact_path, 'wb') as f:
        pickle.dump(artifact, f)

print("✅ Model files saved!")

Loading dataset...
Scaling features...
Training Random Forest...

✅ MODEL TRAINED!
Accuracy: 0.9163
ROC AUC: 0.9766

✅ Model files saved!


In [None]:
# CELL 5: Test Model with Sample URLs
# ============================================================
# Smoke test: run a handful of URLs through feature extraction, scaling and
# the trained model, printing every intermediate feature value.
test_urls = [
    "https://www.google.com",
    "http://192.168.1.1/fake.html",
    "http://bit.ly/suspicious",
    "https://olympus.mygreatlearning.com/courses/105609?th=b&pb_id=581"
]

print("\n🧪 TESTING MODEL:\n")
for url in test_urls:
    print(f"Processing URL: {url}")
    features = extract_simple_features(url)
    print("Extracted Features and Values:")
    for feature, value in features.items():
        print(f"  {feature}: {value}")

    # The scaler was fitted on a DataFrame, so hand it a one-row DataFrame with
    # the same column names; a bare ndarray makes scikit-learn warn on every
    # call that X has no valid feature names.
    features_frame = pd.DataFrame(
        [[features[f] for f in INTERNAL_FEATURES]],
        columns=INTERNAL_FEATURES,
    )
    features_scaled = scaler.transform(features_frame)
    prediction = model.predict(features_scaled)[0]

    # Class 0 is the phishing class (label -1 was remapped to 0 for training).
    result = "🚨 PHISHING" if prediction == 0 else "✅ LEGITIMATE"
    print(f"\nPrediction for {url}: {result}")
    print("-" * 50) # Separator for clarity


🧪 TESTING MODEL:

Processing URL: https://www.google.com
Extracted Features and Values:
  having_IPhaving_IP_Address: 1
  URLURL_Length: 1
  Shortining_Service: 1
  having_At_Symbol: 1
  double_slash_redirecting: 1
  Prefix_Suffix: 1
  having_Sub_Domain: 0
  Favicon: -1
  port: 1
  HTTPS_token: 1
  Request_URL: -1
  URL_of_Anchor: -1
  Links_in_tags: -1
  SFH: -1
  Submitting_to_email: -1
  Abnormal_URL: -1
  Redirect: 1
  on_mouseover: -1
  RightClick: -1
  popUpWidnow: -1
  Iframe: -1

Prediction for https://www.google.com: ✅ LEGITIMATE
--------------------------------------------------
Processing URL: http://192.168.1.1/fake.html
Extracted Features and Values:
  having_IPhaving_IP_Address: -1
  URLURL_Length: 1
  Shortining_Service: 1
  having_At_Symbol: 1
  double_slash_redirecting: 1
  Prefix_Suffix: 1
  having_Sub_Domain: -1
  Favicon: -1
  port: 1
  HTTPS_token: 1
  Request_URL: -1
  URL_of_Anchor: -1
  Links_in_tags: -1
  SFH: -1
  Submitting_to_email: -1
  Abnormal_URL: -1
  



In [None]:
# CELL 6: Create FastAPI App
# ============================================================
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import nest_asyncio
from pyngrok import ngrok
import uvicorn

In [None]:
# Allow nested event loops in Colab
# (applied here so the server can later be started from inside the notebook).
nest_asyncio.apply()

# Application object; the route functions below register themselves on it
# through the @app.get / @app.post decorators.
app = FastAPI(title="Phishing Detection API")

# Request body for POST /predict: a JSON object with a single string field.
class URLRequest(BaseModel):
    # The URL to classify.
    url: str

# Root endpoint used as a liveness check: always answers with a fixed payload.
@app.get("/")
def home():
    status_payload = {
        "message": "Phishing Detection API is running!",
        "status": "healthy",
    }
    return status_payload

# POST /predict — classify a single URL.
#
# Request body: URLRequest ({"url": "<string>"}).
# Response: {"url", "is_phishing", "confidence", "prediction_label"} where
#   confidence is the model's probability for the class it predicted.
# Errors: 400 for a blank URL; 500 (with the exception text as detail) for any
#   failure during feature extraction or inference.
@app.post("/predict")
def predict_phishing(request: URLRequest):
    url = request.url

    # Validate outside the try block so the 400 is not re-wrapped as a 500.
    # Without this check a blank string would be scored as the URL "http://".
    if not url.strip():
        raise HTTPException(status_code=400, detail="url must be a non-empty string")

    try:
        features = extract_simple_features(url)

        # The scaler was fitted on a DataFrame; give it a one-row DataFrame with
        # the same column names so scikit-learn does not warn about missing
        # feature names on every request.
        features_frame = pd.DataFrame(
            [[features[f] for f in INTERNAL_FEATURES]],
            columns=INTERNAL_FEATURES,
        )
        features_scaled = scaler.transform(features_frame)

        # One pass through the forest: the predicted class is the arg-max of the
        # probability vector, mapped through model.classes_. This avoids using
        # the class label itself as a column index.
        prediction_proba = model.predict_proba(features_scaled)[0]
        best_index = int(np.argmax(prediction_proba))
        prediction = model.classes_[best_index]
        confidence = float(prediction_proba[best_index])

        # Class 0 is the phishing class (label -1 was remapped to 0 for training).
        is_phishing = bool(prediction == 0)

        return {
            "url": url,
            "is_phishing": is_phishing,
            "confidence": confidence,
            "prediction_label": "Phishing" if is_phishing else "Legitimate"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

print("✅ FastAPI app created!")

✅ FastAPI app created!


In [None]:
# CELL 7: Run API in Colab with Public URL
# ============================================================
# Get your ngrok auth token from: https://dashboard.ngrok.com/get-started/your-authtoken
# Do NOT paste the token into the notebook: it is read from the
# NGROK_AUTH_TOKEN environment variable, or prompted for without echo.
# NOTE(review): an earlier revision of this cell embedded a literal token;
# that token should be revoked in the ngrok dashboard.
import os
from getpass import getpass

# Single source of truth for the port shared by the tunnel and the server.
API_PORT = 8000

NGROK_AUTH_TOKEN = os.environ.get("NGROK_AUTH_TOKEN") or getpass("ngrok auth token: ")

# Set ngrok auth token
ngrok.set_auth_token(NGROK_AUTH_TOKEN)

# Start ngrok tunnel. `public_url` is the tunnel object; its `.public_url`
# attribute is the plain URL string. Formatting the object itself prints
# 'NgrokTunnel: "..." -> "..."', which produced unusable /docs and /predict links.
public_url = ngrok.connect(API_PORT)
base_url = public_url.public_url
print(f"\n{'='*60}")
print(f"🌐 PUBLIC API URL: {base_url}")
print(f"{'='*60}\n")
print(f"📖 API Docs: {base_url}/docs")
print(f"🧪 Test endpoint: {base_url}/predict")
print("\n⚠️ Keep this cell running to keep the API active!\n")

# Allow nested event loops in Colab
nest_asyncio.apply()

# Run FastAPI (blocks until the cell is interrupted); always close the tunnel
# afterwards so it does not stay open once the server has stopped.
try:
    uvicorn.run(app, host="0.0.0.0", port=API_PORT)
finally:
    ngrok.disconnect(base_url)


# import requests

# API_URL = "https://ca58a39764a7.ngrok-free.app"  # your ngrok URL
# payload = {"url": "http://paypal-verify@malicious.com"}

# response = requests.post(f"{API_URL}/predict", json=payload)
# print(response.json())


# # CELL 8: Test API from Python
# # ============================================================
# import requests
# import json

# # Use the public URL from above
# # Extract the clean public URL from the ngrok output
# raw_url = public_url.public_url
# API_URL = raw_url  # e.g., "https://abc123.ngrok.io"


# test_urls = [
#     "https://www.google.com",
#     "http://192.168.1.1/fake",
#     "http://paypal-secure.com"
# ]

# print("\n🧪 TESTING API:\n")
# for url in test_urls:
#     try:
#         response = requests.post(
#             f"{API_URL}/predict",
#             json={"url": url},
#             timeout=10 # Add a timeout to prevent hanging
#         )
#         response.raise_for_status() # Raise an exception for bad status codes
#         result = response.json()
#         status = "🚨 PHISHING" if result['is_phishing'] else "✅ LEGITIMATE"
#         print(f"{url:<40} -> {status} (Confidence: {result['confidence']:.2%})")
#     except requests.exceptions.RequestException as e:
#         print(f"Error testing {url}: {e}")



INFO:     Started server process [992]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)



🌐 PUBLIC API URL: NgrokTunnel: "https://2ed1333ddbcf.ngrok-free.app" -> "http://localhost:8000"

📖 API Docs: NgrokTunnel: "https://2ed1333ddbcf.ngrok-free.app" -> "http://localhost:8000"/docs
🧪 Test endpoint: NgrokTunnel: "https://2ed1333ddbcf.ngrok-free.app" -> "http://localhost:8000"/predict

⚠️ Keep this cell running to keep the API active!



INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [992]


In [None]:
import pickle
import pandas as pd
import numpy as np
import re
from urllib.parse import urlparse
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier

# Assuming URLFeatureExtractor, model, scaler, and INTERNAL_FEATURES are already defined
# in the previous cells and are in the current environment.

class PhishingDetector:
    """Bundle feature extraction, scaling and classification behind one object.

    Args:
        model: fitted classifier exposing ``predict`` and ``predict_proba``.
        scaler: fitted transformer exposing ``transform``.
        feature_names: ordered feature keys; defines the column order of the
            row handed to the scaler.
        feature_extractor: object exposing ``extract_features(url) -> dict``.
    """

    def __init__(self, model, scaler, feature_names, feature_extractor):
        self.model = model
        self.scaler = scaler
        self.feature_names = feature_names
        self.feature_extractor = feature_extractor

    def _scaled_features(self, url):
        """Turn `url` into the scaled 1 x n_features row the model expects.

        Shared by ``predict`` and ``predict_proba`` so both always see exactly
        the same preprocessing.

        Raises:
            KeyError: if the extractor's output lacks one of ``feature_names``.
        """
        features = self.feature_extractor.extract_features(url)
        row = np.array([features[name] for name in self.feature_names]).reshape(1, -1)
        return self.scaler.transform(row)

    def predict(self, url):
        """
        Extracts features from a URL, scales them, and predicts if it's phishing.
        Returns True for phishing, False for legitimate.
        """
        prediction = self.model.predict(self._scaled_features(url))[0]
        return bool(prediction == 0) # 0 for phishing, 1 for legitimate

    def predict_proba(self, url):
        """
        Extracts features from a URL, scales them, and returns prediction probabilities.

        The result is the first row of ``model.predict_proba``; with the 0/1
        label encoding used by ``predict`` that is presumably
        (phishing, legitimate) — verify against ``model.classes_``.
        """
        return self.model.predict_proba(self._scaled_features(url))[0]


# Wire the trained model, fitted scaler, feature order and a fresh extractor
# into a single detector object.
combined_detector = PhishingDetector(
    model=model,
    scaler=scaler,
    feature_names=INTERNAL_FEATURES,
    feature_extractor=URLFeatureExtractor(),
)

# Persist the whole detector as one pickle file.
# NOTE(review): pickle stores classes by reference, so loading this file needs
# PhishingDetector and URLFeatureExtractor to be defined/importable first.
with open('phishing_detector.pkl', 'wb') as detector_file:
    pickle.dump(combined_detector, detector_file)

print("✅ Combined phishing detector saved to 'phishing_detector.pkl'")

# Example of how to load and use the combined object
# with open('phishing_detector.pkl', 'rb') as f:
#     loaded_detector = pickle.load(f)

# test_url = "http://bit.ly/suspicious"
# is_phishing = loaded_detector.predict(test_url)
# probability = loaded_detector.predict_proba(test_url)

# print(f"\nTest URL: {test_url}")
# print(f"Is Phishing: {is_phishing}")
# print(f"Prediction Probabilities (Phish, Legit): {probability}")

✅ Combined phishing detector saved to 'phishing_detector.pkl'


In [None]:
import pickle

# Save model files: write each artifact to its own pickle.
for artifact_path, artifact in (
    ('phishing_model.pkl', model),
    ('scaler.pkl', scaler),
    ('feature_names.pkl', INTERNAL_FEATURES),
):
    with open(artifact_path, 'wb') as f:
        pickle.dump(artifact, f)

print("✅ Model files saved!")

ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-1' coro=<Server.serve() done, defined at /usr/local/lib/python3.12/dist-packages/uvicorn/server.py:69> exception=KeyboardInterrupt()>
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/uvicorn/main.py", line 580, in run
    server.run()
  File "/usr/local/lib/python3.12/dist-packages/uvicorn/server.py", line 67, in run
    return asyncio.run(self.serve(sockets=sockets))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/nest_asyncio.py", line 30, in run
    return loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/nest_asyncio.py", line 92, in run_until_complete
    self._run_once()
  File "/usr/local/lib/python3.12/dist-packages/nest_asyncio.py", line 133, in _run_once
    handle._run()
  File "/usr/lib/python3.12/asyncio/events.py", line 88, in _run
    se

✅ Model files saved!
