In [1]:
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.runnables import RunnableParallel, RunnableLambda
from pydantic import BaseModel, Field
from typing import Dict, List, Any, Optional
import os
import json
import asyncio
import uuid
import logging
from tqdm.notebook import tqdm
import time
from dotenv import load_dotenv


In [2]:
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

In [3]:
# Load environment variables
load_dotenv()
google_api_key = os.getenv("GOOGLE_API_KEY")  

In [4]:
llm = ChatGoogleGenerativeAI(
    model="gemini-2.5-flash-preview-04-17",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2,
)

In [5]:
def load_pdf_content(pdf_path, return_single_string=True, extract_metadata=False):
    """
    Load and parse a PDF document, returning its text content.
    
    Args:
        pdf_path (str): Path to the PDF file
        return_single_string (bool): If True, returns the entire PDF content as a single string.
                                    If False, returns a list of strings (one per page).
        extract_metadata (bool): If True, returns metadata along with content
    
    Returns:
        If return_single_string is True and extract_metadata is False:
            str: The entire text content of the PDF
        If return_single_string is False and extract_metadata is False:
            list: List of strings, one for each page
        If extract_metadata is True:
            tuple: (content, metadata) where content is either a string or list based on return_single_string
    """
    
    # Check if the file exists
    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"PDF file not found at: {pdf_path}")
    
    # Initialize the loader with the appropriate mode
    mode = "single" if return_single_string else "elements"
    loader = PyPDFLoader(pdf_path, mode=mode)
    
    # Load the documents
    docs = loader.load()
    
    if return_single_string:
        # With mode="single", there should only be one document containing all pages
        content = docs[0].page_content if docs else ""
        metadata = docs[0].metadata if docs else {}
    else:
        # With default mode, each document is a page
        content = [doc.page_content for doc in docs]
        metadata = [doc.metadata for doc in docs]
    
    if extract_metadata:
        return content, metadata
    else:
        return content

In [6]:
doc = load_pdf_content("Cloud Computing Copy Lecture Notes.pdf")

In [7]:
print(doc[:1000])  # Print the first 1000 characters of the loaded document

Cloud Computing Lecture Notes 
Distributed Computing/Systems 
Definition: 
Distributed computing refers to a system where computing resources are distributed 
across multiple locations rather than being centralized in a single system. This enables 
task distribution and efficient resource utilization. 
Why Use Distributed Systems? 
• Scalability Issues: Traditional computing faces bottlenecks due to hardware 
limitations, whereas distributed systems allow for hardware scaling. 
• Connected Devices: In a networked system, connected devices communicate, but 
this does not necessarily make them distributed. 
• IoT (Internet of Things): IoT is one of the largest examples of distributed computing. 
• Multi-layered System Design: Distributed computing enables systems to function 
in multiple layers, with each layer acting as a distributed entity. 
• User Perspective: Although the system consists of multiple machines, distributed 
computing presents a unified system to users. 
 
Parallel Comp

In [8]:
# Define Pydantic model for entity schema parser
class EntitySchema(BaseModel):
    """Entity types and their properties."""
    entities: Dict[str, List[str]] = Field(
        description="Dictionary mapping entity types to their possible properties"
    )

In [9]:
# Create entity extraction chain
def create_entity_extraction_chain():
    parser = JsonOutputParser(pydantic_object=EntitySchema)
    
    # Prompt template
    prompt = PromptTemplate(
        template="""
    You are the first agent in a multi-step workflow to build a Knowledge Graph from raw text.

    Workflow Steps Overview:
    1. Extract high-level entity types and their properties from the text. [CURRENT STEP]
    2. Extract specific instances of entities and their properties based on the identified types.
    3. Deduplicate extracted instances and assign them unique identifiers.
    4. Identify and define relationships between the instances of entities.
    5. Create a structured knowledge graph using the extracted entities and relationships.

    You are the FIRST agent in this workflow.


    YOUR TASK:
    - Identify high-level, general entity types (e.g., Person, Company, Location, Event).
    - For each entity type, list all the possible (available) properties it might have.
    - Focus on information that would be useful for structuring a knowledge graph.
    - Stay general — do not extract specific names, examples, or relationships.
    - Avoid unnecessary details or context-specific examples.

    FORMAT:
    - Return a valid JSON object.
    - Keys = entity types (strings).
    - Values = lists of property names (strings).
    - Use double quotes for all keys and string values.
    - No extra explanation, text, or markdown formatting.

    EXAMPLES:
    {{
        "Person": ["name", "age", "email", "address"],
        "Company": ["name", "industry", "founded_date"],
        "Location": ["name", "coordinates", "population"]
    }}

    Text to process: {input}

    {format_instructions}

    Response:
    """,
        input_variables=["input"],
        partial_variables={"format_instructions": parser.get_format_instructions()},
    )

    
    # Build the chain
    chain = prompt | llm | parser
    
    return chain


