# Enron Email Dataset -> Neo4j Direct Import

This notebook loads the raw Enron email dataset **directly into Neo4j** as a property graph.

## Approach
1. Parse raw email files
2. Connect to Neo4j
3. Create graph schema (constraints/indexes)
4. Load emails as nodes and relationships
5. Validate with Cypher queries

## Graph Model
```
(User {nameRaw, primaryEmail, nameNormalized, associatedEmails})
(Mailbox {address})
(Email {date, folder, message_id, subject, thread, user, x_folder})

(User)-[:SENT|RECEIVED|CC_ON|BCC_ON]->(Email)
(Mailbox)-[:SENT|RECEIVED|CC_ON|BCC_ON]->(Email)
(User)-[:USED]->(Mailbox)
```

Note: The original files were re-redacted in 2006, so we can assume most of the highly incriminating material is gone. It is still an interesting dataset, and a good few emails hint at what Enron's executives were up to.

## Prerequisites

Before running this notebook, ensure you have:

1. **Downloaded the Enron dataset** from: https://www.cs.cmu.edu/~enron/enron_mail_20150507.tar.gz
2. **Extracted it** to the parent directory: `../maildir/`
3. **Created a Neo4j database** whose name matches `DATABASE` in Section 2 (`enrongit` in this run)
4. **Installed the GDS plugin** in Neo4j
5. **Configured your `.env` file** with Neo4j credentials

**Expected Runtime**: 60-90 minutes for full dataset (~517,000 emails)

**Quick Test Option**: Set `LIMIT = 10000` in the Collect Files cell (Section 5) to process only 10,000 emails (~5 minutes)

## 1. Setup

In [25]:
%pip install -r ../requirements.txt

Note: you may need to restart the kernel to use updated packages.


Start a new local database and install both the APOC and GDS plugins.

Set up your `.env` file by copying `.env.example` and filling in your Neo4j credentials.

In [26]:
import os
import re
from email import policy
from email.parser import Parser
from pathlib import Path
from typing import Dict, List, Optional
from collections import defaultdict
from tqdm.auto import tqdm
from neo4j import GraphDatabase
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

print("Imports loaded")
print("Environment variables loaded from .env")

Imports loaded
Environment variables loaded from .env


## 2. Neo4j Connection

In [27]:
# Load Neo4j connection from environment variables
NEO4J_URI = os.getenv("NEO4J_URI", "bolt://localhost:7687")
NEO4J_USER = os.getenv("NEO4J_USERNAME", "neo4j")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")
DATABASE = "enrongit"

if not NEO4J_PASSWORD:
    raise ValueError("NEO4J_PASSWORD not found in .env file! Please copy .env.example to .env and configure it.")

print(f"Connecting to: {NEO4J_URI}")
print(f"Database: {DATABASE}")
print(f"User: {NEO4J_USER}")

try:
    driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD), database=DATABASE)
    driver.verify_connectivity()
    
    # Get version info
    with driver.session() as session:
        result = session.run("CALL dbms.components() YIELD name, versions RETURN name, versions[0] as version")
        for record in result:
            print(f"  {record['name']}: {record['version']}")
    
    print("Connected to Neo4j successfully")
    
except Exception as e:
    print(f"Connection failed: {e}")
    print("\nTroubleshooting:")
    print("  1. Is Neo4j running? Check Neo4j Desktop or run: systemctl status neo4j")
    print(f"  2. Does the '{DATABASE}' database exist?")
    print("  3. Are your credentials in .env correct?")
    print("  4. Is the URI correct? (neo4j:// for routed connections, bolt:// for a direct connection)")
    raise

Connecting to: neo4j://127.0.0.1:7687
Database: enrongit
User: neo4j
  Neo4j Kernel: 2025.10.1
  Cypher: 5
Connected to Neo4j successfully


## 3. Create Schema

In [28]:
with driver.session() as session:
    # Constraints - composite keys for User
    session.run("CREATE CONSTRAINT user_email_key IF NOT EXISTS FOR (u:User) REQUIRE (u.nameRaw, u.primaryEmail) IS UNIQUE")
    session.run("CREATE CONSTRAINT user_imceanotes_key IF NOT EXISTS FOR (u:User) REQUIRE (u.nameRaw, u.primaryImceanotes) IS UNIQUE")
    session.run("CREATE CONSTRAINT email_id IF NOT EXISTS FOR (e:Email) REQUIRE e.message_id IS UNIQUE")
    session.run("CREATE CONSTRAINT mailbox_address IF NOT EXISTS FOR (m:Mailbox) REQUIRE m.address IS UNIQUE")
    
    # Indexes
    session.run("CREATE INDEX email_date IF NOT EXISTS FOR (e:Email) ON (e.date)")
    session.run("CREATE INDEX email_subject IF NOT EXISTS FOR (e:Email) ON (e.subject)")
    session.run("CREATE INDEX user_name_raw IF NOT EXISTS FOR (u:User) ON (u.nameRaw)")
    session.run("CREATE INDEX user_name_normalized IF NOT EXISTS FOR (u:User) ON (u.nameNormalized)")
    
print("Schema created")

Schema created
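To confirm that the constraints actually exist, one option is to compare their names against what the database reports. The sketch below is an assumption rather than part of the original notebook: `missing_constraints` is a hypothetical helper, and it relies on `SHOW CONSTRAINTS YIELD name` being available, which is the case on recent Neo4j versions.

```python
from typing import Iterable, List

# Constraint names taken from the schema cell above
EXPECTED_CONSTRAINTS = [
    "user_email_key",
    "user_imceanotes_key",
    "email_id",
    "mailbox_address",
]


def missing_constraints(session, expected: Iterable[str] = EXPECTED_CONSTRAINTS) -> List[str]:
    """Return the expected constraint names that the database does not report."""
    result = session.run("SHOW CONSTRAINTS YIELD name RETURN name")
    existing = {record["name"] for record in result}
    return [name for name in expected if name not in existing]


# Usage with the notebook's driver (needs a running database):
# with driver.session() as session:
#     print(missing_constraints(session) or "All constraints present")
```

An empty list means every constraint from the schema cell is in place.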


## 4. Email Parser

This is just one way to parse these emails.

Bear in mind that email parsers already exist, both general-purpose ones and ones written specifically for this dataset. We're not using them here, because your own data may look nothing like Enron's.

Before you run the parser, it is worth opening a few of the raw email files so you can see why certain decisions were made.

## Why is Email Parsing Complex?

The Enron dataset contains messy, real-world email data with multiple inconsistent formats:

### Challenge 1: Duplicate Headers
Each email has **two sets of headers**:
- **X-headers** (X-From, X-To, X-cc): Contain display names like "Kenneth Lay"
- **Regular headers** (From, To, Cc): Contain email addresses like "kenneth.lay@enron.com"

These don't always match perfectly. Sometimes "Ken Lay" in X-From corresponds to "klay@enron.com" in From.
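To see both header sets side by side, here is a minimal sketch using the standard library's `email` parser. The headers are a trimmed copy of the sample message printed later in this notebook; the body line is a placeholder and `header_pairs` is a hypothetical helper name.

```python
from email import policy
from email.parser import Parser

# Trimmed headers from one Enron message; the body is a placeholder
RAW = (
    "From: christi.nicolay@enron.com\n"
    "To: phillip.allen@enron.com\n"
    "X-From: Christi L Nicolay\n"
    "X-To: Phillip K Allen\n"
    "\n"
    "(body omitted)\n"
)


def header_pairs(raw: str) -> dict:
    """Return {role: (X-header value, regular header value)} for one message."""
    msg = Parser(policy=policy.default).parsestr(raw)
    roles = [("from", "X-From", "From"), ("to", "X-To", "To"), ("cc", "X-cc", "Cc")]
    # Missing headers fall back to an empty string
    return {
        role: (str(msg.get(x_name, "")), str(msg.get(reg_name, "")))
        for role, x_name, reg_name in roles
    }


pairs = header_pairs(RAW)
print(pairs["from"])  # ('Christi L Nicolay', 'christi.nicolay@enron.com')
```

The display name and the address arrive in separate headers, so pairing them is the parser's job.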

### Challenge 2: IMCEANOTES Identifiers
Some participants appear as cryptic IMCEANOTES strings instead of proper emails:
```
IMCEANOTES-Michael+20Maggi+20+3CMichael+2EMaggi+40ENRON+2Ecom+3E@ENRON.com
```
These are Microsoft Exchange artifacts that need special handling.
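For intuition, the `+XX` sequences in the sample above look like hex-encoded ASCII characters (`+20` is a space, `+40` is `@`). The sketch below decodes them under that assumption. It is illustrative only: `decode_imceanotes` is a hypothetical helper, and the parser in this notebook stores the raw identifier rather than decoding it.

```python
import re


def decode_imceanotes(value: str) -> str:
    """Decode the +XX hex escapes in an IMCEANOTES identifier (assumed encoding)."""
    prefix = "IMCEANOTES-"
    if not value.upper().startswith(prefix):
        return value
    # Drop the prefix and the trailing @domain wrapper
    encoded = value[len(prefix):].rsplit("@", 1)[0]
    return re.sub(r"\+([0-9A-Fa-f]{2})", lambda m: chr(int(m.group(1), 16)), encoded)


sample = "IMCEANOTES-Michael+20Maggi+20+3CMichael+2EMaggi+40ENRON+2Ecom+3E@ENRON.com"
print(decode_imceanotes(sample))  # Michael Maggi <Michael.Maggi@ENRON.com>
```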

### Challenge 3: Inconsistent Name Formats
Names appear in multiple formats:
- "Kenneth Lay" vs "Lay, Kenneth" vs "Kenneth L. Lay"
- "Ken Lay (E-mail)" vs "Ken Lay"
- "/O=ENRON/OU=NA/CN=RECIPIENTS/CN=KLAY"
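A minimal sketch of how such variants might be tidied, assuming we only want to flip "Last, First", drop trailing parenthetical tags, and reject directory paths. `tidy_display_name` is a hypothetical helper and is looser than the `clean_name` method used later in this notebook (which, for instance, does not strip "(E-mail)").

```python
import re
from typing import Optional


def tidy_display_name(name: str) -> Optional[str]:
    """Normalize a display name, or return None if it is not a name at all."""
    name = name.strip().strip("\"'")
    # Drop a trailing parenthetical tag such as "(E-mail)"
    name = re.sub(r"\s*\([^)]*\)\s*$", "", name).strip()
    # Directory paths like /O=ENRON/... carry no display name
    if not name or name.startswith("/"):
        return None
    # Flip "Last, First" into "First Last"
    if "," in name:
        last, first = (part.strip() for part in name.split(",", 1))
        if first and last:
            name = f"{first} {last}"
    # Collapse repeated whitespace
    return " ".join(name.split())


for raw in ["Lay, Kenneth", "Ken Lay (E-mail)", "/O=ENRON/OU=NA/CN=RECIPIENTS/CN=KLAY"]:
    print(repr(raw), "->", repr(tidy_display_name(raw)))
```

Note that "Ken Lay" and "Kenneth Lay" still come out as different strings; reconciling those is left to entity resolution.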

### Challenge 4: Mismatched Pairs
Sometimes the name in X-From doesn't correspond to the email in From:
- X-From: "John Smith"
- From: "random.person@enron.com"

We need to validate these pairs before linking them.

## Our Parsing Strategy

The `ParticipantExtractor` class handles these challenges:

1. **Extract all parts separately**: Name, email, and IMCEANOTES from each header
2. **Combine strategically**: Prefer X-headers for names, regular headers for emails
3. **Validate with Jaro-Winkler**: Use string similarity to check if name matches email
   - "kenneth.lay" vs "Kenneth Lay" -> 95%+ similarity ✓ Link them
   - "john.smith" vs "Kenneth Lay" -> Low similarity ✗ Keep separate
4. **Create flexible nodes**:
   - `User` nodes from names (may have multiple per person initially)
   - `Mailbox` nodes from email addresses (unique)
   - `User` <-> `Mailbox` via `USED` relationship

This approach preserves **traceability** - we can see what data came from where and fix issues later in the entity resolution phase (Notebook 2).
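The validation step can be sketched without any third-party dependency. The code below is a plain-Python Jaro-Winkler written for illustration; the notebook itself calls `jellyfish.jaro_winkler_similarity`, whose results may differ slightly in edge cases. The function names and the way name variants are built are assumptions modelled on the strategy above.

```python
def jaro(s1: str, s2: str) -> float:
    """Jaro similarity in [0, 1]."""
    if s1 == s2:
        return 1.0
    len1, len2 = len(s1), len(s2)
    if not len1 or not len2:
        return 0.0
    # Characters match only if they are within this distance of each other
    window = max(max(len1, len2) // 2 - 1, 0)
    used1, used2 = [False] * len1, [False] * len2
    matches = 0
    for i in range(len1):
        for j in range(max(0, i - window), min(i + window + 1, len2)):
            if not used2[j] and s1[i] == s2[j]:
                used1[i] = used2[j] = True
                matches += 1
                break
    if matches == 0:
        return 0.0
    # Count matched characters that appear in a different order
    k = out_of_order = 0
    for i in range(len1):
        if used1[i]:
            while not used2[k]:
                k += 1
            if s1[i] != s2[k]:
                out_of_order += 1
            k += 1
    t = out_of_order // 2
    return (matches / len1 + matches / len2 + (matches - t) / matches) / 3


def jaro_winkler(s1: str, s2: str, prefix_scale: float = 0.1) -> float:
    """Jaro similarity boosted for a shared prefix of up to 4 characters."""
    score = jaro(s1, s2)
    prefix = 0
    for a, b in zip(s1[:4], s2[:4]):
        if a != b:
            break
        prefix += 1
    return score + prefix * prefix_scale * (1.0 - score)


def name_matches_email(name: str, email: str, threshold: float = 0.95) -> bool:
    """Check whether any first/last-name variant resembles the email's local part."""
    local = email.split("@")[0].lower()
    parts = name.lower().split()
    variants = [".".join(parts)]
    if len(parts) >= 2:
        variants += [
            f"{parts[0]}.{parts[-1]}",     # first.last
            f"{parts[-1]}.{parts[0]}",     # last.first
            f"{parts[0][0]}.{parts[-1]}",  # f.last
        ]
    return max(jaro_winkler(local, v) for v in variants) >= threshold


print(name_matches_email("Kenneth Lay", "kenneth.lay@enron.com"))  # True
print(name_matches_email("Kenneth Lay", "john.smith@enron.com"))   # False
```

A strict threshold such as 0.95 favours precision: doubtful pairs stay separate and are left for the entity resolution phase.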

## The Graph Model

```
(User {nameRaw, nameNormalized, primaryEmail, associatedEmails[]})
(Mailbox {address})
(Email {message_id, date, subject, thread, user, folder, x_folder})

Relationships:
(User)-[:USED]->(Mailbox)
(Mailbox)-[:SENT|RECEIVED|CC_ON|BCC_ON]->(Email)
(User)-[:SENT|RECEIVED|CC_ON|BCC_ON]->(Email)
```

**Note**: We'll have many duplicate `User` nodes initially (e.g., "Ken Lay", "Kenneth Lay", "Lay, Kenneth"). That's intentional! Notebook 2 will resolve these into unified entities.

We could also use something like GLiNER or spaCy to extract entities. However, the threads are so variable that I think it's better to start with something deterministic. Later, we'll use tools like spaCy and GLiNER to extract individual messages from threads and the entities mentioned within them, followed by another resolution pass.


In [31]:
# 4.1 Parser
import re
import quopri
import jellyfish
from email import policy
from email.parser import Parser
from email.utils import parsedate_to_datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, field


@dataclass
class ParsedParts:
    """Container for extracted participant parts."""
    name_raw: Optional[str] = None
    name_normalized: Optional[str] = None
    email: Optional[str] = None
    imceanotes: Optional[str] = None
    
    def to_participant(self) -> Optional[Dict]:
        """Convert to the participant dict format expected by the importer."""
        if not self.name_raw and not self.email:
            return None
        
        person = None
        if self.name_raw:
            person = {'raw': self.name_raw, 'normalized': self.name_normalized}
            if self.email:
                person['associated_email'] = self.email
            if self.imceanotes:
                person['imceanotes_id'] = self.imceanotes
        
        return {
            'person': person,
            'mailbox': {'address': self.email} if self.email else None,
            'linked': bool(self.name_raw and self.email)
        }


class ParticipantExtractor:
    """Extract User and Mailbox entities from email header fields."""
    
    EMAIL_RE = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+(?:\.[a-zA-Z]{2,})?')
    NAME_EMAIL_RE = re.compile(r'^([^<]+?)\s*<([^>]+)>$')
    UNDISCLOSED_RE = re.compile(r'undisclosed[- ]?recipients?', re.I)
    ENRON_PATH_RE = re.compile(r'/O=ENRON/.*?/CN=([A-Za-z0-9]+)', re.I)
    IMCEANOTES_RE = re.compile(r'(IMCEANOTES-[^@]+@[^>\s,]+)', re.I)
    
    def name_to_email_similarity(self, name: str, email: str) -> float:
        """Calculate best Jaro-Winkler similarity between name variants and email local part."""
        if not name or not email:
            return 0.0
        
        local = email.split('@')[0].lower()
        parts = name.lower().split()
        
        variants = [name.lower().replace(' ', '.')]
        if len(parts) >= 2:
            variants.extend([
                f"{parts[0]}.{parts[-1]}",
                f"{parts[-1]}.{parts[0]}",
                f"{parts[0][0]}.{parts[-1]}"
            ])
        
        return max(jellyfish.jaro_winkler_similarity(local, v) for v in variants)
    
    def clean_email(self, email: str) -> Optional[str]:
        """Clean and validate an email address."""
        if not email:
            return None
        
        email = email.strip().lower()
        
        if email.startswith(('imceanotes-', '/')) or '@' not in email:
            return None
        
        # Normalize @enron to @enron.com
        if email.endswith('@enron'):
            email += '.com'
        
        if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+$', email):
            return None
        
        return email
    
    def clean_name(self, name: str) -> Optional[tuple[str, str]]:
        """Clean and validate a name. Returns (raw, normalized) or None."""
        if not name:
            return None
        
        # Clean up artifacts
        name = re.sub(r'@ENRON\s*$|/[A-Z]=[^/]+|<[^>]+>', '', name, flags=re.I)
        name = ' '.join(name.strip().strip('"\'').split())
        
        if len(name) < 3 or not re.search(r'[a-zA-Z]', name) or '@' in name or name.startswith('/'):
            return None
        
        # Handle "Last, First" format
        if ',' in name:
            parts = [p.strip() for p in name.split(',', 1)]
            if len(parts) == 2 and len(parts[0]) >= 2 and len(parts[1]) >= 2:
                name = f"{parts[1]} {parts[0]}"
        
        # Require first + last name, each at least 2 chars
        name_parts = name.split()
        if len(name_parts) < 2 or len(name_parts[0]) < 2 or len(name_parts[-1]) < 2:
            return None
        
        return (name, name.title())
    
    def extract(self, text: str) -> ParsedParts:
        """Extract all parts from a single participant string."""
        if not text:
            return ParsedParts()
        
        text = re.sub(r'<\?\?S[^>]*>', '', text).strip()
        if not text:
            return ParsedParts()
        
        parts = ParsedParts()
        
        # Extract IMCEANOTES
        if match := self.IMCEANOTES_RE.search(text):
            parts.imceanotes = match.group(1).lower()
        
        # Extract first valid email (not IMCEANOTES)
        for match in self.EMAIL_RE.finditer(text):
            if cleaned := self.clean_email(match.group()):
                parts.email = cleaned
                break
        
        # Try Enron path if no email found
        if not parts.email and (match := self.ENRON_PATH_RE.search(text)):
            parts.email = f"{match.group(1).lower()}@enron.com"
        
        # Extract name
        if match := self.NAME_EMAIL_RE.match(text):
            name_result = self.clean_name(match.group(1))
        else:
            cleaned_text = re.sub(r'<[^>]*>|@ENRON\s*$', '', text, flags=re.I).strip()
            name_result = self.clean_name(cleaned_text) if '@' not in cleaned_text else None
        
        if name_result:
            parts.name_raw, parts.name_normalized = name_result
        
        return parts
    
    def extract_list(self, header: str) -> List[ParsedParts]:
        """Extract all parts from a comma-separated header."""
        if not header:
            return []
        
        if self.UNDISCLOSED_RE.search(header):
            return [ParsedParts(email='undisclosed-recipients')]
        
        # Split on commas, respecting brackets
        results = []
        current, depth = "", 0
        
        for char in header:
            if char in '<(':
                depth += 1
            elif char in '>)':
                depth -= 1
            elif char == ',' and depth == 0:
                if current.strip():
                    results.append(self.extract(current))
                current = ""
                continue
            current += char
        
        if current.strip():
            results.append(self.extract(current))
        
        return results


class EmailParser:
    def __init__(self):
        self.parser = Parser(policy=policy.default)
        self.extractor = ParticipantExtractor()
    
    def parse(self, file_path) -> Optional[Dict]:
        """Parse an email file."""
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                raw_content = f.read()
            
            msg = self.parser.parsestr(raw_content)
            parts = file_path.parts
            
            # Extract folder info
            try:
                idx = parts.index('maildir')
                user = parts[idx + 1] if len(parts) > idx + 1 else None
                folder = '/'.join(parts[idx + 2:-1]) if len(parts) > idx + 2 else None
            except ValueError:
                user = parts[-3] if len(parts) >= 3 else None
                folder = parts[-2] if len(parts) >= 2 else None
            
            # Parse date
            date = None
            try:
                if dt := parsedate_to_datetime(msg.get('Date', '')):
                    date = dt.replace(tzinfo=None).strftime('%Y-%m-%dT%H:%M:%S')
            except Exception:  # Unparseable date: leave it as None
                pass
            
            # Decode thread content
            thread = raw_content[:50000]
            if '=20' in thread or '=\n' in thread:
                try:
                    thread = quopri.decodestring(thread.encode()).decode('utf-8', errors='ignore')
                except Exception:  # Keep the undecoded text if decoding fails
                    pass
            
            return {
                'message_id': msg.get('Message-ID', '').strip('<>'),
                'date': date,
                'subject': msg.get('Subject', ''),
                'thread': thread,
                'from': self._combine_single(msg.get('X-From', ''), msg.get('From', '')),
                'to': self._combine_list(msg.get('X-To', ''), msg.get('To', '')),
                'cc': self._combine_list(msg.get('X-cc', ''), msg.get('Cc', '')),
                'bcc': self._combine_list(msg.get('X-bcc', ''), msg.get('Bcc', '')),
                'user': user,
                'folder': folder,
                'x_folder': msg.get('X-Folder', ''),
            }
        except Exception:  # Any parsing failure is reported as None (Ctrl-C still interrupts)
            return None
    
    def _merge_parts(self, x: ParsedParts, reg: ParsedParts, validate: bool = True) -> List[Dict]:
        """Merge X-header and regular header parts, optionally validating with Jaro-Winkler."""
        name_raw = x.name_raw or reg.name_raw
        name_norm = x.name_normalized or reg.name_normalized
        email = reg.email or x.email  # Prefer regular header for email
        imceanotes = x.imceanotes or reg.imceanotes
        
        # Validate match if both name and email present
        if validate and name_raw and email:
            if self.extractor.name_to_email_similarity(name_norm, email) < 0.95:
                # Poor match - return separate entries
                results = []
                if p := x.to_participant():
                    results.append(p)
                if p := reg.to_participant():
                    results.append(p)
                return results
        
        # Good match or no validation needed
        merged = ParsedParts(name_raw, name_norm, email, imceanotes)
        if p := merged.to_participant():
            return [p]
        return []
    
    def _combine_single(self, x_header: str, reg_header: str) -> Optional[Dict]:
        """Combine X-header and regular header for a single participant."""
        x_parts = self.extractor.extract(x_header)
        reg_parts = self.extractor.extract(reg_header)
        results = self._merge_parts(x_parts, reg_parts, validate=False)
        return results[0] if results else {'person': None, 'mailbox': None, 'linked': False}
    
    def _combine_list(self, x_header: str, reg_header: str) -> List[Dict]:
        """Combine X-header and regular header lists with positional matching."""
        x_list = self.extractor.extract_list(x_header)
        reg_list = self.extractor.extract_list(reg_header)
        
        if not x_list and not reg_list:
            return []
        
        # Single list case
        if not x_list:
            return [p for parts in reg_list if (p := parts.to_participant())]
        if not reg_list:
            return [p for parts in x_list if (p := parts.to_participant())]
        
        # Both lists - positional matching with validation
        results = []
        min_len = min(len(x_list), len(reg_list))
        
        for i in range(min_len):
            results.extend(self._merge_parts(x_list[i], reg_list[i], validate=True))
        
        # Handle remainders
        for parts in x_list[min_len:] + reg_list[min_len:]:
            if p := parts.to_participant():
                results.append(p)
        
        return results


parser = EmailParser()
print("Parser ready")

Parser ready
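The parser above drops the UTC offset with `dt.replace(tzinfo=None)`, so the stored `date` is the sender's local wall-clock time. If you would rather compare timestamps across time zones, one option is to convert to UTC before formatting. This is an assumption, not what the notebook does, and `to_utc_iso` is a hypothetical helper name.

```python
from datetime import timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def to_utc_iso(date_header: str) -> Optional[str]:
    """Parse an RFC 2822 date header and return an ISO string in UTC."""
    try:
        dt = parsedate_to_datetime(date_header)
    except (TypeError, ValueError):
        # Raised for empty or malformed headers (the type depends on the Python version)
        return None
    if dt is None:
        return None
    if dt.tzinfo is not None:
        # Shift to UTC, then drop the tzinfo to keep the notebook's string format
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt.strftime("%Y-%m-%dT%H:%M:%S")


print(to_utc_iso("Tue, 12 Dec 2000 04:41:00 -0800 (PST)"))  # 2000-12-12T12:41:00
```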


## 5. Collect Files

In [32]:
# 5.1 Collect Files
MAILDIR = Path("../maildir")

# CONFIGURATION: Set LIMIT for testing or None for full dataset
LIMIT = 10000  # Options: None (full ~517k emails), 10000 (quick test ~5 min), 50000 (medium test ~20 min)

# Validate dataset exists
if not MAILDIR.exists():
    raise FileNotFoundError(
        f"Dataset not found at {MAILDIR.absolute()}\n\n"
        f"Please download from: https://www.cs.cmu.edu/~enron/enron_mail_20150507.tar.gz\n"
        f"Extract to: {MAILDIR.parent.absolute()}/\n"
        f"Expected structure: {MAILDIR.absolute()}/[username]/[folder]/[emails]"
    )

print(f"Scanning dataset at: {MAILDIR.absolute()}")

files = []
for user in tqdm(sorted(MAILDIR.iterdir()), desc="Scanning"):
    if user.is_dir():
        files.extend([f for f in user.rglob("*") if f.is_file() and not f.name.startswith('.')])

if LIMIT:
    files = files[:LIMIT]
    print(f"Found {len(files):,} files (limited to {LIMIT:,} for testing)")
else:
    print(f"Found {len(files):,} files (full dataset)")
    print(f"Processing all files will take 60-90 minutes")

Scanning dataset at: /Users/henryadamcollie/Documents/GitHub/enron_resolution_neo4j/demo_notebooks/../maildir


Scanning: 100%|██████████| 151/151 [00:15<00:00,  9.81it/s]

Found 10,000 files (limited to 10,000 for testing)
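Note that `files[:LIMIT]` keeps the first `LIMIT` files in sorted directory order, so a quick test only covers the first few users' folders. If you want a test subset that spans more mailboxes, a seeded random sample is one option. The sketch below is an assumption, not part of the original notebook, and `sample_files` is a hypothetical helper.

```python
import random
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def sample_files(files: Sequence[T], limit: Optional[int], seed: int = 42) -> List[T]:
    """Return up to `limit` files chosen at random, reproducibly for a given seed."""
    if not limit or limit >= len(files):
        return list(files)
    # A private Random instance avoids touching the global random state
    return random.Random(seed).sample(list(files), limit)


# Usage in place of files[:LIMIT]:
# files = sample_files(files, LIMIT)
```

The trade-off is that a random sample breaks up conversations within a folder, so the first-N slice may still be preferable when you want intact threads.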





In [33]:
# Test parser on a single file
test_file = files[0]
sample = parser.parse(test_file)
print(f"Parsed: {test_file.name}")
print(f"From: {sample['from']}")
print(f"To: {sample['to'][:2]}...")  # First 2 recipients
print(f"Subject: {sample['subject'][:50]}...")

Parsed: 36.
From: {'person': {'raw': 'Christi L Nicolay', 'normalized': 'Christi L Nicolay', 'associated_email': 'christi.nicolay@enron.com'}, 'mailbox': {'address': 'christi.nicolay@enron.com'}, 'linked': True}
To: [{'person': {'raw': 'Phillip K Allen', 'normalized': 'Phillip K Allen', 'associated_email': 'phillip.allen@enron.com'}, 'mailbox': {'address': 'phillip.allen@enron.com'}, 'linked': True}]...
Subject: Re: Talking points about California Gas market...


In [34]:
# ═══════════════════════════════════════════════════════════════════
# TEST: Parse a single email to see the structure
# ═══════════════════════════════════════════════════════════════════

test_file = files[0]

# Show the raw email first
with open(test_file, 'r', encoding='utf-8', errors='ignore') as f:
    lines = f.readlines()[:80]
    for i, line in enumerate(lines, 1):
        print(f"{i:3}: {line.rstrip()}")
    if len(lines) == 80:
        print("     ... (truncated)")

# Now show what the parser extracts
print("\n" + "=" * 70)
print("PARSED OUTPUT")
print("=" * 70)

sample = parser.parse(test_file)

print(f"Message ID: {sample['message_id']}")
print(f"Date:       {sample['date']}")
print(f"Folder:     {sample['user']}/{sample['folder']}")
print(f"Subject:    {sample['subject'][:60]}{'...' if len(sample['subject']) > 60 else ''}")

print(f"FROM:")
frm = sample['from']
if frm['person']:
    print(f"   Name:    {frm['person']['raw']}")
    print(f"   Email:   {frm['person'].get('associated_email', 'N/A')}")
if frm['mailbox']:
    print(f"   Mailbox: {frm['mailbox']['address']}")

print(f"TO: ({len(sample['to'])} recipients)")
for i, recip in enumerate(sample['to'][:3]):  # Show first 3
    name = recip['person']['raw'] if recip['person'] else None
    addr = recip['mailbox']['address'] if recip['mailbox'] else None
    print(f"   {i+1}. {name or '(no name)'} <{addr or '(no email)'}>")
if len(sample['to']) > 3:
    print(f"   ... and {len(sample['to']) - 3} more")

if sample['cc']:
    print(f"CC: ({len(sample['cc'])} recipients)")
    for i, recip in enumerate(sample['cc'][:2]):
        name = recip['person']['raw'] if recip['person'] else None
        addr = recip['mailbox']['address'] if recip['mailbox'] else None
        print(f"   {i+1}. {name or '(no name)'} <{addr or '(no email)'}>")
    if len(sample['cc']) > 2:
        print(f"   ... and {len(sample['cc']) - 2} more")

  1: Message-ID: <12357410.1075855679611.JavaMail.evans@thyme>
  2: Date: Tue, 12 Dec 2000 04:41:00 -0800 (PST)
  3: From: christi.nicolay@enron.com
  4: To: phillip.allen@enron.com
  5: Subject: Re: Talking points about California Gas market
  6: Mime-Version: 1.0
  7: Content-Type: text/plain; charset=us-ascii
  8: Content-Transfer-Encoding: 7bit
  9: X-From: Christi L Nicolay
 10: X-To: Phillip K Allen
 11: X-cc:
 12: X-bcc:
 13: X-Folder: \Phillip_Allen_Dec2000\Notes Folders\Notes inbox
 14: X-Origin: Allen-P
 15: X-FileName: pallen.nsf
 16: 
 17: Phillip--To the extent that we can give Chair Hoecker our spin on the reasons
 18: for the hikes, we would like to.  The Commission is getting calls from
 19: legislators, DOE, etc. about the prices and is going to have to provide some
 20: response.  Better if it coincides with Enron's view and is not anti-market.
 21: We still haven't decided what we will provide.  You definitely will be
 22: included in that discussion once we get the 

## 6. Import to Neo4j

This cell processes all emails and loads them into Neo4j.

**Runtime Warning**: 
- Full dataset (~517k emails): 60-90 minutes
- 50k emails: ~20 minutes  
- 10k emails: ~5 minutes

The import creates:
- Email nodes with message content
- User nodes from display names
- Mailbox nodes from email addresses
- Relationships: SENT, RECEIVED, CC_ON, BCC_ON, USED
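The import in this notebook runs one transaction per email, which is simple and isolates failures. If runtime becomes a concern, a common alternative is to send emails in batches with `UNWIND`. The sketch below covers only the `Email` nodes and is an assumption, not what the notebook does; `chunked`, `email_rows`, and `import_email_batch` are hypothetical names.

```python
from typing import Dict, Iterable, Iterator, List

# One query creates or updates every Email node in the batch
BATCH_EMAIL_CYPHER = """
UNWIND $rows AS row
MERGE (e:Email {message_id: row.message_id})
SET e.date = row.date,
    e.subject = row.subject,
    e.thread = row.thread,
    e.user = row.user,
    e.folder = row.folder,
    e.x_folder = row.x_folder
"""

EMAIL_KEYS = ("message_id", "date", "subject", "thread", "user", "folder", "x_folder")


def chunked(items: Iterable, size: int) -> Iterator[List]:
    """Yield successive lists of at most `size` items."""
    batch: List = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def email_rows(emails: Iterable[Dict]) -> List[Dict]:
    """Keep only the Email properties, skipping records without a message_id."""
    return [
        {key: email.get(key) for key in EMAIL_KEYS}
        for email in emails
        if email and email.get("message_id")
    ]


def import_email_batch(tx, emails: List[Dict]) -> None:
    """Transaction function: write one batch of Email nodes."""
    tx.run(BATCH_EMAIL_CYPHER, rows=email_rows(emails))


# Usage (needs a running database and parsed emails):
# with driver.session() as session:
#     for batch in chunked(parsed_emails, 500):
#         session.execute_write(import_email_batch, batch)
```

With batching, a single bad record rolls back its whole batch, so per-email transactions remain the safer default while you are still debugging the parser.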

In [35]:
# 6.1 Import Participants
def import_participant(tx, participant: Dict, message_id: str, rel_type: str):
    """Import a participant (User and/or Mailbox) and create relationships."""
    if not participant:
        return
    
    user = participant.get('person')
    mailbox = participant.get('mailbox')
    linked = participant.get('linked', False)
    parsing_error = participant.get('parsing_error', False)
    
    has_user = user is not None
    has_mailbox = mailbox and mailbox.get('address')
    incomplete = not (has_user and has_mailbox)
    
    # Create Mailbox and connect to Email
    # Note on transactions:
    # session.execute_write(func, args) calls func(tx, args) where tx is a transaction object.
    # All tx.run() calls within the function either succeed together or roll back together.
    # This ensures we don't end up with partial data if something fails mid-import.
    if has_mailbox:
        tx.run(f"""
            MERGE (m:Mailbox {{address: $address}})
            WITH m
            MATCH (e:Email {{message_id: $message_id}})
            MERGE (m)-[r:{rel_type}]->(e)
            SET r.incompletePair = $incomplete
        """, address=mailbox['address'], message_id=message_id, incomplete=incomplete)
    
    # Create User and connect to Email
    if has_user:
        associated_email = user.get('associated_email')
        imceanotes_id = user.get('imceanotes_id')
        user_parsing_error = user.get('parsing_error', False) or parsing_error
        
        if associated_email:
            # MERGE on nameRaw + associated_email pair
            tx.run(f"""
                MERGE (u:User {{nameRaw: $nameRaw, primaryEmail: $email}})
                SET u.nameNormalized = $nameNormalized
                SET u.associatedEmails = CASE 
                    WHEN u.associatedEmails IS NULL THEN [$email]
                    WHEN NOT $email IN u.associatedEmails THEN u.associatedEmails + $email
                    ELSE u.associatedEmails
                END
                SET u.imceanotes = CASE 
                    WHEN $imceanotes_id IS NULL THEN u.imceanotes
                    WHEN u.imceanotes IS NULL THEN [$imceanotes_id]
                    WHEN NOT $imceanotes_id IN u.imceanotes THEN u.imceanotes + $imceanotes_id
                    ELSE u.imceanotes
                END
                SET u.parsingError = CASE
                    WHEN $parsing_error THEN true
                    ELSE u.parsingError
                END
                WITH u
                MATCH (e:Email {{message_id: $message_id}})
                MERGE (u)-[r:{rel_type}]->(e)
                SET r.incompletePair = $incomplete,
                    r.parsingError = $parsing_error
            """, nameRaw=user['raw'], nameNormalized=user['normalized'], 
                email=associated_email, imceanotes_id=imceanotes_id,
                message_id=message_id, incomplete=incomplete, parsing_error=user_parsing_error)
        
        elif imceanotes_id:
            # MERGE on nameRaw + imceanotes_id pair (no email available)
            tx.run(f"""
                MERGE (u:User {{nameRaw: $nameRaw, primaryImceanotes: $imceanotes_id}})
                SET u.nameNormalized = $nameNormalized
                SET u.imceanotes = CASE 
                    WHEN u.imceanotes IS NULL THEN [$imceanotes_id]
                    WHEN NOT $imceanotes_id IN u.imceanotes THEN u.imceanotes + $imceanotes_id
                    ELSE u.imceanotes
                END
                SET u.parsingError = CASE
                    WHEN $parsing_error THEN true
                    ELSE u.parsingError
                END
                WITH u
                MATCH (e:Email {{message_id: $message_id}})
                MERGE (u)-[r:{rel_type}]->(e)
                SET r.incompletePair = $incomplete,
                    r.parsingError = $parsing_error
            """, nameRaw=user['raw'], nameNormalized=user['normalized'],
                imceanotes_id=imceanotes_id, message_id=message_id, 
                incomplete=incomplete, parsing_error=user_parsing_error)
        
        else:
            # Fall back to nameRaw only (least reliable)
            tx.run(f"""
                MERGE (u:User {{nameRaw: $nameRaw}})
                SET u.nameNormalized = $nameNormalized
                SET u.parsingError = CASE
                    WHEN $parsing_error THEN true
                    ELSE u.parsingError
                END
                WITH u
                MATCH (e:Email {{message_id: $message_id}})
                MERGE (u)-[r:{rel_type}]->(e)
                SET r.incompletePair = $incomplete,
                    r.parsingError = $parsing_error
            """, nameRaw=user['raw'], nameNormalized=user['normalized'],
                message_id=message_id, incomplete=incomplete, parsing_error=user_parsing_error)
    
    # Link User to Mailbox if we have both
    if has_user and has_mailbox:
        associated_email = user.get('associated_email')
        imceanotes_id = user.get('imceanotes_id')
        
        if associated_email:
            tx.run("""
                MATCH (u:User {nameRaw: $nameRaw, primaryEmail: $email})
                MATCH (m:Mailbox {address: $address})
                MERGE (u)-[:USED]->(m)
            """, nameRaw=user['raw'], email=associated_email, address=mailbox['address'])
        elif imceanotes_id:
            tx.run("""
                MATCH (u:User {nameRaw: $nameRaw, primaryImceanotes: $imceanotes_id})
                MATCH (m:Mailbox {address: $address})
                MERGE (u)-[:USED]->(m)
            """, nameRaw=user['raw'], imceanotes_id=imceanotes_id, address=mailbox['address'])
        else:
            tx.run("""
                MATCH (u:User {nameRaw: $nameRaw})
                WHERE u.primaryEmail IS NULL AND u.primaryImceanotes IS NULL
                MATCH (m:Mailbox {address: $address})
                MERGE (u)-[:USED]->(m)
            """, nameRaw=user['raw'], address=mailbox['address'])


def import_email(tx, email: Dict):
    """Import a single email with all participants."""
    if not email or not email.get('message_id'):
        return
    
    # Create Email node
    tx.run("""
        MERGE (e:Email {message_id: $message_id})
        SET e.date = $date,
            e.subject = $subject,
            e.thread = $thread,
            e.user = $user,
            e.folder = $folder,
            e.x_folder = $x_folder
    """, 
        message_id=email['message_id'],
        date=email['date'],
        subject=email['subject'],
        thread=email.get('thread', ''),
        user=email['user'],
        folder=email['folder'],
        x_folder=email.get('x_folder', '')
    )
    
    # Import sender
    import_participant(tx, email.get('from'), email['message_id'], 'SENT')
    
    # Import recipients
    for participant in email.get('to', []):
        import_participant(tx, participant, email['message_id'], 'RECEIVED')
    
    for participant in email.get('cc', []):
        import_participant(tx, participant, email['message_id'], 'CC_ON')
    
    for participant in email.get('bcc', []):
        import_participant(tx, participant, email['message_id'], 'BCC_ON')
    
    # If there are no To recipients, infer one from the folder owner
    has_to_recipients = any(p.get('mailbox') or p.get('person') for p in email.get('to', []))

    if not has_to_recipients and email.get('user'):
        tx.run("""
            MERGE (m:Mailbox {address: $address})
            WITH m
            MATCH (e:Email {message_id: $message_id})
            MERGE (m)-[r:RECEIVED]->(e)
            SET r.inferred = true
        """, address=f"{email['user']}@enron.com", message_id=email['message_id'])


# Import emails
imported = 0
errors = 0

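# Reuse one session for the whole run; each email is still its own transaction
with driver.session() as session:
    for file_path in tqdm(files, desc="Importing"):
        email = parser.parse(file_path)
        if not email:
            errors += 1  # File could not be parsed
            continue
        try:
            session.execute_write(import_email, email)
            imported += 1
        except Exception as e:
            errors += 1
            if errors <= 5:
                # Surface the first few failures instead of hiding them
                tqdm.write(f"Failed to import {file_path}: {e}")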

print(f"\nImported {imported:,} emails ({errors:,} errors)")

Importing: 100%|██████████| 10000/10000 [01:46<00:00, 93.91it/s]


Imported 10,000 emails (0 errors)
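`import_participant` interpolates `rel_type` into the query text with an f-string, since Cypher has traditionally not accepted a relationship type as a query parameter. The values are hard-coded here, so this is safe, but if the function were ever reused with external input, an allow-list check would be a cheap safeguard. The sketch below is an assumption with a hypothetical helper name, not part of the original notebook.

```python
# Relationship types that this notebook creates between participants and emails
ALLOWED_REL_TYPES = frozenset({"SENT", "RECEIVED", "CC_ON", "BCC_ON"})


def checked_rel_type(rel_type: str) -> str:
    """Return rel_type unchanged if it is allowed; otherwise raise ValueError."""
    if rel_type not in ALLOWED_REL_TYPES:
        raise ValueError(f"Unexpected relationship type: {rel_type!r}")
    return rel_type


# Usage inside import_participant, before building the query:
# rel_type = checked_rel_type(rel_type)
print(checked_rel_type("CC_ON"))  # CC_ON
```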





## 7. Verify Import

In [23]:
# 7.1 Verify Import
def query(cypher):
    with driver.session() as session:
        return list(session.run(cypher))

print("="*60)
print("IMPORT VERIFICATION")
print("="*60)

print("\nNode Counts:")
email_count = query('MATCH (e:Email) RETURN count(e) as n')[0]['n']
user_count = query('MATCH (u:User) RETURN count(u) as n')[0]['n']
mailbox_count = query('MATCH (m:Mailbox) RETURN count(m) as n')[0]['n']

print(f"  Emails: {email_count:,}")
print(f"  Users: {user_count:,}")
print(f"  Mailboxes: {mailbox_count:,}")

print("\nRelationship Counts:")
print(f"  SENT: {query('MATCH ()-[r:SENT]->() RETURN count(r) as n')[0]['n']:,}")
print(f"  RECEIVED: {query('MATCH ()-[r:RECEIVED]->() RETURN count(r) as n')[0]['n']:,}")
print(f"  CC_ON: {query('MATCH ()-[r:CC_ON]->() RETURN count(r) as n')[0]['n']:,}")

print("\nTop Senders:")
for record in query("MATCH (m:Mailbox)-[:SENT]->(e) RETURN m.address as email, count(e) as sent ORDER BY sent DESC LIMIT 5"):
    print(f"  {record['email']}: {record['sent']:,} emails")

print("\n" + "="*60)
print("Expected ranges for FULL dataset:")
print("  Emails: ~517,000")
print("  Users: ~84,000") 
print("  Mailboxes: ~99,000")
print("  Total Relationships: ~7.5 million")
print("="*60)

IMPORT VERIFICATION

Node Counts:
  Emails: 10,000
  Users: 2,553
  Mailboxes: 4,935

Relationship Counts:
  SENT: 19,131
  RECEIVED: 49,148
  CC_ON: 6,086

Top Senders:
  john.arnold@enron.com: 3,491 emails
  phillip.allen@enron.com: 2,125 emails
  k..allen@enron.com: 297 emails
  robert.badeer@enron.com: 193 emails
  susan.bailey@enron.com: 143 emails

Expected ranges for FULL dataset:
  Emails: ~517,000
  Users: ~84,000
  Mailboxes: ~99,000
  Total Relationships: ~7.5 million


## 8. Close Connection

In [24]:
# 8.1 Close Connection
driver.close()
print("Connection closed")

Connection closed
