# extract

> Extract structured data from text using an LLM

In [None]:
#| default_exp extract

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import time
import uuid
from typing import List, Dict, Any, Optional, Tuple
from llm_data_extractor.models import Question, ExtractionResult, LLMConfig
from llm_data_extractor.prompt_builder import build_prompt
from llm_data_extractor.llm_client import get_llm_response, parse_llm_json_response, LLMClientError
from llm_data_extractor.validator import validate_answer
import sys, logging

In [None]:
#| exporti

log = logging.getLogger(__name__)


def setup_logging(level=logging.INFO):
    """Reset the root logger to a single stream handler at ``level``.

    Every handler currently attached to the root logger is dropped first, so
    calling this more than once does not duplicate log output.

    Args:
        level: Logging threshold applied to the root logger.
    """
    record_format = "%(asctime)s | %(levelname)s | %(threadName)s | %(name)s | %(message)s"

    # Prefer the interpreter's original stdout (sys.__stdout__) so output is
    # not routed through a replaced sys.stdout (e.g. the one a notebook
    # installs); fall back to sys.stdout only if the attribute is missing.
    target_stream = getattr(sys, "__stdout__", sys.stdout)
    handler = logging.StreamHandler(target_stream)
    handler.setFormatter(logging.Formatter(record_format))

    root_logger = logging.getLogger()
    root_logger.handlers.clear()
    root_logger.setLevel(level)
    root_logger.addHandler(handler)

# setup_logging(level=logging.INFO)

In [None]:
#| export

def _process_question_response(question: Question, 
                              llm_responses: List[Dict], 
                              raw_response: str) -> ExtractionResult:
    """Build the ExtractionResult for one question from the LLM's responses.

    Args:
        question: The question being answered; its ``id`` is used to find the
            matching entry in ``llm_responses``.
        llm_responses: Response entries from the LLM. Entries that are not
            dicts, or whose ``'question_id'`` differs, are ignored.
        raw_response: The full raw LLM output. Currently unused; kept so the
            call signature stays stable.

    Returns:
        An ExtractionResult. When no entry matches, or the answer/confidence
        cannot be converted, the result is invalid with an explanatory
        ``validation_error``; otherwise it carries the outcome of
        ``validate_answer``.
    """

    # First entry that is a dict and carries this question's id.
    question_response = next(
        (
            candidate
            for candidate in llm_responses
            if isinstance(candidate, dict) and candidate.get('question_id') == question.id
        ),
        None,
    )

    if question_response is None:
        return ExtractionResult(
            question_id=question.id,
            raw_answer="",
            parsed_answer=None,
            confidence=0.0,
            is_valid=False,
            validation_error="No response found for question"
        )

    # Safely extract answer and confidence
    try:
        answer_value = question_response.get('answer')
        confidence_value = question_response.get('confidence')
        # A JSON null arrives as None: treat it like a missing key rather
        # than turning it into the literal text "None" / failing float().
        raw_answer = "" if answer_value is None else str(answer_value)
        confidence = 0.0 if confidence_value is None else float(confidence_value)
    except (AttributeError, TypeError, ValueError) as e:
        log.error(f"Error extracting answer/confidence: {str(e)}")
        return ExtractionResult(
            question_id=question.id,
            raw_answer="",
            parsed_answer=None,
            confidence=0.0,
            is_valid=False,
            validation_error=f"Response format error: {str(e)}"
        )

    # Validate the answer
    parsed_answer, is_valid, validation_error = validate_answer(raw_answer, question)

    return ExtractionResult(
        question_id=question.id,
        raw_answer=raw_answer,
        parsed_answer=parsed_answer,
        confidence=confidence,
        is_valid=is_valid,
        # Normalise falsy errors (e.g. an empty string) to None.
        validation_error=validation_error or None
    )



In [None]:
#| export

def _fallback_parse_response(raw_response: str, questions: List[Question]) -> List[Dict]:
    """
    Fallback parser for when structured JSON parsing fails.

    Scans the raw text line by line. For each question, the first line that
    references it (by question id, by a "Question <n>" label where n is the
    question's 1-based position, or by the first 20 characters of the question
    text) is located, and the first non-empty line among the following four
    lines is taken as the answer.

    Known limitation: candidate answer lines starting with 'Q' are skipped
    (they are assumed to be the next question header), so an answer that
    itself begins with 'Q' is not picked up.

    Args:
        raw_response: Unstructured LLM output.
        questions: Questions to look for, in the order they were asked.

    Returns:
        One dict per question with keys 'question_id', 'answer' (empty string
        when nothing was found) and 'confidence' (always 0.0, since the text
        heuristic provides no confidence).
    """
    import re  # local import: `re` is not imported at module level

    stripped_lines = [line.strip() for line in raw_response.split('\n')]
    responses = []

    for position, question in enumerate(questions, start=1):
        # The negative lookahead keeps "Question 1" from matching "Question 10".
        label_pattern = re.compile(rf"Question {position}(?!\d)")
        text_prefix = (question.text or "")[:20]
        answer = ""

        for line_no, line in enumerate(stripped_lines):
            # Empty ids / text prefixes are skipped: '' is a substring of
            # every line and would make the very first line a false match.
            references_question = bool(
                (question.id and question.id in line)
                or label_pattern.search(line)
                or (text_prefix and text_prefix in line)
            )
            if not references_question:
                continue

            # Look ahead (at most four lines) for the answer.
            for candidate in stripped_lines[line_no + 1:line_no + 5]:
                if candidate and not candidate.startswith('Q'):
                    answer = candidate
                    break

            # Only the first referencing line is considered.
            break

        responses.append({
            'question_id': question.id,
            'answer': answer,
            'confidence': 0.0
        })

    return responses

