In [1]:
import os
from dotenv import load_dotenv
from openai import OpenAI
import instructor
from pydantic import BaseModel, Field
from typing import List, Dict, Optional
import json
import asyncio
import aiofiles
from langchain.document_loaders import PyPDFLoader
import easyocr
import nest_asyncio


In [2]:
# Patch asyncio through nest_asyncio so event loops can be used from inside Jupyter
nest_asyncio.apply()

# Load environment variables via python-dotenv
load_dotenv()

# Fail fast when the key is absent or set to an empty string
api_key = os.environ.get('OPENAI_API_KEY')
if api_key is None or api_key == "":
    raise ValueError("No OpenAI API key found in environment variables")

# Instructor-patched OpenAI client; the agent functions below call it with
# a `response_model` argument
client = instructor.patch(
    OpenAI(api_key=api_key)
)

In [3]:
# Initialize EasyOCR reader
# Created once at module level and reused by extract_text for every image
# file. The language list requests English ('en') and Romanian ('ro').
reader = easyocr.Reader(['en', 'ro'])

# Constants
# Folder that get_documents scans (and creates when missing) for input files.
DATA_FOLDER = 'data'
# System message sent with every chat completion request made by extract_pii,
# identify_parties and construct_contract.
SYSTEM_PROMPT = """
You are an AI agent specializing in contract creation and personal information extraction. 
Your role is to guide the process of extracting information from documents, verifying it with human input, 
identifying parties, and constructing a contract. Follow the workflow precisely and ask for human input when needed.
"""

Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


In [5]:
 # Define schemas
class PIIData(BaseModel):
    # One person's details taken from a single document. extract_pii passes
    # this class as `response_model`; agent_workflow also builds it directly
    # from corrections typed in by the user.
    # NOTE(review): documented with '#' comments on purpose; a class docstring
    # would presumably be included in the schema description sent to the
    # model -- confirm before converting.
    # Both fields are required (`...` as the Field default).
    name: str = Field(..., description="Full name of the person")
    address: str = Field(..., description="Residential address of the person")

class ContractParties(BaseModel):
    # Result of identify_parties (used there as `response_model`): which of
    # the verified people is the buyer and which is the seller.
    # Both fields are required (`...` as the Field default).
    buyer: str = Field(..., description="Name of the buyer")
    seller: str = Field(..., description="Name of the seller")

class Contract(BaseModel):
    # Result of construct_contract (used there as `response_model`);
    # agent_workflow prints each of these four fields.
    # All fields are required (`...` as the Field default).
    buyer: str = Field(..., description="Name of the buyer")
    seller: str = Field(..., description="Name of the seller")
    address: str = Field(..., description="Address where the contract is applicable")
    terms: str = Field(..., description="Terms of the contract")


In [6]:
# Function to get documents from the data folder
def get_documents(folder: Optional[str] = None) -> List[str]:
    """Return the paths of the supported documents found in a folder.

    Args:
        folder: Directory to scan. When omitted, ``DATA_FOLDER`` is used,
            which matches the previous zero-argument behaviour.

    Returns:
        ``folder`` joined with the name of every entry ending in .pdf, .jpg,
        .jpeg, .png or .txt (case-insensitive), sorted by file name.

    Side effects:
        Creates the folder when it does not exist and prints a summary of
        what was found.
    """
    target = DATA_FOLDER if folder is None else folder
    os.makedirs(target, exist_ok=True)
    # os.listdir returns entries in arbitrary order. Sort them so repeated
    # runs process documents in the same order (agent_workflow uses the
    # address of the first verified document).
    all_files = sorted(os.listdir(target))
    documents = [
        os.path.join(target, f)
        for f in all_files
        if f.lower().endswith(('.pdf', '.jpg', '.jpeg', '.png', '.txt'))
    ]
    print(f"Total files in {target}: {len(all_files)}")
    if all_files:
        print(f"Files found: {all_files}")
        print(f"Documents to process: {len(documents)}")
        if not documents:
            # Files exist but none can be handled; say so explicitly.
            print("Warning: No files with supported extensions (.pdf, .jpg, .jpeg, .png, .txt) found.")
    else:
        print(f"The {target} folder is empty.")
    return documents

