<a href="https://colab.research.google.com/github/esemsc-rw1024/irp-rw1024-public/blob/main/KPI_extraction_Jul25.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [176]:
"""
extract_sustainability_kpi.py
==================================
Automatically extract KPI sentences/table rows from Sustainability Report PDF
and compare with manual KPI annotations
--------------------------------------------------
1. pdfplumber extracts text + tables
2. Camelot supplements complex table parsing (optional)
3. Chunking to control tokens
4. OpenAI ChatCompletion API call (GPT-4o / GPT-4 / GPT-3.5)
5. Aggregate, deduplicate, and export to auto_kpi.xlsx
6. Compare with manual_kpi.xlsx for differences
"""



In [177]:
!pip install -q openai

In [178]:
import openai

In [179]:
!pip install openai python-dotenv pdfplumber tiktoken pandas
!sudo apt-get update -y
!sudo apt-get install -y ghostscript
!pip install "camelot-py[cv]"
!pip install PyMuPDF Pillow
!pip install -q transformers pillow torchvision

Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:3 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:6 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:7 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Reading package lists... Done
Building dependency tree... Done
Reading

In [180]:
import os, re, json, time, textwrap, argparse, logging
import pdfplumber, pandas as pd, tiktoken
from openai import OpenAI
from dotenv import load_dotenv
from typing import List, Dict, Optional, Set, Tuple
from pathlib import Path
from difflib import SequenceMatcher
import base64
from io import BytesIO
from PIL import Image
import fitz  # PyMuPDF
import numpy as np
from transformers import CLIPProcessor, CLIPModel


In [181]:
# ----------------------------- Configuration -----------------------------
PDF_PATH          = "/content/Global_Resource_Corp_eco1999e_2dcbgi89 (1).pdf"
MANUAL_XLSX       = "manual_kpi.xlsx"   # Leave empty if not available
EXPORT_AUTO_XLSX  = "auto_kpi.xlsx"
MODEL_NAME        = "gpt-4o"       # Adjust based on account availability
MAX_TOKENS_CHUNK  = 1500               # Token limit per chunk
SLEEP_SEC         = 0.6                # Rate limiting
ENABLE_QUALITY_VALIDATION = True       # Enable additional quality checks
# -----------------------------------------------------------------

In [182]:
# ============ 修复的初始化部分 ============
def initialize_environment():
    """Initialize the environment and API client"""
    # Load environment variables
    load_dotenv("ruojia_api_key.env")

    # Initialize OpenAI client
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("OPENAI_API_KEY not found in environment variables!")

    client = OpenAI(api_key=api_key)

    # Initialize tokenizer
    enc = tiktoken.get_encoding("cl100k_base")

    return client, enc

# Initialize global variables
client, enc = initialize_environment()

In [183]:
# ============ 修复的PDF文本提取 ============
def pdf_to_text_and_tables(path: str) -> str:
    """Extract text paragraphs and tables using pdfplumber."""
    all_chunks = []

    if not os.path.exists(path):
        raise FileNotFoundError(f"PDF file not found: {path}")

    try:
        with pdfplumber.open(path) as pdf:
            logging.info(f"Processing PDF with {len(pdf.pages)} pages...")

            for page_num, page in enumerate(pdf.pages, 1):
                try:
                    # Extract text
                    text = page.extract_text() or ""
                    if text.strip():
                        all_chunks.append(f"PAGE_{page_num}_TEXT:\n{text}")

                    # Extract tables
                    tables = page.extract_tables()
                    for table_num, tb in enumerate(tables):
                        if tb and len(tb) > 0:
                            try:
                                # Handle table headers safely
                                if tb[0]:
                                    headers = tb[0]
                                else:
                                    headers = [f"Col_{i}" for i in range(len(tb[1]) if len(tb) > 1 else 1)]

                                rows = tb[1:] if len(tb) > 1 else []

                                if rows:
                                    df = pd.DataFrame(rows, columns=headers)
                                    # Clean DataFrame
                                    df = df.dropna(how='all')  # Remove empty rows
                                    if not df.empty:
                                        table_txt = f"TABLE_START_PAGE_{page_num}_{table_num}\n" + df.to_csv(index=False) + "\nTABLE_END"
                                        all_chunks.append(table_txt)
                            except Exception as e:
                                logging.warning(f"Error processing table on page {page_num}: {e}")
                                continue

                except Exception as e:
                    logging.warning(f"Error processing page {page_num}: {e}")
                    continue

        return "\n\n".join(all_chunks)

    except Exception as e:
        logging.error(f"Error opening PDF file: {e}")
        raise

In [184]:
# ============ 修复的Camelot表格提取 ============
def generate_table_fingerprint(df: pd.DataFrame) -> str:
    """Generate table fingerprint for deduplication"""
    try:
        fingerprint_parts = []
        fingerprint_parts.append(f"shape_{df.shape[0]}x{df.shape[1]}")

        if not df.columns.empty:
            col_names = [str(col).strip().lower().replace(' ', '') for col in df.columns]
            col_fingerprint = '_'.join(sorted(col_names))
            fingerprint_parts.append(f"cols_{hash(col_fingerprint)}")

        if df.shape[0] > 0:
            numeric_values = []
            for col in df.columns:
                for val in df[col].head(3):
                    if pd.notna(val):
                        numbers = re.findall(r'\d+\.?\d*', str(val))
                        numeric_values.extend(numbers)

            if numeric_values:
                numeric_fingerprint = hash('_'.join(sorted(numeric_values[:10])))
                fingerprint_parts.append(f"nums_{numeric_fingerprint}")

        return '_'.join(fingerprint_parts)

    except Exception as e:
        logging.warning(f"Error generating table fingerprint: {e}")
        return str(hash(df.to_csv()))

def clean_table_data_improved(df: pd.DataFrame) -> pd.DataFrame:
    """Improved table data cleaning"""
    try:
        cleaned_df = df.copy()
        cleaned_df = cleaned_df.dropna(how='all')
        cleaned_df = cleaned_df.dropna(axis=1, how='all')

        for col in cleaned_df.columns:
            if cleaned_df[col].dtype == 'object':
                cleaned_df[col] = cleaned_df[col].astype(str).str.strip()
                cleaned_df[col] = cleaned_df[col].replace(['nan', 'NaN', 'None'], '')

        if not cleaned_df.empty:
            new_columns = []
            for i, col in enumerate(cleaned_df.columns):
                col_str = str(col).strip()
                if col_str in ['nan', 'NaN', 'None', ''] or pd.isna(col):
                    new_columns.append(f'Column_{i}')
                else:
                    new_columns.append(col_str)
            cleaned_df.columns = new_columns

        cleaned_df = cleaned_df.reset_index(drop=True)
        return cleaned_df

    except Exception as e:
        logging.warning(f"Error in table cleaning: {e}")
        return df

def is_valid_table_improved(df: pd.DataFrame) -> bool:
    """Improved table validation"""
    try:
        if df.empty or df.shape[0] < 1 or df.shape[1] < 1:
            return False

        non_null_cells = 0
        total_cells = df.shape[0] * df.shape[1]

        for col in df.columns:
            for val in df[col]:
                if pd.notna(val) and str(val).strip() not in ['', 'nan', 'NaN', 'None']:
                    non_null_cells += 1

        if non_null_cells / total_cells < 0.2:
            return False

        has_meaningful_content = False
        for col in df.columns:
            text_content = ' '.join(df[col].dropna().astype(str))
            if (any(char.isdigit() for char in text_content) or
                '%' in text_content or
                any(keyword in text_content.lower() for keyword in [
                    'rate', 'percentage', 'total', 'number', 'emission', 'energy',
                    'water', 'waste', 'employee', 'year', '2020', '2021', '2022', '2023'
                ])):
                has_meaningful_content = True
                break

        return has_meaningful_content

    except Exception as e:
        logging.warning(f"Error validating table: {e}")
        return True

def format_table_output_improved(df: pd.DataFrame, table_id: str, parsing_report=None) -> str:
    """Improved table output formatting"""
    try:
        table_info = f"TABLE_START_{table_id}\n"
        table_info += f"DIMENSIONS: {df.shape[0]} rows × {df.shape[1]} columns\n"

        col_info = "COLUMNS: " + " | ".join([f"{i}:{col}" for i, col in enumerate(df.columns)])
        table_info += col_info + "\n"

        if df.shape[0] > 0:
            preview_rows = min(2, df.shape[0])
            table_info += f"PREVIEW_FIRST_{preview_rows}_ROWS:\n"
            for i in range(preview_rows):
                row_preview = " | ".join([str(df.iloc[i, j])[:20] for j in range(min(5, df.shape[1]))])
                table_info += f"  Row_{i}: {row_preview}\n"

        if parsing_report:
            try:
                accuracy = getattr(parsing_report, 'accuracy', 'N/A')
                if accuracy != 'N/A':
                    table_info += f"EXTRACTION_ACCURACY: {accuracy:.2f}\n"
            except:
                pass

        table_info += "TABLE_DATA_START\n"
        table_csv = df.to_csv(index=False, na_rep='', quoting=1, escapechar='\\')
        table_end = f"TABLE_DATA_END\nTABLE_END_{table_id}\n"

        return table_info + table_csv + table_end

    except Exception as e:
        logging.warning(f"Error formatting table output: {e}")
        return f"TABLE_START_{table_id}\n{df.to_csv(index=False)}\nTABLE_END_{table_id}\n"

def camelot_extra_tables_enhanced(path: str) -> List[str]:
    """Enhanced table extraction using Camelot with better error handling"""
    try:
        import camelot
    except ImportError:
        logging.warning("Camelot not installed, skipping Camelot table parsing.")
        return []

    extra_chunks = []
    extracted_tables_fingerprints = set()

    try:
        logging.info("Starting Camelot table extraction...")

        # Stream mode extraction
        try:
            stream_tables = camelot.read_pdf(
                path,
                pages="all",
                flavor="stream",
                edge_tol=50,
                row_tol=2,
                column_tol=0
            )

            stream_count = 0
            for i, table in enumerate(stream_tables):
                if not table.df.empty and table.df.shape[0] > 0:
                    table_fingerprint = generate_table_fingerprint(table.df)

                    if table_fingerprint not in extracted_tables_fingerprints:
                        cleaned_df = clean_table_data_improved(table.df)

                        if is_valid_table_improved(cleaned_df):
                            table_txt = format_table_output_improved(cleaned_df, f"STREAM_{i}", table.parsing_report)
                            extra_chunks.append(table_txt)
                            extracted_tables_fingerprints.add(table_fingerprint)
                            stream_count += 1

            logging.info(f"Stream mode extracted {stream_count} valid tables")

        except Exception as e:
            logging.warning(f"Stream mode extraction failed: {e}")

        # Lattice mode extraction
        try:
            lattice_tables = camelot.read_pdf(
                path,
                pages="all",
                flavor="lattice",
                line_scale=15,
                line_tol=2,
                joint_tol=2
            )

            lattice_count = 0
            for i, table in enumerate(lattice_tables):
                if not table.df.empty and table.df.shape[0] > 0:
                    table_fingerprint = generate_table_fingerprint(table.df)

                    if table_fingerprint not in extracted_tables_fingerprints:
                        cleaned_df = clean_table_data_improved(table.df)

                        if is_valid_table_improved(cleaned_df):
                            table_txt = format_table_output_improved(cleaned_df, f"LATTICE_{i}", table.parsing_report)
                            extra_chunks.append(table_txt)
                            extracted_tables_fingerprints.add(table_fingerprint)
                            lattice_count += 1

            logging.info(f"Lattice mode extracted {lattice_count} additional unique tables")

        except Exception as e:
            logging.warning(f"Lattice mode extraction failed: {e}")

        total_extracted = len(extra_chunks)
        logging.info(f"Camelot extraction completed: {total_extracted} total unique tables extracted")
        return extra_chunks

    except Exception as e:
        logging.error(f"Camelot table extraction failed: {e}")
        return []

