Course: ITAI 2277 ‚Äì AI Capstone Project

Binte Zahra

Instructor: Sitaram Ayyagari

# Gmail-Integrated Phishing Detector with Reinforcement Learning
This project builds an AI-powered security tool that connects to a live Gmail inbox, scans emails for phishing threats using GPT-4o and VirusTotal, and improves over time by learning from user feedback.

## Cell 1: Setup & Environment
#### What we did:
Installed necessary Python packages (gradio, openai, google-api-python-client) and imported core libraries for handling email data (imaplib, email), HTML parsing, and file storage (json, os).

#### Why:
We need specific libraries to communicate with external APIs (OpenAI, VirusTotal), parse complex email formats (MIME/HTML), and create the web interface.

#### How:
Used !pip install for dependencies and set up a file structure (feedback_data/) to store the "memory" of the model.

In [None]:
# CELL 1: SETUP & IMPORTS
!pip install -q gradio openai requests google-auth-oauthlib google-auth-httplib2 google-api-python-client

import os
import re
import json
import time
import imaplib
import email as email_lib
import html
import requests
import openai
import gradio as gr
from email.header import decode_header
from getpass import getpass

# Setup Storage
FEEDBACK_DIR = "feedback_data"
os.makedirs(FEEDBACK_DIR, exist_ok=True)
CORRECTIONS_JSON = "user_corrections.json"

print("‚úÖ Environment Ready.")

‚úÖ Environment Ready.


## Cell 2: Core Logic (The Brain)
#### What we did:
Defined the backend functions that power the entire system.

* Gmail Integration: Created a robust fetcher that logs into Gmail via IMAP, downloads the latest $N$ emails, and strips away HTML/CSS to give GPT clean text to read.

* VirusTotal Integration: Built a "Fact Checker" that scans links against 70+ security vendors.

* GPT Analysis: Created a "Balanced Expert" prompt that analyzes sender reputation, urgency, and context.

* Reinforcement Learning (RL): Implemented a memory system (save_feedback) that records user corrections and injects them into the GPT prompt (get_learned_context) so the model doesn't repeat mistakes.

* Scoring Engine: Developed a "Max Risk" logic engine that combines GPT's psychological analysis with VirusTotal's technical analysis.

#### Why:
* HTML Cleaning: GPT cannot read raw HTML spam accurately; it needs plain text.Consensus Rule: We ignore single VirusTotal flags to prevent false positives on marketing links (Shein, Quora).

* Sanity Check: We force the numeric Score (0-100) to match the Text Verdict (SAFE/PHISHING) to prevent confusing results (e.g., "Safe" but score 90).

#### How:
Used Python functions with error handling. The scoring logic uses conditional overrides (e.g., if gpt_verdict == "PHISHING" -> force score 85).

In [None]:
# ==========================================
# PASTE THIS INTO CELL 2 (Replacing previous code)
# ==========================================

import os, re, json, time, imaplib, email as email_lib, html, requests, openai
from email.header import decode_header

current_imap = None
CORRECTIONS_FILE = "user_corrections.json"

# --- 1. LEARNING MEMORY ---
def save_feedback(subject, sender, actual_label):
    try:
        data = []
        if os.path.exists(CORRECTIONS_FILE):
            with open(CORRECTIONS_FILE, 'r') as f:
                data = json.load(f)
        new_entry = {"subject": subject, "sender": sender, "actual_label": actual_label}
        data.append(new_entry)
        with open(CORRECTIONS_FILE, 'w') as f:
            json.dump(data[-20:], f, indent=2)
        return len(data)
    except: return 0

def get_learned_context():
    if not os.path.exists(CORRECTIONS_FILE): return ""
    try:
        with open(CORRECTIONS_FILE, 'r') as f:
            data = json.load(f)
        if not data: return ""
        context = "IMPORTANT - LEARN FROM THESE PAST MISTAKES:\n"
        for item in data[-5:]:
            context += f"- Email from '{item['sender']}' with subject '{item['subject']}' was actually {item['actual_label']}.\n"
        return context
    except: return ""

