In [28]:
# Install the dashboard dependencies. %pip (unlike !pip) always installs into
# the environment of the running kernel.
%pip install -q streamlit pyngrok plotly pandas

In [29]:
%%bash
cat > feature_extraction.py <<'PY'

import socket
import ssl
import hashlib
import urllib.parse
from datetime import datetime, timezone
import whois
from fuzzywuzzy import fuzz
import geoip2.database
import urllib.parse
import sqlite3
from datetime import datetime

# Shared GeoLite2 City reader used by get_geoip_data(). It is opened as soon as
# this module is imported, from a path relative to the current working directory.
reader = geoip2.database.Reader("GeoLite2-City.mmdb")

# ---------------------------------------------
# 1️⃣ Resolve domain to IP
# ---------------------------------------------
def resolve_ip(domain):
    """Look up the IPv4 address for ``domain``.

    Returns the address string produced by ``socket.gethostbyname``. If the
    lookup raises, the failure is printed and ``None`` is returned instead.
    """
    try:
        address = socket.gethostbyname(domain)
    except Exception as e:
        print(f"Failed to resolve {domain}: {e}")
        address = None
    return address

# ---------------------------------------------
# 2️⃣ WHOIS Information
# ---------------------------------------------
def get_whois_data(domain):
    """Fetch registrar and creation-date information for ``domain`` via WHOIS.

    Returns a dict with three keys:
      - ``registrar``: the registrar reported by the WHOIS record.
      - ``creation_date``: creation timestamp formatted as
        ``%Y-%m-%d %H:%M:%S`` (converted to UTC when the record carries a
        timezone), or ``None`` when no usable date is available.
      - ``domain_age_days``: whole days between the creation date and the
        current UTC time, or ``None`` when no usable date is available.

    If the lookup itself fails, the error is printed and all three values
    are ``None``.
    """
    try:
        w = whois.whois(domain)
        created = w.creation_date

        # creation_date may be a list of values; take the first real datetime.
        if isinstance(created, (list, tuple)):
            created = next((c for c in created if isinstance(c, datetime)), None)
        elif not isinstance(created, datetime):
            # None or an unparsed value: no usable date, but keep the registrar.
            created = None

        if created is not None:
            if created.tzinfo is not None:
                # Convert to UTC *before* dropping tzinfo; stripping the offset
                # directly would skew both the timestamp and the age.
                created = created.astimezone(timezone.utc).replace(tzinfo=None)
            now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
            domain_age_days = (now_utc - created).days
            creation_date = created.strftime("%Y-%m-%d %H:%M:%S")
        else:
            domain_age_days = None
            creation_date = None

        return {
            "registrar": w.registrar,
            "creation_date": creation_date,
            "domain_age_days": domain_age_days
        }
    except Exception as e:
        print(f"WHOIS lookup failed for {domain}: {e}")
        return {"registrar": None, "creation_date": None, "domain_age_days": None}

# ---------------------------------------------
# 3️⃣ Brand Similarity
# ---------------------------------------------
# Brand names that candidate domains are compared against.
brand_list = ["Google", "Facebook", "Microsoft", "Amazon", "PayPal", "Netflix"]

def compute_similarity(domain_name):
    """Return the best fuzzy-match score (0-100) between a domain and a known brand.

    The domain is split into dot-separated labels, the last label (the TLD) is
    dropped when there is more than one, and every remaining label is compared
    case-insensitively against each entry of ``brand_list`` with
    ``fuzz.ratio``. Checking every label (not only the left-most one) means
    ``www.paypal.com`` is scored on "paypal" rather than on "www".

    Returns 0 for an empty/None domain or when the comparison fails.
    """
    if not domain_name:
        return 0

    parts = domain_name.lower().split('.')
    if len(parts) > 1:
        parts = parts[:-1]  # remove .com, .net, etc.
    labels = [label for label in parts if label]
    brands = [brand.lower() for brand in brand_list]

    try:
        return max(
            (fuzz.ratio(label, brand) for label in labels for brand in brands),
            default=0,
        )
    except Exception as e:
        print(f"Brand similarity failed: {e}")
        return 0

