**Install Required Packages**

In [53]:
!pip install pandas --quiet
!pip install tabulate --quiet
!pip install pdfplumber --quiet
!pip install tqdm

!pip install pymupdf --quiet --upgrade --prefer-binary --only-binary :all:



**Define the SkipFile exception**

In [54]:
class SkipFile(Exception):
    """Raised to indicate this PDF should be skipped (no data to process)."""
    pass

**Define page-finder & table-extractor functions**

In [55]:
import pandas as pd
import fitz
import pdfplumber

def find_pages(pdf_path: str, keyword: str) -> list[int]:
    """
    Return 1-based page numbers where `keyword` appears in the page text.
    """
    pages = []
    with fitz.open(pdf_path) as doc:
        for i in range(doc.page_count):
            if keyword in doc[i].get_text():
                pages.append(i + 1)
    return pages


def extract_tables(
    pdf_path: str,
    pages: list[int],
    period: str,
    drop_type: str = "Spread"
) -> pd.DataFrame:
    """
    From the given pages, pull out every table row where
    df['Period']==period AND df['Type']!=drop_type.
    Returns one concatenated DataFrame, or raises SkipFile if none.
    """
    dfs = []
    with pdfplumber.open(pdf_path) as pdf:
        for pnum in pages:
            tbl = pdf.pages[pnum - 1].extract_table()
            if not tbl:
                continue
            df = pd.DataFrame(tbl[1:], columns=tbl[0])
            if "Type" in df.columns:
                df["Type"] = df["Type"].str.strip()
                df = df[df["Type"] != drop_type]
            if "Period" in df.columns:
                df = df[df["Period"] == period]
            if not df.empty:
                dfs.append(df)

    if not dfs:
        # nothing matched → skip this PDF
        raise SkipFile(f"No tables extracted")

    return pd.concat(dfs, ignore_index=True)

**Define a function to determine if "In the Money"**

In [56]:
def find_in_the_money(df: pd.DataFrame) -> pd.DataFrame:
    """
    Convert 'Exp Value' to numeric;
    Extract the numeric strike price from 'Display Name' (number following '>');
    Compute 'Flag' as 1 if Exp Value > Strike Price, else 0;
    Keep the 'exp time' column unchanged.
    """
    return (
        df
        .assign(
            **{
                'Exp Value': lambda df: pd.to_numeric(df['Exp Value'], errors='coerce'),
                'Strike Price': lambda df: (
                    df['Display Name']
                      .str.extract(r'>\s*([\d.]+)', expand=False)
                      .pipe(pd.to_numeric, errors='coerce')
                ),
            }
        )
        .assign(
            Flag=lambda df: (df['Exp Value'] > df['Strike Price']).astype(int)
        )
    )

**Define the clean and rename function**

In [57]:
import pandas as pd

def clean_and_rename(df: pd.DataFrame) -> pd.DataFrame:
    to_drop = [
        c for c in [
            "Period", "Display Name",
            "Type", "Buyer", "Seller"
        ] if c in df.columns
    ]
    return (
        df
        .rename(columns={
            "Business Date": "Date",
            "Flag": "In the Money",
        })
        .drop_duplicates(
            subset=["Date", "Exp Value", "Strike Price", "Ticker"],
            keep="first",
        )
        .drop(columns=to_drop)
    )


**Define a function to substitue the Ticker for the name**

In [58]:
import pandas as pd
from pathlib import Path
from typing import Union

def add_ticker_from_mapping(
    df: pd.DataFrame,
    mapping_file: Union[str, Path]) -> pd.DataFrame:
    """
    Load a mapping CSV with columns 'Display Name' and 'Ticker',
    then add a 'Ticker' column to `df` by substring-matching 'Name' against each 'Display Name'.

    Parameters
    ----------
    df : pd.DataFrame
        Source DataFrame containing a 'Name' column.
    mapping_file : str or Path
        Path to CSV with mapping of 'Display Name' to 'Ticker'.

    Returns
    -------
    pd.DataFrame
        A new DataFrame with an added 'Ticker' column.
    """
    mapping_df = pd.read_csv(mapping_file)
    
    def lookup_ticker(name: str) -> pd.Series:
        if pd.isna(name):
            return pd.NA
        for desc, ticker in zip(mapping_df['Display Name'], mapping_df['Ticker']):
            if desc in name:
                return ticker
        return pd.NA

    result = df.copy()
    result['Ticker'] = result['Name'].apply(lookup_ticker).fillna({'Ticker': 'UNKNOWN'})

    return result

**Define a function to save the final result to an S3 CSV**

In [59]:
import io
from botocore.exceptions import ClientError

import io
from botocore.exceptions import ClientError