# --- 2. GMAIL UTILS ---
def connect_gmail_simple(email_address, app_password):
    try:
        imap = imaplib.IMAP4_SSL("imap.gmail.com")
        imap.login(email_address, app_password)
        return imap
    except Exception as e:
        print(f"Connection Error: {e}")
        return None

def clean_html(html_content):
    clean = re.sub(r'<style.*?>.*?</style>', '', html_content, flags=re.DOTALL)
    clean = re.sub(r'<script.*?>.*?</script>', '', clean, flags=re.DOTALL)
    clean = re.sub(r'<[^>]+>', ' ', clean)
    return html.unescape(clean).strip()

def fetch_recent_emails(imap, max_emails):
    try:
        imap.select("INBOX")
        status, messages = imap.search(None, "ALL")
        email_ids = messages[0].split()[-max_emails:]
        emails = []
        for email_id in reversed(email_ids):
            try:
                status, msg_data = imap.fetch(email_id, "(RFC822)")
                msg = email_lib.message_from_bytes(msg_data[0][1])
                subject, encoding = decode_header(msg.get("Subject", "No Subject"))[0]
                if isinstance(subject, bytes): subject = subject.decode(encoding or "utf-8", errors="ignore")

                body = ""
                if msg.is_multipart():
                    for part in msg.walk():
                        if part.get_content_type() == "text/plain":
                            body = part.get_payload(decode=True).decode("utf-8", errors="ignore"); break
                        elif part.get_content_type() == "text/html":
                            body = clean_html(part.get_payload(decode=True).decode("utf-8", errors="ignore"))
                else:
                    body = clean_html(msg.get_payload(decode=True).decode("utf-8", errors="ignore"))

                urls = re.findall(r'https?://[^\s<>"]+', body)
                emails.append({
                    "subject": subject,
                    "from": msg.get("From", "Unknown"),
                    "date": msg.get("Date", ""),
                    "body": body[:2000],
                    "url": urls[0] if urls else ""
                })
            except: continue
        return emails
    except: return []

# --- 3. ANALYSIS LOGIC ---
def check_virustotal(url):
    """Check URL with FALSE POSITIVE protection"""
    if not url: return 0, "No URL found."
    api = os.environ.get('VT_API_KEY')
    if not api: return 0, "‚ö†Ô∏è VT Key Missing"
    try:
        id_resp = requests.post('https://www.virustotal.com/api/v3/urls', headers={'x-apikey': api}, data={'url': url}).json()
        rep = requests.get(f"https://www.virustotal.com/api/v3/analyses/{id_resp['data']['id']}", headers={'x-apikey': api}).json()

        stats = rep['data']['attributes']['stats']
        mal = stats['malicious']
        susp = stats['suspicious']

        if mal >= 2: return 100, f"üö® DANGEROUS: {mal} vendors confirmed malware."
        elif mal == 1: return 45, "‚ö†Ô∏è CAUTION: 1 vendor flagged this (could be false positive)."
        elif susp > 1: return 50, "‚ö†Ô∏è SUSPICIOUS: Multiple suspicious flags."
        return 0, "‚úÖ CLEAN URL"
    except: return 0, "‚ö†Ô∏è Scan Failed"