In [10]:
# Function to extract entities from text with retry logic
def extract_entity_schema(text, max_retries=3):
    """
    Extract entity types and their properties from input text with retry logic.
    
    Args:
        text (str): Input text to analyze
        max_retries (int): Maximum number of retry attempts
        
    Returns:
        dict: Dictionary mapping entity types to lists of properties
    """
    chain = create_entity_extraction_chain()
    
    for attempt in range(max_retries):
        try:
            result = chain.invoke({"input": text})
            # The result is the entities dictionary from the Pydantic model
            return result.get("entities", {})
        except Exception as e:
            if attempt < max_retries - 1:
                print(f"Attempt {attempt + 1} failed. Retrying... Error: {str(e)[:100]}...")
            else:
                print(f"All {max_retries} attempts failed. Last error: {str(e)[:100]}...")
                # Return empty dict as fallback
                return {}

In [11]:
sample_text = """
John Doe, a 35-year-old software engineer, works at Google in Mountain View.
He graduated from MIT with a degree in Computer Science and has been with the company for 5 years.
Google, founded in 1998, is a technology company specializing in internet services and products.
John lives in San Francisco and commutes to work daily. His email is john.doe@example.com.
"""

entities = extract_entity_schema(sample_text)
print(entities)

print("Extracted Entity Schema:")
for entity_type, properties in entities.items():
    print(f"\n{entity_type}:")
    for prop in properties:
        print(f"- {prop}")

{'Person': ['name', 'age', 'occupation', 'employer', 'education', 'residence', 'email', 'tenure'], 'Company': ['name', 'location', 'founded_date', 'industry', 'specialization'], 'Location': ['name'], 'Educational Institution': ['name'], 'Field of Study': ['name']}
Extracted Entity Schema:

Person:
- name
- age
- occupation
- employer
- education
- residence
- email
- tenure

Company:
- name
- location
- founded_date
- industry
- specialization

Location:
- name

Educational Institution:
- name

Field of Study:
- name


In [12]:
# Extract entities from the loaded PDF document
entities = extract_entity_schema(doc)

print("\nExtracted Entity Schema:")
for entity_type, properties in entities.items():
    print(f"\n{entity_type}:")
    for prop in properties:
        print(f"- {prop}")


Extracted Entity Schema:

Computing Concept:
- definition
- characteristics
- use_cases
- limitations
- aspects
- related_concepts

System Architecture:
- description
- characteristics
- components
- use_cases
- comparison_aspects

Platform:
- overview
- purpose
- architecture
- components
- service_offerings
- deployment_aspects
- management_aspects
- security_aspects
- scalability_aspects
- reliability_aspects
- cost_aspects
- features

Resource:
- description
- characteristics
- management_aspects
- lifecycle_aspects
- allocation_aspects
- pricing_aspects
- type

Storage Type:
- description
- characteristics
- use_cases
- pricing_models
- management_aspects

Database Type:
- description
- characteristics
- use_cases
- management_aspects
- migration_aspects

Network Entity:
- definition
- purpose
- characteristics
- components
- management_aspects
- security_aspects
- type

Service Model:
- definition
- characteristics
- responsibility_division

Deployment Model:
- definition
- char

In [18]:
  # Save entity schema for next step
with open("entity_schema.json", "w") as f:
    json.dump({"entities": entities}, f, indent=2)

