In [1]:
%pip install python-dotenv langchain langchain_community langchain-openai langsmith langgraph

Collecting python-dotenv
  Downloading python_dotenv-1.1.1-py3-none-any.whl.metadata (24 kB)
Collecting langchain_community
  Downloading langchain_community-0.3.27-py3-none-any.whl.metadata (2.9 kB)
Collecting langchain-openai
  Downloading langchain_openai-0.3.29-py3-none-any.whl.metadata (2.4 kB)
Collecting langgraph
  Downloading langgraph-0.6.4-py3-none-any.whl.metadata (6.8 kB)
Collecting dataclasses-json<0.7,>=0.5.7 (from langchain_community)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting pydantic-settings<3.0.0,>=2.4.0 (from langchain_community)
  Downloading pydantic_settings-2.10.1-py3-none-any.whl.metadata (3.4 kB)
Collecting httpx-sse<1.0.0,>=0.4.0 (from langchain_community)
  Downloading httpx_sse-0.4.1-py3-none-any.whl.metadata (9.4 kB)
Collecting langchain-core<1.0.0,>=0.3.72 (from langchain)
  Downloading langchain_core-0.3.74-py3-none-any.whl.metadata (5.8 kB)
Collecting langgraph-checkpoint<3.0.0,>=2.1.0 (from langgraph)
  Downloadin

### **Core imports and configuration**

In [3]:
# Cell 1: Core imports and configuration
import os
import json
import re
import pandas as pd
from typing import List, Dict, Any, Optional
from datetime import datetime
from dotenv import load_dotenv

# LangChain imports
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import Tool, BaseTool
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferWindowMemory
from langchain.callbacks import StreamingStdOutCallbackHandler
from langchain.schema import BaseMessage, HumanMessage, SystemMessage
from pydantic import BaseModel, Field

# Email processing imports
import base64
from dateutil.parser import parse
from langchain_community.agent_toolkits import GmailToolkit
from langchain_community.tools.gmail.utils import build_resource_service

# Load environment variables.
# This has to run before any os.getenv() lookup below: the OPENAI_API_KEY check
# further down expects its value to come from the .env file.
load_dotenv()

# Configure LangSmith environment variables.
# os.getenv() reads from os.environ, so writing a value that is present back
# into os.environ changes nothing - but when a variable is missing os.getenv()
# returns None and `os.environ[name] = None` raises
# "TypeError: str expected, not NoneType", killing the whole cell.
# Report missing settings instead and leave the environment untouched.
for _langsmith_var in (
    "LANGSMITH_TRACING",
    "LANGSMITH_ENDPOINT",
    "LANGSMITH_API_KEY",
    "LANGSMITH_PROJECT",
):
    _langsmith_value = os.getenv(_langsmith_var)
    if _langsmith_value is None:
        print(f"⚠️ {_langsmith_var} is not set - LangSmith tracing may be unavailable")
    else:
        os.environ[_langsmith_var] = _langsmith_value

# The OpenAI key is mandatory: abort the cell immediately when it is absent
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
    raise ValueError("OPENAI_API_KEY not found in .env file!")
os.environ["OPENAI_API_KEY"] = openai_api_key

# LangSmith smoke test - a failure is reported but does not stop the notebook
try:
    from langsmith import Client
    client = Client()
    print("LangSmith connection successful!")
    print(f"Proje: {os.getenv('LANGSMITH_PROJECT')}")
except Exception as langsmith_error:
    print(f"LangSmith connection error: {langsmith_error}")

# Chat model shared by the rest of the notebook
from langchain.chat_models import init_chat_model
llm = init_chat_model("gpt-5-mini", model_provider="openai")

# One round trip to the model so that a trace shows up in LangSmith
try:
    response = llm.invoke("Hi, this is a test message!")
    print(
        "LLM test successful!",
        "You can check the traces in LangSmith.",
        response,
        sep="\n",
    )
except Exception as llm_error:
    print(f"LLM test error: {llm_error}")

print("✅ Core configuration completed")

