# Playground for invoice processing with OpenAI API

### Define AWS invoice and credit record model

In [None]:
from typing import Optional

from pydantic import BaseModel, Field

# Pydantic model describing a single AWS invoice / credit note record.
#
# The field descriptions are part of the JSON schema that is handed to the LLM
# as format instructions, so they are kept verbatim. For the same reason the
# class is documented with comments rather than a docstring: a docstring would
# be added to the generated schema and change the prompt.
#
# Every field that may legitimately be absent from a document is declared
# Optional with a default of None, so both an omitted key and an explicit JSON
# null are accepted instead of failing validation of the whole record.
class AwsInvoiceCredit(BaseModel):
    # --- Identification (always required) ---
    file_name: str = Field(description="AWS Invoice PDF file name.")
    doit_payer_id: str = Field(description="Doit Payer ID. Can be extracted from the parent folder name.")
    aws_account_number: str = Field(description="AWS Account number.")

    # --- Bill-to address (always required) ---
    address_company: str = Field(description="Address or Bill to Address company name. Use first line of the address. Usually, it is the company name.")
    address_attn: str = Field(description="Address or Bill to Address ATTN. Use second line of the address. Usually, it is the name of the person.")
    address_country: str = Field(description="Bill to address country. Use last line of the address. Usually, it is the country name. Convert short country code to a full country name.")

    # --- Document summary ---
    document_type: str = Field(description="Document Type. Can be Invoice or Credit Note. Credit Note can be Credit Memo or Credit Adjustment Note.")
    # Optional like the other "leave empty if not present" fields below.
    billing_period: Optional[str] = Field(default=None, description="Billing Period; Two dates separated by a dash; leave empty if not present")
    tax_registration_number: Optional[str] = Field(default=None, description="Tax Registration Number; ABN Number; GST/HST Registration number; leave empty if not present")
    invoice_number: str = Field(description="Invoice Number from the Invoice Summary")
    invoice_date: Optional[str] = Field(default=None, description="Invoice Date from the Invoice Summary")
    original_invoice_number: Optional[str] = Field(default=None, description="Original Invoice Number from the Invoice Summary of Credit Memo/Note; leave empty if not present")
    original_invoice_date: Optional[str] = Field(default=None, description="Original Invoice Date from the Invoice Adjustment Summary of Credit Memo/Note; leave empty if not present")

    # --- Amounts ---
    total_amount: float = Field(description="Total Amount from the Invoice Summary; without currency; add minus sign if parentheses around or has a minus prefix")
    total_amount_currency: str = Field(description="Total Amount Currency from the Invoice Summary; use currency code instead of symbol")
    total_vat_tax_amount: Optional[float] = Field(default=None, description="Total VAT/Tax Amount from the Invoice Summary; without currency; add minus sign if parentheses around or has a minus prefix")
    total_vat_tax_currency: Optional[str] = Field(default=None, description="VAT/Tax Currency from the Invoice Summary; use currency code instead of symbol")
    vat_percentage: Optional[float] = Field(default=None, description="VAT Percentage from the Invoice Summary Table; VAT - <number>%; GST amount at <number>%; HST Amount at <number>%; leave empty if not present")
    exchange_rate: Optional[float] = Field(default=None, description="Exchange Rate from the Invoice Summary Table (1 USD = ?); leave empty if not found")

### Define the OpenAI model

In [None]:
import os
from langchain.chat_models import ChatOpenAI

# Shared chat model client used by every extraction request.
# The API key is read from the OPENAI_API_KEY environment variable, and the
# temperature is pinned to 0.0 so the extraction is as repeatable as possible.
llm = ChatOpenAI(
    temperature=0.0,
    max_tokens=4096,
    model="gpt-4-1106-preview",
    openai_api_key=os.environ.get("OPENAI_API_KEY"),
)

In [None]:
def remove_footer(text):
    """Return ``text`` cut off just before the first footer marker found in it.

    The markers are tried in the order listed below, and the first marker that
    occurs anywhere in ``text`` decides the cut: everything from that marker's
    first occurrence onward (the marker included) is dropped. If none of the
    markers occurs, ``text`` is returned unchanged.
    """
    footer_markers = (
        "* May include estimated US sales tax, VAT, ST, GST and CT.",
        "Amazon Web Services EMEA SARL",
        "Amazon Web Services Australia Pty Ltd",
        "AMAZON WEB SERVICES EMEA SARL",
        "Amazon Web Services Canada, Inc.",
        "Amazon Web Services EMEA SARL, Luxembourg, Zweigniederlassung Zürich",
    )
    # Priority is by position in the tuple above, not by position in the text.
    matched = next((marker for marker in footer_markers if marker in text), None)
    if matched is None:
        return text
    body, _, _ = text.partition(matched)
    return body

In [None]:
import os
from langchain.document_loaders import PyMuPDFLoader