# Function to extract text from documents
async def extract_text(file_path: str) -> str:
    """Extract the text content of one document.

    The branch is chosen from the lower-cased file extension:
      * ``.pdf``: loaded with PyPDFLoader; page texts are joined with newlines.
      * ``.jpg`` / ``.jpeg`` / ``.png``: OCR through the module-level EasyOCR
        ``reader``; recognised fragments are joined with single spaces.
      * anything else: read as UTF-8 text through aiofiles.

    PDF loading and OCR are blocking calls and are run via asyncio.to_thread.

    Args:
        file_path: Path of the document to read.

    Returns:
        The extracted text. On failure a message starting with "Error" is
        returned instead of raising, so callers receive a string either way.
    """
    lowered = file_path.lower()
    if lowered.endswith('.pdf'):
        try:
            loader = PyPDFLoader(file_path)
            pages = await asyncio.to_thread(loader.load)
            return "\n".join(page.page_content for page in pages)
        except ImportError as e:
            # PyPDFLoader is already imported at the top of the notebook, so an
            # ImportError here most likely points at a missing PDF backend
            # rather than at langchain itself; report the real cause.
            return f"Error: PDF support not available: {str(e)}"
        except Exception as e:
            # Handle failures here, like the image and text branches do.
            return f"Error processing PDF: {str(e)}"
    elif lowered.endswith(('.jpg', '.jpeg', '.png')):
        try:
            def process_image():
                results = reader.readtext(file_path)
                # Index 1 of each result is used as the recognised text.
                return ' '.join([result[1] for result in results])
            return await asyncio.to_thread(process_image)
        except Exception as e:
            return f"Error processing image: {str(e)}"
    else:
        try:
            # Explicit UTF-8: the platform default encoding can mangle or
            # reject non-ASCII text such as Romanian diacritics.
            async with aiofiles.open(file_path, mode='r', encoding='utf-8') as f:
                return await f.read()
        except Exception as e:
            return f"Error reading file: {str(e)}"

# Function to process documents
async def process_documents() -> Dict[str, str]:
    """Extract text from every document returned by get_documents().

    Documents are handled one at a time, in the order get_documents() yields
    them.

    Returns:
        A dict keyed by file name (basename only). Each value is the result of
        extract_text(), or an "Error processing file: ..." message when
        extract_text() raised.
    """
    text_by_name = {}
    for path in get_documents():
        name = os.path.basename(path)
        try:
            content = await extract_text(path)
        except Exception as exc:
            # Record the failure under the same key instead of aborting the run.
            content = f"Error processing file: {str(exc)}"
        text_by_name[name] = content
    return text_by_name

# Agent functions
def extract_pii(text: str) -> PIIData:
    """Ask the model for the name and address contained in ``text``.

    Sends SYSTEM_PROMPT plus one user message embedding ``text`` to the
    "gpt-4" chat model through the module-level ``client``, with PIIData as
    the ``response_model``, and returns what the client returns.
    """
    user_prompt = f"Extract the name and address from the given text:\n\n{text}"
    conversation = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": user_prompt},
    ]
    return client.chat.completions.create(
        model="gpt-4",
        messages=conversation,
        response_model=PIIData,
    )

def verify_information(pii: PIIData) -> bool:
    """Show extracted details to the user and ask for confirmation.

    Args:
        pii: Record whose ``name`` and ``address`` are printed for review.

    Returns:
        True when the reply, ignoring case and surrounding whitespace, is
        "yes" or "y"; False for any other reply.
    """
    print("Please verify the following information:")
    print(f"Name: {pii.name}")
    print(f"Address: {pii.address}")
    # Normalise the reply so that "Yes", " yes " or "y" is not mistaken for a
    # rejection.
    verification = input("Is this information correct? (yes/no): ").strip().lower()
    return verification in ('yes', 'y')

def identify_parties(pii_data: List[PIIData]) -> ContractParties:
    """Ask the model which of the given people is the buyer and which the seller.

    Each record is rendered as "Name: ..., Address: ..." on its own line; the
    lines are embedded in one user message sent with SYSTEM_PROMPT to the
    "gpt-4" chat model through the module-level ``client``, with
    ContractParties as the ``response_model``. The client's result is returned.
    """
    summary_lines = []
    for person in pii_data:
        summary_lines.append(f"Name: {person.name}, Address: {person.address}")
    pii_text = "\n".join(summary_lines)
    user_prompt = f"Identify the buyer and seller from the given information:\n\n{pii_text}"
    return client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ],
        response_model=ContractParties,
    )

def construct_contract(parties: ContractParties, address: str) -> Contract:
    """Ask the model to draft a contract for the given parties and address.

    Builds one user message naming the buyer, the seller and the address,
    sends it with SYSTEM_PROMPT to the "gpt-4" chat model through the
    module-level ``client`` with Contract as the ``response_model``, and
    returns the client's result.
    """
    user_prompt = f"Create a contract between the buyer and seller. Buyer: {parties.buyer}, Seller: {parties.seller}, Address: {address}"
    system_message = {"role": "system", "content": SYSTEM_PROMPT}
    user_message = {"role": "user", "content": user_prompt}
    return client.chat.completions.create(
        model="gpt-4",
        messages=[system_message, user_message],
        response_model=Contract,
    )

