## JSON-only experiment

### Setup

In [94]:
import base64
import json
import shutil
import logging
import os
import time
import threading
from collections import defaultdict  # Add this import
from typing import List, Dict, Tuple, Union, Optional, Any
from dataclasses import dataclass
from datetime import datetime
import httpx
from anthropic import RateLimitError, Anthropic
from openai import OpenAI
import google.generativeai as gemini
from dotenv import load_dotenv
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type

# Basic setup: INFO-level logging for the whole notebook, then load variables
# from a local .env file (the API keys are later read with os.getenv).
logging.basicConfig(level=logging.INFO)
load_dotenv()

# Constants
# CA bundle passed to httpx by setup_clients when the file exists on disk;
# setup_clients falls back to default certificate verification otherwise.
# NOTE(review): this is a machine-specific Homebrew path.
CERT_PATH = '/opt/homebrew/etc/openssl@3/cert.pem'
# Root folder under which save_analysis_results writes one subfolder per LLM.
OUTPUT_DIR = 'analysis_outputs'

In [95]:
# Per-LLM model, token-limit and pacing configuration.
# "max_tokens" caps the generated reply; "max_input_tokens" is the budget used
# when sizing chunks and when rejecting oversized prompts.
CLAUDE_CONFIG = {
    "model_name": "claude-3-5-sonnet-20240620",
    "max_tokens": 4096,  # Reduced from 8192
    "max_input_tokens": 8000,  # Reduced from 12000
    # NOTE(review): not read by the code visible here; RATE_LIMITS holds the
    # per-minute value that check_rate_limits enforces.
    "requests_per_minute": 25,
    # Seconds slept after each chunk in analyze_json_with_llm_safe (used for
    # every LLM, not only Claude).
    "delay_between_chunks": 2,
    # Seconds slept after each file in run_multi_llm_analysis.
    "delay_between_batches": 10,
    # NOTE(review): not read by the code visible here.
    "default_batch_size": 3,
    "temperature": 0.7
}

# Values passed to the OpenAI chat-completions call.
GPT_CONFIG = {
    "model_name": "gpt-3.5-turbo",
    "max_tokens": 2048,  # Significantly reduced from 4000
    "max_input_tokens": 2048,  # Reduced from 3000
    "temperature": 0.7
}

# Values used by setup_clients to build the Gemini GenerativeModel
# ("max_tokens" becomes max_output_tokens).
GEMINI_CONFIG = {
    "model_name": "models/gemini-1.5-pro-001",
    "max_tokens": 4096,  # Reduced from 8192
    "max_input_tokens": 8000,  # Reduced from 12000
    "temperature": 0.7
}

In [96]:
def estimate_tokens(text: str) -> int:
    """Return a rough token count for ``text``.

    Applies the rule of thumb that one token is about four characters of
    English text, so the result is the character count floor-divided by four.
    """
    chars_per_token = 4
    char_count = len(text)
    return char_count // chars_per_token

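API keys for Claude, Gemini, and GPT are read from the `.env` file (loaded in the setup cell via `load_dotenv()`). The next cell defines the system prompt shared by the analysis requests.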

In [97]:

# System prompt focused on test kit development. It is sent as the system
# message in the Claude and GPT calls, and its estimated token count is
# subtracted from the input budget when chunks are sized.
SYSTEM_PROMPT = """You are a seasoned Healthcare Integration Test Engineer with extensive FHIR experience.
Your task is to analyze technical documentation with a focus on creating a comprehensive FHIR Test Kit.
Pay special attention to:
1. Resource definitions and constraints
2. Required and optional elements
3. Search parameters and operations
4. Technical validation requirements
5. Conformance rules and expectations
6. Integration points and dependencies
7. Test scenarios and edge cases"""

In [98]:
# API rate limit configuration, keyed by the same API names that are passed to
# check_rate_limits ("claude", "gemini", "gpt").
#   requests_per_minute     - max requests allowed inside a 60-second window
#   max_requests_per_day    - budget after which check_rate_limits raises
#   delay_between_requests  - minimum spacing between two requests, in seconds
RATE_LIMITS = {
    "claude": {
        "requests_per_minute": 25,
        "max_requests_per_day": 5000,
        "delay_between_requests": 2
    },
    "gemini": {
        "requests_per_minute": 60,
        "max_requests_per_day": 60000,
        "delay_between_requests": 1
    },
    "gpt": {
        "requests_per_minute": 200,
        "max_requests_per_day": 10000,
        "delay_between_requests": 0.5
    }
}

def create_rate_limiter():
    """Build fresh rate-limiter bookkeeping for every configured API.

    Returns:
        A dict keyed by each API name in ``RATE_LIMITS``. Every value holds an
        empty list of recent request timestamps (``'requests'``), a zeroed
        daily counter (``'daily_requests'``) and the time the daily counter
        was last reset (``'last_reset'``).
    """
    limiter_state = {}
    for api_name in RATE_LIMITS:
        limiter_state[api_name] = {
            'requests': [],
            'daily_requests': 0,
            'last_reset': time.time()
        }
    return limiter_state

def check_rate_limits(rate_limiter: dict, api: str):
    """Block until a request to ``api`` is allowed, then record it.

    Args:
        rate_limiter: State dict shaped like the output of
            ``create_rate_limiter``; the entry for ``api`` is updated in place.
        api: Key into both ``rate_limiter`` and ``RATE_LIMITS``.

    Raises:
        ValueError: If ``api`` has no entry in ``rate_limiter``.
        RuntimeError: If the daily request budget for ``api`` is used up.
    """
    if api not in rate_limiter:
        raise ValueError(f"Unknown API: {api}")

    now = time.time()
    state = rate_limiter[api]
    limits = RATE_LIMITS[api]

    # Reset daily counts if needed
    day_seconds = 24 * 60 * 60
    if now - state['last_reset'] >= day_seconds:
        state['daily_requests'] = 0
        state['last_reset'] = now

    # Check daily limit
    if state['daily_requests'] >= limits['max_requests_per_day']:
        raise RuntimeError(f"{api} daily request limit exceeded")

    # Keep only the requests made inside the current 60-second window
    state['requests'] = [
        req_time for req_time in state['requests']
        if now - req_time < 60
    ]

    # Wait until the oldest request leaves the window if the window is full
    if len(state['requests']) >= limits['requests_per_minute']:
        sleep_time = 60 - (now - state['requests'][0])
        if sleep_time > 0:
            time.sleep(sleep_time)
            # Re-read the clock: the request is sent after the sleep, and the
            # recorded timestamp must reflect that.
            now = time.time()
        # Drop the oldest entry (it has aged out) plus anything else that
        # expired while sleeping.
        state['requests'] = [
            req_time for req_time in state['requests'][1:]
            if now - req_time < 60
        ]

    # Enforce the minimum spacing, sleeping only for the part still missing
    if state['requests']:
        remaining = limits['delay_between_requests'] - (now - state['requests'][-1])
        if remaining > 0:
            time.sleep(remaining)
            now = time.time()

    # Record this request at the time it is actually released
    state['requests'].append(now)
    state['daily_requests'] += 1

In [99]:
def setup_clients():
    """Initialize clients for each LLM service with authentication and timeouts.

    All three API keys are read from the environment and checked before any
    client object is created, so a missing key fails fast without leaving a
    half-built HTTP client behind.

    Returns:
        Tuple ``(claude_client, gemini_client, openai_client)``.

    Raises:
        ValueError: If ANTHROPIC_API_KEY, GEMINI_API_KEY or OPENAI_API_KEY is
            not set.
        Exception: Any error raised while building a client is logged and
            re-raised unchanged.
    """
    try:
        anthropic_api_key = os.getenv('ANTHROPIC_API_KEY')
        if not anthropic_api_key:
            raise ValueError("ANTHROPIC_API_KEY not found in environment variables")
        gemini_api_key = os.getenv('GEMINI_API_KEY')
        if not gemini_api_key:
            raise ValueError("GEMINI_API_KEY not found in environment variables")
        openai_api_key = os.getenv('OPENAI_API_KEY')
        if not openai_api_key:
            raise ValueError("OPENAI_API_KEY not found in environment variables")

        # Claude setup: use the local CA bundle when present, otherwise fall
        # back to httpx's default certificate verification.
        verify_path = CERT_PATH if os.path.exists(CERT_PATH) else True
        http_client = httpx.Client(verify=verify_path, timeout=60.0)
        claude_client = Anthropic(
            api_key=anthropic_api_key,
            http_client=http_client
        )

        # Gemini setup: generation limits are fixed on the model object here,
        # so callers only pass the prompt.
        gemini.configure(api_key=gemini_api_key)
        gemini_client = gemini.GenerativeModel(
            model_name=GEMINI_CONFIG["model_name"],
            generation_config={
                "max_output_tokens": GEMINI_CONFIG["max_tokens"],
                "temperature": GEMINI_CONFIG["temperature"]
            }
        )

        # OpenAI setup (OPENAI_ORG_ID is optional; None is passed if unset)
        openai_client = OpenAI(
            api_key=openai_api_key,
            organization=os.getenv('OPENAI_ORG_ID'),
            timeout=60.0
        )

        # No connectivity check is made here; authentication or network
        # problems surface on the first real request.
        return claude_client, gemini_client, openai_client

    except Exception as e:
        logging.error(f"Error setting up clients: {str(e)}")
        raise


In [100]:
def copy_json_files(source_folder='full-ig/site', destination_folder='full-ig/json_only'):
    """Copy plain ``.json`` files from ``source_folder`` to ``destination_folder``.

    Files with the compound extensions ``.ttl.json``, ``.jsonld.json``,
    ``.xml.json`` and ``.change.history.json`` are skipped, as are directory
    entries whose name happens to end in ``.json``. The destination folder is
    created if needed.

    Args:
        source_folder: Directory to scan (not recursive).
        destination_folder: Directory that receives the copies.

    Returns:
        Names of the copied files, sorted alphabetically.
    """
    excluded_suffixes = (
        '.ttl.json',
        '.jsonld.json',
        '.xml.json',
        '.change.history.json',
    )

    # exist_ok avoids the check-then-create race of exists() + makedirs()
    os.makedirs(destination_folder, exist_ok=True)

    json_files = []
    # Sorted so the result does not depend on filesystem listing order
    for file_name in sorted(os.listdir(source_folder)):
        if not file_name.endswith('.json') or file_name.endswith(excluded_suffixes):
            continue
        source_path = os.path.join(source_folder, file_name)
        # shutil.copy fails on directories, so only regular files are copied
        if not os.path.isfile(source_path):
            continue
        shutil.copy(source_path, destination_folder)
        json_files.append(file_name)

    logging.info(f"Copied {len(json_files)} JSON files to {destination_folder}")
    return json_files

def group_files_by_base_name(directory_path: str, delimiter: str = '-') -> dict:
    """Bucket the ``.json`` files of a directory by the text before ``delimiter``.

    Only names that end in ``.json`` and contain ``delimiter`` are considered;
    the part before the first ``delimiter`` is the bucket key.

    Returns:
        A ``defaultdict(list)`` mapping each base name to its file names.
    """
    groups = defaultdict(list)
    candidates = (
        entry for entry in os.listdir(directory_path)
        if entry.endswith('.json') and delimiter in entry
    )
    for entry in candidates:
        prefix, _, _ = entry.partition(delimiter)
        groups[prefix].append(entry)
    return groups

def copy_files_to_folders(directory_path: str, grouped_files: dict):
    """Copy each group of files into a subfolder named after its base name.

    Args:
        directory_path: Directory that holds the source files and receives
            the per-group subfolders.
        grouped_files: Mapping of base name to the file names (relative to
            ``directory_path``) belonging to that group. Groups with no files
            are skipped and get no folder.
    """
    for base_name, files in grouped_files.items():
        if not files:
            continue

        base_folder = os.path.join(directory_path, base_name)
        # Only report a creation when one actually happens; exist_ok covers
        # the case where the folder appears between the check and the call.
        if not os.path.isdir(base_folder):
            os.makedirs(base_folder, exist_ok=True)
            logging.info(f"Created folder: {base_folder}")

        # Copy files to their respective folders
        for file in files:
            source_file = os.path.join(directory_path, file)
            destination_file = os.path.join(base_folder, file)
            shutil.copy(source_file, destination_file)

def consolidate_jsons(base_directory: str = 'full-ig/json_only'):
    """Merge the JSON files of every subfolder into one combined file each.

    For each immediate subfolder ``<name>`` of ``base_directory`` a file
    ``<name>_combined.json`` is written into ``base_directory`` with the keys
    ``resourceType`` (the subfolder name), ``total`` and ``entry``.
    A file whose top level is an object with a list under ``entry`` adds
    those list items; any other file is added whole as a single item.
    Unreadable or malformed files are logged and skipped.

    Args:
        base_directory: Directory whose subfolders are consolidated.
    """
    # Sorted so the combined output is reproducible across runs and platforms
    subdirs = sorted(
        d for d in os.listdir(base_directory)
        if os.path.isdir(os.path.join(base_directory, d))
    )

    for subdir in subdirs:
        folder_path = os.path.join(base_directory, subdir)
        combined_data = []

        for filename in sorted(os.listdir(folder_path)):
            if not filename.endswith('.json'):
                continue
            file_path = os.path.join(folder_path, filename)
            try:
                # utf-8-sig reads plain UTF-8 and also strips a leading BOM,
                # which json.load would otherwise reject.
                with open(file_path, 'r', encoding='utf-8-sig') as f:
                    json_content = json.load(f)
            except (json.JSONDecodeError, UnicodeDecodeError, OSError) as e:
                logging.error(f"Error decoding JSON from {filename}: {e}")
                continue

            entries = json_content.get('entry') if isinstance(json_content, dict) else None
            if isinstance(entries, list):
                combined_data.extend(entries)
            else:
                # No usable 'entry' list: keep the document itself as one item
                combined_data.append(json_content)

        if combined_data:
            output_filename = f"{subdir}_combined.json"
            output_path = os.path.join(base_directory, output_filename)

            try:
                with open(output_path, 'w', encoding='utf-8') as outfile:
                    json.dump({
                        "resourceType": subdir,
                        "total": len(combined_data),
                        "entry": combined_data
                    }, outfile, indent=2)
                logging.info(f"Created {output_filename} with {len(combined_data)} entries")
            except Exception as e:
                logging.error(f"Error writing {output_filename}: {e}")

def prepare_json_directory():
    """Build the working JSON directory and return its path.

    Copies the plain JSON files from ``full-ig/site`` into
    ``full-ig/json_only``, groups them by base name into subfolders and writes
    one combined file per group.

    Returns:
        The path of the prepared JSON directory (``'full-ig/json_only'``).
    """
    # Define directories
    source_dir = 'full-ig/site'
    json_dir = 'full-ig/json_only'

    # makedirs also creates the parent 'full-ig', so one call is enough.
    # An absent source folder therefore becomes an empty one rather than an
    # error, and the steps below then simply find nothing to copy.
    os.makedirs(source_dir, exist_ok=True)

    # Copy and organize JSON files
    copy_json_files(source_dir, json_dir)
    grouped_files = group_files_by_base_name(json_dir)
    copy_files_to_folders(json_dir, grouped_files)
    consolidate_jsons(json_dir)

    return json_dir

In [101]:
def create_technical_analysis_prompt(json_data: Union[dict, list], chunk_num: int, total_chunks: int) -> str:
    """Build the per-chunk analysis prompt sent to an LLM.

    The chunk is embedded as JSON pretty-printed with a two-space indent,
    preceded by its position (``chunk_num`` of ``total_chunks``) and followed
    by the list of technical topics the model should cover.
    """
    serialized_chunk = json.dumps(json_data, indent=2)
    prompt = f"""Analyze this portion ({chunk_num} of {total_chunks}) of a FHIR Implementation Guide JSON resource bundle.
    Focus on extracting technical details needed for creating a FHIR Test Kit.
    
    JSON Content:
    {serialized_chunk}
    
    Please provide detailed technical analysis of:
    1. Resource Types and Profiles
        - Required elements and cardinality
        - Must Support elements
        - Extensions and custom data types
    2. Search Parameters
        - Parameter names and types
        - Required and optional parameters
        - Chaining and reverse chaining capabilities
    3. Technical Operations
        - Supported CRUD operations
        - Custom operations
        - Required headers and parameters
    4. Validation Requirements
        - Resource validation rules
        - Business logic constraints
        - Required terminology bindings
    5. Integration Points
        - Dependencies between resources
        - Required external systems or services
        - Authentication and authorization requirements
    6. Conformance Requirements
        - Server capabilities
        - Required profiles and extensions
        - Version compatibility

    Maintain technical precision and include specific details that would be needed for test case development."""
    return prompt


In [102]:
# No retry decorator here on purpose: this function is pure and deterministic
# (no network or API access), so re-running it after a failure can only
# reproduce the same error after a long backoff.
def split_json_for_llm(json_data: Union[dict, list], llm_config: dict) -> List[list]:
    """Split JSON into smaller chunks to avoid token limits.

    Items are packed greedily, in input order, into chunks whose estimated
    token count stays within ``llm_config['max_input_tokens']`` (2048 when the
    key is absent). An item that alone exceeds the limit is handed to
    ``split_large_item`` and its pieces are emitted at that item's position.

    Args:
        json_data: A single JSON object or a list of items.
        llm_config: Config dict; only ``max_input_tokens`` is read.

    Returns:
        A list of chunks, each chunk being a list of items.
    """
    if isinstance(json_data, dict):
        json_data = [json_data]

    max_chunk_tokens = llm_config.get('max_input_tokens', 2048)
    chunks = []
    current_chunk = []
    current_tokens = 0

    for item in json_data:
        item_tokens = estimate_tokens(json.dumps(item))

        # Handle large individual items by splitting them
        if item_tokens > max_chunk_tokens:
            # Flush what is pending first so chunk order follows input order
            if current_chunk:
                chunks.append(current_chunk)
                current_chunk = []
                current_tokens = 0
            chunks.extend(split_large_item(item, max_chunk_tokens))
            continue

        if current_tokens + item_tokens > max_chunk_tokens and current_chunk:
            chunks.append(current_chunk)
            current_chunk = []
            current_tokens = 0

        current_chunk.append(item)
        current_tokens += item_tokens

    if current_chunk:
        chunks.append(current_chunk)

    return chunks


def split_large_item(item: dict, max_tokens: int) -> List[List[dict]]:
    """Break one oversized item into several single-element chunks.

    A dict is divided key by key: keys are collected into a partial dict until
    adding the next key would push the estimated size past ``max_tokens``, at
    which point the partial dict is emitted and a new one is started. Anything
    that is not a dict cannot be divided and comes back as one chunk.

    Returns:
        A list of chunks, each a one-element list.
    """
    if not isinstance(item, dict):
        # Nothing to divide: hand the value back untouched
        return [[item]]

    pieces = []
    pending = {}
    pending_tokens = 0

    for field, payload in item.items():
        field_tokens = estimate_tokens(json.dumps({field: payload}))
        overflow = pending_tokens + field_tokens > max_tokens

        if overflow and pending:
            pieces.append([dict(pending)])
            pending, pending_tokens = {}, 0

        pending[field] = payload
        pending_tokens += field_tokens

    if pending:
        pieces.append([pending])

    return pieces


def process_with_llm_with_retries(client: Any, content: str, llm_type: str, rate_limiter: dict) -> str:
    """Send ``content`` to one LLM and return the text of its reply.

    NOTE(review): despite the name, this function performs no retries itself;
    errors are logged and re-raised for the caller to handle. A function with
    this same name is defined again later in the notebook, and when both cells
    have run the later definition is the one callers get.

    The prompt size is checked before a rate-limit slot is taken, so a prompt
    that is rejected for being too large does not count against the quota.

    Args:
        client: The client object for ``llm_type`` (as returned by
            ``setup_clients``).
        content: The user prompt.
        llm_type: ``"claude"``, ``"gemini"`` or ``"gpt"``.
        rate_limiter: State dict passed through to ``check_rate_limits``.

    Returns:
        The model's reply text.

    Raises:
        ValueError: If ``llm_type`` is not supported, or the estimated prompt
            size exceeds the configured input limit, or the provider reports a
            length/context error.
        Exception: Any other provider error, re-raised after logging.
    """
    try:
        llm_config = globals().get(f"{llm_type.upper()}_CONFIG")
        if llm_type not in ("claude", "gemini", "gpt") or llm_config is None:
            # Without this, an unhandled type would fall through every branch
            # below and silently return None.
            raise ValueError(f"Unsupported LLM type: {llm_type}")

        # Estimate tokens in content
        content_tokens = estimate_tokens(content)
        max_input_tokens = llm_config.get('max_input_tokens')

        if max_input_tokens is not None and content_tokens > max_input_tokens:
            raise ValueError(f"Content exceeds maximum input tokens for {llm_type} "
                           f"({content_tokens} > {max_input_tokens})")

        # Apply rate limiting only once the request is known to be sendable
        check_rate_limits(rate_limiter, llm_type)

        if llm_type == "claude":
            try:
                response = client.messages.create(
                    model=CLAUDE_CONFIG["model_name"],
                    messages=[{"role": "user", "content": content}],
                    system=SYSTEM_PROMPT,
                    max_tokens=CLAUDE_CONFIG["max_tokens"],
                    # Use the configured temperature, as the other two LLMs do
                    temperature=CLAUDE_CONFIG["temperature"]
                )
                return response.content[0].text
            except Exception as e:
                if "overloaded" in str(e).lower():
                    logging.warning(f"Claude API overloaded, waiting before retry...")
                    # Back off before handing the error to the caller
                    time.sleep(30)
                raise

        elif llm_type == "gemini":
            try:
                # Generation settings were fixed when the model was created
                response = client.generate_content(content)
                return response.text
            except Exception as e:
                if "length" in str(e).lower():
                    logging.error(f"Content too long for Gemini: {str(e)}")
                    raise ValueError(f"Content exceeds Gemini limits: {str(e)}")
                logging.warning(f"Gemini API error: {str(e)}")
                time.sleep(5)
                raise

        else:  # llm_type == "gpt"
            try:
                # Calculate total tokens including system prompt
                system_tokens = estimate_tokens(SYSTEM_PROMPT)
                total_tokens = content_tokens + system_tokens + GPT_CONFIG["max_tokens"]

                # NOTE(review): hard-coded context budget; confirm it matches
                # the model named in GPT_CONFIG.
                if total_tokens > 8192:
                    raise ValueError(f"Total tokens ({total_tokens}) would exceed GPT context limit")

                response = client.chat.completions.create(
                    model=GPT_CONFIG["model_name"],
                    messages=[
                        {"role": "system", "content": SYSTEM_PROMPT},
                        {"role": "user", "content": content}
                    ],
                    max_tokens=GPT_CONFIG["max_tokens"],
                    temperature=GPT_CONFIG["temperature"]
                )
                return response.choices[0].message.content
            except Exception as e:
                if "context_length_exceeded" in str(e):
                    logging.error(f"GPT context length exceeded: {str(e)}")
                    raise ValueError(f"Content exceeds GPT limits: {str(e)}")
                logging.warning(f"GPT API error: {str(e)}")
                time.sleep(5)
                raise

    except Exception as e:
        logging.error(f"Error processing with {llm_type}: {str(e)}")
        raise

In [103]:
def prepare_chunks_for_llm(json_file_path: str, llm_config: dict) -> List[list]:
    """Load a JSON file and pack its items into prompt-sized chunks.

    Items are packed greedily in file order. Sizes are estimated from the
    two-space-indented JSON form, because that is how
    ``create_technical_analysis_prompt`` embeds a chunk; estimating from the
    compact form undercounts and lets full chunks overflow the input limit.
    An item that alone exceeds the budget is passed to
    ``split_large_resource`` and each piece becomes its own chunk, emitted at
    the item's position. Items without a ``resourceType`` get one derived from
    the file name (the text before the first ``_``).

    Args:
        json_file_path: Path of the JSON file to load.
        llm_config: Config dict; only ``max_input_tokens`` is read (default
            2048).

    Returns:
        A list of chunks, each a list of items. An empty list is returned when
        the file cannot be read, holds no data, or any error occurs (the error
        is logged).
    """
    try:
        json_data = prepare_json_for_processing(json_file_path)
        if json_data is None:
            logging.warning(f"Could not process {json_file_path} - invalid or empty data")
            return []

        # Ensure data is in list format
        if isinstance(json_data, dict):
            if 'entry' in json_data:
                json_data = json_data['entry']
            else:
                json_data = [json_data]

        # Budget left for the data once the system prompt and the prompt
        # template (padded to 500 tokens) are accounted for
        max_tokens = llm_config.get('max_input_tokens', 2048)
        overhead_tokens = estimate_tokens(SYSTEM_PROMPT) + 500
        available_tokens = max_tokens - overhead_tokens
        if available_tokens <= 0:
            logging.error(
                f"Token budget for {json_file_path} is not positive "
                f"({max_tokens} - {overhead_tokens}); cannot build chunks"
            )
            return []

        # Fallback resource type taken from the file name, e.g. 'Patient'
        # from 'Patient_combined.json'
        fallback_type = os.path.basename(json_file_path).split('_')[0]

        chunks = []
        current_chunk = []
        current_tokens = 0

        def flush_current():
            """Emit the pending chunk, if any, and start a new one."""
            nonlocal current_chunk, current_tokens
            if current_chunk:
                if validate_chunk(current_chunk):
                    chunks.append(current_chunk)
                else:
                    logging.warning(
                        f"Dropping invalid chunk of {len(current_chunk)} item(s) from {json_file_path}"
                    )
            current_chunk = []
            current_tokens = 0

        for item in json_data:
            # Add resourceType if missing
            if isinstance(item, dict) and 'resourceType' not in item:
                item['resourceType'] = fallback_type

            # Measure the item the way it will appear inside the prompt:
            # indented and nested one level inside the chunk's list
            item_tokens = estimate_tokens(json.dumps([item], indent=2))

            # If item is too large, split it
            if item_tokens > available_tokens:
                # Flush first so the pieces keep the item's place in the order
                flush_current()
                for split_item in split_large_resource(item, available_tokens):
                    if validate_chunk(split_item):
                        chunks.append([split_item])
                    else:
                        logging.warning(f"Dropping invalid split piece from {json_file_path}")
                continue

            # If adding item would exceed limit, start new chunk
            if current_tokens + item_tokens > available_tokens:
                flush_current()

            current_chunk.append(item)
            current_tokens += item_tokens

        # Add final chunk if it exists
        flush_current()

        return chunks

    except Exception as e:
        logging.error(f"Error preparing chunks from {json_file_path}: {str(e)}")
        return []

def validate_chunk(chunk: Union[dict, list]) -> bool:
    """Tell whether a chunk contains only valid FHIR resources.

    A list chunk is valid when it is non-empty and every member passes
    ``validate_resource``; any other value is validated as a single resource.
    Errors raised during validation are logged and reported as invalid.
    """
    try:
        if not isinstance(chunk, list):
            return validate_resource(chunk)
        if any(not validate_resource(entry) for entry in chunk):
            return False
        return len(chunk) > 0
    except Exception as e:
        logging.error(f"Error validating chunk: {str(e)}")
        return False

def validate_resource(resource: Any) -> bool:
    """Tell whether ``resource`` looks like a FHIR resource.

    The value must be a dict that carries a non-empty ``resourceType``.
    """
    # Anything that is not a JSON object cannot be a resource
    is_object = isinstance(resource, dict)
    # .get() covers both the missing-key and the empty-value case
    return is_object and bool(resource.get('resourceType'))

def validate_json_structure(chunk: Union[dict, list]) -> bool:
    """Check that a chunk keeps the basic shape of FHIR resources.

    A dict must contain ``resourceType``; a list must consist solely of dicts
    that do (an empty list passes). Any other type fails. Errors raised while
    checking are logged and reported as a failure.
    """
    try:
        if isinstance(chunk, dict):
            return 'resourceType' in chunk
        if isinstance(chunk, list):
            return all(
                isinstance(entry, dict) and 'resourceType' in entry
                for entry in chunk
            )
        return False
    except Exception as e:
        logging.error(f"Error validating JSON structure: {str(e)}")
        return False

def get_resource_stats(chunks: List[dict]) -> Dict[str, int]:
    """Count resources per ``resourceType`` across all chunks.

    Used for logging and verification.

    Args:
        chunks: List of JSON chunks; each chunk is either a list of resources
            or a single resource dict.
    Returns:
        Dictionary mapping each resource type to the number of times it
        occurs. Entries that are not dicts or lack ``resourceType`` are
        ignored.
    """
    counts = {}

    for chunk in chunks:
        # Treat a lone resource like a one-element chunk
        members = chunk if isinstance(chunk, list) else [chunk]
        for member in members:
            if isinstance(member, dict) and 'resourceType' in member:
                kind = member['resourceType']
                counts[kind] = counts.get(kind, 0) + 1

    return counts

In [104]:
def save_analysis_results(results: Dict[str, Any], llm_type: str):
    """Write analysis results to a timestamped JSON file.

    The file is ``<OUTPUT_DIR>/<llm_type>/analysis_results_<timestamp>.json``
    and holds a ``metadata`` section (timestamp, LLM type, that LLM's config,
    ISO generation date) next to the ``results``.

    Nothing is created on disk unless the whole document can be built: the
    config lookup and the JSON serialization both happen before the output
    folder and file are touched.

    Args:
        results: JSON-serializable results for one LLM.
        llm_type: LLM name; ``<LLM_TYPE>_CONFIG`` must exist as a module-level
            name.

    Raises:
        KeyError: If no config exists for ``llm_type``.
        TypeError: If ``results`` cannot be serialized to JSON.
    """
    # Resolve the config first so an unknown llm_type fails before any I/O
    llm_config = globals()[f"{llm_type.upper()}_CONFIG"]

    # One clock read, so the file name and both metadata fields agree
    generated_at = datetime.now()
    timestamp = generated_at.strftime("%Y%m%d_%H%M%S")
    output_path = os.path.join(OUTPUT_DIR, llm_type, f"analysis_results_{timestamp}.json")

    # Serialize up front: a failure here must not leave a truncated file
    serialized = json.dumps({
        "metadata": {
            "timestamp": timestamp,
            "llm_type": llm_type,
            "config": llm_config,
            "generation_date": generated_at.isoformat()
        },
        "results": results
    }, indent=2)

    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(serialized)

In [105]:
def create_meta_summary_prompt(summaries: Dict[str, str]) -> str:
    """Build the prompt that asks an LLM to merge the per-chunk analyses.

    ``summaries`` is embedded as JSON pretty-printed with a two-space indent,
    followed by the outline the synthesized summary should follow.
    """
    serialized_summaries = json.dumps(summaries, indent=2)
    prompt = f"""Synthesize these technical analyses into a comprehensive summary focused on FHIR Test Kit development requirements:

    {serialized_summaries}

    Create a detailed technical analysis that preserves specific implementation details:
    1. Resource Specifications
       - Complete list of resources with their constraints
       - Detailed element requirements and cardinalities
       - Specific extensions and their usage

    2. Interaction Requirements
       - Detailed search parameter specifications
       - Operation definitions and requirements
       - Required and optional capabilities

    3. Technical Validation Rules
       - Specific validation requirements for each resource
       - Business rule constraints
       - Required terminologies and value sets

    4. Integration Requirements
       - Detailed API specifications
       - Authentication and authorization requirements
       - System dependencies and prerequisites

    5. Test Coverage Requirements
       - Required test scenarios
       - Edge cases and boundary conditions
       - Error handling requirements

    Focus on maintaining technical precision and specific details needed for test case development."""
    return prompt


In [106]:
def analyze_json_with_llm_safe(
    client: Any,
    llm_type: str,
    json_file_path: str,
    rate_limiter: dict
) -> Dict[str, Any]:
    """Analyze one JSON file with one LLM, retrying failed API work.

    The file is chunked once, each chunk is analyzed, and a meta-summary is
    requested over all chunk analyses. Up to three attempts are made, 30
    seconds apart. Chunk analyses that already succeeded are kept between
    attempts, so a retry resumes at the chunk (or meta-summary) that failed
    instead of re-sending everything.

    Args:
        client: Client object for ``llm_type``.
        llm_type: LLM name; ``<LLM_TYPE>_CONFIG`` must exist as a module-level
            name.
        json_file_path: Path of the JSON file to analyze.
        rate_limiter: State dict passed through to the LLM call.

    Returns:
        Dict with ``file_path``, ``resource_stats``, ``chunk_analyses`` and
        ``meta_summary``.

    Raises:
        KeyError: If no config exists for ``llm_type`` (not retried).
        ValueError: If the file yields no valid chunks (not retried).
        Exception: The last API error once all attempts are used up.
    """
    max_retries = 3
    retry_delay = 30  # seconds

    # Everything up to the first API call is deterministic, so it runs once
    # and fails immediately rather than waiting through the retry delays.
    llm_config = globals()[f"{llm_type.upper()}_CONFIG"]
    # Only CLAUDE_CONFIG defines this delay; other configs may override it
    chunk_delay = llm_config.get("delay_between_chunks", CLAUDE_CONFIG["delay_between_chunks"])

    chunks = prepare_chunks_for_llm(json_file_path, llm_config)
    if not chunks:
        raise ValueError(f"No valid chunks produced from {json_file_path}")

    # Log resource statistics
    stats = get_resource_stats(chunks)
    logging.info(f"Processing {json_file_path} with {llm_type}. "
                f"Resource counts: {json.dumps(stats, indent=2)}")

    # Filled across attempts; its length is the number of chunks already done
    chunk_analyses = []

    for attempt in range(max_retries):
        try:
            for i in range(len(chunk_analyses) + 1, len(chunks) + 1):
                prompt = create_technical_analysis_prompt(chunks[i - 1], i, len(chunks))
                analysis = process_with_llm_with_retries(client, prompt, llm_type, rate_limiter)
                chunk_analyses.append(analysis)

                # Add delay between chunks
                time.sleep(chunk_delay)

            # Create meta-summary
            meta_prompt = create_meta_summary_prompt({
                "chunk_analyses": chunk_analyses,
                "resource_stats": stats
            })
            meta_summary = process_with_llm_with_retries(client, meta_prompt, llm_type, rate_limiter)

            return {
                "file_path": json_file_path,
                "resource_stats": stats,
                "chunk_analyses": chunk_analyses,
                "meta_summary": meta_summary
            }

        except Exception as e:
            if attempt < max_retries - 1:
                logging.warning(
                    f"Attempt {attempt + 1} failed for {llm_type} ({str(e)}), "
                    f"retrying in {retry_delay} seconds..."
                )
                time.sleep(retry_delay)
                continue
            else:
                logging.error(f"All retries failed for {llm_type} on {json_file_path}: {str(e)}")
                raise

def run_multi_llm_analysis(json_directory: str) -> Dict[str, Dict[str, Any]]:
    """Analyze every ``*_combined.json`` file in a directory with all three LLMs.

    Each file is processed by Claude, Gemini and GPT in turn, sharing one
    rate-limiter state. A failure for one file/LLM pair is logged and stored
    as an ``{"error", "file_path"}`` record so the run continues. When all
    files are done, the results of each LLM are saved.

    Returns:
        Mapping of LLM name to a mapping of file path to that file's result
        (or error record).

    Raises:
        Exception: Any error outside the per-file handling (client setup,
            directory listing, saving) is logged and re-raised.
    """
    try:
        claude_client, gemini_client, openai_client = setup_clients()

        # One limiter state shared by every API call in this run
        rate_limiter = create_rate_limiter()

        json_files = []
        for entry in os.listdir(json_directory):
            if entry.endswith('_combined.json'):
                json_files.append(os.path.join(json_directory, entry))

        backends = (
            ("claude", claude_client),
            ("gemini", gemini_client),
            ("gpt", openai_client),
        )
        results = {name: {} for name, _ in backends}

        for json_file in json_files:
            logging.info(f"Processing {json_file}")

            for llm_type, client in backends:
                try:
                    outcome = analyze_json_with_llm_safe(
                        client, llm_type, json_file, rate_limiter
                    )
                except Exception as e:
                    logging.error(f"Failed to process {json_file} with {llm_type}: {str(e)}")
                    outcome = {
                        "error": str(e),
                        "file_path": json_file
                    }
                results[llm_type][json_file] = outcome

                # Pause before switching to the next LLM
                time.sleep(5)

            # Pause before moving on to the next file
            time.sleep(CLAUDE_CONFIG["delay_between_batches"])

        for llm_type, llm_results in results.items():
            save_analysis_results(llm_results, llm_type)

        return results

    except Exception as e:
        logging.error(f"Error in multi-LLM analysis: {str(e)}")
        raise

In [107]:
def prepare_json_for_processing(json_file_path: str) -> Optional[Union[dict, list]]:
    """Read a JSON file and normalize it for chunking.

    The file is decoded as ``utf-8-sig``, which reads plain UTF-8 and also
    strips a leading byte-order mark. No separate UTF-8 fallback is needed:
    any bytes that ``utf-8-sig`` cannot decode are invalid as UTF-8 too.

    Args:
        json_file_path: Path of the JSON file.

    Returns:
        - the value under ``entry`` when the top level is an object that has
          a non-empty one,
        - a one-element list when the top level is an object with
          ``resourceType``,
        - the parsed data unchanged otherwise,
        - ``None`` when the file is unreadable, malformed or holds no data
          (the reason is logged).
    """
    try:
        with open(json_file_path, 'r', encoding='utf-8-sig') as f:
            data = json.load(f)

        # Handle empty or invalid data
        if not data:
            logging.warning(f"Empty data in {json_file_path}")
            return None

        if isinstance(data, dict):
            # Extract entries if present in a bundle
            if 'entry' in data:
                entries = data['entry']
                if not entries:
                    # A bundle without entries has nothing to analyze
                    logging.warning(f"Empty data in {json_file_path}")
                    return None
                return entries
            if 'resourceType' in data:
                return [data]  # Single resource

        return data

    except Exception as e:
        logging.error(f"Error reading {json_file_path}: {str(e)}")
        return None

def adaptive_chunk_sizing(content: Union[dict, list], max_tokens: int) -> List[list]:
    """Pack content items into chunks that fit a token budget.

    The budget is ``max_tokens`` minus an overhead for the system prompt and
    200 tokens for the prompt template. Items are packed greedily in input
    order. An item that alone exceeds the budget is passed to
    ``split_large_resource``; each piece becomes a one-element chunk, emitted
    at the item's position.

    Args:
        content: A single JSON object or a list of items.
        max_tokens: Total input token limit for one request.

    Returns:
        A list of chunks; every chunk is a list of items.
    """
    if isinstance(content, dict):
        content = [content]

    chunks = []
    current_chunk = []
    current_tokens = 0

    # Estimate tokens needed for prompts and system message
    overhead_tokens = estimate_tokens(SYSTEM_PROMPT) + 200  # 200 for prompt template
    available_tokens = max_tokens - overhead_tokens

    for item in content:
        item_tokens = estimate_tokens(json.dumps(item))

        # If single item is too large, try to split it
        if item_tokens > available_tokens:
            # Flush what is pending first so chunk order follows input order
            if current_chunk:
                chunks.append(current_chunk)
                current_chunk = []
                current_tokens = 0
            # Wrap each piece so every chunk is a list, like the packed ones
            chunks.extend([piece] for piece in split_large_resource(item, available_tokens))
            continue

        # Check if adding item would exceed limit
        if current_tokens + item_tokens > available_tokens and current_chunk:
            chunks.append(current_chunk)
            current_chunk = []
            current_tokens = 0

        current_chunk.append(item)
        current_tokens += item_tokens

    if current_chunk:
        chunks.append(current_chunk)

    return chunks

def split_large_resource(resource: dict, max_tokens: int) -> List[dict]:
    """
    Split a large FHIR resource into several smaller resources.

    Every piece repeats the identifying elements (``resourceType``, ``id`` and
    ``meta``); the remaining top-level elements are distributed greedily, in
    their original order, so that each piece's estimated size stays within
    ``max_tokens``. Top-level elements are never split themselves, so an
    element that is larger than the budget still ends up in a piece of its own
    that exceeds it.

    Args:
        resource: The resource to split. Must contain ``resourceType``.
        max_tokens: Estimated-token budget for each piece.

    Returns:
        The list of pieces. ``[resource]`` (the untouched original) is returned
        when there is nothing to distribute or when splitting fails for any
        reason, e.g. ``resourceType`` is missing.
    """
    chunks = []

    try:
        # Keep essential FHIR elements; they are repeated in every piece.
        base_resource = {
            'resourceType': resource['resourceType'],
            'id': resource.get('id', 'split-resource'),
            'meta': resource.get('meta', {})
        }
        base_tokens = estimate_tokens(json.dumps(base_resource))
        base_size = len(base_resource)

        # Shallow copies: all pieces share the same ``meta`` object.
        current_chunk = dict(base_resource)
        current_tokens = base_tokens

        for key, value in resource.items():
            if key in base_resource:
                continue

            element_tokens = estimate_tokens(json.dumps({key: value}))

            # Flush only when the piece already carries payload. Flushing a
            # piece that holds nothing but the base elements would emit an
            # empty, header-only resource.
            has_payload = len(current_chunk) > base_size
            if has_payload and current_tokens + element_tokens > max_tokens:
                chunks.append(current_chunk)
                current_chunk = dict(base_resource)
                current_tokens = base_tokens

            current_chunk[key] = value
            current_tokens += element_tokens

        if len(current_chunk) > base_size:
            chunks.append(current_chunk)

        return chunks if chunks else [resource]

    except Exception as e:
        logging.error(f"Error splitting resource: {str(e)}")
        return [resource]  # Return original if splitting fails

def process_with_llm_with_retries(client: Any, content: str, llm_type: str, rate_limiter: dict) -> str:
    """Run ``content`` through the chosen LLM and hand back the reply text.

    Steps, in order: apply rate limiting via ``check_rate_limits``, look up
    the module-level ``<LLM_TYPE>_CONFIG`` dict, compare the estimated prompt
    size with its ``max_input_tokens`` (a GPT prompt is truncated, any other
    provider raises ``ValueError``), then call the provider's client.

    Args:
        client: Provider client object matching ``llm_type``.
        content: The user prompt to send.
        llm_type: One of "claude", "gemini" or "gpt".
        rate_limiter: State passed through to ``check_rate_limits``.

    Returns:
        The text of the model's reply.

    Raises:
        ValueError: Non-GPT content is over the configured input limit.
        Exception: Anything raised along the way is logged and re-raised.

    NOTE(review): despite the name, no retry logic is visible in this
    function. An ``llm_type`` that has a config dict but matches none of the
    three branches falls through and returns ``None``.
    """
    try:
        # Throttle before doing any other work.
        check_rate_limits(rate_limiter, llm_type)

        # Resolve CLAUDE_CONFIG / GEMINI_CONFIG / GPT_CONFIG by name.
        settings = globals()[f"{llm_type.upper()}_CONFIG"]

        content_tokens = estimate_tokens(content)
        max_input_tokens = settings.get('max_input_tokens')

        if content_tokens > max_input_tokens:
            # Only GPT prompts are shrunk; every other provider fails fast.
            if llm_type != "gpt":
                raise ValueError(f"Content exceeds maximum input tokens for {llm_type} "
                               f"({content_tokens} > {max_input_tokens})")
            content = truncate_content_for_gpt(content, max_input_tokens)

        if llm_type == "claude":
            reply = client.messages.create(
                model=CLAUDE_CONFIG["model_name"],
                messages=[{"role": "user", "content": content}],
                system=SYSTEM_PROMPT,
                max_tokens=CLAUDE_CONFIG["max_tokens"]
            )
            return reply.content[0].text

        if llm_type == "gemini":
            return client.generate_content(content).text

        if llm_type == "gpt":
            reply = client.chat.completions.create(
                model=GPT_CONFIG["model_name"],
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": content}
                ],
                max_tokens=GPT_CONFIG["max_tokens"]
            )
            return reply.choices[0].message.content

    except Exception as e:
        logging.error(f"Error processing with {llm_type}: {str(e)}")
        raise

def truncate_content_for_gpt(content: str, max_tokens: int) -> str:
    """Truncate content to fit GPT token limits while maintaining JSON validity.

    The first ``{...}`` or ``[...]`` span in ``content`` (matched greedily, so
    it runs to the last closing bracket) is parsed as JSON. Trailing list
    items, or the most recently inserted dict keys, are dropped until the
    whole reassembled prompt (text before + JSON + text after) fits within
    ``max_tokens`` as measured by ``estimate_tokens``.

    Plain character truncation to ``max_tokens * 4`` characters is used when:
      * no JSON-looking span is found,
      * the span cannot be parsed or any other error occurs, or
      * the surrounding text alone is already over the limit, so even an
        emptied JSON payload cannot make the prompt fit.

    Args:
        content: Prompt text, possibly with an embedded JSON document.
        max_tokens: Token budget for the entire returned string.

    Returns:
        The shortened prompt. The embedded JSON is re-serialised with
        ``json.dumps``, so its whitespace may differ from the input.
    """
    import re  # local import: `re` is not in the notebook's top-level import cell

    # estimate_tokens counts ~4 characters per token; clamp so that a negative
    # budget cannot turn the slice below into "everything but the tail".
    char_budget = max(max_tokens, 0) * 4

    try:
        # Parse content to find JSON portion
        json_match = re.search(r'(\{[\s\S]*\}|\[[\s\S]*\])', content)
        if not json_match:
            return content[:char_budget]  # Rough char estimate

        json_data = json.loads(json_match.group(0))
        before_json = content[:json_match.start()]
        after_json = content[json_match.end():]

        def rebuild(payload) -> str:
            return before_json + json.dumps(payload) + after_json

        def over_budget(payload) -> bool:
            # Measure the full prompt, not just the JSON fragment: the text
            # around the JSON counts against the limit as well.
            return estimate_tokens(rebuild(payload)) > max_tokens

        # Truncate arrays or objects while maintaining structure. Both loops
        # stop once the container is empty, so they always terminate.
        if isinstance(json_data, list):
            while json_data and over_budget(json_data):
                json_data = json_data[:-1]
        elif isinstance(json_data, dict):
            while json_data and over_budget(json_data):
                json_data.popitem()  # drops the most recently inserted key

        truncated = rebuild(json_data)
        if estimate_tokens(truncated) > max_tokens:
            # Nothing left to drop from the JSON; cut the raw text instead.
            return content[:char_budget]
        return truncated

    except Exception as e:
        logging.warning(f"Error truncating content: {str(e)}")
        return content[:char_budget]  # Fallback to simple truncation

In [108]:
# Prepare JSON directory.
# NOTE(review): prepare_json_directory() is defined outside this cell; the
# saved output below suggests it copies the IG's JSON files into
# full-ig/json_only and creates per-resource-type folders — confirm.
json_dir = prepare_json_directory()


# Run analysis over the prepared directory.
# NOTE(review): the saved output of this cell ends in a KeyboardInterrupt, so
# in the recorded run `results` may never have been assigned.
results = run_multi_llm_analysis(json_dir)

print("\nAnalysis Complete!")
print(f"Results saved in: {OUTPUT_DIR}")

INFO:root:Copied 166 JSON files to full-ig/json_only
INFO:root:Created folder: full-ig/json_only/Location
INFO:root:Created folder: full-ig/json_only/StructureDefinition
INFO:root:Created folder: full-ig/json_only/ValueSet
INFO:root:Created folder: full-ig/json_only/CodeSystem
INFO:root:Created folder: full-ig/json_only/OrganizationAffiliation
INFO:root:Created folder: full-ig/json_only/SearchParameter
INFO:root:Created folder: full-ig/json_only/HealthcareService
INFO:root:Created folder: full-ig/json_only/usage
INFO:root:Created folder: full-ig/json_only/Organization
INFO:root:Created folder: full-ig/json_only/CapabilityStatement
INFO:root:Created folder: full-ig/json_only/PractitionerRole
INFO:root:Created folder: full-ig/json_only/ImplementationGuide
INFO:root:Created folder: full-ig/json_only/InsurancePlan
INFO:root:Created folder: full-ig/json_only/Practitioner
INFO:root:Created folder: full-ig/json_only/Endpoint
INFO:root:Created folder: full-ig/json_only/plan
INFO:root:Created O

KeyboardInterrupt: 

In [None]:
# Create an interactive query session over the JSON-only technical summary.
# NOTE(review): `client` is not created in this cell, so it must already exist
# in the kernel — confirm which LLM client InteractiveQuerySession expects.
session = InteractiveQuerySession(client, 'summarized_output/technical_summary_json_only.md')

# Ask questions (return values are not captured here)
session.ask_question("What specific test cases should be created for the Location resource?")
session.ask_question("How should we validate the search parameters?")

# Save the conversation
session.save_conversation('summarized_output/query_session.md')

In [None]:
# Run full analysis.
# NOTE(review): this rebinds `results`, which the run_multi_llm_analysis cell
# above also assigns; later cells see whichever of the two ran last.
results = run_full_analysis()

# Print summary. The lookups below require `results` to provide the keys
# 'analysis_file' and 'output_directory'.
print("\nAnalysis Complete!")
print(f"Technical analysis saved to: {results['analysis_file']}")
print(f"Verification results available in: {results['output_directory']}")
print("\nYou can now use the query session for additional questions:")
print("query_session = results['query_session']")
print("answer = query_session.ask_question('Your question here')")

In [None]:
# Access the interactive session stored under results['query_session'].
# NOTE(review): rebinds `session`, which an earlier cell also assigns from
# InteractiveQuerySession(...); requires `results` to hold a 'query_session' key.
session = results['query_session']

In [None]:
# Ask additional questions; the reply is kept in `answer`.
# Requires `session` to have been created by an earlier cell.
answer = session.ask_question("What are the key test scenarios for the Location resource?")

In [None]:
# Save the conversation to a markdown file.
# Requires `session` to have been created by an earlier cell.
session.save_conversation('summarized_output/interactive_session.md')

In [None]:
# Standalone Gemini settings.
# NOTE(review): these differ from GEMINI_CONFIG in the setup cells
# (max_tokens 4096, temperature 0.7) and nothing in this cell uses them —
# confirm whether they are still needed.
gemini_version = "models/gemini-1.5-pro-001"  # alternative: gemini-1.5-flash-latest
gemini_max_output_tokens = 8192
temp = 0.75