# Can we use Claude to extract all the fields from a form?

Based on the idea from here: 
https://github.com/co-cddo/form-extractor-prototype_testing/tree/main

Yes, we can.

In [1]:
import asyncio
import base64
import io
import json
import logging
import random
from pathlib import Path
from typing import Dict, List, Tuple, Union, Set
import pickle
import boto3
import nest_asyncio
import pandas as pd
from anthropic import AsyncAnthropicBedrock, RateLimitError
from anthropic.types.message import Message
from anthropic.types.tool_use_block import ToolUseBlock
from dotenv import load_dotenv
from pdf2image import convert_from_path
from PIL import Image
from tqdm.asyncio import tqdm
from tqdm.notebook import tqdm as tqdm_notebook
import botocore
import time
from datetime import datetime

# Route every log record at INFO or above to form_processing.log.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,  # switch to DEBUG for more detail, WARNING/ERROR for less
    filename="form_processing.log",
    filemode="a",  # append between runs; "w" would overwrite the file instead
)


In [None]:
# Load settings from a local .env file before any AWS/Anthropic objects are built.
load_dotenv()

# Allow asyncio.run() to be used from these notebook cells.
nest_asyncio.apply()
# NOTE(review): presumably needed because the notebook kernel already runs an event loop — confirm.
# Create boto3's default session, then the async Claude-on-Bedrock client.
boto3.setup_default_session()
client = AsyncAnthropicBedrock()

# Instruction text appended after the page images in every request
# (consumed by format_messages).
base_prompt = """
Is this a form? Answer Yes or No. 
It's only a form if it contains form field boxes.
Hand drawn forms, questionnaires and surveys are all valid forms.
If it is a form, extract the questions from it using the extract_form_questions tool.
If there is no output, explain why.
"""


def save_batch_results(results: List[Dict], output_file: str = "batch_results.pickle"):
    """Pickle ``results`` into ``output_file``, replacing any previous contents."""
    with open(output_file, "wb") as sink:
        pickle.Pickler(sink).dump(results)


def save_progress(
    processed_files: Union[List[str], Set[str]], output_file: str = "progress.json"
):
    """Write the processed file names to ``output_file`` as a JSON array.

    A set is accepted as well as a list; it is converted to a list first
    because JSON has no set type.
    """
    as_list = [*processed_files]
    with open(output_file, "w") as sink:
        sink.write(json.dumps(as_list))

def load_batch_results(output_file: str = "batch_results.pickle") -> List[Dict]:
    """Return the list pickled in ``output_file``, or ``[]`` when the file is missing.

    NOTE: unpickling can execute arbitrary code, so only point this at files
    this pipeline wrote itself.
    """
    if not Path(output_file).exists():
        return []
    with open(output_file, "rb") as source:
        return pickle.Unpickler(source).load()


def load_progress(output_file: str = "progress.json") -> List[str]:
    """Return the JSON content of ``output_file``, or ``[]`` when the file is missing."""
    progress_path = Path(output_file)
    if not progress_path.exists():
        return []
    with progress_path.open("r") as source:
        return json.loads(source.read())


def get_all_files(folder_path: str) -> List[Path]:
    """Return every entry found recursively below ``folder_path``.

    Sub-directories are included alongside files; callers filter as needed.
    """
    root = Path(folder_path)
    return list(root.rglob("*"))


def pdf_to_image_bytes(pdf_path: Path, width: int = 600, dpi: int = 300):
    """Render a PDF to a list of JPEG byte strings, one per rendered image.

    Args:
        pdf_path: PDF file handed to ``convert_from_path``.
        width: Target pixel width; the height is scaled by the same ratio.
            A falsy value (``0`` / ``None``) skips resizing.
        dpi: Rendering resolution handed to ``convert_from_path``.

    Returns:
        JPEG-encoded ``bytes`` objects in the order the images were rendered.
    """
    rendered_pages = convert_from_path(pdf_path, dpi=dpi)

    jpeg_pages = []
    for page in rendered_pages:
        if width:
            # Keep the aspect ratio: derive the new height from the width ratio.
            scale = width / float(page.width)
            page = page.resize((width, int(scale * page.height)), Image.LANCZOS)

        # Serialise the PIL image into an in-memory JPEG.
        buffer = io.BytesIO()
        page.save(buffer, format="JPEG")
        jpeg_pages.append(buffer.getvalue())

    return jpeg_pages