In [None]:
#| export

def extract_data(source_text: str, 
                questions: List[Question], 
                llm_config: LLMConfig,
                source_id: Optional[str] = None,
                batch_id: Optional[str] = None) -> Tuple[List[ExtractionResult], Dict[str, Any]]:
    """
    Extract structured data from unstructured text using LLM.

    The prompt is built from the questions and the source text, sent to the
    LLM, and the reply is parsed as JSON. If the reply cannot be parsed, or
    its 'responses' entry is not a list, a line-based text fallback is used
    instead. Each question then gets one validated ExtractionResult.

    This function does not raise for processing errors: any exception from
    prompt building, the LLM call or result processing is logged and reported
    as one invalid result per question plus an 'error' entry in the metadata.

    Args:
        source_text: The text to extract data from
        questions: List of questions to answer
        llm_config: LLM configuration
        source_id: Optional identifier for the source
        batch_id: Optional batch identifier; a random UUID is generated when
            omitted or empty

    Returns:
        Tuple of (extraction_results, metadata). With no questions the result
        is ``([], {'error': 'No questions provided'})``.
    """

    if not questions:
        return [], {'error': 'No questions provided'}

    # Generate batch_id if not provided
    if not batch_id:
        batch_id = str(uuid.uuid4())

    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by system clock adjustments.
    start_time = time.perf_counter()

    separator = '-' * 80
    log.debug(separator)
    log.debug('QUESTIONS')
    log.debug(separator)
    log.debug(questions)
    log.debug(separator)

    try:
        # Step 1: Build prompt
        log.debug('Calling build_prompt()')
        prompt = build_prompt(questions, source_text)

        # Step 2: Get LLM response
        log.debug('Calling get_llm_response()')
        raw_response = get_llm_response(prompt, llm_config)
        log.debug('RAW RESPONSE')
        log.debug(separator)
        log.debug(raw_response)
        log.debug(separator)

        # Step 3: Parse JSON response
        try:
            parsed_response = parse_llm_json_response(raw_response)
            log.debug('PARSED RESPONSE')
            log.debug(separator)
            log.debug(parsed_response)
            log.debug(separator)

            llm_responses = parsed_response.get('responses', [])
            if not isinstance(llm_responses, list):
                # e.g. {"responses": null}: treat as a parse failure so the
                # text fallback runs instead of failing the whole batch.
                raise TypeError(
                    f"'responses' must be a list, got {type(llm_responses).__name__}"
                )
        except Exception as e:
            log.warning("Failed to parse structured JSON response: %s", e)
            # Fallback: try to extract answers from unstructured response
            llm_responses = _fallback_parse_response(raw_response, questions)

        # Step 4: Validate and create results
        results = [
            _process_question_response(question, llm_responses, raw_response)
            for question in questions
        ]

        # Step 5: Create metadata
        metadata = {
            'batch_id': batch_id,
            'source_id': source_id,
            'processing_time_seconds': time.perf_counter() - start_time,
            'total_questions': len(questions),
            'raw_llm_response': raw_response,
            'prompt_used': prompt,
            'llm_config': llm_config.__dict__,
        }

        return results, metadata

    except Exception as e:
        # Handle catastrophic failures: keep the traceback in the log, then
        # report the failure per question instead of raising to the caller.
        log.exception("extract_data failed (batch_id=%s, source_id=%s)", batch_id, source_id)
        processing_time = time.perf_counter() - start_time

        error_results = [
            ExtractionResult(
                question_id=question.id,
                raw_answer="",
                parsed_answer=None,
                confidence=0.0,
                is_valid=False,
                input_row_id=None,
                validation_error=f"Processing failed: {str(e)}"
            )
            for question in questions
        ]

        metadata = {
            'batch_id': batch_id,
            'source_id': source_id,
            'processing_time_seconds': processing_time,
            'total_questions': len(questions),
            'error': str(e),
            'llm_config': llm_config.__dict__
        }

        return error_results, metadata


In [None]:
#| hide
import nbdev; nbdev.nbdev_export()