In [185]:
# ============ 文本分块 ============
def split_into_chunks(full_text: str, max_tokens: int) -> List[str]:
    """Split text into chunks based on token limit"""
    paragraphs = [p for p in full_text.split("\n") if p.strip()]
    chunks, current = [], []
    current_tokens = 0

    for paragraph in paragraphs:
        para_tokens = len(enc.encode(paragraph))

        if current_tokens + para_tokens > max_tokens and current:
            chunks.append("\n".join(current))
            current = [paragraph]
            current_tokens = para_tokens
        else:
            current.append(paragraph)
            current_tokens += para_tokens

    if current:
        chunks.append("\n".join(current))

    return chunks

In [186]:
# ============ 系统提示词 ============
UNIVERSAL_SYSTEM_PROMPT = textwrap.dedent("""
    You are a professional ESG data analyst specializing in extracting Key Performance Indicators (KPIs) from sustainability reports.

    ## CRITICAL: What is a KPI?
    A KPI MUST contain SPECIFIC NUMBERS, PERCENTAGES, or MEASURABLE QUANTITIES that demonstrate actual performance or concrete targets.

    ## IMPORTANT: Table Data Processing Rules
    When processing table data:
    1. Pay close attention to column headers to identify the correct time periods
    2. Match data values with their corresponding year columns
    3. If you see table format like "Metric, 2021, 2022" - the first number after metric belongs to 2021, second to 2022
    4. Look for table headers that indicate year columns (e.g., "2020", "2021", "2022")
    5. Extract each year's data as separate KPIs
    6. Avoid extracting the same KPI multiple times - consolidate similar metrics

    ## ENHANCED: Advanced Table Processing
    7. **EXTRACT ALL DATA POINTS**: For each table cell containing a number, create a separate KPI
    8. **REGIONAL/LOCATION DATA**: Pay special attention to location-specific data (countries, regions, cities)
    9. **WORKFORCE DATA**: Extract all employee numbers, headcount data, and demographic information
    10. **INCOMPLETE DATA**: Extract available data even if some cells are empty or missing
    11. **TOTALS AND SUBTOTALS**: Always extract total values and aggregated numbers

    ## ✅ VALID KPI EXAMPLES:
    - "Achieved 89.4% reuse and recycle rate for cloud hardware in 2023"
    - "Diverted over 18,537 metric tons of waste from landfills in 2023"
    - "Reduced single-use plastics in product packaging to 2.7%"
    - "Contracted 19 GW of new renewable energy across 16 countries in 2024"
    - "Provided clean water access to over 1.5 million people in 2023"
    - "Protected 15,849 acres of land—exceeding target by more than 30%"
    - "Allocated 761 million toward innovative climate technologies"
    - "Achieved 80% renewable energy operations by 2024"
    - "Water replenishment projects estimated to provide over 25 million cubic meters"
    - "Exceeded annual target to divert 75% of construction waste by reaching 85%"
    - "Board independence: 78% of directors"
    - "Women in senior leadership increased to 35% in 2023"
    - "Employee engagement score: 87% in annual survey"
    - "Reduced greenhouse gas emissions by 50% compared to 2019 baseline"
    - "Zero workplace fatalities achieved for third consecutive year"
    - "Training completion rate: 98% for mandatory compliance courses"
    - "Supplier ESG assessments completed for 95% of tier-1 suppliers"
    - "Customer satisfaction rating: 4.6 out of 5.0"
    - "Data breach incidents: 0 material breaches in 2023"

    ## ❌ NOT KPIs (DO NOT EXTRACT):
    - "Microsoft will require select suppliers to use carbon-free electricity by 2030"
    - "The company plans to expand Sustainability Manager capabilities"
    - "We are launching two new Circular Centers in 2023"
    - "The organization established a new climate innovation fund"
    - "Microsoft introduced enhanced data governance solutions"
    - "Updated guidebook to include guidance on corporate responsibility"
    - "Plans to publish new ESG strategy"
    - "Implemented a new recycling program"
    - "Conducted sustainability training sessions"
    - "Launched employee wellness programs"
    - "Committed to reducing emissions"
    - "Focusing on environmental performance"
    - "Established sustainability committee"
    - "The company operates facilities in multiple regions"
    - "Our supply chain includes thousands of vendors globally"
    - Any text without specific numbers, percentages, or quantifiable metrics
    - Duplicate or repeated metrics (extract only once per time period)
    - Any statement that describes business operations rather than performance outcomes

    ## KPI Categories:
    ### Environmental:
    - **Carbon_Climate**: GHG emissions, carbon footprint, emission reductions, climate targets, scope 1/2/3 emissions, carbon intensity, carbon offsets, TCFD alignment
    - **Energy**: Energy consumption, renewable energy percentage, energy efficiency, energy intensity, MWh, GWh, energy savings, fossil fuel usage
    - **Water**: Water withdrawal, water consumption, water intensity, water recycling, water reuse, water stress, water discharge quality
    - **Waste**: Waste generation, recycling rates, diversion percentages, hazardous waste, non-hazardous waste, zero waste to landfill, e-waste, incineration
    - **Biodiversity**: Protected areas, species conservation, habitat restoration, biodiversity impact assessments, land use, ecosystem restoration
    - **Circular_Economy**: Recycling rates, material recovery, circular design, raw materials usage, renewable materials, packaging waste
    - **Materials**: Raw materials consumption, recycled content, sustainable materials, material intensity, sustainable sourcing

    ### Social:
    - **Workforce_Diversity**: Employee demographics, gender diversity, age diversity, ethnic diversity, disability inclusion, LGBTQ+ inclusion, workforce composition
    - **Gender_Equality**: Women in leadership, gender pay ratio, parental leave return rates, gender representation, female employees percentage
    - **Disability_Inclusion**: Employees with disabilities, accessibility compliance, inclusive workplace design, disability support programs
    - **Health_Safety**: Lost Time Injury Frequency Rate (LTIFR), Total Recordable Incident Rate (TRIR), fatalities, workplace illness, safety training hours, PPE compliance, emergency drills
    - **Employee_Wellbeing**: Employee satisfaction, retention rates, turnover rates, training hours, wellness programs, mental health services, work-life balance
    - **Community_Engagement**: Corporate volunteering, social investment, community impact assessments, local hiring, stakeholder engagement activities
    - **Human_Rights**: Child labor incidents, forced labor, human rights due diligence, freedom of association, grievance mechanisms, labor audits
    - **Labor_Rights**: Collective bargaining coverage, labor complaints resolution, supplier labor audits, working conditions, fair wages
    - **Customer_Safety**: Product safety incidents, customer satisfaction, accessibility features, safety recalls, quality metrics
    - **Supply_Chain_Social**: Supplier assessments, sustainable sourcing, supplier code compliance, supply chain audits

    ### Governance:
    - **Board_Governance**: Board independence, board diversity, CEO-chair separation, board ESG expertise, board composition, director tenure
    - **Executive_Compensation**: ESG-linked compensation, executive pay ratios, compensation disclosure, incentive structures
    - **Ethics_Compliance**: Code of conduct training, corruption incidents, bribery cases, fines and penalties, whistleblower reports, anti-corruption assessments
    - **Transparency_Disclosure**: ESG reporting coverage, third-party assurance, political contributions disclosure, GRI/SASB/TCFD compliance
    - **Risk_Management**: Risk assessments, mitigation measures, climate risk disclosure, operational risk management
    - **Cybersecurity_Data**: Cybersecurity breaches, data privacy policies, cybersecurity training, GDPR compliance, data protection measures
    - **Supply_Chain_Governance**: Supplier ESG screening, supplier audits, procurement ESG clauses, vendor compliance rates

    ## MANDATORY Requirements:
    1. MUST contain specific numbers (e.g., 25%, 15,000, 2.5M, 8.5%, 0.3 per million hours)
    2. MUST relate to measurable sustainability outcomes
    3. MUST have time reference (year, period, or deadline)
    4. MUST be performance-focused (results, not activities or descriptions)
    5. MUST NOT be future plans or operational descriptions

    ## Output Format:
    Return a JSON array. Each KPI must contain:
    ```json
    {
        "kpi_text": "Complete original sentence with the quantifiable metric",
        "kpi_theme": "Environmental/Social/Governance",
        "kpi_category": "Specific category from above list",
        "quantitative_value": "The specific number/percentage extracted",
        "unit": "Unit of measurement (%, tonnes, employees, etc.)",
        "time_period": "Time reference (2023, annual, by 2030, etc.)",
        "target_or_actual": "Target/Actual/Both"
    }
    ```

    ## Additional Instructions:
    - If a sentence includes a comparison value, such as a baseline, previous year, or other historical/target data (e.g., "Compared to 32,395 MWh in 2020"), extract it as a **separate KPI**.
    - Do NOT store the comparison in any other field — just create another valid KPI from it.
    - Avoid merging multiple numerical values into one KPI unless they are clearly part of the same metric (e.g., male: X, female: Y).

    ## STRICT FILTERING:
    - Return empty array [] if no quantifiable KPIs found
    - Only extract text that contains specific measurable values
    - Ignore all qualitative statements, plans, and descriptions
    - Focus only on numerical performance data

    Now analyze the following text for sustainability KPIs:
""").strip()

