## SPAM MESSAGE

## NON-SPAM MESSAGE

In [21]:
import os
import logging
from typing import TypedDict, List
from pprint import pprint
from pydantic import BaseModel, Field, ValidationError
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.tools import tool
from langchain_core.callbacks.manager import CallbackManager
from langchain_core.callbacks.base import BaseCallbackHandler
from langchain_google_genai import ChatGoogleGenerativeAI

# ============================================================
# CONFIGURATION AND INITIAL SETUP
# ============================================================

# Configure the LLM
# The API key is read from the GOOGLE_API_KEY environment variable; os.getenv
# returns None when that variable is unset.
llm = ChatGoogleGenerativeAI(
    model="gemini-1.5-flash",
    api_key=os.getenv("GOOGLE_API_KEY"),
    temperature=0
)

# Set up checkpointing
# NOTE(review): nothing in the visible code reads `checkpointer`; presumably it
# is meant to be handed to a LangGraph graph — confirm or remove.
checkpointer = MemorySaver()

# Setup logging for debugging
# INFO level on the root logger, which is the logger the callback handler and
# the tools write to via logging.info / logging.error.
logging.basicConfig(level=logging.INFO)

# Define a callback handler for debugging
class SimpleCallbackHandler(BaseCallbackHandler):
    """Callback handler that writes tool lifecycle events to the root logger."""

    def on_tool_start(self, serialized: dict, input_str: str, **kwargs):
        """Record, at INFO level, the input a tool was started with."""
        message = f"Tool started with input: {input_str}"
        logging.info(message)

    def on_tool_end(self, output: str, **kwargs):
        """Record, at INFO level, the output a tool finished with."""
        message = f"Tool ended with output: {output}"
        logging.info(message)

    def on_tool_error(self, error: Exception, **kwargs):
        """Record, at ERROR level, the exception reported for a tool."""
        message = f"Tool error: {error}"
        logging.error(message)

# Configure the CallbackManager
# One shared handler instance is wrapped in a CallbackManager; the tools and
# node functions pass this manager as `callbacks` in their invoke config.
callback_handler = SimpleCallbackHandler()
callback_manager = CallbackManager(handlers=[callback_handler])

# ============================================================
# DATA STRUCTURES
# ============================================================

class EmailState(TypedDict):
    """State dictionary passed between the email-processing node functions."""

    # NOTE(review): not read by the node functions visible here — confirm purpose.
    messages: List[str]
    # Sender address; process_email checks it against its whitelist/blacklist.
    sender: str
    # Subject and body are the inputs handed to analyze_spam_score.
    subject: str
    body: str
    # Attachment filenames handed to scan_attachments.
    attachments: List[str]
    # Sender-list flags set by process_email; the human review step in
    # evaluate_email_with_human_check may set either of them to True.
    is_whitelisted: bool
    is_blacklisted: bool
    # Score in [0.0, 1.0] produced by analyze_spam_score.
    spam_score: float
    # NOTE(review): not populated by the node functions visible here.
    content_category: str
    # Result of scan_attachments.
    is_malicious_attachment: bool
    # Presumably True while the email is held in quarantine — confirm.
    is_quarantined: bool
    # Human-readable explanation written by evaluate_email_with_human_check.
    human_decision: str
    # One of "spam", "not_spam" or "quarantine", written by
    # evaluate_email_with_human_check.
    classification: str

# Argument schema for the analyze_spam_score tool (passed as its args_schema).
# Both fields are declared with `...`, i.e. without a default value.
class EmailContent(BaseModel):
    subject: str = Field(..., description="Subject of the email")
    body: str = Field(..., description="Body of the email")

# Argument schema for the scan_attachments tool (passed as its args_schema).
# `files` is declared with `...`, i.e. without a default value.
class Attachments(BaseModel):
    files: List[str] = Field(..., description="List of attachment filenames")

# ============================================================
# TOOLS
# ============================================================