# Step 2

In [19]:
# Function to split text into chunks
def split_text_into_chunks(text, chunk_size=5000, chunk_overlap=500):
    """
    Split the input text into manageable chunks using RecursiveCharacterTextSplitter.
    
    Args:
        text: The input text to be split
        chunk_size: Maximum size of each chunk in characters
        chunk_overlap: Overlap between consecutive chunks
        
    Returns:
        list: List of Document objects
    """
    logger.info(f"Splitting text into chunks (size={chunk_size}, overlap={chunk_overlap})")
    
    # Initialize the splitter with paragraph-focused splitting
    splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n", "\n", ". ", " ", ""],  # Try to split at paragraph boundaries first
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        keep_separator=True,
        add_start_index=True  # Add start position metadata
    )
    
    # Split the text into chunks
    chunks = splitter.create_documents([text])
    
    # Add chunk index as metadata
    for i, chunk in enumerate(chunks):
        chunk.metadata["chunk_id"] = i
        chunk.metadata["total_chunks"] = len(chunks)
    
    logger.info(f"Text split into {len(chunks)} chunks")
    return chunks

In [20]:
# Define Pydantic models for entity instance extraction
class EntityInstance(BaseModel):
    """A single instance of an entity with its properties."""
    property_values: Dict[str, Any] = Field(
        description="Dictionary mapping property names to values for this instance"
    )
    
class EntityInstances(BaseModel):
    """Instances of a specific entity type."""
    Entity: str = Field(description="The entity type name")
    Instances: List[Dict[str, Any]] = Field(
        description="List of instances found for this entity type"
    )

class ChunkExtractionResult(BaseModel):
    """Result of entity extraction from a single chunk."""
    entities: List[EntityInstances] = Field(
        description="List of entity types and their instances found in this chunk"
    )
    chunk_id: int = Field(description="ID of the chunk this extraction is from")
    error: Optional[str] = Field(None, description="Error message if extraction failed")

In [21]:
# Function to create entity instance extraction chain
def create_entity_instance_extraction_chain():
    """
    Create a chain for extracting entity instances from text chunks.
    
    Returns:
        Chain: A chain that extracts entity instances from text chunks
    """
    # Entity instance extraction result parser
    parser = JsonOutputParser(pydantic_object=ChunkExtractionResult)
    
    # Create prompt template for entity instance extraction
    prompt = PromptTemplate(
        template="""
        You are part of a multi-step workflow to build a Knowledge Graph from raw text.

        Workflow Steps Overview:
        1. Extract high-level entity types and their properties from the text. [COMPLETED]
        2. Extract specific instances of entities and their properties from text chunks. [CURRENT STEP]
        3. Deduplicate extracted instances and assign them unique identifiers.
        4. Identify and define relationships between the instances of entities.
        5. Create a structured knowledge graph using the extracted entities and relationships.

        YOUR TASK:
        You are processing a CHUNK of the full text. Focus ONLY on extracting CONCRETE INSTANCES of entities found in this chunk.

        GIVEN:
        1. A chunk of text
        2. A schema of entity types and their possible properties

        INSTRUCTIONS:
        - Extract ALL instances of the predefined entity types found in this chunk
        - For each instance, extract values for as many properties as are mentioned in the text
        - Be precise - only extract information explicitly stated in this chunk
        - Do NOT make up or infer missing properties
        - If a property is not mentioned, omit it from the output (don't include it with null/empty values)

        INPUT TEXT CHUNK:
        {chunk}

        ENTITY TYPES AND THEIR PROPERTIES:
        {entity_schema}

        FORMAT YOUR RESPONSE AS FOLLOWS:
        - Return a valid JSON object
        - Include the chunk_id provided with the input
        - For each entity type found, include its name and a list of instance objects
        - Each instance object should contain only the properties mentioned in this chunk
        - Properties not mentioned should be omitted entirely (not included as null/empty)
        - If no instances of a particular entity type are found, do not include that entity type

        {format_instructions}

        EXAMPLE RESPONSE FOR A CHUNK ABOUT PEOPLE AND COMPANIES:
        {{
        "entities": [
            {{
            "Entity": "Person",
            "Instances": [
                {{"name": "John Doe", "age": 35, "email": "john@example.com"}},
                {{"name": "Jane Smith", "email": "jane@example.com"}}
            ]
            }},
            {{
            "Entity": "Company",
            "Instances": [
                {{"name": "Google", "industry": "Technology", "founded": 1998}}
            ]
            }}
        ],
        "chunk_id": 3
        }}
        Begin your extraction now: """, 
    input_variables=["chunk", "entity_schema", "chunk_id"], 
    partial_variables={"format_instructions": parser.get_format_instructions()}, 
    )

    # Build the chain
    chain = prompt | llm | parser

    return chain