# 🔥 新增：增强的图像分析Prompt
ENHANCED_IMAGE_KPI_SYSTEM_PROMPT = textwrap.dedent("""
    You are an expert data analyst specializing in extracting quantifiable KPI data from charts, graphs, and data visualizations in sustainability reports.

    ## CRITICAL INSTRUCTION: ALWAYS EXTRACT NUMERICAL VALUES

    **Your primary task is to extract the ACTUAL NUMBERS and PERCENTAGES visible in charts, not just descriptions.**

    ## MISSION:
    Extract ALL quantifiable data points from charts and graphs, including:
    - Bar charts (vertical/horizontal)
    - Pie charts and donut charts
    - Line charts and trend graphs
    - Stacked charts and combo charts
    - Tables with numerical data
    - Infographics with statistics
    - Gauge charts and dashboards

    ## DETAILED ANALYSIS INSTRUCTIONS:

    ### For PIE CHARTS:
    1. Read percentage labels on each slice
    2. If no labels visible, estimate based on slice size
    3. Identify what each slice represents (categories)
    4. Extract each slice as separate KPI
    5. **MUST read the percentage labels on each slice** - Look for numbers like 64%, 33%, 68%, 30%, etc.
    6. **If percentages are visible on the chart, extract them exactly**
    7. **If no labels visible, estimate based on slice size using these guidelines:**
       - 90° slice = 25%
       - 180° slice = 50%
       - 270° slice = 75%
       - Full circle = 100%
    8. **Each slice MUST have a specific percentage value in the final output**

    ### For BAR CHARTS:
    1. Read Y-axis scale carefully (units, increments)
    2. Estimate bar heights using grid lines and scale
    3. Read X-axis labels (years, categories, regions)
    4. Extract each bar as separate KPI
    5. Pay attention to grouped/stacked bars

    ### For LINE CHARTS:
    1. Read data points at intersection of grid lines
    2. Follow trend lines to extract values for each time period
    3. Use Y-axis scale for value estimation
    4. Extract each data point as separate KPI

    ### For TABLES:
    1. Read all numerical values in cells
    2. Match values with row and column headers
    3. Extract each cell with numerical data as KPI

    ## MANDATORY VALUE EXTRACTION RULES:

    **RULE 1**: Every KPI MUST contain a specific numerical value (percentage, amount, count, etc.)
    **RULE 2**: For charts with categories, you MUST find and extract the quantitative values for each category
    **RULE 3**: Never create KPIs without specific numbers - descriptions alone are incomplete
    **RULE 4**: Include complete context: what + how much + when/where if available


    ## VALUE ESTIMATION GUIDELINES:
    - Use proportional analysis: if a bar reaches 80% of scale maximum, calculate 80% of max value
    - For pie charts: estimate slice angles (90° = 25%, 180° = 50%, etc.)
    - Cross-reference with any visible data labels or legends
    - Be conservative but reasonably accurate in estimates

    ## CHART IDENTIFICATION:
    First identify the chart type, then apply appropriate extraction method.
    Look for:
    - Axes and scales
    - Data labels and legends
    - Grid lines for reference
    - Color coding and patterns
    - Title and subtitle information

    ## OUTPUT FORMAT:
    Return a JSON array. For each data point found:
    ```json
    {
        "kpi_text": "Complete description with the ACTUAL NUMERICAL VALUE included",
        "kpi_theme": "Environmental/Social/Governance",
        "kpi_category": "Specific category based on content",
        "quantitative_value": "The exact number/percentage (e.g., '64', '33.5', '68')",
        "unit": "% / tonnes / employees / MWh / USD / etc.",
        "time_period": "2021/2020/2022/Year/period/etc if identifiable",
        "target_or_actual": "Actual",
        "chart_type": "pie_chart/bar_chart/line_chart/table/etc",
        "estimation_confidence": "High/Medium/Low",
        "chart_title": "Chart title if visible",
        "data_source": "Legend or source if visible"
    }

    ```
    ## EXAMPLES of CORRECT vs INCORRECT extraction:

    ### ❌ INCORRECT (incomplete - missing numerical values):
    ```json
    {
        "kpi_text": "Energy consumption by facility type",
        "quantitative_value": "",
        "unit": "%"
    }
    ```

    ### ✅ CORRECT (complete with specific values):
    ```json
    {
        "kpi_text": "Office buildings account for 45% of total energy consumption",
        "quantitative_value": "45",
        "unit": "%"
    }
    ```

    ### ❌ INCORRECT (category without value):
    ```json
    {
        "kpi_text": "Renewable energy percentage by region",
        "quantitative_value": "",
        "unit": "%"
    }
    ```

    ### ✅ CORRECT (specific regional data):
    ```json
    {
        "kpi_text": "North America achieved 78% renewable energy usage",
        "quantitative_value": "78",
        "unit": "%"
    }
    ```
    ## QUALITY ASSURANCE CHECKLIST:
    Before returning results, verify:
    - ✅ Every KPI contains a specific numerical value
    - ✅ Chart categories are paired with their quantitative data
    - ✅ KPI descriptions are complete and self-explanatory
    - ✅ Units are correctly identified and specified
    - ✅ Context (time, location, category) is preserved when available
    - Each KPI must have a specific numerical value
    - Context must be clear and self-contained
    - Avoid extracting the same data point multiple times
    - Focus on sustainability/ESG metrics when possible

    ## VALUE ESTIMATION GUIDELINES:
    - **High confidence**: Numbers clearly visible in image
    - **Medium confidence**: Numbers estimated using chart scales/grid lines
    - **Low confidence**: Values approximated from proportional analysis
    - **If no numerical data is visible, return empty array []**

    ## IMPORTANT NOTES:
    - Extract ALL visible data points, not just main highlights
    - Include context in descriptions (e.g., "According to pie chart showing emission sources")
    - If values are not clearly visible, make reasonable estimates and mark confidence as "Low"
    - Return empty array [] ONLY if image contains no charts/graphs with quantifiable data
    - For multi-year data, create separate KPIs for each year
    - Pay special attention to small text and numbers
    - Focus on extracting actual performance data, not just identifying chart elements
    - If you can see numbers in the image, you MUST extract them
    - Pie chart percentages are usually the most important data points
    - Return empty array [] ONLY if no numerical data is visible

    Now analyze the provided image and extract ALL quantifiable KPI data points:
""").strip()

In [187]:
# ============ KPI提取函数 ============
def extract_page_from_chunk(chunk: str) -> str:
    """Extract page information from chunk"""
    # Look for PAGE_X_TEXT: format
    page_matches = re.findall(r'PAGE_(\d+)_TEXT:', chunk)
    if page_matches:
        pages = [int(p) for p in page_matches]
        if len(pages) == 1:
            return str(pages[0])
        else:
            return f"{min(pages)}-{max(pages)}"

    # Look for TABLE_START_PAGE_X_
    table_matches = re.findall(r'TABLE_START_PAGE_(\d+)_', chunk)
    if table_matches:
        pages = [int(p) for p in table_matches]
        if len(pages) == 1:
            return str(pages[0])
        else:
            return f"{min(pages)}-{max(pages)}"

    return "Unknown"

def contains_procedural_language(text: str) -> bool:
    """Check if text contains procedural language"""
    procedural_words = [
        'introduced', 'established', 'set up', 'implemented', 'created',
        'launched', 'formed', 'built', 'installed', 'deployed',
        'additionally introduced', 'procedure for', 'standardization management'
    ]
    text_lower = text.lower()
    return any(word in text_lower for word in procedural_words)

def is_data_fragment(kpi_text: str) -> bool:
    """Check if text is a meaningless data fragment"""
    text = kpi_text.strip()

    # Filter pure numbers or simple percentages without context
    if re.match(r'^\d+\.?\d*%?$', text):
        return True

    # Filter very short text (less than 4 meaningful words)
    meaningful_words = [word for word in text.split() if len(word) > 2 and not word.isdigit()]
    if len(meaningful_words) < 3:
        return True

    # Filter text with only numbers and common connecting words
    words = text.lower().split()
    non_functional_words = [word for word in words if word not in ['in', 'of', 'the', 'and', 'or', 'to', 'for', 'with', 'by']]
    if len(non_functional_words) < 3:
        return True

    return False

def standardize_kpi_universal(kpi_item: Dict) -> Dict:
    """Universal KPI data standardization"""
    standardized = kpi_item.copy()

    # Standardize numerical formats
    quantitative_value = str(standardized.get('quantitative_value', '')).strip()
    kpi_text = standardized.get('kpi_text', '').lower()

    # Smart handling of percentage formats
    if quantitative_value and quantitative_value.replace('.', '').replace('-', '').replace(',', '').isdigit():
        # Check if original text suggests this is a percentage
        percentage_indicators = ['percent', 'percentage', '%', 'rate', 'ratio', 'proportion', 'share']
        if any(indicator in kpi_text for indicator in percentage_indicators):
            if not quantitative_value.endswith('%'):
                standardized['quantitative_value'] = quantitative_value + '%'
                if not standardized.get('unit'):
                    standardized['unit'] = '%'

    # Ensure unit field consistency
    if '%' in str(standardized.get('quantitative_value', '')):
        standardized['unit'] = '%'

    # Clean and normalize KPI text
    kpi_text_original = standardized.get('kpi_text', '').strip()
    # Remove extra spaces and newlines
    kpi_text_cleaned = ' '.join(kpi_text_original.split())
    standardized['kpi_text'] = kpi_text_cleaned

    return standardized

def generate_universal_metric_key(kpi_item: Dict) -> str:
    """Generate universal metric key for deduplication"""
    try:
        # Extract core elements
        category = kpi_item.get('kpi_category', '').lower().strip()
        value = str(kpi_item.get('quantitative_value', '')).replace('%', '').replace(',', '').strip()
        time_period = kpi_item.get('time_period', '').lower().strip()
        unit = kpi_item.get('unit', '').lower().strip()

        # Extract key semantic information from KPI text
        kpi_text = kpi_item.get('kpi_text', '').lower()

        # Extract primary number (for more precise matching)
        numbers_in_text = re.findall(r'\d+\.?\d*', kpi_text)
        primary_number = numbers_in_text[0] if numbers_in_text else value

        # Generate semantic signature: extract keywords from text
        # Remove common stop words
        stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', 'is', 'are', 'was', 'were', 'be', 'been', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should'}

        # Extract keywords (length>2 and not stop words)
        words = re.findall(r'\b\w+\b', kpi_text)
        key_words = [word for word in words if len(word) > 2 and word not in stop_words and not word.isdigit()]

        # Sort keywords to ensure consistency
        key_words = sorted(set(key_words))[:5]  # Take at most 5 keywords
        semantic_signature = '_'.join(key_words)

        # Build universal metric key
        key_components = []

        if category:
            key_components.append(f"cat:{category}")
        if primary_number:
            key_components.append(f"val:{primary_number}")
        if time_period:
            key_components.append(f"time:{time_period}")
        if unit:
            key_components.append(f"unit:{unit}")
        if semantic_signature:
            key_components.append(f"sem:{semantic_signature}")

        # Generate final key
        metric_key = "|".join(key_components)

        # If all components are empty, use text hash
        if not metric_key:
            metric_key = f"hash:{hash(kpi_text)}"

        return metric_key

    except Exception as e:
        logging.warning(f"Error generating universal metric key: {e}")
        # Fallback to text hash
        return f"fallback:{hash(kpi_item.get('kpi_text', ''))}"

def extract_kpi_from_chunk_universal(chunk: str) -> List[Dict]:
    """Universal KPI extraction function for various sustainability reports"""
    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": UNIVERSAL_SYSTEM_PROMPT},
                {"role": "user", "content": f"""Extract ALL KPIs from this text. Requirements:

1. Create COMPLETE, MEANINGFUL KPI descriptions with full context
2. DO NOT extract standalone numbers without explanatory text
3. Include all relevant context (time, location, metric type, etc.)
4. Use consistent formatting for similar metrics
5. Ensure each KPI is self-explanatory

Text to analyze:
{chunk}"""}
            ],
            temperature=0.0,
            max_tokens=4000,
            timeout=60
        )

        content = response.choices[0].message.content.strip()

        # Clean potential markdown formatting
        if content.startswith('```json'):
            content = content[7:]
        if content.endswith('```'):
            content = content[:-3]

        if not content.strip().startswith("["):
            logging.warning(f"API response not JSON list: {content[:100]}...")
            return []

        result = json.loads(content)

        if not isinstance(result, list):
            logging.warning("API response is not a list format")
            return []

        # Extract page information
        page_number = extract_page_from_chunk(chunk)

        # Universal validation and deduplication logic
        validated_result = []
        seen_metrics = set()

        for item in result:
            if isinstance(item, dict) and 'kpi_text' in item and 'kpi_theme' in item:
                if item['kpi_text'].strip() and item['kpi_theme'].strip():

                    # Check procedural language
                    if contains_procedural_language(item['kpi_text']):
                        logging.debug(f"Procedural statement filtered: {item['kpi_text'][:50]}...")
                        continue

                    # Filter meaningless data fragments
                    if is_data_fragment(item['kpi_text']):
                        logging.debug(f"Data fragment filtered: {item['kpi_text']}")
                        continue

                    # Standardize KPI data
                    standardized_item = standardize_kpi_universal(item)

                    # Add page information
                    standardized_item['source_page'] = page_number
                    standardized_item['source_type'] = 'text'

                    # Universal deduplication mechanism
                    metric_key = generate_universal_metric_key(standardized_item)

                    if metric_key not in seen_metrics:
                        validated_result.append(standardized_item)
                        seen_metrics.add(metric_key)
                        logging.debug(f"KPI extracted: {standardized_item['kpi_text'][:80]}...")
                    else:
                        logging.debug(f"Duplicate metric filtered: {standardized_item['kpi_text'][:50]}...")

        logging.info(f"Chunk processed: {len(validated_result)} unique KPIs extracted")
        return validated_result

    except json.JSONDecodeError as e:
        logging.warning(f"JSON parsing failed: {e}\nContent: {content[:300]}...")
        return []
    except Exception as e:
        logging.error(f"API call failed: {e}")
        return []

