# Module 1: Structured Data Extraction with LLMs

Welcome to the first module of Cohort-2! This notebook marks the beginning of our **Ingestion Pipeline**. Our goal is to transform the raw, unstructured text from Aperture Global Logistics (AGL) contracts into a structured, machine-readable format.

**Our Mission:** Take 5 raw contract text files as input and use the Gemini 2.0 Flash LLM to extract key entities and clauses, producing a clean `contract_data.json` file as our final output.

## 1. Setup and Dependencies

First, let's install and import the necessary Python libraries. We'll need libraries for asynchronous operations, data validation, and interacting with Google's Generative AI models.

In [2]:
%pip install -qU langchain-google-genai pydantic langchain python-dotenv isodate

Note: you may need to restart the kernel to use updated packages.


In [4]:
import asyncio
import json
import os
import getpass
from datetime import datetime
from typing import List, Optional
#from google.colab import userdata

import isodate
from langchain_google_genai import ChatGoogleGenerativeAI
from pydantic import BaseModel, Field
#from tqdm.asyncio import tqdm as async_tqdm

## 2. Configure Google API Key

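To use the Gemini model, you need to configure your Google API key. The following cell loads the key from a `.env` file (via `python-dotenv`) and reads it from the `GOOGLE_API_KEY` environment variable, so the key never appears in the notebook itself. Create a `.env` file, typically in the same directory as this notebook, containing a line of the form `GOOGLE_API_KEY=<your key>`.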

In [12]:
# https://aistudio.google.com/app/apikey

# Set up the API key by reading from .env file
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Read the API key from environment variable
api_key = os.getenv("GOOGLE_API_KEY")
if api_key:
    print("Google API key set.")
else:
    print("Google API key not found. Please ensure it is set in your .env file.")

Google API key set.
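If the key might not be present in the environment, one option is to fall back to an interactive prompt. The sketch below is an illustration rather than part of the pipeline: `resolve_api_key` and its `prompt` parameter are hypothetical names, and the only library call it relies on is the standard `getpass.getpass`, which does not echo what you type.

```python
import getpass
import os
from typing import Callable, Mapping, Optional


def resolve_api_key(
    env: Optional[Mapping[str, str]] = None,
    prompt: Callable[[str], str] = getpass.getpass,
    var_name: str = "GOOGLE_API_KEY",
) -> str:
    """Return the API key from the environment, or ask for it without echoing.

    `resolve_api_key` is a hypothetical helper, not part of any library.
    """
    env = os.environ if env is None else env
    key = env.get(var_name, "").strip()
    if key:
        return key
    # Fall back to a hidden prompt; getpass does not echo the typed value
    key = prompt(f"Enter {var_name}: ").strip()
    if not key:
        raise ValueError(f"{var_name} was not provided.")
    return key
```

In the notebook this could be used as `os.environ["GOOGLE_API_KEY"] = resolve_api_key()` before the model is created.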


## 3. Load Contract Data

We need to load our five AGL contracts from their respective `.md` files. The function below will read all files from a specified folder.

**Action:** Please create a folder named `data` in the same directory as this notebook and place the 5 AGL contract `.md` files inside it.

In [23]:
def read_markdown_files(folder_path):
    """
    Reads all .md files from a specified folder and returns a list of dictionaries.
    Each dictionary contains:
        - 'file_id': the filename without extension
        - 'text': the full text content of the markdown file
    """
    data = []
    # Check if the folder exists; if not, raise an error with a helpful message
    if not os.path.exists(folder_path):
        print(folder_path)
        raise FileNotFoundError(f"Folder '{folder_path}' not found. Please create it and add the contract files.")

    # Iterate over all files in the folder
    #print(os.listdir(folder_path))
    for filename in os.listdir(folder_path):
        # Only process files with a .md extension
        if filename.endswith(".md"):
            file_id = os.path.splitext(filename)[0]  # Remove file extension for ID
            file_path = os.path.join(folder_path, filename)  # Full path to the file
            # Open and read the file content as UTF-8 text
            with open(file_path, "r", encoding="utf-8") as file:
                text = file.read()
                # Append a dictionary with file_id and text to the data list
                data.append({"file_id": file_id, "text": text})
    #print(type(data))
    return data
    

# Load the contracts from the 'data' folder
try:
    contracts = read_markdown_files("data")  # Attempt to read contract files from 'data'
    print(f"Successfully loaded {len(contracts)} contracts.")  # Print the number of contracts loaded
except FileNotFoundError as e:
    print(e)  # Print the error message if the folder is missing

#print(contracts)

Successfully loaded 5 contracts.
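`os.listdir` returns entries in arbitrary order, so the order of `contracts` can differ between machines. If a stable order matters, a variant along these lines may help. It is a sketch, not a replacement for the function above: `read_markdown_files_sorted` is a hypothetical name, and the demo writes throwaway files to a temporary folder rather than touching `data`.

```python
import tempfile
from pathlib import Path


def read_markdown_files_sorted(folder_path):
    """Read every .md file in folder_path, sorted by filename.

    Returns the same structure as read_markdown_files:
    a list of {"file_id": ..., "text": ...} dictionaries.
    """
    folder = Path(folder_path)
    if not folder.is_dir():
        raise FileNotFoundError(f"Folder '{folder_path}' not found.")
    return [
        {"file_id": path.stem, "text": path.read_text(encoding="utf-8")}
        for path in sorted(folder.glob("*.md"))
    ]


# Demo with throwaway files (the file names are made up for illustration)
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "b_contract.md").write_text("Second", encoding="utf-8")
    Path(tmp, "a_contract.md").write_text("First", encoding="utf-8")
    Path(tmp, "notes.txt").write_text("ignored", encoding="utf-8")
    demo = read_markdown_files_sorted(tmp)

print([d["file_id"] for d in demo])  # ['a_contract', 'b_contract']
```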


In [15]:
# Print the first 2 records from the loaded contracts list as a sample.
# This helps in quickly inspecting the raw data structure before processing.
print("--- Sample of Loaded Contracts (first 2 records) ---")
# Use slicing [0:2] to get the first two elements of the list.
# Use json.dumps with indent for pretty printing the dictionaries.
if 'contracts' in locals() and contracts:
    for i, contract in enumerate(contracts[:2]):
        #print(f"\nContract {i+1}:")
        print(json.dumps(contract, indent=2))
else:
    print("Contracts variable not found or is empty.")

--- Sample of Loaded Contracts (first 2 records) ---
{
  "file_id": "Contract_05_Reseller_AGL_LogiSync",
  "text": "EXHIBIT 10.1\n\nRESELLER AGREEMENT\n\nTHIS RESELLER AGREEMENT (this \"Agreement\") is made and entered into effect the 7th day of April, 2017 (\"Effective Date\"), by and between Aperture Global Logistics (\"AGL\"), a Delaware corporation, having its offices at 123 Main Street, Anytown, USA 12345 (\"Reseller\") and the company set forth below (\"Company\") (each, individually, a \"party\" and collectively, \"parties\"):\n\nCompany: LogiSync Solutions Inc.\n\nTelephone: 1-800-555-1234\n\nAddress: 456 Tech Avenue, Suite 100, Silicon Valley, CA 94088\n\nFax: N/A\n\nE-mail: sales@testxyz.com\n\nTerritory: Worldwide for internal enterprise use by Aperture Global Logistics and its affiliates.\n\nAgreement Term: 1 Year\n\nCompany Products: Logistics Optimization Software Suite and Supply Chain Visibility Services\n\nOther Terms (not applicable if blank):\n\nPricing: Reseller wil

## 4. Define the Data Schema with Pydantic

This is the most critical step. We need to tell the LLM *exactly* what information to extract and what format to use. We do this by defining a Pydantic schema.

- **`Location`**: Captures geographic information.
- **`Organization`**: Represents a company or party involved in the contract.
- **`Clause`**: Extracts a summary of specific legal clauses we're interested in.
- **`Contract`**: The main model that brings everything together.

In [32]:
CLAUSE_TYPES = [
    "Renewal & Termination",
    "Confidentiality & Non-Disclosure",
    "Liability & Indemnification",
    "Intellectual Property",
    "Payment and Freight Terms",
    "Dispute Resolution"
]

CONTRACT_TYPES = [
    "Distributor",
    "Reseller",
    "Service",
    "Supply",
    "Transportation",
]

 
# This code defines a Pydantic model named Clause, representing a specific
# clause in a contract.
#
# - `class Clause(BaseModel)`: Inherits from Pydantic's BaseModel for
#   validation and parsing.
# - `summary: str = Field(..., description="Summary of the clause using no
#   pronouns")`: Required string field for a clause summary, without pronouns.
# - `clause_type: str = Field(..., description="Clause types",
#   enum=CLAUSE_TYPES)`: Required string field for the clause type. The `enum`
#   keyword ends up in the generated JSON schema as a hint for the LLM;
#   Pydantic itself does not reject values outside CLAUSE_TYPES.
#
# The `...` in `Field(...)` means the field is required when creating an
# instance.

class Clause(BaseModel):
    """Represents a clause in a contract."""
    summary: str = Field(..., description="Summary of the clause using no pronouns")
    clause_type: str = Field(..., description="Clause types", enum=CLAUSE_TYPES)

# In Pydantic, ... in Field(...) means the field is required (must be provided), even if its type is Optional. 
# This enforces that the field is present, but its value can be None.
class Location(BaseModel):
    """Represents a physical location."""
    city: Optional[str] = Field(..., description="The city of the location. Use None if not provided")
    state: Optional[str] = Field(..., description="The state or region of the location. Use None if not provided")
    country: str = Field(..., description="The country of the location. Use the two-letter ISO standard.")

class Organization(BaseModel):
    """Represents an organization, including its name and location."""
    name: str = Field(..., description="The name of the organization.")
    location: Location = Field(..., description="The primary location of the organization.")
    role: str = Field(..., description="The role of the organization in the contract, e.g., 'Shipper', 'Carrier', 'Supplier', 'Buyer'.")

class Contract(BaseModel):
    """Represents the key details of the contract."""
    summary: str = Field(..., description="High-level summary of the contract with relevant facts and details. Do not use any pronouns.")
    contract_type: str = Field(..., description="The type of contract being entered into.", enum=CONTRACT_TYPES)
    parties: List[Organization] = Field(..., description="List of parties involved in the contract, with details of each party's role.")
    effective_date: str = Field(..., description="The date when the contract becomes effective in yyyy-MM-dd format.")
    duration: Optional[str] = Field(None, description="The duration of the agreement. Use ISO 8601 duration standard (e.g., P1Y for 1 year, P2M for 2 months).")
    end_date: Optional[str] = Field(None, description="The date when the contract expires in yyyy-MM-dd format.")
    governing_law: Optional[Location] = Field(None, description="The jurisdiction's laws governing the contract.")
    clauses: Optional[List[Clause]] = Field(None, description=f"Relevant summaries of clause types. Allowed clause types are {CLAUSE_TYPES}")

## 5. LLM Processing and Data Cleaning

Now we'll define the functions to process our contracts.

- **`process_contract`**: This function takes a single contract's text, sends it to the Gemini LLM, and asks for the output to be structured according to our `Contract` Pydantic model.
- **`process_all_contracts`**: This function manages the concurrent execution for all 5 contracts to speed things up. It uses a semaphore to avoid hitting API rate limits.
- **Helper functions**: We also include helpers to validate dates and infer a contract's `end_date` if a `duration` is provided.

In [None]:
# Although this notebook uses only 5 contracts, the code is designed to handle a larger number.
# asyncio lets multiple contracts be processed concurrently, which makes efficient use of the
# time spent waiting for LLM API responses (these calls are I/O-bound and slow).
# `async with semaphore` caps the number of concurrent LLM calls at max_workers (5 in this notebook)
# to avoid API rate limits; as soon as one call finishes, the next waiting contract starts.
# This way, several contracts are processed concurrently without blocking, speeding up the overall extraction pipeline.

# Instantiate the Gemini LLM; structured output is configured inside process_contract
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")

def is_valid_date(date_string):
    """
    Check if the input string is a valid date in YYYY-MM-DD format.

    Returns True if valid, False otherwise.
    """
    if not date_string:
        return False
    try:
        datetime.strptime(date_string, '%Y-%m-%d')
        return True
    except ValueError:
        return False

def add_duration_to_date(date_str, duration_str):
    """
    Add an ISO 8601 duration (e.g., 'P1Y', 'P2M') to a date string (YYYY-MM-DD).

    Returns the resulting date as a string in YYYY-MM-DD format, or None if invalid.
    """
    try:
        date_obj = datetime.strptime(date_str, "%Y-%m-%d")  # Parse the base date
        duration = isodate.parse_duration(duration_str)      # Parse the ISO 8601 duration
        result_date = date_obj + duration                    # Add duration to date
        return result_date.strftime("%Y-%m-%d")
    except (ValueError, isodate.ISO8601Error):
        return None

async def process_contract(contract_data, semaphore):
    """
    Process a single contract's text using the LLM and the Contract Pydantic schema.

    - Uses a semaphore to limit concurrency.
    - Calls the LLM for structured extraction.
    - Validates and cleans date fields.
    - Infers end_date from duration if necessary.
    - Returns a dictionary with structured contract data.
    """
    async with semaphore:
        try:
            # Configure the LLM to return output matching the Contract schema
            structured_llm = llm.with_structured_output(Contract)
            
            # Invoke the LLM asynchronously with the contract text
            result_obj = await structured_llm.ainvoke(contract_data["text"])
            
            # Convert the Pydantic model output to a dictionary
            structured_data = json.loads(result_obj.model_dump_json())
            
            # Attach the file identifier to the result
            structured_data["file_id"] = contract_data["file_id"]

            # Validate and clean the effective_date field
            if not is_valid_date(structured_data.get("effective_date")):
                structured_data["effective_date"] = None
            
            # Validate and clean the end_date field
            if not is_valid_date(structured_data.get("end_date")):
                structured_data["end_date"] = None

            # If end_date is missing but duration and effective_date are present, infer end_date
            if (
                not structured_data.get("end_date")
                and structured_data.get("effective_date")
                and structured_data.get("duration")
            ):
                structured_data["end_date"] = add_duration_to_date(
                    structured_data["effective_date"], structured_data["duration"]
                )

            return structured_data
        except Exception as e:
            # Log and return error information for this contract
            print(f"Error processing {contract_data['file_id']}: {e}")
            return {"file_id": contract_data["file_id"], "error": str(e)}

async def process_all_contracts(contracts_list, max_workers=5):
    """
    Process a list of contracts concurrently using asyncio.

    - contracts_list: List of contract data dictionaries.
    - max_workers: Maximum number of concurrent LLM calls (default: 5).
    - Returns a list of structured contract data results.
    """
    semaphore = asyncio.Semaphore(max_workers)  # Limit concurrent LLM calls
    tasks = [process_contract(c, semaphore) for c in contracts_list]  # Create async tasks

    results = []
    # Asynchronously gather results as each contract is processed
    for future in asyncio.as_completed(tasks):
        result = await future
        results.append(result)
    return results
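To see what the semaphore does without calling the API, the pattern can be exercised with a stand-in coroutine. In this sketch `fake_llm_call` and its delays are invented for illustration. It also uses `asyncio.gather`, which, unlike `asyncio.as_completed`, returns results in input order. It is written as a script with `asyncio.run`; inside a notebook you would `await run_demo()` instead.

```python
import asyncio

# Track how many stand-in calls are running at once
active = 0
peak = 0


async def fake_llm_call(file_id, delay, semaphore):
    """Stand-in for process_contract: sleeps instead of calling an LLM."""
    global active, peak
    async with semaphore:
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(delay)  # simulates waiting on the API
        active -= 1
        return {"file_id": file_id}


async def run_demo(max_workers=2):
    semaphore = asyncio.Semaphore(max_workers)
    jobs = [("c1", 0.03), ("c2", 0.01), ("c3", 0.02), ("c4", 0.01)]
    # gather keeps the input order, whatever order the calls finish in
    return await asyncio.gather(
        *(fake_llm_call(file_id, delay, semaphore) for file_id, delay in jobs)
    )


results = asyncio.run(run_demo())
print([r["file_id"] for r in results])  # ['c1', 'c2', 'c3', 'c4']
print(peak)  # 2: never more than max_workers calls in flight
```

Keeping input order can make it easier to line results up with the original `contracts` list, at the cost of not seeing any result until all calls have finished.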

## 6. Run the Extraction Pipeline

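Let's execute the pipeline. This will process all 5 contracts and may take a few moments. Because results are collected as each contract finishes, the sample printed below is whichever contract completed first, not necessarily the first file loaded.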

In [34]:
if 'contracts' in locals() and contracts:
    # Run the asynchronous processing function
    extracted_results = await process_all_contracts(contracts)
    print("\nExtraction complete.")
    # Display the first result as a sample
    print("\n--- Sample Output ---")
    print(json.dumps(extracted_results[0], indent=2))
else:
    print("Contract data not loaded. Please ensure the 'data' folder and files are set up correctly.")


Extraction complete.

--- Sample Output ---
{
  "summary": "Cyberdyne appoints Distributor as a non-exclusive, worldwide distributor for the Products and Services to Customers.",
  "contract_type": "Distributor",
  "parties": [
    {
      "name": "CYBERDYNE ROBOTICS",
      "location": {
        "city": "Sunnyvale",
        "state": "California",
        "country": "US"
      },
      "role": "Manufacturer"
    },
    {
      "name": "APERTURE GLOBAL LOGISTICS",
      "location": {
        "city": "Anytown",
        "state": "Delaware",
        "country": "US"
      },
      "role": "Distributor"
    }
  ],
  "effective_date": "2010-06-08",
  "duration": null,
  "end_date": null,
  "governing_law": {
    "city": null,
    "state": null,
    "country": "DE"
  },
  "clauses": [
    {
      "summary": "All Confidential Information furnished to a party will be subject to and the parties' rights and obligations with respect to such Confidential Information shall be governed by the Confide

## 7. Save the Structured Data

Finally, we'll save our list of structured contract data into a single JSON file. This file will be the input for Module 2, where we'll build our knowledge graph.
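`process_contract` returns a dictionary with an `error` key when extraction fails, so the list about to be saved can mix good records with failures. A small filter such as the one below could separate them first. `split_results` is a hypothetical helper and the sample records are invented.

```python
def split_results(results):
    """Separate successful extractions from records that carry an 'error' key."""
    succeeded = [r for r in results if "error" not in r]
    failed = [r for r in results if "error" in r]
    return succeeded, failed


# Invented sample records shaped like the output of process_contract
sample_results = [
    {"file_id": "contract_a", "contract_type": "Reseller"},
    {"file_id": "contract_b", "error": "request timed out"},
]

succeeded, failed = split_results(sample_results)
print(len(succeeded), len(failed))  # 1 1
for record in failed:
    print(f"Needs a retry: {record['file_id']} ({record['error']})")
```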

In [35]:
if 'extracted_results' in locals() and extracted_results:
    output_filename = "contract_data.json"
    with open(output_filename, "w", encoding="utf-8") as json_file:
        json.dump(extracted_results, json_file, indent=4)
    print(f"Successfully saved structured data to {output_filename}")
else:
    print("No extracted data to save.")

Successfully saved structured data to contract_data.json
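Since Module 2 reads this file, a quick round-trip check can catch a truncated or malformed write early. The sketch below writes invented records to a temporary file rather than to `contract_data.json`; `save_and_verify` is a hypothetical name.

```python
import json
import os
import tempfile


def save_and_verify(records, path):
    """Write records as JSON, read them back, and confirm nothing changed."""
    with open(path, "w", encoding="utf-8") as f:
        # ensure_ascii=False keeps non-ASCII characters readable in the file
        json.dump(records, f, indent=4, ensure_ascii=False)
    with open(path, "r", encoding="utf-8") as f:
        reloaded = json.load(f)
    if reloaded != records:
        raise ValueError(f"Round-trip mismatch for {path}")
    return reloaded


# Invented record, including a None value and a non-ASCII character
records = [{"file_id": "contract_a", "end_date": None, "summary": "Café supply"}]
with tempfile.TemporaryDirectory() as tmp:
    reloaded = save_and_verify(records, os.path.join(tmp, "demo.json"))

print(reloaded[0]["end_date"])  # None
```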


### Congratulations!

You have successfully completed Module 1. We have transformed raw legal text into a clean, structured `contract_data.json` file. We are now ready to move on to Module 2, where we will use this file to construct our Neo4j Knowledge Graph.

## Appendix


### A Note on Our Schema for Contract Types and Contract Clauses

For this cohort, we are using a **predefined, fixed list** of `CONTRACT_TYPES` and `CLAUSE_TYPES` in our Pydantic schema. This is an intentional choice designed to help us focus on mastering the core end-to-end GraphRAG pipeline without the added complexity of dynamic schema management.

However, in a real-world production scenario with hundreds or thousands of varied contracts, this fixed approach would be too rigid. We would inevitably encounter new or uniquely phrased clauses that our system would miss.

A production-grade solution would use a more sophisticated, iterative approach:

1.  **Open Extraction:** Instead of providing a fixed list, we would prompt the LLM to identify and categorize *all* contract and clause types it finds in a document.
2.  **Dynamic Taxonomy Management:** The system would flag any newly discovered types that are not in the existing schema.
3.  **Human-in-the-Loop Review:** A legal expert or data steward would then review these new suggestions to validate, merge, or add them to the official knowledge graph schema.

This "human-in-the-loop" process ensures the knowledge graph is robust, evolves over time, and accurately reflects the full diversity of the legal documents, making it a truly reliable enterprise asset.
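The flagging in step 2 can be sketched in a few lines. This assumes open extraction has already produced free-form clause-type labels; `flag_new_types`, the normalization rule (case and whitespace only), and the sample labels are all assumptions made for illustration.

```python
from collections import Counter

KNOWN_CLAUSE_TYPES = [
    "Renewal & Termination",
    "Confidentiality & Non-Disclosure",
    "Liability & Indemnification",
    "Intellectual Property",
    "Payment and Freight Terms",
    "Dispute Resolution",
]


def flag_new_types(extracted_labels, known_types):
    """Count labels that are not in the known taxonomy (ignoring case and spacing)."""
    known = {" ".join(t.lower().split()) for t in known_types}
    unseen = Counter()
    for label in extracted_labels:
        normalized = " ".join(label.lower().split())
        if normalized not in known:
            unseen[normalized] += 1
    # Most frequent candidates first, ready for human review
    return unseen.most_common()


# Invented labels, as if returned by an open-extraction pass
labels = ["Force Majeure", "intellectual property", "Force  Majeure", "Audit Rights"]
print(flag_new_types(labels, KNOWN_CLAUSE_TYPES))
# [('force majeure', 2), ('audit rights', 1)]
```

The returned list is only a queue of candidates; deciding whether to merge, rename, or add a type remains the reviewer's call.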

### FAQ

**Question**: Where are we invoking the LLM and which prompt are we using to get structured output?

**Answer**: We invoke the LLM in `process_contract` with `await structured_llm.ainvoke(contract_data["text"])`. The prompt is simply the contract's raw text (`contract_data["text"]`). Structured output is enforced by `llm.with_structured_output(Contract)`, which instructs the LLM to return data matching the `Contract` Pydantic schema—no explicit prompt template is shown; the schema guides the output format.

---

**Question**: What is the role of the Pydantic schema in this workflow?

**Answer**: The Pydantic schema (the `Contract` class) defines the structure and types of the data we want to extract from each contract. When we use `llm.with_structured_output(Contract)`, the LLM is guided to produce output that matches this schema, ensuring consistency and type safety in the extracted data.

---

**Question**: Why don't we need to write a custom prompt template for the LLM?

**Answer**: Because we use `with_structured_output` with a Pydantic schema, the LLM is automatically instructed to return data in the required structured format. This approach leverages the schema as a "contract" for the output, so a custom prompt template is not necessary for basic extraction.

---

**Question**: What if the LLM is not extracting all the clause types mentioned in the original contract document?

**Answer**: You can write a custom prompt to improve extraction, and this is often effective. A custom prompt lets you give the LLM more explicit instructions, such as asking it to identify every clause carefully, to use a specific set of clause types, or to output in a particular format. You can also provide examples (few-shot prompting) to show the LLM exactly how you want the extraction to work. For instance, you might instruct: "Extract all clauses from the contract below. For each clause, assign it to one of these clause types: [list of types]. If a clause does not fit, use 'Other'. Output the results as a JSON list with 'summary' and 'clause_type' for each clause." This approach can help the LLM be more thorough and accurate, especially if you notice consistent omissions with schema-only guidance. Note that 'Other' is not in `CLAUSE_TYPES` as defined above, so add it to that list if you adopt this instruction.

Here is an example code snippet for using a custom prompt with an LLM:

1. Configure the LLM to return output matching the Contract schema

    structured_llm = llm.with_structured_output(Contract)

2. Create your custom prompt

    # Build the numbered list of allowed clause types once
    clause_type_list = "\n".join(
        f"{i + 1}. {clause_type}" for i, clause_type in enumerate(CLAUSE_TYPES)
    )

    # The f-string inserts the clause list and the contract text directly,
    # so no separate .format() call is needed (and braces in the contract
    # text cannot break the formatting)
    custom_prompt = f'''You are a legal contract analyst. Carefully read the contract text below and extract every clause. For each clause, provide a summary and assign it to one of the following EXACT clause types:

    {clause_type_list}

    For each clause, provide a summary and the exact clause type from the list above. If none fit, use "Other". Output your answer as a JSON list, where each item has "summary" and "clause_type".

    Contract Text:
    {contract_data["text"]}'''

3. Invoke the LLM asynchronously with your custom prompt

    result_obj = await structured_llm.ainvoke(custom_prompt)

---

**Question**: What happens if the contract text is missing some fields required by the schema?

**Answer**: If the contract text does not contain information for a required field, the LLM may return `null` (or `None` in Python) for that field, or it may attempt to infer or leave it blank. It's important to review the extracted data for completeness and accuracy, especially for required fields.
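One way to make that review systematic is to list, per record, which expected fields came back empty. The field list and `find_missing_fields` below are assumptions for illustration; adjust them to whichever fields matter downstream.

```python
# Fields to audit; this particular selection is an assumption
FIELDS_TO_CHECK = ["summary", "contract_type", "parties", "effective_date", "end_date"]


def find_missing_fields(record, fields=FIELDS_TO_CHECK):
    """Return the names of fields that are absent, None, or empty."""
    return [name for name in fields if record.get(name) in (None, "", [])]


# Invented record shaped like one entry of extracted_results
record = {
    "file_id": "contract_a",
    "summary": "Supply agreement between two parties.",
    "contract_type": "Supply",
    "parties": [],
    "effective_date": "2017-04-07",
    "end_date": None,
}
print(find_missing_fields(record))  # ['parties', 'end_date']
```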

---

**Question**: Can we use this approach for other document types or schemas?

**Answer**: Yes! As long as you define an appropriate Pydantic schema for your target data structure, you can use `llm.with_structured_output(YourSchema)` to extract structured data from any unstructured text, not just contracts.

---

**Question**: How does the LLM know what the allowed contract types or clause types are?

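**Answer**: The allowed values for `contract_type` and `clause_type` are passed to `Field` through the `enum` keyword (and, for clauses, repeated in the description of the `clauses` field). They reach the LLM as part of the schema, so it will try to select values from these predefined lists, which helps standardize the extracted data. They are hints rather than hard constraints: Pydantic does not reject a value outside the list.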
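If values outside the list should be rejected at parse time rather than merely discouraged, one option is a `typing.Literal` annotation, sketched below. `StrictClause` is a hypothetical variant of `Clause`, and whether `with_structured_output` handles the resulting schema the same way is an assumption this notebook has not tested.

```python
from typing import Literal

from pydantic import BaseModel, Field, ValidationError

# Same values as CLAUSE_TYPES, spelled out so Pydantic can validate them
ClauseType = Literal[
    "Renewal & Termination",
    "Confidentiality & Non-Disclosure",
    "Liability & Indemnification",
    "Intellectual Property",
    "Payment and Freight Terms",
    "Dispute Resolution",
]


class StrictClause(BaseModel):
    """Hypothetical variant of Clause that rejects unknown clause types."""
    summary: str = Field(..., description="Summary of the clause using no pronouns")
    clause_type: ClauseType = Field(..., description="Clause type")


ok = StrictClause(
    summary="Disputes are settled by binding arbitration.",
    clause_type="Dispute Resolution",
)
print(ok.clause_type)  # Dispute Resolution

try:
    StrictClause(summary="Performance is excused during disasters.", clause_type="Force Majeure")
    rejected = False
except ValidationError:
    rejected = True
print(rejected)  # True
```

The trade-off is that a single unexpected label now fails the whole record, so this fits best alongside error handling like the `try`/`except` in `process_contract`.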