# Epstein Estate Documents - Email Processing

This notebook is a data-science pipeline processing <strong>23,124 images</strong> and thousands of OCR text files released by the <strong>U.S. House Committee on Oversight and Government Reform</strong> in 2025. Using a combination of OCR cleanup, vision-LLM extraction, name normalization, timestamp reconstruction, and thread mapping, these public images were transformed into a clean, coherent <strong>SQLite message database</strong>. The resulting db file can be used in the GUI phone-like viewer. It supposes you download the source files:
[U.S. House Oversight Committee - November 12, 2025](https://oversight.house.gov/release/oversight-committee-releases-additional-epstein-estate-documents/) and set up the .env file (example is provided). The file 'emails_list.csv' is also taken from these source files.


## Setup and Configuration

In [1]:
import os
from pathlib import Path
import pandas as pd
from dotenv import load_dotenv
from openai import OpenAI
import sqlite3
import json
import base64
from PIL import Image
from io import BytesIO
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from tqdm import tqdm

load_dotenv()

OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
OPENAI_API_URL = os.getenv('OPENAI_API_URL')
OPENAI_API_MODEL = os.getenv('OPENAI_API_MODEL')
DATASET_ROOT = Path(os.getenv('DATASET_ROOT_PATH'))

print(f"Dataset: {DATASET_ROOT}")
print(f"Model: {OPENAI_API_MODEL}")

Dataset: C:\Projects\datasets\Epstein Estate Documents - Seventh Production
Model: Qwen/Qwen2.5-VL-72B-Instruct


In [2]:
client = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_API_URL)
DB_PATH = 'emails.db'
db_lock = Lock()  # Thread-safe database access
print("Client initialized")

Client initialized


In [3]:
# Delete database if it still exists from a previous run
import os
if os.path.exists(DB_PATH):
    os.remove(DB_PATH)
    print("Database deleted")

## Database Setup

In [4]:
def init_database():
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS email_documents (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        filename TEXT NOT NULL UNIQUE,
        subject TEXT,
        num_pages INTEGER,
        file_size_kb REAL,
        processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        processing_status TEXT DEFAULT 'pending',
        error_message TEXT
    )
    ''')
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS messages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        email_document_id INTEGER NOT NULL,
        source_filename TEXT,
        subject TEXT,
        message_order INTEGER NOT NULL,
        from_address TEXT,
        to_address TEXT,
        other_recipients TEXT,
        timestamp_raw TEXT,
        timestamp_iso TEXT,
        message_html TEXT,
        document_id TEXT,
        FOREIGN KEY (email_document_id) REFERENCES email_documents(id)
    )
    ''')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages(timestamp_iso)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_messages_filename ON messages(source_filename)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_messages_subject ON messages(subject)')
    conn.commit()
    conn.close()
    print(f"Database initialized: {DB_PATH}")

init_database()

Database initialized: emails.db


## Helper Functions

In [5]:
def get_text_file_path(filename):
    """Find TEXT file in subdirectories"""
    text_dir = DATASET_ROOT / 'TEXT'
    pattern = f"**/{filename}"
    matching_files = list(text_dir.glob(pattern))
    if matching_files:
        return matching_files[0]
    return None

def get_image_paths(page_number, num_pages):
    images_dir = DATASET_ROOT / 'IMAGES'
    image_paths = []
    for i in range(num_pages):
        page_num = page_number + i
        pattern = f"**/HOUSE_OVERSIGHT_{page_num:06d}.jpg"
        matching_files = list(images_dir.glob(pattern))
        if matching_files:
            image_paths.append(matching_files[0])
    return image_paths

def resize_and_encode_image(image_path, target_height=800):
    try:
        img = Image.open(image_path)
        aspect_ratio = img.width / img.height
        new_width = int(aspect_ratio * target_height)
        img_resized = img.resize((new_width, target_height), Image.Resampling.LANCZOS)
        buffer = BytesIO()
        img_resized.save(buffer, format='JPEG', quality=85)
        return base64.b64encode(buffer.getvalue()).decode('utf-8')
    except Exception as e:
        print(f"Error: {e}")
        return None

print("Helper functions defined")

Helper functions defined


In [6]:
def clean_and_extract_json(response_text):
    """Robustly extract and clean JSON from LLM response"""
    try:
        # Remove code block markers if present
        if response_text.startswith('```'):
            lines = response_text.split('\n')
            # Find the start and end of the JSON block
            start_idx = 1 if lines[0].strip() == '```' else 0
            if lines[start_idx].strip().lower() == 'json':
                start_idx += 1
            end_idx = len(lines) - 1
            if lines[end_idx].strip() == '```':
                end_idx -= 1
            response_text = '\n'.join(lines[start_idx:end_idx+1])
        
        # Remove leading "json" if present
        response_text = response_text.strip()
        if response_text.lower().startswith('json'):
            response_text = response_text[4:].strip()
        
        # Try parsing as-is first
        try:
            return json.loads(response_text)
        except json.JSONDecodeError:
            pass
        
        # Find the main JSON object using regex
        import re
        json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
        if not json_match:
            raise ValueError("No JSON object found in response")
        
        json_str = json_match.group()
        
        # Try parsing the extracted JSON
        try:
            return json.loads(json_str)
        except json.JSONDecodeError as e:
            # If it still fails, try to fix common issues
            # Fix unescaped quotes in string values
            fixed_json = re.sub(r'(?<!\\)"(?=[^,}\]:]*[^"\\]")', r'\\"', json_str)
            
            # Try one more time
            try:
                return json.loads(fixed_json)
            except json.JSONDecodeError:
                # Last resort: try to extract individual components
                return extract_json_components(json_str)
                
    except Exception as e:
        raise ValueError(f"Failed to parse JSON from this raw output: {response_text}")

def extract_json_components(json_str):
    """Extract JSON components manually as fallback"""
    import re
    
    # Extract subject (using raw string)
    subject_match = re.search(r'"subject":\s*"([^"]*)"', json_str)
    subject = subject_match.group(1) if subject_match else ""
    
    # Extract messages array - simplified extraction
    messages = []
    message_blocks = re.findall(r'\{[^{}]*"from"[^{}]*\}', json_str, re.DOTALL)
    
    for block in message_blocks:
        message = {}
        
        # Extract each field with regex (using raw strings with rf prefix for f-strings)
        for field in ['from', 'to', 'timestamp_raw', 'timestamp_iso', 'document_id']:
            match = re.search(rf'"{field}":\s*"([^"]*)"', block)
            message[field] = match.group(1) if match else ""
        
        # Handle message_html separately (might contain quotes)
        html_match = re.search(r'"message_html":\s*"(.*?)"(?=\s*[,}])', block, re.DOTALL)
        message['message_html'] = html_match.group(1) if html_match else ""
        
        # Handle other_recipients array
        recipients_match = re.search(r'"other_recipients":\s*\[(.*?)\]', block)
        if recipients_match:
            recipients_str = recipients_match.group(1)
            message['other_recipients'] = re.findall(r'"([^"]*)"', recipients_str)
        else:
            message['other_recipients'] = []
            
        messages.append(message)
    
    return {
        "subject": subject,
        "messages": messages
    }


print("JSON cleaning functions defined")


JSON cleaning functions defined


In [7]:
import re
import json
from typing import Optional, Dict, Any

def clean_and_extract_json_qwen3(raw_text: str) -> Optional[Dict[str, Any]]:
    """
    Attempts to clean non-JSON artifacts (like markdown fences or preamble text)
    from the LLM's raw response and load the JSON object.
    
    Args:
        raw_text: The complete string response from the LLM.
    
    Returns:
        The parsed Python dictionary, or None if parsing fails.
    """
    
    # 1. Remove markdown code fences and unnecessary backticks
    # This targets "```json ... ```" or "``` ... ```"
    text = re.sub(r'```json\s*|```\s*', '', raw_text, flags=re.IGNORECASE)
    
    # 2. Trim leading/trailing whitespace and non-JSON characters
    # Look for the first '{' and the last '}'
    start_match = re.search(r'^\s*\{', text, re.DOTALL)
    end_match = re.search(r'\}\s*$', text, re.DOTALL)

    if start_match and end_match:
        # Get the clean content between the first '{' and the last '}'
        start_index = start_match.start() + start_match.group().find('{')
        end_index = end_match.end() - end_match.group()[::-1].find('}') # Finds the last '}'
        
        cleaned_json_string = text[start_index : end_index + 1].strip()
        
        try:
            # 3. Attempt to load the cleaned string
            return json.loads(cleaned_json_string)
        except json.JSONDecodeError:
            # If the JSON structure itself is invalid (e.g., missing comma)
            return None
    
    # If no valid JSON structure ({} block) was found
    return None