def post_process_kpis_universal(kpis: List[Dict]) -> List[Dict]:
    """Universal KPI post-processing for various report types"""
    if not kpis:
        return kpis

    # Step 1: Deduplication based on metric keys
    unique_kpis_dict = {}

    for kpi in kpis:
        metric_key = generate_universal_metric_key(kpi)

        if metric_key not in unique_kpis_dict:
            unique_kpis_dict[metric_key] = kpi
        else:
            # If duplicate, keep the more complete KPI description
            existing_kpi = unique_kpis_dict[metric_key]
            current_kpi = kpi

            # Compare KPI text completeness
            if len(current_kpi.get('kpi_text', '')) > len(existing_kpi.get('kpi_text', '')):
                unique_kpis_dict[metric_key] = current_kpi
                logging.debug(f"Replaced with more complete KPI: {current_kpi.get('kpi_text', '')[:50]}...")
            else:
                logging.debug(f"Kept existing KPI: {existing_kpi.get('kpi_text', '')[:50]}...")

    # Step 2: Text similarity-based secondary deduplication
    final_kpis = list(unique_kpis_dict.values())

    # Use text similarity to check remaining potential duplicates
    final_unique_kpis = []

    for current_kpi in final_kpis:
        is_duplicate = False
        current_text = current_kpi.get('kpi_text', '')

        for existing_kpi in final_unique_kpis:
            existing_text = existing_kpi.get('kpi_text', '')

            # Calculate text similarity
            similarity = calculate_text_similarity(current_text, existing_text)

            # If similarity is very high, consider it duplicate
            if similarity > 0.8:
                is_duplicate = True
                logging.debug(f"Text similarity duplicate filtered: {current_text[:50]}...")
                break

        if not is_duplicate:
            final_unique_kpis.append(current_kpi)

    logging.info(f"Universal post-processing: {len(final_unique_kpis)}/{len(kpis)} KPIs retained")
    return final_unique_kpis

def calculate_text_similarity(text1: str, text2: str) -> float:
    """Calculate similarity between two texts"""
    # Normalize texts
    norm1 = ' '.join(text1.lower().split())
    norm2 = ' '.join(text2.lower().split())

    # Word sets
    words1 = set(norm1.split())
    words2 = set(norm2.split())

    if len(words1) == 0 or len(words2) == 0:
        return 0.0

    # Calculate intersection and union
    intersection = len(words1.intersection(words2))
    union = len(words1.union(words2))

    return intersection / union if union > 0 else 0.0

def validate_kpi_quality(kpis: List[Dict]) -> List[Dict]:
    """Additional quality validation for extracted KPIs with relaxed filtering"""
    if not ENABLE_QUALITY_VALIDATION:
        return kpis

    quality_kpis = []

    for kpi in kpis:
        kpi_text = kpi.get('kpi_text', '').lower()

        # Exclude "planned tone" KPIs (not actual performance)
        is_future_statement = any(word in kpi_text for word in [
            'will', 'aim to', 'plan to', 'planning to', 'intend to',
            'is expected to', 'is scheduled to', 'expects to', 'expected to',
            'targeting', 'propose to', 'going to', 'shall', 'to be installed'
        ])
        if is_future_statement:
            logging.debug(f"KPI rejected (future plan): {kpi_text[:100]}...")
            continue

        # Filter procedural language
        if contains_procedural_language(kpi_text):
            logging.debug(f"KPI rejected (procedural language): {kpi_text[:100]}...")
            continue

        # Filter for phrases like "place name + percentage" (not ESG KPIs, but distribution descriptions)
        geo_percent_pattern = re.compile(r"^[a-z\s,:%-]+(?:\s)?\d{1,3}%$")
        if geo_percent_pattern.match(kpi_text.strip()) and len(kpi_text.strip().split()) <= 6:
            logging.debug(f"KPI rejected (geo+percent short form): {kpi_text}")
            continue

        # Verb whitelist: must include action verbs
        allowed_kpi_verbs = [
            'reduce', 'reduced', 'achieve', 'achieved', 'improve', 'improved',
            'diverted', 'trained', 'invested', 'decreased', 'increased',
            'consumed', 'emitted', 'saved', 'reached', 'attained', 'completed',
            'recorded', 'cut', 'lowered', 'targeted', 'complied', 'avoided',
            'used', 'recycled', 'sourced', 'returned', 'measured', 'maintained',
            'reported', 'accounted', 'utilized', 'were', 'was'  # Add state verbs
        ]
        if not any(verb in kpi_text for verb in allowed_kpi_verbs):
            logging.debug(f"KPI rejected (no action verb): {kpi_text[:100]}...")
            continue

        # Greylist verbs (action words but not necessarily performance words) - remove problematic words
        graylist_verbs = [
            'launched',  # Keep some potentially useful words, but remove obvious procedural words
            'formed', 'opened', 'started'
        ]

        contains_graylist = any(verb in kpi_text for verb in graylist_verbs)

        # Check for quantitative indicators
        has_numbers = any(char.isdigit() for char in kpi_text)
        has_percentage = '%' in kpi_text

        # Extended units and measurement indicators
        has_units = any(unit in kpi_text for unit in [
            'tonnes', 'tons', 'kg', 'mwh', 'kwh', 'gwh', 'litres', 'liters', 'gallons',
            'employees', 'hours', 'million', 'billion', 'thousand', 'm³', 'co2e', 'tco2e',
            'dollars', 'usd', 'eur', 'gbp', 'incidents', 'rate', 'ratio', 'intensity',
            'frequency', 'recordable', 'fatalities', 'injuries', 'directors', 'board',
            'workforce', 'leadership', 'diversity', 'inclusion', 'satisfaction', 'retention',
            'turnover', 'training', 'safety', 'ltifr', 'trir', 'compliance', 'audit',
            'assessment', 'screening', 'supplier', 'breach', 'violation', 'disclosure',
            'assurance', 'coverage', 'participation', 'completion', 'investment',
            'volunteering', 'engagement', 'grievance', 'whistleblower', 'compensation',
            'people', 'staff', 'workers', 'positions', 'roles', 'headcount', 'fte',
            'performance', 'score', 'index', 'metric', 'level', 'amount', 'value',
            'average', 'median', 'total', 'sum', 'count', 'number', 'quantity'
        ])

        # More flexible time reference detection
        has_time_ref = any(time_word in kpi_text for time_word in [
            '2019', '2020', '2021', '2022', '2023', '2024', '2025', '2026', '2027', '2028', '2029', '2030',
            '2031', '2032', '2033', '2034', '2035', '2040', '2045', '2050',
            'annual', 'yearly', 'year', 'quarter', 'month', 'by', 'target', 'baseline', 'fy',
            'per year', 'per annum', 'quarterly', 'monthly', 'daily', 'future', 'deadline',
            'period', 'reporting', 'current', 'previous', 'next', 'last', 'this'
        ])

        # Enhanced sustainability context detection
        has_sustainability_context = any(sus_word in kpi_text for sus_word in [
            # Environmental keywords
            'emission', 'carbon', 'energy', 'renewable', 'waste', 'water', 'recycl',
            'environmental', 'ghg', 'scope', 'climate', 'biodiversity', 'circular',
            'materials', 'intensity', 'consumption', 'efficiency', 'footprint',
            'sustainable', 'sustainability', 'green', 'clean', 'eco', 'offset',
            'tcfd', 'nature', 'habitat', 'ecosystem', 'pollution', 'discharge',
            'electricity', 'gas', 'fuel', 'solar', 'wind', 'hydro', 'nuclear',

            # Social keywords
            'safety', 'training', 'employee', 'diversity', 'community', 'social',
            'workforce', 'gender', 'women', 'female', 'male', 'disability', 'disabled',
            'inclusion', 'equity', 'equality', 'lgbtq', 'minorities', 'ethnic',
            'health', 'wellbeing', 'wellness', 'satisfaction', 'retention', 'turnover',
            'injury', 'incident', 'fatality', 'ltifr', 'trir', 'recordable',
            'human rights', 'labor', 'child labor', 'forced labor', 'slavery',
            'freedom', 'association', 'collective bargaining', 'grievance',
            'volunteering', 'investment', 'hiring', 'local', 'stakeholder',
            'customer', 'supplier', 'supply chain', 'accessibility', 'parental',
            'mental health', 'ppe', 'emergency', 'drill', 'compliance',
            'people', 'staff', 'workers', 'employment', 'job', 'career',
            'leadership', 'management', 'senior', 'executive', 'promotion',

            # Governance keywords
            'governance', 'board', 'director', 'independent', 'chair', 'ceo',
            'executive', 'compensation', 'pay', 'ethics', 'compliance', 'corruption',
            'bribery', 'code of conduct', 'whistleblower', 'transparency',
            'disclosure', 'reporting', 'assurance', 'audit', 'risk', 'management',
            'cybersecurity', 'data', 'privacy', 'gdpr', 'breach', 'policy',
            'screening', 'assessment', 'due diligence', 'political', 'contribution',
            'gri', 'sasb', 'oversight', 'expertise', 'separation', 'incentive',
            'fine', 'penalty', 'violation', 'resolution', 'anti-corruption',

            # General business performance that could be sustainability-related
            'performance', 'quality', 'delivery', 'customer', 'service', 'product',
            'operation', 'facility', 'site', 'location', 'region', 'business'
        ])

        # If it is a greylist verb sentence, but there is no performance content such as numbers, units, time, etc. → delete
        if contains_graylist and not (has_numbers or has_units or has_percentage or has_time_ref or has_sustainability_context):
            logging.debug(f"KPI rejected (graylist verb, no quantitative data): {kpi_text[:100]}...")
            continue

        # More lenient quality scoring - only require numbers and either units/percentage OR time reference OR sustainability context
        basic_requirements = has_numbers and (has_percentage or has_units or has_time_ref or has_sustainability_context)

        # Additional check for obvious ESG relevance
        is_esg_relevant = any(esg_word in kpi_text for esg_word in [
            'emission', 'carbon', 'energy', 'waste', 'water', 'renewable', 'employee',
            'safety', 'training', 'diversity', 'governance', 'board', 'compliance',
            'sustainability', 'environmental', 'social', 'ghg', 'co2', 'workforce',
            'gender', 'health', 'injury', 'incident', 'ethics', 'transparency'
        ])

        if basic_requirements or is_esg_relevant:
            quality_kpis.append(kpi)
            logging.debug(f"KPI accepted: {kpi_text[:100]}...")
        else:
            logging.debug(f"KPI filtered out for quality: {kpi_text[:100]}...")

    logging.info(f"Quality validation: {len(quality_kpis)}/{len(kpis)} KPIs passed")
    return quality_kpis

In [188]:
# ============ 图像处理函数 ============
def extract_numeric_spans(page):
    text_dict = page.get_text("dict")
    nums = []
    for block in text_dict["blocks"]:
        for line in block.get("lines", []):
            for span in line.get("spans", []):
                s = span["text"].strip()
                if re.match(r"[\d,.]+%?$", s):          # 纯数字或数字+%
                    nums.append({
                        "text": s,
                        "bbox": span["bbox"],           # (x0,y0,x1,y1)
                        "font": span["size"]
                    })
    return nums

