# Enron Email Network Analysis

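Builds a directed, weighted communication network from the Enron email corpus (one edge per sender → recipient pair, weighted by message count), prunes it by edge weight, connectivity, and node degree, and exports a Gephi-compatible `.gexf` file annotated with per-node degree and weight metrics.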


In [1]:
import os
import re
from pathlib import Path
from email import policy
from email.parser import BytesParser
from email.utils import getaddresses, parsedate_to_datetime

import pandas as pd
import networkx as nx
from tqdm import tqdm

# ----- CONFIG -----
# Input source selection; the run cell raises ValueError for any other INPUT_MODE.
INPUT_MODE = 'dir'  # 'dir' for raw maildir structure, 'csv' for pre-flattened table
INPUT_PATH = '/path/to/your/enron/maildir'  # update with the local path
# Column names used only when INPUT_MODE == 'csv'.
CSV_FROM_COL = 'From'
CSV_TO_COLS = ['To', 'Cc', 'Bcc']  # every listed column that exists is merged into one recipient list
CSV_DATE_COL = 'Date'  # set to None when the column is unavailable

# Pruning thresholds
MIN_EDGE_WEIGHT = 2  # drop edges with weight < this (only applied when the value is > 1)
KEEP_GIANT_COMPONENT = True  # keep only edges inside the largest connected component
TOP_N_NODES = None  # fallback: keep top N nodes by degree when auto pruning is disabled

# Degree-driven pruning
# Only one rule is applied, checked in this order: minimum degree, then quantile, then top N.
AUTO_PRUNE_BY_DEGREE = True
PRUNE_DEGREE_MIN = None        # e.g., 5 to require at least 5 total connections
PRUNE_DEGREE_QUANTILE = 0.9    # e.g., 0.9 keeps top 10% by degree; set to None to skip
PRUNE_DEGREE_TOP_N = None      # e.g., 1000 keeps the top N nodes by degree if others unset

# Optional time slicing (use 'M' for monthly, 'Q' for quarterly, or None)
TIME_SLICE_FREQ = None
# Optional date bounds; edges without a usable date are kept by the date filter.
DATE_MIN = None  # e.g., '2000-01-01'
DATE_MAX = None  # e.g., '2002-12-31'

# Export location; the directory is created as soon as this cell runs.
OUTPUT_DIR = './gephi_exports'
BASE_GEXF_NAME = 'enron_network.gexf'
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Captures a `local@domain` token, optionally wrapped in angle brackets.
EMAIL_RE = re.compile(r'<?([^<>@,\s]+@[^<>,\s]+)>?')


In [2]:
def normalize_addresses(raw_list):
    """Return the cleaned addresses found in a list of raw header values.

    Each value is parsed with ``email.utils.getaddresses``; the address part
    is stripped and lower-cased. When ``EMAIL_RE`` finds a ``local@domain``
    token, that token is used; otherwise the cleaned string is kept as-is.
    Empty results are dropped. Duplicates and input order are preserved.
    """
    cleaned = []
    for _display_name, raw_addr in getaddresses(raw_list):
        candidate = (raw_addr or '').strip().lower()
        found = EMAIL_RE.search(candidate)
        normalized = found.group(1) if found else candidate
        if normalized:
            cleaned.append(normalized)
    return cleaned


def parse_date_safe(val):
    """Parse a date header into a timezone-naive UTC ``pd.Timestamp``.

    RFC 2822 dates are tried first via ``parsedate_to_datetime``; anything
    that parser rejects is handed to ``pd.to_datetime`` with coercion.

    The result is always normalised to UTC and stripped of its timezone.
    Messages carry different UTC offsets (e.g. -0700 and -0800), and a column
    of Timestamps with mixed offsets degrades to ``object`` dtype, which can
    neither be compared with naive bounds nor used with the ``.dt`` accessor.

    Args:
        val: Raw date value (typically a header string); falsy values are
            treated as missing.

    Returns:
        A tz-naive ``pd.Timestamp`` expressed in UTC, or ``pd.NaT`` when the
        value is missing or cannot be parsed.
    """
    if not val:
        return pd.NaT
    try:
        stamp = pd.to_datetime(parsedate_to_datetime(val), utc=True)
    except Exception:
        # Deliberately best-effort: any failure of the RFC 2822 path (bad
        # format, out-of-range year, non-string input) falls back to pandas.
        stamp = pd.to_datetime(val, errors='coerce', utc=True)
    if pd.isna(stamp):
        return pd.NaT
    return stamp.tz_localize(None)


def iter_emails_from_dir(root_path):
    """Yield one record per readable email file found under ``root_path``.

    Every regular file below the root is treated as a message. Only the
    headers are parsed (``headersonly=True``) because nothing else is read
    from the message. Files that cannot be opened, parsed, or whose headers
    fail to decode are skipped; the number of skipped files is printed once
    the walk finishes.

    Args:
        root_path: Directory containing the maildir tree.

    Yields:
        dict with keys ``'from'`` (first sender address or ``None``),
        ``'to'``, ``'cc'``, ``'bcc'`` (lists of addresses) and ``'date'``
        (the value returned by ``parse_date_safe``).
    """
    root = Path(root_path)
    # Materialised up front so tqdm knows the total.
    files = [p for p in root.rglob('*') if p.is_file()]
    parser = BytesParser(policy=policy.default)
    skipped = 0
    for path in tqdm(files, desc='Reading emails from dir'):
        try:
            with open(path, 'rb') as handle:
                message = parser.parse(handle, headersonly=True)
            # Header values are decoded lazily under policy.default, so the
            # lookups stay inside the try: a malformed header skips this file
            # instead of aborting the whole run.
            sender = normalize_addresses([message.get('From', '')])
            record = {
                'from': sender[0] if sender else None,
                'to': normalize_addresses([message.get('To', '')]),
                'cc': normalize_addresses([message.get('Cc', '')]),
                'bcc': normalize_addresses([message.get('Bcc', '')]),
                'date': parse_date_safe(message.get('Date', None)),
            }
        except Exception:
            # Best-effort ingestion: one bad file must not stop the corpus walk.
            skipped += 1
            continue
        yield record
    if skipped:
        print(f'Skipped {skipped} unreadable or malformed files')


def iter_emails_from_csv(csv_path, from_col, to_cols, date_col=None):
    """Yield one record per row of a pre-flattened email table.

    All columns are read as strings. Recipient columns that exist in the file
    are split on ``;`` and ``,`` and merged into the ``'to'`` list; ``'cc'``
    and ``'bcc'`` are always empty.

    Dates are parsed with coercion, converted to UTC and made timezone-naive,
    so values carrying UTC offsets stay comparable with naive date bounds and
    form a proper datetime column downstream.

    Args:
        csv_path: Path of the CSV file.
        from_col: Name of the sender column.
        to_cols: Names of the recipient columns; missing ones are ignored.
        date_col: Name of the date column, or ``None``/missing for no dates.

    Yields:
        dict with keys ``'from'``, ``'to'``, ``'cc'``, ``'bcc'``, ``'date'``.
    """
    df = pd.read_csv(csv_path, dtype=str, keep_default_na=False)
    if date_col and date_col in df.columns:
        dates = pd.to_datetime(df[date_col], errors='coerce', utc=True).dt.tz_localize(None)
    else:
        dates = pd.Series(pd.NaT, index=df.index)
    recipient_cols = [col for col in to_cols if col in df.columns]

    # Dates are zipped positionally with the rows rather than looked up by
    # index label.
    rows_with_dates = zip(df.iterrows(), dates)
    for (_, row), date_val in tqdm(rows_with_dates, total=len(df), desc='Reading emails from csv'):
        sender = normalize_addresses([row.get(from_col, '')])
        recipients_split = []
        for col in recipient_cols:
            raw = row.get(col, '')
            if raw:
                recipients_split.extend(re.split(r'[;,]', raw))
        yield {
            'from': sender[0] if sender else None,
            'to': normalize_addresses(recipients_split),
            'cc': [],
            'bcc': [],
            'date': date_val,
        }


def build_edges(email_iter):
    """Expand email records into one row per (sender, recipient) pair.

    Recipients from ``'to'``, ``'cc'`` and ``'bcc'`` are combined and
    de-duplicated per message in first-seen order, so the output row order is
    reproducible between runs (set iteration order over strings is not).
    Messages without a sender, empty recipients and self-addressed pairs are
    skipped.

    Args:
        email_iter: Iterable of dicts with keys ``'from'``, ``'to'``,
            ``'cc'``, ``'bcc'`` and ``'date'``; the recipient entries may be
            lists or ``None``.

    Returns:
        DataFrame with columns ``source``, ``target``, ``date``, ``weight``
        where ``weight`` is 1 for every row; an empty frame with those
        columns when no edge was produced.
    """
    records = []
    for message in email_iter:
        sender = message['from']
        if not sender:
            continue
        combined = (message['to'] or []) + (message['cc'] or []) + (message['bcc'] or [])
        # dict.fromkeys removes duplicates while keeping first-seen order.
        for recipient in dict.fromkeys(combined):
            if recipient and recipient != sender:
                records.append((sender, recipient, message['date']))
    if not records:
        return pd.DataFrame(columns=['source', 'target', 'date', 'weight'])
    edge_df = pd.DataFrame(records, columns=['source', 'target', 'date'])
    edge_df['weight'] = 1
    return edge_df


In [3]:
def _naive_utc_bound(value):
    """Convert a date bound to a timezone-naive UTC ``pd.Timestamp``."""
    bound = pd.to_datetime(value)
    if bound.tzinfo is not None:
        bound = bound.tz_convert('UTC').tz_localize(None)
    return bound


def filter_by_date(df_edges, date_min=None, date_max=None):
    """Keep edges whose date lies within ``[date_min, date_max]``.

    Rows without a usable date are always kept. Comparisons run on a
    UTC-normalised, timezone-naive copy of the ``date`` column, so tz-aware
    values (including mixed offsets stored as ``object`` dtype) are handled
    instead of raising ``TypeError``. The ``date`` values in the returned
    frame are left untouched.

    Note that a date-only ``date_max`` such as ``'2002-12-31'`` is a midnight
    bound, so later timestamps on that day are excluded.

    Args:
        df_edges: Edge table; returned unchanged when it has no ``date``
            column.
        date_min: Inclusive lower bound, or a falsy value for no bound.
        date_max: Inclusive upper bound, or a falsy value for no bound.

    Returns:
        A filtered copy of ``df_edges``.
    """
    if 'date' not in df_edges.columns:
        return df_edges
    filtered = df_edges.copy()
    if not date_min and not date_max:
        return filtered

    dates = pd.to_datetime(filtered['date'], utc=True, errors='coerce').dt.tz_localize(None)
    if date_min:
        keep = dates.isna() | (dates >= _naive_utc_bound(date_min))
        filtered, dates = filtered[keep], dates[keep]
    if date_max:
        keep = dates.isna() | (dates <= _naive_utc_bound(date_max))
        filtered = filtered[keep]
    return filtered


def aggregate_edges(df_edges):
    """Collapse duplicate (source, target) rows, summing their weights.

    Returns a frame with the columns ``source``, ``target`` and ``weight``;
    any other input column (such as ``date``) is dropped.
    """
    per_pair = df_edges.groupby(['source', 'target'], as_index=False)
    total_weight = pd.NamedAgg(column='weight', aggfunc='sum')
    return per_pair.agg(weight=total_weight)


def compute_node_metrics(df_edges_agg):
    """Build a per-node metrics table from an aggregated edge list.

    For every node that appears as a source or target the table holds the
    summed outgoing/incoming edge weight, the number of outgoing/incoming
    edges, their totals (``degree``, ``weighted_degree``) and a ``Label``
    equal to the node id. Missing directions count as 0.

    Returns:
        DataFrame with columns ``Id``, ``out_weight``, ``in_weight``,
        ``out_degree``, ``in_degree``, ``degree``, ``weighted_degree``,
        ``Label``; empty (columns only) when the edge list is empty.
    """
    if df_edges_agg.empty:
        return pd.DataFrame(
            columns=['Id', 'out_weight', 'in_weight', 'out_degree', 'in_degree', 'degree', 'weighted_degree', 'Label']
        )

    by_source = df_edges_agg.groupby('source')
    by_target = df_edges_agg.groupby('target')
    per_node = {
        'out_weight': by_source['weight'].sum(),
        'in_weight': by_target['weight'].sum(),
        'out_degree': by_source.size(),
        'in_degree': by_target.size(),
    }

    # Sources first, then targets that never send; the degree series share
    # these same group keys, so two indexes are enough to list every node.
    node_ids = pd.Index(
        per_node['out_weight'].index.tolist() + per_node['in_weight'].index.tolist()
    ).unique()

    nodes_df = pd.DataFrame({'Id': node_ids})
    for column, series in per_node.items():
        nodes_df[column] = series.reindex(node_ids).fillna(0).astype(int).to_numpy()
    nodes_df['degree'] = nodes_df['out_degree'] + nodes_df['in_degree']
    nodes_df['weighted_degree'] = nodes_df['out_weight'] + nodes_df['in_weight']
    nodes_df['Label'] = nodes_df['Id']
    return nodes_df


def keep_giant_component(df_edges_agg):
    """Keep only the edges inside the largest connected component.

    Edge direction is ignored when determining connectivity. When several
    components share the largest size, the one discovered first (in edge
    order) is kept.

    The adjacency map is built by zipping the two endpoint columns instead of
    ``iterrows``, which builds a Series per row and is needlessly slow on
    large edge lists; only the running largest component is retained.

    Args:
        df_edges_agg: Edge table with ``source`` and ``target`` columns.

    Returns:
        The rows whose endpoints both belong to the largest component; the
        input itself when it contains no edges.
    """
    if df_edges_agg.empty:
        return df_edges_agg

    adjacency = {}
    for source, target in zip(df_edges_agg['source'], df_edges_agg['target']):
        adjacency.setdefault(source, set()).add(target)
        adjacency.setdefault(target, set()).add(source)

    visited = set()
    giant = set()
    for start in adjacency:
        if start in visited:
            continue
        # Iterative depth-first walk from an unvisited node.
        component = set()
        stack = [start]
        while stack:
            current = stack.pop()
            if current in visited:
                continue
            visited.add(current)
            component.add(current)
            stack.extend(adjacency[current] - visited)
        # Strict comparison keeps the first component on ties.
        if len(component) > len(giant):
            giant = component

    mask = df_edges_agg['source'].isin(giant) & df_edges_agg['target'].isin(giant)
    return df_edges_agg[mask]


def keep_top_n_nodes(df_edges_agg, n):
    """Keep the edges whose endpoints are both among the ``n`` highest-degree nodes.

    Degree is taken from ``compute_node_metrics`` (in-degree plus out-degree).
    """
    ranked = compute_node_metrics(df_edges_agg).sort_values('degree', ascending=False)
    keepers = set(ranked['Id'].head(n))
    source_kept = df_edges_agg['source'].isin(keepers)
    target_kept = df_edges_agg['target'].isin(keepers)
    return df_edges_agg[source_kept & target_kept]


def prune_by_degree(df_edges_agg, min_degree=None, quantile=None, top_n=None):
    """Prune the edge list to nodes selected by a single degree rule.

    Exactly one rule is applied, chosen in this order of precedence:
    ``min_degree`` (keep degree >= value), ``quantile`` (keep degree >= that
    quantile of the degree distribution), ``top_n`` (keep the N highest-degree
    nodes). An edge survives only when both endpoints are selected.

    Returns:
        Tuple ``(edges, metrics, threshold, kept)``:
        * ``edges`` - the pruned edge list (the input when nothing applies);
        * ``metrics`` - node metrics recomputed on the pruned edges (metrics
          of the input when nothing applies);
        * ``threshold`` - the degree cut-off as float, ``None`` when no rule
          was applied;
        * ``kept`` - number of nodes selected by the rule; 0 for an empty
          graph, the full node count when no rule is configured.
    """
    full_metrics = compute_node_metrics(df_edges_agg)
    if full_metrics.empty:
        return df_edges_agg, full_metrics, None, 0
    if min_degree is None and quantile is None and top_n is None:
        return df_edges_agg, full_metrics, None, len(full_metrics)

    degrees = full_metrics['degree']
    if min_degree is not None:
        cutoff = float(min_degree)
        survivors = full_metrics[degrees >= min_degree]['Id']
    elif quantile is not None:
        cutoff = float(degrees.quantile(quantile))
        survivors = full_metrics[degrees >= cutoff]['Id']
    else:
        ranked = full_metrics.sort_values('degree', ascending=False)
        survivors = ranked['Id'].head(top_n)
        # The metrics table is non-empty here, so a last-kept rank exists.
        last_rank = min(top_n - 1, len(ranked) - 1)
        cutoff = float(ranked['degree'].iloc[last_rank])

    survivor_set = set(survivors)
    source_kept = df_edges_agg['source'].isin(survivor_set)
    target_kept = df_edges_agg['target'].isin(survivor_set)
    remaining_edges = df_edges_agg[source_kept & target_kept]
    remaining_metrics = compute_node_metrics(remaining_edges)
    return remaining_edges, remaining_metrics, cutoff, len(survivor_set)


def export_gexf(df_edges_agg, out_path, node_metrics=None):
    """Write the edge list and node metrics to a GEXF file.

    Rows are read with ``itertuples`` rather than ``iterrows``: it avoids
    building a Series per row (slow on large graphs) and does not upcast
    values to a common row dtype.

    Args:
        df_edges_agg: Edge table with ``source``, ``target`` and ``weight``.
        out_path: Destination path of the ``.gexf`` file.
        node_metrics: Table as produced by ``compute_node_metrics``; computed
            from ``df_edges_agg`` when ``None``.

    Returns:
        Tuple ``(out_path, node_metrics)`` with the metrics actually written.
    """
    if node_metrics is None:
        node_metrics = compute_node_metrics(df_edges_agg)

    graph = nx.DiGraph()
    for edge in df_edges_agg.itertuples(index=False):
        graph.add_edge(edge.source, edge.target, weight=int(edge.weight))
    for node in node_metrics.itertuples(index=False):
        # add_node creates the node if needed and updates its attributes
        # otherwise.
        graph.add_node(
            node.Id,
            label=node.Label,
            out_degree=int(node.out_degree),
            in_degree=int(node.in_degree),
            degree=int(node.degree),
            out_weight=int(node.out_weight),
            in_weight=int(node.in_weight),
            weighted_degree=int(node.weighted_degree),
        )
    nx.write_gexf(graph, out_path, encoding='utf-8')
    return out_path, node_metrics


In [4]:
# ----- RUN -----
# 1. Pick the email source.
if INPUT_MODE == 'dir':
    email_iterable = iter_emails_from_dir(INPUT_PATH)
elif INPUT_MODE == 'csv':
    email_iterable = iter_emails_from_csv(INPUT_PATH, CSV_FROM_COL, CSV_TO_COLS, CSV_DATE_COL)
else:
    raise ValueError("INPUT_MODE must be 'dir' or 'csv'")

# 2. One row per (sender, recipient, message), then date filter and aggregation.
edges_df = build_edges(email_iterable)
print(f'Raw edges: {len(edges_df)}')

edges_df = filter_by_date(edges_df, DATE_MIN, DATE_MAX)
edges_agg = aggregate_edges(edges_df)
print(f'Unique directed edges: {len(edges_agg)}')

# 3. Pruning: edge weight, then giant component, then node degree.
if MIN_EDGE_WEIGHT and MIN_EDGE_WEIGHT > 1:
    edges_agg = edges_agg[edges_agg['weight'] >= MIN_EDGE_WEIGHT]
    print(f'After weight >= {MIN_EDGE_WEIGHT}: {len(edges_agg)} edges')

if KEEP_GIANT_COMPONENT:
    edges_agg = keep_giant_component(edges_agg)
    print(f'After giant component: {len(edges_agg)} edges')

node_metrics = None
if AUTO_PRUNE_BY_DEGREE:
    edges_agg, node_metrics, degree_threshold, kept_nodes = prune_by_degree(
        edges_agg,
        min_degree=PRUNE_DEGREE_MIN,
        quantile=PRUNE_DEGREE_QUANTILE,
        top_n=PRUNE_DEGREE_TOP_N,
    )
    if degree_threshold is not None:
        print(f'After degree-based pruning (threshold ~= {degree_threshold:.2f}, nodes kept = {kept_nodes}): {len(edges_agg)} edges')
    else:
        print(f'Degree pruning skipped (configuration yielded no threshold). Nodes kept = {kept_nodes}.')
elif TOP_N_NODES:
    edges_agg = keep_top_n_nodes(edges_agg, TOP_N_NODES)
    print(f'After top {TOP_N_NODES} nodes: {len(edges_agg)} edges')

if node_metrics is None:
    node_metrics = compute_node_metrics(edges_agg)

# 4. Export the full-period network.
main_gexf_path = Path(OUTPUT_DIR) / BASE_GEXF_NAME
export_gexf(edges_agg, main_gexf_path, node_metrics=node_metrics)
print(f'GEXF exported to: {main_gexf_path}')

# 5. Optional: one GEXF per time period, each pruned with the same settings.
if TIME_SLICE_FREQ:
    if 'date' not in edges_df.columns or edges_df['date'].isna().all():
        print('No usable dates available for time slicing.')
    else:
        edges_with_dates = edges_df.dropna(subset=['date']).copy()
        # Timestamps parsed from message headers can carry different UTC
        # offsets, which leaves the column as object dtype and makes the
        # `.dt` accessor fail. Normalise to timezone-naive UTC first.
        edges_with_dates['date'] = pd.to_datetime(edges_with_dates['date'], utc=True).dt.tz_localize(None)
        edges_with_dates['period'] = edges_with_dates['date'].dt.to_period(TIME_SLICE_FREQ).dt.to_timestamp()
        for period, chunk in edges_with_dates.groupby('period'):
            agg_chunk = aggregate_edges(chunk)
            if MIN_EDGE_WEIGHT and MIN_EDGE_WEIGHT > 1:
                agg_chunk = agg_chunk[agg_chunk['weight'] >= MIN_EDGE_WEIGHT]
            if KEEP_GIANT_COMPONENT:
                agg_chunk = keep_giant_component(agg_chunk)
            chunk_metrics = None
            if AUTO_PRUNE_BY_DEGREE:
                # Threshold and node count are discarded so the values from
                # the full-period run are not overwritten.
                agg_chunk, chunk_metrics, _, _ = prune_by_degree(
                    agg_chunk,
                    min_degree=PRUNE_DEGREE_MIN,
                    quantile=PRUNE_DEGREE_QUANTILE,
                    top_n=PRUNE_DEGREE_TOP_N,
                )
            elif TOP_N_NODES:
                agg_chunk = keep_top_n_nodes(agg_chunk, TOP_N_NODES)
            gexf_path = Path(OUTPUT_DIR) / f'enron_network_{period.strftime("%Y-%m-%d")}.gexf'
            # export_gexf computes the metrics itself when chunk_metrics is None.
            export_gexf(agg_chunk, gexf_path, node_metrics=chunk_metrics)
        print(f'Time-sliced GEXF files written to {OUTPUT_DIR}')


Reading emails from dir: 100%|██████████| 517404/517404 [08:17<00:00, 1040.05it/s]


Raw edges: 3610073
Unique directed edges: 361032
After weight >= 2: 269781 edges
After giant component: 269133 edges
After degree-based pruning (threshold ~= 13.00, nodes kept = 6275): 166160 edges
GEXF exported to: gephi_exports/enron_network.gexf


**Next Steps:** Update `INPUT_MODE`, `INPUT_PATH`, and pruning values to match your local dataset. After running the notebook, open the generated `.gexf` files in Gephi to explore degree-filtered communication networks.