@tool(args_schema=EmailContent)
def analyze_spam_score(subject: str, body: str) -> float:
    """
    Analyze the email content to determine its spam score using LLM.

    Parameters:
    - subject (str): The subject of the email.
    - body (str): The body of the email.

    Returns:
    - float: A spam score between 0.0 (not spam) and 1.0 (definitely spam).
    """
    # NOTE: the docstring above is kept verbatim because the @tool decorator
    # exposes it as the tool's description.
    import re  # local import: `re` is not among the file-level imports

    prompt = (
        f"Analyze the following email for spam likelihood.\n\n"
        f"Subject: {subject}\n"
        f"Body: {body}\n\n"
        "Provide a spam score between 0.0 (not spam) and 1.0 (definitely spam). "
        "Include a clear numeric score in your response."
    )
    response = llm.invoke(prompt, config={"callbacks": callback_manager})
    reply = response.content.strip()

    # Collect every numeric token in the free-text reply, in order of appearance.
    candidates = [float(token) for token in re.findall(r"\d*\.?\d+", reply)]
    if not candidates:
        # No number at all: log the reply and fall back to a low default score.
        logging.error(f"Invalid response for spam score: {reply}")
        return 0.1

    # Prefer the first token that is already a valid score. The reply may echo
    # other numbers first (e.g. "50%" quoted from the email), and clamping such
    # a token would wrongly report 1.0 even though a real score follows.
    for value in candidates:
        if 0.0 <= value <= 1.0:
            return value

    # Nothing in range: keep the previous behaviour and clamp the first token.
    return max(0.0, min(candidates[0], 1.0))


@tool(args_schema=Attachments)
def scan_attachments(files: List[str]) -> bool:
    """
    Scan the provided attachments for malicious content using LLM.

    Parameters:
    - files (List[str]): A list of attachment filenames.

    Returns:
    - bool: True if malicious content is detected; False otherwise.
    """
    # NOTE: the docstring above is kept verbatim because the @tool decorator
    # exposes it as the tool's description.
    import re  # local import: `re` is not among the file-level imports

    # Nothing to scan, so skip the LLM round-trip entirely.
    if not files:
        return False

    # Cheap first pass: flag by extension (case-insensitive) before asking the LLM.
    malicious_extensions = (".exe", ".zip", ".rar", ".js")
    flagged = [name for name in files if name.lower().endswith(malicious_extensions)]
    if flagged:
        # Log only the offending files rather than the whole attachment list.
        logging.info(f"Malicious file detected: {flagged}")
        return True

    prompt = f"Analyze the following attachments for malicious content:\n{files}\nRespond with 'Yes' or 'No'."
    response = llm.invoke(prompt, config={"callbacks": callback_manager})

    # Match "yes" as a whole word so replies containing e.g. "eyes" or
    # "yesterday" are not mistaken for a positive verdict.
    return re.search(r"\byes\b", response.content.lower()) is not None


# ============================================================
# NODE FUNCTIONS
# ============================================================

def process_email(state: EmailState) -> EmailState:
    """
    Fill in the sender-list flags, spam score and attachment verdict on `state`.

    Parameters:
    - state (EmailState): The email state; it is updated in place.

    Returns:
    - EmailState: The same `state` object with `is_whitelisted`,
      `is_blacklisted`, `spam_score` and `is_malicious_attachment` set.
    """
    whitelist = {"colleague@company.com", "boss@company.com"}
    blacklist = {"unknown@spamsite.com", "phishing@malicious.com"}

    # Compare addresses case-insensitively and without surrounding whitespace,
    # so a variant such as "Unknown@SpamSite.com " cannot slip past the blacklist.
    sender = state["sender"].strip().lower()
    state["is_whitelisted"] = sender in whitelist
    state["is_blacklisted"] = sender in blacklist

    try:
        state["spam_score"] = analyze_spam_score.invoke(
            input={"subject": state["subject"], "body": state["body"]},
            config={"callbacks": callback_manager}
        )
    except ValidationError as e:
        logging.error(f"Spam score validation error: {e}")
        # NOTE(review): 0.8 does not exceed the `> 0.8` threshold used by
        # evaluate_email_with_human_check, so this fallback alone does not
        # trigger a human review — confirm that is intended.
        state["spam_score"] = 0.8

    try:
        state["is_malicious_attachment"] = scan_attachments.invoke(
            input={"files": state["attachments"]},
            config={"callbacks": callback_manager}
        )
    except ValidationError as e:
        logging.error(f"Attachment scan validation error: {e}")
        # Invalid tool input is treated as "no malicious attachment found".
        state["is_malicious_attachment"] = False

    return state