async def exponential_backoff(attempt, base_delay):
    """Sleep for ``base_delay * 2**attempt`` seconds and return that duration.

    The wait doubles with every attempt number (attempt 0 waits ``base_delay``).
    """
    multiplier = 2**attempt
    wait_seconds = base_delay * multiplier
    await asyncio.sleep(wait_seconds)
    return wait_seconds


def encode_image(byte_array):
    """Return ``byte_array`` base64-encoded as text, the form image payloads are sent in."""
    b64_bytes = base64.b64encode(byte_array)
    return str(b64_bytes, "utf-8")


def format_messages(image_bytes):
    """Build the content blocks for one request: every page image, then the prompt.

    Args:
        image_bytes: Iterable of JPEG byte strings, one per page.

    Returns:
        A list with one base64 ``image`` block per page followed by a single
        ``text`` block holding ``base_prompt``.
    """
    image_blocks = []
    for page_bytes in image_bytes:
        source = {
            "type": "base64",
            "media_type": "image/jpeg",
            "data": encode_image(page_bytes),
        }
        image_blocks.append({"type": "image", "source": source})

    prompt_block = {"type": "text", "text": base_prompt}
    return [*image_blocks, prompt_block]


async def ask_claude(
    messages,
    client,
    extraction_tool,
    *,
    model="anthropic.claude-3-5-sonnet-20240620-v1:0",
    temperature=0.01,
    max_tokens=5000,
):
    """Send one user message to Claude and return the response with token usage.

    Args:
        messages: Content blocks for a single user turn (see ``format_messages``).
        client: Async client exposing ``messages.create``.
        extraction_tool: Tool definition offered to the model as its only tool.
        model: Model identifier to call; defaults to the Bedrock Claude 3.5
            Sonnet id this file was written against.
        temperature: Sampling temperature for the request.
        max_tokens: Upper bound on generated tokens.

    Returns:
        Dict with ``result`` (the raw response object), ``input_tokens`` and
        ``output_tokens`` (both read from ``response.usage``).

    Any exception raised by the client call propagates to the caller.
    """
    response = await client.messages.create(
        model=model,
        temperature=temperature,
        max_tokens=max_tokens,
        tools=[extraction_tool],
        messages=[{"role": "user", "content": messages}],
    )
    return {
        "result": response,
        "input_tokens": response.usage.input_tokens,
        "output_tokens": response.usage.output_tokens,
    }


async def get_message_with_backoff(
    messages, semaphore, client, extraction_tool, max_retries=5, base_delay=1
):
    """Call ``ask_claude`` under ``semaphore``, retrying rate-limit errors with backoff.

    Args:
        messages: Content blocks forwarded to ``ask_claude``.
        semaphore: Held for the whole call, including any backoff sleeps.
        client: Client forwarded to ``ask_claude``.
        extraction_tool: Tool definition forwarded to ``ask_claude``.
        max_retries: Maximum number of attempts.
        base_delay: Seconds waited after the first rate limit; doubles each retry.

    Returns:
        The ``ask_claude`` dict on success. On failure, a dict of the same
        shape whose token counts are ``None`` and whose ``result`` is the HTTP
        status code (rate limit exhausted) or the error text (any other
        exception). Exceptions are logged, never raised.
    """
    last_attempt = max_retries - 1
    async with semaphore:
        for attempt in range(max_retries):
            try:
                answer = await ask_claude(messages, client, extraction_tool)
            except RateLimitError as e:
                if attempt != last_attempt:
                    # Still have attempts left: wait, note it, and go round again.
                    delay = await exponential_backoff(attempt, base_delay)
                    logging.warning(f"Rate limit hit, retrying in {delay:.2f} seconds...")
                    continue
                logging.exception(
                    f"Rate limit exceeded after {max_retries} attempts | Error: {e}"
                )
                return {
                    "result": e.status_code,
                    "input_tokens": None,
                    "output_tokens": None,
                }
            except Exception as e:
                # Anything other than a rate limit is not retried.
                logging.exception(f"Exception occurred | Error: {e}")
                return {"result": str(e), "input_tokens": None, "output_tokens": None}
            else:
                return answer


