In [9]:
# Path of the YAML configuration file that is handed to load_config.
CONFIG_PATH = "config.yaml"
# Directory of JSON documents that is handed to ingest_uptodate_json.
JSON_PATH = "json_data/"

In [50]:
import os
import json
import re
import yaml
from typing import Dict, List, Union, Literal, Tuple, Optional, Any
from pydantic import BaseModel, Field
import chromadb
import ollama
from langgraph.graph import StateGraph, START, END
from langchain_text_splitters import RecursiveCharacterTextSplitter

In [11]:
def load_config(file_path: str) -> Optional[Dict[str, Any]]:
    """Load a YAML configuration file.

    Args:
        file_path: Path to the YAML file.

    Returns:
        Whatever ``yaml.safe_load`` parses from the file (callers in this
        notebook index it like a dict), or ``None`` if the file could not be
        opened or parsed. The error is printed rather than raised, so callers
        must be prepared for ``None``.
    """
    try:
        # Read as UTF-8 explicitly, matching the JSON loader in this notebook,
        # so parsing does not depend on the platform's default locale encoding.
        with open(file_path, 'r', encoding='utf-8') as file:
            return yaml.safe_load(file)
    except Exception as e:
        # Deliberately best-effort: report the problem and signal it with None.
        print(f"Error loading config: {e}")
        return None