def evaluate_email_with_human_check(state: EmailState) -> EmailState:
    """
    Classify the email, asking a human on stdin when it looks like spam.

    An email is considered suspicious when the sender is blacklisted, the spam
    score is above 0.8, or a malicious attachment was found. Suspicious emails
    are shown to the user, who picks 1 (spam), 2 (not spam) or anything else
    (quarantine). All other emails are classified as not spam automatically.

    Parameters:
    - state (EmailState): The email state; it is updated in place.

    Returns:
    - EmailState: The same `state` object with `classification`,
      `human_decision` and `is_quarantined` set (and `is_blacklisted` /
      `is_whitelisted` set to True when the user chooses spam / not spam).
    """
    spam_detected = (
        state["is_blacklisted"]
        or state["spam_score"] > 0.8
        or state["is_malicious_attachment"]
    )

    if spam_detected:
        print("\n⚠️ Potential Spam Detected ⚠️")
        print(f"Subject: {state['subject']}")
        print(f"Spam Score: {state['spam_score']:.2f}")
        print("Options: 1 - Spam | 2 - Not Spam | 3 - Quarantine")
        user_choice = input("Choose classification (1/2/3): ").strip()
        if user_choice == "1":
            state["classification"] = "spam"
            state["is_blacklisted"] = True
            state["human_decision"] = f"Classified as spam by human (Score: {state['spam_score']:.2f})."
        elif user_choice == "2":
            state["classification"] = "not_spam"
            state["is_whitelisted"] = True
            state["human_decision"] = (
                f"Originally detected as spam (Score: {state['spam_score']:.2f}), "
                "but user declared it as not spam."
            )
        else:
            # Any answer other than "1" or "2" is treated as quarantine.
            state["classification"] = "quarantine"
            state["human_decision"] = "Kept in quarantine by user."
    elif state["is_whitelisted"]:
        state["classification"] = "not_spam"
        state["human_decision"] = "Automatically classified as not spam (Whitelisted sender)."
    else:
        state["classification"] = "not_spam"
        state["human_decision"] = "Automatically classified as not spam."

    # Keep the quarantine flag in step with the final classification; it was
    # previously left at whatever value the caller passed in.
    state["is_quarantined"] = state["classification"] == "quarantine"

    return state

def display_final_state(state: EmailState):
    """
    Print a human-readable summary of the processed email state to stdout.

    Parameters:
    - state (EmailState): The processed email state; it is only read.

    Returns:
    - None
    """
    import textwrap  # local import: `textwrap` is not among the file-level imports

    # Indent every non-blank line of the body. Previously only the first line
    # was indented, so multi-line bodies broke out of the summary's layout.
    indented_body = textwrap.indent(state["body"], "    ")

    print("\n✅ Final Processed State:")
    print(f"  Sender: {state['sender']}")
    print(f"  Subject: {state['subject']}")
    print(f"  Body:\n{indented_body}")
    print(f"  Attachments: {', '.join(state['attachments'])}")
    print(f"  Whitelisted: {state['is_whitelisted']}")
    print(f"  Blacklisted: {state['is_blacklisted']}")
    print(f"  Spam Score: {state['spam_score']:.2f}")
    print(f"  Malicious Attachments: {state['is_malicious_attachment']}")
    print(f"  Classification: {state['classification']}")
    print(f"  Human Decision: {state['human_decision']}")

# ============================================================
# MAIN EXECUTION
# ============================================================