# scan all documents in the folder (recursively)
def scan_folder(folder):
    """Collect the invoice text of every ``.pdf`` file found under ``folder``.

    For each PDF, only the first element returned by ``PyMuPDFLoader.load()``
    is used (presumably the first page -- confirm against the loader docs).
    Its text is passed through ``remove_footer`` and prefixed with two header
    lines: the file name and the DoiT payer id.

    The payer id is the second ``_``-separated token of the name of the folder
    that directly contains the PDF. If that folder name has no underscore, the
    payer id is left empty and a warning is printed instead of raising
    ``IndexError``. PDFs for which the loader returns nothing are skipped with
    a message.

    Args:
        folder: root directory to walk recursively.

    Returns:
        A list of strings, one per processed PDF, in ``os.walk`` order.
    """
    documents = []
    for root, _dirs, files in os.walk(folder):
        for file in files:
            if not file.endswith(".pdf"):
                continue
            pdf_path = os.path.join(root, file)
            pages = PyMuPDFLoader(pdf_path).load()
            if not pages:
                # Nothing to extract; skip instead of crashing the whole scan.
                print(f"Skipping {pdf_path}: no pages could be loaded")
                continue
            invoice = remove_footer(pages[0].page_content)
            # get parent folder name
            parent_folder = os.path.basename(os.path.dirname(pdf_path))
            # extract doit payer id from the parent folder name
            name_parts = parent_folder.split("_")
            if len(name_parts) > 1:
                payer_id = name_parts[1]
            else:
                payer_id = ""
                print(f"Warning: cannot extract payer id from folder '{parent_folder}' for {pdf_path}")
            # add file name and payer id to the invoice
            documents.append(f"File name: {file}\nDoiT payer id: {payer_id}\n" + invoice)
    return documents

# Scan the ./data/12-2023 folder and keep the prepared invoice texts in
# `all_documents`; the count is printed as a quick sanity check.
all_documents = scan_folder("./data/12-2023")
print(f"Found {len(all_documents)} documents")

In [None]:
import asyncio
from langchain.chains import LLMChain
from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import PromptTemplate
import textwrap

# Instantiate the parser with the new model.
parser = PydanticOutputParser(pydantic_object=AwsInvoiceCredit)

# Instantiate the semaphore to limit the number of concurrent requests.
# Approximate number of tokens per request is 1000-1500, so 50 concurrent requests use up to 75k tokens at a time.
# A single request takes about 10 seconds, so 50 concurrent requests can lead to 300 requests per minute:
# 300 * 1500 = 450k tokens per minute (TPM), which should be within the 600k TPM limit.
sem = asyncio.Semaphore(50)

async def extract_data(model, document):
    """Extract structured invoice data from one document using the LLM.

    The call is made while holding the shared semaphore ``sem``, which bounds
    the number of requests in flight. Any failure (LLM call, JSON extraction,
    validation) is printed and swallowed so that one bad document does not
    abort the whole batch.

    Args:
        model: chat model passed to ``LLMChain`` as ``llm``.
        document: invoice text whose first line is expected to be
            ``File name: <name>``; the name is used only in progress messages.

    Returns:
        The object returned by ``parser.parse`` on success, or ``None`` if any
        step failed.
    """
    # Fallback label so the error message below can always be formatted, even
    # when the failure happens before the real file name has been read.
    file_name = "<unknown>"
    async with sem:
        try:
            # get file name from the document: first line, without the "File name: " prefix
            file_name = document.split("\n")[0].removeprefix("File name: ")
            print(f"Processing file {file_name}")

            # The document is wrapped in a properly closed <document> ... </document>
            # pair so the model can tell where the invoice text ends.
            prompt = PromptTemplate(
                template=textwrap.dedent(
                    """
                    Extract data from the AWS Invoice or Credit document into a flat JSON object.
                    {format_instructions}
                    {request}
                    <document>
                    {invoice}
                    </document>
                    JSON:
                    """
                ),
                input_variables=["request", "invoice"],
                partial_variables={
                    "format_instructions": parser.get_format_instructions(),
                },
            )
            # Generate the input using the updated prompt.
            parsing_request = textwrap.dedent(
                """
                Return the extracted fields in the valid JSON format: only JSON objects and arrays are allowed without any comments or other text. 
                Keep it as simple as possible and ensure the JSON is valid. 
                Skip the fields that are not present in the invoice.
                Be careful with the currency symbols, which are not always in the invoice.
                Try to extract the fields even if the invoice format differs and the fields are not in the same order. 
                My job depends on it! And I will be very grateful to you! Will pay you an extra 1000$ if you do it without errors!
                """
            )
            chain = LLMChain(llm=model, prompt=prompt)
            output = await chain.arun(request=parsing_request, invoice=document)
            # remove everything before the first { and after the last }
            # (if no braces are present this yields "" and parsing fails below)
            output = output[output.find("{"):output.rfind("}")+1]
            parsed = parser.parse(output)
            print(f"File {file_name} processed successfully")
            return parsed
        except Exception as e:
            # Deliberate best-effort boundary: report the failure and let the batch continue.
            print(f"An error occurred: {e} processing file {file_name}")
            return None

In [None]:
import asyncio
import pandas
import time

# measure time
start = time.time()

# Initialize an empty list to store the results
results = []

# Loop over the first max documents
max_docs = len(all_documents)
tasks = []
for i, doc in enumerate(all_documents[:max_docs]):
    # Extract data from the document (async)
    tasks.append(extract_data(llm, doc))

# measure time
start = time.time()
   
# wait for all tasks to complete
results = await asyncio.gather(*tasks, return_exceptions=True)
# handle exceptions and collect results
data = []
for i, result in enumerate(results):
    if isinstance(result, Exception):
        print(result)
        print(f"Failed to process document {i+1} of {max_docs}")
        results.remove(result)
    else:
        data.append(result.model_dump())
    
# Convert the list of dictionaries to a DataFrame
df = pandas.DataFrame.from_dict(data)

# Export the DataFrame to a CSV file
df.to_csv("invoices.csv", index=False)

# measure time
end = time.time()
print(f"Time elapsed: {end - start} seconds")