In [2]:
!pip install rarfile
!apt-get install -y unrar

import os
import json
import logging
import rarfile
from typing import List, Dict, Any

# Set up logging for debugging and error tracking.
# force=True replaces any handlers already attached to the root logger.
# Without it, basicConfig() silently does nothing when the hosting
# environment (for example a notebook kernel) has already configured
# logging, and the INFO records emitted below are dropped.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True,
)
logger = logging.getLogger(__name__)

def extract_rar(rar_path: str, extract_to: str) -> None:
    """
    Unpack every member of a RAR archive into a target directory.

    Any exception derived from Exception is logged and swallowed, so the
    caller is never interrupted by a failed extraction.

    Args:
        rar_path: Location of the .rar archive to open.
        extract_to: Directory the archive members are written to.
    """
    try:
        archive = rarfile.RarFile(rar_path, 'r')
        with archive:
            archive.extractall(extract_to)
        logger.info(f"Extracted {rar_path} to {extract_to}")
    except rarfile.BadRarFile as bad_archive:
        # Reported separately so a corrupt archive is distinguishable in the log
        logger.error(f"Invalid RAR file {rar_path}: {bad_archive}")
    except Exception as failure:
        logger.error(f"Error extracting {rar_path}: {failure}")

def load_finished_notes(finished_path: str) -> List[Dict[str, Any]]:
    """
    Walk the 'Finished' folder tree and parse every *.json file found.

    Each parsed note gets two extra keys: 'file_path' (where it was read
    from) and 'disease_category' (the name of the folder containing the
    file). Files that cannot be read or parsed are logged and skipped.

    Args:
        finished_path: Root directory to walk.

    Returns:
        The parsed notes, or an empty list if the traversal itself fails.
    """
    loaded: List[Dict[str, Any]] = []
    try:
        for folder, _subdirs, filenames in os.walk(finished_path):
            for name in filenames:
                if not name.endswith('.json'):
                    continue
                file_path = os.path.join(folder, name)
                try:
                    with open(file_path, 'r', encoding='utf-8') as handle:
                        note_data = json.load(handle)
                    # Attach provenance so later steps can trace a note to its file
                    note_data['file_path'] = file_path
                    note_data['disease_category'] = os.path.basename(os.path.dirname(file_path))
                    loaded.append(note_data)
                    logger.info(f"Loaded note: {file_path}")
                except json.JSONDecodeError as parse_error:
                    logger.error(f"Failed to parse JSON in {file_path}: {parse_error}")
                except Exception as load_error:
                    logger.error(f"Error loading {file_path}: {load_error}")
        logger.info(f"Total notes loaded from 'Finished': {len(loaded)}")
        return loaded
    except Exception as walk_error:
        logger.error(f"Error traversing 'Finished' folder: {walk_error}")
        return []

def load_diagnostic_kg(diagnostic_kg_path: str) -> Dict[str, Dict[str, Any]]:
    """
    Load the JSON files that sit directly inside the 'diagnostic_kg' folder.

    The category key is the file name with its trailing '.json' removed.
    Each loaded graph gets an extra 'file_path' key recording its source.
    Files that cannot be read or parsed, or whose top-level JSON value is
    not an object, are logged and skipped.

    Args:
        diagnostic_kg_path: Directory containing one JSON file per category.

    Returns:
        Mapping of category name to knowledge graph data, or an empty dict
        if the directory cannot be listed.
    """
    suffix = '.json'
    kg_data: Dict[str, Dict[str, Any]] = {}
    try:
        for file in os.listdir(diagnostic_kg_path):
            if not file.endswith(suffix):
                continue
            file_path = os.path.join(diagnostic_kg_path, file)
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    graph_data = json.load(f)
                if not isinstance(graph_data, dict):
                    # Registering a non-object payload would leave an entry
                    # without 'file_path' that breaks consumers calling .keys()
                    logger.error(
                        f"Error loading {file_path}: expected a JSON object, "
                        f"got {type(graph_data).__name__}"
                    )
                    continue
                # Strip only the trailing extension; str.replace would also
                # remove '.json' occurring in the middle of the name
                category = file[:-len(suffix)]
                graph_data['file_path'] = file_path
                kg_data[category] = graph_data
                logger.info(f"Loaded knowledge graph: {file_path}")
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse JSON in {file_path}: {e}")
            except Exception as e:
                logger.error(f"Error loading {file_path}: {e}")
        logger.info(f"Total disease categories in 'diagnostic_kg': {len(kg_data)}")
        return kg_data
    except Exception as e:
        logger.error(f"Error traversing 'diagnostic_kg' folder: {e}")
        return {}