if __name__ == "__main__":
    # Demo input: a promotional email carrying an executable attachment.
    sender = "promotions@shoppingdeals.com"
    subject = "Limited Time Offer on Electronics"
    attachments = ["a.exe"]
    body = (
        "Dear Valued Customer,\n\n"
        "We’re excited to bring you an exclusive deal on the latest electronics! "
        "Enjoy up to 50% off on top brands for a limited time only.\n\n"
        "Best regards,\n"
        "The Shopping Deals Team"
    )

    # Start from the email fields plus neutral defaults for every derived field.
    initial_state: EmailState = {
        "messages": [],
        "sender": sender,
        "subject": subject,
        "body": body,
        "attachments": attachments,
        "is_whitelisted": False,
        "is_blacklisted": False,
        "spam_score": 0.0,
        "content_category": "",
        "is_malicious_attachment": False,
        "is_quarantined": False,
        "human_decision": "",
        "classification": "",
    }

    pprint(initial_state)

    # Run the pipeline: automated checks, then the (possibly interactive)
    # classification step, then the summary print-out.
    print("\n🚀 Processing Email...\n")
    state = process_email(initial_state)
    state = evaluate_email_with_human_check(state)
    display_final_state(state)


INFO:root:Tool started with input: {'subject': 'Limited Time Offer on Electronics', 'body': 'Dear Valued Customer,\n\nWe’re excited to bring you an exclusive deal on the latest electronics! Enjoy up to 50% off on top brands for a limited time only.\n\nBest regards,\nThe Shopping Deals Team'}


{'attachments': ['a.exe'],
 'body': 'Dear Valued Customer,\n'
         '\n'
         'We’re excited to bring you an exclusive deal on the latest '
         'electronics! Enjoy up to 50% off on top brands for a limited time '
         'only.\n'
         '\n'
         'Best regards,\n'
         'The Shopping Deals Team',
 'classification': '',
 'content_category': '',
 'human_decision': '',
 'is_blacklisted': False,
 'is_malicious_attachment': False,
 'is_quarantined': False,
 'is_whitelisted': False,
 'messages': [],
 'sender': 'promotions@shoppingdeals.com',
 'spam_score': 0.0,
 'subject': 'Limited Time Offer on Electronics'}

🚀 Processing Email...



INFO:root:Tool ended with output: 0.6
INFO:root:Tool started with input: {'files': ['a.exe']}
INFO:root:Malicious file detected: ['a.exe']
INFO:root:Tool ended with output: True



⚠️ Potential Spam Detected ⚠️
Subject: Limited Time Offer on Electronics
Spam Score: 0.60
Options: 1 - Spam | 2 - Not Spam | 3 - Quarantine

✅ Final Processed State:
  Sender: promotions@shoppingdeals.com
  Subject: Limited Time Offer on Electronics
  Body:
    Dear Valued Customer,

We’re excited to bring you an exclusive deal on the latest electronics! Enjoy up to 50% off on top brands for a limited time only.

Best regards,
The Shopping Deals Team
  Attachments: a.exe
  Whitelisted: True
  Blacklisted: False
  Spam Score: 0.60
  Malicious Attachments: True
  Classification: not_spam
  Human Decision: Originally detected as spam (Score: 0.60), but user declared it as not spam.


In [None]:
import os
import logging
from typing import TypedDict, List
from pprint import pprint
from pydantic import BaseModel, Field, ValidationError
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.tools import tool
from langchain_core.callbacks.manager import CallbackManager
from langchain_core.callbacks.base import BaseCallbackHandler
from langchain_google_genai import ChatGoogleGenerativeAI

# ============================================================
# CONFIGURATION AND INITIAL SETUP
# ============================================================

# Configure the LLM
# The API key is read from the GOOGLE_API_KEY environment variable; os.getenv
# returns None when that variable is unset.
llm = ChatGoogleGenerativeAI(
    model="gemini-1.5-flash",
    api_key=os.getenv("GOOGLE_API_KEY"),
    temperature=0
)

# Set up checkpointing
# NOTE(review): nothing in the visible code reads `checkpointer`; presumably it
# is meant to be handed to a LangGraph graph — confirm or remove.
checkpointer = MemorySaver()