async def process_form(
    pdf_path: Path, semaphore: asyncio.Semaphore, client, extraction_tool
) -> Dict:
    """Render one PDF to page images, send them to Claude, and return the outcome.

    Pages are rendered at 300 dpi and scaled to 600 px wide. The return value
    is whatever ``get_message_with_backoff`` produces; errors from PDF
    rendering propagate to the caller.
    """
    page_images = pdf_to_image_bytes(pdf_path, 600, 300)
    content_blocks = format_messages(page_images)

    outcome = await get_message_with_backoff(
        content_blocks, semaphore, client, extraction_tool
    )
    logging.info(f"Processed {pdf_path}")
    return outcome


async def process_batch(
    batch: List[Path], semaphore: asyncio.Semaphore, client, extraction_tool
) -> Tuple[List[Dict], List[Path]]:
    """Process every PDF in ``batch`` concurrently and collect the successes.

    The forms are scheduled together with ``asyncio.gather`` so that
    ``semaphore`` actually bounds the number of in-flight requests; awaiting
    them one by one would never have more than one request outstanding.

    Args:
        batch: PDF paths to process.
        semaphore: Concurrency limiter shared with ``process_form``.
        client: Client forwarded to ``process_form``.
        extraction_tool: Tool definition forwarded to ``process_form``.

    Returns:
        ``(results, processed_files)``: two parallel lists, in ``batch`` order,
        covering only the files whose processing did not raise. Files that
        raised are logged with a traceback and left out of both lists.
    """
    failed = object()  # sentinel: distinguishes "raised" from any real return value

    async def _process_one(pdf_path: Path):
        try:
            return await process_form(pdf_path, semaphore, client, extraction_tool)
        except Exception as e:
            logging.exception(f"Error processing {pdf_path}: {str(e)}")
            return failed

    # gather preserves input order, so outcomes line up with ``batch``.
    outcomes = await asyncio.gather(*(_process_one(pdf_path) for pdf_path in batch))

    results = []
    processed_files = []
    for pdf_path, outcome in zip(batch, outcomes):
        if outcome is failed:
            continue
        results.append(outcome)
        processed_files.append(pdf_path)
    return results, processed_files