# Define paths to your RAR files and extraction directories
finished_rar_path = '/content/Finished.rar'
diagnostic_kg_rar_path = '/content/diagnostic_kg.rar'
extract_base_path = '/content/extracted'

# Create extraction directory
os.makedirs(extract_base_path, exist_ok=True)

# Extract the RAR files
extract_rar(finished_rar_path, extract_base_path)
extract_rar(diagnostic_kg_rar_path, extract_base_path)

# Debug: Inspect extracted contents
logger.info("Extracted contents:")
!ls -R /content/extracted

# Define paths to extracted folders (updated for renamed folder)
finished_path = os.path.join(extract_base_path, 'Finished')
diagnostic_kg_path = os.path.join(extract_base_path, 'diagnostic_kg')

# Check if paths exist
if not os.path.exists(finished_path):
    logger.error(f"'Finished' folder not found at {finished_path}")
if not os.path.exists(diagnostic_kg_path):
    logger.error(f"'diagnostic_kg' folder not found at {diagnostic_kg_path}")

# Load data
clinical_notes = load_finished_notes(finished_path)
knowledge_graphs = load_diagnostic_kg(diagnostic_kg_path)

# Print summary
print(f"Loaded {len(clinical_notes)} clinical notes from 'Finished'")
print(f"Loaded knowledge graphs for {len(knowledge_graphs)} disease categories from 'diagnostic_kg'")

# Inspect sample data
if clinical_notes:
    print("\nSample clinical note fields:", list(clinical_notes[0].keys()))
    print("Sample clinical note content:", clinical_notes[0].get('text', 'No text field'))
if knowledge_graphs:
    first_category = list(knowledge_graphs.keys())[0]
    print(f"\nSample knowledge graph fields for {first_category}:",
          list(knowledge_graphs[first_category].keys()))

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
unrar is already the newest version (1:6.1.5-1ubuntu0.1).
0 upgraded, 0 newly installed, 0 to remove and 34 not upgraded.
/content/extracted:
diagnostic_kg  Finished

/content/extracted/diagnostic_kg:
'Acute Coronary Syndrome.json'
'Adrenal Insufficiency.json'
 Alzheimer.json
'Aortic Dissection.json'
 Asthma.json
'Atrial Fibrillation.json'
 Cardiomyopathy.json
 COPD.json
 Diabetes.json
 Epilepsy.json
'Gastro-oesophageal Reflux Disease.json'
'Heart Failure.json'
 Hyperlipidemia.json
 Hypertension.json
 Migraine.json
'Multiple Sclerosis.json'
'Peptic Ulcer Disease.json'
'Pituitary Disease.json'
 Pneumonia.json
'Pulmonary Embolism.json'
 Stroke.json
'Thyroid Disease.json'
 Tuberculosis.json
'Upper Gastrointestinal Bleeding.json'

/content/extracted/Finished:
'Acute Coronary Syndrome'	      Hyperlipidemia
'Adrenal Insufficiency'		      Hypertension
 Alzheimer			      Migraine
'Aortic Dissection

Preprocessing

In [7]:
import os
import json
import csv
import logging

# Set up logging for debugging and error tracking.
# force=True replaces handlers that are already attached to the root logger.
# Without it, basicConfig() is a silent no-op once logging has been
# configured (by an earlier cell or by the notebook kernel itself).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True,
)
logger = logging.getLogger(__name__)

# Path for 'diagnostic_kg' folder (direct files)
diagnostic_kg_base_path = '/content/extracted/diagnostic_kg'

# Process 'diagnostic_kg' data and generate CSV
def process_diagnostic_kg_data():
    """
    Export every JSON file directly inside diagnostic_kg_base_path to one CSV.

    One row is written per file that parses successfully, with the columns
    'Disease Category' (file name without its trailing '.json'),
    'File Path' and 'Knowledge Graph Data' (the parsed content re-serialised
    with json.dumps). Files that fail to load are logged and skipped. The
    CSV is written to '/content/diagnostic_kg_data.csv'.
    """
    suffix = '.json'
    diagnostic_kg_rows = []

    # Iterate over each file in diagnostic_kg folder
    for file in os.listdir(diagnostic_kg_base_path):
        if not file.endswith(suffix):
            continue
        file_path = os.path.join(diagnostic_kg_base_path, file)
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                graph_data = json.load(f)
            # Strip only the trailing extension; str.replace('.json', '')
            # would also remove '.json' appearing inside the name
            category = file[:-len(suffix)]
            diagnostic_kg_rows.append([category, file_path, json.dumps(graph_data)])
        except json.JSONDecodeError as e:
            logger.error(f"Error decoding JSON in {file_path}: {e}")
        except Exception as e:
            logger.error(f"Error processing file {file_path}: {e}")

    # Write to CSV (newline='' lets the csv module control line endings)
    with open('/content/diagnostic_kg_data.csv', 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['Disease Category', 'File Path', 'Knowledge Graph Data'])
        writer.writerows(diagnostic_kg_rows)

    logger.info(f"Diagnostic KG data CSV created with {len(diagnostic_kg_rows)} rows.")

