In [1]:
# =============================================================================
# IMPORTS
# =============================================================================

import os
import uuid
from typing import List, Optional, Dict, Any

import numpy as np
import pandas as pd
from dotenv import load_dotenv
from pydantic import BaseModel, Field

import google.generativeai as genai
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain_core.embeddings import Embeddings
from langchain_core.documents import Document
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# =============================================================================
# CONFIGURATION
# =============================================================================

# Base locations (relative to the notebook's working directory)
INPUT_DIR = "../data"
INPUT_FILE = "LFC.pdf"
OUTPUT_DIR = "../output"
# Parent folder for vector stores; kept under its own name so that
# CHROMA_DB_PATH is bound exactly once, to the per-report store below.
CHROMA_DB_ROOT = "../vectorstores"

# Create directories if they don't exist
os.makedirs(INPUT_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(CHROMA_DB_ROOT, exist_ok=True)

# Configure API
load_dotenv()
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    # Warn instead of raising so the cell keeps its previous "always runs" behavior;
    # the embedding helper further down swallows API errors, so a missing key would
    # otherwise only show up as a stream of per-chunk error messages.
    print("Warning: GEMINI_API_KEY is not set; Gemini calls may fail to authenticate.")
genai.configure(api_key=GEMINI_API_KEY)

# Derived file paths (all named after the lower-cased input file stem)
_report_stem = os.path.splitext(INPUT_FILE)[0].lower()
input_filepath = os.path.join(INPUT_DIR, INPUT_FILE)
output_filepath = os.path.join(OUTPUT_DIR, f"{_report_stem}_extracted_data.json")
CHROMA_DB_PATH = os.path.join(CHROMA_DB_ROOT, f"{_report_stem}_report_vectorstore")


In [3]:
# =============================================================================
# PYDANTIC MODELS FOR STRUCTURED OUTPUT
# =============================================================================

# Leaf schema reused by every extracted field in this file: the extracted value,
# the supporting text it was taken from, and the justification for it.
# NOTE(review): the class docstring and the Field descriptions are presumably
# forwarded to the LLM as part of the structured-output schema -- treat them as
# prompt text and edit them deliberately.
class AnswerWithSources(BaseModel):
    """An answer to a specific question or extracted piece of information, with sources and reasoning."""
    answer: str = Field(description="The extracted answer or information.")
    sources: str = Field(description="Full direct text chunk from the context used to extract this information.")
    reasoning: str = Field(description="Explanation of how the answer was derived from the provided sources.")


# Per-category payload for conducted actions: three required AnswerWithSources
# fields (progress, targets, reasons). Nested under ConductedActionCategory.details.
class ConductedActionDetails(BaseModel):
    """Details for a specific conducted environmental action category."""
    progress_achieved_for_period: AnswerWithSources = Field(
        description="Progress achieved for the reporting period regarding this environmental topic."
    )
    actual_targets_set_for_period: AnswerWithSources = Field(
        description="Actual targets set for the reporting period regarding this environmental topic."
    )
    reasons_for_achieved_progress: AnswerWithSources = Field(
        description="Reason(s) for the achieved progress regarding this environmental topic."
    )


# Pairs a free-text category name with its ConductedActionDetails.
# This is the element type of ConductedActionsSection.categories_data.
class ConductedActionCategory(BaseModel):
    """A specific category of conducted environmental action (e.g., Total Emissions)."""
    category_name: str = Field(
        description="The name of the environmental category (e.g., 'Total emissions', 'Total energy consumption')."
    )
    details: ConductedActionDetails = Field(
        description="Detailed progress, targets, and reasons for this category."
    )


# Top-level schema handed to llm.with_structured_output() for the conducted-actions
# chain in setup_rag_chains(); the result is stored in
# ESGReportExtractedInfo.environmental_actions_conducted.
class ConductedActionsSection(BaseModel):
    """The main section for conducted environmental actions conducted during the reporting period."""
    categories_data: List[ConductedActionCategory] = Field(
        description="A list of specific environmental categories with their respective progress, targets, and reasons."
    )


# Per-category payload for planned actions: two required AnswerWithSources fields
# (targeted progress and the actions meant to reach it).
# Nested under PlannedActionCategory.details.
class PlannedActionDetails(BaseModel):
    """Details for a specific planned environmental action category."""
    planned_or_targeted_progress: AnswerWithSources = Field(
        description="Planned or targeted progress for the upcoming period regarding this environmental topic."
    )
    planned_actions_to_achieve_target: AnswerWithSources = Field(
        description="Planned action(s) to achieve the target(s) for the upcoming period regarding this environmental topic."
    )


# Pairs a free-text category name with its PlannedActionDetails.
# This is the element type of PlannedActionsSection.categories_data.
class PlannedActionCategory(BaseModel):
    """A specific category of planned environmental action (e.g., Total Emissions)."""
    category_name: str = Field(
        description="The name of the planned environmental category (e.g., 'Total emissions', 'Total energy consumption')."
    )
    details: PlannedActionDetails = Field(
        description="Detailed planned progress and actions for this category."
    )


# Top-level schema handed to llm.with_structured_output() for the planned-actions
# chain in setup_rag_chains(); the result is stored in
# ESGReportExtractedInfo.environmental_actions_planned.
class PlannedActionsSection(BaseModel):
    """The main section for environmental actions planned for the next reporting period."""
    categories_data: List[PlannedActionCategory] = Field(
        description="A list of specific environmental categories with their respective planned progress and actions."
    )

    
# One campaign record. Every field is required (none declares a default) and is an
# AnswerWithSources, except sdgs_addressed, which is a list of AnswerWithSources
# (one entry per Sustainable Development Goal reported for the campaign).
class EnvironmentalCampaign(BaseModel):
    """Information extracted about a specific environmental campaign or activity."""
    name_of_campaign_activity: AnswerWithSources = Field(
        description="Name of the environmental campaign or activity."
    )
    purpose_and_goal: AnswerWithSources = Field(
        description="Purpose and aspired goal of the campaign/activity."
    )
    description_and_scope: AnswerWithSources = Field(
        description="Description and scope of the campaign."
    )
    qualitative_quantitative_results: AnswerWithSources = Field(
        description="Qualitative and quantitative results of the campaign."
    )
    environmental_issues_addressed: AnswerWithSources = Field(
        description="Environmental issues addressed by the campaign."
    )
    name_of_collaboration_partners: AnswerWithSources = Field(
        description="Names of collaboration partners (e.g., fan club, local community, charity, foundation, sponsorship partner)."
    )
    number_of_potential_individuals_reached: AnswerWithSources = Field(
        description="Number of potential individuals reached by the campaign."
    )
    sdgs_addressed: List[AnswerWithSources] = Field(
        description="Which of the 17 Sustainable Development Goals (SDGs) have been addressed by the campaign."
    )


# Wrapper schema handed to llm.with_structured_output() for the campaigns chain in
# setup_rag_chains(); main() unwraps `.campaigns` before building the final report.
class EnvironmentalCampaignsList(BaseModel):
    """List of environmental campaigns extracted from the report."""
    campaigns: List[EnvironmentalCampaign] = Field(
        description="A list of environmental campaigns and activities identified in the report."
    )


# Final container assembled in main() from the three chain results and serialized
# with model_dump_json(). `environmental_campaigns` is required; the two action
# sections are optional and default to None.
class ESGReportExtractedInfo(BaseModel):
    """Comprehensive extracted information from an ESG report."""
    environmental_campaigns: List[EnvironmentalCampaign] = Field(
        description="A list of environmental campaigns and activities identified in the report."
    )
    environmental_actions_conducted: Optional[ConductedActionsSection] = Field(
        default=None,
        description="Detailed information about environmental actions conducted during the reporting period."
    )
    environmental_actions_planned: Optional[PlannedActionsSection] = Field(
        default=None,
        description="Detailed information about environmental actions planned for the next reporting period."
    )


In [4]:
# =============================================================================
# EMBEDDING FUNCTIONS
# =============================================================================

def get_gemini_embedding_function():
    """Build and return a callable that embeds one string with Gemini.

    The returned ``embed_text(text, task_type="SEMANTIC_SIMILARITY")`` yields a
    NumPy array on success and ``None`` when the call fails for any reason
    (the error is printed, never raised).
    """
    def embed_text(text: str, task_type: str = "SEMANTIC_SIMILARITY") -> Optional[np.ndarray]:
        """Embed ``text`` for the given Gemini task type; ``None`` signals failure."""
        vector = None
        try:
            api_result = genai.embed_content(
                task_type=task_type,
                content=text,
                model="models/embedding-001",
            )
            vector = np.array(api_result['embedding'])
        except Exception as error:
            # Best-effort by design: report the problem and let the caller decide.
            print(f"Error embedding text with Gemini: {error}")
        return vector

    return embed_text


class CustomGeminiEmbeddings(Embeddings):
    """
    A LangChain Embeddings class wrapper for the Gemini embedding function.
    This allows the Gemini embedder to be used with LangChain's vector stores.

    When the wrapped function returns ``None`` (its failure signal), a warning is
    printed and a zero vector of length ``embedding_dim`` is substituted so that
    the caller always receives one vector per input.

    Args:
        gemini_embed_func: Callable ``(text, task_type=...)`` returning an object
            with ``.tolist()`` (a NumPy array in this notebook) or ``None``.
        embedding_dim: Length of the fallback zero vector. Defaults to 768, the
            dimension the original code assumed for the embedding model.
    """

    # Single source of truth for the assumed embedding size (was hard-coded twice).
    DEFAULT_EMBEDDING_DIM = 768

    def __init__(self, gemini_embed_func, embedding_dim: int = DEFAULT_EMBEDDING_DIM):
        self.gemini_embed_func = gemini_embed_func
        self.embedding_dim = embedding_dim

    def _embed_or_zero(self, text: str, task_type: str, failure_message: str) -> List[float]:
        """Embed ``text``; on failure print ``failure_message`` and return zeros."""
        embedding = self.gemini_embed_func(text, task_type=task_type)
        if embedding is None:
            print(failure_message)
            return [0.0] * self.embedding_dim
        return embedding.tolist()

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """
        Embeds a list of documents for storage in a vector store.
        Uses "RETRIEVAL_DOCUMENT" task type for optimal retrieval performance.

        Returns one vector per input text, in input order.
        """
        return [
            self._embed_or_zero(
                text,
                "RETRIEVAL_DOCUMENT",
                "Warning: Embedding failed for a document. Using a zero vector.",
            )
            for text in texts
        ]

    def embed_query(self, text: str) -> List[float]:
        """
        Embeds a single query string for similarity search.
        Uses "RETRIEVAL_QUERY" task type.
        """
        return self._embed_or_zero(
            text,
            "RETRIEVAL_QUERY",
            "Warning: Embedding failed for the query. Using a zero vector.",
        )


In [5]:
# =============================================================================
# VECTOR STORE FUNCTIONS
# =============================================================================

def create_vectorstore(chunks: List[Document], embedding_function_instance: Embeddings, vectorstore_path: str):
    """
    Build a persisted Chroma vector store from document chunks.

    Each chunk gets a deterministic ID (UUIDv5 of its ``page_content``); chunks
    whose content repeats are dropped, keeping the first occurrence.

    Args:
        chunks: Document chunks to index.
        embedding_function_instance: Embeddings implementation used by Chroma.
        vectorstore_path: Directory passed to Chroma as ``persist_directory``.

    Returns:
        The Chroma vector store returned by ``Chroma.from_documents``.
    """
    print(f"Attempting to create/load vector store at: {vectorstore_path}")

    # Map content-derived ID -> first chunk with that content. A dict keeps
    # insertion order, so the ID list and the chunk list below are guaranteed to
    # line up position by position. (The previous code passed list(set_of_ids),
    # whose arbitrary order attached IDs to the wrong documents.)
    chunks_by_id = {}
    for chunk in chunks:
        doc_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, chunk.page_content))
        chunks_by_id.setdefault(doc_id, chunk)

    unique_ids = list(chunks_by_id.keys())
    unique_chunks = list(chunks_by_id.values())

    print(f"Found {len(unique_chunks)} unique chunks out of {len(chunks)} total.")

    # Create or load the Chroma database
    vectorstore = Chroma.from_documents(
        documents=unique_chunks,
        ids=unique_ids,
        embedding=embedding_function_instance,
        persist_directory=vectorstore_path
    )

    print(f"Vector store created/loaded and persisted successfully at: {vectorstore_path}")
    return vectorstore