# ---------------------------------------------
# 4️⃣ GeoIP Lookup
# ---------------------------------------------
def get_geoip_data(domain):
    """
    Resolve ``domain`` to an IP and look it up in the GeoLite2 City reader.

    Returns a dict with keys ``ip``, ``country``, ``region`` and ``city``.
    All four are ``None`` when the domain does not resolve or when the
    lookup raises (the error is printed in that case).
    """
    geo = dict.fromkeys(("ip", "country", "region", "city"))
    try:
        address = resolve_ip(domain)
        if address:
            record = reader.city(address)
            # Keyword values are all evaluated before update() runs, so a
            # failing attribute access leaves the all-None dict untouched.
            geo.update(
                ip=address,
                country=record.country.name,
                region=record.subdivisions.most_specific.name,
                city=record.city.name,
            )
    except Exception as e:
        print(f"GeoIP lookup failed for {domain}: {e}")
    return geo

# ---------------------------------------------
# 5️⃣ ASN Info (placeholder — returns a fixed sample ASN until a real lookup is wired in)
# ---------------------------------------------
def get_asn_info_from_ip(ip):
    """Placeholder ASN lookup.

    NOTE: ``ip`` is not consulted yet. Every call returns the same sample
    record (AS15169 / GOOGLE LLC). Replace this with a real ASN database
    lookup before relying on the result.
    """
    sample_record = dict(asn="AS15169", asn_description="GOOGLE LLC")
    return sample_record

def get_asn_reputation(asn, asn_description):
    """Map an ASN description to a coarse reputation label.

    - no description (``None`` or empty)      -> "Bad"
    - description containing "GOOGLE" (any case) -> "Good"
    - any other description                    -> "Unknown"

    ``asn`` is accepted for interface symmetry but is not used.
    """
    if not asn_description:
        return "Bad"
    return "Good" if "GOOGLE" in asn_description.upper() else "Unknown"

# ---------------------------------------------
# 6️⃣ SSL Certificate Core Features
# ---------------------------------------------
def get_ssl_core_features(url, timeout=5):
    """
    Collect basic TLS certificate features for ``url``.

    The result always has the keys fingerprint_sha256, issuer, subject,
    not_valid_before, not_valid_after and error. For plain-http URLs (or
    URLs without a scheme) no connection is made and ``error`` is
    "no_ssl_for_http". Any exception is reported as ``error=str(exc)`` with
    the other fields set to None.
    """
    def build(**known):
        # Every result carries the same six keys; unspecified ones stay None.
        record = dict.fromkeys((
            "fingerprint_sha256",
            "issuer",
            "subject",
            "not_valid_before",
            "not_valid_after",
            "error",
        ))
        record.update(known)
        return record

    def flatten(name_parts):
        # Render the nested (key, value) tuples as "k=v, k=v"; empty -> None.
        return ", ".join(f"{k}={v}" for part in name_parts for k, v in part) or None

    date_format = "%b %d %H:%M:%S %Y %Z"

    try:
        pieces = urllib.parse.urlparse(url)
        host = pieces.hostname or url
        scheme = pieces.scheme or "http"
        # Reading .port validates it, so a bad port is reported even for http.
        explicit_port = pieces.port
        if explicit_port:
            port = explicit_port
        elif scheme == "https":
            port = 443
        else:
            port = 80

        if scheme == "http":
            return build(error="no_ssl_for_http")

        context = ssl.create_default_context()
        with socket.create_connection((host, port), timeout=timeout) as raw_sock:
            with context.wrap_socket(raw_sock, server_hostname=host) as tls_sock:
                der_bytes = tls_sock.getpeercert(binary_form=True)
                details = tls_sock.getpeercert()

                digest = hashlib.sha256(der_bytes).hexdigest().upper()
                issuer = flatten(details.get("issuer", []))
                subject = flatten(details.get("subject", []))

                # Best effort: an unparsable date leaves that field (and any
                # field parsed after it) as None.
                valid_from = valid_until = None
                try:
                    valid_from = datetime.strptime(details["notBefore"], date_format).isoformat()
                    valid_until = datetime.strptime(details["notAfter"], date_format).isoformat()
                except Exception:
                    pass

                return build(
                    fingerprint_sha256=digest,
                    issuer=issuer,
                    subject=subject,
                    not_valid_before=valid_from,
                    not_valid_after=valid_until,
                )
    except Exception as e:
        return build(error=str(e))