# Main execution
def main() -> None:
    """Run the diagnostic_kg CSV export by calling process_diagnostic_kg_data()."""
    process_diagnostic_kg_data()

# Run the export only when this code executes as the top-level module
if __name__ == "__main__":
    main()


In [8]:
import os
import json
import csv
import logging

# Set up logging for debugging and error tracking.
# force=True replaces handlers that are already attached to the root logger.
# Without it, basicConfig() is a silent no-op once logging has been
# configured (by an earlier cell or by the notebook kernel itself).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True,
)
logger = logging.getLogger(__name__)

# Path for 'Finished' folder (both direct files and nested directories)
finished_base_path = '/content/extracted/Finished'

# Process 'Finished' data and generate CSV
def process_finished_data():
    """
    Export every JSON file under finished_base_path to one CSV.

    The disease category and condition are taken from the directories
    between finished_base_path and the file: the first directory is the
    disease category and the second, when present, is the condition. The
    file name itself is never used as either value; a missing level is
    reported as 'Unknown Disease' / 'Unknown Condition'.

    One row is written per file that parses successfully, with the columns
    'Disease Category', 'Condition', 'File Path' and 'Knowledge Graph Data'
    (the parsed content re-serialised with json.dumps). Files that fail to
    load are logged and skipped. The CSV is written to
    '/content/finished_data.csv'.
    """
    finished_rows = []

    # Walk through the 'Finished' folder to handle both direct files and nested subdirectories
    for root, _dirs, files in os.walk(finished_base_path):
        for file in files:
            if not file.endswith('.json'):
                continue
            file_path = os.path.join(root, file)
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)

                # Keep only the directory components; the last component is
                # the file name and must not be mistaken for a condition
                relative_path = os.path.relpath(file_path, finished_base_path)
                folders = relative_path.split(os.sep)[:-1]
                disease_category = folders[0] if folders else 'Unknown Disease'
                condition = folders[1] if len(folders) > 1 else 'Unknown Condition'

                # Add data row (disease, condition, file path, serialised content)
                finished_rows.append([disease_category, condition, file_path, json.dumps(data)])
            except json.JSONDecodeError as e:
                logger.error(f"Error decoding JSON in {file_path}: {e}")
            except Exception as e:
                logger.error(f"Error processing file {file_path}: {e}")

    # Write to CSV (newline='' lets the csv module control line endings)
    with open('/content/finished_data.csv', 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['Disease Category', 'Condition', 'File Path', 'Knowledge Graph Data'])
        writer.writerows(finished_rows)

    logger.info(f"Finished data CSV created with {len(finished_rows)} rows.")

# Main execution
def main() -> None:
    """Run the 'Finished' CSV export by calling process_finished_data()."""
    process_finished_data()

# Run the export only when this code executes as the top-level module
if __name__ == "__main__":
    main()


In [15]:
import pandas as pd

# Read the flattened workbooks (they must already be uploaded to the working directory)
diagnostic_df = pd.read_excel('diagnostic_kg_data_flattened.xlsx')
finished_df = pd.read_excel('finished_data_flattened.xlsx')

# Show the first rows of each table under its heading
for heading, frame in (("Diagnostic KG Data:", diagnostic_df), ("Finished Data:", finished_df)):
    print(heading)
    display(frame.head())


Diagnostic KG Data:


Unnamed: 0,Disease Category,File Path,Flattened Text
0,Peptic Ulcer Disease,/content/extracted/diagnostic_kg/Peptic Ulcer ...,knowledge → Suspected Peptic Ulcer Disease → R...
1,COPD,/content/extracted/diagnostic_kg/COPD.json,knowledge → Suspected COPD → Risk Factors: Lon...
2,Pneumonia,/content/extracted/diagnostic_kg/Pneumonia.json,knowledge → Suspected Pneumonia → Risk Factors...
3,Pulmonary Embolism,/content/extracted/diagnostic_kg/Pulmonary Emb...,knowledge → Suspected Pulmonary Embolism → Ris...
4,Cardiomyopathy,/content/extracted/diagnostic_kg/Cardiomyopath...,knowledge → Suspected Cardiomyopathy → Risk Fa...