def extract_images_from_pdf_fixed(pdf_path: str) -> List[Dict]:
    """Extract images from PDF using PyMuPDF"""
    images = []

    try:
        pdf_document = fitz.open(pdf_path)

        for page_num in range(len(pdf_document)):
            page = pdf_document[page_num]
            image_list = page.get_images()

            # 🔥 新增：同时提取页面截图作为备选
            page_pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))  # 高分辨率
            page_img = Image.frombytes("RGB", [page_pix.width, page_pix.height], page_pix.samples)

            # 添加整页截图
            images.append({
                'image': page_img,
                'page_number': page_num + 1,
                'width': page_img.width,
                'height': page_img.height,
                'image_index': 'full_page',
                'type': 'full_page'
            })


            for img_index, img in enumerate(image_list):
                try:
                    xref = img[0]
                    base_image = pdf_document.extract_image(xref)
                    image_bytes = base_image["image"]

                    image = Image.open(BytesIO(image_bytes))

                    # Convert to RGB if needed
                    if image.mode in ['RGBA', 'LA']:
                        background = Image.new('RGB', image.size, (255, 255, 255))
                        if image.mode == 'RGBA':
                            background.paste(image, mask=image.split()[-1])
                        else:
                            background.paste(image)
                        image = background
                    elif image.mode != 'RGB':
                        image = image.convert('RGB')

                    # Filter small images
                    if image.width >= 50 and image.height >= 50:
                        images.append({
                            'image': image,
                            'page_number': page_num + 1,
                            'width': image.width,
                            'height': image.height,
                            'image_index': img_index,
                            'type': 'extracted'  # 🔥 新增类型标识
                        })

                except Exception as e:
                    logging.warning(f"Error extracting image {img_index} from page {page_num + 1}: {e}")
                    continue

        pdf_document.close()
        logging.info(f"Extracted {len(images)} images from PDF")
        return images

    except Exception as e:
        logging.error(f"Error extracting images from PDF: {e}")
        return []

def image_to_base64_fixed(image: Image.Image) -> str:
    """Convert image to base64 with error handling"""
    try:
        if image.mode not in ['RGB', 'L']:
            image = image.convert('RGB')

        # Resize large images
        max_size = (1536, 1536)
        if image.width > max_size[0] or image.height > max_size[1]:
            # 计算缩放比例，保持长宽比
            ratio = min(max_size[0]/image.width, max_size[1]/image.height)
            new_size = (int(image.width * ratio), int(image.height * ratio))
            image = image.resize(new_size, Image.Resampling.LANCZOS)

        buffered = BytesIO()
        image.save(buffered, format="JPEG", quality=95)
        img_str = base64.b64encode(buffered.getvalue()).decode()

        return img_str

    except Exception as e:
        logging.error(f"Error converting image to base64: {e}")
        return ""

In [189]:
# ------------------------------------------------------------
#  多裁剪 / 多分辨率生成器（支持裁剪参数为 0）
# ------------------------------------------------------------
from itertools import product

def generate_image_variants(img: Image.Image,
                            max_side_full: int = 1200,
                            crop_size: int = 768,
                            stride: int = 512) -> List[Tuple[Image.Image, str]]:
    """
    返回 [(variant_image, variant_tag), ...]
    variant_tag 取值: original / resized / crop_{row}_{col}
    """
    variants = []

    # 0) 原图
    variants.append((img, "original"))

    # 1) 缩放（若原图过大）
    w, h = img.size
    if max(w, h) > max_side_full:
        scale = max_side_full / float(max(w, h))
        resized = img.resize((int(w * scale), int(h * scale)), Image.Resampling.LANCZOS)
        variants.append((resized, "resized"))
    else:
        resized = img  # 没缩放就保持原图
        variants.append((resized, "resized"))  # 统一加上 resized 版本

    # 2) 滑窗裁剪（裁剪尺寸或步长为 0 时跳过）
    if crop_size > 0 and stride > 0:
        base_img = variants[-1][0]
        bw, bh = base_img.size
        if bw > crop_size or bh > crop_size:
            xs = list(range(0, max(bw - crop_size, 1), stride)) + [bw - crop_size]
            ys = list(range(0, max(bh - crop_size, 1), stride)) + [bh - crop_size]
            for r, c in product(range(len(ys)), range(len(xs))):
                x, y = xs[c], ys[r]
                crop = base_img.crop((x, y, x + crop_size, y + crop_size))
                # 过滤纯色区域
                if np.array(crop.convert('L')).std() < 5:
                    continue
                variants.append((crop, f"crop_{r}_{c}"))

    return variants

In [190]:
# ---------------------------------------------
# 📊 替代 plotclassifier 的图表识别函数（Hugging Face 模型）
# ---------------------------------------------
from transformers import AutoImageProcessor, AutoModelForImageClassification
import torch
# 🔧 修复：使用CLIP模型进行图表识别
def setup_chart_classifier():
    """设置图表分类器"""
    try:
        from transformers import CLIPProcessor, CLIPModel

        # 加载CLIP模型
        model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

        def is_chart_image_clip(image: Image.Image) -> bool:
            """使用CLIP判断是否是图表"""
            try:
                # 定义图表相关的文本描述
                chart_labels = [
                    "a chart", "a graph", "a bar chart", "a pie chart",
                    "a line graph", "a table", "data visualization",
                    "statistics", "a diagram", "an infographic"
                ]

                # 处理输入
                inputs = processor(
                    text=chart_labels,
                    images=image,
                    return_tensors="pt",
                    padding=True
                )

                # 获取预测结果
                outputs = model(**inputs)
                logits_per_image = outputs.logits_per_image
                probs = logits_per_image.softmax(dim=1)

                # 如果任意图表标签概率大于0.25，认为是图表
                max_prob = probs.max().item()
                is_chart = max_prob > 0.25

                logging.debug(f"CLIP图表识别: 最高概率={max_prob:.3f}, 结果={is_chart}")
                return is_chart

            except Exception as e:
                logging.warning(f"CLIP图表识别失败: {e}")
                # 降级到统计方法
                gray = image.convert('L')
                return np.array(gray).std() > 15

        logging.info("✅ 使用CLIP模型进行图表识别")
        return is_chart_image_clip

    except ImportError:
        logging.warning("CLIP模型不可用，使用统计方法")
        def is_chart_image_stats(image: Image.Image) -> bool:
            """统计方法判断是否是图表"""
            try:
                gray = image.convert('L')
                std_dev = np.array(gray).std()
                return std_dev > 15
            except:
                return True

        return is_chart_image_stats
    except Exception as e:
        logging.error(f"设置图表分类器失败: {e}")
        def is_chart_image_fallback(image: Image.Image) -> bool:
            return True  # 保守策略：有疑问就分析
        return is_chart_image_fallback

# 初始化图表分类器
is_chart_image = setup_chart_classifier()


def extract_kpi_from_image_fixed(image: Image.Image, page_number: int, image_type: str = 'extracted') -> List[Dict]:
    """Extract KPIs from image with improved error handling"""
    try:
        # 🔥 新增：预过滤：检查是否可能是图表
        if not is_chart_image(image):
            logging.debug(f"Image on page {page_number} filtered out (not likely a chart)")
            return []

        base64_image = image_to_base64_fixed(image)
        if not base64_image:
            return []

        # 🔥 更改：使用增强的prompt
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": ENHANCED_IMAGE_KPI_SYSTEM_PROMPT  # 🔥 使用新的prompt
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            # 🔥 新增：详细的用户指令
                            "text": """Analyze this image carefully for quantifiable performance data.

IMPORTANT ANALYSIS PRINCIPLES:

1. **Chart Type Recognition**:
   - Stacked charts: Multiple colors/patterns layered in same position
   - Grouped charts: Multiple elements side by side at same position
   - Simple charts: One data point per position

2. **Value Extraction Rules**:
   - For STACKED charts: Read each layer separately, NOT the total height
   - For GROUPED charts: Read each element individually
   - For SIMPLE charts: Read data point values directly

3. **Data Relevance Filter**:
   ✅ EXTRACT: Performance outcomes, efficiency metrics, reduction rates, satisfaction scores, compliance rates
   ❌ SKIP: Certification counts, project timelines, implementation schedules, organizational charts, process flows

4. **Quality Standards**:
   - Only extract clear, quantifiable performance indicators
   - Each data point must have complete context
   - If uncertain about values, don't estimate
   - If chart shows mainly operational/administrative data, return empty array

Please analyze this chart step by step:
- First identify the chart type
- Then determine if it contains performance KPIs
- Finally extract all relevant performance data points

Focus on measurable outcomes and achievements, not counts or processes."""
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{base64_image}",
                                "detail": "high"
                            }
                        }
                    ]
                }
            ],
            temperature=0.1,
            max_tokens=4000,
            timeout=60
        )

        content = response.choices[0].message.content.strip()

        if not content:
            return []

        # Clean formatting
        if content.startswith('```json'):
            content = content[7:]
        if content.endswith('```'):
            content = content[:-3]

        content = content.strip()

        if not content.startswith("["):
            logging.warning(f"Image analysis response not JSON list: {content[:100]}...")
            return []

        try:
            result = json.loads(content)
        except json.JSONDecodeError as e:
            logging.warning(f"JSON parsing failed for image analysis: {e}")
            return []

        if not isinstance(result, list):
            return []

        # Process results
        processed_result = []
        for item in result:
            if isinstance(item, dict) and 'kpi_text' in item:
                if not item.get('kpi_text', '').strip():
                    continue

                item['source_page'] = page_number
                item['source_type'] = 'image'
                item['image_type'] = image_type  # 🔥 新增字段

                # 🔥 更改：确保有chart标识
                kpi_text = item['kpi_text']
                if not any(marker in kpi_text.lower() for marker in ['chart', 'graph', 'table', 'figure']):
                    chart_type = item.get('chart_type', 'chart')
                    item['kpi_text'] = f"[{chart_type.title()}] {kpi_text}"

                processed_result.append(item)

        if processed_result:
            logging.info(f"✅ Extracted {len(processed_result)} KPIs from {image_type} on page {page_number}")
        else:
            logging.debug(f"❌ No KPIs found in {image_type} on page {page_number}")

        return processed_result

    except Exception as e:
        logging.error(f"Error extracting KPIs from image: {e}")
        return []

# 原函数：process_pdf_images_for_kpis_fixed
# 完整替换为：

def process_pdf_images_for_kpis_fixed(pdf_path: str) -> List[Dict]:
    """
    遍历 PDF 每一页：
      • 对该页所有‘extracted’图像做多裁剪+Vision
      • 若该页还没抓到 KPI，再对整页截图做 Vision
    """
    logging.info("Starting page-by-page image KPI extraction …")

    images = extract_images_from_pdf_fixed(pdf_path)
    if not images:
        return []

    # 把图像按页聚合
    page_dict = {}
    for info in images:
        pg = info["page_number"]
        page_dict.setdefault(pg, {"extracted": [], "full": None})
        if info["type"] == "extracted":
            page_dict[pg]["extracted"].append(info["image"])
        else:                    # full_page
            page_dict[pg]["full"] = info["image"]

    all_image_kpis: List[Dict] = []

    # —— 逐页处理 ——
    for pg in sorted(page_dict.keys()):
        logging.info(f"\n=== Page {pg} ===")
        page_kpis: List[Dict] = []

        # ① 单独提取的图
        for idx, img in enumerate(page_dict[pg]["extracted"]):
            for var_img, var_tag in generate_image_variants(img, 1200, 768, 512):
                kpis = extract_kpi_from_image_fixed(
                    var_img, pg, f"extracted_{var_tag}"
                )
                for k in kpis:
                    key = generate_universal_metric_key(k)
                    if key not in {generate_universal_metric_key(x) for x in page_kpis}:
                        page_kpis.append(k)
                time.sleep(0.8)

        # ② 若仍为空，再分析整页
        if not page_kpis and page_dict[pg]["full"] is not None:
            for var_img, var_tag in generate_image_variants(
                    page_dict[pg]["full"], 1200, 0, 0):   # 只做 original/resized
                kpis = extract_kpi_from_image_fixed(
                    var_img, pg, f"full_{var_tag}"
                )
                for k in kpis:
                    key = generate_universal_metric_key(k)
                    if key not in {generate_universal_metric_key(x) for x in page_kpis}:
                        page_kpis.append(k)
                time.sleep(1.0)

        logging.info(f"  → Page {pg} KPI count: {len(page_kpis)}")
        all_image_kpis.extend(page_kpis)

    logging.info(f"Image KPI extraction finished: {len(all_image_kpis)} KPIs from {len(page_dict)} pages")
    return all_image_kpis