def format_docs(docs):
    """Concatenate the ``page_content`` of each document, separated by a blank line."""
    page_texts = [document.page_content for document in docs]
    return "\n\n".join(page_texts)

In [6]:
# =============================================================================
# DOCUMENT PROCESSING
# =============================================================================

def load_and_split_documents(file_path: str, chunk_size: int = 1500, chunk_overlap: int = 200) -> List[Document]:
    """Load a PDF and split its pages into overlapping text chunks.

    Args:
        file_path: Path of the PDF handed to ``PyPDFLoader``.
        chunk_size: Maximum chunk length, measured with ``len`` (characters).
            Defaults to 1500, the value previously hard-coded.
        chunk_overlap: Number of characters shared between neighbouring chunks.
            Defaults to 200, the value previously hard-coded.

    Returns:
        The chunks produced by ``RecursiveCharacterTextSplitter.split_documents``.
    """
    print(f"Loading document: {file_path}")

    loader = PyPDFLoader(file_path)
    pages = loader.load()

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        # Try paragraph breaks first, then line breaks, then single spaces.
        separators=["\n\n", "\n", " "]
    )

    chunks = text_splitter.split_documents(pages)
    print(f"Document split into {len(chunks)} chunks")

    return chunks


In [7]:
# =============================================================================
# RAG SETUP AND QUERIES
# =============================================================================

