# Email Fraud Analysis


In [None]:
!pip install kagglehub google-generativeai pandas nltk



In [None]:
# Import required dependencies
import os
import re
import time
import logging
import pandas as pd
import numpy as np
import nltk
import kagglehub
import google.generativeai as genai
from google.generativeai import caching
from typing import List, Dict, Optional, Union
from dataclasses import dataclass
import datetime

# Configure logging: timestamped INFO-level messages for the whole notebook
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Download required NLTK data with proper error handling.
# nltk.download() signals most failures (e.g. no network) by returning False
# rather than raising, so the return value decides which message is logged.
try:
    if nltk.download('punkt', quiet=True):
        logging.info("Successfully downloaded NLTK punkt resource")
    else:
        logging.warning(
            "NLTK punkt resource could not be downloaded; "
            "continuing in case it is already installed"
        )
except Exception as e:
    logging.error(f"Error downloading NLTK resources: {str(e)}")
    # Don't raise here - punkt might already be downloaded


In [None]:
# Configuration
@dataclass
class Config:
    """Tunable parameters for the email analysis pipeline.

    Every field has a default, so ``Config()`` yields a ready-to-use
    configuration; individual attributes can be overridden afterwards.
    """
    # Number of emails handled per processing chunk.
    # TESTING VALUE: reduced to 1/10th for development; the original value
    # was 1000 and should be restored for production.
    chunk_size: int = 100
    # Number of attempts made when an API call fails
    max_retries: int = 3
    # Messages shorter than this many characters are discarded
    min_message_length: int = 10
    # Timeout for API calls, in seconds
    api_timeout: int = 30
    # Cache lifetime in seconds (1 hour)
    cache_ttl: int = 3600
    # Identifier of the Gemini model to use
    model_name: str = 'gemini-1.5-flash-001'


# Single shared configuration instance built from the defaults above
config = Config()

# You can easily modify any settings here if needed
# For example:
# config.chunk_size = 500  # Process larger chunks than the testing default of 100
# config.min_message_length = 20  # Require longer messages

In [None]:
# Set up Gemini API
# The key is read from the API_KEY environment variable instead of being
# hard-coded, so the secret never ends up in the notebook or version control.
# NOTE(review): any key that was previously committed in this cell should be
# treated as leaked and revoked.
if not os.environ.get('API_KEY'):
    raise RuntimeError(
        "API_KEY environment variable is not set; "
        "export your Gemini API key before running this cell"
    )
genai.configure(api_key=os.environ['API_KEY'])

# Default system prompt for the fraud analysis. create_caches_from_csv() stores
# it in every cache it creates, process_email_batch() sends it with each email,
# and analyze_file() falls back to it when no custom prompt is supplied.
system_instruction = """
You are an expert fraud analyst.
Your task is to analyze the provided email chunks
and identify potential evidence of fraudulent activities.
Focus on:
1. Financial irregularities
2. Suspicious transactions
3. Attempts to conceal information
4. Unusual communication patterns
"""

In [None]:
def preprocess_message(message: str) -> str:
    """Return the whitespace-stripped message, or '' when it is unusable.

    A message is unusable when it is not a string at all, or when its
    stripped form is shorter than ``config.min_message_length`` characters.
    """
    if isinstance(message, str):
        cleaned = message.strip()
        if len(cleaned) >= config.min_message_length:
            return cleaned
    return ''

def chunk_message(message: str, max_chunk_size: int = 1000) -> List[str]:
    """Split a message into chunks of whole sentences.

    Sentences are delimited by the literal separator ``'. '``. They are
    packed greedily into chunks so that each chunk holds at most
    ``max_chunk_size`` whitespace-separated words. A single sentence longer
    than the limit is not split and becomes a chunk of its own.

    The original text is preserved: a chunk that ends at a sentence boundary
    gets back the period the separator consumed, while the final chunk keeps
    the message's own ending (no extra period is appended). Joining the
    chunks with a single space therefore reproduces the message.

    Args:
        message: Text to split.
        max_chunk_size: Maximum number of words per chunk.

    Returns:
        The list of chunks, or an empty list when the message is empty or
        contains only whitespace.
    """
    if not message:
        return []

    chunks: List[str] = []
    pending: List[str] = []   # sentences collected for the chunk being built
    pending_words = 0

    for sentence in message.split('. '):  # Simple sentence splitting
        sentence_words = len(sentence.split())
        if pending and pending_words + sentence_words > max_chunk_size:
            # This boundary was a '. ' separator, so restore its period.
            chunks.append('. '.join(pending) + '.')
            pending = []
            pending_words = 0
        pending.append(sentence)
        pending_words += sentence_words

    # The tail already carries whatever punctuation the message ended with;
    # skip it when nothing but whitespace is left (e.g. a trailing '. ').
    tail = '. '.join(pending)
    if tail.strip():
        chunks.append(tail)

    return chunks