In [191]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [192]:
# ============ 主处理函数 ============
def process_sustainability_report_with_enhanced_images(pdf_path: str) -> pd.DataFrame:
    """Main processing function with image analysis"""
    logging.info("Starting enhanced PDF processing with image analysis...")

    # Step 1: Text and table extraction
    logging.info("Step 1/5: Reading PDF text and tables...")
    full_text = pdf_to_text_and_tables(pdf_path)

    camelot_tables = camelot_extra_tables_enhanced(pdf_path)
    if camelot_tables:
        full_text += "\n\n" + "\n\n".join(camelot_tables)

    logging.info("Step 2/5: Chunking text...")
    chunks = split_into_chunks(full_text, MAX_TOKENS_CHUNK)

    logging.info("Step 3/5: Extracting KPIs from text...")
    text_kpis = []
    for idx, chunk in enumerate(chunks, 1):
        logging.info(f"Processing text chunk {idx}/{len(chunks)}")
        if chunk.strip():
            chunk_kpis = extract_kpi_from_chunk_universal(chunk)
            text_kpis.extend(chunk_kpis)
            if idx < len(chunks):
                time.sleep(SLEEP_SEC)

    # Step 4: Image KPI extraction
    logging.info("Step 4/5: Extracting KPIs from images...")
    image_kpis = process_pdf_images_for_kpis_fixed(pdf_path)

    # Step 5: Combine and process
    logging.info("Step 5/5: Combining and processing all KPIs...")

    for kpi in text_kpis:
        if 'source_type' not in kpi:
            kpi['source_type'] = 'text'

    all_kpis = text_kpis + image_kpis
    all_kpis = post_process_kpis_universal(all_kpis)

    df_auto = pd.DataFrame(all_kpis)

    if not df_auto.empty:
        if 'source_type' not in df_auto.columns:
            df_auto['source_type'] = 'text'

        initial_count = len(df_auto)
        df_auto = df_auto.drop_duplicates(subset=['kpi_text'], keep='first')
        final_count = len(df_auto)

        logging.info(f"Removed {initial_count - final_count} duplicate KPIs")

        try:
            df_auto = df_auto.sort_values(['source_type', 'kpi_theme', 'kpi_category'], na_position='last')
        except KeyError:
            pass

        text_kpi_count = len([kpi for kpi in all_kpis if kpi.get('source_type', 'text') != 'image'])
        image_kpi_count = len([kpi for kpi in all_kpis if kpi.get('source_type') == 'image'])

        logging.info(f"KPI Summary: {text_kpi_count} from text/tables, {image_kpi_count} from images")

    return df_auto


In [193]:
# ============ 结果保存和比较函数 ============
def infer_stakeholder(row) -> str:
    """Infer affected stakeholders based on KPI theme and category"""
    theme = row.get('kpi_theme', '').lower()
    category = row.get('kpi_category', '').lower()
    kpi_text = row.get('kpi_text', '').lower()

    if theme == 'environmental':
        return "Environment, Community, Future Generations"
    elif theme == 'social':
        if 'employee' in category or 'workforce' in category or 'gender' in category:
            return "Employees"
        elif 'customer' in category or 'safety' in category:
            return "Customers, Community"
        elif 'community' in category:
            return "Local Communities"
        elif 'supply' in category or 'supplier' in kpi_text:
            return "Suppliers, Business Partners"
        else:
            return "Employees, Community"
    elif theme == 'governance':
        if 'board' in category:
            return "Shareholders, Investors"
        elif 'cyber' in category or 'data' in category:
            return "Customers, Employees, Business Partners"
        else:
            return "Shareholders, Investors, Stakeholders"
    else:
        return "All Stakeholders"

def save_results(df_auto: pd.DataFrame, output_path: str, pdf_path: str = "") -> None:
    """Save results to Excel file with proper formatting"""
    try:
        os.makedirs(os.path.dirname(output_path) if os.path.dirname(output_path) else '.', exist_ok=True)

        if not df_auto.empty:
            # Add metadata columns
            pdf_filename = os.path.basename(pdf_path) if pdf_path else "Unknown"
            df_auto['PDF file name'] = pdf_filename
            df_auto['Title of the report'] = ""

            if 'source_page' in df_auto.columns:
                df_auto['Absolute Page Number'] = df_auto['source_page']
                df_auto = df_auto.drop('source_page', axis=1)
            else:
                df_auto['Absolute Page Number'] = "Unknown"

            df_auto['Impacted Stakeholder'] = df_auto.apply(infer_stakeholder, axis=1)

            # Reorder columns
            original_columns = [col for col in df_auto.columns if col not in
                              ['PDF file name', 'Title of the report', 'Absolute Page Number', 'Impacted Stakeholder']]
            new_column_order = ['PDF file name', 'Title of the report', 'Absolute Page Number', 'Impacted Stakeholder'] + original_columns
            df_auto = df_auto[new_column_order]

        with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
            df_auto.to_excel(writer, sheet_name='Auto_KPIs', index=False)

            if not df_auto.empty:
                # Theme summary
                theme_summary = df_auto.groupby('kpi_theme').size().reset_index(name='count')
                theme_summary.to_excel(writer, sheet_name='Theme_Summary', index=False)

                # Category summary
                category_summary = df_auto.groupby(['kpi_theme', 'kpi_category']).size().reset_index(name='count')
                category_summary.to_excel(writer, sheet_name='Category_Summary', index=False)

        logging.info(f"Results saved to {output_path}")

    except Exception as e:
        logging.error(f"Error saving results: {e}")

def compare_with_manual_kpis(df_auto: pd.DataFrame, manual_xlsx_path: str) -> None:
    """Compare automatically extracted KPIs with manually annotated ones"""
    if not os.path.exists(manual_xlsx_path):
        logging.info("Manual KPI file not found, skipping comparison.")
        return

    logging.info("Comparing with manual KPIs...")

    try:
        df_manual = pd.read_excel(manual_xlsx_path)

        if 'kpi_text' not in df_manual.columns:
            logging.warning("Manual KPI file missing 'kpi_text' column")
            return

        manual_kpis = set(df_manual['kpi_text'].astype(str).str.strip())
        auto_kpis = set(df_auto['kpi_text'].astype(str).str.strip())

        only_auto = auto_kpis - manual_kpis
        only_manual = manual_kpis - auto_kpis
        common = auto_kpis & manual_kpis

        print(f"\n=== KPI Comparison Results ===")
        print(f"Common KPIs: {len(common)}")
        print(f"Only in automatic extraction: {len(only_auto)}")
        print(f"Only in manual annotation: {len(only_manual)}")

        if only_auto:
            print(f"\nKPIs found by model but not in manual annotation ({len(only_auto)}):")
            for kpi in sorted(only_auto):
                if kpi.strip():
                    print(f"  - {kpi}")

        if only_manual:
            print(f"\nKPIs in manual annotation but missed by model ({len(only_manual)}):")
            for kpi in sorted(only_manual):
                if kpi.strip():
                    print(f"  - {kpi}")

        # Calculate metrics
        if len(auto_kpis) > 0 and len(manual_kpis) > 0:
            precision = len(common) / len(auto_kpis)
            recall = len(common) / len(manual_kpis)
            f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

            print(f"\n=== Performance Metrics ===")
            print(f"Precision: {precision:.3f}")
            print(f"Recall: {recall:.3f}")
            print(f"F1 Score: {f1_score:.3f}")

    except Exception as e:
        logging.error(f"Error comparing with manual KPIs: {e}")

In [194]:
# ============ 主执行函数 ============
def main():
    """Main execution function"""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )

    try:
        if not os.path.exists(PDF_PATH):
            logging.error(f"PDF file not found: {PDF_PATH}")
            return

        # Process the PDF
        df_auto = process_sustainability_report_with_enhanced_images(PDF_PATH)

        # Save results
        save_results(df_auto, EXPORT_AUTO_XLSX, PDF_PATH)

        logging.info(f"KPI extraction completed: {len(df_auto)} KPIs extracted")

        # Compare with manual annotations if available
        if MANUAL_XLSX:
            compare_with_manual_kpis(df_auto, MANUAL_XLSX)

        # Display summary
        if not df_auto.empty:
            print(f"\n=== Extraction Summary ===")
            print(f"Total KPIs extracted: {len(df_auto)}")

            # Source statistics
            if 'source_type' in df_auto.columns:
                source_counts = df_auto['source_type'].value_counts()
                print(f"From text/tables: {source_counts.get('text', 0)}")
                print(f"From images/charts: {source_counts.get('image', 0)}")

            # Theme statistics
            if 'kpi_theme' in df_auto.columns:
                theme_counts = df_auto['kpi_theme'].value_counts()
                print(f"\nKPI Distribution by Theme:")
                for theme, count in theme_counts.items():
                    print(f"  {theme}: {count}")
        else:
            print("\nNo KPIs were extracted from the document.")

    except Exception as e:
        logging.error(f"Error in main execution: {e}")
        import traceback
        traceback.print_exc()

In [195]:
# ============ 辅助功能函数 ============
def install_dependencies():
    """Install required dependencies"""
    try:
        import subprocess
        import sys

        dependencies = [
            "openai",
            "python-dotenv",
            "pdfplumber",
            "tiktoken",
            "pandas",
            "PyMuPDF",
            "Pillow",
            "openpyxl"
        ]

        for dep in dependencies:
            try:
                __import__(dep.replace('-', '_'))
                print(f"✅ {dep} is already installed")
            except ImportError:
                print(f"Installing {dep}...")
                subprocess.check_call([sys.executable, "-m", "pip", "install", dep])
                print(f"✅ Installed {dep}")

        # Optional Camelot installation
        try:
            import camelot
            print("✅ Camelot is already installed")
        except ImportError:
            print("Installing Camelot (optional)...")
            try:
                subprocess.check_call([sys.executable, "-m", "pip", "install", "camelot-py[cv]"])
                print("✅ Installed Camelot")
            except:
                print("⚠️ Camelot installation failed (optional dependency)")

        print("🎉 All dependencies checked/installed successfully!")

    except Exception as e:
        print(f"❌ Error with dependencies: {e}")

def validate_environment():
    """Validate environment setup"""
    issues = []

    # Check API key
    if not os.getenv("OPENAI_API_KEY"):
        issues.append("OPENAI_API_KEY not found in environment variables")

    # Check PDF file
    if not os.path.exists(PDF_PATH):
        issues.append(f"PDF file not found: {PDF_PATH}")

    # Check required imports
    required_modules = ['openai', 'pdfplumber', 'pandas', 'tiktoken', 'PIL', 'fitz']
    for module in required_modules:
        try:
            __import__(module)
        except ImportError:
            issues.append(f"Required module '{module}' not installed")

    if issues:
        print("❌ Environment validation failed:")
        for issue in issues:
            print(f"  - {issue}")
        return False
    else:
        print("✅ Environment validation passed")
        return True


In [196]:
# ============ 简化的执行接口 ============
def run_kpi_extraction():
    """Simplified interface to run KPI extraction"""
    print("🚀 Starting KPI extraction process...")

    # Validate environment
    if not validate_environment():
        print("Please fix the environment issues before running.")
        return

    # Run main function
    main()