In [12]:
# Function to get documents from the data folder
def get_documents(folder: Optional[str] = None) -> List[str]:
    """Return the paths of the supported documents found in a folder.

    Args:
        folder: Directory to scan. When omitted, ``DATA_FOLDER`` is used,
            which matches the previous zero-argument behaviour.

    Returns:
        ``folder`` joined with the name of every entry ending in .pdf, .jpg,
        .jpeg, .png or .txt (case-insensitive), sorted by file name.

    Side effects:
        Creates the folder when it does not exist and prints a summary of
        what was found.
    """
    target = DATA_FOLDER if folder is None else folder
    os.makedirs(target, exist_ok=True)
    # os.listdir returns entries in arbitrary order. Sort them so repeated
    # runs process documents in the same order (agent_workflow uses the
    # address of the first verified document).
    all_files = sorted(os.listdir(target))
    supported_suffixes = ('.pdf', '.jpg', '.jpeg', '.png', '.txt')
    documents = [
        os.path.join(target, f)
        for f in all_files
        if f.lower().endswith(supported_suffixes)
    ]
    print(f"Total files in {target}: {len(all_files)}")
    if all_files:
        print(f"Files found: {all_files}")
        print(f"Documents to process: {len(documents)}")
        if not documents:
            # Files exist but none can be handled; say so explicitly.
            print("Warning: No files with supported extensions (.pdf, .jpg, .jpeg, .png, .txt) found.")
    else:
        print(f"The {target} folder is empty.")
    return documents

# Function to extract text from documents
async def extract_text(file_path: str) -> str:
    """Extract the text content of one document.

    The branch is chosen from the lower-cased file extension:
      * ``.pdf``: loaded with PyPDFLoader; page texts are joined with newlines.
      * ``.jpg`` / ``.jpeg`` / ``.png``: OCR through the module-level EasyOCR
        ``reader``; recognised fragments are joined with single spaces.
      * anything else: read as UTF-8 text through aiofiles.

    PDF loading and OCR are blocking calls and are run via asyncio.to_thread.

    Args:
        file_path: Path of the document to read.

    Returns:
        The extracted text. On failure a message starting with "Error" is
        returned instead of raising, so callers receive a string either way.
    """
    lowered = file_path.lower()
    if lowered.endswith('.pdf'):
        try:
            loader = PyPDFLoader(file_path)
            pages = await asyncio.to_thread(loader.load)
            return "\n".join(page.page_content for page in pages)
        except ImportError as e:
            # PyPDFLoader is already imported at the top of the notebook, so an
            # ImportError here most likely points at a missing PDF backend
            # rather than at langchain itself; report the real cause.
            return f"Error: PDF support not available: {str(e)}"
        except Exception as e:
            # Handle failures here, like the image and text branches do.
            return f"Error processing PDF: {str(e)}"
    elif lowered.endswith(('.jpg', '.jpeg', '.png')):
        try:
            def process_image():
                results = reader.readtext(file_path)
                # Index 1 of each result is used as the recognised text.
                return ' '.join([result[1] for result in results])
            return await asyncio.to_thread(process_image)
        except Exception as e:
            return f"Error processing image: {str(e)}"
    else:
        try:
            # Explicit UTF-8: the platform default encoding can mangle or
            # reject non-ASCII text such as Romanian diacritics.
            async with aiofiles.open(file_path, mode='r', encoding='utf-8') as f:
                return await f.read()
        except Exception as e:
            return f"Error reading file: {str(e)}"

# Function to process documents
async def process_documents() -> Dict[str, str]:
    """Extract text from every document returned by get_documents().

    Documents are handled one at a time, in the order get_documents() yields
    them.

    Returns:
        A dict keyed by file name (basename only). Each value is the result of
        extract_text(), or an "Error processing file: ..." message when
        extract_text() raised.
    """
    extracted = {}
    for doc_path in get_documents():
        key = os.path.basename(doc_path)
        try:
            value = await extract_text(doc_path)
        except Exception as err:
            # Record the failure under the same key instead of aborting the run.
            value = f"Error processing file: {str(err)}"
        extracted[key] = value
    return extracted

# Function to extract PII data
def extract_pii(text: str) -> PIIData:
    """Ask the model for the name and address contained in ``text``.

    Sends SYSTEM_PROMPT plus one user message embedding ``text`` to the
    "gpt-4" chat model through the module-level ``client``, with PIIData as
    the ``response_model``, and returns what the client returns.
    """
    user_prompt = f"Extract the name and address from the given text:\n\n{text}"
    conversation = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": user_prompt},
    ]
    return client.chat.completions.create(
        model="gpt-4",
        messages=conversation,
        response_model=PIIData,
    )