In [8]:
# Helper function to clean HTML for comparison
def clean_html_for_comparison(html_text):
    """Remove HTML tags and normalize whitespace for comparison"""
    # Remove HTML tags
    text = re.sub(r'<[^>]+>', '', html_text)
    # Normalize whitespace (collapse multiple spaces/newlines into single spaces)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

print("HTML cleaning function defined")

HTML cleaning function defined


## Prompt Template

In [9]:
EXTRACTION_PROMPT = """# TASK
Extract all single messages from this email thread in JSON format:
{{
  "subject": "Email subject",
  "messages": [{{
    "from": "Sender",
    "to": "Recipient",
    "other_recipients": ["Others except Jeffrey Epstein/jeevacation@gmail.com"],
    "timestamp_raw": "As shown",
    "timestamp_iso": "yyyymmddhhmmss format",
    "message_html": "complete HTML content of this message excluding: quoted text or signatures or disclaimers",
    "document_id": "HOUSE_OVERSIGHT_XXXXXX from bottom page corner"
  }}]
}}
Special instructions:
* If the message is quoted text, the recipient is the one receiving the quoted text, which is the message above.
* Return only valid JSON.
* document_id must be extracted from the bottom corner of the page image. Replace the X's with the actual number.
* the "from or "to" can be partial names
* if the "from or "to" are completely missing you must return <REDACTED> instead
# Here is raw text from the email thread by OCR:
{ocr_text}
# Here are the images of the same email thread:
"""
print("Prompt defined")

Prompt defined


let's try a more advanced prompt

In [10]:
EXTRACTION_PROMPT = """
# INSTRUCTIONS FOR DATA EXTRACTION
You are a highly accurate email forensics expert. Your task is to analyze the provided OCR text and image content of an email thread and extract every distinct, non-quoted message into a single JSON object.

## JSON SCHEMA REQUIREMENT
You MUST return ONLY a single, valid JSON object following this strict schema. Do NOT include any text, markdown fences (```json), or explanation outside of the JSON object.

{{"subject":"Email subject of the thread","messages":[{{"from":"Sender Name/Email (Use <REDACTED> if missing)","to":"Primary Recipient Name/Email (Use <REDACTED> if missing)","other_recipients":["List of CC/BCC names (Exclude Jeffrey Epstein/jeevacation@gmail.com)"],"timestamp_raw":"Date and Time exactly as shown in the header","timestamp_iso":"Convert timestamp strictly to YYYYMMDDHHMMSS format. If time is missing, use 000000.","message_html":"The complete HTML content of this single, distinct message body. Strip all quoted replies, signatures, and disclaimers.","document_id":"Extract the page ID where the message is found (format: HOUSE_OVERSIGHT_XXXXXX)."}}]}}

## EXTRACTION RULES
1.  **Thread Unrolling:** Treat each distinct email reply as a separate object in the 'messages' array. Messages should be ordered chronologically (oldest to newest).
2.  **Multimodality:** Use the **OCR Text** for the body content and the **Images** for layout verification and locating the **Document ID** in the footer.
3.  **Redaction:** If the 'from' or 'to' fields are incomplete or missing, return <REDACTED>.
4.  **Output Format:** **STRICTLY RETURN ONLY THE JSON OBJECT.**

# INPUT DATA
## OCR Text:
{ocr_text}
## Images:

"""

## API and Database Functions

In [11]:
def process_email_with_vision(text_content, image_base64_list):
    try:
        prompt = EXTRACTION_PROMPT.format(ocr_text=text_content)
        content = [{"type": "text", "text": prompt}]
        for img_base64 in image_base64_list:
            content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"}})
        response = client.chat.completions.create(
            model=OPENAI_API_MODEL,
            messages=[{"role": "user", "content": content}],
            # temperature=0.02,
            max_tokens=8000,
        )
        #print the complete response for debugging
        # print(response)
        response_text = response.choices[0].message.content.strip()
        
        # Use the robust JSON cleaning function
        parsed_json = clean_and_extract_json_qwen3(response_text)

        # If parsing failed, log the raw response for debugging
        if parsed_json is None:
            print(f"\n{'='*60}")
            print(f"FAILED TO PARSE JSON - RAW RESPONSE:")
            print(f"{'='*60}")
            print(response_text)
            print(f"{'='*60}\n")
            return None, f"JSON parsing failed"
        
        return parsed_json, None
        
    except Exception as e:
        # More detailed error reporting
        error_msg = f"API/Parsing error: {str(e)}"
        if "response_text" in locals():
            error_msg += f" | Response preview: {response_text[:200]}..."
        return None, error_msg

def save_email_to_database(filename, num_pages, file_size_kb, extracted_data):
    """Thread-safe database write function"""
    with db_lock:  # Ensure only one thread writes at a time
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()
        try:
            subject = extracted_data.get('subject', '')
            cursor.execute('INSERT INTO email_documents (filename, subject, num_pages, file_size_kb, processing_status) VALUES (?, ?, ?, ?, ?)',
                          (filename, subject, num_pages, file_size_kb, 'success'))
            email_doc_id = cursor.lastrowid
            for idx, message in enumerate(extracted_data.get('messages', [])):
                cursor.execute('INSERT INTO messages (email_document_id, source_filename, subject, message_order, from_address, to_address, other_recipients, timestamp_raw, timestamp_iso, message_html, document_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
                              (email_doc_id, filename, subject, idx, message.get('from', ''), message.get('to', ''), json.dumps(message.get('other_recipients', [])),
                               message.get('timestamp_raw', ''), message.get('timestamp_iso', ''), message.get('message_html', ''), message.get('document_id', '')))
            conn.commit()
            return True, None
        except Exception as e:
            conn.rollback()
            return False, str(e)
        finally:
            conn.close()

def mark_email_failed(filename, error_message):
    """Thread-safe failure marking function"""
    with db_lock:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()
        cursor.execute('INSERT OR REPLACE INTO email_documents (filename, processing_status, error_message) VALUES (?, ?, ?)',
                      (filename, 'failed', error_message))
        conn.commit()
        conn.close()

print("Functions defined")

Functions defined


In [12]:
## show statistics of emails_df, such as average number of pages, total size, etc.
emails_df = pd.read_csv('emails_list.csv')
print(emails_df.describe())

        page_number    num_pages  file_size_kb
count   2112.000000  2112.000000   2112.000000
mean   29700.258523     2.807292      4.428660
std     3996.217631     5.086675      9.436321
min    11277.000000     1.000000      0.160000
25%    27048.750000     1.000000      1.110000
50%    31003.500000     2.000000      2.275000
75%    32671.250000     3.000000      4.575000
max    33599.000000   149.000000    290.130000


## Main Processing Loop (Parallelized with 20 Threads)