In [104]:
class PromptFamily:
    """General purpose class for prompt formatting.

    This may be overridden with a derived class that is model specific. All
    derived classes must retain the same set of method names, but may override
    individual methods.

    Only one prompt method is defined here, ``generate_chunk_tagging_prompt``;
    it is a static method and does not read the stored config.
    """
    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize with the loaded configuration.

        Args:
            config: Parsed configuration, stored unchanged on ``self.cfg``.
                Derived classes may use it to select prompts based on the
                configured models and/or providers.
        """
        # The annotation is a real type: annotating with the module-level
        # CONFIG value raised NameError when this cell ran before CONFIG existed.
        self.cfg = config
    
    @staticmethod
    def generate_chunk_tagging_prompt(sections: List[str], section_descriptions: Dict, text: str) -> str:
        """Build the prompt asking the model to tag ``text`` with section names.

        Args:
            sections: Allowed section names; interpolated with ``str()``, so a
                list appears in the prompt as its Python repr.
            section_descriptions: Mapping of section name to description, also
                interpolated with ``str()``.
            text: The chunk of source text to classify.

        Returns:
            The prompt string. It asks for a comma-separated list of matching
            section names, or ``'other'`` when none match.
        """
        # The task line says "one or more" so it agrees with the multi-label
        # output format requested in the same paragraph.
        return f"""
        You are an expert medical note taker who is reading text pulled from UpToDate.
        
        <task>
        Classify the following text into one or more of these sections:
        {sections}

        Return a comma-separated string of section names (e.g., 'definition,epidemiology') that match the text, based on the descriptions. If no sections match, return 'other'. If multiple sections apply, include all relevant ones. Return only the comma-separated string, no explanations or additional text.
        </task>
        
        <Section descriptions>
        The following is a description of what each section pertains.
        
        {section_descriptions}
        </Section descriptions>
        
        <text>
        {text}
        </text>
    """

In [98]:
# Module-level configuration used by the functions in this notebook.
# load_config returns None on failure, in which case CONFIG[...] lookups raise TypeError.
CONFIG = load_config(CONFIG_PATH)

In [91]:
def open_json_file_and_extract_text_and_source(file: str) -> Tuple[str, str]:
    """Read one JSON file and return its markdown text and source name.

    Args:
        file: Path to a UTF-8 encoded JSON file.

    Returns:
        ``(text, source)`` where ``text`` is the value stored under
        ``data['content']['markdown']`` and ``source`` is the file's base name.

    Raises:
        ValueError: If the document has no ``content.markdown`` entry. Invalid
            JSON also surfaces as ``ValueError`` because
            ``json.JSONDecodeError`` is a subclass of it.
        OSError: If the file cannot be opened.
    """
    with open(file, 'r', encoding='utf-8') as f:
        data = json.load(f)
    try:
        text = data['content']['markdown']
    except (KeyError, TypeError) as e:
        # Catch only the lookup failures (missing key, or a non-mapping value)
        # and keep the cause chained so the offending file and key are visible.
        raise ValueError(
            f"'{file}' has no content.markdown entry ({e!r})"
        ) from e
    return text, os.path.basename(file)

def load_json_files(json_path: str) -> Tuple[List[str], List[str]]:
    """
    Load JSON files from a path (file or directory) and extract markdown content.

    A file that fails to load is reported and skipped; the remaining files are
    still processed. Directory entries are visited in sorted name order so the
    result is the same on every run.

    Args:
        json_path (str): Path to a JSON file, or to a directory whose
            ``*.json`` entries are loaded (non-recursive).

    Returns:
        Tuple[List[str], List[str]]: List of markdown texts and their sources
        (file base names), in matching order. Both lists are empty when
        nothing could be loaded.

    Raises:
        OSError: If ``json_path`` is not a file and cannot be listed as a
            directory.
    """
    texts = []
    sources = []
    json_path = os.path.normpath(json_path)
    print(f"DEBUG: Loading JSON files from '{json_path}'")

    if os.path.isfile(json_path):
        files = [json_path]
    else:
        # os.listdir order is arbitrary; sort for a deterministic result.
        files = sorted(
            os.path.join(json_path, name)
            for name in os.listdir(json_path)
            if name.endswith('.json')
        )

    for file in files:
        try:
            text, source = open_json_file_and_extract_text_and_source(file)
        except Exception as e:
            print(f"Error processing '{file}' with utf-8: {str(e)}")
            # Skip only the bad file instead of abandoning every file after it.
            continue
        texts.append(text)
        sources.append(source)

    if not texts:
        print("Warning: No valid JSON data loaded")
    else:
        print(f"DEBUG: Loaded {len(texts)} texts from {len(sources)} sources")
    return texts, sources

# Try the loader on the local data directory. This binds module-level
# `texts` and `sources`; ingest_uptodate_json loads the files again itself.
texts, sources = load_json_files('./json_data')

DEBUG: Loading JSON files from 'json_data'
DEBUG: Loaded 2 texts from 2 sources


In [92]:
def chunk_text(text: str, chunk_size: Optional[int] = None, chunk_overlap: int = 100) -> List[str]:
    """
    Split text into chunks using LangChain's RecursiveCharacterTextSplitter.

    Args:
        text (str): Input text to chunk.
        chunk_size (int, optional): Target size of each chunk in characters.
            When omitted, ``CONFIG['ingestion_settings']['chunk_size']`` is
            read at call time, so a reloaded CONFIG is picked up.
        chunk_overlap (int): Overlap between chunks in characters.

    Returns:
        List[str]: List of text chunks. If the splitter fails, the text is cut
        into consecutive ``chunk_size`` slices without overlap instead, so no
        part of the input is lost.
    """
    if chunk_size is None:
        # Resolved per call: a default evaluated at definition time would
        # freeze whatever CONFIG held when this cell was run.
        chunk_size = CONFIG['ingestion_settings']['chunk_size']
    try:
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", ". ", " ", ""],  # Prioritize paragraphs, sentences
            length_function=len
        )
        chunks = splitter.split_text(text)
        print(f"DEBUG: Split text into {len(chunks)} chunks")
        return chunks
    except Exception as e:
        print(f"Error chunking text: {str(e)}")
        # Fallback: plain fixed-size slicing over the WHOLE text. Returning only
        # text[:chunk_size] would silently drop the rest of the document.
        if not text:
            return []
        if chunk_size <= 0:
            return [text]
        return [text[start:start + chunk_size] for start in range(0, len(text), chunk_size)]

In [93]:
def extract_template_from_config(template: str, config: Optional[Dict] = None) -> Tuple[List[str], Dict]:
    """Return the section names and descriptions of one configured template.

    Args:
        template: Key under ``config['templates']``.
        config: Configuration mapping. When omitted, the module-level CONFIG is
            read at call time, so a reloaded CONFIG is picked up.

    Returns:
        ``(names, descriptions)``: the section names in configured order, and a
        dict mapping each name to its description.

    Raises:
        KeyError: If the template, or a section's ``name`` / ``description``
            key, is missing from the configuration.
    """
    if config is None:
        # Resolved per call: a default bound at definition time keeps pointing
        # at the old object after CONFIG is reloaded in another cell.
        config = CONFIG

    template_entries = config['templates'][template]
    list_of_sections = [entry['name'] for entry in template_entries]
    description_of_sections = {
        entry['name']: entry['description'] for entry in template_entries
    }
    return list_of_sections, description_of_sections

In [110]:
def tag_chunk_with_ollama(text: str, template_sections: List[str], section_descriptions: Dict, model: str = "llama3.2") -> str:
    """Ask an Ollama model which template section(s) a chunk belongs to.

    Args:
        text: The chunk to classify.
        template_sections: Allowed section names.
        section_descriptions: Mapping of section name to description.
        model: Ollama model name used for generation.

    Returns:
        The model's reply with surrounding whitespace removed. The prompt asks
        for a comma-separated string of section names, but the reply is not
        validated or split here, so callers receive one string.
    """
    # generate_chunk_tagging_prompt is a static method, so no instance is needed.
    prompt = PromptFamily.generate_chunk_tagging_prompt(
        sections=template_sections,
        section_descriptions=section_descriptions,
        text=text
    )
    response = ollama.generate(
        model=model,
        prompt=prompt
    )

    # Strip surrounding whitespace so stored section tags are clean.
    return response.response.strip()

In [115]:
def ingest_uptodate_json(json_path: str, config: Optional[Dict[str, Any]] = None) -> Tuple[Optional[Dict[str, Any]], Optional[chromadb.Collection]]:
    """Ingest UpToDate JSON, chunk, tag, and store in Chroma DB.

    For every loaded document the text is split into chunks; each chunk is
    tagged by ``tag_chunk_with_ollama``, embedded with the
    ``mxbai-embed-large`` Ollama model, and written to the persistent
    ``medical_notes`` collection under ``chroma_db``.

    Args:
        json_path: JSON file or directory handed to ``load_json_files``.
        config: Configuration mapping providing ``templates['condition']`` and
            ``ingestion_settings`` (``chunk_size`` is required,
            ``chunk_overlap`` defaults to 100). When omitted, the module-level
            CONFIG is read at call time.

    Returns:
        ``(structured_data, collection)``. ``structured_data`` has a ``topic``
        (the first source name without ``.json``) and ``chunks``, a list of
        ``{"text", "section", "reference"}`` dicts. Returns ``(None, None)``
        when no text could be loaded.
    """
    if config is None:
        # Resolved per call so a reloaded CONFIG is picked up.
        config = CONFIG

    # Initialize Chroma DB
    client = chromadb.PersistentClient(path=os.path.normpath("chroma_db"))
    collection = client.get_or_create_collection("medical_notes")
    # Extract template sections and descriptions
    template_sections, section_descriptions = extract_template_from_config("condition", config)

    # Load JSON files
    texts, sources = load_json_files(json_path)
    if not texts:
        return None, None

    ingestion_settings = config['ingestion_settings']
    chunk_size = ingestion_settings['chunk_size']
    chunk_overlap = ingestion_settings.get('chunk_overlap', 100)

    processed_chunks = []

    for text, source in zip(texts, sources):
        print('processing ', source)
        chunks = chunk_text(text, chunk_size, chunk_overlap)

        for i, chunk in enumerate(chunks):
            section = tag_chunk_with_ollama(
                text=chunk,
                template_sections=template_sections,
                section_descriptions=section_descriptions
            )
            print("Identified section as ", section)
            response = ollama.embed(model="mxbai-embed-large", input=chunk)
            embedding = response['embeddings'][0]

            # Ids are deterministic (source + chunk index) and the store is
            # persistent, so upsert keeps a re-run idempotent and refreshes the
            # stored text, tag and embedding.
            collection.upsert(
                documents=[chunk],
                metadatas=[{"section": section, "reference": source}],
                ids=[f"chunk_{source}_{i}"],
                embeddings=[embedding]
            )
            processed_chunks.append({
                "text": chunk,
                "section": section,
                "reference": source,
            })

    # The topic is taken from the first source only, even when several files
    # were ingested.
    structured_data = {
        "topic": sources[0].replace(".json", "") if sources else "unknown",
        "chunks": processed_chunks
    }
    print(f"Processed {len(processed_chunks)} chunks for topic: {structured_data['topic']}")
    return structured_data, collection

In [116]:
# Run the full ingestion (load, chunk, tag, embed, store) over JSON_PATH.
# Both values are None when no JSON text could be loaded.
structured_data, collection = ingest_uptodate_json(json_path=JSON_PATH)

DEBUG: Loading JSON files from 'json_data'
DEBUG: Loaded 2 texts from 2 sources
processing  hypertension_in_adults_initial_drug_therapy.json
DEBUG: Split text into 82 chunks
Identified section as  management, clinical_presentation
Identified section as  definition, epidemiology, clinical_presentation
Identified section as  management
Identified section as  definition,management
Identified section as  definition, clinical_presentation, signs
Identified section as  management,clinical_presentation
Identified section as  management, epidemiology
Identified section as  management, clinical_presentation
Identified section as  management,differential_diagnosis
Identified section as  definition, clinical_presentation
Identified section as  management,differential_diagnosis
Identified section as  management,clinical_presentation
Identified section as  management,differential_diagnosis
Identified section as  management, clinical_presentation
Identified section as  management,differential_diagno

In [129]:
# Count the ingested chunks whose section string mentions 'epidemiology'.
# Matching is a case-insensitive substring test on the raw section string.
# NOTE(review): the variable is called `management_chunks` but the filter
# selects 'epidemiology' — confirm which one is intended.
management_chunks = [
        chunk for chunk in structured_data['chunks']
        if 'epidemiology' in chunk.get('section', '').lower()
    ]
len(management_chunks)

21

In [120]:
# Look up a single stored record by id.
# NOTE(review): 'doc_110' does not follow the f"chunk_{source}_{i}" id format that
# ingest_uptodate_json writes — presumably a record persisted by an earlier run; confirm.
collection.get('doc_110')

{'ids': ['doc_110'],
 'embeddings': None,
 'documents': ['●Obstructive sleep apnea – Disordered breathing during sleep appears to be an independent risk factor for systemic hypertension. (See  ["Obstructive sleep apnea and cardiovascular disease in adults"](/contents/obstructive-sleep-apnea-and-cardiovascular-disease-in-adults?search=hypertension&topicRef=3852&source=see_link).)\n\n●Pheochromocytoma – Pheochromocytoma is a rare cause of secondary hypertension. Approximately one-half of patients with pheochromocytoma have paroxysmal hypertension; most of the rest have sustained elevations in blood pressure. (See  ["Clinical presentation and diagnosis of pheochromocytoma"](/contents/clinical-presentation-and-diagnosis-of-pheochromocytoma?search=hypertension&topicRef=3852&source=see_link) and  ["Treatment of pheochromocytoma in adults"](/contents/treatment-of-pheochromocytoma-in-adults?search=hypertension&topicRef=3852&source=see_link).)\n\n●Cushing\'s syndrome – Cushing\'s syndrome is a 