Finished Data:


Unnamed: 0,Disease Category,Condition,File Path,Flattened Text
0,Upper Gastrointestinal Bleeding,15616719-DS-22.json,/content/extracted/Finished/Upper Gastrointest...,input1: None\n\ninput2: Patient with developme...
1,Upper Gastrointestinal Bleeding,17028519-DS-23.json,/content/extracted/Finished/Upper Gastrointest...,"input1: CC: coffee ground emesis, melena\n\nin..."
2,Upper Gastrointestinal Bleeding,16620644-DS-7.json,/content/extracted/Finished/Upper Gastrointest...,input1: None\n\ninput2: Male with recurrent sy...
3,Upper Gastrointestinal Bleeding,11878388-DS-21.json,/content/extracted/Finished/Upper Gastrointest...,input1: requesting detox\n\ninput2: A male wit...
4,Upper Gastrointestinal Bleeding,15247348-DS-14.json,/content/extracted/Finished/Upper Gastrointest...,input1: None\n\ninput2: Patient with a PMH of ...


In [11]:
!pip install rank_bm25 nltk


Collecting rank_bm25
  Downloading rank_bm25-0.2.2-py3-none-any.whl.metadata (3.2 kB)
Downloading rank_bm25-0.2.2-py3-none-any.whl (8.6 kB)
Installing collected packages: rank_bm25
Successfully installed rank_bm25-0.2.2


In [16]:
# Install necessary packages
!pip install transformers langchain langchain_community sentence-transformers faiss-cpu pandas numpy tqdm

Collecting langchain_community
  Downloading langchain_community-0.3.23-py3-none-any.whl.metadata (2.5 kB)
Collecting faiss-cpu
  Downloading faiss_cpu-1.11.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (4.8 kB)
Collecting dataclasses-json<0.7,>=0.5.7 (from langchain_community)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting pydantic-settings<3.0.0,>=2.4.0 (from langchain_community)
  Downloading pydantic_settings-2.9.1-py3-none-any.whl.metadata (3.8 kB)
Collecting httpx-sse<1.0.0,>=0.4.0 (from langchain_community)
  Downloading httpx_sse-0.4.0-py3-none-any.whl.metadata (9.0 kB)
Collecting marshmallow<4.0.0,>=3.18.0 (from dataclasses-json<0.7,>=0.5.7->langchain_community)
  Downloading marshmallow-3.26.1-py3-none-any.whl.metadata (7.3 kB)
Collecting typing-inspect<1,>=0.4.0 (from dataclasses-json<0.7,>=0.5.7->langchain_community)
  Downloading typing_inspect-0.9.0-py3-none-any.whl.metadata (1.5 kB)