def upload_df_to_s3(
    df: pd.DataFrame,
    s3_client,
    bucket: str,
    key: str,
) -> None:
    """
    Upload a DataFrame to S3 as CSV using your provided S3 client.
    """
    bucket = bucket.strip()

    # sanity‐check bucket exists
    try:
        s3_client.head_bucket(Bucket=bucket)
    except ClientError as e:
        raise RuntimeError(f"Could not access bucket '{bucket}': {e}") from e

    # ——— Use BytesIO instead of StringIO ————————————————
    csv_buffer = io.BytesIO()
    # write UTF-8–encoded bytes, not text
    csv_buffer.write(df.to_csv(index=False).encode('utf-8'))
    csv_buffer.seek(0)

    try:
        s3_client.upload_fileobj(
            Fileobj=csv_buffer,
            Bucket=bucket,
            Key=key,
            ExtraArgs={"ContentType": "text/csv"},
        )
    except ClientError as e:
        raise RuntimeError(
            f"Failed to upload CSV to s3://{bucket}/{key}: {e}"
        ) from e

    # optional confirm
    try:
        meta = s3_client.head_object(Bucket=bucket, Key=key)
        # print(f"   → Confirmed upload: {meta.get('ContentLength')} bytes")
    except Exception:
        print("   ⚠️ Could not confirm upload with head_object")


**Define function to avoid re-processing processed files**

In [60]:
import json
import boto3 

from botocore.exceptions import ClientError

def load_manifest(
    s3_client: boto3.client, 
    bucket_name: str, 
    manifest_key: str = "manifests/processed_files.json"
) -> set[str]:
    """
    Download and parse the JSON manifest of processed PDFs.
    Returns a set of keys, or empty set if it doesn’t exist.
    """
    try:
        resp = s3_client.get_object(Bucket=bucket_name, Key=manifest_key)
        return set(json.loads(resp["Body"].read()))
    except ClientError as e:
        # If the object isn’t found, return empty set; else re-raise
        if e.response["Error"]["Code"] == "NoSuchKey":
            return set()
        raise

def save_manifest(
    processed: set[str],
    s3_client: boto3.client,
    bucket_name: str,
    manifest_key: str = "manifests/processed_files.json"
) -> None:
    """
    Upload the updated manifest back to S3.
    """
    s3_client.put_object(
        Bucket=bucket_name,
        Key=manifest_key,
        Body=json.dumps(list(processed)).encode("utf-8"),
        ContentType="application/json",
    )

**Define a function to get all the files related to trading results**

In [61]:
from typing import List

def list_nadex_trading_results(bucket, prefix: str = "") -> List[str]:
    """
    List PDF keys in the given bucket under `prefix`
    that contain 'tradingResults' and end with .pdf.
    """
    return [
        obj.key
        for obj in bucket.objects.filter(Prefix=prefix)
        if "tradingResults" in obj.key and obj.key.lower().endswith(".pdf")
    ]


**Define functions to setup access to S3, and filter the list of pdf's returned**

In [62]:
import boto3
from botocore import UNSIGNED
from botocore.config import Config

from datetime import date, datetime
from pathlib import Path

from typing import Iterable, List, Dict

def create_s3_clients(
    profile: str = "default", region: str = "us-east-1"
) -> Dict[str, boto3.client]:
    session = boto3.Session(profile_name=profile, region_name=region)
    return {
        "public": session.client(
            "s3",
            config=Config(signature_version=UNSIGNED),
            region_name=region,
        ),
        "private": session.client("s3"),
        "resource": session.resource("s3"),
    }

def get_bucket(resource: boto3.resource, name: str):
    return resource.Bucket(name)

def parse_key_date(key: str) -> date:
    stem = Path(key).stem
    return datetime.strptime(stem[:8], "%Y%m%d").date()

def filter_new_pdfs(
    keys: Iterable[str],
    processed: Iterable[str],
    start: date = date(2024, 1, 1),
    end: date | None = None,
) -> List[str]:
    """
    Return keys between start & end (inclusive) that aren’t in processed.
    """
    if end is None:
        end = date.today()
    return [
        key
        for key in keys
        if start <= (d := parse_key_date(key)) <= end
        and key not in processed
    ]

**Define a helper function to process each file**

**Also, define a function to run the Pipeline**
1. For each PDF in the folder:
2. Read the PDF file
3. Extract the Tables with Daily contracts
4. Create a CSV
5. Write the CSV to S3
6. Log the PDF file as 'processed

In [63]:
from pathlib import Path
from datetime import date
from typing import List

import traceback

from tqdm import tqdm