In [197]:
# ============ 调试和测试功能 ============
def test_text_extraction_only():
    """Test only text extraction without images"""
    logging.basicConfig(level=logging.INFO)

    try:
        # Extract text and tables
        full_text = pdf_to_text_and_tables(PDF_PATH)
        camelot_tables = camelot_extra_tables_enhanced(PDF_PATH)

        if camelot_tables:
            full_text += "\n\n" + "\n\n".join(camelot_tables)

        # Chunk text
        chunks = split_into_chunks(full_text, MAX_TOKENS_CHUNK)

        # Extract KPIs from first few chunks
        test_kpis = []
        for idx, chunk in enumerate(chunks[:3]):  # Test first 3 chunks
            chunk_kpis = extract_kpi_from_chunk_universal(chunk)
            test_kpis.extend(chunk_kpis)
            time.sleep(SLEEP_SEC)

        print(f"Test extraction completed: {len(test_kpis)} KPIs found in first 3 chunks")

        for i, kpi in enumerate(test_kpis[:5]):  # Show first 5
            print(f"{i+1}. {kpi.get('kpi_text', 'No text')}")

    except Exception as e:
        print(f"Test failed: {e}")

def debug_single_image_analysis(image_path: str):
    """Test single image analysis functionality"""
    try:
        from PIL import Image
        image = Image.open(image_path)

        print(f"Analyzing image: {image_path}")
        print(f"Image size: {image.width}x{image.height}")

        kpis = extract_kpi_from_image_fixed(image, 1)

        print(f"\n=== Analysis Results ===")
        print(f"Found {len(kpis)} KPIs:")

        for i, kpi in enumerate(kpis, 1):
            print(f"\n{i}. {kpi.get('kpi_text', 'No text')}")
            print(f"   Value: {kpi.get('quantitative_value', 'No value')}")
            print(f"   Confidence: {kpi.get('estimation_confidence', 'Not specified')}")

    except Exception as e:
        print(f"Error in debug analysis: {e}")

def process_text_only():
    """Process only text and tables, skip images"""
    logging.basicConfig(level=logging.INFO)

    try:
        logging.info("Starting text-only processing...")

        # Step 1: Text and table extraction
        full_text = pdf_to_text_and_tables(PDF_PATH)
        camelot_tables = camelot_extra_tables_enhanced(PDF_PATH)

        if camelot_tables:
            full_text += "\n\n" + "\n\n".join(camelot_tables)

        # Step 2: Chunking
        chunks = split_into_chunks(full_text, MAX_TOKENS_CHUNK)

        # Step 3: Extract KPIs
        all_kpis = []
        for idx, chunk in enumerate(chunks, 1):
            logging.info(f"Processing chunk {idx}/{len(chunks)}")
            if chunk.strip():
                chunk_kpis = extract_kpi_from_chunk_universal(chunk)
                all_kpis.extend(chunk_kpis)
                if idx < len(chunks):
                    time.sleep(SLEEP_SEC)

        # Post-processing
        all_kpis = post_process_kpis_universal(all_kpis)

        # Convert to DataFrame
        df_auto = pd.DataFrame(all_kpis)

        if not df_auto.empty:
            df_auto = df_auto.drop_duplicates(subset=['kpi_text'], keep='first')

        # Save results
        text_only_output = "text_only_" + EXPORT_AUTO_XLSX
        save_results(df_auto, text_only_output, PDF_PATH)

        print(f"Text-only processing completed: {len(df_auto)} KPIs extracted")

        return df_auto

    except Exception as e:
        logging.error(f"Text-only processing failed: {e}")
        return pd.DataFrame()

In [198]:
# ============ 兼容性函数 ============
def extract_kpi_from_chunk(chunk: str) -> List[Dict]:
    """Backward compatibility function"""
    return extract_kpi_from_chunk_universal(chunk)

def process_sustainability_report(pdf_path: str) -> pd.DataFrame:
    """Backward compatibility function for text-only processing"""
    return process_text_only()

def process_sustainability_report_with_images(pdf_path: str) -> pd.DataFrame:
    """Backward compatibility function for full processing"""
    return process_sustainability_report_with_enhanced_images(pdf_path)


In [199]:
# ============ 使用示例 ============
def example_usage():
    """Usage examples"""
    print("=== KPI Extraction Tool Usage Examples ===\n")

    print("1. Full extraction (text + images):")
    print("   df_results = process_sustainability_report_with_enhanced_images(PDF_PATH)")
    print("   save_results(df_results, EXPORT_AUTO_XLSX, PDF_PATH)\n")

    print("2. Text-only extraction:")
    print("   df_results = process_text_only()")
    print("   # Results automatically saved\n")

    print("3. Simple run:")
    print("   run_kpi_extraction()  # Complete pipeline with validation\n")

    print("4. Debug single component:")
    print("   test_text_extraction_only()  # Test first 3 chunks")
    print("   debug_single_image_analysis('path/to/image.jpg')\n")

    print("5. Install dependencies:")
    print("   install_dependencies()  # Install all required packages\n")


In [200]:
# ============================================================================
# 调试代码 - 直接复制粘贴到你的代码末尾
# ============================================================================

# 方法1: 检查所有图像提取和识别情况
def debug_method_1_check_image_detection():
    """检查PDF中的所有图像是否被正确提取，以及图表分类器是否工作正常"""
    print("=== 方法1: 检查图像提取和图表识别 ===")

    # 创建调试文件夹
    import os
    debug_folder = "debug_images_method1"
    os.makedirs(debug_folder, exist_ok=True)

    try:
        # 提取所有图像
        images = extract_images_from_pdf_fixed(PDF_PATH)
        print(f"从PDF中提取到 {len(images)} 个图像")

        chart_count = 0
        non_chart_count = 0

        for i, img_info in enumerate(images):
            page_num = img_info['page_number']
            img_type = img_info['type']
            image = img_info['image']

            # 检查是否被识别为图表
            is_chart = is_chart_image(image)

            # 保存图像，文件名包含识别结果
            chart_status = "CHART" if is_chart else "NOT_CHART"
            filename = f"{debug_folder}/page_{page_num}_{img_type}_{chart_status}_{i}.jpg"
            image.save(filename)

            print(f"图像 {i+1}: 页面{page_num}, 类型{img_type}, 尺寸{image.width}x{image.height}, 图表识别:{is_chart}")

            if is_chart:
                chart_count += 1
            else:
                non_chart_count += 1

        print(f"\n总结:")
        print(f"- 被识别为图表的图像: {chart_count}")
        print(f"- 未被识别为图表的图像: {non_chart_count}")
        print(f"- 所有图像已保存到 {debug_folder} 文件夹")
        print(f"- 请手动检查 NOT_CHART 的图像，看是否包含你缺失的饼图")

        return images

    except Exception as e:
        print(f"方法1执行出错: {e}")
        import traceback
        traceback.print_exc()
        return []

# 方法2: 临时禁用图表分类器
def debug_method_2_bypass_chart_filter():
    """完全禁用图表分类器，强制处理所有图像"""
    print("=== 方法2: 禁用图表分类器 ===")

    # 保存原始的图表分类器函数
    global is_chart_image
    original_chart_classifier = is_chart_image

    # 创建新的分类器（总是返回True）
    def bypass_chart_classifier(image):
        print(f"  🔓 强制处理图像 (尺寸: {image.width}x{image.height})")
        return True

    # 临时替换分类器
    is_chart_image = bypass_chart_classifier

    try:
        print("开始重新提取图像KPI（已禁用图表过滤）...")

        # 重新运行图像处理
        image_kpis = process_pdf_images_for_kpis_fixed(PDF_PATH)

        print(f"禁用过滤器后提取到 {len(image_kpis)} 个图像KPI")

        # 显示结果
        for i, kpi in enumerate(image_kpis):
            print(f"{i+1}. 页面{kpi.get('source_page', 'Unknown')}: {kpi.get('kpi_text', 'No text')[:100]}")

        return image_kpis

    except Exception as e:
        print(f"方法2执行出错: {e}")
        import traceback
        traceback.print_exc()
        return []

    finally:
        # 恢复原始分类器
        is_chart_image = original_chart_classifier
        print("已恢复原始图表分类器")

# 方法3: 手动测试特定图像
def debug_method_3_manual_test():
    """手动选择图像进行测试"""
    print("=== 方法3: 手动测试特定图像 ===")

    try:
        images = extract_images_from_pdf_fixed(PDF_PATH)
        print(f"找到 {len(images)} 个图像")

        # 显示所有图像信息
        for i, img_info in enumerate(images):
            page_num = img_info['page_number']
            img_type = img_info['type']
            image = img_info['image']
            is_chart = is_chart_image(image)

            print(f"{i+1}. 页面{page_num}, 类型{img_type}, 尺寸{image.width}x{image.height}, 图表:{is_chart}")

        # 让用户选择要测试的图像
        while True:
            try:
                choice = input(f"\n请选择要测试的图像编号 (1-{len(images)}, 输入0退出): ")
                if choice == '0':
                    break

                img_index = int(choice) - 1
                if 0 <= img_index < len(images):
                    img_info = images[img_index]
                    page_num = img_info['page_number']
                    image = img_info['image']

                    print(f"\n测试图像 {choice} (页面 {page_num})")

                    # 保存这个图像供检查
                    test_filename = f"test_image_{choice}_page_{page_num}.jpg"
                    image.save(test_filename)
                    print(f"图像已保存为: {test_filename}")

                    # 测试提取
                    kpis = extract_kpi_from_image_fixed(image, page_num, "manual_test")

                    print(f"提取结果: {len(kpis)} 个KPI")
                    for j, kpi in enumerate(kpis):
                        print(f"  KPI {j+1}: {kpi.get('kpi_text', 'No text')}")

                else:
                    print("无效的选择")

            except ValueError:
                print("请输入有效的数字")
            except KeyboardInterrupt:
                break
            except Exception as e:
                print(f"测试出错: {e}")

    except Exception as e:
        print(f"方法3执行出错: {e}")
        import traceback
        traceback.print_exc()

# 方法4: 简化的提取测试
def debug_method_4_simple_test():
    """使用简化的方法测试图像提取"""
    print("=== 方法4: 简化提取测试 ===")

    simple_prompt = """请分析这个图表，提取所有的数字数据。

返回JSON格式，每个数据点包含：
{
  "description": "数据描述",
  "value": "数值",
  "unit": "单位"
}

如果是饼图，请提取每个扇形的百分比。
如果是柱状图，请提取每个柱子的数值。
如果是表格，请提取每个数字。"""

    try:
        images = extract_images_from_pdf_fixed(PDF_PATH)

        for i, img_info in enumerate(images[:5]):  # 只测试前5个图像
            page_num = img_info['page_number']
            image = img_info['image']

            print(f"\n🔍 简化测试图像 {i+1} (页面 {page_num})")

            try:
                base64_image = image_to_base64_fixed(image)
                if not base64_image:
                    continue

                response = client.chat.completions.create(
                    model="gpt-4o",
                    messages=[{
                        "role": "user",
                        "content": [
                            {"type": "text", "text": simple_prompt},
                            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}
                        ]
                    }],
                    temperature=0.0,
                    max_tokens=2000,
                    timeout=30
                )

                content = response.choices[0].message.content.strip()
                print(f"API响应: {content[:300]}...")

                # 检查是否包含你要找的数据
                if "property type" in content.lower() or "service type" in content.lower():
                    print("🎉 可能找到了缺失的饼图数据！")
                    print(f"完整响应: {content}")

            except Exception as e:
                print(f"❌ 简化测试失败: {e}")

            time.sleep(1)

    except Exception as e:
        print(f"方法4执行出错: {e}")
        import traceback
        traceback.print_exc()