In [12]:
def process_single_email(row):
    """Worker function to process a single email"""
    filename = row['filename']
    result_data = {
        'filename': filename,
        'num_pages': row['num_pages'],
        'success': False,
        'error': None,
        'num_messages': 0,
        'subject': None
    }
    
    try:
        # Find text file
        text_path = get_text_file_path(filename)
        if not text_path:
            raise FileNotFoundError(f"Could not find {filename}")
        
        with open(text_path, 'r', encoding='utf-8', errors='ignore') as f:
            text_content = f.read()
        
        # Get and encode images
        image_paths = get_image_paths(row['page_number'], row['num_pages'])
        image_base64_list = [resize_and_encode_image(p) for p in image_paths if resize_and_encode_image(p)]
        
        # Call API
        api_result, error = process_email_with_vision(text_content, image_base64_list)
        if error:
            result_data['error'] = error
            mark_email_failed(filename, error)
            return result_data

        # Check if parsing completely failed (api_result is None)
        if api_result is None:
            result_data['error'] = "JSON parsing failed - no valid data extracted"
            mark_email_failed(filename, "JSON parsing failed - no valid data extracted")
            return result_data

        # Save to database
        success, db_error = save_email_to_database(filename, row['num_pages'], row['file_size_kb'], api_result)
        if success:
            result_data['success'] = True
            result_data['num_messages'] = len(api_result.get('messages', []))
            result_data['subject'] = api_result.get('subject', 'N/A')[:60]
        else:
            result_data['error'] = f"DB error: {db_error}"
            
    except Exception as e:
        result_data['error'] = str(e)
        mark_email_failed(filename, str(e))
    
    return result_data

# Main processing with ThreadPoolExecutor
emails_df = pd.read_csv('emails_list.csv')
emails_filtered = emails_df[emails_df['num_pages'] <= 5].copy()
emails_to_process = emails_filtered  # emails_filtered.head(5) for testing

print(f"Processing {len(emails_to_process)} emails with 20 parallel threads\n")

# Process emails in parallel with progress bar
with ThreadPoolExecutor(max_workers=20) as executor:
    # Submit all tasks
    futures = {executor.submit(process_single_email, row): idx for idx, row in emails_to_process.iterrows()}
    
    # Process results as they complete with progress bar
    for future in tqdm(as_completed(futures), total=len(futures), desc="Processing emails"):
        result = future.result()
        
        # Optional: print individual results (can be commented out for cleaner output)
        if not result['success']:
            # tqdm.write(f"✓ {result['filename']}: {result['num_messages']} messages - {result['subject']}")
            tqdm.write(f"❌ {result['filename']}: {result['error']}")

print("\nProcessing complete!")

Processing 1951 emails with 20 parallel threads



Processing emails:  30%|██▉       | 577/1951 [20:59<22:50,  1.00it/s]  