# ---------------------------------------------
# 7️⃣ Risk Score Calculation (based on your DB logic)
# ---------------------------------------------
def calculate_risk_score(domain, domain_age_days, brand_similarity, fingerprint_sha256, all_domains):
    """
    Compute a 0-100 risk score and a risk level from four components worth
    up to 25 points each:

    - feed frequency: log-scaled count of ``domain`` in ``all_domains``,
      relative to the most frequent entry (a falsy ``domain`` counts as 1)
    - domain age: younger = riskier; 0 points at five years or older.
      A missing/unparsable age is treated as 0 days, i.e. maximum risk.
    - SSL: 25 when ``fingerprint_sha256`` is None/blank, otherwise 5
    - brand similarity: values above 1 are read as percentages (0-100),
      values in [0, 1] as fractions

    Returns ``(score, level)`` where ``score`` is rounded to two decimals and
    ``level`` is "High" (>= 70), "Medium" (>= 40) or "Low".
    """
    # Local imports: this module does not import numpy, so the score is
    # computed with the standard library only.
    import math
    from collections import Counter

    def _clamp(value, low, high):
        return max(low, min(high, value))

    def _as_number(value):
        # None, unparsable values and NaN all count as 0.
        try:
            number = float(value or 0)
        except (TypeError, ValueError):
            return 0.0
        return 0.0 if math.isnan(number) else number

    # Feed frequency: one pass over the feed instead of count() per element.
    try:
        counts = Counter(all_domains)
        feed_freq = counts[domain] if domain else 1
        busiest = max(counts.values(), default=0)
        if busiest > 0:
            feed_score = math.log1p(feed_freq) / math.log1p(busiest) * 25
        else:
            feed_score = 0
    except (TypeError, ValueError):
        # e.g. a non-iterable feed or unhashable entries
        feed_score = 0

    max_age = 365 * 5
    age_score = _clamp((1 - _as_number(domain_age_days) / max_age) * 25, 0, 25)

    ssl_score = 25 if fingerprint_sha256 is None or str(fingerprint_sha256).strip() == "" else 5

    sim = _as_number(brand_similarity)
    if sim > 1:
        sim /= 100.0
    brand_score = _clamp(sim, 0, 1) * 25

    total = _clamp(feed_score + age_score + ssl_score + brand_score, 0, 100)

    if total >= 70:
        level = "High"
    elif total >= 40:
        level = "Medium"
    else:
        level = "Low"

    return round(total, 2), level

