In [None]:
%pip install pandas numpy peft scikit-learn transformers datasets torch accelerate bitsandbytes huggingface-hub tabulate evaluate -q

In [None]:
import json

extraction_json_schema = {
  "title": "ExtractedLeaseData",
  "type": "object",
  "properties": {
    "tenant_name": {
      "type": ["string", "null"],
      "description": "The name of the tenant, found in the OCR text."
    },
    "unit_address": {
      "type": ["string", "null"],
      "description": "The unit address found in the OCR text."
    },
    "unit_number": {
      "type": ["string", "null"],
      "description": "The unit number found in the OCR text."
    },
    "unit_type": {
      "type": ["string", "null"],
      "description": "The unit type found in the OCR text."
    },
    "agreement_date": {
      "type": ["string", "null"],
      "format": "date"
    },
    "lease_start": {
      "type": ["string", "null"],
      "format": "date",
      "description": "The date when the lease starts, found in the OCR text."
    },
    "lease_end": {
      "type": ["string", "null"],
      "format": "date",
      "description": "The date when the lease ends, found in the OCR text."
    },
    "lease_auto_renew": {
      "type": ["string", "null"],
      "description": "The type of lease auto renewal, found in the OCR text."
    },
    "hourly_rate": {
      "type": ["number", "null"],
      "description": "The hourly rate found in the OCR text."
    },
    "monthly_rent": {
      "type": ["number", "null"],
      "description": "The monthly rent found in the OCR text."
    },
    "prorated_rent": {
      "type": ["number", "null"],
      "description": "The prorated rent found in the OCR text."
    },
    "security_deposit": {
      "type": ["number", "null"],
      "description": "The security deposit found in the OCR text."
    },
    "lease_rent": {
      "type": ["number", "null"],
      "description": "The security deposit found in the OCR text."
    },
    "monthly_payment_breakdown": {
      "type": ["object", "null"],
      "description": "The monthly payment breakdown data found in the OCR text.",
      "additionalProperties": {}
    },
    "utility_charges": {
      "type": ["object", "null"],
      "description": "The utility charges found in the OCR text. This is a dictionary with utility charges as the key, and their price as the value.",
      "additionalProperties": {
        "type": ["number", "null"]
      }
    }
  },
  "required": ["tenant_name", "unit_address", "unit_number", "unit_type", "agreement_date", "lease_start", "lease_end", "lease_auto_renew", "hourly_rate", "monthly_rent", "prorated_rent", "security_deposit", "lease_rent", "monthly_payment_breakdown", "utility_charges"]
}

extraction_json_schema_str = json.dumps(extraction_json_schema, indent=2)

In [None]:
import pandas as pd
import sqlite3

# Path to the SQLite database
db_path = "../output/extracted_lease_agreements.db"

# Connect to the SQLite database
conn = sqlite3.connect(db_path)

# Query to select all data from the extracted_data table
query = "SELECT * FROM extracted_data"

# Read the data into a DataFrame
df = pd.read_sql_query(query, conn, index_col="id")

# Close the database connection
conn.close()

df

In [None]:
# Function to ensure all required fields are present in the extracted_fields column
def ensure_all_fields(extracted_fields, required_fields):
    # Convert the extracted_fields string to a dictionary
    extracted_data = json.loads(extracted_fields)
    
    # Check and add any missing fields with a value of None
    for field in required_fields:
        if field not in extracted_data:
            extracted_data[field] = None
    
    # Convert the dictionary back to a JSON string
    return json.dumps(extracted_data)

# List of required fields from the extraction_json_schema
extraction_fields = list(extraction_json_schema['properties'].keys())

# Apply the function to each row in the extracted_fields column
df['extracted_fields'] = df['extracted_fields'].apply(ensure_all_fields, required_fields=extraction_fields)

# Display the updated DataFrame
df

In [None]:
from sklearn.model_selection import train_test_split

# First split: 80% for training, 20% for temp (which will be split into eval and test)
train_df, temp_df = train_test_split(df, test_size=0.2, random_state=42)

# Second split: 50% of temp for eval and 50% for test (10% of the original data each)
eval_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42)

# Check the sizes of the splits
print(f"Training set size: {len(train_df)}")
print(f"Evaluation set size: {len(eval_df)}")
print(f"Test set size: {len(test_df)}")


In [None]:
from peft import LoraConfig, TaskType

peft_config = LoraConfig(
    lora_alpha=16, # Higher alpha to match larger model capacity
    lora_dropout=0.1, # Consistent dropout rate to prevent overfitting
    r=8, # Rank, kept the same for balance between performance and efficiency
    task_type=TaskType.SEQ_2_SEQ_LM, # Change to SEQ_2_SEQ_LM for seq2seq models
    bias="none", # Keeping bias as none, similar to your original setup
    target_modules=[
        'q', 'v', 'k', 'o', # Attention layers (query, value, key, output projections)
        'wi', 'wo', # Feedforward layers (input, output projections)
        'wq', 'wv', 'wk', 'wo', # Additional T5-specific projection layers
    ], # Target modules relevant to T5 architecture
)