def ask_gpt(subject, body, sender, url):
    learned_context = get_learned_context()
    prompt = f"""You are a cybersecurity expert.
{learned_context}
Sender: {sender}
Subject: {subject}
Body Snippet: {body[:1500]}
Link: {url}

Analyze for PHISHING vs SPAM vs SAFE.
RULES:
1. SAFE: Known brands (Shein, Khan Academy, Quora, LinkedIn) sending standard promos.
2. SPAM: Annoying marketing, but harmless.
3. PHISHING: Impersonation, credential theft, urgent threats.

OUTPUT FORMAT:
VERDICT: [PHISHING/SPAM/SAFE]
SCORE: [0-100] (0=Safe, 100=Danger)
REASON: [Short explanation]
"""
    try:
        res = openai.chat.completions.create(model="gpt-4o-mini", messages=[{"role": "user", "content": prompt}], temperature=0)
        content = res.choices[0].message.content

        # 1. Parse raw output
        score = 50
        if "SCORE:" in content: score = int(re.search(r'SCORE:\s*(\d+)', content).group(1))

        reason = content.split("REASON:")[1].strip() if "REASON:" in content else "Analysis failed"

        verdict = "SAFE"
        if "PHISHING" in content.upper(): verdict = "PHISHING"
        elif "SPAM" in content.upper(): verdict = "SPAM"

        # --- FIX: SANITY CHECK (Force Score to match Verdict) ---
        # GPT sometimes confuses "Score 90" with "90% Safe". We must correct this.

        if verdict == "SAFE" and score > 30:
            score = 10  # Force score down if Verdict is Safe

        elif verdict == "PHISHING" and score < 70:
            score = 85  # Force score up if Verdict is Phishing

        elif verdict == "SPAM":
            score = 50  # Keep Spam in the middle

        return verdict, score, reason
    except Exception as e: return "ERROR", 0, str(e)

def analyze_logic(subject, body, url, sender):
    gpt_verdict, gpt_score, gpt_reason = ask_gpt(subject, body, sender, url)
    vt_score, vt_reason = check_virustotal(url)

    # 1. Base Logic
    final_score = max(gpt_score, vt_score)

    # 2. Logic Overrides
    if gpt_verdict == "SAFE" and vt_score < 50:
        final_score = gpt_score  # Trust GPT on Safety if VT is weak

    if gpt_verdict == "PHISHING" and final_score < 75:
        final_score = 85        # Trust GPT on Phishing

    # 3. Final Labels
    if final_score < 30: final_verdict = "‚úÖ SAFE"
    elif final_score < 70: final_verdict = "‚ö†Ô∏è SPAM / SUSPICIOUS"
    else: final_verdict = "üö® PHISHING"

    report = f"""
# {final_verdict}
**Threat Score:** {final_score}/100

### ü§ñ GPT Analysis
* **Verdict:** {gpt_verdict}
* **Score:** {gpt_score}/100
* **Reason:** {gpt_reason}

### ü¶† VirusTotal
* {vt_reason}
"""
    return report, final_score, {"subject": subject, "sender": sender}

print("‚úÖ Logic Updated: Sanity Check Added.")

‚úÖ Logic Updated: Sanity Check Added.


## Cell 3: Security & API Keys

#### What we did:
Created a secure input method for OpenAI and VirusTotal API keys.

#### Why:
Hardcoding API keys is a security risk. Using getpass ensures keys are entered securely per session and not saved in the notebook file.

#### How:
Used the getpass library to accept inputs without echoing characters to the screen.

In [None]:
# CELL 3: KEYS
openai.api_key = getpass("Enter OpenAI API Key: ")
os.environ['VT_API_KEY'] = getpass("Enter VirusTotal API Key: ")

Enter OpenAI API Key: ¬∑¬∑¬∑¬∑¬∑¬∑¬∑¬∑¬∑¬∑
Enter VirusTotal API Key: ¬∑¬∑¬∑¬∑¬∑¬∑¬∑¬∑¬∑¬∑


## Cell 4: The Interface (Frontend)
#### What we did:
Built a modern web dashboard using Gradio.

* State Management: Used gr.State to store fetched emails in memory, ensuring that clicking "Scan Email #1" actually scans the correct email, not an old one.

* Feedback Loop: Added "Mark as Safe" and "Mark as Phishing" buttons that instantly update the JSON memory file.

* Controls: Added a slider to fetch between 5-15 emails (to manage API quotas).

#### Why:
A simple script is hard to use. A GUI allows users to browse their actual inbox, view threat scores visually, and easily provide training feedback.

#### How:
Wired Python backend functions to Gradio UI components (Button, Dataframe, Slider) using event listeners (.click()).