async def process_forms_in_batches(
    folder_path: str,
    client,
    extraction_tool,
    batch_size: int = 10,
    max_concurrent: int = 5,
    progress_file: str = "progress.json",
    results_file: str = "batch_results.pickle",
) -> List[Dict]:
    """Process every not-yet-processed PDF under ``folder_path`` in batches.

    State is checkpointed after each batch so an interrupted run can resume:
    file names already listed in ``progress_file`` are skipped, and new
    results are appended to those previously stored in ``results_file``.

    Args:
        folder_path: Directory searched recursively for ``.pdf`` files.
        client: Client forwarded to ``process_batch``.
        extraction_tool: Tool definition forwarded to ``process_batch``.
        batch_size: Number of PDFs handed to ``process_batch`` at a time.
        max_concurrent: Size of the semaphore shared by all requests.
        progress_file: JSON file listing the paths already processed.
        results_file: Pickle file holding the accumulated result dicts.

    Returns:
        All result dicts: those loaded from ``results_file`` plus the new ones.
    """
    all_files = get_all_files(folder_path)
    pdf_files = [Path(file) for file in all_files if str(file).lower().endswith(".pdf")]

    # Skip anything a previous run already recorded as done.
    processed_files = set(load_progress(progress_file))
    pdf_files = [file for file in pdf_files if str(file) not in processed_files]

    semaphore = asyncio.Semaphore(max_concurrent)

    # Continue from previously saved results, if any.
    results = load_batch_results(results_file)

    # Ceiling division, so an exact multiple of batch_size is not over-counted.
    total_batches = (len(pdf_files) + batch_size - 1) // batch_size

    # The context manager closes the bar even if a batch raises.
    with tqdm(total=len(pdf_files), desc="Processing Forms", unit="form") as pbar:
        for batch_number, start in enumerate(
            range(0, len(pdf_files), batch_size), start=1
        ):
            batch = pdf_files[start : start + batch_size]

            # process_batch returns (results, successfully processed paths).
            batch_results, batch_processed = await process_batch(
                batch, semaphore, client, extraction_tool
            )
            results.extend(batch_results)

            # Only files that really produced a result are marked as done, so
            # files that raised are picked up again on the next run.
            processed_files.update(str(file) for file in batch_processed)

            # Save results before progress: if the run dies between the two
            # writes, no file is recorded as done without its result on disk.
            save_batch_results(results, results_file)
            save_progress(sorted(processed_files), progress_file)

            pbar.update(len(batch))
            logging.info(
                f"Processed and saved batch {batch_number} of {total_batches}"
            )

    return results


def process_results(results: List[Dict]) -> pd.DataFrame:
    """Turn raw result dicts into a DataFrame with one row per result.

    A result whose ``"result"`` value is a ``Message`` becomes a row with
    ``status="success"`` and the message under ``content``; anything else
    becomes a row with ``status="error"`` and the value under ``error_code``.
    Both kinds of row carry ``input_tokens`` and ``output_tokens``.
    """
    processed_results = []
    for result in results:
        payload = result["result"]
        if isinstance(payload, Message):
            processed_result = {
                "status": "success",
                "content": payload,
                "input_tokens": result["input_tokens"],
                "output_tokens": result["output_tokens"],
            }
        else:
            # Failed calls store a status code or an error string here.
            processed_result = {
                "status": "error",
                "error_code": payload,
                "input_tokens": result["input_tokens"],
                "output_tokens": result["output_tokens"],
            }
        processed_results.append(processed_result)

    return pd.DataFrame(processed_results)


def run_form_processing(
    folder_path: str,
    client,
    extraction_tool,
    batch_size: int = 10,
    max_concurrent: int = 5,
    progress_file: str = "progress.json",
    results_file: str = "batch_results.pickle",
) -> pd.DataFrame:
    """Synchronous entry point: run the batch pipeline and tabulate its results.

    Takes the same arguments as ``process_forms_in_batches``, runs it with
    ``asyncio.run`` and returns ``process_results`` applied to its output.
    """
    print("Processing forms. Check 'form_processing.log' for detailed logs.")

    # process_forms_in_batches returns a single list of result dicts.
    results = asyncio.run(
        process_forms_in_batches(
            folder_path,
            client,
            extraction_tool,
            batch_size,
            max_concurrent,
            progress_file,
            results_file,
        )
    )
    print(f"Processed {len(results)} forms")
    return process_results(results)


# Load the JSON tool definition that is offered to Claude as the extraction tool.
json_string = Path("extract-form-questions.json").read_text()
extraction_tool = json.loads(json_string)

# Run the whole pipeline over the scraped forms folder and show the outcome table.
folder_path = "forms_scrape"
results_df = run_form_processing(
    folder_path,
    client,
    extraction_tool,
    batch_size=100,
    max_concurrent=5,
    progress_file="progress.json",
    results_file="batch_results.pickle",
)

print(results_df)

RefreshWithMFAUnsupportedError: Cannot refresh credentials: MFA token required.

## Processing a lot of forms