def setup_rag_chains(vectorstore, llm):
    """Build the three retrieval-augmented extraction chains.

    Every chain retrieves the five most similar chunks from ``vectorstore``,
    renders them together with the incoming question into one shared prompt, and
    asks ``llm`` for output structured as one of the Pydantic schemas.

    Returns:
        ``(conducted_actions_chain, planned_actions_chain,
        environmental_campaigns_chain)`` -- in that order.
    """
    # Similarity search returning the five closest chunks per query
    chunk_retriever = vectorstore.as_retriever(
        search_kwargs={"k": 5},
        search_type="similarity",
    )

    # Shared instruction prompt; {context} and {question} are filled in per call
    extraction_prompt = ChatPromptTemplate.from_template(
        """You are an expert ESG (Environmental, Social, Governance) data extraction specialist with deep knowledge of sustainability reporting standards, environmental metrics, and corporate responsibility frameworks.

## TASK OVERVIEW:
Your primary objective is to extract precise, structured information from ESG reports and sustainability documents. You must analyze the provided context thoroughly and extract relevant data according to the specified Pydantic model structure.

## EXTRACTION PRINCIPLES:
1. **ACCURACY FIRST**: Base all responses strictly on the provided context. Never invent, assume, or extrapolate information not present in the source material.

2. **COMPREHENSIVE ANALYSIS**: 
   - Examine ALL provided context chunks thoroughly
   - Look for information across different sections, pages, and document parts
   - Consider synonyms, alternative terminology, and related concepts
   - Connect information that may be scattered across multiple paragraphs

3. **DIRECT EVIDENCE REQUIREMENT**:
   - For each extracted piece of information, provide the EXACT source text that supports your answer
   - Use direct quotes whenever possible, preserving original wording and formatting
   - When paraphrasing is necessary, stay as close to the original language as possible

4. **STRUCTURED OUTPUT COMPLIANCE**:
   - Follow the Pydantic model structure exactly as specified
   - Ensure all required fields are populated appropriately
   - Use "Information not available in the provided context" for missing data rather than leaving fields empty

5. **NUMERICAL DATA PRECISION**:
   - Extract exact figures, percentages, dates, and metrics when available
   - Preserve units of measurement, currency symbols, and time periods
   - Note any baseline years, comparison periods, or contextual qualifiers

6. **REASONING TRANSPARENCY**:
   - Clearly explain how each answer was derived from the source material
   - Show logical connections between questions and relevant context sections
   - Highlight any assumptions or interpretations made during extraction

## CONTEXT ANALYSIS GUIDELINES:
- **Multiple Perspectives**: Consider how information might be presented in different formats (tables, charts, narrative text, bullet points)
- **Cross-References**: Look for information that spans multiple sections or references other parts of the document
- **Implicit Information**: Extract information that might be implied or stated indirectly
- **Temporal Context**: Pay attention to reporting periods, historical comparisons, and future projections

## QUALITY ASSURANCE:
- **Consistency Check**: Ensure extracted information aligns logically across all fields
- **Completeness Review**: Verify that all available relevant information has been captured
- **Accuracy Validation**: Double-check that source quotes exactly match the original text

## HANDLING MISSING INFORMATION:
If specific information is not available in the context:
- State explicitly: "This information is not available in the provided context"
- Do not guess, estimate, or use general knowledge to fill gaps
- Provide reasoning for why the information might be missing or where it might typically be found

---

## DOCUMENT CONTEXT:
{context}

---

## EXTRACTION REQUEST:
{question}

---

## RESPONSE INSTRUCTIONS:
Analyze the above context thoroughly and extract the requested information following the Pydantic model structure. Ensure every piece of extracted data is supported by direct evidence from the provided context."""
    )

    def _chain_for(output_schema):
        """Wire retriever -> prompt -> structured-output LLM for one schema."""
        chain_inputs = {
            "context": chunk_retriever | format_docs,
            "question": RunnablePassthrough(),
        }
        structured_llm = llm.with_structured_output(output_schema, strict=True)
        return chain_inputs | extraction_prompt | structured_llm

    # Same pipeline three times; only the target schema differs
    conducted_chain, planned_chain, campaigns_chain = (
        _chain_for(schema)
        for schema in (ConductedActionsSection, PlannedActionsSection, EnvironmentalCampaignsList)
    )
    return conducted_chain, planned_chain, campaigns_chain