# Setup logging for debugging
# INFO level on the root logger, which is the logger the callback handler and
# the tools write to via logging.info / logging.error.
logging.basicConfig(level=logging.INFO)

# Define a callback handler for debugging
class SimpleCallbackHandler(BaseCallbackHandler):
    """Callback handler that writes tool lifecycle events to the root logger."""

    def on_tool_start(self, serialized: dict, input_str: str, **kwargs):
        """Record, at INFO level, the input a tool was started with."""
        message = f"Tool started with input: {input_str}"
        logging.info(message)

    def on_tool_end(self, output: str, **kwargs):
        """Record, at INFO level, the output a tool finished with."""
        message = f"Tool ended with output: {output}"
        logging.info(message)

    def on_tool_error(self, error: Exception, **kwargs):
        """Record, at ERROR level, the exception reported for a tool."""
        message = f"Tool error: {error}"
        logging.error(message)

# Configure the CallbackManager
# One shared handler instance is wrapped in a CallbackManager; the tools and
# node functions pass this manager as `callbacks` in their invoke config.
callback_handler = SimpleCallbackHandler()
callback_manager = CallbackManager(handlers=[callback_handler])

# ============================================================
# DATA STRUCTURES
# ============================================================

class EmailState(TypedDict):
    """State dictionary passed between the email-processing node functions."""

    # NOTE(review): not read by the node functions visible here — confirm purpose.
    messages: List[str]
    # Sender address; process_email checks it against its whitelist/blacklist.
    sender: str
    # Subject and body are the inputs handed to analyze_spam_score.
    subject: str
    body: str
    # Attachment filenames handed to scan_attachments.
    attachments: List[str]
    # Sender-list flags set by process_email; the human review step in
    # evaluate_email_with_human_check may set either of them to True.
    is_whitelisted: bool
    is_blacklisted: bool
    # Score in [0.0, 1.0] produced by analyze_spam_score.
    spam_score: float
    # NOTE(review): not populated by the node functions visible here.
    content_category: str
    # Result of scan_attachments.
    is_malicious_attachment: bool
    # Presumably True while the email is held in quarantine — confirm.
    is_quarantined: bool
    # Human-readable explanation written by evaluate_email_with_human_check.
    human_decision: str
    # One of "spam", "not_spam" or "quarantine", written by
    # evaluate_email_with_human_check.
    classification: str

# Argument schema for the analyze_spam_score tool (passed as its args_schema).
# Both fields are declared with `...`, i.e. without a default value.
class EmailContent(BaseModel):
    subject: str = Field(..., description="Subject of the email")
    body: str = Field(..., description="Body of the email")

# Argument schema for the scan_attachments tool (passed as its args_schema).
# `files` is declared with `...`, i.e. without a default value.
class Attachments(BaseModel):
    files: List[str] = Field(..., description="List of attachment filenames")

# ============================================================
# TOOLS
# ============================================================

@tool(args_schema=EmailContent)
def analyze_spam_score(subject: str, body: str) -> float:
    """
    Analyze the email content to determine its spam score using LLM.

    Parameters:
    - subject (str): The subject of the email.
    - body (str): The body of the email.

    Returns:
    - float: A spam score between 0.0 (not spam) and 1.0 (definitely spam).
    """
    # NOTE: the docstring above is kept verbatim because the @tool decorator
    # exposes it as the tool's description.
    import re  # local import: `re` is not among the file-level imports

    prompt = (
        f"Analyze the following email for spam likelihood.\n\n"
        f"Subject: {subject}\n"
        f"Body: {body}\n\n"
        "Provide a spam score between 0.0 (not spam) and 1.0 (definitely spam). "
        "Include a clear numeric score in your response."
    )
    response = llm.invoke(prompt, config={"callbacks": callback_manager})
    reply = response.content.strip()

    # Collect every numeric token in the free-text reply, in order of appearance.
    candidates = [float(token) for token in re.findall(r"\d*\.?\d+", reply)]
    if not candidates:
        # No number at all: log the reply and fall back to a low default score.
        logging.error(f"Invalid response for spam score: {reply}")
        return 0.1

    # Prefer the first token that is already a valid score. The reply may echo
    # other numbers first (e.g. a year or percentage quoted from the email), and
    # clamping such a token would wrongly report 1.0 even though a real score
    # follows.
    for value in candidates:
        if 0.0 <= value <= 1.0:
            return value

    # Nothing in range: keep the previous behaviour and clamp the first token.
    return max(0.0, min(candidates[0], 1.0))