In [None]:
# CSV Processing
def _collect_within_token_budget(messages, model, starting_tokens, max_tokens):
    """Gather message chunks until the per-cache token budget is exhausted.

    Args:
        messages: List of preprocessed message strings.
        model: Object exposing ``count_tokens(text).total_tokens``.
        starting_tokens: Tokens already spent (the system instruction).
        max_tokens: Budget that the running total, plus a 20% safety margin
            on the next chunk, must not exceed.

    Returns:
        A tuple ``(accepted_chunks, token_count, skipped_messages)`` where
        ``skipped_messages`` counts the messages that were dropped, wholly or
        partly, because the budget ran out (0 when everything fitted).
    """
    accepted = []
    token_count = starting_tokens
    for index, message in enumerate(messages):
        for piece in chunk_message(message):
            # Get exact token count for this chunk
            piece_tokens = model.count_tokens(piece).total_tokens

            # Add safety margin for potential overhead (20%)
            estimated_total = token_count + piece_tokens + (piece_tokens * 0.2)
            if estimated_total > max_tokens:
                return accepted, token_count, len(messages) - index

            accepted.append(piece)
            token_count += piece_tokens
    return accepted, token_count, 0


def create_caches_from_csv(
    csv_path: str,
    chunk_size: Optional[int] = None,
    max_chunks: Optional[int] = 3,
) -> List[caching.CachedContent]:
    """Build Gemini cached-content objects from a CSV of emails.

    The CSV is processed ``chunk_size`` rows at a time. For each chunk of
    rows the messages are cleaned with ``preprocess_message``, split with
    ``chunk_message`` and, as far as the token budget allows, stored together
    with ``system_instruction`` in one cache.

    Args:
        csv_path: Path to a CSV file with at least 'file' and 'message'
            columns.
        chunk_size: Rows per chunk; defaults to ``config.chunk_size``.
        max_chunks: Maximum number of chunks to process. The default of 3 is
            the testing limit; pass ``None`` to process the whole file.

    Returns:
        The list of caches that were created successfully.

    Raises:
        ValueError: If ``max_chunks`` is negative, a required column is
            missing, or no cache could be created.
        Exception: Any other error raised while reading or processing the CSV
            is printed and re-raised.
    """
    chunk_size = chunk_size or config.chunk_size
    if max_chunks is not None and max_chunks < 0:
        raise ValueError(f"max_chunks must be non-negative, got {max_chunks}")

    # Set a conservative token limit (30% of the max)
    MAX_TOKENS_PER_CACHE = 300000
    # Set total token limit across all caches (90% of API limit)
    TOTAL_TOKEN_LIMIT = 900000
    model = genai.GenerativeModel(config.model_name)

    try:
        # Only the rows that can actually be processed are loaded, so a small
        # max_chunks does not force the entire file into memory.
        row_limit = None if max_chunks is None else chunk_size * max_chunks
        df = pd.read_csv(csv_path, nrows=row_limit)

        required_columns = ['file', 'message']
        missing = [col for col in required_columns if col not in df.columns]
        if missing:
            raise ValueError(f"Missing required columns: {missing}")

        caches = []
        num_chunks = (len(df) + chunk_size - 1) // chunk_size  # ceiling division
        if max_chunks is not None:
            num_chunks = min(max_chunks, num_chunks)

        # Count system instruction tokens once
        system_tokens = model.count_tokens(system_instruction).total_tokens
        print(f"System instruction uses {system_tokens} tokens")

        total_tokens_used = 0

        for chunk_num in range(num_chunks):
            # Check if we're approaching total token limit
            if total_tokens_used >= TOTAL_TOKEN_LIMIT:
                print(f"\nReached total token limit ({total_tokens_used} tokens) - stopping processing")
                break

            start_idx = chunk_num * chunk_size
            end_idx = min((chunk_num + 1) * chunk_size, len(df))

            chunk = df.iloc[start_idx:end_idx].copy()
            chunk.loc[:, 'message'] = chunk['message'].fillna('').astype(str)
            chunk.loc[:, 'message'] = chunk['message'].apply(preprocess_message)
            chunk = chunk[chunk['message'].str.len() > 0]

            if chunk.empty:
                continue

            print(f"\nProcessing chunk {chunk_num + 1}/{num_chunks}")
            processed_messages, current_token_count, skipped = _collect_within_token_budget(
                list(chunk['message']),
                model,
                system_tokens,  # every cache starts with the system tokens
                MAX_TOKENS_PER_CACHE,
            )
            if skipped:
                print(f"Reached token limit at {current_token_count} tokens - creating cache...")
                # Make the data loss visible instead of dropping rows silently.
                logging.warning(
                    "Chunk %d: %d message(s) did not fit in the cache and were skipped",
                    chunk_num + 1, skipped,
                )

            if not processed_messages:
                continue

            # Check if adding this cache would exceed total limit
            if total_tokens_used + current_token_count > TOTAL_TOKEN_LIMIT:
                print(f"\nWould exceed total token limit - stopping at {total_tokens_used} tokens")
                break

            try:
                cache = caching.CachedContent.create(
                    model=config.model_name,
                    display_name=f'fraud_analysis_chunk_{chunk_num}',
                    system_instruction=system_instruction,
                    contents=[{'text': msg} for msg in processed_messages],
                    ttl=datetime.timedelta(seconds=config.cache_ttl)
                )
            except Exception as e:
                # Best effort: a failed cache must not abort the remaining chunks.
                print(f"Warning: Failed to create cache for chunk {chunk_num + 1}: {str(e)}")
            else:
                caches.append(cache)
                total_tokens_used += current_token_count
                print(f"Successfully created cache {chunk_num + 1} with {current_token_count} tokens (Total: {total_tokens_used})")

        if not caches:
            raise ValueError("No valid caches could be created from the input data")

        print(f"\nSuccessfully processed {len(caches)} chunks (Total tokens: {total_tokens_used})")
        return caches

    except Exception as e:
        print(f"Error processing CSV: {str(e)}")
        raise

