# Email ETL Demo Notebook

This notebook demonstrates the end-to-end ETL pipeline for parsing `.eml` email files.

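Pipeline flow:
1. **Extract** → Parse `.eml` files into messages and attachments
2. **Transform** → Link attachments to parent messages, then enrich both tables
3. **Data Quality** → Run data quality checks on the messages table
4. **Load** → Save results to CSV & SQLite, tracking each load with batch control
5. **Summary** → Report record counts
6. **Query** → Validate the SQLite tables and their relationships
7. **Batch Control** → Review the recorded ETL metadata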

We'll use generated sample `.eml` files for testing.

In [1]:
# Setup
from etl.core.context import ETLContext
from etl.transform.processor import process_files_parallel, merge_messages_with_attachments
from etl.transform.enrichments import enrich_messages, enrich_attachments
from etl.transform.data_quality import run_data_quality
from etl.load.storage import Storage
from etl.load.batch_control import BatchControl
import os

# Paths for this demo run (the project's own paths are defined in config/settings.py)
input_dir = "examples/sample_emails"
output_dir = "data/output"

# Initialize ETL context
ctx = ETLContext.from_args(input_dir=input_dir, output_dir=output_dir)
print("ETL context initialized:")
print(f"  Input:  {ctx.input_dir}")
print(f"  Output: {ctx.output_dir}")
print(f"  SQLite: {ctx.db_path}")

# Generate sample emails if missing
sample_dir = input_dir  # reuse the input directory defined above
if not os.path.exists(sample_dir) or not any(f.endswith('.eml') for f in os.listdir(sample_dir)):
    from examples.generate_sample_eml import generate_eml
    generate_eml(sample_dir, count=5)

print(f"Using sample emails from: {sample_dir}")

ETL context initialized:
  Input:  examples/sample_emails
  Output: data/output
  SQLite: data/output/etl_demo.db
Using sample emails from: examples/sample_emails


## Step 1: Extract
Parse `.eml` files into messages and attachments.

In [2]:
messages_df, attachments_df, file_count = process_files_parallel(sample_dir, max_workers=2)
print(f"Processed {file_count} files")

display(messages_df.head())
display(attachments_df.head())

Processed 5 files


Unnamed: 0,email_id,message_id,timestamp,speaker_name,speaker_contact,message,with_attachment
0,sample_1,6fb353ba-5345-4c42-a197-e018f82535e2,2025-10-04T08:09:40.279545Z,Alice Example,alice@example.com,"Hello team, please find attached the report.",
1,sample_3,cbab3fe6-f4ec-47a4-bbe8-c60a2a89d7eb,2025-09-28T08:09:40.282160Z,Carol Test,carol@example.com,Here is the updated project plan.,
2,sample_2,84dc08ec-7d12-4d23-ae05-942269326e2d,2025-10-01T08:09:40.281832Z,Bob Demo,bob@example.com,Reminder: standup meeting tomorrow at 9am.,
3,sample_5,63726476-b1fc-4740-8518-93ea2b043f5f,2025-09-22T08:09:40.282580Z,Bob Demo,bob@example.com,Attached is the invoice for last month.,
4,sample_4,5b88a145-7172-48a5-a17a-b9ccce9483ea,2025-09-25T08:09:40.282404Z,Alice Example,alice@example.com,Let's schedule a call next week.,


Unnamed: 0,email_id,attachment_name,content_id,content_type
0,sample_1,attachment_0.txt,1a111dc9-eb38-48e1-aeae-bda8fb7a9e98@example.com,text/plain
1,sample_3,attachment_2.txt,c7b51c87-8042-48a8-a062-86e53e43d14e@example.com,text/plain
2,sample_5,attachment_4.txt,018ee6b3-2f4f-45aa-9044-c8ba55b20328@example.com,text/plain
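
To make the extract step concrete, here is a minimal sketch of parsing one `.eml` message with Python's standard `email` package. It is not the code behind `process_files_parallel`; the function name `parse_eml_text` and the inline sample message are assumptions made for illustration, and only a subset of the columns shown above is produced.

```python
from email import message_from_string, policy
from email.utils import parseaddr

# A tiny hand-written multipart message standing in for a generated sample file.
RAW_EML = """\
From: Alice Example <alice@example.com>
To: team@example.com
Subject: Report
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="BOUNDARY"

--BOUNDARY
Content-Type: text/plain; charset="utf-8"

Hello team, please find attached the report.
--BOUNDARY
Content-Type: text/plain; charset="utf-8"
Content-Disposition: attachment; filename="attachment_0.txt"
Content-ID: <att-0@example.com>

report body
--BOUNDARY--
"""


def parse_eml_text(raw, email_id):
    """Return (message_row, attachment_rows) for one raw .eml string (hypothetical helper)."""
    msg = message_from_string(raw, policy=policy.default)
    name, address = parseaddr(str(msg["From"]))
    # Pick the plain-text body, skipping parts marked as attachments.
    body_part = msg.get_body(preferencelist=("plain",))
    message_row = {
        "email_id": email_id,
        "speaker_name": name,
        "speaker_contact": address,
        "message": body_part.get_content().strip() if body_part else "",
    }
    attachment_rows = [
        {
            "email_id": email_id,
            "attachment_name": part.get_filename(),
            "content_id": (part["Content-ID"] or "").strip("<>"),
            "content_type": part.get_content_type(),
        }
        for part in msg.iter_attachments()
    ]
    return message_row, attachment_rows


message_row, attachment_rows = parse_eml_text(RAW_EML, "sample_1")
print(message_row)
print(attachment_rows)
```

Returning one message row plus a list of attachment rows per file mirrors the two tables above, with `email_id` as the shared key.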


## Step 2: Transform
- Link attachments to their parent messages (1-to-many) and flag messages that contain attachments.
- Enrich messages and attachments with a batch partition date (`batch_dt`).

In [3]:
messages_df, attachments_df = merge_messages_with_attachments(messages_df, attachments_df)
messages_df = enrich_messages(messages_df.copy())
attachments_df = enrich_attachments(attachments_df.copy())

print(f"Messages with attachments: {messages_df['with_attachment'].sum()}")
display(messages_df.head())
display(attachments_df.head())

Messages with attachments: 3


Unnamed: 0,email_id,message_id,timestamp,speaker_name,speaker_contact,message,with_attachment,batch_dt
0,sample_1,6fb353ba-5345-4c42-a197-e018f82535e2,2025-10-04T08:09:40.279545Z,Alice Example,alice@example.com,"Hello team, please find attached the report.",True,2025-10-04
1,sample_3,cbab3fe6-f4ec-47a4-bbe8-c60a2a89d7eb,2025-09-28T08:09:40.282160Z,Carol Test,carol@example.com,Here is the updated project plan.,True,2025-10-04
2,sample_2,84dc08ec-7d12-4d23-ae05-942269326e2d,2025-10-01T08:09:40.281832Z,Bob Demo,bob@example.com,Reminder: standup meeting tomorrow at 9am.,False,2025-10-04
3,sample_5,63726476-b1fc-4740-8518-93ea2b043f5f,2025-09-22T08:09:40.282580Z,Bob Demo,bob@example.com,Attached is the invoice for last month.,True,2025-10-04
4,sample_4,5b88a145-7172-48a5-a17a-b9ccce9483ea,2025-09-25T08:09:40.282404Z,Alice Example,alice@example.com,Let's schedule a call next week.,False,2025-10-04


Unnamed: 0,email_id,attachment_name,content_id,content_type,message_id,batch_dt
0,sample_1,attachment_0.txt,1a111dc9-eb38-48e1-aeae-bda8fb7a9e98@example.com,text/plain,6fb353ba-5345-4c42-a197-e018f82535e2,2025-10-04
1,sample_3,attachment_2.txt,c7b51c87-8042-48a8-a062-86e53e43d14e@example.com,text/plain,cbab3fe6-f4ec-47a4-bbe8-c60a2a89d7eb,2025-10-04
2,sample_5,attachment_4.txt,018ee6b3-2f4f-45aa-9044-c8ba55b20328@example.com,text/plain,63726476-b1fc-4740-8518-93ea2b043f5f,2025-10-04
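
The linking logic can be sketched in plain Python to make it explicit. This is an illustration, not the implementation of `merge_messages_with_attachments`: the helper `link_attachments` is hypothetical, and it assumes `email_id` is the join key, which is what the tables above suggest.

```python
def link_attachments(messages, attachments):
    """Flag messages that have attachments and copy message_id onto each attachment."""
    message_id_by_email = {m["email_id"]: m["message_id"] for m in messages}
    emails_with_attachments = {a["email_id"] for a in attachments}

    linked_messages = [
        {**m, "with_attachment": m["email_id"] in emails_with_attachments}
        for m in messages
    ]
    linked_attachments = [
        # An attachment with no matching message gets message_id=None.
        {**a, "message_id": message_id_by_email.get(a["email_id"])}
        for a in attachments
    ]
    return linked_messages, linked_attachments


messages = [
    {"email_id": "sample_1", "message_id": "m-1"},
    {"email_id": "sample_2", "message_id": "m-2"},
]
attachments = [
    {"email_id": "sample_1", "attachment_name": "attachment_0.txt"},
]
linked_messages, linked_attachments = link_attachments(messages, attachments)
print(linked_messages)
print(linked_attachments)
```

Copying `message_id` onto each attachment row is what later allows the SQLite join on `message_id`.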


## Step 3: Data Quality Checks
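Run data quality checks on the messages table after the merge and enrichment steps. The result appears to hold the rows that failed a check, so an empty table means no issues were found.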

In [4]:
dq_issues = run_data_quality(messages_df, name="Messages")
print(f"Data quality issues found: {len(dq_issues)}")
display(dq_issues.head())

Data quality issues found: 0


Unnamed: 0,email_id,message_id,timestamp,speaker_name,speaker_contact,message,with_attachment,batch_dt
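
The notebook does not show which rules `run_data_quality` applies. As a rough sketch of what such checks could look like, the hypothetical function below flags missing required fields, duplicate `message_id` values, and timestamps that do not match the format seen in the tables above. The rule set and the required-field list are assumptions.

```python
from datetime import datetime

# Assumed required columns; the real rule set is not shown in this notebook.
REQUIRED_FIELDS = ("email_id", "message_id", "timestamp", "speaker_contact", "message")


def find_quality_issues(rows):
    """Return (row_index, reason) pairs for rows that fail a basic check."""
    issues = []
    seen_ids = set()
    for index, row in enumerate(rows):
        missing = [field for field in REQUIRED_FIELDS if not row.get(field)]
        if missing:
            issues.append((index, f"missing: {', '.join(missing)}"))
        message_id = row.get("message_id")
        if message_id in seen_ids:
            issues.append((index, "duplicate message_id"))
        elif message_id:
            seen_ids.add(message_id)
        timestamp = row.get("timestamp")
        if timestamp:
            try:
                # Timestamps in this notebook look like 2025-10-04T08:09:40.279545Z.
                datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
            except ValueError:
                issues.append((index, "unparseable timestamp"))
    return issues


rows = [
    {"email_id": "sample_1", "message_id": "m-1", "timestamp": "2025-10-04T08:09:40.279545Z",
     "speaker_contact": "alice@example.com", "message": "Hello"},
    {"email_id": "sample_2", "message_id": "m-1", "timestamp": "2025-10-01T08:09:40.281832Z",
     "speaker_contact": "bob@example.com", "message": ""},
    {"email_id": "sample_3", "message_id": "m-3", "timestamp": "yesterday",
     "speaker_contact": "carol@example.com", "message": "Plan"},
]
issues = find_quality_issues(rows)
print(issues)
```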


## Step 4: Load
Save both tables to CSV and SQLite, using `BatchControl` to record the expected and loaded row counts for each load.

In [5]:
storage = Storage(ctx)

# Messages
msg_batch = BatchControl("messages_load", ctx)
msg_batch.start(rows_expected=len(messages_df))
storage.write_csv(messages_df, "messages")
storage.write_sqlite(messages_df, "messages")
msg_batch.end(rows_loaded=len(messages_df))

# Attachments
att_batch = BatchControl("attachments_load", ctx)
att_batch.start(rows_expected=len(attachments_df))
storage.write_csv(attachments_df, "attachments")
storage.write_sqlite(attachments_df, "attachments")
att_batch.end(rows_loaded=len(attachments_df))
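
The start/end pattern above can also be expressed as a context manager. The sketch below is not the project's `BatchControl` class: `track_batch` is a hypothetical helper, the table columns follow the batch control output shown later in this notebook (minus `created_at`), and the `FAILED` and `MISMATCH` statuses are assumptions, since only `SUCCESS` appears in the recorded runs.

```python
import sqlite3
import time
from contextlib import contextmanager
from datetime import datetime


@contextmanager
def track_batch(conn, batch_name, rows_expected):
    """Record one batch_control row around a load step (hypothetical helper)."""
    conn.execute(
        """CREATE TABLE IF NOT EXISTS batch_control (
               batch_name TEXT, start_time TEXT, end_time TEXT, duration_sec REAL,
               rows_expected INTEGER, rows_loaded INTEGER, status TEXT)"""
    )
    result = {"rows_loaded": 0}
    start_time = datetime.now()
    started = time.perf_counter()
    status = "SUCCESS"
    try:
        yield result
    except Exception:
        status = "FAILED"  # assumed status name
        raise
    finally:
        if status == "SUCCESS" and result["rows_loaded"] != rows_expected:
            status = "MISMATCH"  # assumed status name
        conn.execute(
            "INSERT INTO batch_control VALUES (?, ?, ?, ?, ?, ?, ?)",
            (batch_name, start_time.isoformat(sep=" "), datetime.now().isoformat(sep=" "),
             time.perf_counter() - started, rows_expected, result["rows_loaded"], status),
        )
        conn.commit()


conn = sqlite3.connect(":memory:")
with track_batch(conn, "messages_load", rows_expected=5) as batch:
    batch["rows_loaded"] = 5  # a real load would write rows here and report the count
record = conn.execute(
    "SELECT batch_name, rows_expected, rows_loaded, status FROM batch_control"
).fetchone()
print(record)
```

Writing the record in a `finally` block means a load that raises still leaves a row behind, which is the property that makes a batch control table useful for troubleshooting.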

## Step 5: Summary
Display record counts and verify message-to-attachment relationships.

In [6]:
print(f"Messages total: {len(messages_df)}")
print(f"Attachments total: {len(attachments_df)}")
print(f"Messages with attachments: {messages_df['with_attachment'].sum()}")
print(f"Messages without attachments: {(~messages_df['with_attachment']).sum()}")

Messages total: 5
Attachments total: 3
Messages with attachments: 3
Messages without attachments: 2
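
The counts above are consistent with each other, but they do not prove that every attachment points at a real message. A small sketch of such a relationship check follows; `check_relationships` is a hypothetical helper that works on plain dict rows rather than the notebook's DataFrames.

```python
def check_relationships(messages, attachments):
    """Count orphan attachments and messages whose with_attachment flag is wrong."""
    message_ids = {m["message_id"] for m in messages}
    orphans = [a for a in attachments if a["message_id"] not in message_ids]
    flagged = {m["message_id"] for m in messages if m["with_attachment"]}
    # Only attachments that reference a known message count as evidence for the flag.
    referenced = {a["message_id"] for a in attachments} & message_ids
    return {
        "orphan_attachments": len(orphans),
        "flag_mismatches": len(flagged ^ referenced),
    }


messages = [
    {"message_id": "m-1", "with_attachment": True},
    {"message_id": "m-2", "with_attachment": False},
    {"message_id": "m-3", "with_attachment": True},  # flagged, but has no attachment row
]
attachments = [
    {"message_id": "m-1", "attachment_name": "attachment_0.txt"},
    {"message_id": "m-9", "attachment_name": "stray.txt"},  # no such message
]
report = check_relationships(messages, attachments)
print(report)
```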


## Step 6: Query SQLite Database
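Query the SQLite database created by the pipeline to validate relational integrity and view batch control metadata. The attachments sample and the join output below contain repeated rows: the notebook was run several times against the same database (see the batch control records in Step 7), which suggests that each run appends to the existing tables.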

In [7]:
import sqlite3
import pandas as pd

db_path = ctx.db_path
conn = sqlite3.connect(db_path)

# List tables
tables = pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table';", conn)
print("Tables in SQLite database:")
display(tables)

# Query messages
messages_sql = pd.read_sql_query("SELECT * FROM messages LIMIT 5;", conn)
print("Messages sample:")
display(messages_sql)

# Query attachments
attachments_sql = pd.read_sql_query("SELECT * FROM attachments LIMIT 5;", conn)
print("Attachments sample:")
display(attachments_sql)

# Join query
join_sql = pd.read_sql_query("""
SELECT m.email_id, m.message_id, m.speaker_name, a.attachment_name, m.batch_dt
FROM messages m
LEFT JOIN attachments a USING (message_id)
LIMIT 10;
""", conn)
print("Joined messages and attachments:")
display(join_sql)

Tables in SQLite database:


Unnamed: 0,name
0,messages
1,batch_control
2,attachments


Messages sample:


Unnamed: 0,email_id,message_id,timestamp,speaker_name,speaker_contact,message,with_attachment,batch_dt
0,sample_1,6fb353ba-5345-4c42-a197-e018f82535e2,2025-10-04T08:09:40.279545Z,Alice Example,alice@example.com,"Hello team, please find attached the report.",1,2025-10-04
1,sample_3,cbab3fe6-f4ec-47a4-bbe8-c60a2a89d7eb,2025-09-28T08:09:40.282160Z,Carol Test,carol@example.com,Here is the updated project plan.,1,2025-10-04
2,sample_2,84dc08ec-7d12-4d23-ae05-942269326e2d,2025-10-01T08:09:40.281832Z,Bob Demo,bob@example.com,Reminder: standup meeting tomorrow at 9am.,0,2025-10-04
3,sample_5,63726476-b1fc-4740-8518-93ea2b043f5f,2025-09-22T08:09:40.282580Z,Bob Demo,bob@example.com,Attached is the invoice for last month.,1,2025-10-04
4,sample_4,5b88a145-7172-48a5-a17a-b9ccce9483ea,2025-09-25T08:09:40.282404Z,Alice Example,alice@example.com,Let's schedule a call next week.,0,2025-10-04


Attachments sample:


Unnamed: 0,email_id,attachment_name,content_id,content_type,message_id,batch_dt
0,sample_1,attachment_0.txt,1a111dc9-eb38-48e1-aeae-bda8fb7a9e98@example.com,text/plain,6fb353ba-5345-4c42-a197-e018f82535e2,2025-10-04
1,sample_3,attachment_2.txt,c7b51c87-8042-48a8-a062-86e53e43d14e@example.com,text/plain,cbab3fe6-f4ec-47a4-bbe8-c60a2a89d7eb,2025-10-04
2,sample_5,attachment_4.txt,018ee6b3-2f4f-45aa-9044-c8ba55b20328@example.com,text/plain,63726476-b1fc-4740-8518-93ea2b043f5f,2025-10-04
3,sample_1,attachment_0.txt,1a111dc9-eb38-48e1-aeae-bda8fb7a9e98@example.com,text/plain,6fb353ba-5345-4c42-a197-e018f82535e2,2025-10-04
4,sample_3,attachment_2.txt,c7b51c87-8042-48a8-a062-86e53e43d14e@example.com,text/plain,cbab3fe6-f4ec-47a4-bbe8-c60a2a89d7eb,2025-10-04


Joined messages and attachments:


Unnamed: 0,email_id,message_id,speaker_name,attachment_name,batch_dt
0,sample_1,6fb353ba-5345-4c42-a197-e018f82535e2,Alice Example,attachment_0.txt,2025-10-04
1,sample_1,6fb353ba-5345-4c42-a197-e018f82535e2,Alice Example,attachment_0.txt,2025-10-04
2,sample_1,6fb353ba-5345-4c42-a197-e018f82535e2,Alice Example,attachment_0.txt,2025-10-04
3,sample_1,6fb353ba-5345-4c42-a197-e018f82535e2,Alice Example,attachment_0.txt,2025-10-04
4,sample_1,6fb353ba-5345-4c42-a197-e018f82535e2,Alice Example,attachment_0.txt,2025-10-04
5,sample_1,6fb353ba-5345-4c42-a197-e018f82535e2,Alice Example,attachment_0.txt,2025-10-04
6,sample_1,6fb353ba-5345-4c42-a197-e018f82535e2,Alice Example,attachment_0.txt,2025-10-04
7,sample_3,cbab3fe6-f4ec-47a4-bbe8-c60a2a89d7eb,Carol Test,attachment_2.txt,2025-10-04
8,sample_3,cbab3fe6-f4ec-47a4-bbe8-c60a2a89d7eb,Carol Test,attachment_2.txt,2025-10-04
9,sample_3,cbab3fe6-f4ec-47a4-bbe8-c60a2a89d7eb,Carol Test,attachment_2.txt,2025-10-04
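
The repeated rows in the join output are what you would expect if identical rows were loaded more than once, because every copy of a message matches every copy of its attachment. The sketch below reproduces the effect with an in-memory database and shows that `SELECT DISTINCT` collapses the copies at query time. The two-column tables are simplified stand-ins for the real ones.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE messages (message_id TEXT, speaker_name TEXT)")
conn.execute("CREATE TABLE attachments (message_id TEXT, attachment_name TEXT)")

# Simulate two runs of an append-only load writing identical rows.
for _ in range(2):
    conn.execute("INSERT INTO messages VALUES ('m-1', 'Alice Example')")
    conn.execute("INSERT INTO attachments VALUES ('m-1', 'attachment_0.txt')")

join_query = """
SELECT {distinct} m.message_id, m.speaker_name, a.attachment_name
FROM messages m
LEFT JOIN attachments a USING (message_id)
"""
raw_rows = conn.execute(join_query.format(distinct="")).fetchall()
unique_rows = conn.execute(join_query.format(distinct="DISTINCT")).fetchall()
print(len(raw_rows), len(unique_rows))
conn.close()
```

`DISTINCT` only hides the duplication. A more durable fix is to make the load idempotent, for example by clearing or replacing the tables before each run. If `Storage.write_sqlite` wraps pandas `DataFrame.to_sql` (an assumption, since its implementation is not shown here), passing `if_exists="replace"` would be one way to do that.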


## Step 7: Review Batch Control Records
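Confirm that batch control metadata has been written to both CSV and SQLite. Each run of the notebook adds one `messages_load` and one `attachments_load` record, so earlier runs appear in the output as well.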

In [8]:
# Query batch_control table
batch_sql = pd.read_sql_query("SELECT * FROM batch_control ORDER BY created_at DESC;", conn)
print("Batch control summary:")
display(batch_sql)

# Optionally read from CSV
batch_csv_path = os.path.join(ctx.output_dir, "batch_control.csv")
if os.path.exists(batch_csv_path):
    batch_csv = pd.read_csv(batch_csv_path)
    print("Batch control CSV:")
    display(batch_csv.tail())

conn.close()

Batch control summary:


Unnamed: 0,batch_name,start_time,end_time,duration_sec,rows_expected,rows_loaded,status,created_at
0,attachments_load,2025-10-04 18:21:28.100175,2025-10-04 18:21:28.101792,0.001617,3,3,SUCCESS,2025-10-04 18:21:28.101801
1,messages_load,2025-10-04 18:21:28.077015,2025-10-04 18:21:28.094697,0.017682,5,5,SUCCESS,2025-10-04 18:21:28.094730
2,attachments_load,2025-10-04 18:20:23.003062,2025-10-04 18:20:23.007703,0.004641,3,3,SUCCESS,2025-10-04 18:20:23.007719
3,messages_load,2025-10-04 18:20:22.980706,2025-10-04 18:20:22.993753,0.013047,5,5,SUCCESS,2025-10-04 18:20:22.993766
4,attachments_load,2025-10-04 18:20:12.723553,2025-10-04 18:20:12.727541,0.003988,3,3,SUCCESS,2025-10-04 18:20:12.727553
5,messages_load,2025-10-04 18:20:12.699709,2025-10-04 18:20:12.719378,0.019669,5,5,SUCCESS,2025-10-04 18:20:12.719396
6,attachments_load,2025-10-04 18:19:13.375496,2025-10-04 18:19:13.381491,0.005995,3,3,SUCCESS,2025-10-04 18:19:13.381507
7,messages_load,2025-10-04 18:19:13.352837,2025-10-04 18:19:13.371170,0.018333,5,5,SUCCESS,2025-10-04 18:19:13.371185
8,attachments_load,2025-10-04 18:13:25.246101,2025-10-04 18:13:25.247455,0.001354,3,3,SUCCESS,2025-10-04 18:13:25.247471
9,messages_load,2025-10-04 18:13:25.234383,2025-10-04 18:13:25.242858,0.008475,5,5,SUCCESS,2025-10-04 18:13:25.242881


Batch control CSV:


Unnamed: 0,batch_name,start_time,end_time,duration_sec,rows_expected,rows_loaded,status,created_at
9,attachments_load,2025-10-04 18:20:12.723553,2025-10-04 18:20:12.727541,0.003988,3,3,SUCCESS,2025-10-04 18:20:12.727553
10,messages_load,2025-10-04 18:20:22.980706,2025-10-04 18:20:22.993753,0.013047,5,5,SUCCESS,2025-10-04 18:20:22.993766
11,attachments_load,2025-10-04 18:20:23.003062,2025-10-04 18:20:23.007703,0.004641,3,3,SUCCESS,2025-10-04 18:20:23.007719
12,messages_load,2025-10-04 18:21:28.077015,2025-10-04 18:21:28.094697,0.017682,5,5,SUCCESS,2025-10-04 18:21:28.094730
13,attachments_load,2025-10-04 18:21:28.100175,2025-10-04 18:21:28.101792,0.001617,3,3,SUCCESS,2025-10-04 18:21:28.101801
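
When the table holds several runs, it is often enough to look at the newest record per batch. The query below is a sketch against an in-memory copy of four of the rows shown above, trimmed to the columns it needs; it is not part of the project's code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE batch_control (batch_name TEXT, rows_expected INTEGER, "
    "rows_loaded INTEGER, status TEXT, created_at TEXT)"
)
conn.executemany(
    "INSERT INTO batch_control VALUES (?, ?, ?, ?, ?)",
    [
        ("messages_load", 5, 5, "SUCCESS", "2025-10-04 18:20:22.993766"),
        ("attachments_load", 3, 3, "SUCCESS", "2025-10-04 18:20:23.007719"),
        ("messages_load", 5, 5, "SUCCESS", "2025-10-04 18:21:28.094730"),
        ("attachments_load", 3, 3, "SUCCESS", "2025-10-04 18:21:28.101801"),
    ],
)

# Keep only the most recent record for each batch name.
latest = conn.execute(
    """
    SELECT b.batch_name, b.rows_expected, b.rows_loaded, b.status, b.created_at
    FROM batch_control b
    JOIN (
        SELECT batch_name, MAX(created_at) AS created_at
        FROM batch_control
        GROUP BY batch_name
    ) newest
      ON newest.batch_name = b.batch_name AND newest.created_at = b.created_at
    ORDER BY b.batch_name
    """
).fetchall()

# A batch needs attention if it did not succeed or loaded an unexpected row count.
needs_attention = [row for row in latest if row[3] != "SUCCESS" or row[1] != row[2]]
print(latest)
print(needs_attention)
conn.close()
```

Comparing `created_at` as text works here because the timestamps use a fixed-width, year-first format, so string order matches chronological order.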


In [9]:
# Summary
print("ETL run completed successfully.")
print(f"Results stored under: {ctx.output_dir}")
print(f"SQLite database path: {ctx.db_path}")

ETL run completed successfully.
Results stored under: data/output
SQLite database path: data/output/etl_demo.db


## ✅ Pipeline Complete
Data has been extracted, transformed, loaded, and verified.

Outputs:
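- **Messages** → `data/output/messages.csv` and the `messages` table in `data/output/etl_demo.db`
- **Attachments** → `data/output/attachments.csv` and the `attachments` table in `data/output/etl_demo.db`
- **Batch Control** → `data/output/batch_control.csv` and the `batch_control` table in `data/output/etl_demo.db`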

All data can be queried directly from SQLite for validation or reporting.