In [22]:
# Function to process a single chunk with retry logic
def process_chunk(inputs, max_retries=3):
    """
    Process a single text chunk to extract entity instances with retry logic.
    
    Args:
        inputs: Dictionary containing chunk and entity_schema
        max_retries: Maximum number of retry attempts
        
    Returns:
        dict: Extraction results
    """
    chunk = inputs["chunk"]
    entity_schema = inputs["entity_schema"]
    chunk_id = chunk.metadata.get("chunk_id", 0)
    
    for attempt in range(max_retries):
        try:
            chain = create_entity_instance_extraction_chain()
            result = chain.invoke({
                "chunk": chunk.page_content,
                "entity_schema": json.dumps(entity_schema, indent=2),
                "chunk_id": chunk_id
            })
            logger.info(f"Successfully processed chunk {chunk_id}")
            return result
        except Exception as e:
            if attempt < max_retries - 1:
                logger.warning(f"Attempt {attempt + 1} failed for chunk {chunk_id}. Retrying... Error: {str(e)[:100]}...")
            else:
                logger.error(f"All {max_retries} attempts failed for chunk {chunk_id}. Error: {str(e)[:100]}...")
                return {
                    "entities": [],
                    "chunk_id": chunk_id,
                    "error": str(e)[:200]
                }

In [41]:
# Main function to extract entity instances in parallel
async def extract_entity_instances(document, entity_schema, max_concurrency=5, chunk_size=5000, chunk_overlap=500):
    """
    Extract entity instances from document chunks in parallel using asyncio.
    
    Args:
        document: The full text document
        entity_schema: Dictionary of entity types and their properties
        max_concurrency: Maximum number of chunks to process in parallel
        chunk_size: Size of each chunk in characters
        chunk_overlap: Overlap between chunks in characters
        
    Returns:
        list: List of entity instances extracted from all chunks
    """
    start_time = time.time()
    logger.info("Beginning entity instance extraction (async)")
    
    # Split the document into chunks
    chunks = split_text_into_chunks(document, chunk_size, chunk_overlap)
    logger.info(f"Document split into {len(chunks)} chunks")
    
    # Create a semaphore to limit concurrency
    semaphore = asyncio.Semaphore(max_concurrency)
    
    # Process chunks with limited concurrency
    async def process_with_semaphore(chunk):
        async with semaphore:
            inputs = {"chunk": chunk, "entity_schema": entity_schema}
            return process_chunk(inputs)
    
    # Create tasks for all chunks
    tasks = [process_with_semaphore(chunk) for chunk in chunks]
    
    # Process chunks and collect results with progress tracking
    all_results = []
    total_chunks = len(chunks)
    completed = 0
    
    # Use as_completed to process tasks as they finish
    for future in asyncio.as_completed(tasks):
        result = await future
        all_results.append(result)
        
        # Log progress
        completed += 1
        logger.info(f"Completed {completed}/{total_chunks} chunks ({completed/total_chunks*100:.1f}%)")
        
        # Save intermediate results
        if completed % 10 == 0 or completed == total_chunks:
            with open(f"intermediate_results_{completed}.json", "w") as f:
                json.dump(all_results, f, indent=2)
    
    end_time = time.time()
    logger.info(f"Entity instance extraction completed in {end_time - start_time:.2f} seconds")
    
    return all_results

