# Entity Extraction with Large Language Models (LLMs)

This notebook provides guided examples for using **OpenAI's** **gpt-4o-mini** model to extract structured JSON outputs from unstructured data.

### Environment Setup
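Import the necessary dependencies and initialise the OpenAI client. `OpenAI()` reads the API key from the `OPENAI_API_KEY` environment variable, which `load_dotenv()` can populate from a local `.env` file.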

In [1]:
# Import required libraries
from pydantic import BaseModel, Field
from enum import Enum
from typing import Type, List
import openai
from openai import OpenAI

from dotenv import load_dotenv
import os, json

# Load environment variables from .env file
load_dotenv()

# Initialise the OpenAI client
client = OpenAI()

### Define a Reusable Helper Function for Entity Extraction
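This function handles the logic for entity extraction. It accepts the inputs outlined below.

#### Inputs
* text: the corpus of text that we want to extract information from
* custom_class: schema definition (a pydantic model class) for the desired structured output format
* client (optional): the OpenAI client used to send the request; a default client is used if none is provided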

Note: We use OpenAI's **gpt-4o-mini** model for this task; however, you should be able to use any model that supports structured outputs.

#### Benefits
There are several advantages to abstracting and centralising logic inside a reusable function, including:
- Improved maintainability, e.g., you only need to update one place if the model provider releases a change
- Easier experimentation, e.g., you can trial different prompt instructions, model versions, or even model providers without updating downstream code

In [2]:
def extract_entities_from_single_input(text: str, custom_class: Type[BaseModel], client: OpenAI = OpenAI()):
    """
    Perform entity extraction on a text input and structure the output according to a provided pydantic model.
    
    Args:
    - text (str): The text to extract entities from.
    - custom_class (Type[BaseModel]): A pydantic model class defining the structure of the output.
    
    Returns:
    - Instance of custom_class containing the structured output.

    Raises:
    - openai.APIConnectionError / openai.APIStatusError (including RateLimitError) if the request fails.
    - ValueError if the model returns a refusal instead of structured output.
    """
    # Instructions and schema go in the system message; the text itself is sent once, as the user message.
    prompt = f"""
    Extract entities from the user's text and format the output according to the provided JSON schema.

    Use the following JSON format:
    {custom_class.model_json_schema()}
    """

    try:
        completion = client.beta.chat.completions.parse(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": prompt},
                {"role": "user", "content": text}
            ],
            max_tokens=300,
            temperature=0,
            n=1,
            response_format=custom_class
        )
    except openai.APIConnectionError as e:
        print("The server could not be reached")
        print(e.__cause__)  # an underlying Exception, likely raised within httpx.
        raise  # re-raise so callers don't mistake the error object for a result
    except openai.RateLimitError:
        print("A 429 status code was received; we should back off a bit.")
        raise
    except openai.APIStatusError as e:
        print("Another non-200-range status code was received")
        print(e.status_code)
        print(e.response)
        raise

    message = completion.choices[0].message
    # `parsed` is None if the model refused the request
    if message.parsed is None:
        raise ValueError(f"Model returned no structured output: {message.refusal}")
    return message.parsed


### Practical Examples of Extracting Structured Outputs from Unstructured Data

#### Example 1: Analysing News Articles

In [None]:
# Define a general purpose class
class GeneralPurposeEntity(BaseModel):
    type: str = Field(..., description="Type of entity, e.g., 'person', 'organisation', 'location'")
    text: str = Field(..., description="Extracted entity text")

# Return a list, noting that our input data may contain many instances of entities
class GeneralPurposeEntityList(BaseModel):
    entities: List[GeneralPurposeEntity] = Field(..., description="List of extracted entities")

# Sample news article
# Source: https://www.wired.com/story/tesla-cybercab-is-here/
sample_text_general = """
    AARIAN MARSHALL | GEAR | OCT 10. 2024 11:24 PM
    Tesla's Cybercab Is Here
    At a livestreamed event this evening, Tesla CEO Elon Musk showed off the company's new Cybercab and shared some details about Tesla's plan to launch its own robotaxi service.
    """

# Perform entity extraction using the GeneralPurposeEntityList class
resp = extract_entities_from_single_input(sample_text_general, GeneralPurposeEntityList).model_dump()
print(json.dumps(resp, indent=2))
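Because `type` is a free-form string in `GeneralPurposeEntity`, the model may label the same kind of entity with different casing (for example `'Organisation'` and `'organisation'`) or repeat an entity. A minimal post-processing sketch, assuming the response has the shape produced by `GeneralPurposeEntityList.model_dump()`; the helper `group_entities_by_type` is hypothetical and the input values below are illustrative, not real model output:

```python
from collections import defaultdict
from typing import Dict, List


def group_entities_by_type(resp: Dict[str, List[Dict[str, str]]]) -> Dict[str, List[str]]:
    """Group extracted entities by their 'type' field, dropping duplicate texts."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for entity in resp.get("entities", []):
        # Normalise the label so 'Organisation' and 'organisation' share one bucket
        entity_type = entity["type"].strip().lower()
        if entity["text"] not in grouped[entity_type]:
            grouped[entity_type].append(entity["text"])
    return dict(grouped)


# Illustrative input in the same shape as GeneralPurposeEntityList.model_dump()
example_resp = {
    "entities": [
        {"type": "person", "text": "Elon Musk"},
        {"type": "organisation", "text": "Tesla"},
        {"type": "Organisation", "text": "Tesla"},
        {"type": "product", "text": "Cybercab"},
    ]
}
print(group_entities_by_type(example_resp))
# → {'person': ['Elon Musk'], 'organisation': ['Tesla'], 'product': ['Cybercab']}
```

On the real response from the cell above, call `group_entities_by_type(resp)`.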


#### Example 2: Analysing Customer Support Transcripts

In [None]:
# Define the allowed values for severity
class Severity(str, Enum):
    low = "low"
    moderate = "moderate"
    high = "high"

class CustomerSupportTranscriptEntity(BaseModel):
    reported_by: str = Field(..., description="Name of the person who reported the issue. Default to 'Unspecified' if unknown.")
    issue_type: str = Field(..., description="Type of issue, e.g., 'battery', 'screen'")
    location: str = Field(..., description="The city associated with the issue.")
    severity: Severity = Field(..., description="Severity level of the issue. Categorise as 'low' if the issue impacts fewer than 5 people, 'moderate' if it impacts 5-10 people, or 'high' if it impacts more than 10 people.")

# Sample customer support transcript
sample_text_customer_support = "Annie Liao from BuildClub reported an issue with the air conditioning unit on Level 2 at the Stone & Chalk building located in Hay Market, Sydney."

# Perform entity extraction using the CustomerSupportTranscriptEntity class
resp = extract_entities_from_single_input(sample_text_customer_support, CustomerSupportTranscriptEntity).model_dump()
print(json.dumps(resp, indent=2))
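The severity rule in `CustomerSupportTranscriptEntity` depends on a headcount that the transcript may not state (the sample above does not say how many people are affected), so the model has to infer it. When a headcount is available from another source, a deterministic version of the rule can be used to spot-check the model's label. This is a sketch; `expected_severity` is a hypothetical helper, and it assumes "a few" means fewer than 5 people so that the three bands are contiguous:

```python
def expected_severity(people_affected: int) -> str:
    """Deterministic version of the severity rule in the Field description.

    Assumption: 'low' covers fewer than 5 people, 'moderate' covers 5-10,
    and 'high' covers more than 10.
    """
    if people_affected < 0:
        raise ValueError("people_affected must be non-negative")
    if people_affected < 5:
        return "low"
    if people_affected <= 10:
        return "moderate"
    return "high"


for n in (1, 5, 10, 11):
    print(n, expected_severity(n))
```

Because `Severity` subclasses `str`, a check such as `expected_severity(n) == resp["severity"]` compares directly against the extracted value.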


#### Example 3: Processing Multiple Inputs
In the previous examples we dealt with single inputs. But what if we have many inputs, e.g., many news articles or customer support transcripts?

Let's create a wrapper around our helper function that accepts a list of inputs and iterates over each one.

In [None]:
def extract_entities_from_list_input(texts: List[str], custom_class: Type[BaseModel], client: OpenAI = OpenAI()):
    results = []
    # Iterate over each item and append the model output to the results list
    for text in texts:
        result = extract_entities_from_single_input(text, custom_class, client).model_dump()
        results.append(result)
    return results

# List of sample customer support transcripts
sample_text_list = [
    "Customer Jack Wakem came into the Melbourne CBD Apple store requesting repair of Macbook Pro due to faulty battery.",
    "Anonymous complaint in relation to students using mechanical keyboards, disturbing peace and quiet of other students at Sydney Uni library."
]

resp = extract_entities_from_list_input(sample_text_list, CustomerSupportTranscriptEntity)
print(json.dumps(resp, indent=2))
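In `extract_entities_from_list_input`, a single failed request stops the whole loop, and the results gathered so far are lost. A minimal sketch of per-item error isolation, where failures are recorded alongside successes so the caller can decide what to retry; `extract_many_safely` and `fake_extract` are hypothetical stand-ins so the example runs without an API key:

```python
from typing import Any, Callable, Dict, List


def extract_many_safely(texts: List[str], extract_one: Callable[[str], Any]) -> List[Dict[str, Any]]:
    """Run extract_one over each text, recording failures instead of aborting the loop.

    Each result is a dict with the input index and either an 'output' or an 'error'.
    """
    results: List[Dict[str, Any]] = []
    for idx, text in enumerate(texts):
        try:
            results.append({"index": idx, "output": extract_one(text)})
        except Exception as exc:  # keep going; the caller decides what to retry
            results.append({"index": idx, "error": f"{type(exc).__name__}: {exc}"})
    return results


# Stand-in extractor for illustration only: fails on empty input
def fake_extract(text: str) -> Dict[str, int]:
    if not text.strip():
        raise ValueError("empty input")
    return {"length": len(text)}


print(extract_many_safely(["hello", "  "], fake_extract))
```

With the real helper, `extract_one` could be `lambda t: extract_entities_from_single_input(t, CustomerSupportTranscriptEntity).model_dump()`.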


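#### Prepare the Input File for Batch Processing
For large volumes of inputs that don't need an immediate response, OpenAI's Batch API processes requests asynchronously within a 24-hour window, at a lower cost than individual synchronous requests. Each line of the batch input file is one self-contained request: here we read raw records (each with a `row_id` and a `content` field) from `sample_research_papers.jsonl` and write them out in the Batch API request format.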

In [None]:
# Define a research paper class
class ResearchPaperEntity(BaseModel):
    title: str
    authors: list[str]
    abstract: str
    keywords: list[str]

# Function to pre-process the sample input file
def prepare_batch_input_file(input_file: str, output_file: str, custom_class: Type[BaseModel]):
    """
    Reads a .jsonl file of raw records and writes one Batch API request per record to a new .jsonl file.

    Parameters:
        input_file (str): Path to the input .jsonl file (each line needs 'row_id' and 'content' fields).
        output_file (str): Path to the output .jsonl file.
        custom_class (Type[BaseModel]): Pydantic model describing the desired output structure.

    Returns:
        None
    """
    # Structured outputs expect the schema wrapped in a json_schema response_format, not the raw schema.
    # Strict mode also requires additionalProperties: false and every field listed as required; this
    # covers flat models whose fields have no defaults, such as ResearchPaperEntity.
    schema = custom_class.model_json_schema()
    schema["additionalProperties"] = False
    response_format = {
        "type": "json_schema",
        "json_schema": {"name": custom_class.__name__, "schema": schema, "strict": True},
    }

    prompt = f"""
    Extract entities from the user's text and format the output according to the provided JSON schema.

    Use the following JSON format:
    {schema}
    """

    with open(input_file, 'r') as infile, open(output_file, 'w') as outfile:
        for line in infile:
            if not line.strip():
                continue  # skip blank lines
            entry = json.loads(line)

            # Build the request in the format expected by the Batch API
            request = {
                "custom_id": str(entry['row_id']),  # custom_id must be a string, unique within the batch
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-4o-mini",
                    "messages": [
                        {"role": "system", "content": prompt},
                        {"role": "user", "content": entry['content']}
                    ],
                    "max_tokens": 1000,  # 300 can truncate long abstracts, producing invalid JSON
                    "temperature": 0,
                    "n": 1,
                    "response_format": response_format
                }
            }

            # Write one request per line
            outfile.write(json.dumps(request) + '\n')

    print(f"Batch requests written to {output_file}")

# Example usage
input_file = 'sample_research_papers.jsonl'
output_file = 'batch_input.jsonl'
custom_class = ResearchPaperEntity

prepare_batch_input_file(input_file, output_file, custom_class)
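Before uploading, it can be worth checking the generated file locally, since a malformed line only surfaces after the batch has been submitted. This sketch checks the request format written by `prepare_batch_input_file` (a string `custom_id`, `POST`, `/v1/chat/completions`, and a `body`) plus the requirement that `custom_id` values be unique within a batch. `validate_batch_input` is a hypothetical helper, not part of the OpenAI SDK, and the demo writes its own temporary file:

```python
import json
import os
import tempfile
from typing import List


def validate_batch_input(path: str, expected_url: str = "/v1/chat/completions") -> List[str]:
    """Return a list of problems found in a Batch API input .jsonl file (empty list = none found)."""
    problems: List[str] = []
    seen_ids = set()
    with open(path, "r") as f:
        for line_no, line in enumerate(f, start=1):
            if not line.strip():
                continue  # ignore blank lines
            try:
                request = json.loads(line)
            except json.JSONDecodeError as exc:
                problems.append(f"line {line_no}: invalid JSON ({exc.msg})")
                continue
            custom_id = request.get("custom_id")
            if not isinstance(custom_id, str):
                problems.append(f"line {line_no}: custom_id should be a string")
            elif custom_id in seen_ids:
                problems.append(f"line {line_no}: duplicate custom_id {custom_id!r}")
            else:
                seen_ids.add(custom_id)
            if request.get("method") != "POST":
                problems.append(f"line {line_no}: method should be POST")
            if request.get("url") != expected_url:
                problems.append(f"line {line_no}: url should be {expected_url}")
            if "body" not in request:
                problems.append(f"line {line_no}: missing body")
    return problems


# Demo: two requests that share a custom_id
demo_request = {"custom_id": "1", "method": "POST", "url": "/v1/chat/completions", "body": {}}
with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False) as tmp:
    tmp.write(json.dumps(demo_request) + "\n")
    tmp.write(json.dumps(demo_request) + "\n")
demo_problems = validate_batch_input(tmp.name)
os.remove(tmp.name)
print(demo_problems)
```

On the real file, `validate_batch_input("batch_input.jsonl")` should return an empty list before the upload step.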

#### Upload the Input File

In [None]:
# Upload the batch input file; the context manager closes the file handle afterwards
with open("batch_input.jsonl", "rb") as f:
    batch_input_file = client.files.create(
        file=f,
        purpose="batch"
    )


#### Submit the Batch Job

In [None]:
# Reuse the client created during environment setup
batch_input_file_id = batch_input_file.id

batch_job = client.batches.create(
    input_file_id=batch_input_file_id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
    metadata={
        "description": "entity extraction job"
    }
)


#### Check the Batch Job Status

In [None]:
print(json.dumps(client.batches.retrieve(batch_job.id).model_dump(), indent=2))
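A batch moves through statuses such as `validating`, `in_progress` and `finalizing` before reaching a terminal one (`completed`, `failed`, `expired` or `cancelled`), so rather than re-running the cell above by hand you can poll until it settles. A minimal sketch; `wait_for_batch` is a hypothetical helper that takes any zero-argument function returning the current status, which lets the demo use a stand-in sequence instead of the API:

```python
import time
from typing import Callable

# Batch statuses after which the job will not change any further
TERMINAL_STATUSES = {"completed", "failed", "expired", "cancelled"}


def wait_for_batch(get_status: Callable[[], str], poll_interval: float = 60.0,
                   timeout: float = 24 * 3600) -> str:
    """Poll get_status() until it returns a terminal status or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"batch still '{status}' after {timeout} seconds")
        time.sleep(poll_interval)


# Stand-in status sequence for illustration only
fake_statuses = iter(["validating", "in_progress", "finalizing", "completed"])
final_status = wait_for_batch(lambda: next(fake_statuses), poll_interval=0)
print(final_status)
```

Against the real job, `wait_for_batch(lambda: client.batches.retrieve(batch_job.id).status)` would poll once a minute; the interval is an assumption to tune.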

#### Retrieve the Results

In [None]:
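# Refresh the job first: `batch_job` still holds the status from when it was created
batch_job = client.batches.retrieve(batch_job.id)

if batch_job.status == 'completed':
    file_response = client.files.content(batch_job.output_file_id)
    print(file_response.text)
else:
    print(batch_job.status)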
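The output file is itself .jsonl: each line carries the request's `custom_id` plus either a `response` (whose `body` is a chat completion) or an `error`. Output lines are not guaranteed to follow input order, so results should be matched by `custom_id`. A minimal parsing sketch; `parse_batch_output` is hypothetical and the example line is hand-written to show the shape, not real output:

```python
import json
from typing import Any, Dict


def parse_batch_output(jsonl_text: str) -> Dict[str, Any]:
    """Map each custom_id to the parsed JSON content of its response, or to an error description."""
    results: Dict[str, Any] = {}
    for line in jsonl_text.splitlines():
        if not line.strip():
            continue
        record = json.loads(line)
        custom_id = record["custom_id"]
        response = record.get("response") or {}
        if record.get("error") or response.get("status_code") != 200:
            results[custom_id] = {"error": record.get("error") or response.get("status_code")}
            continue
        message = response["body"]["choices"][0]["message"]
        if message.get("content") is None:  # e.g. the model refused
            results[custom_id] = {"error": message.get("refusal")}
            continue
        # The extracted entities arrive as a JSON string in the assistant message content
        results[custom_id] = json.loads(message["content"])
    return results


example_line = json.dumps({
    "id": "batch_req_example",
    "custom_id": "1",
    "response": {
        "status_code": 200,
        "body": {"choices": [{"message": {"content": "{\"title\": \"Example\"}"}}]},
    },
    "error": None,
})
parsed_example = parse_batch_output(example_line)
print(parsed_example)
```

On the real results, `parse_batch_output(file_response.text)` returns a dict keyed by the `row_id` values used as `custom_id`.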


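### Custom Batch Processing Job
As an alternative to the Batch API, we can send many requests concurrently ourselves using `asyncio` and the `AsyncOpenAI` client, retrying with exponential backoff when a rate limit is hit. Results are available as soon as the requests finish rather than within a 24-hour window, though without the Batch API's lower pricing.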
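The retry helper `backoff_retry_async` in the next cell waits `base_delay * 2 ** retries` seconds (plus up to 0.1 s of random jitter) after each rate-limited attempt. With its defaults (`max_retries=5`, `base_delay=1`) the waits can be worked out directly; `backoff_schedule` below is a hypothetical helper that ignores the jitter:

```python
from typing import List


def backoff_schedule(max_retries: int = 5, base_delay: float = 1.0) -> List[float]:
    """Waits in seconds, before jitter, for exponential backoff of base_delay * 2**retry."""
    return [base_delay * (2 ** retry) for retry in range(max_retries)]


delays = backoff_schedule()
print(delays, sum(delays))  # → [1.0, 2.0, 4.0, 8.0, 16.0] 31.0
```

So a request that keeps hitting the rate limit is abandoned after roughly 31 seconds of waiting in total; raising `base_delay` or `max_retries` stretches this window.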

In [None]:
import os, json
import asyncio
import random
from functools import partial
from typing import Type, Callable, Any, List
from pydantic import BaseModel, Field
import openai
from openai import AsyncOpenAI

# Use the async client
client = AsyncOpenAI()

# Define a research paper class
class ResearchPaperEntity(BaseModel):
    paper_title: str = Field(..., description="Inferred from the header section or contents of the paper.")
    authors: list[str] = Field(..., description="Names of individuals who contributed to the paper.")
    abstract: str = Field(..., description="One line summary of the paper.")
    keywords: list[str] = Field(..., description="Metadata to assist with keyword search.")

# Helper function with retry and backoff mechanism
async def backoff_retry_async(func: Callable[[], Any], max_retries=5, base_delay=1):
    retries = 0
    while retries < max_retries:
        try:
            return await func()
        except openai.RateLimitError as e:
            delay = base_delay * (2 ** retries) + random.uniform(0, 0.1)  # Exponential backoff with jitter
            print(f"Rate limit reached. Retrying in {delay:.2f} seconds...")
            await asyncio.sleep(delay)
            retries += 1
        except openai.APIConnectionError as e:
            print("The server could not be reached")
            print(e.__cause__)
            return e  # Return the exception if non-retry error occurs
        except openai.APIStatusError as e:
            print("Another non-200-range status code was received")
            print(e.status_code)
            print(e.response)
            return e  # Return the exception if non-retry error occurs

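    # openai.RateLimitError cannot be built from a message alone (it needs an HTTP response),
    # so return a plain exception after the final retry
    return RuntimeError(f"Max retries ({max_retries}) exceeded. Exiting.")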

# Updated entity extraction function using asynchronous client
async def extract_entities_from_single_input_async(text: str, custom_class: Type[BaseModel], client: AsyncOpenAI = AsyncOpenAI()):
    """
    Perform entity extraction on a text input and structure the output according to a provided pydantic model.
    
    Args:
    - text (str): The text to extract entities from.
    - custom_class (Type[BaseModel]): A pydantic model class defining the structure of the output.
    
    Returns:
    - Instance of custom_class containing the structured output.

    Raises:
    - openai.APIError subclasses if the request fails. These are deliberately not caught here:
      backoff_retry_async can only retry a RateLimitError that is raised, not one that is returned.
    - ValueError if the model returns a refusal instead of structured output.
    """
    # Instructions and schema go in the system message; the text itself is sent once, as the user message.
    prompt = f"""
    Extract entities from the user's text and format the output according to the provided JSON schema.

    Use the following JSON format:
    {custom_class.model_json_schema()}
    """

    completion = await client.beta.chat.completions.parse(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": prompt},
            {"role": "user", "content": text}
        ],
        max_tokens=1000,
        temperature=0,
        n=1,
        response_format=custom_class
    )

    message = completion.choices[0].message
    # `parsed` is None if the model refused the request
    if message.parsed is None:
        raise ValueError(f"Model returned no structured output: {message.refusal}")
    return message.parsed

# Custom batch processing job
async def batch_processing_job(texts: List[str], custom_class: Type[BaseModel]):

    # Create a list of tasks, one for each input
    tasks = [
        backoff_retry_async(partial(extract_entities_from_single_input_async, text=text, custom_class=custom_class, client=client))
        for text in texts
    ]

    # Run tasks concurrently
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Process results
    for idx, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"### Error encountered while processing record {idx + 1}: {result}")
        else:
            print(f"### Success for record {idx + 1}:")
            print(json.dumps(result.model_dump(), indent=2))

# Function to load texts from a .jsonl file
def load_texts_from_jsonl(file_path: str) -> List[str]:
    texts = []
    with open(file_path, 'r') as f:
        for line in f:
            data = json.loads(line)
            texts.append(data['content'])  # Extracting 'content' from each line
    return texts

# Load texts from the .jsonl file and run the extraction
research_papers = load_texts_from_jsonl("sample_research_papers.jsonl")
await batch_processing_job(research_papers, ResearchPaperEntity)
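`batch_processing_job` starts every request at once, which with a large input list makes rate-limit errors (and therefore backoff waits) more likely. One common way to cap the number of in-flight requests is an `asyncio.Semaphore`. A minimal sketch; `gather_with_limit` is a hypothetical helper, the limit of 3 is an arbitrary choice, and `fake_request` stands in for an API call so the example runs offline:

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def gather_with_limit(limit: int, factories: List[Callable[[], Awaitable[Any]]]) -> List[Any]:
    """Run coroutine factories concurrently, with at most `limit` in flight at once.

    Results come back in input order; exceptions are returned rather than raised,
    mirroring asyncio.gather(..., return_exceptions=True).
    """
    semaphore = asyncio.Semaphore(limit)

    async def run(factory: Callable[[], Awaitable[Any]]) -> Any:
        async with semaphore:  # waits while `limit` tasks are already running
            return await factory()

    return await asyncio.gather(*(run(f) for f in factories), return_exceptions=True)


# Stand-in coroutine that records how many calls run at the same time
stats = {"in_flight": 0, "peak": 0}


async def fake_request(i: int) -> int:
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0.01)
    stats["in_flight"] -= 1
    return i * 2


results = asyncio.run(gather_with_limit(3, [lambda i=i: fake_request(i) for i in range(10)]))
print(results, stats["peak"])
```

In the notebook, where an event loop is already running, use `await gather_with_limit(...)` instead of `asyncio.run(...)`. Each factory could then be `partial(backoff_retry_async, partial(extract_entities_from_single_input_async, text=text, custom_class=ResearchPaperEntity, client=client))`.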