LangSmith connection successful!
Proje: pr-husky-duster-52
LLM test successful!
You can check the traces in LangSmith.
content='Hello — got it! Test message received. How can I help you today?' additional_kwargs={'refusal': None} response_metadata={'token_usage': {'completion_tokens': 89, 'prompt_tokens': 14, 'total_tokens': 103, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 64, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-5-mini-2025-08-07', 'system_fingerprint': None, 'id': 'chatcmpl-C36Z9f1nq6lpSJVnKicrJbLMw2oan', 'service_tier': 'default', 'finish_reason': 'stop', 'logprobs': None} id='run--7628a244-add4-45b9-9450-8940e59b8480-0' usage_metadata={'input_tokens': 14, 'output_tokens': 89, 'total_tokens': 103, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 64}}
✅ Core configuration completed


### **Email Fetcher Class**

In [4]:
# Cell 2: Email Fetcher class - structured approach for email retrieval
class EmailFetcher:
    """Gmail API email fetcher and processor class.

    Uses the resource returned by ``build_resource_service()`` together with
    the ``search_gmail`` tool of ``GmailToolkit`` to search a mailbox, download
    every hit in ``full`` format and flatten the messages into a DataFrame.
    """

    # Column layout of the frame returned by fetch_emails(). The same layout is
    # used for the empty frame so callers can always rely on the columns.
    EMAIL_COLUMNS = [
        'id', 'is_unread', 'date', 'from', 'to', 'cc', 'labels', 'subject',
        'body_text', 'body_html', 'has_attachment', 'attachment_names',
    ]

    def __init__(self):
        """Initialise Gmail API service.

        Raises:
            ValueError: If the toolkit does not expose a ``search_gmail`` tool.
        """
        self.api_resource = build_resource_service()
        self.toolkit = GmailToolkit(api_resource=self.api_resource)
        self.search_tool = next(
            (tool for tool in self.toolkit.get_tools() if tool.name == 'search_gmail'),
            None
        )

        if not self.search_tool:
            raise ValueError("Gmail search tool could not be initialised!")

    def get_email_contents(self, payload: Dict) -> Dict[str, str]:
        """
        Recursively extract content from email payload

        Args:
            payload: Email payload (or one of its parts) from Gmail API

        Returns:
            dict: Dictionary containing 'text' and 'html' content. Each nested
            part is stripped before it is concatenated.
        """
        plain_text = ""
        html_text = ""

        # Recursively process parts if they exist
        if 'parts' in payload:
            for part in payload['parts']:
                nested_content = self.get_email_contents(part)
                plain_text += nested_content['text']
                html_text += nested_content['html']

        # Decode body if present
        elif 'body' in payload and 'data' in payload['body']:
            mime_type = payload.get('mimeType', '')
            body_data = payload['body'].get('data', '')

            if body_data:
                try:
                    # base64url payloads may arrive without '=' padding, which
                    # urlsafe_b64decode rejects; pad up to a multiple of four.
                    padded_data = body_data + '=' * (-len(body_data) % 4)
                    decoded_body = base64.urlsafe_b64decode(padded_data).decode('utf-8', errors='ignore')
                    if 'text/plain' in mime_type:
                        plain_text += decoded_body
                    elif 'text/html' in mime_type:
                        html_text += decoded_body
                except Exception as e:
                    # Best effort: one undecodable part must not lose the mail
                    print(f"⚠️ Decode error: {e}")

        return {'text': plain_text.strip(), 'html': html_text.strip()}

    @staticmethod
    def _header_value(headers, name, default=None):
        """Return the first header whose name matches `name` (lowercase), else `default`."""
        return next(
            (h.get('value', default) for h in headers if h.get('name', '').lower() == name),
            default
        )

    @staticmethod
    def _collect_attachment_names(part) -> List[str]:
        """Return every non-empty 'filename' found in `part` and all nested parts."""
        names = []
        filename = part.get('filename')
        if filename:
            names.append(filename)
        for subpart in part.get('parts', []):
            names.extend(EmailFetcher._collect_attachment_names(subpart))
        return names

    @staticmethod
    def _date_sort_key(column):
        """Sort key for the 'date' column: everything as UTC timestamps.

        Keeps the stored values untouched while allowing tz-aware and naive
        datetimes (and missing dates) to be ordered without a TypeError.
        """
        return pd.to_datetime(column, utc=True, errors='coerce')

    def fetch_emails(self, query: str = "in:inbox", max_results: int = 10) -> pd.DataFrame:
        """
        Fetch emails from Gmail and return as DataFrame

        Args:
            query: Gmail search query
            max_results: Maximum number of emails to fetch

        Returns:
            pd.DataFrame: One row per email with the columns listed in
            EMAIL_COLUMNS, newest first. When nothing is found or an error
            occurs the frame has the same columns and no rows.
        """
        processed_emails = []

        try:
            # Search for emails
            search_params = {"query": query, "max_results": max_results}
            search_results = self.search_tool.run(search_params)

            print(f"📧 Found {len(search_results)} emails, fetching details...")

            # Process each email
            for summary in search_results:
                message_id = summary.get('id')
                message_detail = self.api_resource.users().messages().get(
                    userId='me',
                    id=message_id,
                    format='full'
                ).execute()

                payload = message_detail.get('payload', {})
                headers = payload.get('headers', [])

                # Extract header information
                subject = self._header_value(headers, 'subject', 'N/A')
                sender = self._header_value(headers, 'from', 'N/A')
                to = self._header_value(headers, 'to', 'N/A')
                cc = self._header_value(headers, 'cc', 'N/A')
                date_str = self._header_value(headers, 'date', None)

                # Parse date; only parsing failures are tolerated here so that
                # KeyboardInterrupt and programming errors are not swallowed.
                try:
                    email_date = parse(date_str) if date_str else None
                except (ValueError, OverflowError, TypeError):
                    email_date = None

                # Check labels and read status
                labels = message_detail.get('labelIds', [])
                is_unread = 'UNREAD' in labels

                # Extract content - FULL TEXT, no truncation for security
                contents = self.get_email_contents(payload)

                # Walk the whole MIME tree so that several attachments nested
                # under the same container are all reported.
                attachment_names = self._collect_attachment_names(payload)
                has_attachment = bool(attachment_names)

                # Add email data - storing FULL content
                processed_emails.append({
                    'id': message_id,
                    'is_unread': is_unread,
                    'date': email_date,
                    'from': sender,
                    'to': to,
                    'cc': cc,
                    'labels': labels,
                    'subject': subject,
                    'body_text': contents['text'],  # Full text for analysis
                    'body_html': contents['html'],  # Full HTML for security checks
                    'has_attachment': has_attachment,
                    'attachment_names': attachment_names
                })

            if not processed_emails:
                print("⚠️ No emails found")
                return pd.DataFrame(columns=self.EMAIL_COLUMNS)

            # Create DataFrame, newest first, undated mails last
            df = pd.DataFrame(processed_emails, columns=self.EMAIL_COLUMNS)
            df = df.sort_values(
                by='date',
                ascending=False,
                na_position='last',
                key=self._date_sort_key
            ).reset_index(drop=True)
            print(f"✅ Successfully processed {len(df)} emails")
            return df

        except Exception as e:
            # Deliberate best effort for interactive use: report and return an
            # empty (but correctly shaped) frame instead of raising.
            print(f"❌ Email fetch error: {e}")
            return pd.DataFrame(columns=self.EMAIL_COLUMNS)

    def save_to_csv(self, df: pd.DataFrame, filename: str = 'fetched_emails.csv'):
        """
        Save DataFrame to CSV file

        Args:
            df: DataFrame to save; nothing is written when it is empty
            filename: Output filename
        """
        if not df.empty:
            df.to_csv(filename, index=False, encoding='utf-8-sig')
            print(f"✅ DataFrame successfully saved to '{filename}'")
        else:
            print("⚠️ No data to save")

# Test the fetcher
# Constructing the object already calls build_resource_service() and looks up
# the 'search_gmail' tool; a ValueError is raised if that tool is missing.
email_fetcher = EmailFetcher()
print("✅ EmailFetcher successfully created")

✅ EmailFetcher successfully created


### **Security Analysis Module**

In [5]:
# Cell 3: Security analysis module with hybrid LLM integration
class SecurityAnalyser:
    """Email security analysis class with hybrid deterministic + LLM approach.

    Deterministic checks (regex patterns, domain lists, attachment extensions,
    URL heuristics) produce a risk score; sender domains found in neither
    domain list are additionally judged by an LLM.
    """

    # Substrings that identify URL shortening services inside a URL. Kept in
    # sync with the shortener alternatives in `phishing_patterns`.
    URL_SHORTENERS = ('bit.ly', 'tinyurl', 'short.link', 'clck.ru')

    # Cyrillic letters that look like Latin a, e, o, p, c, y, x. Written as
    # escapes because the literals are indistinguishable from ASCII on screen.
    HOMOGRAPH_CHARS = '\u0430\u0435\u043e\u0440\u0441\u0443\u0445'

    # The only answers accepted from the LLM domain assessment
    VALID_ASSESSMENTS = ('SUSPICIOUS', 'TRUSTED', 'UNKNOWN')

    def __init__(self, llm_model=None):
        """Initialise threat patterns and LLM integration

        Args:
            llm_model: Object exposing ``invoke(prompt)`` whose result has a
                ``content`` attribute. When omitted (or falsy) a ChatOpenAI
                client is created.
        """

        # Optional LLM for domain assessment
        self.llm = llm_model or ChatOpenAI(
            model="gpt-4o-mini",  # Using lighter model for quick assessments
            temperature=0.1,  # Low temperature for consistent security decisions
        )

        # Phishing/Scam patterns
        self.phishing_patterns = [
            # URL patterns
            r'bit\.ly|tinyurl|short\.link|clck\.ru',  # URL shorteners
            r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}',  # IP addresses
            r'@[^@]*@',  # Double @ signs

            # Word patterns
            r'urgent.{0,20}action.{0,20}required',
            r'verify.{0,20}account.{0,20}immediately',
            r'suspended.{0,20}account',
            r'click.{0,20}here.{0,20}immediately',
            r'limited.{0,20}time.{0,20}offer',
            r'congratulations.{0,20}won',
            r'claim.{0,20}prize',
            r'tax.{0,20}refund',
            r'nigerian?.{0,20}prince',
        ]

        # Prompt injection patterns
        self.injection_patterns = [
            r'ignore.{0,20}previous.{0,20}instructions',
            r'disregard.{0,20}all.{0,20}prior',
            r'forget.{0,20}everything',
            r'new.{0,20}instructions.{0,20}follow',
            r'system.{0,20}prompt.{0,20}override',
            r'admin.{0,20}mode',
            r'developer.{0,20}mode',
            r'bypass.{0,20}security',
            r'<script',  # XSS attempts
            r'javascript:',
            r'eval\(',
            r'onerror=',
        ]

        # Sample suspicious domains (20 examples)
        self.suspicious_domains = [
            'phishing-site.com', 'fake-bank.net', 'suspicious-domain.org',
            'secure-verify.com', 'account-update.net', 'paypal-verify.com',
            'amazon-security.net', 'microsoft-alert.com', 'google-secure.org',
            'apple-id-verify.com', 'netflix-billing.net', 'facebook-alert.org',
            'instagram-verify.com', 'linkedin-update.net', 'dropbox-alert.com',
            'adobe-verify.org', 'office365-alert.net', 'icloud-verify.com',
            'ebay-security.net', 'steam-verify.org'
        ]

        # Sample trusted domains (20 examples)
        self.trusted_domains = [
            'gmail.com', 'outlook.com', 'yahoo.com', 'google.com',
            'microsoft.com', 'apple.com', 'amazon.com', 'facebook.com',
            'linkedin.com', 'github.com', 'stackoverflow.com', 'twitter.com',
            'paypal.com', 'ebay.com', 'netflix.com', 'spotify.com',
            'adobe.com', 'dropbox.com', 'slack.com', 'zoom.us'
        ]

    def assess_domain_with_llm(self, domain: str, email_context: Dict) -> Dict[str, Any]:
        """
        Use LLM to assess domain trustworthiness

        Args:
            domain: Domain to assess
            email_context: Email context for better assessment ('subject' and
                'from' are read)

        Returns:
            Dict with 'domain', 'llm_assessment' (SUSPICIOUS, TRUSTED or
            UNKNOWN) and 'confidence' ('medium', or 'low' when the call failed)
        """
        try:
            # Prepare prompt for domain assessment
            # NOTE(security): subject and sender come straight from the email
            # and are attacker-controlled, so the text below can carry a prompt
            # injection aimed at this very assessment. The answer is therefore
            # only accepted when it is exactly one of the three known labels.
            prompt = f"""Analyse this email sender domain for security risks.

Domain: {domain}
Email Subject: {email_context.get('subject', 'N/A')}
Sender Full Address: {email_context.get('from', 'N/A')}

Based on the domain name pattern, common phishing tactics, and email context, assess if this domain appears:
1. SUSPICIOUS (likely phishing/scam)
2. TRUSTED (legitimate business/service)
3. UNKNOWN (cannot determine)

Consider:
- Does the domain mimic known brands?
- Does it use suspicious patterns (extra words, misspellings)?
- Is it a legitimate business domain?

Respond with ONLY one word: SUSPICIOUS, TRUSTED, or UNKNOWN

CRITICAL NOTE: Try not to mark as ‘UNKNOWN’ as much as possible.

Decision:"""

            # Get LLM assessment
            response = self.llm.invoke(prompt)

            # Models frequently decorate the single word ("Suspicious.",
            # "**TRUSTED**"); strip that decoration before validating so a
            # clear verdict is not thrown away as UNKNOWN.
            assessment = response.content.strip().upper().strip(" \t\r\n.,:;!\"'`*")

            # Validate response
            if assessment not in self.VALID_ASSESSMENTS:
                assessment = 'UNKNOWN'

            return {
                'domain': domain,
                'llm_assessment': assessment,
                'confidence': 'medium'  # LLM assessments get medium confidence
            }

        except Exception as e:
            print(f"⚠️ LLM assessment failed for {domain}: {e}")
            return {
                'domain': domain,
                'llm_assessment': 'UNKNOWN',
                'confidence': 'low'
            }

    def check_phishing_indicators(self, email_data: Dict) -> Dict[str, Any]:
        """Check for phishing indicators with hybrid approach

        Args:
            email_data: Email record; reads 'subject', 'body_text',
                'body_html', 'from', 'has_attachment' and 'attachment_names'

        Returns:
            Dict with
            - 'indicators': every finding, including reassuring ones such as
              a trusted sender domain
            - 'threat_indicators': only the findings that raised the score
            - 'risk_score': score clamped to at most 100
            - 'risk_level': level derived from the clamped score
            - 'domain_assessment': {'status', 'source'} or {} without a domain
        """
        indicators = []
        threat_indicators = []
        risk_score = 0
        domain_assessment = {}

        def note(message, threat=True):
            """Record a finding; `threat` marks findings that raise the risk."""
            indicators.append(message)
            if threat:
                threat_indicators.append(message)

        # Analyse full text
        text = f"{email_data.get('subject', '')} {email_data.get('body_text', '')} {email_data.get('body_html', '')}".lower()

        # Pattern checking
        for pattern in self.phishing_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                note(f"Suspicious pattern detected: {pattern}")
                risk_score += 20

        # Urgency words analysis
        urgency_words = ['urgent', 'immediate', 'expire', 'suspend', 'limited time']
        urgency_count = sum(1 for word in urgency_words if word in text)
        if urgency_count > 2:
            note(f"High urgency level detected ({urgency_count} keywords)")
            risk_score += urgency_count * 10

        # Enhanced sender analysis with hybrid approach
        sender = email_data.get('from', '')
        if not isinstance(sender, str):
            sender = ''  # e.g. None or NaN coming from a DataFrame row

        # Domain names are case-insensitive; normalise before the list lookups
        # so "GMAIL.com" is recognised as the trusted "gmail.com".
        sender_domain = sender.split('@')[-1].split('>')[0].strip().lower() if '@' in sender else ''

        if sender_domain:
            # Check display name vs actual email
            if '<' in sender and '>' in sender:
                display_name = sender.split('<')[0].strip()
                actual_email = sender.split('<')[1].split('>')[0].strip().lower()

                # Flag only addresses in the display name that differ from the
                # real one; repeating the real address is not spoofing.
                embedded_addresses = re.findall(
                    r'[^\s"\'<>()@]+@[^\s"\'<>()@]+', display_name.lower()
                )
                if any(address != actual_email for address in embedded_addresses):
                    note("Display name contains different email address")
                    risk_score += 30

            # Domain trust assessment - Hybrid approach
            if sender_domain in self.suspicious_domains:
                # Known suspicious domain
                note(f"Known suspicious domain: {sender_domain}")
                risk_score += 30
                domain_assessment = {'status': 'suspicious', 'source': 'blacklist'}

            elif sender_domain in self.trusted_domains:
                # Known trusted domain - informational, not a threat
                note(f"Trusted domain: {sender_domain}", threat=False)
                risk_score -= 20  # Reduce risk score
                risk_score = max(0, risk_score)  # Don't go below 0
                domain_assessment = {'status': 'trusted', 'source': 'whitelist'}

            else:
                # Unknown domain - use LLM for assessment
                llm_result = self.assess_domain_with_llm(sender_domain, email_data)
                domain_assessment = {'status': llm_result['llm_assessment'].lower(), 'source': 'llm'}

                if llm_result['llm_assessment'] == 'SUSPICIOUS':
                    note(f"LLM assessed domain as suspicious: {sender_domain}")
                    risk_score += 30  # Same weight as blacklist

                elif llm_result['llm_assessment'] == 'TRUSTED':
                    note(f"LLM assessed domain as potentially trusted: {sender_domain}", threat=False)
                    risk_score -= 10  # Half the reduction of whitelist (less confidence)
                    risk_score = max(0, risk_score)

                else:  # UNKNOWN
                    note(f"Domain assessment inconclusive: {sender_domain}")
                    risk_score += 10  # Increase the Risk Score slightly

        # Attachment analysis
        if email_data.get('has_attachment'):
            attachments = email_data.get('attachment_names', [])
            dangerous_extensions = ['.exe', '.zip', '.rar', '.bat', '.cmd', '.scr', '.vbs']

            for att in attachments:
                if any(att.lower().endswith(ext) for ext in dangerous_extensions):
                    note(f"Dangerous file extension detected: {att}")
                    risk_score += 40

        # Clamp once and derive the level from the same number that is returned
        risk_score = min(risk_score, 100)
        return {
            'indicators': indicators,
            'threat_indicators': threat_indicators,
            'risk_score': risk_score,
            'risk_level': self._calculate_risk_level(risk_score),
            'domain_assessment': domain_assessment
        }

    def check_url_safety(self, text: str) -> Dict[str, Any]:
        """Check URL safety in email content

        Returns:
            Dict with 'total_urls', 'suspicious_urls' (one entry per URL and
            reason, so a URL can appear more than once) and 'risk_level'
            ('high' when anything was flagged, otherwise 'low')
        """
        urls = re.findall(r'https?://[^\s<>"{}|\\^`\[\]]+', text)
        suspicious_urls = []

        for url in urls:
            # Check for URL shorteners
            if any(short in url.lower() for short in self.URL_SHORTENERS):
                suspicious_urls.append({
                    'url': url,
                    'reason': 'URL shortener detected - could hide malicious destination'
                })

            # Check for IP addresses instead of domains
            if re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', url):
                suspicious_urls.append({
                    'url': url,
                    'reason': 'Contains IP address instead of domain name'
                })

            # Check for homograph attacks (similar looking characters)
            if any(char in url for char in self.HOMOGRAPH_CHARS):
                suspicious_urls.append({
                    'url': url,
                    'reason': 'Possible homograph attack - contains lookalike characters'
                })

        return {
            'total_urls': len(urls),
            'suspicious_urls': suspicious_urls,
            'risk_level': 'high' if suspicious_urls else 'low'
        }

    def check_prompt_injection(self, text: str) -> Dict[str, Any]:
        """Check for prompt injection attempts

        Returns:
            Dict with 'injection_detected', 'injection_patterns' (pattern plus
            up to three matches each) and 'risk_level': 'critical' for more
            than two matching patterns, 'high' for one or two, else 'none'
        """
        injections_found = []

        for pattern in self.injection_patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            if matches:
                injections_found.append({
                    'pattern': pattern,
                    'matches': matches[:3]  # First 3 matches
                })

        return {
            'injection_detected': len(injections_found) > 0,
            'injection_patterns': injections_found,
            'risk_level': 'critical' if len(injections_found) > 2 else
                         'high' if len(injections_found) > 0 else 'none'
        }

    def _calculate_risk_level(self, score: int) -> str:
        """Calculate risk level from score (thresholds 70 / 50 / 30 / 10)"""
        if score >= 70:
            return 'critical'
        elif score >= 50:
            return 'high'
        elif score >= 30:
            return 'medium'
        elif score >= 10:
            return 'low'
        else:
            return 'safe'

    def analyse_email_security(self, email_data: Dict) -> Dict[str, Any]:
        """Complete security analysis with hybrid approach

        Combines the phishing score with fixed surcharges for suspicious URLs
        (+30) and prompt injection (+50 critical, +30 high), capped at 100.
        """

        # Run all analyses using FULL text
        full_text = f"{email_data.get('subject', '')} {email_data.get('body_text', '')} {email_data.get('body_html', '')}"

        url_analysis = self.check_url_safety(full_text)
        phishing_analysis = self.check_phishing_indicators(email_data)
        injection_analysis = self.check_prompt_injection(full_text)

        # Calculate overall risk score
        overall_risk_score = phishing_analysis['risk_score']

        if url_analysis['risk_level'] == 'high':
            overall_risk_score = min(overall_risk_score + 30, 100)

        if injection_analysis['risk_level'] == 'critical':
            overall_risk_score = min(overall_risk_score + 50, 100)
        elif injection_analysis['risk_level'] == 'high':
            overall_risk_score = min(overall_risk_score + 30, 100)

        return {
            'email_id': email_data.get('id'),
            'subject': email_data.get('subject'),
            'sender': email_data.get('from'),
            'overall_risk_score': overall_risk_score,
            'overall_risk_level': self._calculate_risk_level(overall_risk_score),
            'domain_assessment': phishing_analysis.get('domain_assessment', {}),
            'url_analysis': url_analysis,
            'phishing_analysis': phishing_analysis,
            'injection_analysis': injection_analysis,
            'recommendations': self._generate_recommendations(
                overall_risk_score,
                phishing_analysis,
                injection_analysis
            )
        }

    def _generate_recommendations(self, risk_score: int, phishing: Dict, injection: Dict) -> List[str]:
        """Generate security recommendations

        Args:
            risk_score: Overall risk score
            phishing: Result of check_phishing_indicators()
            injection: Result of check_prompt_injection()
        """
        recommendations = []

        if risk_score >= 70:
            recommendations.append("⛔ CRITICAL: Do NOT open this email - DELETE immediately!")
            recommendations.append("🚨 Report to IT security team")
        elif risk_score >= 50:
            recommendations.append("⚠️ HIGH RISK: Do not click any links")
            recommendations.append("📧 Verify sender identity independently")
        elif risk_score >= 30:
            recommendations.append("⚡ CAUTION: Suspicious content detected")

        if injection['injection_detected']:
            recommendations.append("🤖 PROMPT INJECTION detected - do not copy to AI systems")

        # Only findings that raised the score count here; otherwise the note
        # "Trusted domain: ..." alone would trigger a phishing warning. Falls
        # back to 'indicators' for dicts that lack the newer key.
        if phishing.get('threat_indicators', phishing.get('indicators')):
            recommendations.append("🎣 Phishing indicators detected - do not share personal information")

        # Add domain-specific recommendation
        domain_assessment = phishing.get('domain_assessment', {})
        if domain_assessment.get('source') == 'llm' and domain_assessment.get('status') == 'suspicious':
            recommendations.append("🔍 AI assessment suggests sender domain may be suspicious")

        return recommendations

# Test the analyser with hybrid approach
# No llm_model is passed, so __init__ creates its own ChatOpenAI client
# ("gpt-4o-mini") rather than reusing the `llm` object from the first cell.
security_analyser = SecurityAnalyser()
print("✅ SecurityAnalyser with hybrid approach successfully created")

✅ SecurityAnalyser with hybrid approach successfully created