Collecting python-dotenv>=0.21.0 (from pydantic-set

In [17]:
import pandas as pd
import numpy as np
import os
import re
import json
from tqdm.notebook import tqdm

# Read the flattened workbooks (they must already be uploaded to the working directory)
diagnostic_df = pd.read_excel('diagnostic_kg_data_flattened.xlsx')
finished_df = pd.read_excel('finished_data_flattened.xlsx')

# Show the first rows of each table under its heading
for heading, frame in (("Diagnostic KG Data:", diagnostic_df), ("Finished Data:", finished_df)):
    print(heading)
    display(frame.head())

# Report (rows, columns) of each table to gauge the data volume
diagnostic_shape = diagnostic_df.shape
finished_shape = finished_df.shape
print(f"Diagnostic data shape: {diagnostic_shape}")
print(f"Finished data shape: {finished_shape}")

Diagnostic KG Data:


Unnamed: 0,Disease Category,File Path,Flattened Text
0,Peptic Ulcer Disease,/content/extracted/diagnostic_kg/Peptic Ulcer ...,knowledge → Suspected Peptic Ulcer Disease → R...
1,COPD,/content/extracted/diagnostic_kg/COPD.json,knowledge → Suspected COPD → Risk Factors: Lon...
2,Pneumonia,/content/extracted/diagnostic_kg/Pneumonia.json,knowledge → Suspected Pneumonia → Risk Factors...
3,Pulmonary Embolism,/content/extracted/diagnostic_kg/Pulmonary Emb...,knowledge → Suspected Pulmonary Embolism → Ris...
4,Cardiomyopathy,/content/extracted/diagnostic_kg/Cardiomyopath...,knowledge → Suspected Cardiomyopathy → Risk Fa...


Finished Data:


Unnamed: 0,Disease Category,Condition,File Path,Flattened Text
0,Upper Gastrointestinal Bleeding,15616719-DS-22.json,/content/extracted/Finished/Upper Gastrointest...,input1: None\n\ninput2: Patient with developme...
1,Upper Gastrointestinal Bleeding,17028519-DS-23.json,/content/extracted/Finished/Upper Gastrointest...,"input1: CC: coffee ground emesis, melena\n\nin..."
2,Upper Gastrointestinal Bleeding,16620644-DS-7.json,/content/extracted/Finished/Upper Gastrointest...,input1: None\n\ninput2: Male with recurrent sy...
3,Upper Gastrointestinal Bleeding,11878388-DS-21.json,/content/extracted/Finished/Upper Gastrointest...,input1: requesting detox\n\ninput2: A male wit...
4,Upper Gastrointestinal Bleeding,15247348-DS-14.json,/content/extracted/Finished/Upper Gastrointest...,input1: None\n\ninput2: Patient with a PMH of ...


Diagnostic data shape: (24, 3)
Finished data shape: (511, 4)


In [18]:
# Whitespace normaliser for the text columns
def clean_text(text):
    """
    Collapse every run of whitespace to a single space and trim both ends.

    Anything that is not a str (for example NaN or None) yields "".
    """
    if not isinstance(text, str):
        return ""
    return re.sub(r'\s+', ' ', text).strip()

# Normalise whitespace in the flattened text of both tables
diagnostic_df['Clean_Text'] = diagnostic_df['Flattened Text'].apply(clean_text)
finished_df['Clean_Text'] = finished_df['Flattened Text'].apply(clean_text)

# Prefix the row index so ids from the two tables cannot collide
diagnostic_df['doc_id'] = 'diag_' + diagnostic_df.index.astype(str)
finished_df['doc_id'] = 'fin_' + finished_df.index.astype(str)

# One retrieval record per knowledge-graph row
diagnostic_docs = [
    {
        'doc_id': row['doc_id'],
        'content': row['Clean_Text'],
        'disease_category': row['Disease Category'],
        'file_path': row['File Path'],
    }
    for _, row in diagnostic_df.iterrows()
]

# One retrieval record per clinical-note row; these also carry 'condition'
finished_docs = [
    {
        'doc_id': row['doc_id'],
        'content': row['Clean_Text'],
        'disease_category': row['Disease Category'],
        'condition': row['Condition'],
        'file_path': row['File Path'],
    }
    for _, row in finished_df.iterrows()
]

# Knowledge-graph records first, clinical notes after
all_docs = [*diagnostic_docs, *finished_docs]

print(f"Total documents prepared for retrieval: {len(all_docs)}")

Total documents prepared for retrieval: 535


In [19]:
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np

# Sentence-embedding model used for both documents and queries
model_name = 'all-MiniLM-L6-v2'  # A lightweight model good for retrieval
embedding_model = SentenceTransformer(model_name)

# Embed the 'content' of every prepared document, in all_docs order
print("Generating embeddings...")
docs_content = [entry['content'] for entry in all_docs]
embeddings = embedding_model.encode(docs_content, show_progress_bar=True)

# Build a flat L2 index; row i of the index corresponds to all_docs[i]
dimension = embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
float_vectors = np.array(embeddings).astype('float32')
index.add(float_vectors)

print(f"Created FAISS index with {index.ntotal} vectors of dimension {dimension}")

Generating embeddings...


Batches:   0%|          | 0/17 [00:00<?, ?it/s]

Created FAISS index with 535 vectors of dimension 384


In [20]:
def retrieve_relevant_docs(query, top_k=5):
    """
    Retrieve the most relevant documents for a given query

    The query is embedded with embedding_model and searched against the
    FAISS index; hits are mapped back to all_docs by position.

    Args:
        query (str): The user's query
        top_k (int): Maximum number of documents to retrieve

    Returns:
        list: Retrieved documents, best match first. May hold fewer than
        top_k entries; empty when top_k is not positive.
    """
    if top_k <= 0:
        return []

    # Generate embedding for the query
    query_embedding = embedding_model.encode([query])

    # Search in the FAISS index
    _distances, indices = index.search(np.array(query_embedding).astype('float32'), top_k)

    # FAISS pads the result with -1 when fewer than top_k vectors are
    # available. Indexing all_docs with -1 would silently return the last
    # document, so those placeholders are dropped.
    return [all_docs[idx] for idx in indices[0] if idx >= 0]

In [24]:
# Install the required packages for Google Gemini
!pip install -q google-generativeai

# Import necessary libraries
import google.generativeai as genai
import os

# Set up the Gemini API key without embedding it in the notebook.
# SECURITY: a literal API key used to be committed in this cell. Treat that
# key as compromised and revoke it; never paste credentials into source.
from getpass import getpass

# Prefer a key already present in the environment; otherwise prompt for it
# with hidden input so it is stored neither in the source nor in the output.
_google_api_key = os.environ.get("GOOGLE_API_KEY") or getpass("Enter your Google API key: ")
if not _google_api_key:
    raise RuntimeError("GOOGLE_API_KEY is required to use the Gemini API")
os.environ["GOOGLE_API_KEY"] = _google_api_key
print("API key set for this session")

# Configure the Gemini API
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])