def get_extraction_questions():
    """Define optimized questions for different extraction tasks.

    Returns:
        A 3-tuple of prompt strings, in this order:
        ``(conducted_actions_question, planned_actions_question,
        environmental_campaigns_question)``.

    The strings are returned exactly as written below, including their leading
    newline and four-space indentation; main() passes each one unchanged to the
    chain of the same name.
    """
    
    # Question for the conducted-actions chain: progress, targets and reasons
    # for six fixed environmental categories in the current reporting period.
    conducted_actions_question = """
    Analyze the ESG report and extract detailed information about CONDUCTED environmental actions during the current reporting period.
    
    For each of these environmental categories, identify:
    - Total emissions (including Scope 1, 2, and 3 emissions, carbon footprint, GHG emissions)
    - Total business travel (including team travel, staff travel, transportation)
    - Total energy consumption (including electricity, gas, fuel, renewable energy usage)
    - Total water consumption (including water usage, conservation efforts)
    - Total paper consumption (including digital transformation efforts, paperless initiatives)
    - Total waste generation (including waste reduction, recycling, circular economy efforts)
    
    For EACH category found, extract:
    1. PROGRESS ACHIEVED: What specific measurable progress was made? Include quantitative metrics (percentages, absolute numbers, comparisons to baselines or previous years).
    2. ACTUAL TARGETS: What were the specific targets or goals set for this reporting period? Include numerical targets and timeframes.
    3. REASONS FOR PROGRESS: What specific actions, initiatives, or factors led to the achieved progress? Include implementation details and methodologies.
    
    Important guidelines:
    - Look for synonyms and related terms (e.g., "carbon footprint" for emissions, "energy efficiency" for energy consumption)
    - Extract exact figures, percentages, and metrics when available
    - Include both absolute and relative improvements
    - Note any certifications, standards, or methodologies mentioned
    - If no information is found for a category, clearly state "No conducted actions reported for this category"
    """
    
    # Question for the planned-actions chain: the same six categories, but asking
    # for future targets and the actions intended to reach them.
    planned_actions_question = """
    Analyze the ESG report and extract detailed information about PLANNED environmental actions for future reporting periods.
    
    For each of these environmental categories, identify future plans and commitments:
    - Total emissions (including net-zero commitments, carbon reduction targets, offset plans)
    - Total business travel (including travel reduction policies, sustainable transport initiatives)
    - Total energy consumption (including renewable energy transitions, efficiency improvements)
    - Total water consumption (including conservation targets, efficiency measures)
    - Total paper consumption (including digitization plans, sustainable sourcing)
    - Total waste generation (including zero-waste goals, circular economy initiatives)
    
    For EACH category found, extract:
    1. PLANNED/TARGETED PROGRESS: What specific measurable targets are set for future periods? Include numerical goals, percentages, and clear timeframes (e.g., by 2025, 2030, 2040).
    2. PLANNED ACTIONS: What specific initiatives, strategies, or actions will be implemented to achieve these targets? Include implementation timelines, responsible parties, and methodologies.
    
    Important guidelines:
    - Focus on future-oriented language (will, plan to, target, aim, commit to, by [year])
    - Extract specific numerical targets and deadlines
    - Look for strategic roadmaps, action plans, and investment commitments
    - Include any mentions of standards, certifications, or frameworks to be adopted
    - Note partnerships or collaborations planned for achieving targets
    - If no future plans are mentioned for a category, clearly state "No planned actions reported for this category"
    """
    
    # Question for the campaigns chain: its eight numbered items line up with the
    # eight fields of the EnvironmentalCampaign model.
    environmental_campaigns_question = """
    Comprehensively analyze the ESG report to identify and extract detailed information about ALL environmental campaigns, initiatives, activities, and programs mentioned.
    
    Look for various types of environmental activities including:
    - Awareness campaigns and educational programs
    - Community engagement initiatives
    - Sustainability programs and projects
    - Environmental partnerships and collaborations
    - Green initiatives and eco-friendly activities
    - Climate action programs
    - Conservation efforts and biodiversity projects
    - Stakeholder engagement on environmental issues
    - Environmental advocacy and policy initiatives
    - Green technology implementations
    
    For EACH campaign/activity identified, extract comprehensive details:
    
    1. NAME: The specific name or title of the campaign/activity. If no formal name exists, create a descriptive title based on the content.
    
    2. PURPOSE & GOAL: The intended objectives, mission, and aspirational outcomes. What environmental impact is the campaign trying to achieve?
    
    3. DESCRIPTION & SCOPE: Detailed description of what the campaign involves, its scope, duration, geographic reach, and key components or phases.
    
    4. RESULTS: Both qualitative and quantitative outcomes, impacts, and achievements. Include metrics, participation numbers, environmental benefits, and success indicators.
    
    5. ENVIRONMENTAL ISSUES: Specific environmental challenges, problems, or areas addressed (e.g., climate change, biodiversity loss, pollution, resource depletion).
    
    6. COLLABORATION PARTNERS: Names of all partners involved, including:
       - Fan clubs and supporter groups
       - Local communities and neighborhoods
       - Charities and non-profit organizations
       - Foundations and environmental groups
       - Sponsorship and business partners
       - Government agencies and institutions
       - Academic institutions and schools
    
    7. REACH: Number of people, communities, or stakeholders potentially reached or engaged by the campaign.
    
    8. SDGs ADDRESSED: Which of the 17 UN Sustainable Development Goals are specifically addressed, with explanations of how the campaign contributes to each goal.
    
    Important guidelines:
    - Don't miss smaller initiatives or brief mentions of environmental activities
    - Look for both formal programs and informal/ad-hoc environmental efforts
    - Extract information even if incomplete - capture what's available
    - Consider the full lifecycle of campaigns from planning to evaluation
    - Include both internal (employee-focused) and external (community-focused) campaigns
    - Look for environmental messaging, communications, and awareness efforts
    """
    
    return conducted_actions_question, planned_actions_question, environmental_campaigns_question



