# Credit Card Statement Analysis Pipeline

This notebook creates a pipeline that:
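1. Uses LlamaParse to extract text from PDF statements (falling back to PyPDF when LlamaParse is not available)
2. Processes the text with LangChain and an OpenAI GPT-4-family model (`gpt-4.1-nano`)
3. Structures the data using Pydantic models (credit card and chequing account statements)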

In [None]:
# Activate virtual environment (adjust path as needed)
# NOTE(review): os.system() runs the command in a separate subshell, so the
# activation ends with that subshell and does not change the interpreter or
# packages used by the running kernel. To actually use the venv, start the
# notebook kernel from that environment.
import os
os.system('source ../venv/bin/activate')

In [None]:
# Install required packages
!source ../venv/bin/activate && pip install langchain langchain_openai langchain_community pydantic pypdf llama-parse python-dotenv requests

In [None]:
import os
from dotenv import load_dotenv
from pypdf import PdfReader
from typing import List, Dict, Optional
from enum import Enum
from datetime import date
import json

# For LlamaParse
from llama_cloud_services import LlamaParse

# For LangChain
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.output_parsers import PydanticOutputParser
from langchain.chains import LLMChain

# For data models
from pydantic import BaseModel, Field, ValidationError

# Load environment variables
load_dotenv()
# if not os.environ.get('OPENAI_API_KEY'):
#     os.environ['OPENAI_API_KEY'] = "sk-..."  # Replace with your OpenAI API key

# # LlamaParse API key (uncomment if not in .env file)
# os.environ['LLAMA_CLOUD_API_KEY'] = "llama-..."  # Replace with your LlamaParse API key

## Define Pydantic Models for Data Structures

In [None]:
# One itemized transaction line from a credit card statement.
# This model is nested in CreditCardStatement.transactions, and that model is
# handed to PydanticOutputParser to build the prompt's format instructions, so
# treat the Field description text below as prompt content, not just docs.
class CreditCardTransaction(BaseModel):
    # Required: the only date every transaction must carry.
    transaction_date: date = Field(
        description="Date when the transaction occurred (YYYY-MM-DD)"
    )
    # Optional: defaults to None when the statement has no posting date.
    posting_date: Optional[date] = Field(
        default=None,
        description="Date when the transaction was posted (YYYY-MM-DD). May be missing in some statements."
    )
    description: str = Field(
        description="Text description of the transaction, including merchant name or payment info"
    )
    # Sign convention is stated in the description: charges negative, credits positive.
    amount: float = Field(
        description="Transaction amount in dollars. Negative for charges, positive for credits/refunds"
    )

# Target schema for credit card statement extraction (used by CreditCardProcessor).
# `account_number` is the only required field; every other scalar defaults to
# None and `transactions` defaults to an empty list.
class CreditCardStatement(BaseModel):
    """
    A model representing structured information extracted from a credit card statement, 
    including customer info, billing period, account metadata, and itemized transactions.
    """

    # --- Card holder ---
    first_name: Optional[str] = Field(
        default=None,
        description="First name of the credit card holder"
    )
    last_name: Optional[str] = Field(
        default=None,
        description="Last name of the credit card holder"
    )
    # --- Account metadata ---
    # Kept as str (not int) so the digits are stored exactly as shown.
    account_number: str = Field(
        description="Last 4 or 5 digits of the credit card number, extracted exactly as shown at the end of the account reference (e.g., '91003'). Do not include any masked or partial prefixes like 'XXXX XXXX'."
    )
    card_type: Optional[str] = Field(
        default=None,
        description="Type of credit card (e.g. Visa, Amex, Mastercard)"
    )
    # --- Billing period ---
    # Free-text month/year label; the exact cycle bounds are the two dates below.
    statement_period: Optional[str] = Field(
        default=None,
        description="Month and year of the statement, e.g., 'February 2024'"
    )
    statement_opening_date: Optional[date] = Field(
        default=None,
        description="Opening date of the billing cycle in YYYY-MM-DD format"
    )
    statement_closing_date: Optional[date] = Field(
        default=None,
        description="Closing date of the billing cycle in YYYY-MM-DD format"
    )
    credit_limit: Optional[float] = Field(
        default=None,
        description="Credit limit of the card in dollars"
    )
    # --- Itemized activity ---
    # default_factory gives each instance its own empty list.
    transactions: List[CreditCardTransaction] = Field(
        default_factory=list,
        description="List of all transactions that occurred during the billing cycle"
    )

In [None]:
# Direction of a chequing transaction. The member values are the lowercase
# strings named in ChequingTransaction.transaction_type's description
# ('deposit' or 'withdrawn').
class TransactionType(Enum):
    DEPOSIT = "deposit"      # money coming into the account
    WITHDRAWN = "withdrawn"  # money leaving the account

# One itemized transaction line from a chequing account statement.
# All five fields are required (none declares a default). The model is nested
# in ChequingAccountStatement.transactions, which is handed to
# PydanticOutputParser, so the description text below is prompt content.
class ChequingTransaction(BaseModel):
    transaction_date: date = Field(
        description="Date when the transaction occurred (YYYY-MM-DD)"
    )
    description: str = Field(
        description="Text description of the transaction, including merchant name or payment info"
    )
    # Direction is carried here rather than in the sign of `amount`.
    transaction_type: TransactionType = Field(
        description="Type of transaction: 'deposit' or 'withdrawn'"
    )
    amount: float = Field(
        description="The amount of the transaction, either debit or credit"
    )
    # Running balance reported on the same statement line.
    ledger_balance: float = Field(
        description="Account balance after the transaction in dollars"
    )

# Target schema for chequing statement extraction (used by BankAccountProcessor).
# Balances, bank_name and account_number are required; the two statement dates
# default to None and `transactions` defaults to an empty list.
# NOTE(review): the docstring mentions "customer info", but no customer-name
# fields are declared on this model - confirm whether that is intentional.
class ChequingAccountStatement(BaseModel):
    """
    A model representing structured information extracted from a chequing account statement,
    including customer info, statement period, account metadata, and itemized transactions.
    """
    # --- Balances (required) ---
    opening_balance: float = Field(
        description="Opening balance of the account for the statement period"
    )
    closing_balance: float = Field(
        description="Closing balance of the account for the statement period"
    )
    # --- Account metadata (required) ---
    bank_name: str = Field(
        description="Name of the bank or financial institution"
    )
    # Kept as str (not int) so the digits are stored exactly as shown.
    account_number: str = Field(
        description="Last 4 or 5 digits of the account number, extracted exactly as shown at the end of the account reference (e.g., '91003'). Do not include any masked or partial prefixes like 'XXXX XXXX'."
    )
    # --- Statement period (optional) ---
    statement_opening_date: Optional[date] = Field(
        default=None,
        description="Opening date from the statement in YYYY-MM-DD format for which the statement is generated and the opening balance is provided"
    )
    statement_closing_date: Optional[date] = Field(
        default=None,
        description="Closing date from the statement in YYYY-MM-DD format for which the statement is generated and the closing balance is provided"
    )
    # --- Itemized activity ---
    # default_factory gives each instance its own empty list.
    transactions: List[ChequingTransaction] = Field(
        default_factory=list,
        description="List of all transactions that occurred during the statement period"
    )

## Step 1: PDF Text Extraction with LlamaParse

In [None]:
class PDFExtractor:
    """Extract raw text from a PDF with LlamaParse, falling back to PyPDF.

    LlamaParse is used when ``LLAMA_CLOUD_API_KEY`` is set and the client can
    be constructed; otherwise every extraction goes through ``PdfReader``.

    Args:
        preset: LlamaParse preset name passed straight to the client.
    """

    def __init__(self, preset="complexTables"):
        self.api_key = os.environ.get('LLAMA_CLOUD_API_KEY')
        self.preset = preset
        # Define both attributes up front so they exist on the fallback path too.
        self.parser = None
        self.use_llama = False

        try:
            if not self.api_key:
                raise ValueError("LLAMA_CLOUD_API_KEY not found in environment variables")
            # Initialize LlamaParse with API key
            self.parser = LlamaParse(api_key=self.api_key, preset=self.preset)
            self.use_llama = True
            print(f"Using LlamaParse for PDF extraction with '{self.preset}' preset")
        except Exception as e:
            # Deliberate best-effort: any construction failure means PyPDF is used.
            self.parser = None
            self.use_llama = False
            print(f"LlamaParse not available: {e}\nFalling back to PyPDF")

    @staticmethod
    def _document_text(document):
        """Return the plain text carried by a LlamaParse result object."""
        # Single-document shapes: a string `text` or `content` attribute.
        for attr in ("text", "content"):
            value = getattr(document, attr, None)
            if isinstance(value, str):
                return value

        # Job-result shape: text lives on each entry of `.pages`
        # (NOTE(review): per llama_cloud_services docs - confirm for the
        # installed version). Pages without text contribute an empty string.
        pages = getattr(document, "pages", None)
        if pages:
            return "\n".join(getattr(page, "text", None) or "" for page in pages)

        # Last resort: whatever the object renders as.
        return str(document)

    def _extract_with_llama(self, pdf_path):
        """Run a LlamaParse job for *pdf_path* and package text plus status."""
        print(f"Starting LlamaParse job with {self.preset} preset...")
        document = self.parser.parse(file_path=pdf_path)
        extracted_text = self._document_text(document)

        return {
            "status": {
                "status": "complete",
                "file": pdf_path,
                "text_length": len(extracted_text),
                "job_id": getattr(document, 'job_id', 'unknown'),
            },
            "text": extracted_text,
        }

    def _extract_with_pypdf(self, pdf_path):
        """Read *pdf_path* with PyPDF, one newline-terminated chunk per page."""
        reader = PdfReader(pdf_path)
        # `or ''` guards against a page that yields no text at all.
        text = "".join(f"{page.extract_text() or ''}\n" for page in reader.pages)

        return {
            "status": {"status": "complete", "file": pdf_path, "text_length": len(text)},
            "text": text,
        }

    def extract_text(self, pdf_path):
        """Extract text from PDF using LlamaParse or fallback to PyPDF.

        Args:
            pdf_path: Path of the PDF file to read.

        Returns:
            A dict with two keys: ``"text"`` (the extracted text, ``""`` on
            failure) and ``"status"`` (a dict whose ``"status"`` entry is
            ``"complete"`` or ``"error"``; it also carries ``"file"`` and
            either ``"text_length"``/``"job_id"`` or ``"error"``).
            Exceptions are reported in the status dict, never raised.
        """
        try:
            if self.use_llama:
                return self._extract_with_llama(pdf_path)
            return self._extract_with_pypdf(pdf_path)
        except Exception as e:
            print(f"Error extracting text: {e}")
            return {
                "status": {"status": "error", "file": pdf_path, "error": str(e)},
                "text": ""
            }

## Step 2: Define LangChain Pipeline for Text Processing

In [None]:
from abc import ABC, abstractmethod

class AbstractProcessor(ABC):
    """Shared LLM plumbing for statement processors.

    Subclasses are expected to assign ``self.parser`` and to build
    ``self.chain`` inside ``set_prompt``; ``process`` relies on both.
    """

    def __init__(self):
        self.llm = ChatOpenAI(model_name="gpt-4.1-nano")
        # Filled in by the subclass's set_prompt().
        self.chain = None

    @abstractmethod
    def set_prompt(self):
        """Build the prompt template and chain used by this processor."""
        pass

    def process(self, statement_text: str):
        """Run the chain on *statement_text* and parse its output.

        Returns the parser's result, or ``None`` when validation or any
        other step fails (the error is printed, not raised).
        """
        try:
            # Debug aid: show the head of the raw input.
            preview = statement_text[:500]
            print(f"🔍 Raw statement text (first 500 chars): {preview}")

            # Resolve the chain entry point before assembling its inputs.
            run_chain = self.chain.invoke
            chain_inputs = {
                "statement_text": statement_text,
                "format_instructions": self.parser.get_format_instructions(),
            }
            chain_output = run_chain(chain_inputs)

            # The chain stores the LLM reply under "structured_data".
            return self.parser.parse(chain_output["structured_data"])
        except ValidationError as ve:
            print(f"Validation error: {ve}")
            return None
        except Exception as e:
            print(f"LLM parsing error: {e}")
            return None

class CreditCardProcessor(AbstractProcessor):
    """Processor that extracts a ``CreditCardStatement`` from statement text."""

    def __init__(self):
        super().__init__()
        # The parser must exist before set_prompt() builds the chain.
        self.parser = PydanticOutputParser(pydantic_object=CreditCardStatement)
        self.set_prompt()

    def set_prompt(self):
        """Create the credit card prompt and the LLMChain that runs it."""
        template_text = """
        You are a financial document analyst specialized in credit card statements.

        Your task is to extract structured data from a credit card statement.

        Return the data in a JSON object matching this format:
        {format_instructions}

        Extract ONLY the fields that are explicitly mentioned in the text. 
        If something is missing or unclear, return null or leave the field blank.
        Never guess or invent information.

        Statement text:
        {statement_text}
        """
        chat_prompt = ChatPromptTemplate.from_template(template_text)
        self.prompt = chat_prompt
        # output_key names the entry AbstractProcessor.process reads back.
        self.chain = LLMChain(llm=self.llm, prompt=chat_prompt, output_key="structured_data")

class BankAccountProcessor(AbstractProcessor):
    """Processor that extracts a ``ChequingAccountStatement`` from statement text."""

    def __init__(self):
        super().__init__()
        # The parser must exist before set_prompt() builds the chain.
        self.parser = PydanticOutputParser(pydantic_object=ChequingAccountStatement)
        self.set_prompt()

    def set_prompt(self):
        """Create the chequing-account prompt and the LLMChain that runs it."""
        template_text = """
        You are a financial document analyst specialized in bank account statements.

        Your task is to extract structured data from a chequing account statement.

        Return the data in a JSON object matching this format:
        {format_instructions}

        ⚠️ Important instructions:
        - Only include real monetary transactions such as deposits, withdrawals, transfers, bill payments, etc.
        - Do NOT include the "Opening Balance" or "Closing Balance" lines as transactions. These should only be used to populate the `opening_balance` and `closing_balance` fields.
        - Exclude summaries or headers from the `transactions` list.
        - If a transaction spans multiple lines, merge them into a single description.

        Statement text:
        {statement_text}
        """
        chat_prompt = ChatPromptTemplate.from_template(template_text)
        self.prompt = chat_prompt
        # output_key names the entry AbstractProcessor.process reads back.
        self.chain = LLMChain(llm=self.llm, prompt=chat_prompt, output_key="structured_data")

## Step 3: Complete Pipeline Integration

In [None]:
from enum import Enum

class StatementType(Enum):
    """Kinds of statement the pipeline can process.

    FinancialStatementPipeline.get_processor maps each member to a processor
    class and raises ValueError for anything else.
    """
    CREDIT_CARD = "credit_card"  # handled by CreditCardProcessor
    CHEQUING = "chequing"        # handled by BankAccountProcessor

class FinancialStatementPipeline:
    """End-to-end flow: PDF text extraction followed by LLM structuring."""

    def __init__(self):
        self.extractor = PDFExtractor()

    def get_processor(self, statement_type: StatementType):
        """Return a new processor for *statement_type*.

        Raises:
            ValueError: if the statement type has no processor.
        """
        if statement_type == StatementType.CREDIT_CARD:
            return CreditCardProcessor()
        if statement_type == StatementType.CHEQUING:
            return BankAccountProcessor()
        raise ValueError(f'Unsupported statement type: {statement_type}')

    def process_statement(self, pdf_path, statement_type: StatementType):
        """Extract and structure one statement.

        Returns ``None`` when extraction did not complete; otherwise a dict
        with the extraction ``"status"`` and the processor's
        ``"structured_data"`` (which may itself be ``None``).
        """
        extraction = self.extractor.extract_text(pdf_path)
        status, text = extraction["status"], extraction["text"]

        print(f"📝 Extraction status: {status['status']}")

        # Guard clause: nothing to structure unless extraction completed.
        if status["status"] != "complete":
            print(f"❌ Error extracting text from {pdf_path}: {status.get('error', 'Unknown error')}")
            return None

        print(f"🔍 Processing statement text ({status.get('text_length', 0)} characters)...")
        structured = self.get_processor(statement_type).process(text)

        return {"status": status, "structured_data": structured}

    def run(self, pdf_path: str, statement_type: StatementType):
        """Announce the file being handled, then process it."""
        print(f"📄 Processing: {pdf_path}")
        return self.process_statement(pdf_path, statement_type)


## Run the Pipeline

In [None]:
# Build the pipeline and run it on a sample chequing statement.
# `results` is None when text extraction fails; otherwise it is a dict with
# "status" and "structured_data" keys, where "structured_data" is itself None
# if the processor could not produce a valid model.
pipeline = FinancialStatementPipeline()
results = pipeline.run("./data/Statements.pdf", StatementType.CHEQUING)

In [None]:
import json
from datetime import date

# Persist the structured result as JSON.
# Imports repeated locally so this cell does not depend on earlier cells
# for `os` and `Enum`.
import os
from enum import Enum

# The pipeline returns None on extraction failure, and "structured_data" is
# None when the processor failed - key presence alone is not enough.
structured_model = results.get("structured_data") if results else None

if structured_model is not None:
    structured_data_json = structured_model.model_dump()

    # Convert date and enum objects to strings for JSON serialization
    def custom_converter(obj):
        if isinstance(obj, date):
            return obj.isoformat()
        if isinstance(obj, Enum):
            return obj.value
        raise TypeError(f"Type {type(obj)} not serializable")

    output_path = "./results/structured_data.json"
    # open() does not create missing parent directories.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as json_file:
        json.dump(structured_data_json, json_file, indent=2, default=custom_converter)
    print("Structured data has been written to ./results/structured_data.json")
else:
    # Make the skip visible instead of silently writing nothing.
    print("No structured data available; nothing was written.")

## Pipeline Visualization

The following is a text representation of the pipeline flow:

```
PDF Files (Scotia.pdf, Amex.pdf)
    ↓
PDFExtractor (using LlamaParse, falling back to PyPDF)
    ↓
Raw Text + Extraction Status
    ↓
LangChain Processor
    ↓ [ChatPromptTemplate → ChatOpenAI (gpt-4.1-nano) → PydanticOutputParser]
Structured Data (CreditCardStatement or ChequingAccountStatement)
    ↓
Markdown Report + JSON Data
```