In [None]:
# Email Processing
def process_email_batch(emails: List[Dict], retries: Optional[int] = None) -> List[Dict]:
    """Analyze each email with Gemini, retrying failed calls with backoff.

    Args:
        emails: Dicts that may carry 'sender', 'recipient', 'subject' and
            'body' keys; missing keys fall back to placeholder text.
        retries: Maximum attempts per email; defaults to
            ``config.max_retries`` when omitted (or 0).

    Returns:
        One dict per email with 'file' and 'status' keys plus either
        'analysis' (status 'success') or 'error' (status 'failed', recorded
        after the last attempt has failed).
    """
    retries = retries or config.max_retries
    results = []

    for email in emails:
        # Both values are identical for every attempt, so build them once.
        label = f"{email.get('sender', 'Unknown')} - {email.get('subject', 'No Subject')}"
        prompt = (
            f"From: {email.get('sender', 'Unknown')}\n"
            f"To: {email.get('recipient', 'Unknown')}\n"
            f"Subject: {email.get('subject', 'No Subject')}\n"
            f"Body: {email.get('body', 'No Content')}"
        )

        for attempt in range(retries):
            try:
                # Create a model instance for this analysis
                model = genai.GenerativeModel(config.model_name)

                # Analyze the email
                response = model.generate_content([system_instruction, prompt])

                results.append({
                    'file': label,
                    'analysis': response.text,
                    'status': 'success'
                })
                break

            except Exception as e:
                if attempt == retries - 1:
                    # Out of attempts: record the failure; no point in sleeping.
                    logging.error(f"Failed to process email: {str(e)}")
                    results.append({
                        'file': label,
                        'error': str(e),
                        'status': 'failed'
                    })
                else:
                    # Exponential backoff between attempts: 1s, 2s, 4s, ...
                    time.sleep(2 ** attempt)

    return results


In [None]:
# Console Interface
def _highlight_keywords(results_df, keywords):
    """Wrap every occurrence of each comma-separated keyword in markdown bold.

    Keywords are matched literally and case-insensitively in the 'Analysis'
    column; they are escaped first so that characters such as '+', '(' or '.'
    in user input are not interpreted as regular-expression syntax.
    """
    for keyword in keywords.split(','):
        keyword = keyword.strip()
        if keyword:
            results_df['Analysis'] = results_df['Analysis'].str.replace(
                f'({re.escape(keyword)})',
                r'**\1**',  # Using markdown-style bold
                case=False,
                regex=True
            )
    return results_df