# 调试主控制函数
def run_debugging_session():
    """调试会话主控制函数"""
    print("🔧 KPI提取调试会话")
    print("=" * 50)

    while True:
        try:
            choice = input("""\n选择调试方法：
1 - 检查所有图像的提取和识别情况
2 - 禁用图表分类器，强制处理所有图像
3 - 手动选择图像进行测试
4 - 简化API测试（测试前5个图像）
0 - 退出调试

请输入选择 (0-4): """)

            if choice == "1":
                debug_method_1_check_image_detection()
            elif choice == "2":
                debug_method_2_bypass_chart_filter()
            elif choice == "3":
                debug_method_3_manual_test()
            elif choice == "4":
                debug_method_4_simple_test()
            elif choice == "0":
                print("退出调试会话")
                break
            else:
                print("无效选择，请重新输入")

        except KeyboardInterrupt:
            print("\n用户中断，退出调试")
            break
        except Exception as e:
            print(f"调试会话出错: {e}")
            import traceback
            traceback.print_exc()

# ============================================================================
# 单独的快速测试函数（如果你不想用交互式界面）
# ============================================================================

def quick_debug():
    """快速调试 - 直接运行方法1"""
    print("🚀 快速调试模式")
    debug_method_1_check_image_detection()

# ============================================================================
# 使用方法
# ============================================================================

# 在你的代码最后，现在可以运行以下任意一个：

# 选项1: 交互式调试（推荐）
# run_debugging_session()

# 选项2: 快速调试，直接检查图像
# quick_debug()

# 选项3: 直接运行特定方法
# debug_method_1_check_image_detection()

In [201]:
# 测试函数：验证新的提示词是否有效
def test_improved_prompt():
    """测试改进的提示词是否能正确提取百分比"""
    print("=== 测试改进的提示词 ===")

    try:
        # 提取页面2的全页图像
        images = extract_images_from_pdf_fixed(PDF_PATH)
        page2_image = None

        for img_info in images:
            if img_info['page_number'] == 2 and img_info['type'] == 'full_page':
                page2_image = img_info['image']
                break

        if page2_image is None:
            print("❌ 找不到页面2图像")
            return

        print(f"✅ 找到页面2图像，尺寸: {page2_image.width}x{page2_image.height}")

        # 使用改进的提示词测试
        base64_image = image_to_base64_fixed(page2_image)

        print("🔄 使用改进的提示词调用API...")
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": ENHANCED_IMAGE_KPI_SYSTEM_PROMPT},
                {"role": "user", "content": [
                    {"type": "text", "text": """分析这个页面，重点关注两个饼图：

1. 上方饼图："Energy Use by Property Type 2021"
2. 下方饼图："Energy Use by Service Type 2021"

请提取每个饼图中每个扇形的具体百分比数值。确保包含实际的数字，不只是描述。"""},
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}", "detail": "high"}}
                ]}
            ],
            temperature=0.0,
            max_tokens=4000,
            timeout=90
        )

        content = response.choices[0].message.content.strip()
        print("\n📋 API响应:")
        print(content[:500] + "..." if len(content) > 500 else content)

        # 尝试解析JSON
        try:
            if content.startswith('```json'):
                content = content[7:]
            if content.endswith('```'):
                content = content[:-3]
            content = content.strip()

            if content.startswith('['):
                result = json.loads(content)
                print(f"\n✅ 成功解析JSON，找到 {len(result)} 个KPI")

                # 检查是否提取了具体的百分比
                found_percentages = []
                for kpi in result:
                    if isinstance(kpi, dict):
                        kpi_text = kpi.get('kpi_text', '')
                        quantitative_value = kpi.get('quantitative_value', '')

                        print(f"KPI: {kpi_text}")
                        print(f"  数值: {quantitative_value}")

                        # 检查是否包含期望的百分比
                        if any(target in kpi_text.lower() for target in ['64%', '33%', '68%', '30%', 'healthcare center', 'medical office', 'electricity', 'fuel']):
                            found_percentages.append(kpi)
                            print(f"  🎯 找到目标数据!")
                        print()

                if found_percentages:
                    print(f"🎉 成功提取了 {len(found_percentages)} 个包含具体百分比的KPI!")
                    return True
                else:
                    print("❌ 仍然没有提取到具体的百分比数值")
                    return False
            else:
                print("❌ API响应不是JSON格式")
                return False

        except json.JSONDecodeError as e:
            print(f"❌ JSON解析失败: {e}")
            return False

    except Exception as e:
        print(f"❌ 测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False

# 快速修复函数：替换提示词并重新运行提取
def quick_fix_and_rerun():
    """应用修复并重新运行完整的提取流程"""
    print("🔧 应用修复并重新运行...")

    # 首先测试新提示词
    if test_improved_prompt():
        print("\n✅ 新提示词测试成功!")

        # 重新运行完整的提取流程
        print("\n🔄 重新运行完整的KPI提取...")
        try:
            df_auto = process_sustainability_report_with_enhanced_images(PDF_PATH)

            # 保存结果
            save_results(df_auto, "fixed_" + EXPORT_AUTO_XLSX, PDF_PATH)

            print(f"\n🎉 修复完成! 总共提取了 {len(df_auto)} 个KPI")
            print("结果已保存到 fixed_" + EXPORT_AUTO_XLSX)

            # 显示包含饼图数据的KPI
            pie_chart_kpis = df_auto[df_auto['kpi_text'].str.contains('pie|Pie', case=False, na=False)]
            print(f"\n📊 饼图相关的KPI ({len(pie_chart_kpis)} 个):")
            for idx, row in pie_chart_kpis.iterrows():
                print(f"- {row['kpi_text']}")

            return df_auto

        except Exception as e:
            print(f"❌ 重新运行失败: {e}")
            return None
    else:
        print("\n❌ 新提示词测试失败，需要进一步调试")
        return None

In [202]:
# 通用测试函数
def test_universal_prompt():
    """测试通用提示词的效果"""
    print("=== 测试通用图像分析提示词 ===")

    try:
        # 提取页面2的全页图像进行测试
        images = extract_images_from_pdf_fixed(PDF_PATH)
        page2_image = None

        for img_info in images:
            if img_info['page_number'] == 2 and img_info['type'] == 'full_page':
                page2_image = img_info['image']
                break

        if page2_image is None:
            print("❌ 找不到页面2图像")
            return False

        print(f"✅ 找到页面2图像，尺寸: {page2_image.width}x{page2_image.height}")

        # 使用通用提示词测试
        base64_image = image_to_base64_fixed(page2_image)

        print("🔄 使用通用提示词调用API...")
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": ENHANCED_IMAGE_KPI_SYSTEM_PROMPT},
                {"role": "user", "content": [
                    {"type": "text", "text": """请分析这个页面中的所有图表和表格，提取所有可量化的数据点。

重点关注：
- 饼图中每个扇形的具体百分比
- 表格中的所有数值数据
- 确保每个提取的KPI都包含具体的数字，不只是描述

请确保提取完整的上下文信息。"""},
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}", "detail": "high"}}
                ]}
            ],
            temperature=0.0,
            max_tokens=4000,
            timeout=90
        )

        content = response.choices[0].message.content.strip()
        print(f"\n📋 API响应长度: {len(content)} 字符")

        # 解析和验证结果
        try:
            if content.startswith('```json'):
                content = content[7:]
            if content.endswith('```'):
                content = content[:-3]
            content = content.strip()

            if content.startswith('['):
                result = json.loads(content)
                print(f"✅ 成功解析JSON，找到 {len(result)} 个KPI")

                # 分析提取质量
                complete_kpis = 0
                pie_chart_kpis = 0
                table_kpis = 0

                print("\n📊 提取的KPI列表:")
                for i, kpi in enumerate(result, 1):
                    if isinstance(kpi, dict):
                        kpi_text = kpi.get('kpi_text', '')
                        quantitative_value = kpi.get('quantitative_value', '')
                        chart_type = kpi.get('chart_type', '')

                        print(f"{i:2d}. {kpi_text}")
                        print(f"    数值: {quantitative_value} {kpi.get('unit', '')}")
                        print(f"    类型: {chart_type}")

                        # 统计分析
                        if quantitative_value and str(quantitative_value).strip():
                            complete_kpis += 1

                        if 'pie' in chart_type.lower():
                            pie_chart_kpis += 1
                        elif 'table' in chart_type.lower():
                            table_kpis += 1

                        print()

                print(f"📈 质量分析:")
                print(f"  - 包含数值的KPI: {complete_kpis}/{len(result)} ({complete_kpis/len(result)*100:.1f}%)")
                print(f"  - 饼图KPI: {pie_chart_kpis}")
                print(f"  - 表格KPI: {table_kpis}")

                # 检查是否提取了目标数据
                success_indicators = [
                    any('64' in str(kpi.get('quantitative_value', '')) for kpi in result),
                    any('33' in str(kpi.get('quantitative_value', '')) for kpi in result),
                    any('68' in str(kpi.get('quantitative_value', '')) for kpi in result),
                    any('30' in str(kpi.get('quantitative_value', '')) for kpi in result)
                ]

                if any(success_indicators):
                    print("🎉 成功提取了目标饼图数据!")
                    return True
                else:
                    print("⚠️ 可能没有提取到期望的饼图百分比")
                    return False
            else:
                print("❌ API响应不是JSON格式")
                print(f"响应内容: {content[:300]}...")
                return False

        except json.JSONDecodeError as e:
            print(f"❌ JSON解析失败: {e}")
            print(f"响应内容: {content[:300]}...")
            return False

    except Exception as e:
        print(f"❌ 测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False

# 应用通用修复
def apply_universal_fix():
    """应用通用提示词修复并重新运行"""
    print("🔧 应用通用提示词修复...")

    # 首先测试新提示词
    print("第一步: 测试新的通用提示词...")
    if test_universal_prompt():
        print("\n✅ 通用提示词测试成功!")

        # 询问是否继续完整提取
        try:
            proceed = input("\n是否继续运行完整的KPI提取? (y/n): ").lower()
            if proceed == 'y':
                print("\n🔄 重新运行完整的KPI提取...")
                df_auto = process_sustainability_report_with_enhanced_images(PDF_PATH)

                # 保存结果
                output_file = "universal_fixed_" + EXPORT_AUTO_XLSX
                save_results(df_auto, output_file, PDF_PATH)

                print(f"\n🎉 修复完成! 总共提取了 {len(df_auto)} 个KPI")
                print(f"结果已保存到 {output_file}")

                # 显示图像来源的KPI统计
                if 'source_type' in df_auto.columns:
                    image_kpis = df_auto[df_auto['source_type'] == 'image']
                    print(f"\n📊 从图像中提取的KPI: {len(image_kpis)} 个")

                return df_auto
            else:
                print("已取消完整提取")
                return None

        except KeyboardInterrupt:
            print("\n用户取消操作")
            return None

    else:
        print("\n❌ 通用提示词测试失败")
        print("建议检查API响应或进一步调整提示词")
        return None

In [203]:
# ============ 执行入口 ============
if __name__ == "__main__":
    # Uncomment to install dependencies first
    # install_dependencies()

    # Uncomment to see usage examples
    # example_usage()

    # Run the main extraction
    run_kpi_extraction()
    #run_debugging_session()
    #apply_universal_fix()

🚀 Starting KPI extraction process...
✅ Environment validation passed


  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  if self._document_has_no_text():
  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  i


=== Extraction Summary ===
Total KPIs extracted: 566
From text/tables: 412
From images/charts: 154

KPI Distribution by Theme:
  Environmental: 508
  Social: 27
  Economic: 18
  Governance: 13