In [8]:
# =============================================================================
# MAIN EXECUTION
# =============================================================================

def main():
    """Run the full ESG extraction pipeline for the configured input PDF.

    Steps: load and chunk the PDF, build the embedding wrapper and the Chroma
    store, create the three RAG chains, run one extraction per chain, merge the
    results into ``ESGReportExtractedInfo`` and write them as JSON to
    ``output_filepath``.

    Returns:
        ``True`` when the JSON file was written, ``False`` when saving failed.
        Errors raised before the save step propagate to the caller.
    """
    print("=" * 80)
    print("ESG DATA EXTRACTION PIPELINE")
    print("=" * 80)

    # 1. Load and process documents
    print("\n1. Loading and processing documents...")
    chunks = load_and_split_documents(input_filepath)

    # 2. Initialize embedding function
    print("\n2. Initializing embedding function...")
    raw_gemini_embedder = get_gemini_embedding_function()
    custom_langchain_embeddings = CustomGeminiEmbeddings(raw_gemini_embedder)

    # 3. Create/load vector store
    print("\n3. Creating/loading vector store...")
    vectorstore = create_vectorstore(
        chunks=chunks,
        embedding_function_instance=custom_langchain_embeddings,
        vectorstore_path=CHROMA_DB_PATH
    )

    # 4. Initialize LLM and setup RAG chains
    print("\n4. Setting up RAG chains...")
    llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0.2, google_api_key=GEMINI_API_KEY)

    conducted_actions_chain, planned_actions_chain, environmental_campaigns_chain = setup_rag_chains(vectorstore, llm)
    conducted_actions_question, planned_actions_question, environmental_campaigns_question = get_extraction_questions()

    # 5. Extract information
    print("\n5. Extracting information...")
    print("   - Extracting conducted actions...")
    extracted_conducted_actions = conducted_actions_chain.invoke(conducted_actions_question)

    print("   - Extracting planned actions...")
    extracted_planned_actions = planned_actions_chain.invoke(planned_actions_question)

    print("   - Extracting environmental campaigns...")
    extracted_campaigns = environmental_campaigns_chain.invoke(environmental_campaigns_question)

    # 6. Combine results
    print("\n6. Combining results...")
    # NOTE(review): a structured-output chain can presumably yield None when the
    # model's reply carries no parsable result. The two action sections already
    # accept None; guard the campaigns list too, so one empty answer does not
    # throw away the other two extractions with an AttributeError.
    if extracted_campaigns is None:
        print("Warning: No environmental campaigns were returned by the model. Using an empty list.")
        campaign_records = []
    else:
        campaign_records = extracted_campaigns.campaigns

    extracted_esg_data = ESGReportExtractedInfo(
        environmental_actions_conducted=extracted_conducted_actions,
        environmental_actions_planned=extracted_planned_actions,
        environmental_campaigns=campaign_records
    )

    # 7. Save results
    print("\n7. Saving results...")
    try:
        json_output = extracted_esg_data.model_dump_json(indent=2)

        with open(output_filepath, 'w', encoding='utf-8') as f:
            f.write(json_output)

        conducted_section = extracted_esg_data.environmental_actions_conducted
        planned_section = extracted_esg_data.environmental_actions_planned
        conducted_count = len(conducted_section.categories_data) if conducted_section else 0
        planned_count = len(planned_section.categories_data) if planned_section else 0

        print(f"✓ Data successfully saved to: {output_filepath}")
        print(f"✓ Extracted {len(extracted_esg_data.environmental_campaigns)} environmental campaigns")
        print(f"✓ Extracted {conducted_count} conducted action categories")
        print(f"✓ Extracted {planned_count} planned action categories")

    except Exception as e:
        print(f"❌ Error saving results: {e}")
        return False

    print("\n" + "=" * 80)
    print("EXTRACTION COMPLETED SUCCESSFULLY!")
    print("=" * 80)

    return True


if __name__ == "__main__":
    # Run the pipeline and signal failure with exit status 1.
    # SystemExit is raised directly instead of calling exit(): that helper is
    # injected by the `site` module for interactive shells and is not meant for
    # use in programs.
    try:
        success = main()
    except Exception as e:
        print(f"❌ Error during execution: {e}")
        raise SystemExit(1)
    if not success:
        raise SystemExit(1)

ESG DATA EXTRACTION PIPELINE

1. Loading and processing documents...
Loading document: data/LFC.pdf
Document split into 62 chunks

2. Initializing embedding function...

3. Creating/loading vector store...
Attempting to create/load vector store at: ./lfc_report_vectorstore
Found 62 unique chunks out of 62 total.
Vector store created/loaded and persisted successfully at: ./lfc_report_vectorstore

4. Setting up RAG chains...

5. Extracting information...
   - Extracting conducted actions...
   - Extracting planned actions...
   - Extracting environmental campaigns...

6. Combining results...

7. Saving results...
✓ Data successfully saved to: output/lfc_extracted_data.json
✓ Extracted 12 environmental campaigns
✓ Extracted 6 conducted action categories
✓ Extracted 6 planned action categories

EXTRACTION COMPLETED SUCCESSFULLY!