def analyze_file(file=None, case_details=None, system_prompt=None, keywords=None):
    """Analyze emails for potential fraud using Gemini AI.

    Downloads the default Enron email dataset, creates caches from it and
    runs a sample analysis on the first ``config.chunk_size`` rows using the
    first cache.

    Args:
        file: Accepted for interface compatibility; not used by the current
            implementation (the default dataset is always analyzed).
        case_details: Accepted for interface compatibility; not used.
        system_prompt: Prompt to send instead of ``system_instruction``.
        keywords: Optional comma-separated keywords to highlight (markdown
            bold) in the analysis text.

    Returns:
        A DataFrame with 'File', 'Status' and 'Analysis' columns. Errors are
        not raised; they are printed and returned as a single 'failed' row.
    """
    try:
        # Use the default dataset
        path = kagglehub.dataset_download("wcukierski/enron-email-dataset")
        file_path = os.path.join(path, 'emails.csv')
        print(f"Using default email dataset from: {file_path}")

        # Use custom system prompt if provided
        current_prompt = system_prompt if system_prompt else system_instruction

        # Create caches from the CSV file
        caches = create_caches_from_csv(file_path)

        # Process emails using the cached content
        sample_results = []
        for cache in caches[:1]:  # Process first chunk for sample results
            try:
                model = genai.GenerativeModel.from_cached_content(cache)

                # Only the first chunk is needed, so read just those rows
                # rather than loading the whole dataset again.
                chunk = pd.read_csv(file_path, nrows=config.chunk_size)

                chunk.loc[:, 'message'] = chunk['message'].fillna('').astype(str)
                chunk.loc[:, 'message'] = chunk['message'].apply(preprocess_message)
                chunk = chunk[chunk['message'].str.len() > 0]

                email_texts = [
                    f"EMAIL {i+1}:\n{msg}\n---"
                    for i, msg in enumerate(chunk['message']) if msg.strip()
                ]

                analysis_prompt = [
                    current_prompt,
                    "Below are the email chunks to analyze:",
                    "\n".join(email_texts)
                ]

                response = model.generate_content(analysis_prompt)

                sample_results.append({
                    'file': 'Sample Analysis',
                    'analysis': response.text,
                    'status': 'success'
                })

            except Exception as e:
                print(f"Failed to process cached content: {str(e)}")
                sample_results.append({
                    'file': 'Error',
                    'error': str(e),
                    'status': 'failed'
                })

        # Normalize the results into the three output columns
        output_data = [
            {
                'File': result['file'],
                'Status': result['status'],
                'Analysis': result.get('analysis', result.get('error', ''))
            }
            for result in sample_results
        ]
        results_df = pd.DataFrame(output_data)

        if keywords:
            results_df = _highlight_keywords(results_df, keywords)

        return results_df

    except Exception as e:
        # Top-level boundary: report the problem as a result row, don't raise.
        error_msg = f"Error: {str(e)}"
        print(error_msg)
        return pd.DataFrame([{
            'File': 'Error',
            'Status': 'failed',
            'Analysis': error_msg
        }])

In [None]:

if __name__ == "__main__":
    # Show the default prompt and let the user override it
    print("\nCurrent system prompt:")
    print(system_instruction)
    custom_prompt = input("\nEnter custom system prompt (or press Enter to use default): ").strip()

    # Optional comma-separated keywords to highlight in the analysis
    keywords = input("\nEnter keywords to highlight (comma-separated, or press Enter to skip): ").strip()

    # Run analysis; an empty prompt means "use the default"
    results_df = analyze_file(
        file=None,
        case_details="Email Analysis",
        system_prompt=custom_prompt or None,
        keywords=keywords,
    )

    # Display results, one separator-terminated section per row
    print("\n=== Analysis Results ===\n")
    for _, row in results_df.iterrows():
        print(
            f"File: {row['File']}",
            f"Status: {row['Status']}",
            "Analysis:",
            row['Analysis'],
            "\n" + "=" * 50 + "\n",
            sep="\n",
        )


Current system prompt:

You are an expert fraud analyst.
Your task is to analyze the provided email chunks
and identify potential evidence of fraudulent activities.
Focus on:
1. Financial irregularities
2. Suspicious transactions
3. Attempts to conceal information
4. Unusual communication patterns


Enter custom system prompt (or press Enter to use default): look for frogs, bogs, dogs, nogs, bogs, logs or any other ogs.

Enter keywords to highlight (comma-separated, or press Enter to skip): 
Using default email dataset from: /root/.cache/kagglehub/datasets/wcukierski/enron-email-dataset/versions/2/emails.csv
System instruction uses 56 tokens

Processing chunk 1/3
Successfully created cache 1 with 52961 tokens (Total: 52961)

Processing chunk 2/3
Successfully created cache 2 with 40712 tokens (Total: 93673)

Processing chunk 3/3
Successfully created cache 3 with 42222 tokens (Total: 135895)

Successfully processed 3 chunks (Total tokens: 135895)

=== Analysis Results ===

File: Sample 

In [None]:
#                    "\nPlease analyze these emails for signs of fraud, focusing on:",
#                    "1. Financial irregularities",
#                    "2. Suspicious transactions",
#                    "3. Attempts to conceal information",
#                    "4. Unusual communication patterns",
#                    "\nProvide a detailed analysis of any suspicious patterns or potential fraud indicators found."