In [42]:
# Function to merge all results from chunks
def merge_chunk_results(chunk_results):
    """
    Merge the results from all chunks.
    
    Args:
        chunk_results: List of extraction results from chunks
        
    Returns:
        list: Combined list of entity instances
    """
    merged_results = []
    
    # Check for errors
    errors = [r for r in chunk_results if r.get("error")]
    if errors:
        logger.warning(f"{len(errors)} chunks had errors during processing")
    
    # Group by entity type
    entity_instances = {}
    for result in chunk_results:
        if "entities" not in result:
            continue
            
        for entity_data in result["entities"]:
            entity_type = entity_data.get("Entity")
            instances = entity_data.get("Instances", [])
            
            if entity_type not in entity_instances:
                entity_instances[entity_type] = []
                
            entity_instances[entity_type].extend(instances)
    
    # Convert to the expected format
    for entity_type, instances in entity_instances.items():
        merged_results.append({
            "Entity": entity_type,
            "Instances": instances
        })
    
    logger.info(f"Merged results for {len(entity_instances)} entity types")
    return merged_results

In [43]:
# Load the entity schema from the previous step
with open("entity_schema.json", "r") as f:
    entity_schema_data = json.load(f)
    entity_schema = entity_schema_data.get("entities", {})

# Print the entity schema
print("Using Entity Schema:")
for entity_type, properties in entity_schema.items():
    print(f"\n{entity_type}:")
    print(f"  Properties: {', '.join(properties)}")

# Execute entity instance extraction
async def run_entity_extraction():
    # Extract entity instances from chunks
    chunk_results = await extract_entity_instances(
        document=doc,
        entity_schema=entity_schema,
        max_concurrency=5,
        chunk_size=5000,
        chunk_overlap=500
    )
    
    # Merge results from all chunks
    merged_results = merge_chunk_results(chunk_results)
    
    # Save the merged results for the next step
    with open("entity_instances_raw.json", "w") as f:
        json.dump(merged_results, f, indent=2)
    
    print(f"\nExtracted {sum(len(entity['Instances']) for entity in merged_results)} instances across {len(merged_results)} entity types")
    
    # Print sample results
    print("\nSample instances:")
    for entity in merged_results:
        entity_type = entity["Entity"]
        instances = entity["Instances"]
        print(f"\n{entity_type} ({len(instances)} instances):")
        for i, instance in enumerate(instances[:3]):  # Show up to 3 instances per type
            print(f"  Instance {i+1}: {instance}")
        if len(instances) > 3:
            print(f"  ... and {len(instances) - 3} more")
    
    return merged_results


Using Entity Schema:

Computing Concept:
  Properties: definition, characteristics, use_cases, limitations, aspects, related_concepts

System Architecture:
  Properties: description, characteristics, components, use_cases, comparison_aspects

Platform:
  Properties: overview, purpose, architecture, components, service_offerings, deployment_aspects, management_aspects, security_aspects, scalability_aspects, reliability_aspects, cost_aspects, features

Resource:
  Properties: description, characteristics, management_aspects, lifecycle_aspects, allocation_aspects, pricing_aspects, type

Storage Type:
  Properties: description, characteristics, use_cases, pricing_models, management_aspects

Database Type:
  Properties: description, characteristics, use_cases, management_aspects, migration_aspects

Network Entity:
  Properties: definition, purpose, characteristics, components, management_aspects, security_aspects, type

Service Model:
  Properties: definition, characteristics, responsibilit

In [44]:
# Run the extraction
entity_instances = await run_entity_extraction()

2025-04-27 12:22:37,679 - INFO - Beginning entity instance extraction (async)
2025-04-27 12:22:37,680 - INFO - Splitting text into chunks (size=5000, overlap=500)
2025-04-27 12:22:37,686 - INFO - Text split into 11 chunks
2025-04-27 12:22:37,687 - INFO - Document split into 11 chunks
2025-04-27 12:23:18,572 - INFO - Successfully processed chunk 2
2025-04-27 12:24:08,679 - INFO - Successfully processed chunk 7
2025-04-27 12:25:21,623 - INFO - Successfully processed chunk 8
2025-04-27 12:26:25,472 - INFO - Successfully processed chunk 9
2025-04-27 12:26:55,200 - INFO - Successfully processed chunk 1
2025-04-27 12:27:30,953 - INFO - Successfully processed chunk 0
2025-04-27 12:28:33,783 - INFO - Successfully processed chunk 4
2025-04-27 12:29:44,732 - INFO - Successfully processed chunk 3
2025-04-27 12:30:20,885 - INFO - Successfully processed chunk 5
2025-04-27 12:31:02,915 - INFO - Successfully processed chunk 6
2025-04-27 12:32:08,318 - INFO - Successfully processed chunk 10
2025-04-27