# Function to identify buyer and seller
def identify_parties(pii_data: List[PIIData]) -> ContractParties:
    """Ask the model which of the given people is the buyer and which the seller.

    Each record is rendered as "Name: ..., Address: ..." on its own line; the
    lines are embedded in one user message sent with SYSTEM_PROMPT to the
    "gpt-4" chat model through the module-level ``client``, with
    ContractParties as the ``response_model``. The client's result is returned.
    """
    summary_lines = []
    for person in pii_data:
        summary_lines.append(f"Name: {person.name}, Address: {person.address}")
    pii_text = "\n".join(summary_lines)
    user_prompt = f"Identify the buyer and seller from the given information:\n\n{pii_text}"
    return client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ],
        response_model=ContractParties,
    )

# Function to construct contract
def construct_contract(parties: ContractParties, address: str) -> Contract:
    """Ask the model to draft a contract for the given parties and address.

    Builds one user message naming the buyer, the seller and the address,
    sends it with SYSTEM_PROMPT to the "gpt-4" chat model through the
    module-level ``client`` with Contract as the ``response_model``, and
    returns the client's result.
    """
    user_prompt = f"Create a contract between the buyer and seller. Buyer: {parties.buyer}, Seller: {parties.seller}, Address: {address}"
    system_message = {"role": "system", "content": SYSTEM_PROMPT}
    user_message = {"role": "user", "content": user_prompt}
    return client.chat.completions.create(
        model="gpt-4",
        messages=[system_message, user_message],
        response_model=Contract,
    )


In [7]:
# Main agent workflow
async def agent_workflow():
    """Run the pipeline: extract text, confirm PII, pick parties, draft a contract.

    Steps:
      1. process_documents() extracts text from every supported file.
      2. For each document, extract_pii() proposes a name and address once.
         The user confirms it through verify_information(), or types
         corrections until one set of details is confirmed.
      3. identify_parties() assigns buyer and seller from the confirmed records.
      4. construct_contract() drafts the contract using the address of the
         first confirmed record, and the result is printed.

    Returns:
        None. When no documents were found, a notice is printed and the
        function stops before any model call is made.

    Note:
        input() and the model calls are synchronous, so the event loop is
        blocked while they wait.
    """
    # Process documents
    documents = await process_documents()
    print("Documents processed.")

    # Nothing to work with: stop here instead of asking the model to pick
    # parties from no data and then failing on verified_pii_data[0].
    if not documents:
        print("No documents to process; skipping contract construction.")
        return

    # Extract PII and address, verify with human input
    verified_pii_data = []
    for doc, text in documents.items():
        print(f"Processing document: {doc}")
        # Query the model once per document; a rejected manual correction is
        # fixed by asking the user again, not by re-running the same extraction.
        pii = extract_pii(text)
        while not verify_information(pii):
            print("Please provide the correct information:")
            name = input("Name: ")
            address = input("Address: ")
            pii = PIIData(name=name, address=address)
        verified_pii_data.append(pii)

    # Identify buyer and seller
    parties = identify_parties(verified_pii_data)
    print(f"\nParties identified - Buyer: {parties.buyer}, Seller: {parties.seller}")

    # Construct contract (address comes from the first verified record)
    contract = construct_contract(parties, verified_pii_data[0].address)
    print("\nContract constructed:")
    print(f"Buyer: {contract.buyer}")
    print(f"Seller: {contract.seller}")
    print(f"Address: {contract.address}")
    print(f"Terms: {contract.terms}")

# Run the agent workflow
# NOTE(review): top-level `await` is valid here because this is a notebook
# cell; in a plain Python script this call would presumably need
# asyncio.run(agent_workflow()) instead -- confirm before exporting.
await agent_workflow()

Total files in data: 2
Files found: ['Screenshot 2024-10-08 at 13.22.38.png', 'Screenshot 2024-10-08 at 13.24.00.png']
Documents to process: 2
Documents processed.
Processing document: Screenshot 2024-10-08 at 13.22.38.png
Please verify the following information:
Name: ISPILANTE SENTIMENT BRUSLI
Address: Videle Jud: Teleorman Sos. Giurgiului nr.1 695 TR
Processing document: Screenshot 2024-10-08 at 13.24.00.png
Please verify the following information:
Name: VASILESCU ELENA
Address: Mun. Bucuresti Sec. 5 Str Nicolae Iorga nr.3

Parties identified - Buyer: ISPILANTE SENTIMENT BRUSLI, Seller: VASILESCU ELENA

Contract constructed:
Buyer: ISPILANTE SENTIMENT BRUSLI
Seller: VASILESCU ELENA
Address: Videle Jud: Teleorman Sos. Giurgiului nr.1 695 TR
Terms: Standard contract terms apply unless otherwise specified.