In [None]:
# CELL 4: UI WITH REINFORCEMENT LEARNING
with gr.Blocks(title="Phishing Detector", theme=gr.themes.Soft()) as demo:

    # State Storage
    emails_state = gr.State([])
    current_email_context = gr.State({}) # Stores current email info for feedback

    gr.Markdown("# üõ°Ô∏è Precision Phishing Detector + RL")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 1. Connect")
            user = gr.Textbox(label="Gmail Address")
            pwd = gr.Textbox(label="App Password", type="password")
            login_btn = gr.Button("Login", variant="primary")
            login_status = gr.Textbox(label="Status")

            gr.Markdown("### 2. Fetch Emails")
            num_emails = gr.Slider(5, 15, value=5, step=1, label="Fetch Count")
            fetch_btn = gr.Button("Fetch Emails", variant="secondary")

        with gr.Column(scale=2):
            gr.Markdown("### 3. Select & Scan")
            email_table = gr.Dataframe(headers=["Index", "Subject", "Sender", "Date"], interactive=False)

            with gr.Row():
                idx_input = gr.Number(label="Email Index (0=Newest)", precision=0)
                scan_btn = gr.Button("üîç Scan This Email", variant="primary")

            report_out = gr.Markdown()
            score_out = gr.Slider(0, 100, label="Threat Score")

            # --- FEEDBACK SECTION ---
            with gr.Group():
                gr.Markdown("### üéì Reinforcement Learning (Teach the Model)")
                with gr.Row():
                    fb_safe = gr.Button("‚úÖ Wrong! Mark as SAFE")
                    fb_phish = gr.Button("üö® Wrong! Mark as PHISHING")
                fb_msg = gr.Textbox(label="Learning Status", interactive=False)

    # --- LOGIC ---
    def login(u, p):
        global current_imap
        current_imap = connect_gmail_simple(u, p)
        return "‚úÖ Connected" if current_imap else "‚ùå Failed"

    def fetch_and_store(n):
        if not current_imap: return None, []
        raw = fetch_recent_emails(current_imap, n)
        disp = [[i, e['subject'], e['from'], e['date']] for i, e in enumerate(raw)]
        return disp, raw

    def scan_specific_email(idx, stored_emails):
        if not stored_emails: return "Fetch first.", 0, {}
        try:
            i = int(idx)
            if i < 0 or i >= len(stored_emails): return "Invalid Index", 0, {}
            e = stored_emails[i]
            # Returns Report, Score, and CONTEXT (Sender/Subject) for feedback
            return analyze_logic(e['subject'], e['body'], e['url'], e['from'])
        except Exception as x: return str(x), 0, {}

    # --- FEEDBACK HANDLERS ---
    def learn_safe(ctx):
        if not ctx: return "Scan an email first."
        count = save_feedback(ctx['subject'], ctx['sender'], "SAFE")
        return f"‚úÖ Learned! '{ctx['subject'][:20]}...' is SAFE. (Memory: {count} items)"

    def learn_phish(ctx):
        if not ctx: return "Scan an email first."
        count = save_feedback(ctx['subject'], ctx['sender'], "PHISHING")
        return f"üö® Learned! '{ctx['subject'][:20]}...' is PHISHING. (Memory: {count} items)"

    # --- WIRING ---
    login_btn.click(login, [user, pwd], login_status)
    fetch_btn.click(fetch_and_store, num_emails, [email_table, emails_state])

    # Scanning updates the Report, Score, and saves Context for the feedback buttons
    scan_btn.click(scan_specific_email, [idx_input, emails_state], [report_out, score_out, current_email_context])

    # Feedback buttons read the Context
    fb_safe.click(learn_safe, current_email_context, fb_msg)
    fb_phish.click(learn_phish, current_email_context, fb_msg)

demo.launch(share=True, debug=True)

  with gr.Blocks(title="Phishing Detector", theme=gr.themes.Soft()) as demo:


Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://d23c84415a183a3348.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


# Project Logic Flow
* Fetch: User connects Gmail -> Script pulls raw emails -> Cleans HTML -> Stores in Memory.

* Scan: User selects email -> Script checks Link in VirusTotal + Sends Text to GPT (with past mistakes included).

* Score: Script compares GPT verdict vs. VirusTotal verdict -> Applies "Safety Overrides" -> Outputs Final Score.

* Learn: User marks prediction "Wrong" -> Script saves correction to JSON -> GPT sees this correction next time.