@tool(args_schema=Attachments)
def scan_attachments(files: List[str]) -> bool:
    """
    Scan the provided attachments for malicious content using LLM.

    Parameters:
    - files (List[str]): A list of attachment filenames.

    Returns:
    - bool: True if malicious content is detected; False otherwise.
    """
    # NOTE: the docstring above is kept verbatim because the @tool decorator
    # exposes it as the tool's description.
    import re  # local import: `re` is not among the file-level imports

    # Nothing to scan, so skip the LLM round-trip entirely.
    if not files:
        return False

    # Cheap first pass: flag by extension (case-insensitive) before asking the LLM.
    malicious_extensions = (".exe", ".zip", ".rar", ".js")
    flagged = [name for name in files if name.lower().endswith(malicious_extensions)]
    if flagged:
        # Log only the offending files rather than the whole attachment list.
        logging.info(f"Malicious file detected: {flagged}")
        return True

    prompt = f"Analyze the following attachments for malicious content:\n{files}\nRespond with 'Yes' or 'No'."
    response = llm.invoke(prompt, config={"callbacks": callback_manager})

    # Match "yes" as a whole word so replies containing e.g. "eyes" or
    # "yesterday" are not mistaken for a positive verdict.
    return re.search(r"\byes\b", response.content.lower()) is not None


# ============================================================
# NODE FUNCTIONS
# ============================================================

def process_email(state: EmailState) -> EmailState:
    """
    Fill in the sender-list flags, spam score and attachment verdict on `state`.

    Parameters:
    - state (EmailState): The email state; it is updated in place.

    Returns:
    - EmailState: The same `state` object with `is_whitelisted`,
      `is_blacklisted`, `spam_score` and `is_malicious_attachment` set.
    """
    whitelist = {"colleague@company.com", "boss@company.com"}
    blacklist = {"unknown@spamsite.com", "phishing@malicious.com"}

    # Compare addresses case-insensitively and without surrounding whitespace,
    # so a variant such as "Unknown@SpamSite.com " cannot slip past the blacklist.
    sender = state["sender"].strip().lower()
    state["is_whitelisted"] = sender in whitelist
    state["is_blacklisted"] = sender in blacklist

    try:
        state["spam_score"] = analyze_spam_score.invoke(
            input={"subject": state["subject"], "body": state["body"]},
            config={"callbacks": callback_manager}
        )
    except ValidationError as e:
        logging.error(f"Spam score validation error: {e}")
        # NOTE(review): 0.8 does not exceed the `> 0.8` threshold used by
        # evaluate_email_with_human_check, so this fallback alone does not
        # trigger a human review — confirm that is intended.
        state["spam_score"] = 0.8

    try:
        state["is_malicious_attachment"] = scan_attachments.invoke(
            input={"files": state["attachments"]},
            config={"callbacks": callback_manager}
        )
    except ValidationError as e:
        logging.error(f"Attachment scan validation error: {e}")
        # Invalid tool input is treated as "no malicious attachment found".
        state["is_malicious_attachment"] = False

    return state

