<a href="https://colab.research.google.com/github/Tar-ive/proposal_critique/blob/main/proposal_critique.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install PyPDF2
!pip install pandas

Collecting PyPDF2
  Downloading pypdf2-3.0.1-py3-none-any.whl.metadata (6.8 kB)
Downloading pypdf2-3.0.1-py3-none-any.whl (232 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/232.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━[0m [32m153.6/232.6 kB[0m [31m4.6 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m232.6/232.6 kB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: PyPDF2
Successfully installed PyPDF2-3.0.1


In [None]:
import io
import re
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
import PyPDF2

class NIHGrantParser:
    """Pull structured fields out of an NIH grant application PDF with regexes.

    The ``extract_*`` methods take the application's plain text and return
    plain dicts/lists of strings. Most fields fall back to an empty string when
    their label is not found; ``Abstract``, ``Specific_Aims`` and the
    ``Budget_Period_<n>`` keys are only present when matched.
    """

    def __init__(self):
        # Not read by any method of this class; kept so callers that touch
        # ``parser.data`` keep working.
        self.data = {}

    @staticmethod
    def _capture(text: str, *patterns: str) -> str:
        """Return the stripped first group of the first matching pattern.

        Patterns are tried in order; the first one that matches wins. Returns
        an empty string when none of them match.
        """
        for pattern in patterns:
            match = re.search(pattern, text)
            if match:
                return match.group(1).strip()
        return ''

    def extract_basic_info(self, text: str) -> Dict[str, Any]:
        """Extract cover-page fields (PI, IDs, title, organization, dates).

        Args:
            text: Full text of the application.

        Returns:
            Dict with keys ``PI_Name``, ``Application_ID``,
            ``Funding_Opportunity``, ``Project_Title``, ``Organization``,
            ``Start_Date`` and ``End_Date``; missing fields are ``''``.
        """
        return {
            'PI_Name': self._capture(text, r'Contact PD/PI:\s*([^,\n]+)'),
            'Application_ID': self._capture(text, r'Application Identifier\s*([A-Z0-9]+)'),
            'Funding_Opportunity': self._capture(text, r'Funding Opportunity Number:\s*([A-Z0-9-]+)'),
            # The form label is preferred; a generic "Title:" line is the fallback.
            'Project_Title': self._capture(
                text,
                r'DESCRIPTIVE TITLE OF APPLICANT\'S PROJECT\*\s*([^\n]+)',
                r'Title:\s*([^\n]+)',
            ),
            'Organization': self._capture(text, r'Legal Name\*:\s*([^\n]+)'),
            'Start_Date': self._capture(text, r'Start Date\*\s*([0-9/]+)'),
            'End_Date': self._capture(text, r'Ending Date\*\s*([0-9/]+)'),
        }

    def extract_personnel(self, text: str) -> List[Dict[str, Any]]:
        """Extract one record per key-personnel block found in *text*.

        Each candidate block is handed to ``_parse_person_info``; blocks from
        which no field at all could be read are skipped, so the result never
        contains all-empty rows.

        Args:
            text: Full text of the application.

        Returns:
            List of person dicts, in the order the patterns below find them.
        """
        personnel_patterns = [
            r'Senior/Key Personnel:\s*Organization:\s*Role Category:(.*?)(?=\n\n|\nAlways follow)',
            r'PROFILE - Project Director/Principal Investigator(.*?)(?=PROFILE - Senior/Key Person|$)',
            r'PROFILE - Senior/Key Person(.*?)(?=PROFILE - Senior/Key Person|$)'
        ]

        personnel = []
        for pattern in personnel_patterns:
            for match in re.finditer(pattern, text, re.DOTALL):
                person_info = self._parse_person_info(match.group(1))
                if person_info:
                    personnel.append(person_info)
        return personnel

    def _parse_person_info(self, text: str) -> Optional[Dict[str, Any]]:
        """Parse a single personnel block.

        Args:
            text: Text of one personnel block.

        Returns:
            Dict with ``Role``, ``Organization``, ``Position`` and ``Degree``
            (plus ``First_Name``/``Last_Name``/``Full_Name`` when a name was
            found), or ``None`` when every field came back empty.
        """
        person = {}

        # DOTALL lets the first/last name labels sit on different lines.
        name_match = re.search(r'First Name\*:\s*([^\n]+).*?Last Name\*:\s*([^\n]+)', text, re.DOTALL)
        if name_match:
            person['First_Name'] = name_match.group(1).strip()
            person['Last_Name'] = name_match.group(2).strip()
            person['Full_Name'] = f"{person['First_Name']} {person['Last_Name']}"

        person['Role'] = self._capture(text, r'Project Role\*:\s*([^\n]+)')
        person['Organization'] = self._capture(text, r'Organization Name\*:\s*([^\n]+)')
        person['Position'] = self._capture(text, r'Position/Title\*:\s*([^\n]+)')
        person['Degree'] = self._capture(text, r'Degree Type:\s*([^\n]+)')

        # The dict always has the four keys above, so test the values (not the
        # dict itself) to decide whether anything was actually extracted.
        return person if any(person.values()) else None

    def extract_budget_info(self, text: str) -> Dict[str, Any]:
        """Extract requested totals and per-period amounts.

        Args:
            text: Full text of the application.

        Returns:
            Dict with ``Total_Federal_Funds`` and ``Total_Project_Funds``
            (digits and commas, ``''`` if missing) plus one
            ``Budget_Period_<n>`` key per "Budget Period <n> ... $<amount>"
            occurrence on a single line; later occurrences of the same period
            overwrite earlier ones.
        """
        budget_info = {
            'Total_Federal_Funds': self._capture(text, r'Total Federal Funds Requested\*\s*\$([0-9,]+)'),
            'Total_Project_Funds': self._capture(text, r'Total Federal & Non-Federal Funds\*\s*\$([0-9,]+)'),
        }

        # Simplified: first dollar amount following each "Budget Period N".
        for period, amount in re.findall(r'Budget Period ([0-9]+).*?\$([0-9,]+)', text):
            budget_info[f'Budget_Period_{period}'] = amount

        return budget_info

    def extract_research_info(self, text: str) -> Dict[str, Any]:
        """Extract abstract, specific aims and research flags.

        Args:
            text: Full text of the application.

        Returns:
            Dict with ``Human_Subjects``, ``Vertebrate_Animals`` and
            ``Clinical_Trial`` (``''`` if missing); ``Abstract`` and
            ``Specific_Aims`` are added only when found and are truncated to
            1000 characters.
        """
        research_info = {}

        abstract_match = re.search(r'Summary:(.*?)(?=Project Narrative|Narrative:|$)', text, re.DOTALL)
        if abstract_match:
            research_info['Abstract'] = abstract_match.group(1).strip()[:1000]  # Limit length

        aims_match = re.search(r'Specific Aims?\.(.*?)(?=Research Strategy|B\. Innovation|$)', text, re.DOTALL)
        if aims_match:
            research_info['Specific_Aims'] = aims_match.group(1).strip()[:1000]

        research_info['Human_Subjects'] = self._capture(text, r'Are Human Subjects Involved\?\*\s*(Yes|No)')
        research_info['Vertebrate_Animals'] = self._capture(text, r'Are Vertebrate Animals Used\?\*\s*(Yes|No)')
        research_info['Clinical_Trial'] = self._capture(text, r'Clinical Trial:\s*(Y|N)')

        return research_info

    def extract_sbir_info(self, text: str) -> Dict[str, Any]:
        """Extract SBIR/STTR specific information.

        Args:
            text: Full text of the application.

        Returns:
            Dict with ``Program_Type``, ``Application_Type`` and
            ``Small_Business_Eligible``; missing fields are ``''``.
        """
        return {
            'Program_Type': self._capture(text, r'Program Type.*?(SBIR|STTR)'),
            'Application_Type': self._capture(text, r'Application Type.*?(Phase I|Phase II|Fast-Track)'),
            'Small_Business_Eligible': self._capture(text, r'small business eligibility.*?(Yes|No)'),
        }

    def parse_pdf_text(self, pdf_path: str) -> str:
        """Extract text from every page of the PDF at *pdf_path*.

        Best effort: any error while opening or reading is printed and the text
        collected so far (possibly ``''``) is returned instead of raising.

        Args:
            pdf_path: Path of the PDF file.

        Returns:
            Page texts, each followed by a newline, concatenated in page order.
        """
        page_texts = []
        try:
            with open(pdf_path, 'rb') as file:
                for page in PyPDF2.PdfReader(file).pages:
                    # Guard against a page yielding no text object at all.
                    page_texts.append((page.extract_text() or "") + "\n")
        except Exception as e:
            print(f"Error reading PDF: {e}")
        return "".join(page_texts)

    def parse_grant_application(self, pdf_path: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
        """Run every extractor over the PDF.

        Args:
            pdf_path: Path of the PDF file.

        Returns:
            ``(grant_data, personnel)``: the merged basic/budget/research/SBIR
            fields as one flat dict, and the list of personnel records.
        """
        print("Extracting text from PDF...")
        text = self.parse_pdf_text(pdf_path)

        print("Parsing basic information...")
        basic_info = self.extract_basic_info(text)

        print("Parsing personnel information...")
        personnel = self.extract_personnel(text)

        print("Parsing budget information...")
        budget_info = self.extract_budget_info(text)

        print("Parsing research information...")
        research_info = self.extract_research_info(text)

        print("Parsing SBIR information...")
        sbir_info = self.extract_sbir_info(text)

        # Later dicts win on key clashes; the extractors use distinct keys.
        grant_data = {
            **basic_info,
            **budget_info,
            **research_info,
            **sbir_info
        }

        return grant_data, personnel

    def save_to_csv(self, pdf_path: str, output_prefix: str = "nih_grant") -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]:
        """Parse the PDF and write the results as CSV files.

        Writes ``<prefix>_main.csv`` (one row), ``<prefix>_summary.csv``
        (Field/Value pairs) and, when personnel were found,
        ``<prefix>_personnel.csv``.

        Args:
            pdf_path: Path of the PDF file.
            output_prefix: Prefix (may include a directory) for output files.

        Returns:
            ``(grant_df, personnel_df)``; ``personnel_df`` is ``None`` when no
            personnel records were extracted.
        """
        grant_data, personnel = self.parse_grant_application(pdf_path)

        # Save main grant information
        grant_df = pd.DataFrame([grant_data])
        grant_df.to_csv(f"{output_prefix}_main.csv", index=False)
        print(f"Main grant data saved to {output_prefix}_main.csv")

        # Save personnel information
        personnel_df = None
        if personnel:
            personnel_df = pd.DataFrame(personnel)
            personnel_df.to_csv(f"{output_prefix}_personnel.csv", index=False)
            print(f"Personnel data saved to {output_prefix}_personnel.csv")

        # Save summary information (same data as the main file, transposed)
        summary_df = pd.DataFrame({
            'Field': list(grant_data.keys()),
            'Value': list(grant_data.values())
        })
        summary_df.to_csv(f"{output_prefix}_summary.csv", index=False)
        print(f"Summary data saved to {output_prefix}_summary.csv")

        return grant_df, personnel_df


# Usage example: parse one application PDF and preview the resulting frames.
if __name__ == "__main__":
    parser = NIHGrantParser()

    # Replace with your PDF path
    pdf_path = "harty-application.pdf"

    try:
        # save_to_csv writes the CSV files and returns (grant_df, personnel_df);
        # personnel_df is None when no personnel records were extracted.
        grant_df, personnel_df = parser.save_to_csv(pdf_path, "nih_grant_harty")

        print("\nExtracted Grant Information:")
        print(grant_df.head())

        if personnel_df is not None:
            print("\nExtracted Personnel Information:")
            print(personnel_df.head())

    # NOTE: parse_pdf_text prints and swallows PDF read errors itself, so an
    # unreadable or missing PDF produces empty-valued CSVs rather than landing
    # here; this handler catches failures from parsing or CSV writing.
    except Exception as e:
        print(f"Error processing PDF: {e}")

Extracting text from PDF...
Parsing basic information...
Parsing personnel information...
Parsing budget information...
Parsing research information...
Parsing SBIR information...
Main grant data saved to nih_grant_harty_main.csv
Personnel data saved to nih_grant_harty_personnel.csv
Summary data saved to nih_grant_harty_summary.csv

Extracted Grant Information:
  PI_Name Application_ID Funding_Opportunity  \
0   Harty              1           PA-20-265   

                                       Project_Title  \
0  Development of Small Molecule Therapeutics Tar...   

                              Organization Start_Date    End_Date  \
0  Fox Chase Chemical Diversity Center Inc             04/01/2021   

  Total_Federal_Funds Total_Project_Funds  \
0                                           

                                            Abstract  \
0  The ultimate goal of this Phase II application...   

                                       Specific_Aims Human_Subjects  \
0  The ultim

In [None]:
import pandas as pd
import PyPDF2
import re
from typing import Dict, List, Any, Optional, Callable
import json
import yaml
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
import logging
from pathlib import Path

# Set up logging: configure the root logger at INFO and create the module-level
# logger used by the extractor and pipeline classes defined in this cell.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class ExtractionRule:
    """Defines a rule for extracting one named value from text.

    Consumed by ``DataExtractor.extract_by_rule``, which searches ``pattern``
    with ``re.DOTALL | re.IGNORECASE``.
    """
    # Key under which the extracted value is stored for its section.
    name: str
    # Regular expression; capture group 1 is used when the pattern has groups,
    # otherwise the whole match.
    pattern: str
    # Values handled by extract_by_rule: "single" (first match), "multiple"
    # (all matches, as a list) and "all_text" (the section text unchanged).
    # Any other value yields default_value.
    extraction_type: str = "single"
    # Optional post-processor name: "clean_text", "extract_numbers",
    # "extract_dates" or "extract_emails". Only applied to string results.
    post_process: Optional[str] = None
    # NOTE(review): not read by any code visible in this file — confirm whether
    # missing required fields are meant to be reported.
    required: bool = False
    # Returned when nothing matches or extraction raises.
    default_value: Any = ""
    # Passed to clean_text as the truncation limit; ignored by other processors.
    max_length: Optional[int] = None

@dataclass
class TableExtractionRule:
    """Defines rules for extracting tabular data.

    Consumed by ``DataExtractor.extract_table_data``.
    """
    # Used in the output key "<section name>_<table name>" and in log messages.
    name: str
    # Regex (case-insensitive); the table text begins right after its match.
    start_pattern: str
    # Regex (case-insensitive) marking where the table text stops; when it does
    # not match, the table runs to the end of the text.
    end_pattern: str
    # One regex per output column, searched in each row; capture group 1 becomes
    # the cell value and the column is named "column_<index>".
    column_patterns: List[str]
    # Literal string the table text is split on to obtain rows.
    row_separator: str = "\n"

@dataclass
class SectionConfig:
    """Configuration for a document section.

    ``DocumentPipeline.extract_section_text`` slices the section out of the full
    text, then the rules below are run against that slice.
    """
    # Key under which this section's results are stored.
    name: str
    # Regex locating the start of the section; when None/empty the whole
    # document text is used as the section.
    start_pattern: Optional[str] = None
    # Regex searched after the start match; when None or unmatched the section
    # runs to the end of the document.
    end_pattern: Optional[str] = None
    # Single-value / multi-value rules applied to the section text.
    extraction_rules: List[ExtractionRule] = field(default_factory=list)
    # Table rules applied to the section text.
    table_rules: List[TableExtractionRule] = field(default_factory=list)
    # NOTE(review): never populated from config nor traversed by
    # process_document in the code visible here — confirm intended use.
    subsections: List['SectionConfig'] = field(default_factory=list)

class TextExtractor(ABC):
    """Interface every text-extraction backend must implement."""

    @abstractmethod
    def extract_text(self, file_path: str) -> str:
        """Return the text content of the file located at *file_path*."""

class PDFExtractor(TextExtractor):
    """TextExtractor backend that reads PDFs through PyPDF2."""

    def extract_text(self, file_path: str) -> str:
        """Return the text of every page, each preceded by a page banner.

        A page whose extraction fails is logged and skipped; failure to open or
        read the file itself is logged and re-raised.
        """
        chunks = []
        try:
            with open(file_path, 'rb') as handle:
                for number, page in enumerate(PyPDF2.PdfReader(handle).pages, start=1):
                    try:
                        chunks.append(f"\n--- PAGE {number} ---\n{page.extract_text()}\n")
                    except Exception as e:
                        logger.warning(f"Could not extract text from page {number}: {e}")
        except Exception as e:
            logger.error(f"Error reading PDF: {e}")
            raise
        return "".join(chunks)

class DataProcessor:
    """Stateless helpers for post-processing extracted strings."""

    @staticmethod
    def clean_text(text: str, max_length: Optional[int] = None) -> str:
        """Collapse whitespace and optionally truncate.

        Args:
            text: Raw extracted text; falsy input yields ``""``.
            max_length: When set (and non-zero), longer results are cut to this
                many characters and suffixed with ``"..."``.

        Returns:
            The text stripped, with every whitespace run replaced by one space.
        """
        if not text:
            return ""

        # Remove extra whitespace (newlines and tabs included)
        text = re.sub(r'\s+', ' ', text.strip())

        # Limit length if specified
        if max_length and len(text) > max_length:
            text = text[:max_length] + "..."

        return text

    @staticmethod
    def extract_numbers(text: str) -> str:
        """Return the first number in *text*, or ``""`` when there is none.

        A number is a run of digits with optional comma-separated groups and an
        optional decimal part (``"1,234.50"``). Requiring a leading digit keeps
        stray punctuation such as a lone ``","`` from being reported as a
        number, and trailing commas/periods are not included.
        """
        match = re.search(r'\d+(?:,\d+)*(?:\.\d+)?', text)
        return match.group(0) if match else ""

    @staticmethod
    def extract_dates(text: str) -> str:
        """Return the first date found in *text*, or ``""``.

        Formats are tried in this order, and the first format with any match
        wins regardless of position in the text: ``M/D/YYYY``, ``YYYY-MM-DD``,
        ``Month D, YYYY``.
        """
        date_patterns = (
            r'\d{1,2}/\d{1,2}/\d{4}',
            r'\d{4}-\d{2}-\d{2}',
            r'\w+ \d{1,2}, \d{4}',
        )
        for pattern in date_patterns:
            match = re.search(pattern, text)
            if match:
                return match.group(0)
        return ""

    @staticmethod
    def extract_emails(text: str) -> List[str]:
        """Return every email address found in *text*, in order of appearance."""
        # The TLD class is letters only; a "|" inside a character class is a
        # literal pipe, not alternation, so it must not appear there.
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        return re.findall(email_pattern, text)

class DataExtractor:
    """Main data extraction engine: applies extraction rules to text."""

    def __init__(self, processor: "Optional[DataProcessor]" = None):
        """Create an extractor.

        Args:
            processor: Post-processing helper; a fresh ``DataProcessor`` is
                used when omitted.
        """
        self.processor = processor if processor is not None else DataProcessor()

    def extract_by_rule(self, text: str, rule: "ExtractionRule") -> Any:
        """Extract data based on a single rule.

        Args:
            text: Text to search (normally one section of the document).
            rule: Rule describing the pattern, extraction type and optional
                post-processing step.

        Returns:
            For ``"single"``: capture group 1 (or the whole match when the
            pattern has no groups), else ``rule.default_value``.
            For ``"multiple"``: the ``re.findall`` result, or
            ``[rule.default_value]`` when nothing matched.
            For ``"all_text"``: *text* itself.
            Any other type, or any exception, yields ``rule.default_value``.
            Post-processing is applied to string results only.
        """
        try:
            flags = re.DOTALL | re.IGNORECASE

            if rule.extraction_type == "single":
                result = rule.default_value
                match = re.search(rule.pattern, text, flags)
                if match:
                    captured = match.group(1) if match.groups() else match.group(0)
                    # An optional group that did not participate is None; treat
                    # that the same as "not found".
                    if captured is not None:
                        result = captured

            elif rule.extraction_type == "multiple":
                matches = re.findall(rule.pattern, text, flags)
                result = matches if matches else [rule.default_value]

            elif rule.extraction_type == "all_text":
                result = text

            else:
                result = rule.default_value

            # Post-processing (list results are returned untouched)
            if rule.post_process and isinstance(result, str):
                if rule.post_process == "clean_text":
                    result = self.processor.clean_text(result, rule.max_length)
                elif rule.post_process == "extract_numbers":
                    result = self.processor.extract_numbers(result)
                elif rule.post_process == "extract_dates":
                    result = self.processor.extract_dates(result)
                elif rule.post_process == "extract_emails":
                    result = self.processor.extract_emails(result)

            return result

        except Exception as e:
            logger.warning(f"Error extracting data for rule {rule.name}: {e}")
            return rule.default_value

    def extract_table_data(self, text: str, table_rule: "TableExtractionRule") -> List[Dict[str, str]]:
        """Extract tabular data.

        The table body is the text between the end of the first
        ``start_pattern`` match and the first ``end_pattern`` match *after*
        it (or the end of *text* when the end pattern is absent). The body is
        split on ``row_separator`` and every column pattern is searched in
        every row.

        Args:
            text: Text to search.
            table_rule: Table boundaries, row separator and column patterns.

        Returns:
            One dict per non-empty row, keyed ``column_0``, ``column_1``, ...;
            cells whose pattern did not match (or has no capture group) are
            ``""``. Returns ``[]`` when the start pattern is missing or an
            error occurs.
        """
        try:
            # Find table boundaries
            start_match = re.search(table_rule.start_pattern, text, re.IGNORECASE)
            if not start_match:
                return []

            # Look for the end marker only after the start marker, so an earlier
            # occurrence of the end pattern cannot empty the table.
            remainder = text[start_match.end():]
            end_match = re.search(table_rule.end_pattern, remainder, re.IGNORECASE)
            table_text = remainder[:end_match.start()] if end_match else remainder

            # Extract data using column patterns, row by row
            table_data = []
            for row in table_text.split(table_rule.row_separator):
                row_data = {}
                for i, pattern in enumerate(table_rule.column_patterns):
                    match = re.search(pattern, row, re.IGNORECASE)
                    row_data[f"column_{i}"] = match.group(1) if match and match.groups() else ""

                # Only add non-empty rows
                if any(row_data.values()):
                    table_data.append(row_data)

            return table_data

        except Exception as e:
            logger.warning(f"Error extracting table data for {table_rule.name}: {e}")
            return []

class DocumentPipeline:
    """Ties together text extraction, rule-driven parsing and CSV export."""

    def __init__(self, config_path: Optional[str] = None, text_extractor: TextExtractor = None):
        """Build a pipeline; a PDF extractor is used unless another is given.

        When *config_path* is provided the configuration is loaded immediately.
        """
        self.text_extractor = text_extractor if text_extractor else PDFExtractor()
        self.data_extractor = DataExtractor()
        if config_path:
            self.config = self.load_config(config_path)
        else:
            self.config = None

    def load_config(self, config_path: str) -> Dict[str, Any]:
        """Read a JSON or YAML configuration file and return its contents.

        Raises:
            ValueError: if the file extension is not .json, .yml or .yaml.
        """
        config_path = Path(config_path)
        extension = config_path.suffix.lower()
        is_json = extension == '.json'

        if not is_json and extension not in ['.yml', '.yaml']:
            raise ValueError(f"Unsupported config file format: {config_path.suffix}")

        with open(config_path, 'r') as f:
            if is_json:
                return json.load(f)
            return yaml.safe_load(f)

    def create_extraction_rules_from_config(self, config: Dict[str, Any]) -> List[SectionConfig]:
        """Turn the ``sections`` list of a config dict into SectionConfig objects."""
        sections = []

        for entry in config.get('sections', []):
            # Rule objects are built before the section itself, mirroring the
            # order in which a malformed entry would raise.
            rules = [ExtractionRule(**spec) for spec in entry.get('extraction_rules', [])]
            tables = [TableExtractionRule(**spec) for spec in entry.get('table_rules', [])]

            sections.append(
                SectionConfig(
                    name=entry['name'],
                    start_pattern=entry.get('start_pattern'),
                    end_pattern=entry.get('end_pattern'),
                    extraction_rules=rules,
                    table_rules=tables,
                )
            )

        return sections

    def extract_section_text(self, full_text: str, section: SectionConfig) -> str:
        """Return the slice of *full_text* belonging to *section*.

        The slice begins at the start of the start-pattern match and stops at
        the first end-pattern match found after that point (or at the end of
        the text). Without a start pattern the full text is returned; when the
        start pattern is not found a warning is logged and "" is returned.
        """
        if not section.start_pattern:
            return full_text

        flags = re.DOTALL | re.IGNORECASE
        opening = re.search(section.start_pattern, full_text, flags)
        if opening is None:
            logger.warning(f"Could not find start pattern for section: {section.name}")
            return ""

        begin = opening.start()
        finish = len(full_text)

        if section.end_pattern:
            closing = re.search(section.end_pattern, full_text[begin:], flags)
            if closing:
                finish = begin + closing.start()

        return full_text[begin:finish]

    def process_document(self, file_path: str, sections: List[SectionConfig] = None) -> Dict[str, Any]:
        """Extract structured data from the document at *file_path*.

        Explicit *sections* take precedence over the loaded configuration.

        Returns:
            Dict with ``sections`` (per-section field values), ``tables``
            (rows keyed by "<section>_<table>") and ``metadata``.

        Raises:
            ValueError: if neither *sections* nor a loaded config is available.
        """
        logger.info(f"Processing document: {file_path}")

        # Text is extracted first, so extractor errors surface before any
        # configuration problem.
        full_text = self.text_extractor.extract_text(file_path)

        if not sections:
            if not self.config:
                raise ValueError("No extraction configuration provided")
            sections = self.create_extraction_rules_from_config(self.config)

        results_by_section = {}
        tables_by_name = {}

        for section in sections:
            logger.info(f"Processing section: {section.name}")

            body = self.extract_section_text(full_text, section)

            fields = {
                rule.name: self.data_extractor.extract_by_rule(body, rule)
                for rule in section.extraction_rules
            }

            for table_rule in section.table_rules:
                rows = self.data_extractor.extract_table_data(body, table_rule)
                tables_by_name[f"{section.name}_{table_rule.name}"] = rows

            results_by_section[section.name] = fields

        metadata = {
            'file_path': file_path,
            'extraction_timestamp': pd.Timestamp.now().isoformat()
        }
        return {
            'sections': results_by_section,
            'tables': tables_by_name,
            'metadata': metadata
        }

    def save_to_csv(self, extracted_data: Dict[str, Any], output_prefix: str):
        """Write the extracted data to CSV files and return the flattened frame.

        Files written: ``<prefix>_sections.csv`` (one row per section, when any
        section exists), ``<prefix>_table_<name>.csv`` for every non-empty
        table, and ``<prefix>_flattened.csv`` (a single row holding every field
        plus the metadata).
        """
        output_prefix = Path(output_prefix)
        per_section = extracted_data['sections']

        # One row per section, with the section name as the leading column.
        section_rows = [
            {'section': section_name, **fields}
            for section_name, fields in per_section.items()
        ]
        if section_rows:
            sections_file = f"{output_prefix}_sections.csv"
            pd.DataFrame(section_rows).to_csv(sections_file, index=False)
            logger.info(f"Sections data saved to {sections_file}")

        # One file per table that actually produced rows.
        for table_name, rows in extracted_data['tables'].items():
            if not rows:
                continue
            table_file = f"{output_prefix}_table_{table_name}.csv"
            pd.DataFrame(rows).to_csv(table_file, index=False)
            logger.info(f"Table data saved to {table_file}")

        # Single-row view: "<section>_<field>" columns, lists joined with "; ".
        flattened_data = {
            f"{section_name}_{key}": "; ".join(map(str, value)) if isinstance(value, list) else value
            for section_name, fields in per_section.items()
            for key, value in fields.items()
        }
        flattened_data.update(extracted_data['metadata'])

        flattened_df = pd.DataFrame([flattened_data])
        flattened_file = f"{output_prefix}_flattened.csv"
        flattened_df.to_csv(flattened_file, index=False)
        logger.info(f"Flattened data saved to {flattened_file}")

        return flattened_df

# Configuration builder helper
class ConfigBuilder:
    """Factory and persistence helpers for extraction configurations."""

    @staticmethod
    def create_nih_grant_config() -> Dict[str, Any]:
        """Return the built-in configuration for NIH grant applications.

        The result has a ``document_type`` and four sections (basic_info,
        personnel, budget, research_info), each with start/end patterns and a
        list of extraction-rule dicts.
        """

        def rule(name: str, pattern: str, **options: Any) -> Dict[str, Any]:
            # Keyword order is preserved, so it fixes the key order of the dict.
            return {"name": name, "pattern": pattern, **options}

        def section(name: str, start: str, end: str, rules: List[Dict[str, Any]]) -> Dict[str, Any]:
            return {
                "name": name,
                "start_pattern": start,
                "end_pattern": end,
                "extraction_rules": rules,
            }

        basic_info = section(
            "basic_info",
            r"APPLICATION FOR FEDERAL ASSISTANCE",
            r"Table of Contents",
            [
                rule("pi_name", r"Contact PD/PI:\s*([^,\n]+)",
                     post_process="clean_text", required=True),
                rule("application_id", r"Application Identifier\s*([A-Z0-9]+)",
                     required=True),
                rule("project_title", r"DESCRIPTIVE TITLE.*?\n([^\n]+)",
                     post_process="clean_text", max_length=200),
                rule("organization", r"Legal Name\*:\s*([^\n]+)",
                     post_process="clean_text"),
                rule("start_date", r"Start Date\*\s*([0-9/]+)",
                     post_process="extract_dates"),
                rule("end_date", r"Ending Date\*\s*([0-9/]+)",
                     post_process="extract_dates"),
            ],
        )

        personnel = section(
            "personnel",
            r"Senior/Key Personnel:",
            r"Research & Related Budget",
            [
                rule("personnel_names", r"([A-Z][a-z]+ [A-Z][a-z]+) Ph\.D",
                     extraction_type="multiple"),
                rule("organizations", r"([A-Z][a-z]+ [A-Za-z ]+(?:University|Institute|Center))",
                     extraction_type="multiple"),
            ],
        )

        budget = section(
            "budget",
            r"Total Federal Funds Requested",
            r"SBIR/STTR Information",
            [
                rule("total_federal_funds", r"Total Federal Funds Requested\*\s*\$([0-9,]+)",
                     post_process="extract_numbers"),
                rule("total_project_funds", r"Total Federal & Non-Federal Funds\*\s*\$([0-9,]+)",
                     post_process="extract_numbers"),
            ],
        )

        research_info = section(
            "research_info",
            r"Project Summary/Abstract",
            r"Bibliography",
            [
                rule("abstract", r"Summary:(.*?)(?=Project Narrative|Narrative:)",
                     post_process="clean_text", max_length=1000),
                rule("human_subjects", r"Are Human Subjects Involved\?\*\s*(Yes|No)"),
                rule("vertebrate_animals", r"Are Vertebrate Animals Used\?\*\s*(Yes|No)"),
            ],
        )

        return {
            "document_type": "nih_grant",
            "sections": [basic_info, personnel, budget, research_info],
        }

    @staticmethod
    def save_config(config: Dict[str, Any], file_path: str):
        """Write *config* to *file_path* as JSON or YAML, chosen by extension.

        Raises:
            ValueError: if the extension is not .json, .yml or .yaml (nothing
                is written in that case).
        """
        file_path = Path(file_path)
        extension = file_path.suffix.lower()
        as_json = extension == '.json'

        if not as_json and extension not in ['.yml', '.yaml']:
            raise ValueError(f"Unsupported config file format: {file_path.suffix}")

        with open(file_path, 'w') as f:
            if as_json:
                json.dump(config, f, indent=2)
            else:
                yaml.dump(config, f, default_flow_style=False)

# Usage example: write the built-in NIH config to disk, load it back through the
# pipeline, process one PDF and export the results as CSV.
if __name__ == "__main__":
    # Create and save NIH grant configuration
    config = ConfigBuilder.create_nih_grant_config()
    ConfigBuilder.save_config(config, "nih_grant_config.json")

    # Initialize pipeline (loads the JSON file written above)
    pipeline = DocumentPipeline(config_path="nih_grant_config.json")

    # Process document
    try:
        pdf_path = "harty-application.pdf"  # Replace with your PDF path
        extracted_data = pipeline.process_document(pdf_path)

        # Save to CSV
        # NOTE: despite its name, output_files holds the flattened DataFrame
        # returned by save_to_csv, not a list of file names.
        output_files = pipeline.save_to_csv(extracted_data, "extracted_grant_data")

        print("Processing completed successfully!")
        print(f"Extracted data from {len(extracted_data['sections'])} sections")
        print(f"Found {len(extracted_data['tables'])} tables")

    # Errors are logged (message only, no traceback) and not re-raised.
    except Exception as e:
        logger.error(f"Error processing document: {e}")



Processing completed successfully!
Extracted data from 4 sections
Found 0 tables


In [None]:
import json
import re
import PyPDF2
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, asdict
from pathlib import Path
import logging

# Configure the root logger at INFO and create this cell's module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class ExtractionConfig:
    """Configuration for extracting one field.

    Instances are created positionally as ``ExtractionConfig(field_name,
    patterns)`` by the extractor's config builders.
    """
    # Name of the output field.
    field_name: str
    # Candidate regular expressions for this field.
    # NOTE(review): presumably tried in order until one matches — the consuming
    # code is not visible here; confirm.
    patterns: List[str]
    extraction_type: str = "single"  # single, multiple, nested, table
    # NOTE(review): set of accepted post-processor names is not visible here.
    post_process: Optional[str] = None
    required: bool = False
    # Fallback value for the field (assumed to be used when no pattern matches
    # — TODO confirm against the extractor).
    default_value: Any = ""

class JSONDocumentExtractor:
    """Generalizable document extractor that outputs structured JSON.

    Text is pulled from a PDF with PyPDF2 and mined with the regular
    expressions declared per section in ``self.extraction_configs``.  The
    dict returned by :meth:`process_document` contains only JSON-serialisable
    values and can be passed directly to :meth:`save_json`.
    """

    def __init__(self, document_type: str = "nsf_grant"):
        """Select the pattern set for ``document_type``.

        Args:
            document_type: ``"nsf_grant"`` or ``"nih_grant"``.

        Raises:
            ValueError: If ``document_type`` is not supported.
        """
        self.document_type = document_type
        self.extraction_configs = self._load_extraction_configs()

    def extract_text_from_pdf(self, pdf_path: str) -> str:
        """Return the text of every page of ``pdf_path``, concatenated.

        Each page's text is wrapped in a leading and trailing newline.  A page
        whose extraction fails is logged and skipped; failure to open or parse
        the file itself is logged and re-raised.
        """
        page_chunks = []
        try:
            with open(pdf_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page_num, page in enumerate(pdf_reader.pages):
                    try:
                        # Guard against a page yielding None so the literal
                        # string "None" never ends up in the document text.
                        page_text = page.extract_text() or ""
                        page_chunks.append(f"\n{page_text}\n")
                    except Exception as e:
                        logger.warning(f"Could not extract text from page {page_num + 1}: {e}")
        except Exception as e:
            logger.error(f"Error reading PDF: {e}")
            raise
        return "".join(page_chunks)

    def _load_extraction_configs(self) -> Dict[str, List["ExtractionConfig"]]:
        """Load extraction configurations based on document type.

        Raises:
            ValueError: If ``self.document_type`` has no configuration.
        """
        if self.document_type == "nsf_grant":
            return self._get_nsf_grant_configs()
        elif self.document_type == "nih_grant":
            return self._get_nih_grant_configs()
        else:
            raise ValueError(f"Unsupported document type: {self.document_type}")

    def _get_nsf_grant_configs(self) -> Dict[str, List["ExtractionConfig"]]:
        """NSF Grant proposal extraction configurations, keyed by section name."""
        return {
            "administrative_info": [
                ExtractionConfig("proposal_id", [r"Proposal No\.?\s*:?\s*([A-Z0-9-]+)", r"NSF Proposal.*?([0-9]{7})"]),
                ExtractionConfig("nsf_program", [r"Program:\s*([^\n]+)", r"NSF Program.*?:\s*([^\n]+)"]),
                ExtractionConfig("submission_date", [r"Submitted.*?:\s*([0-9/\-]+)", r"Date Submitted.*?:\s*([0-9/\-]+)"]),
                ExtractionConfig("proposal_title", [r"Project Title:\s*([^\n]+)", r"Title:\s*([^\n]+)"]),
                ExtractionConfig("pi_name", [r"Principal Investigator.*?:\s*([^\n,]+)", r"PI:\s*([^\n,]+)"]),
                ExtractionConfig("pi_email", [r"([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})", r"Email.*?:\s*([^\s\n]+@[^\s\n]+)"]),
                ExtractionConfig("institution", [r"Institution.*?:\s*([^\n]+)", r"Organization.*?:\s*([^\n]+)"]),
                ExtractionConfig("requested_amount", [r"Total Budget.*?:?\s*\$?([0-9,]+)", r"Requested Amount.*?:?\s*\$?([0-9,]+)"]),
            ],
            "budget": [
                ExtractionConfig("total_budget", [r"Total.*?Budget.*?:?\s*\$?([0-9,]+)"]),
                ExtractionConfig("personnel_costs", [r"Personnel.*?:?\s*\$?([0-9,]+)"]),
                ExtractionConfig("equipment_costs", [r"Equipment.*?:?\s*\$?([0-9,]+)"]),
                ExtractionConfig("travel_costs", [r"Travel.*?:?\s*\$?([0-9,]+)"]),
                ExtractionConfig("indirect_costs", [r"Indirect.*?Cost.*?:?\s*\$?([0-9,]+)", r"F&A.*?:?\s*\$?([0-9,]+)"]),
            ],
            "project_description": [
                ExtractionConfig("overview", [r"Project Summary.*?:\s*(.*?)(?=Intellectual Merit|Project Description)", r"Overview.*?:\s*(.*?)(?=\n\n|\n[A-Z])"]),
                ExtractionConfig("intellectual_merit", [r"Intellectual Merit.*?:\s*(.*?)(?=Broader Impact|References)", r"Merit.*?:\s*(.*?)(?=\n\n|\n[A-Z])"]),
                ExtractionConfig("broader_impacts", [r"Broader Impact.*?:\s*(.*?)(?=References|Bibliography)", r"Impact.*?:\s*(.*?)(?=\n\n|\n[A-Z])"]),
                ExtractionConfig("keywords", [r"Keywords?.*?:\s*([^\n]+)", r"Key words?.*?:\s*([^\n]+)"], extraction_type="multiple"),
            ],
            "research_plan": [
                ExtractionConfig("objectives", [r"Objective.*?:\s*(.*?)(?=Methodology|Approach)", r"Goals?.*?:\s*(.*?)(?=\n\n|\n[A-Z])"], extraction_type="multiple"),
                ExtractionConfig("methodology", [r"Methodology.*?:\s*(.*?)(?=Timeline|Expected)", r"Approach.*?:\s*(.*?)(?=\n\n|\n[A-Z])"]),
                ExtractionConfig("timeline", [r"Timeline.*?:\s*(.*?)(?=Expected|Results)", r"Schedule.*?:\s*(.*?)(?=\n\n|\n[A-Z])"]),
                ExtractionConfig("innovation", [r"Innovation.*?:\s*(.*?)(?=Significance|Impact)", r"Novel.*?:\s*(.*?)(?=\n\n|\n[A-Z])"]),
            ],
            "compliance": [
                ExtractionConfig("human_subjects", [r"Human Subjects.*?:\s*(Yes|No)", r"IRB.*?:\s*(Yes|No|Approved|Pending)"]),
                ExtractionConfig("vertebrate_animals", [r"Vertebrate Animals.*?:\s*(Yes|No)", r"IACUC.*?:\s*(Yes|No|Approved|Pending)"]),
                ExtractionConfig("biohazards", [r"Biohazard.*?:\s*(Yes|No)", r"Biological.*?Agent.*?:\s*(Yes|No)"]),
            ],
            "personnel": [
                ExtractionConfig("co_investigators", [r"Co-?PI.*?:\s*([^\n]+)", r"Co-?Investigator.*?:\s*([^\n]+)"], extraction_type="multiple"),
                ExtractionConfig("senior_personnel", [r"Senior Personnel.*?:\s*([^\n]+)"], extraction_type="multiple"),
                ExtractionConfig("collaborators", [r"Collaborator.*?:\s*([^\n]+)", r"Partner.*?:\s*([^\n]+)"], extraction_type="multiple"),
            ]
        }

    def _get_nih_grant_configs(self) -> Dict[str, List["ExtractionConfig"]]:
        """NIH Grant application extraction configurations, keyed by section name."""
        return {
            "administrative_info": [
                ExtractionConfig("application_id", [r"Application.*?ID.*?:\s*([A-Z0-9-]+)", r"Grant.*?Number.*?:\s*([A-Z0-9-]+)"]),
                ExtractionConfig("funding_opportunity", [r"Funding Opportunity.*?:\s*([A-Z0-9-]+)", r"FOA.*?:\s*([A-Z0-9-]+)"]),
                ExtractionConfig("pi_name", [r"Principal Investigator.*?:\s*([^\n,]+)", r"PI.*?:\s*([^\n,]+)"]),
                ExtractionConfig("institution", [r"Institution.*?:\s*([^\n]+)", r"Organization.*?:\s*([^\n]+)"]),
                ExtractionConfig("project_title", [r"Project Title.*?:\s*([^\n]+)", r"Title.*?:\s*([^\n]+)"]),
            ],
            "research_info": [
                ExtractionConfig("specific_aims", [r"Specific Aims.*?:\s*(.*?)(?=Research Strategy|Background)"]),
                ExtractionConfig("significance", [r"Significance.*?:\s*(.*?)(?=Innovation|Approach)"]),
                ExtractionConfig("innovation", [r"Innovation.*?:\s*(.*?)(?=Approach|Methods)"]),
                ExtractionConfig("approach", [r"Approach.*?:\s*(.*?)(?=References|Bibliography)"]),
            ]
        }

    def extract_single_value(self, text: str, patterns: List[str]) -> str:
        """Return the cleaned capture of the first pattern that matches.

        Patterns are tried in order with ``re.DOTALL | re.IGNORECASE``.  The
        first capture group is used when the pattern has one, otherwise the
        whole match.  Returns ``""`` when nothing matches.
        """
        for pattern in patterns:
            match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
            if match:
                result = match.group(1) if match.groups() else match.group(0)
                return self._clean_text(result)
        return ""

    def extract_multiple_values(self, text: str, patterns: List[str]) -> List[str]:
        """Return every distinct, non-empty cleaned match of any pattern.

        Results keep first-seen order (patterns in list order, matches in
        document order) so repeated runs produce identical output.  For a
        pattern with several capture groups the first group that captured
        text is used.
        """
        results = []
        for pattern in patterns:
            for found in re.findall(pattern, text, re.DOTALL | re.IGNORECASE):
                if isinstance(found, tuple):
                    # re.findall yields tuples for multi-group patterns.
                    found = next((group for group in found if group), "")
                cleaned = self._clean_text(found)
                if cleaned:
                    results.append(cleaned)
        # dict.fromkeys removes duplicates while preserving insertion order.
        return list(dict.fromkeys(results))

    def _clean_text(self, text: str) -> str:
        """Collapse whitespace and strip stray symbols from extracted text.

        Word characters, whitespace and the punctuation ``- . , ; : ( ) $ @
        / & % + '`` are kept.  The last five matter for values this class
        extracts itself: dates (``12/05/2023``), e-mail local parts and names
        such as "A&M" or "Children's".  Everything else (bullets, quotes, ...)
        is removed.
        """
        if not text:
            return ""

        # Remove extra whitespace and newlines
        text = re.sub(r'\s+', ' ', text.strip())

        # Remove common artifacts
        text = re.sub(r"[^\w\s\-.,;:()$@/&%+']", '', text)

        return text.strip()

    def extract_section_data(self, text: str, section_configs: List["ExtractionConfig"]) -> Dict[str, Any]:
        """Extract every configured field of one section.

        Returns a dict mapping each ``field_name`` to a string ("single"), a
        list of strings ("multiple") or the config's ``default_value`` (any
        other extraction type, an empty required field, or an error, which is
        logged as a warning).
        """
        section_data = {}

        for config in section_configs:
            try:
                if config.extraction_type == "single":
                    value = self.extract_single_value(text, config.patterns)
                elif config.extraction_type == "multiple":
                    value = self.extract_multiple_values(text, config.patterns)
                else:
                    value = config.default_value

                # Set default if empty and required
                if not value and config.required:
                    value = config.default_value

                section_data[config.field_name] = value

            except Exception as e:
                logger.warning(f"Error extracting {config.field_name}: {e}")
                section_data[config.field_name] = config.default_value

        return section_data

    def extract_references(self, text: str) -> List[Dict[str, str]]:
        """Extract bibliography/references as a list of citation dicts.

        The first of "References", "Bibliography" or "Works Cited" (followed
        somewhere by a colon) starts the section, which runs to "Appendix" or
        the end of the text.  Entries are split at newlines followed by a
        capital letter; fragments of 50 characters or fewer are dropped.
        ``id`` is the 1-based position of the fragment before filtering, so
        ids may have gaps.
        """
        references = []

        # Find references section
        ref_patterns = [
            r"References.*?:\s*(.*?)(?=Appendix|$)",
            r"Bibliography.*?:\s*(.*?)(?=Appendix|$)",
            r"Works Cited.*?:\s*(.*?)(?=Appendix|$)"
        ]

        ref_text = ""
        for pattern in ref_patterns:
            match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
            if match:
                ref_text = match.group(1)
                break

        if ref_text:
            # Split references (simple approach)
            ref_lines = re.split(r'\n(?=[A-Z])', ref_text)

            for i, ref_line in enumerate(ref_lines):
                if len(ref_line.strip()) > 50:  # Filter out short lines
                    references.append({
                        "id": i + 1,
                        "citation": self._clean_text(ref_line),
                        "type": "unknown"
                    })

        return references

    def process_document(self, file_path: str) -> Dict[str, Any]:
        """Main processing function that extracts all data into JSON structure.

        Returns a dict with ``document_info``, one entry per configured
        section, and ``references``.

        Raises:
            Exception: Whatever :meth:`extract_text_from_pdf` raises when the
                PDF cannot be read.
        """
        # Function-scope import: the timestamp only needs the standard
        # library, so the class works without pandas being in scope.
        from datetime import datetime

        logger.info(f"Processing {self.document_type} document: {file_path}")

        # Extract text
        full_text = self.extract_text_from_pdf(file_path)

        # Initialize result structure
        result = {
            "document_info": {
                "file_path": str(file_path),
                "document_type": self.document_type,
                "extraction_timestamp": datetime.now().isoformat()
            }
        }

        # Extract data for each section
        for section_name, section_configs in self.extraction_configs.items():
            logger.info(f"Extracting {section_name} data...")
            section_data = self.extract_section_data(full_text, section_configs)
            result[section_name] = section_data

        # Extract references
        logger.info("Extracting references...")
        result["references"] = self.extract_references(full_text)

        # Add full text if needed (optional)
        # result["full_text"] = full_text

        return result

    def save_json(self, data: Dict[str, Any], output_path: str):
        """Save extracted data to ``output_path`` as UTF-8, 2-space-indented JSON.

        Non-ASCII characters are written as-is (``ensure_ascii=False``).
        """
        output_path = Path(output_path)

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        logger.info(f"Data saved to {output_path}")

    def validate_extracted_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Compute completeness metrics for extracted data.

        Only dict-valued sections other than ``document_info`` are inspected.
        ``completeness_score`` is filled / total fields (0 when there are no
        fields).  ``missing_required_fields`` lists ``section.field`` for
        every empty field; it does not consult the ``required`` flag of the
        extraction configs.  ``data_quality_issues`` is always empty for now.
        """
        validation_results = {
            "completeness_score": 0,
            "missing_required_fields": [],
            "data_quality_issues": []
        }

        total_fields = 0
        filled_fields = 0

        for section_name, section_data in data.items():
            if section_name == "document_info":
                continue

            if isinstance(section_data, dict):
                for field_name, field_value in section_data.items():
                    total_fields += 1
                    if field_value:
                        filled_fields += 1
                    else:
                        validation_results["missing_required_fields"].append(f"{section_name}.{field_name}")

        validation_results["completeness_score"] = filled_fields / total_fields if total_fields > 0 else 0

        return validation_results

# Usage example
if __name__ == "__main__":
    import pandas as pd

    # Initialize extractor for NSF grant
    # NOTE(review): the sample PDF below has "k08" in its file name, which
    # looks like an NIH application, while the extractor is configured with
    # the NSF pattern set - confirm this pairing is intended.
    extractor = JSONDocumentExtractor(document_type="nsf_grant")

    try:
        # Process document
        pdf_path = "/content/1-k08-ai155816-01a1-aladra-application-508.pdf"  # Replace with your PDF path
        extracted_data = extractor.process_document(pdf_path)

        # Validate data and attach the metrics to the payload that is saved
        validation_results = extractor.validate_extracted_data(extracted_data)
        extracted_data["validation"] = validation_results

        # Save to JSON
        extractor.save_json(extracted_data, "extracted_nsf_grant5.json")

        print("Extraction completed successfully!")
        print(f"Completeness score: {validation_results['completeness_score']:.2%}")
        # The count below covers every remaining top-level key, including the
        # "references" list, which the per-section summary loop skips because
        # it is not a dict.
        print(f"Total sections extracted: {len(extracted_data) - 2}")  # Excluding document_info and validation

        # Print summary: filled/total field counts for each dict-valued section
        for section_name, section_data in extracted_data.items():
            if section_name not in ["document_info", "validation"]:
                if isinstance(section_data, dict):
                    filled_fields = sum(1 for v in section_data.values() if v and v != "")
                    print(f"{section_name}: {filled_fields}/{len(section_data)} fields extracted")

    except Exception as e:
        # Top-level boundary: any failure is logged and not re-raised.
        logger.error(f"Error processing document: {e}")

Extraction completed successfully!
Completeness score: 66.67%
Total sections extracted: 7
administrative_info: 5/8 fields extracted
budget: 5/5 fields extracted
project_description: 2/4 fields extracted
research_plan: 4/4 fields extracted
compliance: 0/3 fields extracted
personnel: 2/3 fields extracted


In [None]:
# prompt: load "/content/extracted_nsf_grant1.json" , /content/extracted_nsf_grant2.json' .... /content/extracted_nsf_grant5.json and give me a list of all of the columns or keys of the json, whatever it is called.

file_paths = [f'/content/extracted_nsf_grant{i}.json' for i in range(1, 6)]

all_keys = set()


def get_keys(obj, current_path=""):
    """Add the dotted path of every dict key found in ``obj`` to ``all_keys``.

    Nested dicts extend the path with ``.key``.  Lists are transparent: their
    items are walked with the path of the list itself, so no index appears in
    the recorded keys.  Scalars contribute nothing.
    """
    if isinstance(obj, dict):
        for key, value in obj.items():
            new_path = f"{current_path}.{key}" if current_path else key
            all_keys.add(new_path)
            get_keys(value, new_path)
    elif isinstance(obj, list):
        for item in obj:
            get_keys(item, current_path)


for file_path in file_paths:
    try:
        # The extractor writes these files as UTF-8 with ensure_ascii=False,
        # so read them back with the same encoding instead of the platform
        # default.
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        get_keys(data)

    except FileNotFoundError:
        print(f"Warning: File not found at {file_path}")
    except json.JSONDecodeError:
        print(f"Error decoding JSON from {file_path}")
    except Exception as e:
        # Best-effort scan: report the problem and continue with the next file.
        print(f"An error occurred processing {file_path}: {e}")

print("List of all unique keys across all JSON files:")
for key in sorted(all_keys):
    print(key)

List of all unique keys across all JSON files:
administrative_info
administrative_info.institution
administrative_info.nsf_program
administrative_info.pi_email
administrative_info.pi_name
administrative_info.proposal_id
administrative_info.proposal_title
administrative_info.requested_amount
administrative_info.submission_date
budget
budget.equipment_costs
budget.indirect_costs
budget.personnel_costs
budget.total_budget
budget.travel_costs
compliance
compliance.biohazards
compliance.human_subjects
compliance.vertebrate_animals
document_info
document_info.document_type
document_info.extraction_timestamp
document_info.file_path
personnel
personnel.co_investigators
personnel.collaborators
personnel.senior_personnel
project_description
project_description.broader_impacts
project_description.intellectual_merit
project_description.keywords
project_description.overview
references
references.citation
references.id
references.type
research_plan
research_plan.innovation
research_plan.methodology


In [None]:
import json
import re
import PyPDF2
from typing import Dict, Any, List, Optional, Union
from dataclasses import dataclass
import pandas as pd
from pathlib import Path
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class GeneralizedNIHSummaryParser:
    """Generalized parser for various types of NIH Grant Summary Statements"""

    def __init__(self):
        self.data = {}

    def extract_text_from_pdf(self, pdf_path: str) -> str:
        """Return the text of every page of the PDF at ``pdf_path``.

        Each page's text is wrapped in a leading and trailing newline before
        being concatenated.  A page whose extraction raises is logged as a
        warning and skipped; an error opening or parsing the file is logged
        and re-raised to the caller.
        """
        chunks = []
        try:
            with open(pdf_path, 'rb') as pdf_file:
                reader = PyPDF2.PdfReader(pdf_file)
                for index, page in enumerate(reader.pages):
                    try:
                        chunks.append(f"\n{page.extract_text()}\n")
                    except Exception as e:
                        logger.warning(f"Could not extract text from page {index + 1}: {e}")
        except Exception as e:
            logger.error(f"Error reading PDF: {e}")
            raise
        return "".join(chunks)

    def detect_grant_type(self, text: str) -> str:
        """Classify the grant mechanism mentioned in ``text``.

        The indicator patterns are searched case-insensitively in the order
        listed below and the label of the first one found is returned;
        ``"Unknown"`` is returned when none is found.  The career-development
        pattern needs "Career" or "Fellowship" after the K-code on the same
        line, because ``.`` does not cross newlines here.
        """
        indicators = (
            (r'\bR43\b|\bR44\b|SBIR|Small Business', "SBIR"),
            (r'\bK\d+\b.*?(Career|Fellowship)', "Career_Development"),
            (r'\bR01\b|\bR21\b|\bR03\b', "Research_Grant"),
            (r'\bU\d+\b', "Cooperative_Agreement"),
            (r'\bP\d+\b', "Program_Project"),
        )
        return next(
            (label for regex, label in indicators if re.search(regex, text, re.IGNORECASE)),
            "Unknown",
        )

    def extract_administrative_info(self, text: str) -> Dict[str, Any]:
        """Extract basic administrative information"""
        admin_info = {}

        # Application number - more flexible pattern
        app_patterns = [
            r'Application Number:\s*([A-Z0-9\s\-]+)',
            r'(\d+\s+[A-Z]\d+\s+[A-Z]{2}\d+\-\d+)',
            r'Grant Number:\s*([A-Z0-9\s\-]+)'
        ]

        for pattern in app_patterns:
            match = re.search(pattern, text)
            if match:
                admin_info['application_number'] = match.group(1).strip()
                break

        # Principal Investigator(s) - handle multiple formats
        pi_patterns = [
            r'Principal Investigator[^\n]*?:\s*([A-Z\-,\s]+?)(?:\(Contact\)|Applicant|Review)',
            r'Principal Investigators[^\n]*?:\s*(.*?)(?=Applicant Organization|Review Group)',
            r'PI.*?:\s*([A-Z\-,\s]+?)(?:\n|Applicant)'
        ]

        for pattern in pi_patterns:
            match = re.search(pattern, text, re.DOTALL)
            if match:
                pi_text = match.group(1).strip()
                # Clean up the PI text
                pi_text = re.sub(r'\(Contact\)', '', pi_text)
                pi_text = re.sub(r'\s+', ' ', pi_text)
                admin_info['principal_investigator'] = pi_text
                break

        # Extract individual PIs if listed alphabetically
        pi_list_match = re.search(r'Principal Investigators \(Listed Alphabetically\):\s*(.*?)(?=Applicant Organization)', text, re.DOTALL)
        if pi_list_match:
            pi_list_text = pi_list_match.group(1)
            # Split by lines and clean
            pis = [line.strip() for line in pi_list_text.split('\n') if line.strip() and not re.match(r'^\d+', line)]
            admin_info['principal_investigators_list'] = pis

        # Organization
        org_patterns = [
            r'Applicant Organization:\s*([^\n]+)',
            r'Institution:\s*([^\n]+)',
            r'Organization:\s*([^\n]+)'
        ]

        for pattern in org_patterns:
            match = re.search(pattern, text)
            if match:
                admin_info['applicant_organization'] = match.group(1).strip()
                break

        # Review Group
        review_patterns = [
            r'Review Group:\s*([^\n]+(?:\n[^\n]+)*?)(?=Meeting Date|Center for)',
            r'Study Section:\s*([^\n]+)'
        ]

        for pattern in review_patterns:
            match = re.search(pattern, text, re.DOTALL)
            if match:
                review_text = match.group(1).strip()
                review_text = re.sub(r'\s+', ' ', review_text)
                admin_info['review_group'] = review_text
                break

        # Meeting Date
        meeting_patterns = [
            r'Meeting Date:\s*([0-9/]+)',
            r'Review Date:\s*([0-9/]+)'
        ]

        for pattern in meeting_patterns:
            match = re.search(pattern, text)
            if match:
                admin_info['meeting_date'] = match.group(1)
                break

        # Council
        council_match = re.search(r'Council:\s*([A-Z0-9\s]+)', text)
        admin_info['council'] = council_match.group(1).strip() if council_match else ""

        # RFA/PA
        rfa_match = re.search(r'RFA/PA:\s*([A-Z0-9\-]+)', text)
        admin_info['rfa_pa'] = rfa_match.group(1) if rfa_match else ""

        # Requested Start Date
        start_patterns = [
            r'Requested Start:\s*([0-9/]+)',
            r'Start Date:\s*([0-9/]+)'
        ]

        for pattern in start_patterns:
            match = re.search(pattern, text)
            if match:
                admin_info['requested_start'] = match.group(1)
                break

        # Project Title - more flexible extraction
        title_patterns = [
            r'Project Title:\s*([^\n]+(?:\n[^\n]+)*?)(?=SRG Action:|Impact Score:|Next Steps:|Human Subjects)',
            r'Title:\s*([^\n]+(?:\n[^\n]+)*?)(?=SRG Action:|Impact Score:|PI:|Principal)'
        ]

        for pattern in title_patterns:
            match = re.search(pattern, text, re.DOTALL)
            if match:
                title = match.group(1).strip()
                title = re.sub(r'\s+', ' ', title)
                admin_info['project_title'] = title
                break

        # Impact Score - handle both numeric and text formats
        impact_patterns = [
            r'Impact Score:\s*(\d+)',
            r'Impact Score:\s*([^\n]+)',
            r'Priority Score:\s*(\d+)'
        ]

        for pattern in impact_patterns:
            match = re.search(pattern, text)
            if match:
                admin_info['impact_score'] = match.group(1).strip()
                break

        # Human and Animal Subjects
        human_subjects_match = re.search(r'Human Subjects:\s*([^\n]+)', text)
        admin_info['human_subjects'] = human_subjects_match.group(1).strip() if human_subjects_match else ""

        animal_subjects_match = re.search(r'Animal Subjects:\s*([^\n]+)', text)
        admin_info['animal_subjects'] = animal_subjects_match.group(1).strip() if animal_subjects_match else ""

        # Grant type detection
        admin_info['grant_type'] = self.detect_grant_type(text)

        return admin_info

    def extract_budget_info(self, text: str) -> Dict[str, Any]:
        """Pull project years and cost totals out of a summary statement.

        Two independent searches are made, each taking the first pattern that
        matches:

        * a budget-table search, whose capture is scanned for year numbers
          and stored as ``project_years`` when any are found;
        * a totals search, stored as ``total_direct_costs`` and
          ``total_estimated_cost`` when the pattern captures two amounts, or
          as ``total_amount`` when it captures one.

        Keys are only present when the corresponding search succeeded.
        """
        budget_info = {}

        # Look for budget table patterns
        table_regexes = (
            r'Project\s+Direct Costs\s+Estimated\s+Year\s+Requested\s+Total Cost\s+((?:\d+.*?\n.*?\n.*?\n)+)',
            r'Budget Year\s+Direct Costs\s+Total Costs?\s+((?:\d+.*?\n.*?\n)+)',
            r'Year\s+\d+.*?(\$[\d,]+).*?(\$[\d,]+)',
        )
        table_hit = next(
            (hit for hit in (re.search(regex, text, re.DOTALL) for regex in table_regexes) if hit),
            None,
        )
        if table_hit is not None:
            captured = table_hit.group(1) if table_hit.groups() else table_hit.group(0)
            # Each findall entry is a pair; exactly one side is non-empty.
            year_pairs = re.findall(r'Year\s+(\d+)|^(\d+)\s', captured, re.MULTILINE)
            if year_pairs:
                budget_info['project_years'] = [labelled or bare for labelled, bare in year_pairs]

        # Total budget - multiple patterns
        total_regexes = (
            r'TOTAL\s+([0-9,\$]+)\s+([0-9,\$]+)',
            r'Total.*?(\$[\d,]+).*?(\$[\d,]+)',
            r'Direct Costs.*?(\$[\d,]+)',
            r'Total Cost.*?(\$[\d,]+)',
        )
        total_hit = next(
            (hit for hit in (re.search(regex, text) for regex in total_regexes) if hit),
            None,
        )
        if total_hit is not None:
            amounts = total_hit.groups()
            if len(amounts) >= 2:
                budget_info['total_direct_costs'] = total_hit.group(1)
                budget_info['total_estimated_cost'] = total_hit.group(2)
            else:
                budget_info['total_amount'] = total_hit.group(1)

        return budget_info

    def extract_project_description(self, text: str) -> Dict[str, Any]:
        """Extract project description and related sections"""
        description_info = {}

        # Main description - multiple possible headers
        desc_patterns = [
            r'DESCRIPTION \(provided by applicant\):\s*(.*?)(?=PUBLIC HEALTH RELEVANCE:|CRITIQUE|$)',
            r'Project Description:\s*(.*?)(?=PUBLIC HEALTH RELEVANCE:|CRITIQUE|$)',
            r'Abstract:\s*(.*?)(?=PUBLIC HEALTH RELEVANCE:|CRITIQUE|$)'
        ]

        for pattern in desc_patterns:
            match = re.search(pattern, text, re.DOTALL)
            if match:
                description_info['description'] = self._clean_text(match.group(1))
                break

        # Public Health Relevance
        relevance_patterns = [
            r'PUBLIC HEALTH RELEVANCE:\s*(.*?)(?=CRITIQUE|RESUME|$)',
            r'Public Health Relevance Statement:\s*(.*?)(?=CRITIQUE|RESUME|$)',
            r'Relevance:\s*(.*?)(?=CRITIQUE|RESUME|$)'
        ]

        for pattern in relevance_patterns:
            match = re.search(pattern, text, re.DOTALL)
            if match:
                description_info['public_health_relevance'] = self._clean_text(match.group(1))
                break

        return description_info

    def extract_critique_sections(self, text: str) -> List[Dict[str, Any]]:
        """Extract all critique sections - adaptable to different formats"""
        critiques = []

        # First try the standard pattern with colon
        critique_pattern = r'CRITIQUE (\d+):(.*?)(?=CRITIQUE \d+:|THE FOLLOWING|COMMITTEE BUDGET|$)'
        critique_matches = list(re.finditer(critique_pattern, text, re.DOTALL))

        # If no matches, try without colon (like your document)
        if not critique_matches:
            critique_pattern = r'CRITIQUE (\d+)\s*\n(.*?)(?=CRITIQUE \d+|THE FOLLOWING|COMMITTEE BUDGET|$)'
            critique_matches = list(re.finditer(critique_pattern, text, re.DOTALL))

        # If still no matches, try finding them by looking for score patterns
        if not critique_matches:
            # Look for patterns like "Significance: 2" which indicate start of critiques
            score_sections = list(re.finditer(r'(Significance:\s*\d+.*?)(?=Significance:\s*\d+|THE FOLLOWING|COMMITTEE BUDGET|$)', text, re.DOTALL))
            for i, match in enumerate(score_sections):
                critique_matches.append(type('obj', (object,), {
                    'group': lambda self, x: str(i+1) if x == 1 else match.group(1)
                })())

        for match in critique_matches:
            critique_num = match.group(1)
            critique_text = match.group(2)

            # Extract scores if present
            scores = self._extract_critique_scores(critique_text)

            critique_data = {
                'critique_number': critique_num,
                'scores': scores,
                'overall_impact': self._extract_section_content(critique_text, 'Overall Impact'),
                'significance': self._extract_evaluation_section(critique_text, 'Significance'),
                'investigator': self._extract_evaluation_section(critique_text, 'Investigator'),
                'innovation': self._extract_evaluation_section(critique_text, 'Innovation'),
                'approach': self._extract_evaluation_section(critique_text, 'Approach'),
                'environment': self._extract_evaluation_section(critique_text, 'Environment'),
                # Additional sections that might be present
                'candidate': self._extract_evaluation_section(critique_text, 'Candidate'),
                'career_development': self._extract_evaluation_section(critique_text, 'Career Development'),
                'research_plan': self._extract_evaluation_section(critique_text, 'Research Plan'),
                'mentor': self._extract_evaluation_section(critique_text, 'Mentor'),
                'training_plan': self._extract_evaluation_section(critique_text, 'Training Plan')
            }

            # Extract compliance sections
            critique_data['compliance'] = self._extract_compliance_from_critique(critique_text)

            # Remove empty sections
            critique_data = {k: v for k, v in critique_data.items() if v}

            critiques.append(critique_data)

        return critiques

    def _extract_critique_scores(self, text: str) -> Dict[str, str]:
        """Extract numerical scores from critique"""
        scores = {}

        score_patterns = [
            (r'Significance:\s*(\d+)', 'significance'),
            (r'Investigator(?:\(s\))?:\s*(\d+)', 'investigator'),
            (r'Innovation:\s*(\d+)', 'innovation'),
            (r'Approach:\s*(\d+)', 'approach'),
            (r'Environment:\s*(\d+)', 'environment'),
            (r'Candidate:\s*(\d+)', 'candidate'),
            (r'Career Development.*?:\s*(\d+)', 'career_development'),
            (r'Research Plan:\s*(\d+)', 'research_plan'),
            (r'Mentor.*?:\s*(\d+)', 'mentor')
        ]

        for pattern, key in score_patterns:
            match = re.search(pattern, text)
            if match:
                scores[key] = match.group(1)

        return scores

    def _extract_evaluation_section(self, text: str, section_name: str) -> Dict[str, Any]:
        """Extract strengths and weaknesses for a specific evaluation section"""
        section_data = {}

        # Multiple patterns to try for section headers
        section_patterns = [
            rf'\d+\.\s*{section_name}.*?:(.*?)(?=\d+\.\s*\w+.*?:|Protections for Human|Vertebrate Animals|Budget and Period|$)',
            rf'{section_name}:\s*(.*?)(?=\d+\.\s*\w+.*?:|Protections for Human|Vertebrate Animals|Budget and Period|$)',
            rf'\d+\.\s*{section_name}\s*\n(.*?)(?=\d+\.\s*\w+|Protections for Human|Vertebrate Animals|Budget and Period|$)'
        ]

        section_match = None
        for pattern in section_patterns:
            section_match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
            if section_match:
                break

        if section_match:
            section_text = section_match.group(1)

            # Extract strengths - handle both "Strengths" and bullet points directly
            strengths_patterns = [
                r'Strengths?\s*(.*?)(?=Weaknesses?|\d+\.|$)',
                r'Strengths?\s*\n(.*?)(?=Weaknesses?|\d+\.|$)'
            ]

            for pattern in strengths_patterns:
                strengths_match = re.search(pattern, section_text, re.DOTALL)
                if strengths_match:
                    strengths_text = strengths_match.group(1)
                    section_data['strengths'] = self._extract_bullet_points(strengths_text)
                    break

            # Extract weaknesses
            weaknesses_patterns = [
                r'Weaknesses?\s*(.*?)(?=Strengths?|\d+\.|$)',
                r'Weaknesses?\s*\n(.*?)(?=Strengths?|\d+\.|$)'
            ]

            for pattern in weaknesses_patterns:
                weaknesses_match = re.search(pattern, section_text, re.DOTALL)
                if weaknesses_match:
                    weaknesses_text = weaknesses_match.group(1)
                    section_data['weaknesses'] = self._extract_bullet_points(weaknesses_text)
                    break

        return section_data

    def _extract_bullet_points(self, text: str) -> List[str]:
        """Extract bullet points or numbered items from text"""
        if not text:
            return []

        # Split by various bullet point indicators
        items = re.split(r'\n\s*[•·▪▫]\s*|\n\s*\d+\.\s*|\n\s*[a-z]\)\s*|\n\s*-\s+', text)

        # Also try splitting by line breaks if no bullet points found
        if len(items) <= 1:
            # Look for sentences that start new thoughts
            items = re.split(r'\n\s*(?=[A-Z])', text)

        # Clean and filter items
        cleaned_items = []
        for item in items:
            cleaned = self._clean_text(item)
            # More flexible filtering - accept shorter meaningful statements
            if cleaned and len(cleaned) > 5 and not re.match(r'^\s*(None|N/A|Not applicable)', cleaned, re.IGNORECASE):
                cleaned_items.append(cleaned)

        return cleaned_items

    def _extract_section_content(self, text: str, section_name: str) -> str:
        """Extract content from a named section"""
        pattern = rf'{section_name}:\s*(.*?)(?=\d+\.|Protections for Human|Vertebrate Animals|Budget and Period|$)'
        match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
        return self._clean_text(match.group(1)) if match else ""

    def _extract_compliance_from_critique(self, text: str) -> Dict[str, str]:
        """Extract compliance information from critique section"""
        compliance = {}

        compliance_patterns = [
            (r'Protections for Human Subjects:\s*([^\n]+)', 'human_subjects'),
            (r'Vertebrate Animals:\s*([^\n]+)', 'vertebrate_animals'),
            (r'Biohazards:\s*([^\n]+)', 'biohazards'),
            (r'Authentication of Key Biological.*?:\s*([^\n]+)', 'resource_authentication'),
            (r'Budget and Period of Support:\s*([^\n]+)', 'budget_recommendation')
        ]

        for pattern, key in compliance_patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                compliance[key] = match.group(1).strip()

        return compliance

    def extract_resume_summary(self, text: str) -> Dict[str, Any]:
        """Locate the "resume and summary of discussion" paragraph.

        Three heading spellings are tried in order (case-sensitively); the
        text after the first one found is captured up to the next occurrence
        of "DESCRIPTION" or "CRITIQUE", or to the end of the text.

        Returns:
            ``{'summary_of_discussion': <cleaned text>}`` when a heading is
            found, otherwise an empty dict.
        """
        heading_variants = (
            r'RESUME AND SUMMARY OF DISCUSSION:\s*(.*?)(?=DESCRIPTION|CRITIQUE|$)',
            r'Summary of Discussion:\s*(.*?)(?=DESCRIPTION|CRITIQUE|$)',
            r'Review Summary:\s*(.*?)(?=DESCRIPTION|CRITIQUE|$)',
        )

        for variant in heading_variants:
            found = re.search(variant, text, re.DOTALL)
            if found:
                # First matching heading wins; later variants are not consulted.
                return {'summary_of_discussion': self._clean_text(found.group(1))}

        return {}

    def extract_committee_notes(self, text: str) -> Dict[str, Any]:
        """Collect the review officer's notes and the committee budget remarks.

        Returns:
            Dict with 'scientific_review_officer_notes' and/or
            'committee_budget_recommendations' (cleaned text) for whichever
            headings are present; empty when neither is found.
        """
        collected = {}

        # Officer's notes run until the committee budget heading or end of text.
        officer_hit = re.search(
            r'SCIENTIFIC REVIEW OFFICER\'S NOTES:\s*(.*?)(?=COMMITTEE BUDGET|$)',
            text,
            re.DOTALL,
        )
        if officer_hit:
            collected['scientific_review_officer_notes'] = self._clean_text(officer_hit.group(1))

        # Budget remarks: full heading first, then the shorter spelling; both
        # stop at "Footnotes" or end of text.
        budget_headings = (
            r'COMMITTEE BUDGET RECOMMENDATIONS:\s*(.*?)(?=Footnotes|$)',
            r'Budget Recommendations:\s*(.*?)(?=Footnotes|$)',
        )
        for heading in budget_headings:
            budget_hit = re.search(heading, text, re.DOTALL)
            if not budget_hit:
                continue
            collected['committee_budget_recommendations'] = self._clean_text(budget_hit.group(1))
            break

        return collected

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text"""
        if not text:
            return ""

        # Remove extra whitespace and normalize
        text = re.sub(r'\s+', ' ', text.strip())

        # Remove common page headers/footers patterns
        text = re.sub(r'\d+\s+[A-Z]\d+\s+[A-Z]{2}\d+\-\d+.*?ZRG\d+\s+[A-Z\-]+\s*\(\d+\)', '', text)
        text = re.sub(r'[A-Z\-]+,\s*[A-Z]\s*$', '', text)

        return text.strip()

    def process_summary_statement(self, pdf_path: str, debug: bool = False) -> Dict[str, Any]:
        """Run the whole extraction pipeline on one summary-statement PDF.

        Args:
            pdf_path: Path handed to ``extract_text_from_pdf``.
            debug: When true, print where "CRITIQUE N" headings occur (or, if
                none do, how often some section headings appear) and a short
                overview of the extracted critiques.

        Returns:
            Dict with the keys 'document_info', 'administrative_info',
            'budget_info', 'project_description', 'resume_and_summary',
            'critiques' and 'committee_notes'.
        """
        logger.info(f"Processing NIH Summary Statement: {pdf_path}")

        full_text = self.extract_text_from_pdf(pdf_path)

        if debug:
            print("=== DEBUG: Looking for CRITIQUE sections ===")
            text_length = len(full_text)
            critique_positions = []
            for heading in re.finditer(r'CRITIQUE \d+', full_text):
                offset = heading.start()
                # Show at most 200 characters following each heading.
                preview_end = min(offset + 200, text_length)
                print(f"Found CRITIQUE at position {offset}: {full_text[offset:preview_end]}...")
                critique_positions.append(offset)

            if not critique_positions:
                print("No CRITIQUE sections found!")
                # Fall back to counting individual section headings.
                print("\n=== Looking for alternative patterns ===")
                for pattern in ['Significance:', 'Investigator', 'Innovation:', 'Approach:', 'Environment:']:
                    occurrences = len(re.findall(pattern, full_text))
                    print(f"Found {occurrences} instances of '{pattern}'")
            else:
                print(f"\nFound {len(critique_positions)} CRITIQUE sections")
                # Sample runs to the second heading, or 1000 characters when
                # there is only one.
                start = critique_positions[0]
                if len(critique_positions) > 1:
                    end = critique_positions[1]
                else:
                    end = min(start + 1000, text_length)
                print(f"\n=== SAMPLE OF FIRST CRITIQUE ===")
                print(full_text[start:end])
                print("=== END SAMPLE ===\n")

        grant_type = self.detect_grant_type(full_text)
        logger.info(f"Detected grant type: {grant_type}")

        # Assembled key by key, in the order the keys appear in the output.
        result = {}
        result["document_info"] = {
            "file_path": str(pdf_path),
            "document_type": "nih_summary_statement",
            "grant_type": grant_type,
            "extraction_timestamp": pd.Timestamp.now().isoformat(),
        }
        result["administrative_info"] = self.extract_administrative_info(full_text)
        result["budget_info"] = self.extract_budget_info(full_text)
        result["project_description"] = self.extract_project_description(full_text)
        result["resume_and_summary"] = self.extract_resume_summary(full_text)
        result["critiques"] = self.extract_critique_sections(full_text)
        result["committee_notes"] = self.extract_committee_notes(full_text)

        if debug:
            print(f"\n=== EXTRACTION RESULTS ===")
            print(f"Found {len(result['critiques'])} critiques")
            for number, critique in enumerate(result['critiques'], start=1):
                print(f"Critique {number}: {list(critique.keys())}")

        return result

    def save_json(self, data: Dict[str, Any], output_path: str):
        """Write the extracted data to ``output_path`` as indented UTF-8 JSON.

        Non-ASCII characters are written as-is rather than escaped.
        """
        destination = Path(output_path)

        with destination.open('w', encoding='utf-8') as json_file:
            json.dump(data, json_file, indent=2, ensure_ascii=False)

        logger.info(f"Data saved to {destination}")

    def create_summary_report(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Condense an extraction result into a flat overview.

        Args:
            data: Extraction result dict; missing sections are treated as
                empty.

        Returns:
            ``{'extraction_summary': {...}}`` holding the grant type, selected
            administrative fields, the critique count, presence flags and the
            list of top-level keys found in ``data``.
        """
        admin = data.get("administrative_info", {})
        description = data.get("project_description", {})

        overview = {"grant_type": data.get("document_info", {}).get("grant_type", "")}

        # Overview key -> key in administrative_info (absent fields become "").
        admin_fields = (
            ("application_number", "application_number"),
            ("principal_investigator", "principal_investigator"),
            ("project_title", "project_title"),
            ("impact_score", "impact_score"),
            ("review_group", "review_group"),
            ("organization", "applicant_organization"),
        )
        for overview_key, admin_key in admin_fields:
            overview[overview_key] = admin.get(admin_key, "")

        overview["number_of_critiques"] = len(data.get("critiques", []))
        overview["has_description"] = bool(description.get("description", ""))
        overview["has_public_health_relevance"] = bool(description.get("public_health_relevance", ""))
        overview["has_committee_notes"] = bool(data.get("committee_notes", {}))
        overview["sections_extracted"] = list(data.keys())

        return {"extraction_summary": overview}

    def save_summary_csv(self, data: Dict[str, Any], output_path: str):
        """Write a one-row CSV with the key facts of an extraction result.

        The row merges the administrative fields with the overview from
        ``create_summary_report`` (overview values win on key clashes) and
        adds, per critique, the first 200 characters of its overall impact
        and each of its scores.
        """
        overview = self.create_summary_report(data)

        # Later mappings override earlier ones, matching successive updates.
        row = {
            **data.get("administrative_info", {}),
            **overview.get("extraction_summary", {}),
        }

        for number, critique in enumerate(data.get("critiques", []), start=1):
            column_prefix = f"critique_{number}"
            if 'overall_impact' in critique:
                row[f"{column_prefix}_overall_impact"] = critique["overall_impact"][:200]
            if 'scores' in critique:
                for score_type, score_value in critique['scores'].items():
                    row[f"{column_prefix}_{score_type}_score"] = score_value

        pd.DataFrame([row]).to_csv(output_path, index=False)
        logger.info(f"Summary CSV saved to {output_path}")

# Usage example: parse one summary statement, save JSON and CSV, print an overview
if __name__ == "__main__":
    parser = GeneralizedNIHSummaryParser()

    try:
        # Process the summary statement
        pdf_path = "/content/R43-Summary-Statement_ MacLeod-1R43AI145704-01.pdf"  # Replace with your PDF path
        extracted_data = parser.process_summary_statement(pdf_path)

        # Persist the full extraction and the one-row key-info table
        parser.save_json(extracted_data, "nih_summary_extracted4.json")
        parser.save_summary_csv(extracted_data, "nih_summary_key_info.csv")

        # Headline figures
        summary = parser.create_summary_report(extracted_data)
        overview = summary['extraction_summary']
        print("Extraction Summary:")
        print(f"Grant Type: {overview['grant_type']}")
        print(f"Application: {overview['application_number']}")
        print(f"PI: {overview['principal_investigator']}")
        print(f"Impact Score: {overview['impact_score']}")
        print(f"Number of Critiques: {overview['number_of_critiques']}")

        # Per-critique scores and strengths/weaknesses counts
        critiques = extracted_data.get("critiques", [])
        for number, critique in enumerate(critiques, start=1):
            print(f"\nCritique {number} Overview:")
            if 'scores' in critique:
                for score_type, score_value in critique['scores'].items():
                    print(f"  {score_type.title()} Score: {score_value}")

            for section in ['significance', 'investigator', 'innovation', 'approach', 'environment']:
                section_data = critique.get(section)
                if not section_data:
                    continue
                strengths = len(section_data.get('strengths', []))
                weaknesses = len(section_data.get('weaknesses', []))
                # Sections with nothing extracted are not listed.
                if strengths or weaknesses:
                    print(f"  {section.title()} - Strengths: {strengths}, Weaknesses: {weaknesses}")

    except Exception as e:
        logger.error(f"Error processing summary statement: {e}")
        raise

Extraction Summary:
Grant Type: SBIR
Application: 1 R43 AI145704- 01 
P
PI: 
Impact Score: 
Number of Critiques: 1

Critique 1 Overview:


In [33]:
# Enhanced AI Grant Reviewer using Claude API for Google Colab
# Make sure the required package is installed first.

# Install required packages (run in its own cell): !pip install anthropic


import json
import os
from typing import Dict, Any, List
from IPython.display import display, Markdown
from google.colab import userdata
import anthropic

class ClaudeGrantReviewer:
    """
    Enhanced AI Grant Reviewer that uses Claude API to analyze grant proposals
    and provide detailed critiques similar to NIH summary statements.

    The public methods return Markdown strings. Validation, JSON and API
    failures are reported as Markdown error messages instead of being raised.
    """

    # Model used when a caller does not choose one.
    DEFAULT_MODEL = "claude-3-5-sonnet-20241022"
    # Generation settings shared by every review request.
    MAX_TOKENS = 4000
    TEMPERATURE = 0.1

    def __init__(self, api_key: str = None):
        """
        Initialize the Claude Grant Reviewer.

        Creating an instance sends one small test request to the API.

        Args:
            api_key: Anthropic API key. If None, will try to get from Google Colab secrets.

        Raises:
            ValueError: If no API key can be obtained, or if the client cannot
                be created or the connection test fails.
        """
        if api_key is None:
            try:
                api_key = userdata.get('ANTHROPIC_API_KEY')
                if not api_key:
                    raise ValueError("API key is empty")
            except Exception as e:
                print(f"Error getting API key from secrets: {e}")
                raise ValueError("API key must be provided either as parameter or stored in Google Colab secrets as 'ANTHROPIC_API_KEY'") from e

        try:
            self.client = anthropic.Anthropic(api_key=api_key)
            # Test the connection
            self._test_connection()
        except Exception as e:
            raise ValueError(f"Failed to initialize Anthropic client: {e}") from e

        self.system_prompt = self._get_system_prompt()

    def _test_connection(self):
        """Test the API connection with a simple request.

        Returns:
            True when the request succeeds.

        Raises:
            ValueError: If the request fails for any reason.
        """
        try:
            # Only success or failure matters here; the reply is discarded.
            self.client.messages.create(
                model=self.DEFAULT_MODEL,
                max_tokens=10,
                messages=[{"role": "user", "content": "Hi"}]
            )
            return True
        except Exception as e:
            raise ValueError(f"API connection test failed: {e}") from e

    def _get_system_prompt(self) -> str:
        """Get the enhanced system prompt for Claude."""
        return """<role>
You are an expert AI Grant Reviewer, acting as a coach for PhD students. Your persona is that of a seasoned, constructive, and meticulous reviewer from a major funding agency like the NSF or NIH. Your goal is not to be overly harsh, but to provide clear, actionable feedback that will tangibly improve the proposal's chances of getting funded. You will analyze the structured JSON data of a grant proposal provided in the input.
</role>

<instructions>
1. **Analyze Holistically:** Carefully review all sections of the provided JSON input, paying special attention to the `project_description`, `research_plan`, and `budget` objects.

2. **Synthesize Key Points:** Identify the core research question, the proposed methods, and the expected outcomes.

3. **Structure Your Output:** Your final response must be in Markdown and contain exactly these sections:

   **SUMMARY**
   - A brief, 2-3 sentence paragraph summarizing the project's goals and approach.

   **OVERALL IMPACT AND SIGNIFICANCE**
   - Assess the potential impact of the proposed work
   - Evaluate the significance to the field and broader scientific community
   - Consider the urgency and unmet need addressed

   **STRENGTHS**
   - A bulleted list of 3-5 key strengths of the proposal
   - Focus on elements like intellectual merit, innovation, approach, team expertise, and resources

   **WEAKNESSES AND AREAS FOR IMPROVEMENT**
   - A numbered list of the most critical weaknesses that need to be addressed
   - For each point, first state the issue clearly, then provide a concrete suggestion for how to fix it
   - This is the most important section for improvement

   **DETAILED CRITIQUE BY CATEGORY**

   *Significance and Innovation:*
   - Evaluate the importance and novelty of the research
   - Assess potential to advance the field

   *Approach and Methodology:*
   - Review the soundness and feasibility of the research plan
   - Evaluate appropriateness of methods and experimental design

   *Team and Resources:*
   - Assess investigator qualifications and team composition
   - Evaluate institutional resources and environment

   *Budget and Timeline:*
   - Review budget justification and appropriateness
   - Assess timeline feasibility

   **RECOMMENDATIONS**
   - Specific recommendations for strengthening the proposal
   - Priority order for addressing weaknesses
   - Suggestions for enhancing impact and innovation

**Critical Analysis Criteria:**
- **Intellectual Merit & Innovation:** Is the idea novel and significant? Does the research plan present a compelling case for advancing knowledge?
- **Methodology:** Is the approach sound, well-described, and feasible within the proposed timeline?
- **Broader Impacts:** Does the proposal articulate a convincing benefit to society beyond the immediate research?
- **Budget Justification:** Do the costs align logically with the proposed research plan?
- **Team Expertise:** Does the team have the necessary qualifications and experience?
- **Feasibility:** Can the proposed work realistically be completed with the requested resources?

You must respond only with the markdown-formatted review following the exact structure specified above."""

    def _call_claude(self, user_message: str, model: str) -> str:
        """
        Send one user message to Claude and return the text of the reply.

        API failures are turned into Markdown error strings rather than
        raised, so callers can display whatever comes back.

        Args:
            user_message: Complete prompt for the user turn
            model: Claude model to use

        Returns:
            Reply text, or a Markdown-formatted error message
        """
        try:
            response = self.client.messages.create(
                model=model,
                max_tokens=self.MAX_TOKENS,
                temperature=self.TEMPERATURE,
                system=self.system_prompt,
                messages=[
                    {
                        "role": "user",
                        "content": user_message
                    }
                ]
            )

            return response.content[0].text

        # The rate-limit handler comes before the general API error handler
        # so it gets its own message.
        except anthropic.RateLimitError:
            return "**Rate Limit Error:** Please wait a moment before trying again."
        except anthropic.APIError as e:
            return f"**Anthropic API Error:** {str(e)}"
        except Exception as e:
            return f"**Unexpected API Error:** {str(e)}"

    def validate_json_structure(self, proposal_data: Dict[str, Any]) -> List[str]:
        """
        Validates the structure of the grant proposal JSON data.

        Args:
            proposal_data: Dictionary containing the proposal data

        Returns:
            List of validation errors (empty if valid)
        """
        errors = []

        # Check if it's a dictionary
        if not isinstance(proposal_data, dict):
            errors.append("Input must be a JSON object/dictionary")
            return errors

        # Check required top-level keys
        required_keys = ['administrative_info', 'project_description']
        for key in required_keys:
            if key not in proposal_data:
                errors.append(f"Missing required section: {key}")

        # Check administrative_info structure
        if 'administrative_info' in proposal_data:
            admin_info = proposal_data['administrative_info']
            if isinstance(admin_info, dict):
                admin_keys = ['proposal_title', 'pi_name']
                for key in admin_keys:
                    if key not in admin_info:
                        errors.append(f"Missing administrative_info field: {key}")
            else:
                errors.append("administrative_info must be an object")

        return errors

    def analyze_proposal(self, proposal_json: str, model: str = DEFAULT_MODEL) -> str:
        """
        Analyzes a grant proposal using Claude API and returns structured feedback.

        Args:
            proposal_json: JSON string containing the proposal data
            model: Claude model to use (default: claude-3-5-sonnet-20241022)

        Returns:
            Markdown-formatted review, or a Markdown error message when the
            input is not valid JSON, fails validation, or the API call fails
        """
        try:
            # Parse and validate JSON
            proposal_data = json.loads(proposal_json)
            validation_errors = self.validate_json_structure(proposal_data)

            if validation_errors:
                return "**Validation Errors:**\n" + "\n".join(f"- {error}" for error in validation_errors)

            # Prepare the user message
            user_message = f"""Please review the following grant proposal JSON data and provide a comprehensive critique following the format of NIH summary statements:

```json
{proposal_json}
```

Provide your review following the exact format specified in your instructions, with particular attention to:
1. Overall impact and significance
2. Detailed strengths and weaknesses
3. Specific recommendations for improvement
4. Assessment of feasibility and innovation

Make your critique constructive and actionable, similar to how expert reviewers provide feedback in NIH summary statements."""

            return self._call_claude(user_message, model)

        except json.JSONDecodeError as e:
            return f"**JSON Parsing Error:** {str(e)}\n\nPlease ensure your input is valid JSON."
        except Exception as e:
            return f"**Error:** {str(e)}"

    def compare_with_summary_statement(self, proposal_json: str, summary_statement_json: str, model: str = DEFAULT_MODEL) -> str:
        """
        Compares the proposal with an existing summary statement to provide enhanced feedback.

        Both inputs are only checked for being parseable JSON; the proposal
        is not run through validate_json_structure here.

        Args:
            proposal_json: JSON string containing the proposal data
            summary_statement_json: JSON string containing the summary statement data
            model: Claude model to use

        Returns:
            Comparative analysis in markdown format, or a Markdown error
            message when an input is not valid JSON or the API call fails
        """
        try:
            # Parse both inputs purely as a validity check; the raw strings
            # are what gets embedded in the prompt.
            json.loads(proposal_json)
            json.loads(summary_statement_json)

            # Prepare comparative analysis message
            user_message = f"""Please analyze this grant proposal and compare it with the provided summary statement to give enhanced feedback:

**ORIGINAL PROPOSAL:**
```json
{proposal_json}
```

**SUMMARY STATEMENT (for reference):**
```json
{summary_statement_json}
```

Provide a comprehensive analysis that:
1. Reviews the proposal using the same standards as the summary statement
2. Identifies how the proposal could be improved based on the feedback patterns in the summary statement
3. Provides specific recommendations for strengthening weak areas
4. Suggests how to maintain and enhance the strong aspects

Format your response according to the structured format specified in your instructions."""

            return self._call_claude(user_message, model)

        except json.JSONDecodeError as e:
            return f"**JSON Parsing Error:** {str(e)}\n\nPlease ensure both inputs are valid JSON."
        except Exception as e:
            return f"**Error:** {str(e)}"

# Convenience functions for Google Colab usage
def setup_claude_reviewer(api_key: str = None):
    """Build a ClaudeGrantReviewer, or report the failure and return None.

    Any exception raised while constructing the reviewer is printed together
    with a pointer to the Anthropic console; nothing is re-raised.
    """
    try:
        return ClaudeGrantReviewer(api_key)
    except Exception as e:
        print(f"Error: {e}")
        print("Get your API key from: https://console.anthropic.com/")
    return None

def load_json_file(file_path: str) -> str:
    """Read a UTF-8 file and return its text after checking it parses as JSON.

    The parsed value is discarded; the raw text is returned unchanged.
    Invalid JSON surfaces as the exception raised by ``json.loads``.
    """
    with open(file_path, 'r', encoding='utf-8') as handle:
        raw_text = handle.read()

    # Parsed purely as a validity check.
    json.loads(raw_text)
    return raw_text

def review_grant_proposal_from_file(reviewer: ClaudeGrantReviewer, proposal_file_path: str, display_review: bool = True):
    """Review the proposal stored in a JSON file.

    Returns None without touching the file when ``reviewer`` is None;
    otherwise returns the review text, rendering it as Markdown first when
    ``display_review`` is true.
    """
    if reviewer is not None:
        review_text = reviewer.analyze_proposal(load_json_file(proposal_file_path))
        if display_review:
            display(Markdown(review_text))
        return review_text

    return None

def compare_proposal_with_summary_from_files(reviewer: ClaudeGrantReviewer, proposal_file_path: str, summary_file_path: str, display_review: bool = True):
    """Compare a proposal file against a summary-statement file.

    Returns None without touching the files when ``reviewer`` is None;
    otherwise returns the comparison text, rendering it as Markdown first
    when ``display_review`` is true.
    """
    if reviewer is not None:
        # Proposal is loaded before the summary statement.
        proposal_text = load_json_file(proposal_file_path)
        summary_text = load_json_file(summary_file_path)

        comparison = reviewer.compare_with_summary_statement(proposal_text, summary_text)
        if display_review:
            display(Markdown(comparison))
        return comparison

    return None

# Legacy functions (kept for backward compatibility)
def review_grant_proposal(reviewer: ClaudeGrantReviewer, proposal_json: str, display_review: bool = True):
    """Review a proposal supplied as a JSON string.

    Returns None when ``reviewer`` is None; otherwise returns the review
    text, rendering it as Markdown first when ``display_review`` is true.
    """
    if reviewer is not None:
        review_text = reviewer.analyze_proposal(proposal_json)
        if display_review:
            display(Markdown(review_text))
        return review_text

    return None

def compare_proposal_with_summary(reviewer: ClaudeGrantReviewer, proposal_json: str, summary_statement_json: str, display_review: bool = True):
    """Compare a proposal with a summary statement, both given as JSON strings.

    Returns None when ``reviewer`` is None; otherwise returns the comparison
    text, rendering it as Markdown first when ``display_review`` is true.
    """
    if reviewer is not None:
        comparison = reviewer.compare_with_summary_statement(proposal_json, summary_statement_json)
        if display_review:
            display(Markdown(comparison))
        return comparison

    return None

# Demo run: build a reviewer, review one extracted proposal file, then compare
# that proposal with an extracted summary statement.


print("Claude Grant Reviewer Ready")
print("Usage:")
# setup_claude_reviewer() returns None when the reviewer cannot be created;
# the two helpers below then return None as well.
reviewer = setup_claude_reviewer()
# Reviews the proposal on its own (the proposal is validated first).
review = review_grant_proposal_from_file(reviewer, '/content/extracted_nsf_grant1.json')
# Compares the same proposal with a summary statement; this path only checks
# that both files contain valid JSON.
analysis = compare_proposal_with_summary_from_files(reviewer, '/content/extracted_nsf_grant1.json', '/content/nih_summary_extracted5.json')

Claude Grant Reviewer Ready
Usage:


**Validation Errors:**
- Missing required section: project_description

# Grant Review Analysis

**SUMMARY**
This proposal aims to develop an integrated platform combining Surface Plasmon Resonance (SPR) with computational modeling to characterize antibody:antigen binding, specifically focusing on HSV glycoprotein D. The project leverages high-throughput experimental methods and computational approaches to map epitopes and understand antibody recognition patterns, with potential applications in therapeutic antibody development and viral immunology.

**OVERALL IMPACT AND SIGNIFICANCE**
- The proposal presents an innovative technical approach to a significant problem in antibody characterization and viral immunology
- The integration of computational and experimental methods shows strong potential for advancing the field
- The focus on HSV glycoprotein D provides immediate clinical relevance while demonstrating broader applicability
- However, like the reference summary statement's emphasis on preliminary data, this proposal would benefit from stronger validation data

**STRENGTHS**
- Strong multi-disciplinary team with complementary expertise across institutions
- Innovative integration of computational and experimental approaches
- Clear technical feasibility given the team's access to specialized equipment (Wasatch SPRi)
- Well-defined methodology with specific, measurable objectives
- Direct clinical relevance with broader implications for antibody characterization

**WEAKNESSES AND AREAS FOR IMPROVEMENT**
1. Preliminary Data Gap
   - Issue: Limited preliminary data demonstrating feasibility
   - Solution: Include pilot data showing successful SPR measurements with a subset of antibodies

2. Validation Strategy
   - Issue: Insufficient detail on validation approaches
   - Solution: Add orthogonal validation methods (e.g., crystallography or cryo-EM)

3. Risk Mitigation
   - Issue: Limited discussion of potential technical challenges
   - Solution: Include alternative approaches and contingency plans

**DETAILED CRITIQUE BY CATEGORY**

*Significance and Innovation:*
- High potential impact on antibody characterization methodology
- Novel integration of computational and experimental approaches
- Could benefit from stronger emphasis on immediate clinical applications
- Innovation level is high but needs better contextualization within current field

*Approach and Methodology:*
- Well-structured experimental design
- Clear technical feasibility
- Needs more detail on computational methods
- Should include more validation steps

*Team and Resources:*
- Strong multi-institutional collaboration
- Excellent mix of expertise
- Access to necessary specialized equipment
- Could benefit from additional computational biology expertise

*Budget and Timeline:*
- Budget details are incomplete in provided information
- Timeline appears feasible but needs more detailed milestones
- Equipment costs seem appropriate given the technical requirements

**RECOMMENDATIONS**

1. Priority Actions:
   - Add preliminary data showing proof-of-concept results
   - Develop more detailed validation strategy
   - Strengthen computational methods description

2. Enhancement Opportunities:
   - Include more specific milestones and deliverables
   - Add success criteria for each objective
   - Expand risk mitigation strategies

3. Impact Enhancement:
   - Better articulate immediate clinical applications
   - Strengthen connection to therapeutic development
   - Add broader impact statement for non-HSV applications

4. Learning from Summary Statement:
   - Like the reference case, emphasize preliminary data
   - Include more mechanistic studies
   - Add clear validation approaches
   - Strengthen methodology details

5. Team Development:
   - Consider adding computational biology collaborator
   - Define clear roles for each team member
   - Include regular team meeting schedule and coordination plan

The proposal shows strong potential but would benefit from addressing these recommendations to achieve the high standard of successful grants like the reference summary statement.