# Step 1: Parse Contracts

This notebook handles the first stage of contract analysis: getting raw contract files into a usable format.

**What it does:**
1. Downloads contract PDFs from the Cook County open data portal
2. Reads the raw files into a Delta table (so we can process them in parallel)
3. Parses each document with Databricks `AI_PARSE_DOCUMENT` (extracts text from PDFs, images, and Office files)
4. Flattens the parsed results into a clean table with full text and summaries

**Before you run this:**
- Set the widgets at the top (catalog, schema, volume name, batch size, and the preamble and truncation word limits)
- Make sure you have a Unity Catalog volume to store raw files
- Allow extra time: this is the slowest step, because parsing large PDFs takes a while

**Output tables:**
- `bytes` -- raw file content
- `parsed` -- AI-parsed document structure
- `flat` -- cleaned text with preamble and truncated versions

## Configuration

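Set your catalog, schema, and volume name using the widgets. These control where data is stored in Unity Catalog. The remaining widgets set the parse batch size and the word limits for the `preamble` and `truncated` columns built in 1D.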

In [None]:
dbutils.widgets.text("catalog", "shm", "Catalog")
dbutils.widgets.text("schema", "contract", "Schema")
dbutils.widgets.text("volume_name", "raw", "Volume Name")
dbutils.widgets.text("batch_size", "100", "Batch Size")
dbutils.widgets.text("words_preamble", "100", "Words in Preamble")
dbutils.widgets.text("words_truncated", "5000", "Words in Truncated Text")

catalog = dbutils.widgets.get("catalog")
schema = dbutils.widgets.get("schema")
volume = dbutils.widgets.get("volume_name")
batch_size = int(dbutils.widgets.get("batch_size"))
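doc_path = f"/Volumes/{catalog}/{schema}/{volume}"

# SQL cells resolve :name parameters from widgets, not Python variables,
# so expose doc_path as a widget for the :doc_path references in 1B.
# Note: an existing widget keeps its current value; update it if you change the widgets above.
dbutils.widgets.text("doc_path", doc_path, "Document Path")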

---
## 1A: Download Contracts

We use the Cook County, Illinois procurement dataset: [Awarded Contracts & Amendments](https://catalog.data.gov/dataset/procurement-awarded-contracts-amendments).

A local copy of the data is saved as `cook_county_contracts.parquet` in this repo. We filter to **Environmental Services** contracts to get a small but realistic sample (~25 contracts with amendments).

Each contract PDF is downloaded into a vendor-specific subfolder on your Unity Catalog volume.
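
The folder naming and link extraction can be tried out in isolation before running the download. The sketch below mirrors the two regular expressions used in the download cell; `vendor_folder` and `extract_url` are hypothetical helper names, and the sample vendor name and URL are made up for illustration.

```python
import re
from typing import Optional


def vendor_folder(vendor_name: str) -> str:
    """Uppercase the vendor name and drop everything except A-Z and spaces."""
    return re.sub(r"[^A-Z ]", "", vendor_name.upper())


def extract_url(cell: str) -> Optional[str]:
    """Return the first http(s) URL in a cell, stopping at whitespace or ')'."""
    match = re.search(r"https?://[^\s)]+", cell)
    return match.group(0) if match else None


# Made-up inputs, only to show the shape of the results
print(vendor_folder("Acme Env. Services, Inc."))
print(extract_url("Contract (https://example.com/docs/1234.pdf)"))
```

Because punctuation and digits are removed, two vendors whose names differ only in those characters would end up sharing a folder, which is worth checking on a new dataset.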

In [None]:
import pandas as pd
import re
import os
from pathlib import Path

df = pd.read_parquet("cook_county_contracts.parquet")
env_df = df[df["Commodity Type"] == "Environmental Services"]
print(f"Found {len(env_df)} Environmental Services rows")
display(env_df)

In [None]:
# Download each contract PDF into a vendor subfolder
import subprocess

base_output_dir = doc_path
Path(base_output_dir).mkdir(parents=True, exist_ok=True)

downloaded = failed = 0

for _, row in env_df[['Category', 'Vendor Name']].dropna(subset=['Category', 'Vendor Name']).iterrows():
    # Folder name: uppercase vendor name with everything but A-Z and spaces removed
    vendor = re.sub(r'[^A-Z ]', '', str(row['Vendor Name']).upper())
    # Pull the first URL out of the Category cell; skip rows without one
    match = re.search(r'(https?://[^\s\)]+)', str(row['Category']))
    if not match:
        continue
    url = match.group(1)
    filename = url.split("/")[-1]
    vendor_dir = os.path.join(base_output_dir, vendor)
    Path(vendor_dir).mkdir(parents=True, exist_ok=True)
    output_path = os.path.join(vendor_dir, filename)
    # Pass arguments as a list so the URL and path are never interpreted by a shell
    result = subprocess.run(["wget", "-q", "-O", output_path, url])
    if result.returncode == 0:
        downloaded += 1
    else:
        failed += 1

print(f"Done: {downloaded} files downloaded, {failed} failed")

---
## 1B: Read Files into Delta Table

This reads all downloaded files into a `bytes` table. Each row stores:
- The raw file content (as binary)
- The vendor name and file name (extracted from the path)
- A list of other file paths in the same vendor folder (useful later for linking amendments to master agreements)
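
To make the row layout concrete, here is a plain-Python sketch of the same derivation: vendor name from the first folder under the volume root, file name from the last path segment, and siblings as every other file in that vendor's folder. `describe_files` is a hypothetical helper and the paths are invented; the notebook itself does this in SQL with `READ_FILES`.

```python
from collections import defaultdict
from pathlib import PurePosixPath


def describe_files(paths, root):
    """Return one record per file with vendor, file name, and sibling paths."""
    by_vendor = defaultdict(list)
    rows = []
    for path in paths:
        relative = PurePosixPath(path).relative_to(root)
        vendor, file_name = relative.parts[0], relative.name
        by_vendor[vendor].append((path, file_name))
        rows.append((path, vendor, file_name))
    return [
        {
            "path": path,
            "vendor_name": vendor,
            "file_name": file_name,
            # Siblings: same vendor folder, different file name
            "vendor_folder_paths": [
                p for p, name in by_vendor[vendor] if name != file_name
            ],
        }
        for path, vendor, file_name in rows
    ]


# Invented paths for illustration
sample = [
    "/Volumes/c/s/raw/ACME/master.pdf",
    "/Volumes/c/s/raw/ACME/amendment1.pdf",
    "/Volumes/c/s/raw/BETA/only.pdf",
]
for record in describe_files(sample, "/Volumes/c/s/raw"):
    print(record["file_name"], record["vendor_folder_paths"])
```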

In [None]:
-- Create the bytes table if it doesn't exist
CREATE TABLE IF NOT EXISTS IDENTIFIER(:catalog || '.' || :schema || '.bytes') (
  path STRING NOT NULL,
  modificationTime TIMESTAMP,
  length BIGINT,
  _metadata STRUCT<
    file_path: STRING, 
    file_name: STRING, 
    file_size: BIGINT, 
    file_block_start: BIGINT, 
    file_block_length: BIGINT, 
    file_modification_time: TIMESTAMP
  >,
  content BINARY,
  vendor_name STRING,
  file_name STRING,
  vendor_folder_paths ARRAY<STRING>,
  CONSTRAINT bytes_path_pk PRIMARY KEY (path)
);

-- Read raw files and merge into the bytes table
MERGE INTO IDENTIFIER(:catalog || '.' || :schema || '.bytes') AS target
USING (
  WITH main_files AS (
    SELECT
      path,
      modificationTime,
      length,
      _metadata,
      content,
      regexp_extract(path, :doc_path || '/([^/]+)/', 1) AS vendor_name,
      regexp_extract(path, '/([^/]+)$', 1) AS file_name
    FROM READ_FILES(:doc_path, format => 'binaryFile', recursiveFileLookup => true)
  ),
  all_vendor_files AS (
    SELECT
      path, 
      regexp_extract(path, :doc_path || '/([^/]+)/', 1) AS vendor_name,
      regexp_extract(path, '/([^/]+)$', 1) AS file_name
    FROM READ_FILES(:doc_path, format => 'binaryFile', recursiveFileLookup => true)
  )
  SELECT
    m.*,
    (
      SELECT collect_list(avf.path)
      FROM all_vendor_files avf
      WHERE avf.vendor_name = m.vendor_name
        AND avf.file_name != m.file_name
    ) AS vendor_folder_paths
  FROM main_files m
) AS source
ON target.path = source.path
WHEN NOT MATCHED THEN
  INSERT *

---
## 1C: Parse Documents with AI

This uses Databricks `AI_PARSE_DOCUMENT` to extract structured text from each file. Files are processed in batches of `batch_size`, smallest first, to avoid timeouts.

The loop keeps running until all files are parsed. If it fails partway through, just re-run -- each batch only selects files that have no row in `parsed` yet, so it picks up where it left off.

We start with PDFs and images, then do Office files separately.
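
The resumable loop can be illustrated without Spark. In this sketch, dictionaries stand in for the `bytes` and `parsed` tables, and `next_batch` plays the role of the anti-join plus `ORDER BY length LIMIT batch_size`. The function names and sample data are hypothetical.

```python
def next_batch(all_files, parsed, batch_size):
    """Pick the smallest files that have no parsed result yet."""
    pending = [(path, size) for path, size in all_files.items() if path not in parsed]
    pending.sort(key=lambda item: item[1])
    return [path for path, _ in pending[:batch_size]]


def run_until_done(all_files, parsed, batch_size, parse):
    """Parse in batches until nothing is pending; return the number of batches run."""
    batches = 0
    while True:
        batch = next_batch(all_files, parsed, batch_size)
        if not batch:
            return batches
        for path in batch:
            parsed[path] = parse(path)
        batches += 1


# Invented data: file path -> size in bytes, with one file parsed on an earlier run
files = {"a.pdf": 300, "b.pdf": 100, "c.pdf": 200}
already_parsed = {"b.pdf": "parsed earlier"}
count = run_until_done(files, already_parsed, 1, lambda path: f"parsed {path}")
print(count, sorted(already_parsed))
```

Since the pending set is recomputed from the stored results on every pass, an interrupted run loses at most the batch that was in flight.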

In [None]:
CREATE TABLE IF NOT EXISTS IDENTIFIER(:catalog || '.' || :schema || '.parsed') (
  path STRING NOT NULL PRIMARY KEY,
  parsed VARIANT
)

In [None]:
import time

# Parse PDFs and images first
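# [.] matches a literal dot without a backslash, which a Spark SQL string literal would otherwise consume
file_pattern = r'[.](pdf|jpg|jpeg|png)$'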

def remaining_count():
    return spark.sql(f"""
        SELECT COUNT(*) AS cnt
        FROM {catalog}.{schema}.bytes AS b
        LEFT JOIN {catalog}.{schema}.parsed AS p
          ON b.path = p.path
        WHERE b.file_name RLIKE '{file_pattern}'
          AND p.path IS NULL
    """).collect()[0]["cnt"]

batch = 0
start = time.time()

while True:
    remaining = remaining_count()
    print(f"Batch {batch+1}, remaining: {remaining}")
    if remaining == 0:
        break

    t0 = time.time()
    spark.sql(f"""
        MERGE INTO {catalog}.{schema}.parsed AS target
        USING (
          SELECT 
            b.path,
            AI_PARSE_DOCUMENT(b.content) AS parsed
          FROM (
            SELECT b.path, content
            FROM {catalog}.{schema}.bytes AS b
            LEFT JOIN {catalog}.{schema}.parsed AS p
              ON b.path = p.path
            WHERE b.file_name RLIKE '{file_pattern}'
              AND p.path IS NULL
            ORDER BY b.length
            LIMIT CAST({batch_size} AS INTEGER)
          ) AS b
        ) AS source
        ON target.path = source.path
        WHEN NOT MATCHED THEN INSERT *
    """)
    print(f"  Batch {batch+1} done in {time.time() - t0:.1f}s")
    batch += 1

print(f"All PDFs/images done in {time.time() - start:.1f}s")

In [None]:
# Now parse Office files (doc, docx, ppt, pptx)
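# [.] matches a literal dot without a backslash, which a Spark SQL string literal would otherwise consume
file_pattern = r'[.](doc|docx|ppt|pptx)$'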

batch = 0
start = time.time()

while True:
    remaining = remaining_count()
    print(f"Batch {batch+1}, remaining: {remaining}")
    if remaining == 0:
        break

    t0 = time.time()
    spark.sql(f"""
        MERGE INTO {catalog}.{schema}.parsed AS target
        USING (
          SELECT 
            b.path,
            AI_PARSE_DOCUMENT(b.content) AS parsed
          FROM (
            SELECT b.path, content
            FROM {catalog}.{schema}.bytes AS b
            LEFT JOIN {catalog}.{schema}.parsed AS p
              ON b.path = p.path
            WHERE b.file_name RLIKE '{file_pattern}'
              AND p.path IS NULL
            ORDER BY b.length
            LIMIT CAST({batch_size} AS INTEGER)
          ) AS b
        ) AS source
        ON target.path = source.path
        WHEN NOT MATCHED THEN INSERT *
    """)
    print(f"  Batch {batch+1} done in {time.time() - t0:.1f}s")
    batch += 1

print(f"All Office files done in {time.time() - start:.1f}s")

### Clean up failed parses

Sometimes parsing produces nearly empty results. This deletes every row whose extracted text is shorter than 100 characters, so those files get re-processed the next time the parsing loop in 1C runs.
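
The same check can be expressed in Python against a single parse result. This is a sketch that assumes the result is shaped like `{"document": {"elements": [{"content": ...}]}}`, which is the structure the SQL paths `parsed:document:elements` and `element:content` imply; `extracted_text` and `is_failed_parse` are hypothetical names.

```python
def extracted_text(parsed):
    """Join element contents with blank lines, skipping missing values."""
    document = (parsed or {}).get("document") or {}
    elements = document.get("elements") or []
    contents = [e.get("content") for e in elements if isinstance(e, dict)]
    return "\n\n".join(c for c in contents if isinstance(c, str))


def is_failed_parse(parsed, min_chars=100):
    """True when the extracted text is too short to be a real document."""
    return len(extracted_text(parsed)) < min_chars


# Invented results for illustration
empty_result = {"document": {"elements": []}}
good_result = {"document": {"elements": [{"content": "x" * 150}]}}
print(is_failed_parse(empty_result), is_failed_parse(good_result))
```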

In [None]:
DELETE FROM IDENTIFIER(:catalog || '.' || :schema || '.parsed')
WHERE length(concat_ws(
        '\n\n',
        transform(
          try_cast(parsed:document:elements AS ARRAY<VARIANT>),
          element -> try_cast(element:content AS STRING)
        )
      )) < 100

---
## 1D: Flatten Parsed Data

This takes the structured parse output and creates a clean `flat` table with:
- **text** -- the full extracted text of the document
- **preamble** -- the first ~100 words, set by the `words_preamble` widget (used for quick identification)
- **truncated** -- the first ~5000 words, set by the `words_truncated` widget (used as input to the LLM, since full text is often too long)

The flat table also joins in vendor name, file name, and sibling file paths from the `bytes` table.
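
The word limits are approximate because the text is split on single spaces only. The sketch below mirrors that `split` / `slice` / `concat_ws` chain in Python; `first_words` is a hypothetical helper and the sample text is made up.

```python
def first_words(text, n):
    """Keep the first n space-separated tokens and rejoin them with spaces."""
    return " ".join(text.split(" ")[:n])


# Made-up text; the blank line between elements is not a word boundary here
sample = "Master Agreement\n\nbetween Cook County and ACME"
print(repr(first_words(sample, 3)))
```

Tokens joined by newlines count as one word, so a document made of many short elements can yield somewhat more than the nominal number of words.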

In [None]:
CREATE OR REPLACE TABLE IDENTIFIER(:catalog || '.' || :schema || '.flat') AS
WITH extracted AS (
  -- Build the full text once, then derive the shorter versions from it
  SELECT
    * EXCEPT(b.path),
    concat_ws(
        '\n\n',
        transform(
          try_cast(parsed:document:elements AS ARRAY<VARIANT>),
          element -> try_cast(element:content AS STRING)
        )
      ) AS text
  FROM IDENTIFIER(:catalog || '.' || :schema || '.parsed') p
  LEFT JOIN (
    SELECT * EXCEPT(content)
    FROM IDENTIFIER(:catalog || '.' || :schema || '.bytes')
  ) b
  ON p.path = b.path
)
SELECT
  *,
  -- Widget values arrive as strings, so cast the word limits explicitly
  concat_ws(' ', slice(split(text, ' '), 1, CAST(:words_preamble AS INT))) AS preamble,
  concat_ws(' ', slice(split(text, ' '), 1, CAST(:words_truncated AS INT))) AS truncated
FROM extracted

In [None]:
-- Quick check: how many documents do we have?
SELECT COUNT(*) AS total_docs FROM IDENTIFIER(:catalog || '.' || :schema || '.flat')