# Model used for every generation call below
MODEL_NAME = "gemini-1.5-pro"
print(f"Using model: {MODEL_NAME}")

# Initialize the model
model = genai.GenerativeModel(MODEL_NAME)

# Smoke-test the model; a failure is reported but does not stop the cell
try:
    response = model.generate_content("What is the capital of France?")
    print("Gemini model test successful!")
    print(f"Test response: {response.text}")
except Exception as e:
    print(f"Error testing Gemini model: {e}")
    print("Please check your Google API key and internet connection.")

# Answer generation through the Gemini model configured above
def generate_response(query, retrieved_docs):
    """
    Ask Gemini to answer a query using the retrieved documents as context

    Args:
        query (str): The user's query
        retrieved_docs (list): Documents with 'disease_category' and
            'content' keys, and optionally 'condition'

    Returns:
        str: The model's text, or a message starting with
        "Error generating response: " if the generation call fails
    """
    # Render each document as a numbered section of the context
    sections = []
    for position, doc in enumerate(retrieved_docs, start=1):
        lines = [
            f"\nDocument {position}:\n",
            f"Disease Category: {doc['disease_category']}\n",
        ]
        if 'condition' in doc:
            lines.append(f"Condition: {doc['condition']}\n")
        lines.append(f"Content: {doc['content']}\n")
        lines.append("---\n")
        sections.append("".join(lines))
    context = "".join(sections)

    # Instruction, documents and question combined into one prompt
    prompt = f"""As a medical AI assistant, use the following medical documents to answer the query accurately and professionally.

DOCUMENTS:
{context}

QUERY: {query}

Provide a detailed medical response using information from these documents:"""

    # Failures are returned as text rather than raised
    try:
        return model.generate_content(prompt).text
    except Exception as failure:
        return f"Error generating response: {str(failure)}"

API key set for this session
Using model: gemini-1.5-pro
Gemini model test successful!
Test response: Paris



In [25]:
def rag_pipeline(query):
    """
    Run retrieval followed by generation for a single query

    Args:
        query (str): The user's query

    Returns:
        tuple: (response, retrieved_docs) - the generated answer and the
        documents it was given as context
    """
    docs = retrieve_relevant_docs(query)
    answer = generate_response(query, docs)
    return answer, docs

# Run the pipeline once on an example question
test_query = "What are the risk factors for Pulmonary Embolism?"
response, retrieved_docs = rag_pipeline(test_query)

print("QUERY:", test_query)
print("\nRETRIEVED DOCUMENTS:")
for rank, doc in enumerate(retrieved_docs, start=1):
    print(f"\nDocument {rank}:")
    print(f"Disease Category: {doc['disease_category']}")
    if 'condition' in doc:
        print(f"Condition: {doc['condition']}")
    preview = doc['content'][:200]  # Only the first 200 characters are shown
    print(f"Content: {preview}...")

print("\nGENERATED RESPONSE:")
print(response)

QUERY: What are the risk factors for Pulmonary Embolism?

RETRIEVED DOCUMENTS:

Document 1:
Disease Category: Pulmonary Embolism
Content: knowledge → Suspected Pulmonary Embolism → Risk Factors: HTN; Previous VTE; Immobility or recent surgery; Cancer; Thrombophilia; Hormonal therapy (e.g., oral contraceptives or hormone replacement ther...

Document 2:
Disease Category: Heart Failure
Content: knowledge → Suspected Heart Failure → Risk Factors: CAD, Hypertension, Valve disease, Arrhythmias, CMPs, Congenital heart disease, Infective, Drug-induced, Infiltrative, Storage disorders, Endomyocard...

Document 3:
Disease Category: COPD
Content: knowledge → Suspected COPD → Risk Factors: Long-term exposure to harmful particles or gases (tobacco smoke, occupational dust and chemicals, air pollution) / Genetic predisposition (e.g., alpha-1 anti...

Document 4:
Disease Category: Heart Failure
Condition: 18740324-DS-19.json
Content: input1: Multiple complaints: lower extremity edema, shortness of b

In [26]:
def evaluate_rag_system(test_queries):
    """
    Run every query through the RAG pipeline and summarise each run

    Args:
        test_queries (list): Queries to send through rag_pipeline, in order

    Returns:
        list: One dict per query with the keys 'query', 'response',
        'retrieved_docs' (doc_id values) and 'disease_categories'
    """
    def summarize(query):
        # Pair the answer with the ids and categories of its context documents
        response, retrieved_docs = rag_pipeline(query)
        return {
            'query': query,
            'response': response,
            'retrieved_docs': [doc['doc_id'] for doc in retrieved_docs],
            'disease_categories': [doc['disease_category'] for doc in retrieved_docs],
        }

    return [summarize(query) for query in test_queries]

# Questions used to exercise the pipeline
test_queries = [
    "What are the symptoms of Peptic Ulcer Disease?",
    "How is COPD diagnosed?",
    "What are the treatment options for Upper Gastrointestinal Bleeding?",
    "What are the risk factors for Pneumonia?",
    "What are the complications of Cardiomyopathy?",
]

# Run every question through the pipeline
evaluation_results = evaluate_rag_system(test_queries)

# Print a short report per question
separator = "-" * 80
for number, result in enumerate(evaluation_results, start=1):
    print(f"\nQuery {number}: {result['query']}")
    print(f"Response: {result['response'][:200]}...")  # Show first 200 chars
    print(f"Retrieved disease categories: {result['disease_categories']}")
    print(separator)




Query 1: What are the symptoms of Peptic Ulcer Disease?
Response: Peptic ulcer disease (PUD) presents with a variety of symptoms, primarily centered around epigastric pain.  This pain can manifest in several ways, including a burning or gnawing sensation.  It's ofte...
Retrieved disease categories: ['Peptic Ulcer Disease', 'Gastritis', 'Peptic Ulcer Disease', 'Gastritis', 'Gastro-oesophageal Reflux Disease']
--------------------------------------------------------------------------------

Query 2: How is COPD diagnosed?
Response: COPD diagnosis hinges on demonstrating persistent airflow limitation, confirmed by post-bronchodilator spirometry.  Specifically, a post-bronchodilator FEV1/FVC ratio less than 0.70 is required.  This...
Retrieved disease categories: ['COPD', 'Asthma', 'COPD', 'Pneumonia', 'COPD']
--------------------------------------------------------------------------------

Query 3: What are the treatment options for Upper Gastrointestinal Bleeding?
Response: The provided



In [32]:
def evaluate_rag_system(test_queries):
    """
    Run every query through the RAG pipeline and summarise each run

    Args:
        test_queries (list): Queries to send through rag_pipeline, in order

    Returns:
        list: One dict per query with the keys 'query', 'response',
        'retrieved_docs' (doc_id values) and 'disease_categories'
    """
    def summarize(query):
        # Pair the answer with the ids and categories of its context documents
        response, retrieved_docs = rag_pipeline(query)
        return {
            'query': query,
            'response': response,
            'retrieved_docs': [doc['doc_id'] for doc in retrieved_docs],
            'disease_categories': [doc['disease_category'] for doc in retrieved_docs],
        }

    return [summarize(query) for query in test_queries]

# Single free-form question used to exercise the pipeline
test_queries = [
    "What are risk of heart attack?",
]

# Run the question through the pipeline
evaluation_results = evaluate_rag_system(test_queries)

# Print a short report per question
separator = "-" * 80
for number, result in enumerate(evaluation_results, start=1):
    print(f"\nQuery {number}: {result['query']}")
    print(f"Response: {result['response'][:200]}...")  # Show first 200 chars
    print(f"Retrieved disease categories: {result['disease_categories']}")
    print(separator)

KeyboardInterrupt: 

In [27]:
import pickle

# Persist the FAISS index to disk
faiss.write_index(index, 'medical_docs_faiss.index')

# Persist the document list whose order matches the index rows.
# NOTE: pickle files should only ever be loaded back from a trusted source.
with open('all_docs.pkl', 'wb') as docs_file:
    pickle.dump(all_docs, docs_file)

# Record which embedding model produced the vectors, and their dimension
embedding_info = dict(model_name=model_name, dimension=dimension)
with open('embedding_info.json', 'w') as info_file:
    json.dump(embedding_info, info_file)

print("Saved models and data for future use")

Saved models and data for future use


In [28]:
!pip install gradio

Collecting gradio
  Downloading gradio-5.29.0-py3-none-any.whl.metadata (16 kB)
Collecting aiofiles<25.0,>=22.0 (from gradio)
  Downloading aiofiles-24.1.0-py3-none-any.whl.metadata (10 kB)
Collecting fastapi<1.0,>=0.115.2 (from gradio)
  Downloading fastapi-0.115.12-py3-none-any.whl.metadata (27 kB)
Collecting ffmpy (from gradio)
  Downloading ffmpy-0.5.0-py3-none-any.whl.metadata (3.0 kB)
Collecting gradio-client==1.10.0 (from gradio)
  Downloading gradio_client-1.10.0-py3-none-any.whl.metadata (7.1 kB)
Collecting groovy~=0.1 (from gradio)
  Downloading groovy-0.1.2-py3-none-any.whl.metadata (6.1 kB)
Collecting pydub (from gradio)
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting python-multipart>=0.0.18 (from gradio)
  Downloading python_multipart-0.0.20-py3-none-any.whl.metadata (1.8 kB)
Collecting ruff>=0.9.3 (from gradio)
  Downloading ruff-0.11.8-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (25 kB)
Collecting safehttpx<0.2.0,>=0.1.6

In [30]:
# Install Gradio (if not already installed)
!pip install -q gradio

import gradio as gr
def rag_pipeline(query):
    """
    Run retrieval followed by generation for a single query

    Args:
        query (str): The user's query

    Returns:
        tuple: (response, retrieved_docs) - the generated answer and the
        documents it was given as context
    """
    docs = retrieve_relevant_docs(query)
    answer = generate_response(query, docs)
    return answer, docs

# Gradio callback: evaluate a block of newline-separated queries
def evaluate_rag_system_interface(queries_input):
    """
    Run each non-blank line of the input through the RAG pipeline.

    Args:
        queries_input (str): Queries separated by newlines; surrounding
            whitespace is stripped and blank lines are ignored

    Returns:
        str: One text section per query showing the query, the first 300
        characters of the response, the retrieved doc ids and their
        disease categories; "" when there are no queries
    """
    stripped_lines = (line.strip() for line in queries_input.strip().split('\n'))
    test_queries = [line for line in stripped_lines if line]

    # First pass: run every query and keep what the report needs
    results = []
    for query in test_queries:
        response, retrieved_docs = rag_pipeline(query)
        results.append({
            'query': query,
            'response': response,
            'retrieved_docs': [doc['doc_id'] for doc in retrieved_docs],
            'disease_categories': [doc['disease_category'] for doc in retrieved_docs],
        })

    # Second pass: render one section per result
    sections = []
    for number, r in enumerate(results, start=1):
        sections.append(
            f"🔍 **Query {number}:** {r['query']}\n"
            f"📝 **Response:** {r['response'][:300]}...\n"
            f"📄 **Docs:** {', '.join(r['retrieved_docs'])}\n"
            f"🩺 **Disease Categories:** {', '.join(r['disease_categories'])}\n"
            + "-" * 60 + "\n"
        )

    return "".join(sections)

# Launch Gradio UI
# Wires evaluate_rag_system_interface to a multi-line textbox (input) and a
# Markdown panel (output).
# NOTE(review): share=True publishes the app on a public gradio.live URL, so
# anyone with the link can trigger model calls - confirm this is intended.
gr.Interface(
    fn=evaluate_rag_system_interface,
    inputs=gr.Textbox(label="Enter test queries (one per line)", lines=10, placeholder="e.g.\nWhat are the symptoms of Peptic Ulcer Disease?"),
    outputs=gr.Markdown(label="Evaluation Output"),
    title="🧠 RAG System Evaluator",
    description="Paste multiple test queries below to evaluate your Retrieval-Augmented Generation (RAG) system."
).launch(share=True)


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://ec7db71035b7cabc76.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