Extracted 743 instances across 12 entity types

Sample instances:

Computing Concept (248 instances):
  Instance 1: {'characteristics': 'prefers symmetric resources (same type of OS, servers, configurations, etc.)', 'aspects': 'simplifies management and performance', 'related_concepts': ['symmetric resources', 'Asymmetric resources']}
  Instance 2: {'definition': 'allows creation of virtual objects from physical resources', 'aspects': 'not cloud computing, but it is a key enabler', 'characteristics': 'Supports resource pooling', 'related_concepts': ['resource pooling', 'elasticity', 'workload migration']}
  Instance 3: {'related_concepts': ['Virtualization']}
  ... and 245 more

System Architecture (37 instances):
  Instance 1: {'description': 'A design approach used for cloud services', 'related_concepts': ['Service Orientation']}
  Instance 2: {'characteristics': 'Multiple users can use AWS, each assigned their own VPC'}
  Instance 3: {'description': 'VPC → Virtual Machines (VMs) → 

# Step 2.2

In [23]:
def extract_entity_instances_parallel(document, entity_schema, max_concurrency=6):
    """
    Extract entity instances from document chunks in parallel using RunnableParallel.
    
    Args:
        document: The full text document
        entity_schema: Dictionary of entity types and their properties
        max_concurrency: Maximum number of chunks to process in parallel
        
    Returns:
        list: List of entity instances extracted from all chunks
    """
    start_time = time.time()
    logger.info("Beginning entity instance extraction (parallel)")
    
    # Split the document into chunks
    chunks = split_text_into_chunks(document, chunk_size=5000, chunk_overlap=500)
    logger.info(f"Document split into {len(chunks)} chunks")
    
    # Prepare inputs for each chunk
    inputs = [{"chunk": chunk, "entity_schema": entity_schema} for chunk in chunks]
    
    # Create a RunnableLambda for chunk processing
    chunk_processor = RunnableLambda(process_chunk)
    
    # Process chunks in batches with progress tracking
    all_results = []
    batch_size = min(max_concurrency, len(chunks))
    
    # Use tqdm for progress tracking in batches
    with tqdm(total=len(chunks), desc="Processing chunks") as progress_bar:
        for i in range(0, len(inputs), batch_size):
            batch_inputs = inputs[i:i+batch_size]
            
            # Process the batch in parallel
            batch_results = chunk_processor.batch(batch_inputs, config={"max_concurrency": max_concurrency})
            all_results.extend(batch_results)
            
            # Update progress bar
            progress_bar.update(len(batch_inputs))
            
            # Save intermediate results
            if i + batch_size >= len(inputs) or (i > 0 and i % 20 == 0):
                with open(f"intermediate_results_{i + len(batch_inputs)}.json", "w") as f:
                    json.dump(all_results, f, indent=2)
    
    end_time = time.time()
    logger.info(f"Entity instance extraction completed in {end_time - start_time:.2f} seconds")
    
    return all_results

In [24]:
def merge_chunk_results(chunk_results):
    """
    Merge the results from all chunks.
    
    Args:
        chunk_results: List of extraction results from chunks
        
    Returns:
        list: Combined list of entity instances
    """
    merged_results = []
    
    # Check for errors
    errors = [r for r in chunk_results if r.get("error")]
    if errors:
        logger.warning(f"{len(errors)} chunks had errors during processing")
    
    # Group by entity type
    entity_instances = {}
    for result in chunk_results:
        if "entities" not in result:
            continue
            
        for entity_data in result["entities"]:
            entity_type = entity_data.get("Entity")
            instances = entity_data.get("Instances", [])
            
            if entity_type not in entity_instances:
                entity_instances[entity_type] = []
                
            entity_instances[entity_type].extend(instances)
    
    # Convert to the expected format
    for entity_type, instances in entity_instances.items():
        merged_results.append({
            "Entity": entity_type,
            "Instances": instances
        })
    
    logger.info(f"Merged results for {len(entity_instances)} entity types")
    return merged_results

In [25]:
# Load the entity schema
with open("entity_schema.json", "r") as f:
    entity_schema_data = json.load(f)
    entity_schema = entity_schema_data.get("entities", {})


# Extract entity instances using parallel processing
chunk_results = extract_entity_instances_parallel(
    document=doc,
    entity_schema=entity_schema,
    max_concurrency=6  # Process 6 chunks in parallel
)

# Merge results from all chunks
merged_results = merge_chunk_results(chunk_results)

# Save the merged results for the next step
with open("entity_instances_raw.json", "w") as f:
    json.dump(merged_results, f, indent=2)

print(f"\nExtracted {sum(len(entity['Instances']) for entity in merged_results)} instances across {len(merged_results)} entity types")

# Print sample results
print("\nSample instances:")
for entity in merged_results:
    entity_type = entity["Entity"]
    instances = entity["Instances"]
    print(f"\n{entity_type} ({len(instances)} instances):")
    for i, instance in enumerate(instances[:3]):  # Show up to 3 instances per type
        print(f"  Instance {i+1}: {instance}")
    if len(instances) > 3:
        print(f"  ... and {len(instances) - 3} more")

2025-04-27 13:41:33,107 - INFO - Beginning entity instance extraction (parallel)
2025-04-27 13:41:33,108 - INFO - Splitting text into chunks (size=5000, overlap=500)
2025-04-27 13:41:33,114 - INFO - Text split into 11 chunks
2025-04-27 13:41:33,115 - INFO - Document split into 11 chunks


Processing chunks:   0%|          | 0/11 [00:00<?, ?it/s]

2025-04-27 13:42:04,279 - INFO - Successfully processed chunk 1
2025-04-27 13:42:11,523 - INFO - Successfully processed chunk 0
2025-04-27 13:42:12,641 - INFO - Successfully processed chunk 5
2025-04-27 13:42:18,263 - INFO - Successfully processed chunk 2
2025-04-27 13:42:37,020 - INFO - Successfully processed chunk 4
2025-04-27 13:42:44,575 - INFO - Successfully processed chunk 3
2025-04-27 13:43:24,298 - INFO - Successfully processed chunk 6
2025-04-27 13:43:35,297 - INFO - Successfully processed chunk 7
2025-04-27 13:43:43,510 - INFO - Successfully processed chunk 9
2025-04-27 13:43:53,806 - INFO - Successfully processed chunk 10
2025-04-27 13:44:00,099 - INFO - Successfully processed chunk 8
2025-04-27 13:44:00,163 - INFO - Entity instance extraction completed in 147.06 seconds
2025-04-27 13:44:00,164 - INFO - Merged results for 12 entity types



Extracted 743 instances across 12 entity types

Sample instances:

Computing Concept (248 instances):
  Instance 1: {'definition': 'involves executing multiple processes simultaneously to enhance speed and efficiency', 'use_cases': ['Vector processing', 'Image processing', 'Matrix multiplication'], 'limitations': ['Not all applications can be parallelized', 'Some components of code can be executed in parallel, while others may not', 'Specific programming languages are required for parallel computing']}
  Instance 2: {}
  Instance 3: {}
  ... and 245 more

System Architecture (37 instances):
  Instance 1: {'definition': 'a system where computing resources are distributed across multiple locations rather than being centralized in a single system', 'characteristics': ['allow for hardware scaling']}
  Instance 2: {}
  Instance 3: {'use_cases': ['example of distributed computing']}
  ... and 34 more

Resource (114 instances):
  Instance 1: {}
  Instance 2: {}
  Instance 3: {}
  ... and 111