In [None]:

from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, BitsAndBytesConfig
from huggingface_hub import notebook_login
from peft import get_peft_model, prepare_model_for_kbit_training
import torch

# login to access gated model
notebook_login()

# Load the tokenizer and model
model_id = "google/flan-t5-xl"

bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
)

# empty GPU memory
torch.cuda.empty_cache()

tokenizer = AutoTokenizer.from_pretrained(model_id)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = 'right'

model = AutoModelForSeq2SeqLM.from_pretrained(model_id, quantization_config=bnb_config)

model = prepare_model_for_kbit_training(model)
model = get_peft_model(model, peft_config)
model.print_trainable_parameters()


In [None]:
prompt_template = """
### TASK:
You are a specialized model for extracting specific information from lease agreement text. Your goal is to accurately extract data fields from the provided OCR text of a lease agreement. Additionally, correct any obvious OCR errors you encounter during extraction.

### INPUT TEXT:
Below is the OCR text extracted from a lease agreement. Carefully analyze this text, and extract the relevant data fields.

OCR Text:
```
{extracted_text}
```

### RESPONSE FORMAT:
Return the extracted data as a JSON object, adhering strictly to the following JSON schema:

```json
{extraction_json_schema_str}
```
"""

In [None]:
from datasets import Dataset

def preprocess_function(examples):
    inputs = [prompt_template.format(extracted_text=extracted_text, extraction_json_schema_str=extraction_json_schema_str) for extracted_text in examples["extracted_text"]]
    model_inputs = tokenizer(inputs, max_length=512, truncation=True)

    # The "labels" are the tokenized outputs:
    labels = tokenizer(
        text_target=examples["extracted_fields"], 
        max_length=512,         
        truncation=True
    )

    model_inputs["labels"] = labels["input_ids"]
    return model_inputs

# Create datasets with the extracted text and labels
train_dataset = Dataset.from_pandas(train_df)
tokenized_train_dataset = train_dataset.map(preprocess_function, batched=True)
eval_dataset = Dataset.from_pandas(eval_df)
tokenized_eval_dataset = eval_dataset.map(preprocess_function, batched=True)
test_dataset = Dataset.from_pandas(test_df)
tokenized_test_dataset = test_dataset.map(preprocess_function, batched=True)

train_dataset


In [None]:
peft_model_repo_id = "aryaniyaps/finetuned_flan_t5_xl_lease_data_extraction_200"

In [None]:
from peft import PeftModel
 
# Load the Lora model
finetuned_model = PeftModel.from_pretrained(model, peft_model_repo_id, device_map={"":0})
finetuned_model.eval()
 
print("PEFT model loaded")

Let's try data extraction with a random sample

In [None]:
from random import randrange
from tabulate import tabulate

sample = test_dataset[randrange(len(test_dataset))]
 
input_ids = tokenizer(prompt_template.format(extracted_text=sample["extracted_text"], extraction_json_schema_str=extraction_json_schema_str), return_tensors="pt", truncation=True).input_ids.cuda()
with torch.no_grad():
    outputs = finetuned_model.generate(input_ids=input_ids, max_new_tokens=512)


In [None]:
prediction = tokenizer.decode(outputs[0].detach().cpu().numpy(), skip_special_tokens=True)

table_data = [[sample['extracted_text'], prediction]]

# Define table headers
headers = ["OCR text", "Extracted data (Finetuned model)"]

# Display the table
tabulate(table_data, headers=headers, tablefmt="html", showindex=False)

### Evaluate the model
(Using F1 score and exact matches)

In [None]:
from tqdm import tqdm

def evaluate_peft_model(sample, max_target_length=512):
    # Generate extracted data
    input_ids = torch.tensor(sample["input_ids"]).unsqueeze(0).cuda()
    with torch.no_grad():
        outputs = model.generate(input_ids=input_ids, max_new_tokens=max_target_length)
    prediction = tokenizer.decode(outputs[0].detach().cpu().numpy(), skip_special_tokens=True)
    
    # Decode eval sample
    labels = tokenizer.decode(sample['labels'], skip_special_tokens=True)
 
    return prediction, labels

# Run predictions
predictions, references = [], []
for sample in tqdm(tokenized_test_dataset):
    prediction, reference = evaluate_peft_model(sample)
    predictions.append(prediction)
    references.append(reference)

In [None]:
from tabulate import tabulate

# Prepare data for tabulation
table_data = []
for i, (pred, ref) in enumerate(zip(predictions, references), 1):
    table_data.append([i, ref, pred])

# Define table headers
headers = ["#", "Reference data", "Extracted data (Finetuned model)"]

# Display the table
tabulate(table_data, headers=headers, tablefmt="html", showindex=False)


In [None]:
# TODO: calculate accuracy using metrics