# =====================================================
# 8️⃣ Helper function — check if URL already exists in DB
# =====================================================
def url_exists_in_db(url, db_path="/content/phishing_dataset.db"):
    """Check if the URL already exists in the database and return its record.

    Parameters:
        url: exact URL string to look up in the ``urls`` table.
        db_path: path of the SQLite database file. Defaults to
            "/content/phishing_dataset.db"; a falsy value also selects the
            default.

    Returns:
        A dict mapping the selected column names to the row's values, or
        ``None`` when no row matches.
    """
    if not db_path:
        db_path = "/content/phishing_dataset.db"

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT url_id, url, domain, date_added, verified, url_status, label,
                   whois_registrar, creation_date, domain_age_days, brand_similarity,
                   ip, country, region, city,
                   fingerprint_sha256, issuer, subject, not_valid_before, not_valid_after,
                   error, risk_score, risk_level
            FROM urls
            WHERE url = ?;
        """, (url,))
        row = cursor.fetchone()
        columns = [desc[0] for desc in cursor.description]
    finally:
        # Always release the connection, even when the query raises.
        conn.close()

    if row:
        return dict(zip(columns, row))
    return None

PY

In [30]:
%%bash
cat > phishing_detector.py <<'PY'
# phishing_detector.py
from urllib.parse import urlparse
from feature_extraction import (
    get_whois_data,
    compute_similarity,
    get_geoip_data,
    get_ssl_core_features,
    get_asn_info_from_ip,
    get_asn_reputation,
)
import numpy as np

def enrich_url(url):
    """Gather enrichment features for ``url`` from the extraction functions.

    The URL is parsed first; when it has no network location (no
    ``scheme://`` prefix) it is re-parsed with ``http://`` prepended, so
    inputs such as ``httpbin.org/get`` are handled. The bare host name
    (without port or userinfo) is used for the WHOIS, brand-similarity and
    GeoIP lookups, and the normalized URL is used for the SSL check.

    Returns a dict with the keys domain, domain_age_days, ssl_valid,
    brand_similarity, geo_location, asn, asn_reputation, registrar, city
    and region.
    """
    parsed = urlparse(url)
    if not parsed.netloc:
        # No "scheme://" prefix: assume http so the host lands in netloc.
        url = "http://" + url
        parsed = urlparse(url)
    # hostname drops any port/userinfo; fall back for unusual inputs.
    domain = parsed.hostname or parsed.netloc or parsed.path

    # WHOIS (an unknown age is reported as 0)
    whois_info = get_whois_data(domain)
    domain_age_days = whois_info.get("domain_age_days") or 0
    registrar = whois_info.get("registrar")

    # Brand similarity
    brand_similarity = compute_similarity(domain)

    # GeoIP
    geo_data = get_geoip_data(domain)
    ip = geo_data.get("ip")
    country = geo_data.get("country")
    region = geo_data.get("region")
    city = geo_data.get("city")

    # ASN (only looked up when the domain resolved to an IP)
    asn_info = get_asn_info_from_ip(ip) if ip else {"asn": None, "asn_description": None}
    asn = asn_info.get("asn")
    asn_desc = asn_info.get("asn_description")
    asn_reputation = get_asn_reputation(asn, asn_desc)

    # SSL: any reported error counts as "not valid"
    ssl_info = get_ssl_core_features(url)
    ssl_valid = "No" if ssl_info.get("error") else "Yes"

    enrich = {
        "domain": domain,
        "domain_age_days": domain_age_days,
        "ssl_valid": ssl_valid,
        "brand_similarity": brand_similarity,
        "geo_location": country or "Unknown",
        "asn": asn or "Unknown",
        "asn_reputation": asn_reputation,
        "registrar": registrar,
        "city": city,
        "region": region
    }
    return enrich


def score_from_enrichment(enrich):
    """Turn an enrichment dict into a 0-100 risk score (returned as float).

    Points are added for each rule that applies:
      +30  ssl_valid is "No"
      +25  domain_age_days is truthy and below 60
      +20  brand_similarity is truthy and above 70
      +15  asn_reputation is "Bad"

    A falsy age (0 or None) never triggers the age rule.
    """
    # (points, predicate) pairs, evaluated lazily and in this order.
    rules = (
        (30, lambda: enrich["ssl_valid"] == "No"),
        (25, lambda: bool(enrich["domain_age_days"]) and enrich["domain_age_days"] < 60),
        (20, lambda: bool(enrich["brand_similarity"]) and enrich["brand_similarity"] > 70),
        (15, lambda: enrich["asn_reputation"] == "Bad"),
    )
    points = 0
    for weight, applies in rules:
        if applies():
            points += weight
    bounded = np.clip(points, 0, 100)
    return float(bounded)


def analyze_url(url):
    """Analyze ``url`` and return its phishing risk.

    Raises ``ValueError`` for an empty URL. Otherwise returns a dict with
    ``url``, ``score`` (from ``score_from_enrichment``), ``verdict``
    ("Phishing" when the score is at least 60, else "Benign") and
    ``details`` (the enrichment dict).
    """
    if not url:
        raise ValueError("Empty URL provided")

    details = enrich_url(url)
    risk = score_from_enrichment(details)

    if risk >= 60:
        verdict = "Phishing"
    else:
        verdict = "Benign"

    return {
        "url": url,
        "score": risk,
        "verdict": verdict,
        "details": details,
    }
PY


In [31]:
# Dependencies of feature_extraction.py / phishing_detector.py / app.py.
# Explicit %pip does not rely on automagic; the extra is quoted so the shell
# never treats the brackets as a glob pattern.
%pip install python-whois "fuzzywuzzy[speedup]" geoip2 plotly streamlit pandas numpy thefuzz



In [32]:
# Download the GeoLite2 City database that feature_extraction.py opens as
# "GeoLite2-City.mmdb" (-O overwrites any existing copy on every run).
# NOTE(review): this pulls from a third-party GitHub repository
# (P3TERX/GeoLite.mmdb), not from MaxMind directly — confirm that is acceptable.
!wget -O GeoLite2-City.mmdb "https://github.com/P3TERX/GeoLite.mmdb/raw/download/GeoLite2-City.mmdb"

--2025-10-16 14:43:21--  https://github.com/P3TERX/GeoLite.mmdb/raw/download/GeoLite2-City.mmdb
Resolving github.com (github.com)... 20.27.177.113
Connecting to github.com (github.com)|20.27.177.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/P3TERX/GeoLite.mmdb/download/GeoLite2-City.mmdb [following]
--2025-10-16 14:43:22--  https://raw.githubusercontent.com/P3TERX/GeoLite.mmdb/download/GeoLite2-City.mmdb
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.108.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 62557403 (60M) [application/octet-stream]
Saving to: ‘GeoLite2-City.mmdb’


2025-10-16 14:43:22 (83.4 MB/s) - ‘GeoLite2-City.mmdb’ saved [62557403/62557403]



In [33]:
%%bash
cat > app.py <<'PY'
import streamlit as st
import pandas as pd
import plotly.express as px
from phishing_detector import analyze_url

# ---- Page chrome -----------------------------------------------------------
st.set_page_config(page_title="Phishing Detection Dashboard", layout="wide")
st.title("Phishing Detection Dashboard")
st.markdown("Enter a URL and get a phishing risk score and details.")

with st.expander("How to use"):
    st.write("Paste a URL (e.g. https://example.com) then click Analyze URL.")

# ---- URL analysis ----------------------------------------------------------
url = st.text_input("🔗 Enter a URL to analyze:")

if st.button("Analyze URL"):
    cleaned_url = url.strip()
    if not cleaned_url:
        st.warning("Please enter a valid URL first!")
    else:
        try:
            result = analyze_url(cleaned_url)

            # Score on the left, colored verdict on the right.
            score_col, verdict_col = st.columns([1, 1])
            with score_col:
                st.metric("Risk Score", f"{result['score']} / 100")
            with verdict_col:
                if result['verdict'] == "Phishing":
                    color = "red"
                else:
                    color = "green"
                st.markdown(f"### Verdict: <span style='color:{color}'>{result['verdict']}</span>", unsafe_allow_html=True)

            st.subheader("Details")
            st.json(result["details"])

            # Same details again as a two-column (field / Value) table.
            details_frame = pd.DataFrame([result["details"]])
            details_table = details_frame.T.rename(columns={0: "Value"})
            st.table(details_table)

        except Exception as e:
            st.error(f"Error analyzing URL: {e}")

# ---- Static example chart (hard-coded sample numbers) ----------------------
st.subheader("Sample: Top Targeted Brands (Example Data)")
sample_brands = pd.DataFrame({
    "Brand": ["PayPal", "Google", "Microsoft", "Netflix"],
    "Phishing Attempts": [23, 17, 31, 9]
})
fig = px.bar(sample_brands, x="Brand", y="Phishing Attempts", title="Top Targeted Brands")
st.plotly_chart(fig, use_container_width=True)
PY


In [34]:
# SECURITY: never store the ngrok authtoken in the notebook. It is read from
# the NGROK_AUTHTOKEN environment variable, or prompted for without echoing.
# The token that was previously hard-coded in this cell should be revoked in
# the ngrok dashboard, since it has been saved with the notebook.
import os
from getpass import getpass

from pyngrok import ngrok

ngrok.set_auth_token(os.environ.get("NGROK_AUTHTOKEN") or getpass("ngrok authtoken: "))

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [35]:
from pyngrok import ngrok
import socket
import subprocess, sys, time, os

PORT = 8501
STARTUP_TIMEOUT_S = 30


def _port_is_open(port, timeout=1.0):
    """Return True when something accepts TCP connections on localhost:port."""
    try:
        with socket.create_connection(("127.0.0.1", port), timeout=timeout):
            return True
    except OSError:
        return False


public_url = ngrok.connect(PORT)
print("Public URL:", public_url)

print("Starting Streamlit...")
if not _port_is_open(PORT):
    # Run Streamlit with the kernel's interpreter, without a shell, detached
    # from this cell, and with all output captured in streamlit.log.
    cmd = [sys.executable, "-m", "streamlit", "run", "app.py",
           "--server.port", str(PORT), "--server.headless", "true"]
    with open("streamlit.log", "w") as log_file:
        streamlit_proc = subprocess.Popen(
            cmd, stdout=log_file, stderr=subprocess.STDOUT, start_new_session=True
        )

    # Wait until the server really accepts connections instead of sleeping
    # for a fixed time and hoping it came up.
    deadline = time.monotonic() + STARTUP_TIMEOUT_S
    while not _port_is_open(PORT):
        if streamlit_proc.poll() is not None:
            raise RuntimeError("Streamlit exited during startup; see streamlit.log")
        if time.monotonic() > deadline:
            raise RuntimeError(
                f"Streamlit did not start listening on port {PORT} within "
                f"{STARTUP_TIMEOUT_S}s; see streamlit.log"
            )
        time.sleep(0.5)
# else: a server from an earlier run of this cell is already listening.

print("Streamlit started. Check the public URL above.")


Public URL: NgrokTunnel: "https://maidenly-oneirocritically-adrianna.ngrok-free.dev" -> "http://localhost:8501"
Starting Streamlit...
Streamlit started. Check the public URL above.
