**Plan for Task AB-44**

Core LLM-based Task Extraction Engine for AB-44


*Objectives*

* Implement robust document processing with hierarchical awareness

* Build advanced LLM-based task extraction with chain-of-thought reasoning

* Create structured output schema for database integration

* Develop initial unit tests for validation
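
The structured output named in the third objective can be pictured as a small record type. The following is a minimal sketch, assuming the four fields that `validate_tasks` emits and that the `rfq.rfq_tasks` table stores later in this notebook. The `TaskRecord` name and the sample values are hypothetical and not part of the notebook.

```python
from dataclasses import dataclass, asdict


@dataclass
class TaskRecord:
    """One extracted task, mirroring the columns of rfq.rfq_tasks (hypothetical helper)."""
    task_id: int      # sequential ID assigned during validation
    document_id: str  # source file name without its extension
    task_text: str    # verbatim sentence extracted from the document
    task_index: int   # character offset in the normalized document text, -1 if not found


# Sample values are made up for illustration
record = TaskRecord(
    task_id=1,
    document_id="sample_rfq",
    task_text="The Contractor shall prepare a Final Report.",
    task_index=120,
)
row = asdict(record)  # plain dict, usable as a pandas DataFrame row
```

A plain dict per task keeps the extraction step independent of the storage layer.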


**Step 1: Library Installations**

In [None]:

!pip install llama-index llama-parse "psycopg[binary]" openai
!pip install langchain langchain-openai
!pip install instructlab
!pip install nest_asyncio tenacity pandas sqlalchemy



In [None]:
# Set the OpenAI API key without hardcoding it in the notebook
import os
from getpass import getpass

if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("OpenAI API key: ")


**Step 2: Define all classes**

2.1: Create Document Processor class

(Note: a potential enhancement to this step is PageIndex; see the README of this GitHub repository for details: https://github.com/VectifyAI/PageIndex)

In [None]:
# Cell 1: Define EnhancedDocumentProcessor class
import os
import re
import json
import logging
from typing import List, Dict, Any, Optional
from datetime import datetime

# LlamaParse for advanced PDF handling
from llama_parse import LlamaParse

# LlamaIndex for document structuring and retrieval
from llama_index.core import Document
from llama_index.core.node_parser import SentenceSplitter

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class EnhancedDocumentProcessor:
    """Process RFQ documents with hierarchical awareness and page tracking"""

    def __init__(self, llama_api_key: str):
        """Initialize with necessary API keys and configurations"""
        self.parser = LlamaParse(
            api_key=llama_api_key,
            result_type="markdown",  # Get structured markdown output
            max_request_size=20000,  # Handle larger documents
            verbose=True,
            include_metadata=True    # Ensure metadata like page numbers are included
        )
        self.node_parser = SentenceSplitter(
            chunk_size=512,           # Balance between context and specificity
            chunk_overlap=50,         # Maintain continuity between chunks
            paragraph_separator="\n\n"
        )

    async def process_document(self, pdf_path: str) -> List[Dict[str, Any]]:
        """Process document and extract structured nodes with rich metadata including page numbers"""
        logging.info(f"Processing document: {pdf_path}")

        # Extract file metadata
        filename = os.path.basename(pdf_path)
        document_id = os.path.splitext(filename)[0]
        nodes = []

        try:
            # Parse the PDF with LlamaParse
            parsed_document = await self.parser.aload_data(pdf_path)
            logging.info(f"Document parsed successfully: {len(parsed_document)} sections")

            # Diagnostic logging: compare raw file size with parsed output
            with open(pdf_path, 'rb') as f:
                content = f.read()
            logging.info(f"File size: {len(content)} bytes")
            logging.info(f"Parsed content length: {sum(len(str(item)) for item in parsed_document)}")
            logging.info(f"First 500 chars: {str(parsed_document[0])[:500] if parsed_document else 'EMPTY'}")
            # Count requirement keywords directly in the parsed text
            raw_text = " ".join([str(item) for item in parsed_document])
            shall_count = len(re.findall(r'\bshall\b', raw_text, re.IGNORECASE))
            must_count = len(re.findall(r'\bmust\b', raw_text, re.IGNORECASE))
            will_count = len(re.findall(r'\bwill\b', raw_text, re.IGNORECASE))
            logging.info(f"Found {shall_count} shall, {must_count} must, {will_count} will statements")
            # End of diagnostic logging

            # Extract document structure with section hierarchy and page numbers
            structured_sections = self._extract_document_structure(parsed_document)
            logging.info(f"Extracted {len(structured_sections)} structured sections")

            # Create nodes with enhanced metadata
            nodes = self._create_nodes_with_metadata(structured_sections, document_id)
            logging.info(f"Created {len(nodes)} context-rich nodes")

        except Exception as e:
            logging.error(f"Error in primary parsing: {str(e)}")
            # Retry the same parse pipeline once; a second failure propagates to the caller
            parsed_document = await self.parser.aload_data(pdf_path)

            logging.info(f"Document parsed successfully: {len(parsed_document)} sections")
            # Extract document structure with section hierarchy
            structured_sections = self._extract_document_structure(parsed_document)
            logging.info(f"Extracted {len(structured_sections)} structured sections")

            # Create nodes with enhanced metadata
            nodes = self._create_nodes_with_metadata(structured_sections, document_id)
            logging.info(f"Created {len(nodes)} context-rich nodes")

        return nodes

    def _extract_document_structure(self, parsed_document: List[Any]) -> List[Dict[str, Any]]:
        """Extract hierarchical section information and page numbers from parsed document"""
        structured_sections = []
        current_sections = {"level1": None, "level2": None, "level3": None}

        # Track the current header information
        current_header = "Unknown Header"
        current_header_num = ""

        for item in parsed_document:
            # Handle different response formats from LlamaParse
            if isinstance(item, dict):
                content = item.get('text', '')
                metadata = item.get('metadata', {})
            else:
                content = item.text
                metadata = item.metadata

            # Extract page number - add default if not available
            page_number = 1
            if hasattr(metadata, 'get'):
                page_number = metadata.get('page_number', 1)
            elif hasattr(metadata, 'page_label'):
                page_number = metadata.page_label
                if isinstance(page_number, str) and page_number.isdigit():
                    page_number = int(page_number)
                else:
                    page_number = 1

            # Detect section headers
            section_header = self._detect_section_header(content)
            header_text = section_header if section_header else current_header

            # Extract requirement header number if present
            header_number = self._extract_header_number(header_text)
            if header_number:
                current_header_num = header_number

            if section_header:
                # Determine section level from formatting and content
                level = self._determine_section_level(section_header, content)

                # Update current section hierarchy
                if level == 1:
                    current_sections["level1"] = section_header
                    current_sections["level2"] = None
                    current_sections["level3"] = None
                    current_header = section_header
                elif level == 2:
                    current_sections["level2"] = section_header
                    current_sections["level3"] = None
                    current_header = section_header
                elif level == 3:
                    current_sections["level3"] = section_header
                    current_header = section_header

            # Build full section path for context
            section_path = self._build_section_path(current_sections)

            # Store structured section with hierarchy information and page number
            level = max((i + 1 for i, s in enumerate(current_sections.values()) if s is not None), default=1)
            structured_sections.append({
                "text": content,
                "section": section_path,
                "level": level,
                "page_number": page_number,
                "header": current_header,
                "header_number": current_header_num,
                "metadata": metadata
            })

        return structured_sections

    def _detect_section_header(self, text: str) -> Optional[str]:
        """Enhanced section header detection for various document structures"""
        # Expanded patterns to match more document formats
        section_patterns = [
            # Standard section formats
            r'^(?:[A-Z]\.|\d+\.)\s*([A-Za-z\s]+)',              # A. Section Name or 1. Section Name
            r'^(?:Section|SECTION)\s+[A-Z]+[\.:]?\s*(.+)',      # Section A: Name
            r'^(?:[IVX]+\.|\d+\.\d+\.?)\s+([A-Za-z\s]+)',       # IV. Name or 1.2 Name
            # Specialized formats common in SOWs
            r'^(?:Task|TASK)\s+\d+(?:\.\d+)?[\.:]?\s*(.+)',     # Task 1: Name or Task 1.2: Name
            r'^(?:Deliverable|DELIVERABLE)\s+\d+[\.:]?\s*(.+)', # Deliverable 1: Name
            r'^[A-Z][A-Z\s]+:',                                 # PURPOSE: or BACKGROUND:
            r'^[ \t]*[A-Z][A-Za-z \t]+[ \t]*$'                  # Single line in all caps or title case
        ]

        for pattern in section_patterns:
            matches = re.search(pattern, text, re.MULTILINE)
            if matches:
                # Return the full matched header line, stripped of surrounding whitespace
                header = matches.group(0).strip()
                return header

        return None

    def _extract_header_number(self, header_text: str) -> str:
        """Extract numerical identifier from a header if present"""
        patterns = [
            r'(\d+\.\d+(?:\.\d+)?)',  # Matches 1.2 or 1.2.3
            r'([A-Z]\.\d+)',          # Matches C.1
            r'Task\s+(\d+(?:\.\d+)?)', # Matches Task 1 or Task 1.2
            r'Section\s+(\d+(?:\.\d+)?)', # Matches Section 1 or Section 1.2
            r'^(\d+)\.'               # Matches 1. at the beginning of a string
        ]

        for pattern in patterns:
            match = re.search(pattern, header_text)
            if match:
                return match.group(1)

        return ""

    def _determine_section_level(self, header: str, content: str) -> int:
        """Determine section level from the numbering prefix of the header"""
        # Detected headers usually carry a title after the number ("1.2 Scope"),
        # so match only the numeric prefix and test the deepest level first
        if re.match(r'^(?:C\.\d+\.\d+\.\d+|\d+\.\d+\.\d+)\b', header):
            return 3  # Sub-sub-section
        elif re.match(r'^(?:C\.\d+\.\d+|\d+\.\d+|Task\s+\d+\.\d+)\b', header):
            return 2  # Sub-section
        elif re.match(r'^(?:C\.\d+|\d+\.|Task\s+\d+|Section\s+\d+)', header):
            return 1  # Top level

        # Fallback on heading style if available
        if content.startswith('# '):
            return 1
        elif content.startswith('## '):
            return 2
        elif content.startswith('### '):
            return 3

        return 1  # Default to top level if unsure

    def _build_section_path(self, sections: Dict[str, Optional[str]]) -> str:
        """Build full section path for hierarchical context"""
        path_parts = []
        for level in ["level1", "level2", "level3"]:
            if sections[level]:
                path_parts.append(sections[level])

        return " > ".join(path_parts) if path_parts else "Unknown Section"

    def _create_nodes_with_metadata(self, structured_sections: List[Dict], document_id: str) -> List[Dict]:
        """Create content nodes with rich metadata from structured sections including page numbers"""
        enriched_nodes = []

        for section in structured_sections:
            # Skip empty sections
            if not section["text"].strip():
                continue

            # Create document for this section
            doc = Document(
                text=section["text"],
                metadata={
                    "section_path": section["section"],
                    "section_level": section["level"],
                    "document_id": document_id,
                    "page_number": section["page_number"],
                    "header": section["header"],
                    "header_number": section["header_number"],
                    "processing_date": datetime.now().isoformat(),
                }
            )

            # Parse into nodes with context preserved
            nodes = self.node_parser.get_nodes_from_documents([doc])

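            # Add the original parser metadata to each node
            for node in nodes:
                node_dict = node.to_dict()
                original_metadata = section.get("metadata") or {}
                # Merge only dict-like metadata; other metadata objects are skipped
                if isinstance(original_metadata, dict):
                    node_dict["metadata"].update(original_metadata)
                enriched_nodes.append(node_dict)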

        return enriched_nodes
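
To see what the header helpers above produce, here is a standalone sketch that copies the regular expressions from `_extract_header_number` and the join logic from `_build_section_path`. The free functions and the sample headers are assumptions made for illustration; they are not part of the class.

```python
import re
from typing import Dict, Optional

# Same patterns, in the same order, as in _extract_header_number
HEADER_NUMBER_PATTERNS = [
    r'(\d+\.\d+(?:\.\d+)?)',       # 1.2 or 1.2.3
    r'([A-Z]\.\d+)',               # C.1
    r'Task\s+(\d+(?:\.\d+)?)',     # Task 1 or Task 1.2
    r'Section\s+(\d+(?:\.\d+)?)',  # Section 1 or Section 1.2
    r'^(\d+)\.',                   # 1. at the beginning of a string
]


def extract_header_number(header_text: str) -> str:
    """Return the first numeric identifier found in the header, or an empty string."""
    for pattern in HEADER_NUMBER_PATTERNS:
        match = re.search(pattern, header_text)
        if match:
            return match.group(1)
    return ""


def build_section_path(sections: Dict[str, Optional[str]]) -> str:
    """Join the non-empty hierarchy levels into one breadcrumb string."""
    parts = [sections[key] for key in ("level1", "level2", "level3") if sections[key]]
    return " > ".join(parts) if parts else "Unknown Section"


# Hypothetical headers, in the style of an RFQ statement of work
numbers = [extract_header_number(h) for h in
           ["C.3 Scope of Work", "Task 2: Reporting", "1.2.3 Deliverables", "3. Background"]]
path = build_section_path({"level1": "C.3 Scope of Work", "level2": "Task 2: Reporting", "level3": None})
```

Because the patterns are tried in order, the first one that matches anywhere in the header decides the result, so pattern order matters for headers that mix letter prefixes and dotted numbers.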


2.2: Create Task Extraction class

In [None]:
# Updated Step 2: Task extraction tool build

import json
import logging
from typing import List, Dict, Any
import openai
from openai import AsyncOpenAI
import os
import re
from difflib import SequenceMatcher
from tenacity import retry, stop_after_attempt, wait_exponential

class TaskExtractionAgent:
    """Extract requirements from RFQ documents with page number and header tracking"""
    def __init__(self, api_key: str, model: str = "gpt-4o"):
        """Initialize with OpenAI API key and model"""
        self.api_key = api_key
        self.model = model
        openai.api_key = api_key  # Set the API key globally
        self.req_counter = 1  # Sequential requirement counter
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    def _extract_technical_skill_area(self, task_text: str) -> str:
        """Identify technical skill area based on requirement text"""
        text_lower = task_text.lower()

        # Extended Technical Skill Areas mapping
        skill_areas = {
            "Data Management": ["data", "database", "storage", "repository", "files"],
            "Analytics": ["analysis", "analyze", "analytics", "statistics", "trends"],
            "Training": ["train", "education", "curriculum", "webinar", "instruction"],
            "Project Management": ["manage", "coordinate", "schedule", "plan", "timeline"],
            "Technical Support": ["support", "assist", "help desk", "guidance"],
            "Quality Improvement": ["quality", "improvement", "performance", "measure"],
            "Documentation": ["document", "report", "write", "prepare", "draft"],
            "Meeting Facilitation": ["meeting", "facilitate", "agenda", "discussion"],
            "Research": ["research", "study", "investigate", "literature"],
            "Compliance": ["comply", "compliance", "regulation", "requirement"],
            "Security": ["security", "protection", "confidential", "privacy"],

            # Design Services
            "Design Services": ["branding", "identity", "graphic", "illustration", "human-centered",
                              "multimedia", "video", "packaging", "print", "product", "industrial",
                              "web", "mobile", "design"],

            # Financial & Accounting Services
            "Financial & Accounting Services": ["accounting", "bookkeeping", "audit", "assurance",
                                             "banking", "payment", "cfo", "financial planning",
                                             "risk", "compliance", "strategy", "transformation",
                                             "erp", "forensic", "fraud", "grants", "investment",
                                             "treasury", "merger", "acquisition", "payroll",
                                             "procurement", "contract", "tax"],

            # Human Resources Services
            "Human Resources Services": ["compensation", "benefits", "diversity", "employee",
                                      "relations", "conflict", "hr policy", "hris", "hr technology",
                                      "outplacement", "offboarding", "performance", "staffing",
                                      "talent", "succession", "career", "recruitment"],

            # IT Services
            "IT Services": ["artificial intelligence", "machine learning", "blockchain", "distributed ledger",
                         "cloud", "cybersecurity", "data", "analytics", "devops", "automation",
                         "emerging", "healthcare it", "infrastructure", "networking", "internet of things",
                         "iot", "it consulting", "strategy", "governance", "support", "managed services",
                         "quantum", "software", "application", "development"],

            # Legal Services
            "Legal Services": ["dispute resolution", "adr", "contract law", "corporate governance",
                           "data privacy", "employment law", "government contracting", "intellectual property",
                           "ip", "litigation", "ediscovery", "m&a legal", "regulatory compliance"],

            # Management Services
            "Management Services": ["agile", "change management", "governance", "operations", "performance",
                                "process improvement", "procurement", "supply", "project", "program",
                                "strategy", "business planning"],

            # Marketing & PR Services
            "Marketing & PR Services": ["advertising", "brand", "content", "seo", "digital marketing",
                                     "event", "promotions", "market research", "public relations"],

            # Training & Communications
            "Training & Communications": ["change communications", "crisis", "elearning", "instructional",
                                       "internal communications", "knowledge management", "technical writing",
                                       "leadership", "management training", "media training", "public speaking",
                                       "onboarding", "orientation", "soft skills", "technical training"]
        }

        # Find matching skill area
        for area, keywords in skill_areas.items():
            if any(keyword in text_lower for keyword in keywords):
                return area
        # Default if no match found
        return "Process Implementation"

    def _generate_brief_requirement(self, task_text: str) -> str:
        """Generate brief requirement description from full text"""
        # Try to extract core requirement using patterns
        patterns = [
            r'shall\s+([^.,;]+)',
            r'will\s+([^.,;]+)',
            r'must\s+([^.,;]+)',
            r'required to\s+([^.,;]+)'
        ]

        for pattern in patterns:
            match = re.search(pattern, task_text, re.IGNORECASE)
            if match:
                brief = match.group(1).strip()
                if not brief:
                    continue  # Whitespace-only match; try the next pattern
                # Capitalize the first letter
                brief = brief[0].upper() + brief[1:]
                if 10 < len(brief) <= 80:
                    return brief

        # If no good matches, take first X characters and clean up
        if len(task_text) > 80:
            brief = task_text[:77] + "..."
        else:
            brief = task_text

        # Clean up any partial sentences
        brief = re.sub(r'^\s*the\s+contractor\s+shall\s+', '', brief, flags=re.IGNORECASE)

        return brief

    def validate_tasks(self, tasks: List[Dict[str, Any]], document_id: str, full_document_text: str) -> List[Dict[str, Any]]:
        """Deduplicate extracted tasks, assign sequential task IDs, and locate each task in the document"""
        valid_tasks = []
        seen_texts = set()

        task_id = 0
        for task in tasks:
            # Less restrictive filtering
            if len(task.get("task_text", "")) > 5:
                task_text = task.get("task_text", "").lower().strip()
                if task_text not in seen_texts:
                    seen_texts.add(task_text)

                    # Locate the task in the document (exact match first, then fuzzy)
                    task_index = self.find_text_index(full_document_text, task.get("task_text", ""))
                    if task_index == -1:
                        logging.warning(f"Task text not found in document: {task.get('task_text', '')}")
                    # Increment for every kept task so that task IDs stay unique
                    task_id += 1
                    # Create task in new required format
                    formatted_task = {
                        "task_id": task_id,
                        "document_id": document_id,
                        "task_text": task.get("task_text", ""),
                        "task_index": task_index,
                    }

                    valid_tasks.append(formatted_task)
        return valid_tasks

    # Note: an earlier version of find_text_index relied only on
    # SequenceMatcher.find_longest_match and returned the start of any non-empty
    # match. It was replaced by the stricter version below.
    def find_text_index(self, full_document_text: str, task_text: str) -> int:
        """Return the start index of task_text in the lowercased, whitespace-normalized document text, or -1 if no sufficiently close match exists."""

        # Normalize both texts, remove extra spaces and newlines
        normalized_document = ' '.join(full_document_text.lower().split())
        normalized_task = ' '.join(task_text.lower().split())

        # Start with an exact substring search
        exact_index = normalized_document.find(normalized_task)
        if exact_index >= 0:
            return exact_index

        # If no exact match, perform fuzzy matching with higher threshold
        matcher = SequenceMatcher(None, normalized_document, normalized_task)
        match = matcher.find_longest_match(0, len(normalized_document), 0, len(normalized_task))

        # Set stronger threshold (match must account for at least 90% of task_text length)
        if match.size / len(normalized_task) > 0.9:
            return match.a
        else:
            # As an extra safety net, return -1 if no sufficiently good match is found
            # This clearly signals problematic matches
            return -1

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
    async def extract_tasks(self, document_node: Dict[str, Any], full_document_text: str) -> List[Dict[str, Any]]:
        """Extract requirements with page numbers and header context"""
        # Get text and metadata from the node
        text = document_node.get("text", "")
        metadata = document_node.get("metadata", {})
        document_id = metadata.get("document_id", "Unknown Document")
        section_path = metadata.get("section_path", "Unknown Section")
        header = metadata.get("header", "Unknown Section")
        page_number = metadata.get("page_number", 1)

        # Diagnostic information about the input text
        logging.info(f"Processing text from page {page_number}, length {len(text)} chars")
        logging.info(f"From section: {header}")

        # Check for common requirement indicators in text
        shall_count = len(re.findall(r'\bshall\b', text, re.IGNORECASE))
        must_count = len(re.findall(r'\bmust\b', text, re.IGNORECASE))
        will_count = len(re.findall(r'\bwill\b', text, re.IGNORECASE))
        logging.info(f"Text contains: {shall_count} 'shall', {must_count} 'must', {will_count} 'will' statements")

        # Initialize tasks list
        tasks = []

        # Use enhanced prompt focused on clear requirements
        system_prompt = """
          You are an expert contract analyst, extracting verbatim tasks precisely assigned to a contractor within procurement and contract documents.

          Extract every sentence or bullet exactly from the provided text that clearly assigns, instructs, describes, implies, or outlines any task, responsibility, or action to be performed by the Contractor.

          Rules for extraction:
          - Include EACH sentence or bullet verbatim exactly as it appears in the source text, without summarizing, paraphrasing, adding or omitting words, altering capitalization, bullet symbols, punctuation, spacing, or formatting.
          - Include sentences defined explicitly with phrases like "The Contractor shall/should/must/will/may..." etc.
          - Include sentences that clearly imply or instruct actions the contractor might take, even without explicitly including the word "Contractor".
          - Include short imperative-form tasks explicitly directed toward the contractor.
          - Include tasks with conditionals/qualifiers ("if applicable," "where appropriate," "as needed," etc.).
          - Include phrases explicitly mentioning collaboration or coordination ("shall collaborate with," "shall coordinate with," "shall work with," etc.).
          - Ensure compound sentences (multiple tasks linked within one sentence) are extracted as ONE complete sentence without breaking them apart.
          - Include any variations/substitutions explicitly indicating the Contractor ("The Contractor", "Implementation Contractor", "Contractor staff," etc.).

          Examples explicitly illustrating various extraction cases (EXACT FORMAT REQUIRED):
          - The Contractor shall produce materials and manage these products as syndicated content that can be distributed through multiple partner channels and mediums.
          - The Contractor shall work with other CMS contractors to ensure data integration and reporting quality metrics from CMMI models to other CMS programs are performed as needed.
          - Prepare a Final Report upon project completion.
          - Support the CMS actuarial certification review processes and the process of submitting the model for approval of expansion by the Secretary
          - The contractor may use concepts from overlapping disciplines, such as plain language, human-centered design (HCD), interaction design, and usability.
          - The Contractor shall explicitly support multiple models concurrently, if applicable.
          - The Contractor shall provide a framework for an evaluation design approach for any specific model that creates appropriate comparison groups.

          Your extraction MUST be exact and verbatim. DO NOT summarize, paraphrase, alter, reformat, remove, or add characters or words.
                      """
        # User prompt: output instructions plus the node text (the rules above are sent as the system message)
        user_prompt = f"""

          Carefully extract ALL contractor tasks explicitly and clearly assigned or implied within the following provided document text.

          Output explicit instructions:
          - Provide each extracted requirement/task sentence exactly as it verbatim appears in the document text.
          - Output each extracted task on EXACTLY ONE (1) separate line.
          - Do NOT add numbering, quotation marks, symbols, bullets, or any additional characters whatsoever.
          - Do NOT join or split sentences; maintain each task exactly at its original sentence boundary, length, punctuation, and formatting exactly.

          Here is the document text explicitly for your analysis and extraction:
          {text}
          """

        try:
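            # Pass the key explicitly; setting openai.api_key does not configure this client
            client = AsyncOpenAI(api_key=self.api_key)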
            response = await client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                temperature=0.0,  # Low randomness suits verbatim extraction
                top_p=1,
                frequency_penalty=0.0,
                presence_penalty=0.0
            )
            extracted_tasks = response.choices[0].message.content.strip().splitlines()
            for task in extracted_tasks:
                # Less restrictive filtering
                if len(task) > 5:
                    tasks.append({
                        #"task_id": task_id
                        "document_id": document_id,
                        "task_text": task,
                        "task_index": self.find_text_index(full_document_text, task)
                    })

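        except Exception as e:
            logging.error(f"Error extracting tasks: {str(e)}")
            # Fall back to regex extraction if the LLM call fails. Because the exception
            # is handled here, the @retry decorator never re-invokes this method.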
            patterns = [
                r'(The\s+contractor\s+(?:shall|will|must)[^.]+\.?)',  # Capture full sentence including period
                r'(The\s+contractor\s+(?:shall|will|must)[^.]+)$',    # Capture sentence without period if at end of text
            ]

            for pattern in patterns:
                matches = re.findall(pattern, text, re.IGNORECASE)
                for match in matches:
                    tasks.append({
                        #"task_id": task_id,   # DEFINED LATER, IN THE VALIDATE_TASKS PROCEDURE
                        "document_id": document_id,
                        "task_text": match.strip(),  # Use the full match
                        "task_index": self.find_text_index(full_document_text, match)
                    })

        return tasks
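
The two-stage lookup in `find_text_index` is easier to follow on a small input. The sketch below is a standalone copy of that logic with the 90% threshold exposed as a parameter. The `threshold` argument, the empty-input guard, and the sample sentences are assumptions added for illustration. Note that the returned offset refers to the lowercased, whitespace-normalized text, not to the raw document.

```python
from difflib import SequenceMatcher


def find_text_index(full_document_text: str, task_text: str, threshold: float = 0.9) -> int:
    """Locate task_text in the normalized document: exact search first, then fuzzy."""
    normalized_document = ' '.join(full_document_text.lower().split())
    normalized_task = ' '.join(task_text.lower().split())
    if not normalized_task:
        return -1  # Guard added in this sketch: nothing to search for

    # Stage 1: exact substring search on the normalized text
    exact_index = normalized_document.find(normalized_task)
    if exact_index >= 0:
        return exact_index

    # Stage 2: the longest common block must cover more than `threshold` of the task text
    matcher = SequenceMatcher(None, normalized_document, normalized_task)
    match = matcher.find_longest_match(0, len(normalized_document), 0, len(normalized_task))
    return match.a if match.size / len(normalized_task) > threshold else -1


# Hypothetical document text with irregular whitespace
document = "The  Contractor shall\ndeliver weekly status reports to the COR."
exact = find_text_index(document, "contractor shall deliver weekly")
fuzzy = find_text_index(document, "Contractor shall deliver weekly status reports to CMS")
missing = find_text_index(document, "Submit invoices monthly")
```

Here `exact` and `fuzzy` both resolve to the offset of "contractor" in the normalized text, while `missing` is -1 because no common block covers enough of the task text. The fuzzy stage tolerates a small change at the start or end of a sentence, not edits in the middle, since it measures one contiguous block.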


2.3 Main Orchestration Script

In [None]:
# Cell 3.1 : Main Orchestration Script
import asyncio
import os
import pandas as pd
import logging
from typing import List, Dict, Any

# Database connection
import sqlalchemy as sa
from sqlalchemy import create_engine, text

async def extract_requirements_from_rfq(pdf_path, llama_api_key, openai_api_key, db_config):
    """Main function to extract requirements from an RFQ document"""

    # Initialize processors
    document_processor = EnhancedDocumentProcessor(llama_api_key=llama_api_key)
    task_extractor = TaskExtractionAgent(api_key=openai_api_key)

    # Reset the requirement counter for each new document
    task_extractor.req_counter = 1

    # Step 1: Process document into structured nodes
    logging.info(f"Processing document: {pdf_path}")
    document_nodes = await document_processor.process_document(pdf_path)
    logging.info(f"Generated {len(document_nodes)} document nodes")

    # Build the full document text; join with spaces so words at node boundaries do not merge
    full_document_text = " ".join(node.get("text", "") for node in document_nodes)

    # Step 2: Extract requirements from each node
    all_requirements = []
    for node in document_nodes:
        # Pass full_document_text so each task can be located in the whole document
        node_requirements = await task_extractor.extract_tasks(node, full_document_text)
        all_requirements.extend(node_requirements)

    logging.info(f"Extracted {len(all_requirements)} requirements from document")


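    # Step 3: Validate and store requirements in database
    if db_config:
        # Use the same document ID that process_document derives from the file name
        document_id = os.path.splitext(os.path.basename(pdf_path))[0]
        validated_requirements = task_extractor.validate_tasks(all_requirements, document_id, full_document_text)
        store_requirements_in_database(validated_requirements, db_config)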

    return all_requirements


def create_database_tables(engine):
    """Create or update required database tables in the rfq schema"""
    from sqlalchemy import text

    with engine.connect() as conn:
        # First ensure the 'rfq' schema exists
        conn.execute(text("CREATE SCHEMA IF NOT EXISTS rfq"))
        conn.commit()

        # Create the table with the new required columns
        conn.execute(text("""
            CREATE TABLE IF NOT EXISTS rfq.rfq_tasks (
            task_id integer,
            document_id VARCHAR(255) NOT NULL,
            task_text TEXT NOT NULL,
            task_index integer,
             created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
             )
        """))
        conn.commit()
        logging.info("Database table schema verified/created in rfq schema")

def store_requirements_in_database(requirements, db_config):
    """Store extracted requirements in database with proper error handling"""
    try:
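        # Create connection string; percent-encode the credentials so special
        # characters in the password do not break the URL
        from urllib.parse import quote
        user = quote(db_config['user'], safe='')
        password = quote(db_config['password'], safe='')
        connection_string = f"postgresql+psycopg://{user}:{password}@{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
        engine = create_engine(connection_string)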

        # Ensure database schema is set up
        create_database_tables(engine)

        # Create DataFrame from requirements
        requirements_df = pd.DataFrame(requirements)
        print("DataFrame columns:", requirements_df.columns)
        print("requirements_df head:",requirements_df.head())

        # List of columns that exist in the database schema
        valid_columns = ['task_id', 'document_id', 'task_text', 'task_index']

        # Filter DataFrame to only include valid columns that exist in the dataframe
        existing_valid_columns = [col for col in valid_columns if col in requirements_df.columns]
        requirements_df = requirements_df[existing_valid_columns]

        # Fill missing text values only; integer columns (task_id, task_index) keep NaN,
        # which to_sql writes as NULL rather than an invalid empty string
        text_columns = [col for col in ('document_id', 'task_text') if col in requirements_df.columns]
        requirements_df = requirements_df.fillna({col: '' for col in text_columns})

        # Store in database - specify the rfq schema
        requirements_df.to_sql('rfq_tasks', engine, schema='rfq', if_exists='append', index=False)
        logging.info(f"Successfully stored {len(requirements)} requirements in rfq.rfq_tasks table")

    except Exception as e:
        logging.error(f"Error storing requirements in database: {str(e)}")
        # Continue execution rather than crashing
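
Passwords that contain characters such as `@`, `:`, `/`, or `!` need percent-encoding before they are placed in a connection URL, which is what the later cells in this notebook do with `quote`. The block below is a minimal sketch of a helper that applies the same encoding to a `db_config` dictionary. The function name `build_connection_url` and the example values are hypothetical placeholders.

```python
from urllib.parse import quote


def build_connection_url(db_config: dict, driver: str = "postgresql+psycopg") -> str:
    """Build a SQLAlchemy-style URL with percent-encoded credentials."""
    user = quote(str(db_config["user"]), safe="")
    password = quote(str(db_config["password"]), safe="")
    return (
        f"{driver}://{user}:{password}@"
        f"{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
    )


# Placeholder values for illustration only
example_config = {
    "host": "db.example.com",
    "port": "5432",
    "dbname": "postgres",
    "user": "postgres",
    "password": "p@ss/w:rd!",
}
print(build_connection_url(example_config))
```

Passing `safe=""` makes `quote` encode `/` as well, which it would otherwise leave untouched.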


**Step 3: Integration and Testing**


3.0 (OPTIONAL) Table re-build

In [None]:
# OPTIONAL two-part table rebuild. USE WITH CARE AND ONLY IF TRULY NECESSARY:
# this drops rfq.rfq_tasks and all of its rows before re-creating the table.
import os
import pandas as pd
import psycopg2
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.types import String, Integer
from urllib.parse import quote

# Read the password from the environment instead of hard-coding it in the notebook
# (assumes a DB_PASSWORD environment variable has been set)
DB_PASSWORD = os.environ["DB_PASSWORD"]

# Step 1: Verify Database Connection
# Database connection parameters
db_config = {
    "host": "advantantus-prod.cmfo86w02i47.us-east-1.rds.amazonaws.com",
    "port": "5432",
    "dbname": "postgres",
    "user": "postgres",
    "password": DB_PASSWORD
}

try:
    # Connect to the database
    conn = psycopg2.connect(
        host=db_config['host'],
        port=db_config['port'],
        database=db_config['dbname'],
        user=db_config['user'],
        password=db_config['password']
    )
    print("Connected to PostgreSQL database")
    conn.close()  # This connection only verifies access; the engine below does the work
except Exception as e:
    print(f"Error connecting to database: {e}")



# Step 2: Drop (if exists) and create the table

# Create Database Connection
encoded_password = quote(DB_PASSWORD, safe='')
connection_url = (
    f"postgresql+psycopg2://postgres:{encoded_password}@"
    "advantantus-prod.cmfo86w02i47.us-east-1.rds.amazonaws.com:5432/postgres"
)
engine = create_engine(connection_url)

with engine.begin() as conn:
    # This creates a transaction that will be committed at the end of the block
    conn.execute(text('DROP TABLE IF EXISTS "rfq"."rfq_tasks" CASCADE;'))
    print("Dropped existing rfq_tasks table if it existed")

# Create the table with the new schema
with engine.begin() as conn:  # Using begin() to ensure transaction is committed
    # Verify table doesn't exist before creating
    result = conn.execute(text("""
        SELECT EXISTS (
            SELECT 1 FROM information_schema.tables
            WHERE table_schema = 'rfq' AND table_name = 'rfq_tasks'
        );
    """))

    if not result.scalar():
        # Create the new table with the updated schema
        conn.execute(text("""
        CREATE TABLE "rfq"."rfq_tasks" (
            task_id integer,
            document_id VARCHAR(255) NOT NULL,
            task_text TEXT NOT NULL,
            task_index integer
            );
        """))
        print("Created table: rfq.rfq_tasks with new schema")
    else:
        print("Table rfq_tasks already exists - verify schema manually")

Connected to PostgreSQL database
Dropped existing rfq_tasks table if it existed
Created table: rfq.rfq_tasks with new schema
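
Database settings used by cells like the one above can be read from environment variables rather than written into the notebook. The block below is a minimal sketch of that approach. The variable names (`RFQ_DB_HOST`, `RFQ_DB_PASSWORD`, and so on) and the function `load_db_config` are hypothetical and would need to match however the deployment environment is actually configured.

```python
import os


def load_db_config(env=None) -> dict:
    """Read database settings from environment variables (hypothetical names)."""
    env = os.environ if env is None else env
    missing = [name for name in ("RFQ_DB_HOST", "RFQ_DB_PASSWORD") if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "host": env["RFQ_DB_HOST"],
        "port": env.get("RFQ_DB_PORT", "5432"),
        "dbname": env.get("RFQ_DB_NAME", "postgres"),
        "user": env.get("RFQ_DB_USER", "postgres"),
        "password": env["RFQ_DB_PASSWORD"],
    }


# Demonstration with an in-memory mapping instead of the real environment
demo_env = {"RFQ_DB_HOST": "db.example.com", "RFQ_DB_PASSWORD": "not-a-real-password"}
print(load_db_config(demo_env)["host"])
```

The returned dictionary has the same keys as the `db_config` used throughout this notebook, so it could be passed to `store_requirements_in_database` unchanged.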


**SECTION 3.1: RUN-TIME SCRIPT**

In [None]:
# Updated version of the run-time script (from Copy of Advantantus_2)
# Run-time cell: run the extraction with your RFQ
import logging
import asyncio
import os
import pandas as pd
from sqlalchemy import create_engine, text
import nest_asyncio

# Enable nested asyncio (needed for Jupyter)
nest_asyncio.apply()

# Optional definition of the OpenAI API key, read from the environment variable set
# earlier in the notebook. (A trailing comma here would turn the value into a tuple.)
openai_api_key = os.environ.get("OPENAI_API_KEY")


# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def create_database_tables(engine):
    """Create or update required database tables in the rfq schema"""
    from sqlalchemy import text

    with engine.connect() as conn:
        # First ensure the 'rfq' schema exists
        conn.execute(text("CREATE SCHEMA IF NOT EXISTS rfq"))
        conn.commit()

        # Create the table with the new required columns
        conn.execute(text("""
            CREATE TABLE IF NOT EXISTS "rfq"."rfq_tasks" (
                task_id integer,
                document_id VARCHAR(255) NOT NULL,
                task_text TEXT NOT NULL,
                task_index integer
            )
        """))
        conn.commit()
        logging.info("Database tables verified/created in rfq schema")


def store_requirements_in_database(requirements, db_config):
    """Store extracted requirements in database with proper error handling"""
    try:
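        # Create connection string; percent-encode the credentials so special
        # characters in the password do not break the URL
        from urllib.parse import quote
        user = quote(db_config['user'], safe='')
        password = quote(db_config['password'], safe='')
        connection_string = f"postgresql+psycopg://{user}:{password}@{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
        engine = create_engine(connection_string)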

        # Ensure database schema is set up
        create_database_tables(engine)

        # Create DataFrame from requirements
        requirements_df = pd.DataFrame(requirements)
        print("DataFrame columns:", requirements_df.columns)

        # Drop rows with task_index < 0
        requirements_df = requirements_df[requirements_df['task_index'] >= 0]

        # List of columns that exist in the database schema
        valid_columns = ['task_id', 'document_id', 'task_text', 'task_index']

        # Filter DataFrame to only include valid columns that exist in the dataframe
        existing_valid_columns = [col for col in valid_columns if col in requirements_df.columns]
        requirements_df = requirements_df[existing_valid_columns]

        # Fill missing text values only; integer columns (task_id, task_index) keep NaN,
        # which to_sql writes as NULL rather than an invalid empty string
        text_columns = [col for col in ('document_id', 'task_text') if col in requirements_df.columns]
        requirements_df = requirements_df.fillna({col: '' for col in text_columns})

        # Store in database - specify the rfq schema
        requirements_df.to_sql(name='rfq_tasks', con=engine, schema='rfq', if_exists='append', index=False)
        print("rfq tasks inserted successfully.")

        logging.info(f"Successfully stored {len(requirements_df)} requirements in rfq.rfq_tasks table")
        print(f"Successfully stored {len(requirements_df)} requirements in rfq.rfq_tasks table")
    except Exception as e:
        logging.error(f"Error storing requirements in database: {str(e)}")
        # Continue execution rather than crashing


async def extract_requirements_from_rfq(pdf_path, llama_api_key, openai_api_key, db_config):
    """Main function to extract requirements from an RFQ document"""

    # Initialize processors
    document_processor = EnhancedDocumentProcessor(llama_api_key=llama_api_key)
    task_extractor = TaskExtractionAgent(api_key=openai_api_key)

    # Reset the requirement counter for each new document
    task_extractor.req_counter = 1

    # Step 1: Process document into structured nodes
    logging.info(f"Processing document: {pdf_path}")
    document_nodes = await document_processor.process_document(pdf_path)
    logging.info(f"Generated {len(document_nodes)} document nodes")
    # Extract full document text
    full_document_text = ""
    for node in document_nodes:
        full_document_text += node.get("text", "")

    # Step 2: Extract requirements from each node
    all_requirements = []
    for node in document_nodes:
        node_requirements = await task_extractor.extract_tasks(node,full_document_text)
        all_requirements.extend(node_requirements)

    logging.info(f"Extracted {len(all_requirements)} requirements from document")
    # Step 3: Validate and store requirements in database
    if db_config:
        # Use the PDF file name (without extension) as the document ID instead of
        # the literal string "document_id"
        from pathlib import Path
        document_id = Path(pdf_path).stem
        validated_requirements = task_extractor.validate_tasks(all_requirements, document_id, full_document_text)
        store_requirements_in_database(validated_requirements, db_config)

    return all_requirements

# Main execution
async def main():
    # Configuration: API keys and the database password are read from environment
    # variables (DB_PASSWORD is an assumed name) rather than hard-coded in the notebook
    import os
    config = {
        "llama_api_key": os.environ.get("LLAMA_CLOUD_API_KEY"),
        "openai_api_key": os.environ.get("OPENAI_API_KEY"),
        "pdf_path": "75FCMC25RJ018_Section_C.pdf",
                      # Other documents used: "Attachment 1 - PALS SOW.pdf", "COREQ_SOW.pdf", "PM3 II Consolidated PWS 02132024.pdf",
                      # "SA InsightsAI II 06172022 FE.pdf", "RMADA_II_75FCMC25RJ018_Section_C.pdf", "75FCMC25RJ018_ALL_SECTIONS.pdf"
        "db_config": {
            "host": "advantantus-prod.cmfo86w02i47.us-east-1.rds.amazonaws.com",
            "port": "5432",
            "dbname": "postgres",
            "user": "postgres",
            "password": os.environ.get("DB_PASSWORD")
        }
    }

    # Check if PDF exists
    try:
        with open(config["pdf_path"], 'rb') as f:
            pdf_exists = True
        logging.info(f"PDF file found at {config['pdf_path']}")
    except FileNotFoundError:
        pdf_exists = False
        logging.error(f"PDF file not found at {config['pdf_path']}. Please upload it.")
        return

    # Process the document
    if pdf_exists:
        requirements = await extract_requirements_from_rfq(
            config["pdf_path"],
            config["llama_api_key"],
            config["openai_api_key"],
            config["db_config"]
        )

        # Display results with new column structure
        logging.info(f"Extracted {len(requirements)} requirements (includes non-indexed rows to be dropped later)")
        df = pd.DataFrame(requirements)
        print(df.head(10))

        # Export to Excel/CSV with new column structure
        def export_requirements_to_excel(req_df, filename="rfq_requirements_export.xlsx", filename2="rfq_requirements_export_NOINDEX.xlsx"):
          """Export requirements to Excel spreadsheet"""
          try:
              # Drop rows with task_index < 0
              req_df_indexed = req_df[req_df['task_index'] >= 0]
              req_df_no_index = req_df[req_df['task_index'] < 0]

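              # Install openpyxl if needed
              try:
                  import openpyxl
              except ImportError:
                  import subprocess
                  import sys
                  # pip.main() is not a supported API; invoke pip through the current interpreter
                  subprocess.check_call([sys.executable, "-m", "pip", "install", "openpyxl"])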

              # Export to Excel
              req_df_indexed.to_excel(filename, index=False, engine='openpyxl')
              print(f"Successfully exported {len(req_df_indexed)} requirements to {filename}")

              if not req_df_no_index.empty:
                  req_df_no_index.to_excel(filename2, index=False, engine='openpyxl')
                  print(f"Successfully exported {len(req_df_no_index)} Non-indexed requirements to {filename2}")
              else:
                  print("No non-indexed requirements found.")

          except Exception as e:
              print(f"Error exporting to Excel: {e}")
              # Fallback to CSV if Excel export fails
              csv_filename = filename.replace('.xlsx', '.csv')
              req_df_indexed.to_csv(csv_filename, index=False)
              print(f"Exported to CSV instead: {csv_filename}")

              if not req_df_no_index.empty:
                  csv_filename2 = filename2.replace('.xlsx', '.csv')
                  req_df_no_index.to_csv(csv_filename2, index=False)
                  print(f"Exported to CSV instead: {csv_filename2}")
              else:
                  print("No non-indexed requirements found.")

        # Call the function to export requirements
        if requirements:
            df = pd.DataFrame(requirements)
            export_requirements_to_excel(df)

        return requirements

# Run the script. nest_asyncio.apply() above lets asyncio.run() work inside
# Jupyter's already-running event loop, so no top-level await is needed.
if __name__ == "__main__":
    requirements = asyncio.run(main())


Started parsing the file under job_id bb2c0d24-6767-41d2-aaae-24d34d1b4cf2




DataFrame columns: Index(['task_id', 'document_id', 'task_text', 'task_index'], dtype='object')
rfq tasks inserted successfully.
Successfully stored 286 requirements in rfq.rfq_tasks table
               document_id                                          task_text  \
0  75FCMC25RJ018_Section_C  The Contractor shall develop, implement, monit...   
1  75FCMC25RJ018_Section_C  The Contractor shall build collaborative learn...   
2  75FCMC25RJ018_Section_C  The Contractor shall develop the necessary tec...   
3  75FCMC25RJ018_Section_C  The Contractor shall conduct rapid cycle analy...   
4  75FCMC25RJ018_Section_C  The Contractor shall provide rapid reporting o...   
5  75FCMC25RJ018_Section_C  The Contractor shall work with CMS on matters ...   
6  75FCMC25RJ018_Section_C  - Supporting all aspects of model design and o...   
7  75FCMC25RJ018_Section_C  - Conducting program, data, and environmental ...   
8  75FCMC25RJ018_Section_C           - Monitoring model site implementations.   
9
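
The run-time script stores only rows with `task_index >= 0` and exports the remaining rows to a separate file. The block below is a minimal sketch of the same split on plain dictionaries, without pandas. The function name `split_by_index` and the sample rows are hypothetical. It also assumes that a missing or non-integer `task_index` should be treated as not indexed.

```python
from typing import Any, Dict, List, Tuple

Task = Dict[str, Any]


def split_by_index(tasks: List[Task]) -> Tuple[List[Task], List[Task]]:
    """Separate tasks located in the document from those that were not."""
    indexed: List[Task] = []
    not_indexed: List[Task] = []
    for task in tasks:
        index = task.get("task_index")
        if isinstance(index, int) and index >= 0:
            indexed.append(task)
        else:
            not_indexed.append(task)
    # Keep located tasks in document order
    indexed.sort(key=lambda task: task["task_index"])
    return indexed, not_indexed


sample_tasks = [
    {"task_text": "The Contractor shall provide rapid reporting.", "task_index": 410},
    {"task_text": "The Contractor shall paraphrased text.", "task_index": -1},
    {"task_text": "The Contractor shall develop a plan.", "task_index": 95},
]
located, unlocated = split_by_index(sample_tasks)
print(len(located), len(unlocated))
```

Reviewing the unlocated rows separately is useful because a negative index usually means the extracted text does not appear verbatim in the source document.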

Section 3.2: Validation

In [None]:
# 3.2 Validate rfq.rfq_tasks Table
import os
import pandas as pd
from sqlalchemy import create_engine, inspect
from urllib.parse import quote

# Read the password from the environment (assumes DB_PASSWORD has been set)
DB_PASSWORD = os.environ["DB_PASSWORD"]
# Create Database Connection
encoded_password = quote(DB_PASSWORD, safe='')
connection_url = (
    f"postgresql+psycopg2://postgres:{encoded_password}@"
    "advantantus-prod.cmfo86w02i47.us-east-1.rds.amazonaws.com:5432/postgres"
)
engine = create_engine(connection_url)

print("Validating `rfq.rfq_tasks`...")

rfq_tasks_df = pd.read_sql('SELECT * FROM "rfq"."rfq_tasks"', con=engine)
if rfq_tasks_df.empty:
    print("❌ `rfq.rfq_tasks` table is empty.")
else:
    print(f"✅ `rfq.rfq_tasks` table contains {len(rfq_tasks_df)} rows.")
    print(rfq_tasks_df.tail(405))

Validating `rfq.rfq_tasks`...
✅ `rfq.rfq_tasks` table contains 286 rows.
     task_id  document_id                                          task_text  \
0          1  document_id  The Contractor shall work with CMS on matters ...   
1          2  document_id  - Supporting all aspects of model design and o...   
2          3  document_id  - Conducting program, data, and environmental ...   
3          4  document_id           - Monitoring model site implementations.   
4          5  document_id  - Designing and carrying out surveys and other...   
..       ...          ...                                                ...   
281      282  document_id  The Contractor shall provide CMS with a copy o...   
282      283  document_id  All CDs, DVDs, data tapes, and other files sha...   
283      284  document_id  Analytic files must be accompanied by appropri...   
284      285  document_id  When required, the Contractor shall provide su...   
285      286  document_id  The Contractor shall

In [None]:
# View the task DataFrame in document order (sorted by task_index)
rfq_tasks_df.sort_values(by=['task_index'], ascending=True)


Unnamed: 0,task_id,document_id,task_text,task_index
0,1,document_id,The Contractor shall work with CMS on matters ...,2851
1,2,document_id,- Supporting all aspects of model design and o...,2927
2,3,document_id,"- Conducting program, data, and environmental ...",3016
3,4,document_id,- Monitoring model site implementations.,3072
4,5,document_id,- Designing and carrying out surveys and other...,3113
...,...,...,...,...
332,330,document_id,The requestor or custodian listed on the DUA s...,84517
333,331,document_id,Only one closure request can be sent for one D...,84749
221,332,document_id,The requestor or custodian shall fill out the ...,84862
187,333,document_id,"This form must be printed, signed, scanned, an...",85016
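
Beyond checking that the table is non-empty, a few row-level checks can catch the kinds of problems visible in the output above, such as every row carrying the placeholder value `document_id`. The block below is a minimal sketch of such checks on a list of row dictionaries. The function name `check_task_rows` and the specific rules are assumptions chosen for illustration, not part of the notebook.

```python
import numbers
from typing import Any, Dict, List


def check_task_rows(rows: List[Dict[str, Any]]) -> List[str]:
    """Return a list of human-readable problems found in the rows."""
    problems: List[str] = []
    seen_ids = set()
    for position, row in enumerate(rows):
        task_id = row.get("task_id")
        if task_id in seen_ids:
            problems.append(f"row {position}: duplicate task_id {task_id}")
        seen_ids.add(task_id)

        if not str(row.get("task_text") or "").strip():
            problems.append(f"row {position}: empty task_text")

        index = row.get("task_index")
        if not isinstance(index, numbers.Integral) or index < 0:
            problems.append(f"row {position}: invalid task_index {index!r}")

        if row.get("document_id") == "document_id":
            problems.append(f"row {position}: document_id is a placeholder")
    return problems


sample_rows = [
    {"task_id": 1, "document_id": "75FCMC25RJ018_Section_C", "task_text": "The Contractor shall ...", "task_index": 2851},
    {"task_id": 1, "document_id": "document_id", "task_text": "", "task_index": -1},
]
for problem in check_task_rows(sample_rows):
    print(problem)
```

With pandas, the rows could be obtained from the DataFrame loaded above via `rfq_tasks_df.to_dict("records")`.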


################################################################################################################################################################################################################################################

**END OF PROGRAM**

################################################################################################################################################################################################################################################


**3.3 Post-Production Unit Tests (incomplete):**

In [None]:
# Unit Tests for RFQ Requirement Extraction
import asyncio
import os
import pandas as pd
import logging
import pytest
from typing import List, Dict, Any
# Database connection
import sqlalchemy as sa
from sqlalchemy import create_engine, text


# Skip tests if API keys not available
pytestmark = pytest.mark.skipif(
    not os.environ.get("LLAMA_CLOUD_API_KEY") or
    not os.environ.get("OPENAI_API_KEY"),
    reason="API keys not available"
)

@pytest.fixture
def sample_pdf_path():
    return "75FCMC25RJ018_ALL_SECTIONS.pdf"

@pytest.fixture
def document_processor():
    return EnhancedDocumentProcessor(
        llama_api_key=os.environ.get("LLAMA_CLOUD_API_KEY")
    )

@pytest.fixture
def task_extractor():
    return TaskExtractionAgent(
        api_key=os.environ.get("OPENAI_API_KEY")
    )

@pytest.mark.asyncio
async def test_document_processing(document_processor, sample_pdf_path):
    """Test that document processing extracts structured nodes with page numbers and headers"""
    nodes = await document_processor.process_document(sample_pdf_path)

    # Verify we got nodes
    assert len(nodes) > 0

    # Verify node structure with new required fields
    for node in nodes:
        assert "text" in node
        assert "metadata" in node
        assert "section_path" in node["metadata"]
        assert "section_level" in node["metadata"]
        assert "page_number" in node["metadata"]
        assert "header" in node["metadata"]
        assert "document_id" in node["metadata"]

@pytest.mark.asyncio
async def test_requirement_extraction(task_extractor):
    """Test that requirement extraction produces structured tasks with the current schema"""
    # Sample node with a "shall" statement
    sample_node = {
        "text": "The Contractor shall validate payment calculations using SAS code.",
        "metadata": {
            "section_path": "C.3.ii.10 Payment Validation",
            "section_level": 2,
            "document_id": "sample_doc",
            "page_number": 42,
            "header": "C.3.ii.10 Payment Validation"
        }
    }

    # Reset counter, as the orchestration script does for each new document
    task_extractor.req_counter = 1

    # extract_tasks is a coroutine that also takes the full document text
    requirements = await task_extractor.extract_tasks(sample_node, sample_node["text"])

    # Verify requirements were extracted
    assert len(requirements) > 0

    # Verify task structure; task_id is assigned later, in validate_tasks
    requirement = requirements[0]
    assert "document_id" in requirement
    assert "task_text" in requirement
    assert "task_index" in requirement

    # Verify content matches expected format
    assert isinstance(requirement["task_text"], str)
    assert "validate payment" in requirement["task_text"].lower()


SyntaxError: invalid decimal literal (<ipython-input-48-768689b325bf>, line 11)
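
The tests above need live API keys and a PDF, so they are skipped in most environments. The block below is a minimal sketch of an offline test that exercises the same orchestration loop with a stand-in extractor. `FakeTaskExtractor` and `collect_tasks` are hypothetical names. The fake only mimics the `extract_tasks(node, full_document_text)` call shape used in the main script and does not reproduce the real agent's behavior.

```python
import asyncio
from typing import Any, Dict, List


class FakeTaskExtractor:
    """Stand-in for TaskExtractionAgent that needs no API key (hypothetical)."""

    async def extract_tasks(self, node: Dict[str, Any], full_document_text: str) -> List[Dict[str, Any]]:
        text = node.get("text", "").strip()
        if "shall" not in text:
            return []
        return [{
            "document_id": node["metadata"]["document_id"],
            "task_text": text,
            "task_index": full_document_text.find(text),
        }]


async def collect_tasks(extractor, nodes: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Mirror the orchestration loop: extract from every node, in order."""
    full_text = "".join(node.get("text", "") for node in nodes)
    tasks: List[Dict[str, Any]] = []
    for node in nodes:
        tasks.extend(await extractor.extract_tasks(node, full_text))
    return tasks


def test_collect_tasks_offline():
    nodes = [
        {"text": "Background information. ", "metadata": {"document_id": "sample_doc"}},
        {"text": "The Contractor shall validate payment calculations using SAS code.",
         "metadata": {"document_id": "sample_doc"}},
    ]
    tasks = asyncio.run(collect_tasks(FakeTaskExtractor(), nodes))
    assert len(tasks) == 1
    assert tasks[0]["document_id"] == "sample_doc"
    assert tasks[0]["task_index"] == len("Background information. ")


test_collect_tasks_offline()
```

Inside Jupyter, `asyncio.run` works here only after `nest_asyncio.apply()` has been called, as in the run-time cell.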

**END Program**


**Appendix:**

**Task Extraction: Chain-of-Thought Example**

Here's how the chain-of-thought extraction works on a real RFQ section:

*Input Text:*

text
**C.3.ii.10 Payment Validation
The Contractor shall validate payment calculations using SAS code. The validation must compare results against CMS internal calculations. Discrepancy reports should be produced weekly, listing any variations greater than 0.1%.**


*Chain-of-Thought Processing:*

Analysis:

Explicit requirement: "The Contractor shall validate payment calculations using SAS code"

Explicit requirement: "The validation must compare results against CMS internal calculations"

Implicit requirement: Production of weekly discrepancy reports

*Categorization:*

Type: PROCESS (validation task) and REPORTING (discrepancy reports)

Criticality: HIGH for the "shall" and "must" statements; MEDIUM for the "should" statement

Dependencies: Requires access to CMS internal calculations
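
The criticality step above can be approximated without an LLM by looking at the modal verb in each sentence. The block below is a minimal sketch of that rule. The mapping of "shall" and "must" to HIGH and "should" to MEDIUM follows this example. The "may" to LOW entry and the function name `classify_criticality` are assumptions added for illustration.

```python
import re
from typing import Optional

# Modal verb to criticality; the "may" entry is an assumption, not from the example
MODAL_CRITICALITY = {"shall": "HIGH", "must": "HIGH", "should": "MEDIUM", "may": "LOW"}


def classify_criticality(sentence: str) -> Optional[str]:
    """Return the highest criticality implied by the modal verbs in sentence."""
    words = set(re.findall(r"[a-z]+", sentence.lower()))
    levels = {level for modal, level in MODAL_CRITICALITY.items() if modal in words}
    for level in ("HIGH", "MEDIUM", "LOW"):
        if level in levels:
            return level
    return None  # No modal verb found: probably not a requirement


print(classify_criticality("The Contractor shall validate payment calculations using SAS code."))
print(classify_criticality("Discrepancy reports should be produced weekly."))
```

A rule like this could serve as a cross-check on the LLM's categorization rather than a replacement for it.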

*Extraction:*

Creates structured JSON for each requirement

Preserves context from section C.3.ii.10

Includes timeframe (weekly) for reporting requirement

*Output:*

json
[
  {
    "task_id": "C3ii10_1",
    "task_text": "The Contractor shall validate payment calculations using SAS code",
    "task_type": "PROCESS",
    "criticality": "HIGH",
    "implied": false,
    "dependencies": ["Access to CMS internal calculations"],
    "timeframe": null,
    "source_section": "C.3.ii.10 Payment Validation",
    "context": "Payment validation against CMS internal calculations"
  },
  {
    "task_id": "C3ii10_2",
    "task_text": "The validation must compare results against CMS internal calculations",
    "task_type": "PROCESS",
    "criticality": "HIGH",
    "implied": false,
    "dependencies": [],
    "timeframe": null,
    "source_section": "C.3.ii.10 Payment Validation",
    "context": "Part of payment validation process"
  },
  {
    "task_id": "C3ii10_3",
    "task_text": "Produce weekly discrepancy reports listing variations greater than 0.1%",
    "task_type": "REPORTING",
    "criticality": "MEDIUM",
    "implied": true,
    "dependencies": ["C3ii10_1", "C3ii10_2"],
    "timeframe": "weekly",
    "source_section": "C.3.ii.10 Payment Validation",
    "context": "Output of validation process"
  }
]
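
Before JSON like the output above is stored, it can be parsed into a typed structure so that malformed records fail early. The block below is a minimal sketch using a dataclass whose fields mirror the keys in this example. The class name `ExtractedTask`, the helper `parse_tasks`, and the inclusion of "LOW" as an allowed criticality are assumptions.

```python
import json
from dataclasses import dataclass, field
from typing import List, Optional

ALLOWED_CRITICALITY = {"HIGH", "MEDIUM", "LOW"}  # "LOW" is assumed


@dataclass
class ExtractedTask:
    task_id: str
    task_text: str
    task_type: str
    criticality: str
    implied: bool
    dependencies: List[str] = field(default_factory=list)
    timeframe: Optional[str] = None
    source_section: str = ""
    context: str = ""

    def __post_init__(self):
        # Reject records that would be useless or ambiguous downstream
        if not self.task_text.strip():
            raise ValueError(f"{self.task_id}: task_text is empty")
        if self.criticality not in ALLOWED_CRITICALITY:
            raise ValueError(f"{self.task_id}: unknown criticality {self.criticality!r}")


def parse_tasks(raw_json: str) -> List[ExtractedTask]:
    """Parse a JSON array of task objects into validated ExtractedTask instances."""
    return [ExtractedTask(**item) for item in json.loads(raw_json)]


raw = """[
  {"task_id": "C3ii10_3",
   "task_text": "Produce weekly discrepancy reports listing variations greater than 0.1%",
   "task_type": "REPORTING", "criticality": "MEDIUM", "implied": true,
   "dependencies": ["C3ii10_1", "C3ii10_2"], "timeframe": "weekly",
   "source_section": "C.3.ii.10 Payment Validation",
   "context": "Output of validation process"}
]"""
tasks = parse_tasks(raw)
print(tasks[0].task_id, tasks[0].timeframe)
```

Because `ExtractedTask(**item)` raises `TypeError` on unexpected keys, any extra fields the LLM invents are also caught at parse time.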