def _process_pdf(
    pdf_key: str,
    target: str,
    mapping_file: Path,
    public_s3,
    private_s3,
    bucket_name: str,
    nadex_bucket_name: str,
    tmp_dir: Path = Path("/tmp"),
) -> None:
    """
    Download a single PDF, extract/transform, upload the CSV, and
    print a success message. Raises on any step failure.
    """
    local_pdf = tmp_dir / Path(pdf_key).name
    public_s3.download_file(
        Bucket=nadex_bucket_name,
        Key=pdf_key,
        Filename=str(local_pdf),
    )

    def _format_ctx(exc: Exception) -> str:
        tb = traceback.extract_tb(exc.__traceback__)
        fn, ln, _, text = tb[-1]
        return f"{fn}:{ln} -> {text.strip()}"
        
    try:
        pages = find_pages(str(local_pdf), target)
        df = (
            extract_tables(str(local_pdf), pages, target)
            .pipe(find_in_the_money)
            .pipe(add_ticker_from_mapping, mapping_file)
            .pipe(clean_and_rename)
        )
    except SkipFile as sf:
        raise
    except Exception as e:
        ctx = _format_ctx(e)
        raise RuntimeError(
            f"[EXTRACT/TRANSFORM ERROR] '{pdf_key}' (pages={pages}) "
            f"({ctx}): {e}"
        ) from e

    try:
        upload_df_to_s3(df, private_s3, bucket_name, f"historical/{Path(pdf_key).stem.split('_', 1)[0]}_Historical.csv")
    except Exception as e:
        ctx = _format_ctx(e)
        raise RuntimeError(
            f"[UPLOAD ERROR] '{pdf_key}' → '{bucket_name}/{pdf_key}' "
            f"({ctx}): {e}"
        ) from e

def run_nadex_pipeline(
    mapping_file: Path,
    target: str,
    bucket_name: str,
    nadex_bucket_name: str,
    start: date,
    end: date,
):
    """
    Orchestrates the full Nadex PDF → CSV pipeline over a given date range.

    Parameters:
    -----------
    mapping_file : Path to the CSV mapping file used to enrich extracted tables.
    target : The keyword to locate pages within each PDF (e.g. "Daily").
    bucket_name : Name of your own S3 bucket where resulting CSVs and the manifest are stored.
    nadex_bucket_name : Name of the public Nadex S3 bucket from which PDFs are downloaded (unsigned).
    start : Lower bound (inclusive) on PDF dates to process (parsed from filenames).
    end : Upper bound (inclusive) on PDF dates to process.

    Actions:
    --------
    1. Bootstraps three S3 interfaces:
       - `public_s3`: an unsigned client to download Nadex PDFs.
       - `private_s3`: a signed client for uploading CSVs & manifest.
       - `s3_resource`: resource interface to enumerate bucket objects.
    2. Constructs `buckets` dict with Bucket objects for both source (market) and destination.
    3. Loads the JSON “processed files” manifest from your private bucket into a `processed` set.
    4. Iterates over every PDF key in the Nadex (market) bucket:
       a. Skips any key already in `processed`, accumulating in `skipped`.
       b. Parses the date out of the filename and skips if outside `[start, end]`.
       c. Calls `_process_pdf(...)` to:
          • Download PDF locally,
          • Extract & transform tables,
          • Enrich with ticker mapping,
          • Upload the resulting CSV back to your bucket.
       d. On success, adds the key to `processed`; on exception, logs to `errors`.
    5. After the loop finishes, writes the updated `processed` manifest back to S3.
    6. Prints a summary of how many files were processed, skipped, and errored.

    """
    number_processed = number_skipped = number_of_errors = 0
    
    clients = create_s3_clients()
    public_s3 = clients["public"]
    private_s3 = clients["private"]
    s3_resource = clients["resource"]
    buckets = {
        "daily":  get_bucket(s3_resource, bucket_name),
        "market": get_bucket(s3_resource, nadex_bucket_name),
    }

    processed = load_manifest(private_s3, bucket_name)
    errors: Dict[str, str]  = {}

    # 1) List *all* PDFs in the Nadex bucket
    all_keys = list_nadex_trading_results(buckets["market"], prefix="")

    # 2) Filter those to your date window
    date_range_keys = [
        k for k in all_keys
        if start <= parse_key_date(k) <= end
    ]

    # 3) Split into new vs skipped-within-date-range
    new_keys = []
    skipped: Dict[str, str] = {}
    
    for k in date_range_keys:
        if k in processed:
            skipped[k] = "already processed"
            number_skipped += 1
        else:
            new_keys.append(k)
            number_processed += 1

    errors: Dict[str, str] = {}
    
    for pdf_key in tqdm(
        new_keys,
        desc="Processing PDFs",
        ascii=True,
    ):

    # ─── Main loop (single statement!) ────────────────────────────────────────
        try:
            _process_pdf(
                pdf_key,
                target,
                mapping_file,
                public_s3,
                private_s3,
                bucket_name,
                nadex_bucket_name,
            )
            processed.add(pdf_key)
        except SkipFile as sf:
            skipped[pdf_key] = str(sf)
            number_skipped += 1
        except Exception as exc:
            errors[pdf_key] = str(exc)
            number_of_errors += 1

    save_manifest(processed, private_s3, bucket_name)

    print()
    print(f"Done — processed: {len(processed)} files")
    print(f"       skipped:   {len(skipped)} files already processed")
    print(f"       errors:    {len(errors)} failures")

    return {
        'files_processed': number_processed,
        'files_skipped': number_skipped,
        'files_with_errors': number_of_errors 
    }