def evaluate_email_with_human_check(state: EmailState) -> EmailState:
    """
    Classify the email, asking a human on stdin when it looks like spam.

    An email is considered suspicious when the sender is blacklisted, the spam
    score is above 0.8, or a malicious attachment was found. Suspicious emails
    are shown to the user, who picks 1 (spam), 2 (not spam) or anything else
    (quarantine). All other emails are classified as not spam automatically.

    Parameters:
    - state (EmailState): The email state; it is updated in place.

    Returns:
    - EmailState: The same `state` object with `classification`,
      `human_decision` and `is_quarantined` set (and `is_blacklisted` /
      `is_whitelisted` set to True when the user chooses spam / not spam).
    """
    spam_detected = (
        state["is_blacklisted"]
        or state["spam_score"] > 0.8
        or state["is_malicious_attachment"]
    )

    if spam_detected:
        print("\n⚠️ Potential Spam Detected ⚠️")
        print(f"Subject: {state['subject']}")
        print(f"Spam Score: {state['spam_score']:.2f}")
        print("Options: 1 - Spam | 2 - Not Spam | 3 - Quarantine")
        user_choice = input("Choose classification (1/2/3): ").strip()
        if user_choice == "1":
            state["classification"] = "spam"
            state["is_blacklisted"] = True
            state["human_decision"] = f"Classified as spam by human (Score: {state['spam_score']:.2f})."
        elif user_choice == "2":
            state["classification"] = "not_spam"
            state["is_whitelisted"] = True
            state["human_decision"] = (
                f"Originally detected as spam (Score: {state['spam_score']:.2f}), "
                "but user declared it as not spam."
            )
        else:
            # Any answer other than "1" or "2" is treated as quarantine.
            state["classification"] = "quarantine"
            state["human_decision"] = "Kept in quarantine by user."
    elif state["is_whitelisted"]:
        state["classification"] = "not_spam"
        state["human_decision"] = "Automatically classified as not spam (Whitelisted sender)."
    else:
        state["classification"] = "not_spam"
        state["human_decision"] = "Automatically classified as not spam."

    # Keep the quarantine flag in step with the final classification; it was
    # previously left at whatever value the caller passed in.
    state["is_quarantined"] = state["classification"] == "quarantine"

    return state

def display_final_state(state: EmailState):
    """
    Print a human-readable summary of the processed email state to stdout.

    Parameters:
    - state (EmailState): The processed email state; it is only read.

    Returns:
    - None
    """
    import textwrap  # local import: `textwrap` is not among the file-level imports

    # Indent every non-blank line of the body. Previously only the first line
    # was indented, so multi-line bodies broke out of the summary's layout.
    indented_body = textwrap.indent(state["body"], "    ")

    print("\n✅ Final Processed State:")
    print(f"  Sender: {state['sender']}")
    print(f"  Subject: {state['subject']}")
    print(f"  Body:\n{indented_body}")
    print(f"  Attachments: {', '.join(state['attachments'])}")
    print(f"  Whitelisted: {state['is_whitelisted']}")
    print(f"  Blacklisted: {state['is_blacklisted']}")
    print(f"  Spam Score: {state['spam_score']:.2f}")
    print(f"  Malicious Attachments: {state['is_malicious_attachment']}")
    print(f"  Classification: {state['classification']}")
    print(f"  Human Decision: {state['human_decision']}")

# ============================================================
# MAIN EXECUTION
# ============================================================

if __name__ == "__main__":
    # Demo input: a giveaway email from a blacklisted sender with an archive
    # attachment.
    sender = "unknown@spamsite.com"
    subject = "Congratulations! You've won a free iPhone!"
    attachments = ["offer.zip"]
    body = (
        "Dear user,\n\n"
        "You have been selected as a winner in our exclusive giveaway! "
        "Claim your free iPhone now by clicking the link below.\n\n"
        "Hurry, this offer expires soon!\n\n"
        "Best regards,\n"
        "Spammy Promotions Team"
    )

    # Start from the email fields plus neutral defaults for every derived field.
    initial_state: EmailState = {
        "messages": [],
        "sender": sender,
        "subject": subject,
        "body": body,
        "attachments": attachments,
        "is_whitelisted": False,
        "is_blacklisted": False,
        "spam_score": 0.0,
        "content_category": "",
        "is_malicious_attachment": False,
        "is_quarantined": False,
        "human_decision": "",
        "classification": "",
    }

    pprint(initial_state)

    # Run the pipeline: automated checks, then the (possibly interactive)
    # classification step, then the summary print-out.
    print("\n🚀 Processing Email...\n")
    state = process_email(initial_state)
    state = evaluate_email_with_human_check(state)
    display_final_state(state)