FAILED TO PARSE JSON - RAW RESPONSE:
{
    "subject": "time to publish this instead ofthe trite.",
    "messages": [
        {
            "from": "jeffrey E. [jeevacation@gmail.com]",
            "to": "Mortimer Zuckerman",
            "other_recipients": [],
            "timestamp_raw": "2/10/2015 5:22:52 PM",
            "timestamp_iso": "20150210172252",
            "message_html": "<p>Fwd: Statement for Press</p>",
            "document_id": "HOUSE_OVERSIGHT_029225"
        },
        {
            "from": "Carolyn Cohen",
            "to": "Alan Dershowitz",
            "other_recipients": [],
            "timestamp_raw": "February 10, 2015 at 10:40:57 AM EST",
            "timestamp_iso": "20150210104057",
            "message_html": "<p>Alan Dershowitz's Statement: We can now prove that Jane Doe #3 has no credibility and lied about Clinton, the Gores and Dershowitz.</p><p>Dershowitz's lawyers have filed a detailed reply to Jane Doe #3 false charges. In it they definitively res

Processing emails:  37%|███▋      | 713/1951 [26:12<1:22:43,  4.01s/it]


FAILED TO PARSE JSON - RAW RESPONSE:
{
    "subject": "Re: Keating Interview",
    "messages": [
        {
            "from": "jeffrey epstein [jeevacation@gmail.com]",
            "to": "Katherine Keating",
            "other_recipients": [],
            "timestamp_raw": "10/28/2011 4:32:02 PM",
            "timestamp_iso": "20111028163202",
            "message_html": "<p>Are you here</p><p>Sorry for all the typos .Sent from my iPhone</p>",
            "document_id": "HOUSE_OVERSIGHT_029658"
        },
        {
            "from": "<REDACTED>",
            "to": "<REDACTED>",
            "other_recipients": [],
            "timestamp_raw": "Oct 28, 2011, at 12:36 AM",
            "timestamp_iso": "20111028003600",
            "message_html": "<p>Paul Keating explains as never before</p><ul><li>BY:PAUL KELLY, EDITOR-AT-LARGE</li><li>From:The Australian</li><li>October 22, 2011 12:00AM</li></ul><p>http://www.theaustralian.com.au/news/features/paul-keating-explains-as-never-before/st

Processing emails:  84%|████████▍ | 1647/1951 [52:46<06:26,  1.27s/it]  

❌ HOUSE_OVERSIGHT_029773.txt: API/Parsing error: Request timed out.


Processing emails: 100%|██████████| 1951/1951 [58:05<00:00,  1.79s/it]


Processing complete!





## View Results

In [13]:
conn = sqlite3.connect(DB_PATH)
summary = pd.read_sql_query('SELECT processing_status, COUNT(*) as count FROM email_documents GROUP BY processing_status', conn)
print("Processing Summary:")
display(summary)

emails = pd.read_sql_query('SELECT * FROM email_documents WHERE processing_status="success" ORDER BY processed_at DESC', conn)
print(f"\nProcessed Emails ({len(emails)}):")
display(emails[['filename', 'subject', 'num_pages']])
conn.close()

Processing Summary:


Unnamed: 0,processing_status,count
0,failed,3
1,success,1948



Processed Emails (1948):


Unnamed: 0,filename,subject,num_pages
0,HOUSE_OVERSIGHT_033561.txt,"Fwd: Why Palm Beach, Florida Is The 'New Green...",2
1,HOUSE_OVERSIGHT_033498.txt,RE: Jane Doe,5
2,HOUSE_OVERSIGHT_033589.txt,Fwd: Patterson,2
3,HOUSE_OVERSIGHT_033508.txt,RE: Jane Doe,4
4,HOUSE_OVERSIGHT_033596.txt,"Gulfstream V - offmarket, sleeper aircraft",3
...,...,...,...
1943,HOUSE_OVERSIGHT_016692.txt,Russian House,1
1944,HOUSE_OVERSIGHT_011907.txt,RE:,1
1945,HOUSE_OVERSIGHT_014516.txt,Fwd: 2016 Election: Tax Changes Expected,2
1946,HOUSE_OVERSIGHT_012898.txt,Farmer Jaffe is suing Donald Trump!,1


Qwen 2.5 VL 72B only one failed
Qwen 3 VL qwen3-vl-30b-a3b-instruct

In [14]:
# count all messages in the database where from_address or to_address contains 'REDACTED'
conn = sqlite3.connect('emails.db')
cursor = conn.cursor()
cursor.execute('''SELECT COUNT(*) FROM messages WHERE from_address LIKE '%REDACTED%' OR to_address LIKE '%REDACTED%' ''')
count = cursor.fetchone()[0]
print(f"Total messages with 'REDACTED' in from_address or to_address: {count}")
# print total number of messages in the database
cursor.execute('''SELECT COUNT(*) FROM messages''')
total_count = cursor.fetchone()[0]
print(f"Total messages in the database: {total_count}")
conn.close()

Total messages with 'REDACTED' in from_address or to_address: 4164
Total messages in the database: 6577


20251121

with qwen 2.5 VL 72B:
5 failed messages
Total messages with 'REDACTED' in from_address or to_address: 1167
Total messages in the database: 4377

20251123

with qwen 2.5 VL 72B:

1 failed messages
Total messages with 'REDACTED' in from_address or to_address: 1895
Total messages in the database: 6962

with x-ai/grok-4.1-fast: 
30% failed messages -> Failed to parse JSON -> but it seems it is returning much less <REDACTED>

20251125

Qwen 3 VL qwen3-vl-30b-a3b-instruct: 
72 failed messages
Total messages with 'REDACTED' in from_address or to_address: 648
Total messages in the database: 5463
with qwen 2.5 VL 72B:
Total messages with 'REDACTED' in from_address or to_address: 4164
Total messages in the database: 6577

## Analysis of messages before cleanup

### display current state of unique addresses across 'from' and 'to'

In [15]:
# names can be spelled differently, let's try to make some order in this mess
# find all unique from addresses 
conn = sqlite3.connect(DB_PATH) 
query = '''
SELECT from_address, COUNT(*) as count
FROM messages
GROUP BY from_address
ORDER BY count DESC
'''
cursor = conn.cursor()
cursor.execute(query)
results_from = cursor.fetchall()
conn.close()
# find all unique to addresses 
conn = sqlite3.connect(DB_PATH) 
query = '''
SELECT to_address, COUNT(*) as count
FROM messages
GROUP BY to_address
ORDER BY count DESC
'''
cursor = conn.cursor()
cursor.execute(query)
results_to = cursor.fetchall()
conn.close()
# add both from and to addresses together and display top 100
from collections import defaultdict
address_counts = defaultdict(int)
for address, count in results_from:
    address_counts[address] += count
for address, count in results_to:
    address_counts[address] += count   
results = sorted(address_counts.items(), key=lambda x: x[1], reverse=True) 
# Display the top results
print("\nTop Email Addresses:")
for address, count in results[:100]:
    print(f"{address}: {count}")



Top Email Addresses:
<REDACTED>: 4537
jeffrey E. <jeevacation@gmail.com>: 1047
jeffrey E. [jeeyacation@gmail.com]: 565
jeffrey E. [jeevacation@gmail.com]: 544
Weingarten, Reid: 374
Michael Wolff: 265
Thomas Jr., Landon: 224
Jeffrey E.: 208
Kathy Ruemmler: 192
Jeffrey Epstein [jeevacation@gmail.com]: 158
Steve Bannon: 156
Nicholas Ribis: 152
J <jeevacation@gmail.com>: 145
Lawrence Krauss: 144
Jeffrey Epstein [jeeyacation@gmail.com]: 137
jeevacation@gmail.com: 136
Richard Kahn: 127
Jeffrey Epstein <jeevacation@gmail.com>: 120
jeffrey E.: 120
Larry Summers: 98
Jessica Cadwell: 87
Darren Indyke: 80
Jacquie Johnson: 73
J [jeevacation@gmail.com]: 67
jeffrey E. [mailto:jeevacation@gmail.com]: 66
Jeffrey E. <jeevacation@gmail.com>: 66
Jeffrey E. [jeeyacation@gmail.com]: 66
Jeffrey E. [jeevacation@gmail.com]: 62
Boris Nikolic: 55
Jeffrey Epstein: 52
Larry Visoski: 51
Lisa New: 50
Rebecca Watson: 49
Robert Kuhn: 40
Joi Ito: 40
Etienne Binant: 40
LHS: 39
Pritzker, Tom: 36
Ken Starr: 35
Martin G.

In [34]:
# We replace all From and To values to the value to 'Jeffrey Epstein', if it contains these strings (Consider this in lowercase)
# jeffrey e
# jeffrey epstein
# jeevacation
# jeeyacation
# also replace when value is just 'J' and message order > 0 (not first in thread) (confer 031103)

conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()

# Update from_address
update_from_query = '''
UPDATE messages
SET from_address = 'Jeffrey Epstein'
WHERE LOWER(from_address) LIKE '%jeffrey e%'
   OR LOWER(from_address) LIKE '%jeffrey epstein%'
   OR LOWER(from_address) LIKE '%jeevacation%'
   OR LOWER(from_address) LIKE '%jeeyacation%'
   OR (from_address = 'J' AND message_order > 0)
'''
cursor.execute(update_from_query)

# Update to_address
update_to_query = '''
UPDATE messages
SET to_address = 'Jeffrey Epstein'
WHERE LOWER(to_address) LIKE '%jeffrey e%'
   OR LOWER(to_address) LIKE '%jeffrey epstein%'
   OR LOWER(to_address) LIKE '%jeevacation%'
   OR LOWER(to_address) LIKE '%jeeyacation%'
   OR LOWER(to_address) LIKE '%jeffreyepstein%'
   OR (to_address = 'J' AND message_order > 0)
'''
cursor.execute(update_to_query)

conn.commit()
conn.close()


### fix variants in from and to addresses
Analysed common same from/to's using different name formats, initials, or email variants. -> standardize

In [35]:
'''Let's also change all variants to the standardized name:
| Standardized Name     | Variants Detected                                 |
| --------------------- | ------------------------------------------------- |
| **Lawrence Summers**  | Larry Summers; Lawrence Summers; LHS              |
| **Robert Kuhn**       | Robert Kuhn; Robert Lawrence Kuhn; Robert L. Kuhn |
| **Nicholas Ribis**    | Nicholas Ribis; Nicholas Ribi                     |
| **Boris Nikolic**     | Boris Nikolic; Boris Nikolic (bgC3)               |
| **Martin Weinberg**   | Martin Weinberg; Martin G. Weinberg               |
| **Ken Starr**         | Ken Starr; Starr, Ken                             |
| **Ehud Barak**        | Ehud Barak; ehud barak                            |
| **Landon Thomas Jr.** | Thomas Jr., Landon; Landon Thomas Jr.             |
'''
conn = sqlite3.connect(DB_PATH)
standardizations = {
    'Larry Summers': 'Lawrence Summers',
    'Lawrence Summers': 'Lawrence Summers',
    'LHS': 'Lawrence Summers',
    'Robert Kuhn': 'Robert Kuhn',
    'Robert Lawrence Kuhn': 'Robert Kuhn',
    'Robert L. Kuhn': 'Robert Kuhn',
    'Nicholas Ribis': 'Nicholas Ribis',
    'Nicholas Ribi': 'Nicholas Ribis',
    'Boris Nikolic': 'Boris Nikolic',
    'Boris Nikolic (bgC3)': 'Boris Nikolic',
    'Martin Weinberg': 'Martin Weinberg',
    'Martin G. Weinberg': 'Martin Weinberg',
    'Ken Starr': 'Ken Starr',
    'Starr, Ken': 'Ken Starr',
    'Ehud Barak': 'Ehud Barak',
    'ehud barak': 'Ehud Barak',
    'Thomas Jr., Landon': 'Landon Thomas Jr.',
    'Landon Thomas Jr.': 'Landon Thomas Jr.'
}

for variant, standard in standardizations.items():
    update_query = '''
    UPDATE messages
    SET from_address = ?
    WHERE from_address = ?
    '''
    cursor = conn.cursor()
    cursor.execute(update_query, (standard, variant))
conn.commit()
conn.close()

In [36]:
'''Let's also change all variants to the standardized name:
| Standardized Name     | Variants Detected                                                                         |
| --------------------- | ----------------------------------------------------------------------------------------- |
| **Lawrence Summers**  | Larry Summers, Lawrence H. Summers, Summers, Lawrence H., Lawrence Summers; LHS           |
| **Robert Kuhn**       | Robert Kuhn, Robert Lawrence Kuhn                                                         |
| **Martin Weinberg**   | Martin Weinberg, Martin G. Weinberg, 'Martin G. Weinberg'                                 |
| **Ken Starr**         | Ken Starr, Starr, Ken                                                                     |
| **Landon Thomas Jr.** | Thomas Jr., Landon, Landon Thomas Jr., Landon Thomas                                      |
| **Ehud Barak**        | ehud barak, ehbarak                                                                       |
| **Anas Alrasheed**    | Anas Alrasheed, anasalrasheed                                                             |
| **Lisa New**          | Lisa New, Elisa New                                                                       |
| **Steve Bannon**      | Steve Bannon, Steve Bannon                                                                |
| **Boris Nikolic**     | Boris Nikolic, Boris Nikolic (bgC3)                                                       |

'''
conn = sqlite3.connect(DB_PATH)
standardizations = {
    'Larry Summers': 'Lawrence Summers',
    'Lawrence H. Summers': 'Lawrence Summers',
    'LHS': 'Lawrence Summers',
    'Summers': 'Lawrence Summers',
    'Lawrence H.': 'Lawrence Summers',
    'Robert Kuhn': 'Robert Kuhn',
    'Robert Lawrence Kuhn': 'Robert Kuhn',
    'Martin Weinberg': 'Martin Weinberg',
    'Martin G. Weinberg': 'Martin Weinberg',
    'Martin G. Weinberg': 'Martin Weinberg',
    'Ken Starr': 'Ken Starr',
    'Starr': 'Ken Starr',
    'Ken': 'Ken Starr',
    'Thomas Jr., Landon': 'Landon Thomas Jr.',
    'Landon': 'Landon Thomas Jr.',
    'Landon Thomas Jr.': 'Landon Thomas Jr.',
    'ehud barak': 'Ehud Barak',
    'ehbarak': 'Ehud Barak',
    'Anas Alrasheed': 'Anas Alrasheed',
    'anasalrasheed': 'Anas Alrasheed',
    'Lisa New': 'Lisa New',
    'Elisa New': 'Lisa New',
    'Steve Bannon': 'Steve Bannon',
    'Boris Nikolic': 'Boris Nikolic',
    'Boris Nikolic (bgC3)': 'Boris Nikolic'
}

for variant, standard in standardizations.items():
    update_query = '''
    UPDATE messages
    SET to_address = ?
    WHERE to_address = ?
    '''
    cursor = conn.cursor()
    cursor.execute(update_query, (standard, variant))
conn.commit()
conn.close()

### fix REDACTED in from or to addresses

we have a lot of redacted from or to
what is going on here?
eg: 019343, the prompt is bad, it chooses <REDACTED> over the name that *is* there, bummer -> rerun with VLM?
i changed the prompt, right now we have 2517 lines that contain redacted :(
changed prompt: now 2073 lines with redacted, but also 5 txt files failed because of parsing error



In [None]:
# find messages where from_address is <REDACTED> and where message_order = 0 or not
conn = sqlite3.connect(DB_PATH)
query = '''
SELECT *
FROM messages
WHERE from_address = '<REDACTED>' AND message_order != 0
'''
cursor = conn.cursor()
cursor.execute(query)
results = cursor.fetchall()
for row in results:
    print(row)
conn.close()
print(len(results))

most redacted messages are where to_address = '<REDACTED>' AND message_order != 0
i checked a few samples form the others cominations and they seem actually redacted
although: 022668 it seems clear, don't know why. Maybe with Qwen3 VL?

Let's make a new function that for each message where either from_address or to_address contains REDACTED (but noth both at the same time), we send it to an LLM to try again

Question: is sometimes the from AND to_address <REDACTED>? -> YES: 46 cases :( but these will also be tried again


In [None]:
# Resolve Redacted Addresses Function with enhanced capability for both-REDACTED messages
def resolve_redacted_addresses():
    """
    Function to resolve redacted from_address or to_address using LLM vision capabilities.
    Processes messages where:
    1. Either from_address OR to_address contains REDACTED (but not both)
    2. Both from_address AND to_address contain REDACTED (allows partial resolution)
    """
    
    # Original prompt template for single redacted addresses
    SINGLE_REDACTED_PROMPT = """From this document (which is an email thread) with the given image and OCR text, I have extracted this message.

The {redacted_field} is unknown. Can you point it to a name or email address?

Message details:
- Subject: {subject}
- Timestamp: {timestamp}
- Current {known_field}: {known_value}
- Message content: {message_content}

Make sure to look at the timestamp and message content to find the right message in the thread!

IMPORTANT!
Only reply with the name or email address. Do not include any explanation or additional text.

Document OCR Text:
{ocr_text}
Document images:
"""

    # New prompt template for both redacted addresses
    BOTH_REDACTED_PROMPT = """From this document (which is an email thread) with the given image and OCR text, I have extracted this message.

Both the sender and recipient addresses are unknown. Can you identify both the FROM address and TO address?

Message details:
- Subject: {subject}
- Timestamp: {timestamp}
- Message content: {message_content}

Make sure to look at the timestamp and message content to find the right message in the thread!

IMPORTANT!
Reply with EXACTLY this format:
FROM: [sender name or email]
TO: [recipient name or email]

If you can only identify one of them, still use the format but put [redacted] for the unknown one.
Do not include any explanation or additional text. Only use the format above.

Document OCR Text:
{ocr_text}
Document images:
"""

    print("Starting redacted address resolution...")
    
    # Connect to database and get messages with redacted addresses
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    
    # Query for messages with single redacted address (either from OR to, but not both)
    single_redacted_query = '''
    SELECT id, source_filename, subject, from_address, to_address, timestamp_raw, message_html, document_id, 'single' as redaction_type
    FROM messages 
    WHERE (
        (from_address LIKE '%REDACTED%' AND to_address NOT LIKE '%REDACTED%') 
        OR 
        (to_address LIKE '%REDACTED%' AND from_address NOT LIKE '%REDACTED%')
    )
    '''
    
    # Query for messages with both addresses redacted
    both_redacted_query = '''
    SELECT id, source_filename, subject, from_address, to_address, timestamp_raw, message_html, document_id, 'both' as redaction_type
    FROM messages 
    WHERE from_address LIKE '%REDACTED%' AND to_address LIKE '%REDACTED%'
    '''
    
    # Combine both queries
    combined_query = f'''
    {single_redacted_query}
    UNION ALL
    {both_redacted_query}
    ORDER BY source_filename
    '''
    
    cursor.execute(combined_query)
    redacted_messages = cursor.fetchall()
    
    print(f"Found {len(redacted_messages)} messages with redacted addresses")
    
    # Count different types
    single_redacted_count = len([msg for msg in redacted_messages if msg[8] == 'single'])
    both_redacted_count = len([msg for msg in redacted_messages if msg[8] == 'both'])
    
    print(f"  - Single redacted: {single_redacted_count}")
    print(f"  - Both redacted: {both_redacted_count}")
    
    if len(redacted_messages) == 0:
        print("No messages found with redacted addresses.")
        conn.close()
        return []
    
    replacements = []
    processed_count = 0
    
    def is_valid_address(address):
        """Check if an address is valid (not redacted, unknown, etc.)"""
        if not address or len(address) < 2:
            return False
        skip_responses = ['unknown', 'redacted', 'not found', 'unclear', 'cannot determine', 'n/a', '[redacted]']
        return not any(skip_word in address.lower() for skip_word in skip_responses)
    
    for message in tqdm(redacted_messages, desc="Resolving redacted addresses"):
        msg_id, source_filename, subject, from_addr, to_addr, timestamp_raw, message_html, document_id, redaction_type = message
        
        try:
            # Get the OCR text file
            text_path = get_text_file_path(source_filename)
            if not text_path:
                print(f"Warning: Could not find text file for {source_filename}")
                continue
                
            with open(text_path, 'r', encoding='utf-8', errors='ignore') as f:
                ocr_text = f.read()
            
            # Get the document image
            if document_id:
                # Try to find image in data/sources first (already processed)
                source_image_path = Path('data/sources') / f"{Path(source_filename).stem}.jpg"
                if source_image_path.exists():
                    image_base64 = resize_and_encode_image(source_image_path)
                else:
                    # Fall back to searching in DATASET_ROOT/IMAGES
                    images_dir = DATASET_ROOT / 'IMAGES'
                    pattern = f"**/{Path(source_filename).stem}.jpg"
                    matching_files = list(images_dir.glob(pattern))
                    if matching_files:
                        image_base64 = resize_and_encode_image(matching_files[0])
                    else:
                        print(f"Warning: Could not find image for document {source_filename}")
                        continue
            else:
                print(f"Warning: No document_id for message {msg_id}")
                continue
            
            if not image_base64:
                print(f"Warning: Could not encode image for document {document_id}")
                continue
            
            # Clean message HTML for display
            clean_message = clean_html_for_comparison(message_html)
            
            # Handle based on redaction type
            if redaction_type == 'single':
                # Single redacted address - use original logic
                if 'REDACTED' in from_addr:
                    redacted_field = "from_address"
                    known_field = "to_address" 
                    known_value = to_addr
                    redacted_value = from_addr
                else:
                    redacted_field = "to_address"
                    known_field = "from_address"
                    known_value = from_addr
                    redacted_value = to_addr
                
                # Format the single redacted prompt
                prompt = SINGLE_REDACTED_PROMPT.format(
                    redacted_field=redacted_field.replace('_', ' '),
                    subject=subject,
                    timestamp=timestamp_raw,
                    known_field=known_field.replace('_', ' '),
                    known_value=known_value,
                    message_content=clean_message,
                    ocr_text=ocr_text
                )
                
            else:
                # Both addresses redacted - use new logic
                prompt = BOTH_REDACTED_PROMPT.format(
                    subject=subject,
                    timestamp=timestamp_raw,
                    message_content=clean_message,
                    ocr_text=ocr_text
                )
            
            # Call the LLM
            content = [
                {"type": "text", "text": prompt},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_base64}"}}
            ]
            
            response = client.chat.completions.create(
                model=OPENAI_API_MODEL,
                messages=[{"role": "user", "content": content}],
                temperature=0,
                max_tokens=150  # Slightly more tokens for both-redacted responses
            )
            
            # Process the response based on redaction type
            response_text = response.choices[0].message.content.strip()
            
            if redaction_type == 'single':
                # Single redacted - process as before
                resolved_address = response_text.strip('"\'')
                
                if not is_valid_address(resolved_address):
                    print(f"Skipping message {msg_id}: Invalid response - {resolved_address}")
                    continue
                
                # Update the database
                if redacted_field == "from_address":
                    cursor.execute("UPDATE messages SET from_address = ? WHERE id = ?", (resolved_address, msg_id))
                else:
                    cursor.execute("UPDATE messages SET to_address = ? WHERE id = ?", (resolved_address, msg_id))
                
                # Track the replacement
                replacements.append({
                    'message_id': msg_id,
                    'source_filename': source_filename,
                    'field': redacted_field,
                    'original': redacted_value,
                    'resolved': resolved_address,
                    'timestamp': timestamp_raw,
                    'subject': subject[:50] + "..." if len(subject) > 50 else subject
                })
                
            else:
                # Both redacted - parse the structured response and allow partial updates
                lines = response_text.split('\n')
                from_resolved = None
                to_resolved = None
                
                for line in lines:
                    line = line.strip()
                    if line.upper().startswith('FROM:'):
                        from_resolved = line[5:].strip().strip('"\'')
                    elif line.upper().startswith('TO:'):
                        to_resolved = line[3:].strip().strip('"\'')
                
                # Check which addresses are valid
                valid_from = is_valid_address(from_resolved)
                valid_to = is_valid_address(to_resolved)
                
                # Skip only if neither address could be resolved
                if not valid_from and not valid_to:
                    print(f"Skipping message {msg_id}: Could not resolve any addresses - FROM: '{from_resolved}', TO: '{to_resolved}'")
                    continue
                
                # Update whatever we can resolve
                updates_made = []
                
                if valid_from:
                    cursor.execute("UPDATE messages SET from_address = ? WHERE id = ?", (from_resolved, msg_id))
                    replacements.append({
                        'message_id': msg_id,
                        'source_filename': source_filename,
                        'field': 'from_address',
                        'original': from_addr,
                        'resolved': from_resolved,
                        'timestamp': timestamp_raw,
                        'subject': subject[:50] + "..." if len(subject) > 50 else subject
                    })
                    updates_made.append(f"FROM: {from_resolved}")
                
                if valid_to:
                    cursor.execute("UPDATE messages SET to_address = ? WHERE id = ?", (to_resolved, msg_id))
                    replacements.append({
                        'message_id': msg_id,
                        'source_filename': source_filename,
                        'field': 'to_address',
                        'original': to_addr,
                        'resolved': to_resolved,
                        'timestamp': timestamp_raw,
                        'subject': subject[:50] + "..." if len(subject) > 50 else subject
                    })
                    updates_made.append(f"TO: {to_resolved}")
                
                # Log what was updated
                if len(updates_made) == 1:
                    print(f"Partial resolution for message {msg_id}: {updates_made[0]} (other field remains REDACTED)")
            
            processed_count += 1
            
            # Print progress every 100 successful resolutions
            if processed_count % 100 == 0:
                print(f"Successfully processed {processed_count} messages...")
                
        except Exception as e:
            print(f"Error processing message {msg_id}: {str(e)}")
            continue
    
    # Commit all changes
    conn.commit()
    conn.close()
    
    print(f"\n{'='*60}")
    print(f"REDACTED ADDRESS RESOLUTION COMPLETE")
    print(f"{'='*60}")
    print(f"Total messages processed: {len(redacted_messages)}")
    print(f"  - Single redacted: {single_redacted_count}")
    print(f"  - Both redacted: {both_redacted_count}")
    print(f"Successfully processed: {processed_count}")
    print(f"Success rate: {(processed_count/len(redacted_messages)*100):.1f}%")
    
    if replacements:
        print(f"\nREPLACEMENT SUMMARY:")
        print(f"{'='*60}")
        
        # Group by field type
        from_replacements = [r for r in replacements if r['field'] == 'from_address']
        to_replacements = [r for r in replacements if r['field'] == 'to_address']
        
        print(f"From address resolutions: {len(from_replacements)}")
        print(f"To address resolutions: {len(to_replacements)}")
        
        print(f"\nDETAILED REPLACEMENTS:")
        print(f"{'ID':<8} {'Field':<12} {'Original':<15} {'Resolved':<25} {'Source':<15}")
        print(f"{'-'*80}")
        
        for replacement in replacements[:50]:  # Show first 50
            print(f"{replacement['message_id']:<8} {replacement['field']:<12} {replacement['original']:<15} {replacement['resolved']:<25} {replacement['source_filename'][:12]:<15}")
        
        if len(replacements) > 50:
            print(f"... and {len(replacements) - 50} more replacements")
        
        # Show most common resolved addresses
        resolved_addresses = [r['resolved'] for r in replacements]
        from collections import Counter
        most_common = Counter(resolved_addresses).most_common(30)
        
        print(f"\nMOST COMMON RESOLVED ADDRESSES:")
        print(f"{'='*40}")
        for address, count in most_common:
            print(f"{address}: {count} times")
    
    return replacements

# Execute the function
print("Ready to execute resolve_redacted_addresses() with partial resolution support")
replacements = resolve_redacted_addresses()


weird: HOUSE_OVERSIGHT_022770 is a tif file, not jpg. Todo: are there more other formats?

Problem: 79 messages have HOUSE_OVERSIGHT_XXXXXX as their document_id, which means they will not have a link to their image...
we could do another step here to send those again to a LLM
or default them to the first page of the thread

### fix timestamps

In [23]:
# i see about 10 out of 7000 messages that don't have a timestamp_iso or they are '00000000000000', they are however always message_order > 0
# let's just take the timestamp_iso of the first message in the thread (message_order = 0) for that source_filename and use that
# time minus 30 seconds
conn = sqlite3.connect(DB_PATH)
query = '''
UPDATE messages AS m1
SET timestamp_iso = (
    SELECT 
        strftime('%Y%m%d%H%M%S', 
            datetime(
                substr(m2.timestamp_iso, 1, 4) || '-' ||
                substr(m2.timestamp_iso, 5, 2) || '-' ||
                substr(m2.timestamp_iso, 7, 2) || ' ' ||
                substr(m2.timestamp_iso, 9, 2) || ':' ||
                substr(m2.timestamp_iso, 11, 2) || ':' ||
                substr(m2.timestamp_iso, 13, 2),
                '-30 seconds'
            )
        )
    FROM messages AS m2
    WHERE m2.source_filename = m1.source_filename 
      AND m2.message_order = 0
      AND m2.timestamp_iso IS NOT NULL
      AND m2.timestamp_iso != ''
    LIMIT 1
)
WHERE m1.timestamp_iso IS NULL OR m1.timestamp_iso = '' OR m1.timestamp_iso = '00000000000000'
'''
cursor = conn.cursor()
cursor.execute(query)
rows_updated = cursor.rowcount
conn.commit()
conn.close()
print(f"Updated {rows_updated} messages with corrected timestamps")


Updated 12 messages with corrected timestamps


In [37]:
# count duplicate messages: improved version with HTML cleaning and adaptive thresholds
import difflib
from datetime import datetime
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
query = '''
SELECT m1.id, m1.source_filename, m1.from_address, m1.to_address, m1.timestamp_iso, m1.message_html,
       m2.id, m2.source_filename, m2.from_address, m2.to_address, m2.timestamp_iso, m2.message_html
FROM messages m1
JOIN messages m2 ON m1.from_address = m2.from_address AND m1.to_address = m2.to_address AND m1.id < m2.id
'''
cursor.execute(query)
rows = cursor.fetchall()
duplicate_count = 0
for row in rows:
    msg1_html = row[5]
    msg2_html = row[11]
    time1_str = row[4]
    time2_str = row[10]
    
    # Parse timestamps properly
    try:
        time1 = datetime.strptime(time1_str, '%Y%m%d%H%M%S')
        time2 = datetime.strptime(time2_str, '%Y%m%d%H%M%S')
    except:
        print(f"Invalid timestamp format for message IDs {row[0]} and {row[6]}: '{time1_str}' or '{time2_str}'")
        continue  # Skip if timestamp format is invalid
    
    # Calculate time difference in minutes
    time_diff_minutes = abs((time2 - time1).total_seconds()) / 60
    
    if time_diff_minutes <= 5:
        # Clean HTML tags and normalize whitespace before comparison
        cleaned_msg1 = clean_html_for_comparison(msg1_html)
        cleaned_msg2 = clean_html_for_comparison(msg2_html)
        
        # Determine adaptive threshold based on message length
        cleaned_len = len(cleaned_msg1)
        if cleaned_len < 100:
            threshold = 0.90  # More lenient for short messages
        elif cleaned_len < 500:
            threshold = 0.95
        else:
            threshold = 0.98  # Stricter for longer messages
        
        # Check similarity on cleaned text
        similarity = difflib.SequenceMatcher(None, cleaned_msg1, cleaned_msg2).ratio()
        if similarity >= threshold:
            duplicate_count += 1
            # print(f"Duplicate found between message ID {row[0]} ({row[1]}) and message ID {row[6]} ({row[7]}) with similarity {similarity:.2f} (threshold: {threshold})")

print(f"Total duplicate messages found: {duplicate_count}")
conn.close()    


Total duplicate messages found: 385


### remove duplicate messages

there seem to be a lot of duplicate messages 2465 out of 6822
eg 12898 & 33575 -> same doc but different (redactions are different)
eg 019334 & 019348 -> replies at different times in a thread, causing forks -> messages are the same
This leads me to believe we can delete all duplicate messages
But which ones? The ones with the highest message order? But what if they have the same message order?
Maybe it doesn't matter.
"LMAO<br>Perfect mnuchin hit" and "<p>LMAO</p><p>Perfect mnuchin hit</p>" are not considered the same -> lets delete all html before comparing
1162 duplicates before, now : 921 -> This is actually more accurate! We're now finding true duplicates rather than messages that just had similar HTML structure. If we extend the time from 5mins to 24h, it jumps to 1641

In [38]:
# remove duplicate messages: improved version with HTML cleaning and adaptive thresholds
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
query = '''
SELECT m1.id, m1.source_filename, m1.from_address, m1.to_address, m1.timestamp_iso, m1.message_html,
       m2.id, m2.source_filename, m2.from_address, m2.to_address, m2.timestamp_iso, m2.message_html
FROM messages m1
JOIN messages m2 ON m1.from_address = m2.from_address AND m1.to_address = m2.to_address AND m1.id < m2.id
'''
cursor.execute(query)
rows = cursor.fetchall()
ids_to_delete = set()
for row in rows:
    msg1_html = row[5]
    msg2_html = row[11]
    time1_str = row[4]
    time2_str = row[10]
    
    # Parse timestamps properly
    try:
        time1 = datetime.strptime(time1_str, '%Y%m%d%H%M%S')
        time2 = datetime.strptime(time2_str, '%Y%m%d%H%M%S')
    except:
        print(f"Invalid timestamp format for message IDs {row[0]} and {row[6]}: '{time1_str}' or '{time2_str}'")
        continue  # Skip if timestamp format is invalid
    
    # Calculate time difference in minutes
    time_diff_minutes = abs((time2 - time1).total_seconds()) / 60
    
    if time_diff_minutes <= 1450:
        # Clean HTML tags and normalize whitespace before comparison
        cleaned_msg1 = clean_html_for_comparison(msg1_html)
        cleaned_msg2 = clean_html_for_comparison(msg2_html)
        
        # Determine adaptive threshold based on message length
        cleaned_len = len(cleaned_msg1)
        if cleaned_len < 100:
            threshold = 0.90  # More lenient for short messages
        elif cleaned_len < 500:
            threshold = 0.95
        else:
            threshold = 0.98  # Stricter for longer messages
        
        # Check similarity on cleaned text
        similarity = difflib.SequenceMatcher(None, cleaned_msg1, cleaned_msg2).ratio()
        if similarity >= threshold:
            ids_to_delete.add(row[6])  # delete the second message
            print(f"Marking message ID {row[6]} for deletion (duplicate of ID {row[0]}) with similarity {similarity:.2f} (threshold: {threshold})")

# Delete identified duplicates
if ids_to_delete:
    delete_query = f'''
    DELETE FROM messages
    WHERE id IN ({','.join(['?']*len(ids_to_delete))})
    '''
    cursor.execute(delete_query, tuple(ids_to_delete))
    conn.commit()
    print(f"Deleted {len(ids_to_delete)} duplicate messages.")
else:
    print("No duplicates found to delete.")

conn.close()

# print remaining number of messages
conn = sqlite3.connect(DB_PATH)
query = '''
SELECT *    
FROM messages
'''
cursor = conn.cursor()
cursor.execute(query)
remaining_messages = cursor.fetchall()
print(f"Remaining messages: {len(remaining_messages)}")
conn.close()

Marking message ID 143 for deletion (duplicate of ID 58) with similarity 1.00 (threshold: 0.9)
Marking message ID 108 for deletion (duplicate of ID 59) with similarity 1.00 (threshold: 0.9)
Marking message ID 105 for deletion (duplicate of ID 72) with similarity 1.00 (threshold: 0.95)
Marking message ID 141 for deletion (duplicate of ID 88) with similarity 1.00 (threshold: 0.9)
Marking message ID 201 for deletion (duplicate of ID 145) with similarity 0.95 (threshold: 0.9)
Marking message ID 359 for deletion (duplicate of ID 163) with similarity 1.00 (threshold: 0.98)
Marking message ID 3450 for deletion (duplicate of ID 176) with similarity 0.97 (threshold: 0.95)
Marking message ID 4471 for deletion (duplicate of ID 176) with similarity 0.97 (threshold: 0.95)
Marking message ID 4468 for deletion (duplicate of ID 176) with similarity 1.00 (threshold: 0.95)
Marking message ID 216 for deletion (duplicate of ID 179) with similarity 1.00 (threshold: 0.98)
Marking message ID 275 for deletion

Qwen 2.5 VL 72B
Remaining messages: 5557
with html tags removed for comparing and set to 24h: remaining messages 4377

251123: Remaining messages: 4335 (updated prompt)
3997: Resolved a lot of redacted messages

20251125: 
qwen3 vl 30ba3:
Deleted 1192 duplicate messages.
Remaining messages: 4271
Qwen 2.5 VL 72B
Deleted 1733 + 572 duplicate messages.
Remaining messages: 4272

## Analysis of messages after cleanup

In [39]:
# find all unique from addresses 
conn = sqlite3.connect(DB_PATH) 
query = '''
SELECT from_address, COUNT(*) as count
FROM messages
GROUP BY from_address
ORDER BY count DESC
'''
cursor = conn.cursor()
cursor.execute(query)
results_from = cursor.fetchall()
conn.close()
# find all unique to addresses 
conn = sqlite3.connect(DB_PATH) 
query = '''
SELECT to_address, COUNT(*) as count
FROM messages
GROUP BY to_address
ORDER BY count DESC
'''
cursor = conn.cursor()
cursor.execute(query)
results_to = cursor.fetchall()
conn.close()
# add both from and to addresses together and display top 100
from collections import defaultdict
address_counts = defaultdict(int)
for address, count in results_from:
    address_counts[address] += count
for address, count in results_to:
    address_counts[address] += count   
results = sorted(address_counts.items(), key=lambda x: x[1], reverse=True) 
# Display the top results
print("\nTop Email Addresses:")
for address, count in results[:100]:
    print(f"{address}: {count}")



Top Email Addresses:
Jeffrey Epstein: 4033
Michael Wolff: 251
Weingarten, Reid: 241
Kathy Ruemmler: 201
Lawrence Summers: 167
Landon Thomas Jr.: 154
Richard Kahn: 151
Steve Bannon: 119
<REDACTED>: 98
Darren Indyke: 85
Lawrence Krauss: 79
Nicholas Ribis: 71
Martin Weinberg: 63
Robert Kuhn: 59
Lisa New: 58
Larry Visoski: 49
Lesley Groff: 47
Boris Nikolic: 43
Joi Ito: 40
Deepak Chopra: 39
Ken Starr: 38
paul krassner: 33
Peggy Siegal: 33
Jonathan Farkas: 30
Noam Chomsky: 29
Anas Alrasheed: 29
Sultan Bin Sulayem: 28
Robert Trivers: 26
Pritzker, Tom: 26
David Schoen: 23
Zubair Khan: 22
Linda Stone: 21
Masha Drokova: 21
Tonja Haddad Coleman: 20
Jessica Cadwell: 19
Thorbjon Jagland: 19
Ehud Barak: 19
David Stern: 18
Tyler Shears: 18
Faith Kates: 17
David Grosof: 17
Jabor Y.: 17
steven hoffenberg: 16
Alireza Ittihadieh: 16
habebey: 16
Michael Wolff < >: 15
LHS < >: 15
Ens, Amanda: 14
Etienne Binant: 14
Jacquie Johnson: 14
tamem: 14
live:linkspirit: 14
Steven Pfeiffer: 13
Ariane de Rothschild: 

still some addresses i left out for the moment:
    'Jeffrey',
        '< >',
            'live:linkspirit',
    'soon yi previn' =    'Soon-Yi', ?
    'Google Alerts <googlealerts-noreply@google.com>',
        'Kathy Ruemmle' =     'Kathy Ruemmler'?

## save jpg sources

all messages are linked to a jpg, get it, resize it and put it under \data\sources\ with same filename, so it is easily accesble through the gui

In [40]:
# for each message, get the source_filename
# if the same filename but with extension .jpg does not yet exist under .\data\sources\
# search the same filename but with extension .jpg in the DATASET_ROOT/IMAGES (beware it has subfolders)
# resize this image to height 800px, maintain aspect ratio, quality 60
# save with same filename under .\data\sources\
conn = sqlite3.connect(DB_PATH)
query = '''
SELECT document_id
FROM messages
'''
cursor = conn.cursor()
cursor.execute(query)
results = cursor.fetchall()
print(f"Processing {len(results)} document ids for image extraction and resizing")
for row in results:
    document_id = row[0]
    # Remove existing extension (if any) before adding .jpg
    document_id_base = Path(document_id).stem
    source_image_path = Path('data/sources') / (document_id_base + '.jpg')
    if not source_image_path.exists():
        # search in DATASET_ROOT/IMAGES
        images_dir = DATASET_ROOT / 'IMAGES'
        pattern = f"**/{document_id_base}.jpg"
        matching_files = list(images_dir.glob(pattern))
        if matching_files:
            image_path = matching_files[0]
            # resize and save
            img = Image.open(image_path)
            aspect_ratio = img.width / img.height
            new_height = 800
            new_width = int(aspect_ratio * new_height)
            img_resized = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
            os.makedirs(source_image_path.parent, exist_ok=True)
            img_resized.save(source_image_path, format='JPEG', quality=60)
            print(f"Saved resized image to {source_image_path}")

conn.close()

Processing 4272 document ids for image extraction and resizing
Saved resized image to data\sources\HOUSE_OVERSIGHT_019307.jpg
Saved resized image to data\sources\HOUSE_OVERSIGHT_019318.jpg
Saved resized image to data\sources\HOUSE_OVERSIGHT_019313.jpg
Saved resized image to data\sources\HOUSE_OVERSIGHT_019314.jpg
Saved resized image to data\sources\HOUSE_OVERSIGHT_019328.jpg
Saved resized image to data\sources\HOUSE_OVERSIGHT_021405.jpg
Saved resized image to data\sources\HOUSE_OVERSIGHT_020661.jpg
Saved resized image to data\sources\HOUSE_OVERSIGHT_022192.jpg
Saved resized image to data\sources\HOUSE_OVERSIGHT_019339.jpg
Saved resized image to data\sources\HOUSE_OVERSIGHT_022194.jpg
Saved resized image to data\sources\HOUSE_OVERSIGHT_022264.jpg
Saved resized image to data\sources\HOUSE_OVERSIGHT_022231.jpg
Saved resized image to data\sources\HOUSE_OVERSIGHT_022228.jpg
Saved resized image to data\sources\HOUSE_OVERSIGHT_022229.jpg
Saved resized image to data\sources\HOUSE_OVERSIGHT_022

## Optimise DB size. This will:

0. ✅ Take a backup of the dv with extension `.before_optimization`
1. ✅ Create a new optimized `messages` table with only the 6 columns your GUI actually uses
2. ✅ Copy all data to the new table
3. ✅ Drop the old table and rename the new one
4. ✅ Recreate necessary indexes (I added indexes for `from_address` and `to_address` too for better query performance)
5. ✅ Drop the completely unused `email_documents` table
6. ✅ Run VACUUM to compress and defragment
7. ✅ Show the before/after size comparison

This should give a __30-50% size reduction__ depending on how much data was in those unused columns.


In [41]:
## Database Optimization and Compression
print("Starting database optimization...")

# first take a copy of the database for safety
import shutil
shutil.copyfile(DB_PATH, DB_PATH + '.before_optimization')
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()

# Get initial database size
initial_size = os.path.getsize(DB_PATH) / 1024  # KB

# Drop unused columns from messages table
print("\nDropping unused columns from messages table...")
cursor.execute('''
CREATE TABLE messages_optimized (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    from_address TEXT,
    to_address TEXT,
    timestamp_iso TEXT,
    subject TEXT,
    message_html TEXT,
    document_id TEXT
)
''')

cursor.execute('''
INSERT INTO messages_optimized (id, from_address, to_address, timestamp_iso, subject, message_html, document_id)
SELECT id, from_address, to_address, timestamp_iso, subject, message_html, document_id
FROM messages
''')

cursor.execute('DROP TABLE messages')
cursor.execute('ALTER TABLE messages_optimized RENAME TO messages')

# Recreate indexes on optimized table
print("Recreating indexes...")
cursor.execute('CREATE INDEX idx_messages_timestamp ON messages(timestamp_iso)')
cursor.execute('CREATE INDEX idx_messages_from ON messages(from_address)')
cursor.execute('CREATE INDEX idx_messages_to ON messages(to_address)')

# Drop unused email_documents table
print("Dropping unused email_documents table...")
cursor.execute('DROP TABLE IF EXISTS email_documents')

conn.commit()

# Run VACUUM to compress and defragment
print("Running VACUUM to compress database...")
cursor.execute('VACUUM')

conn.close()

# Get final database size
final_size = os.path.getsize(DB_PATH) / 1024  # KB
reduction = ((initial_size - final_size) / initial_size) * 100

print(f"\n{'='*50}")
print(f"Database Optimization Complete!")
print(f"{'='*50}")
print(f"Initial size:  {initial_size:,.1f} KB")
print(f"Final size:    {final_size:,.1f} KB")
print(f"Reduction:     {reduction:.1f}% ({initial_size - final_size:,.1f} KB saved)")
print(f"{'='*50}")


Starting database optimization...

Dropping unused columns from messages table...
Recreating indexes...
Dropping unused email_documents table...
Running VACUUM to compress database...

Database Optimization Complete!
Initial size:  4,844.0 KB
Final size:    2,760.0 KB
Reduction:     43.0% (2,084.0 KB saved)


the resulting file can now be copied to the \data folder of the GUI