**Record the Run Log**

In [64]:
import yaml
from pathlib import Path
with open('../configs/s3.yaml', 'r') as f:
    cfg = yaml.safe_load(f)
    
# Run log helper (append a row to S3 CSV)
import io, csv, datetime as dt
from botocore.exceptions import ClientError  # comes with boto3

RUNLOG_FIELDS = [
    'date','start_time','end_time','status',
    'files_processed','files_skipped','files_error',
    'run_id','notes'
]

from botocore.config import Config

session = boto3.Session(region_name=cfg.get('region'))
private_s3 = session.client('s3')
public_s3 = session.client('s3', config=Config(signature_version=UNSIGNED))

def append_runlog_s3(
    bucket: str,
    key: str,
    *,
    start_time=None,
    status='success',
    files_processed=0,
    files_skipped=0,
    files_error=0,
    run_id='',
    notes=''
):
    now = dt.datetime.now()
    start = start_time or now

    row = {
        'date': now.date().isoformat(),
        'start_time': start if isinstance(start, str) else start.isoformat(timespec='seconds'),
        'end_time': now.isoformat(timespec='seconds'),
        'status': status,
        'files_processed': int(files_processed),
        'files_skipped': int(files_skipped),
        'files_error': int(files_error),
        'run_id': run_id,
        'notes': notes,
    }

    # Fetch existing (if present)
    buf = io.StringIO()
    need_header = False
    try:
        obj = private_s3.get_object(Bucket=bucket, Key=key)
        buf.write(obj['Body'].read().decode('utf-8'))
    except ClientError as e:
        if e.response['Error']['Code'] in ('NoSuchKey', '404'):
            need_header = True
        else:
            raise

    if buf.tell() == 0:
        need_header = True

    writer = csv.DictWriter(buf, fieldnames=RUNLOG_FIELDS)
    if need_header:
        writer.writeheader()
    if buf.getvalue() and not buf.getvalue().endswith(''):
        buf.write('')
    writer.writerow(row)

    private_s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=buf.getvalue().encode('utf-8'),
        ContentType='text/csv'
    )

**Run the pipeline & display**

In [65]:
# Load config
import yaml
from pathlib import Path
with open('../configs/s3.yaml', 'r') as f:
    cfg = yaml.safe_load(f)

BUCKET = cfg['bucket']
PUBLIC_BUCKET = cfg['public_bucket']
HIST_PREFIX = cfg['prefixes']['historical']
MANIFEST_KEY = f"{cfg['prefixes']['manifests']}/processed_files.json"
RUNLOG_KEY = f"{cfg['prefixes']['logs']}/run_log.csv"
MAPPING_FILE = Path(cfg['mapping_file'])

# Append a run-log row to S3 CSV
import io, csv, datetime as dt

from pathlib import Path
from datetime import date

metrics = run_nadex_pipeline(
    mapping_file=MAPPING_FILE,
    target="Daily",
    bucket_name=BUCKET,
    nadex_bucket_name=PUBLIC_BUCKET,
    start=date(2025, 3, 1),
    end=date.today(),
)

# Use config in the final run cell
from datetime import date
import datetime as dt

run_start = dt.datetime.now()

# Generate a run_id (timestamp) for provenance (no Git required)
run_id = run_start.strftime("%Y%m%dT%H%M%S")

# After run: append run log with counters
append_runlog_s3(
    BUCKET, RUNLOG_KEY,
    start_time=run_start,
    status=('success' if metrics.get('files_error', 0) == 0 else
            'partial' if metrics.get('files_processed', 0) > 0 else 'failed'),
    files_processed=metrics.get('files_processed', 0),
    files_skipped=metrics.get('files_skipped', 0),
    files_error=metrics.get('files_error', 0),
    run_id=run_id,
    notes='Historical batch run'
)

Processing PDFs: 100%|##########| 63/63 [02:00<00:00,  1.91s/it]


Done — processed: 144 files
       skipped:   205 files already processed
       errors:    0 failures



