In [None]:
import warnings

# Suppress warnings with clear justification (avoid blanket silencing)
warnings.filterwarnings(
    "ignore", 
    category=UserWarning,  # Only target UserWarnings (not DeprecationWarning, etc.)
    module="langchain",    # Optional: Limit to specific module to avoid hiding unrelated issues
    message=".*regex_pattern.*"  # Optional: Filter by message regex for precision
)

In [None]:
# Installing Python dependencies for a LangChain-based project with version pinning for reproducibility.
# Usage: Run this script in a fresh virtual environment to avoid conflicts.

import subprocess

# Pinned dependencies for this notebook (pip requirement strings, "name==version").
# Everything a later cell imports or needs at runtime must be listed here,
# otherwise a run in a fresh virtual environment fails on the import cell.
REQUIRED_PACKAGES = [
    "langchain==0.1.11",         # Framework for LLM applications
    "langgraph==0.0.22",         # For building stateful, multi-actor workflows
    "cassio==0.1.10",             # Cassandra DB integration for LangChain
    "langchain-community==0.0.28", # Community-contributed LangChain integrations
    "openai==1.12.0",            # Official OpenAI API client
    "wikipedia==1.4.0",          # Wikipedia API wrapper for retrieval
    "tensorflow==2.15.0",        # ML framework (optional: replace with `tensorflow-cpu` if no GPU)
    "langchain-groq==0.1.2",     # Groq API integration for high-speed LLMs
    "tiktoken==0.6.0",           # Fast BPE tokenizer for OpenAI models
    # Used by later cells but previously missing from this list.
    # NOTE(review): versions picked to sit alongside the pins above — confirm
    # against the environment the notebook was developed in.
    "matplotlib==3.8.3",            # `import matplotlib.pyplot as plt` in the import cell
    "geocoder==1.38.1",             # `import geocoder` in the import cell
    "sentence-transformers==2.5.1", # backend required by HuggingFaceEmbeddings
    "beautifulsoup4==4.12.3",       # HTML parser required by WebBaseLoader
]

def install_dependencies(packages=None):
    """Install pinned packages into the environment of the running interpreter.

    Args:
        packages: Optional iterable of pip requirement strings. When omitted,
            REQUIRED_PACKAGES is installed.

    Raises:
        subprocess.CalledProcessError: If pip exits with a non-zero status
            (re-raised after a short failure message is printed).
    """
    import sys  # local import keeps this cell self-contained

    requirements = list(REQUIRED_PACKAGES if packages is None else packages)
    if not requirements:
        # `pip install` with no requirement is an error; treat it as a no-op instead.
        print("Nothing to install.")
        return

    # `python -m pip` installs into the interpreter that is executing this code.
    # A bare `pip` is resolved through PATH and can belong to a different
    # Python than the notebook kernel.
    command = [sys.executable, "-m", "pip", "install", *requirements]
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as e:
        print(f"❌ Installation failed: {e}")
        raise
    print("✅ Dependencies installed successfully.")

if __name__ == "__main__":
    install_dependencies()

In [None]:
# Third-Party General Imports
import numpy as np  # Numerical computing
from PIL import Image  # Image processing
import matplotlib.pyplot as plt  # Data visualization
import geocoder  # Location services (consider alternatives like Google Maps API for production)
import wikipedia  # Wikipedia API (handle with rate-limiting in production)

# Machine Learning & Deep Learning
import tensorflow as tf  # Standard alias for TensorFlow
from tensorflow.keras.models import load_model  # Model loading
from tensorflow.keras.preprocessing.image import img_to_array  # Image preprocessing

# LangChain Core Components
from langchain.text_splitter import RecursiveCharacterTextSplitter  # Text processing
from langchain.chains import RetrievalQA  # Question-answering chain
from langchain.chat_models import ChatOpenAI  # LLM providers (OpenAI + Groq)
from langchain_groq import ChatGroq

# Vector Stores & Embeddings
from langchain.embeddings import OpenAIEmbeddings  # OpenAI embeddings
from langchain_community.embeddings import HuggingFaceEmbeddings  # HF embeddings (alternative)
from langchain.vectorstores import AstraDB, Cassandra  # Database integrations

# Document Loaders (Community Maintained)
from langchain_community.document_loaders import WebBaseLoader  # Web content loader

# Cassandra-specific (only import if needed)
import cassio  # Cassandra driver for LangChain

In [None]:
# --- Constants ---
# Size passed to PIL's Image.resize in preprocess_image(): (width, height) in pixels.
IMAGE_SIZE = (128, 128)

# Class labels, looked up by output index in predict_top_3().
# The order must therefore mirror the model's output layer.
class_names = [
    "Acne",
    "Actinic Keratosis",
    "Benign Tumors",
    "Bullous",
    "Candidiasis",
    "Drug Eruption",
    "Eczema",
    "Infestations/Bites",
    "Lichen",
    "Lupus",
    "Moles",
    "Psoriasis",
    "Rosacea",
    "Seborrheic Keratoses",
    "Skin Cancer",
    "Sun/Sunlight Damage",
    "Tinea",
    "Unknown/Normal",
    "Vascular Tumors",
    "Vasculitis",
    "Vitiligo",
    "Warts",
]

# --- Model Loading ---
# Location of the saved Keras model file (absolute path under /content).
MODEL_PATH = '/content/skindisease_model.keras'

# Load the trained classifier once; predict_top_3() reads this module-level object.
# NOTE(review): the model's input size and number of outputs are assumed to agree
# with IMAGE_SIZE and class_names — confirm against the training notebook.
model = load_model(MODEL_PATH)

In [None]:
# Image Preprocessing
def preprocess_image(img_path):
    """
    Load an image file and turn it into a single-item batch for the model.

    Steps: open the file, convert to RGB, resize to IMAGE_SIZE, convert to an
    array, add a leading batch axis and divide the pixel values by 255.

    Args:
        img_path: Path to input image file

    Returns:
        tuple: (img_array, img)
               - img_array: array with a leading batch axis of length 1 whose
                 values have been divided by 255
               - img: the RGB PIL image *after* resizing to IMAGE_SIZE (not the
                 full-resolution original), returned for display purposes
    """
    # The context manager closes the underlying file deterministically.
    # convert() produces a separate image object, so the result stays usable
    # after the file has been closed.
    with Image.open(img_path) as source:
        img = source.convert('RGB').resize(IMAGE_SIZE)

    # PIL image -> array, then (h, w, c) -> (1, h, w, c)
    img_array = img_to_array(img)
    img_array = np.expand_dims(img_array, axis=0)

    # Scale [0, 255] pixel values to [0, 1].
    # In-place division assumes img_to_array returned a float array — TODO confirm.
    img_array /= 255.0

    return img_array, img

In [None]:
# Get Top-3 Prediction from Model
def predict_top_3(img_path):
    """
    Classify one image and report its three highest-scoring classes.

    As a side effect, the image returned by preprocess_image() is shown with
    matplotlib, titled with the best-scoring class name.

    Args:
        img_path: Path to the input image file

    Returns:
        List of (class_name, score) tuples, highest score first. Scores are
        the values taken directly from the model's output row.
    """
    batch, display_img = preprocess_image(img_path)

    # predict() yields one row per batch item; the batch holds a single image
    scores = model.predict(batch)[0]

    # argsort is ascending: reverse it and keep the first three indices
    best_first = scores.argsort()[::-1][:3]

    ranked = []
    for class_idx in best_first:
        ranked.append((class_names[class_idx], scores[class_idx]))

    best_label = ranked[0][0]
    plt.imshow(display_img)
    plt.title(f"Top Prediction: {best_label}")
    plt.axis('off')  # no tick marks around the picture
    plt.show()

    return ranked

In [None]:
# Initialize Cassandra/AstraDB connection
# Credentials are resolved from variables already defined in this session or,
# failing that, from the ASTRA_DB_APPLICATION_TOKEN / ASTRA_DB_ID environment
# variables, so no secret has to be written into the notebook.
# Remaining recommendations:
# 1. Ensure this only runs once (duplicate inits may cause connection leaks)
# 2. In production, wrap in try/except to handle connection failures gracefully
# 3. Consider adding connection timeout parameters for production use
import os

ASTRA_DB_APPLICATION_TOKEN = (
    globals().get("ASTRA_DB_APPLICATION_TOKEN")
    or os.environ.get("ASTRA_DB_APPLICATION_TOKEN")
)
ASTRA_DB_ID = globals().get("ASTRA_DB_ID") or os.environ.get("ASTRA_DB_ID")

# Fail early with an actionable message instead of a NameError or an opaque
# connection error later on.
_missing_astra_settings = [
    setting_name
    for setting_name, setting_value in (
        ("ASTRA_DB_APPLICATION_TOKEN", ASTRA_DB_APPLICATION_TOKEN),
        ("ASTRA_DB_ID", ASTRA_DB_ID),
    )
    if not setting_value
]
if _missing_astra_settings:
    raise RuntimeError(
        "Missing AstraDB credentials: "
        + ", ".join(_missing_astra_settings)
        + ". Set them as environment variables before running this cell."
    )

cassio.init(token=ASTRA_DB_APPLICATION_TOKEN, database_id=ASTRA_DB_ID)

In [None]:
# --- Load and insert disease info into AstraDB ---
def load_and_insert_disease_info():
    """Fetch the disease reference pages, chunk them, and write the chunks to the vector table.

    Prints the number of inserted chunks; returns nothing.
    """
    # Embedding model handed to the vector store for the inserted chunks
    embedder = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

    # Pages to ingest (currently a single GitHub README)
    source_urls = [
        "https://github.com/adityakr07-dev/Skin_disease/blob/main/README.md",
    ]

    # Each loader returns a list of documents; collect them into one flat list
    documents = [
        document
        for url in source_urls
        for document in WebBaseLoader(url).load()
    ]

    # Chunks of at most 500 characters, overlapping by 50
    chunks = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=50,
    ).split_documents(documents)

    # session/keyspace are left as None.
    # NOTE(review): presumably these fall back to the connection configured by
    # cassio.init() — confirm.
    store = Cassandra(
        embedding=embedder,
        table_name="skin_disease_info",
        session=None,
        keyspace=None,
    )
    store.add_documents(chunks)

    print(f"✅ Inserted {len(chunks)} disease documents into AstraDB.")

In [None]:
# --- Setup AstraDB retriever ---
def setup_vectorstore():
    """
    Build a retriever over the "skin_disease_info" vector table.

    Returns:
        The object produced by the vector store's as_retriever(), created with
        search_kwargs={"k": 3}.
    """
    # Same embedding model and table name as load_and_insert_disease_info(),
    # so queries are encoded the same way as the stored chunks.
    store_settings = dict(
        embedding=HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2"),
        table_name="skin_disease_info",
        session=None,   # NOTE(review): presumably resolved from cassio.init() — confirm
        keyspace=None,
    )
    store = Cassandra(**store_settings)

    # k=3: request three results per query
    retriever = store.as_retriever(search_kwargs={"k": 3})
    return retriever

In [None]:
# --- Filter top diseases using symptom match ---
def filter_candidates_by_symptoms(symptom_text, top_diseases, retriever):
    """
    Keep only the predicted diseases that the knowledge base mentions by name.

    For every candidate, the retriever is queried with
    "<symptom_text> related to <disease>"; the candidate is kept if any
    returned document contains the disease name (case-insensitive).

    Args:
        symptom_text: User-provided symptom description
        top_diseases: List of (disease_name, confidence) tuples from model prediction
        retriever: Object exposing get_relevant_documents(query)

    Returns:
        List of (disease_name, document) tuples, pairing each kept disease with
        the first returned document that mentions it
    """
    confirmed = []

    for candidate, _score in top_diseases:
        query = symptom_text + f" related to {candidate}"
        hits = retriever.get_relevant_documents(query)

        # First hit whose text names the candidate, or None when there is none
        supporting_doc = next(
            (hit for hit in hits if candidate.lower() in hit.page_content.lower()),
            None,
        )
        if supporting_doc is not None:
            confirmed.append((candidate, supporting_doc))

    return confirmed

In [None]:
# --- Fallback Wikipedia summary ---
def fetch_wikipedia_info(disease_name):
    """
    Look up a short Wikipedia summary for a disease name.

    Args:
        disease_name: Name of the disease to look up

    Returns:
        The summary requested with sentences=3, or the string
        "No Wikipedia information found for <disease_name>." if the lookup
        raises any exception.
    """
    try:
        summary = wikipedia.summary(disease_name, sentences=3)
    except Exception:
        # Any lookup failure is reported through the returned text, never raised
        return f"No Wikipedia information found for {disease_name}."
    else:
        return summary

In [None]:
# Helper function to find disease names in text by matching against known class_names
def extract_disease_names_from_text(text):
    """
    Return the entries of class_names that occur in `text`, ignoring case.

    Args:
        text: Text to scan; empty or None yields an empty list.

    Returns:
        List of matching names, spelled as in class_names and in class_names order.
    """
    if not text:
        return []
    lowered = text.lower()
    # Both sides are lowercased: class_names entries are capitalised, so comparing
    # them unchanged against lowercased text could never match.
    return [disease for disease in class_names if disease.lower() in lowered]

class SkinDiseaseChatBot:
    """
    Symptom-driven assistant seeded with the image model's top-3 predictions.

    update_symptoms() appends to a running symptom description and re-runs
    refine_diagnosis(), which returns a one-element list of
    (disease, confidence_label, information) tuples.
    """

    # Start of the message fetch_wikipedia_info() returns when its lookup fails.
    _WIKI_FAILURE_PREFIX = "No Wikipedia information found for"

    # Used in prompts when the IP lookup yields no city/state/country.
    _UNKNOWN_LOCATION = "my area"

    def __init__(self, image_path, groq_api_key=None):
        """
        Args:
            image_path: Path of the image passed to predict_top_3().
            groq_api_key: Optional Groq API key. When omitted, the GROQ_API_KEY
                environment variable is used.

        Raises:
            ValueError: If no API key is available.
        """
        import os  # local import: the class does not rely on another cell importing it

        # Resolve the key before the expensive prediction / vector-store setup
        api_key = groq_api_key or os.environ.get("GROQ_API_KEY")
        if not api_key:
            raise ValueError(
                "Groq API key missing: pass groq_api_key=... or set the "
                "GROQ_API_KEY environment variable."
            )

        # Top 3 predicted diseases from the image model, lowercased for matching
        self.top_diseases = [disease.lower() for disease, _ in predict_top_3(image_path)]
        self.accumulated_symptoms = ""
        self.retriever = setup_vectorstore()
        self.llm = ChatGroq(
            model_name="deepseek-r1-distill-llama-70b",
            groq_api_key=api_key,
            temperature=0  # For consistent responses
        )
        self.chain = RetrievalQA.from_chain_type(llm=self.llm, retriever=self.retriever)

    def update_symptoms(self, new_symptoms):
        """Append `new_symptoms` to the running description and return refine_diagnosis()."""
        addition = new_symptoms.strip()
        if addition:
            # Join with a single space and avoid a leading blank on the first entry
            self.accumulated_symptoms = f"{self.accumulated_symptoms} {addition}".strip()
        return self.refine_diagnosis()

    @staticmethod
    def _user_location():
        """Return "city,state,country" for the caller's IP, skipping fields the lookup left empty."""
        g = geocoder.ip('me')
        # Any of these attributes can be None when the lookup fails; concatenating
        # them directly would raise TypeError.
        known_parts = [part for part in (g.city, g.state, g.country) if part]
        if not known_parts:
            return SkinDiseaseChatBot._UNKNOWN_LOCATION
        return ",".join(known_parts)

    def refine_diagnosis(self):
        """
        Produce a diagnosis for the accumulated symptoms using three tiers.

        1. "High Possibility": a retrieved document mentions one of the
           image model's top predictions.
        2. "Medium Possibility": a retrieved document mentions another name
           known to extract_disease_names_from_text().
        3. "Low Possibility": the LLM names a disease and a Wikipedia summary
           (if one is found) is used as context.

        The first match wins; later tiers are not evaluated.

        Returns:
            A list containing one (disease, confidence_label, information) tuple.
        """
        relevant_docs = self.retriever.get_relevant_documents(self.accumulated_symptoms)

        # Tier 1: top predicted diseases
        for doc in relevant_docs:
            content = doc.page_content.lower()
            for top_disease in self.top_diseases:
                if top_disease in content:
                    address_my = self._user_location()
                    refined = self.chain.run(f"Given this info: {content}\n\nGive information about what are the symptoms and treatments of {top_disease}? and possible skin doctors near {address_my}. Give your asnwer in structured way.")
                    return [(top_disease.title(), "High Possibility", refined)]

        # Tier 2: other known diseases
        for doc in relevant_docs:
            content = doc.page_content.lower()
            for disease in extract_disease_names_from_text(content):
                disease = disease.lower()
                if disease not in self.top_diseases:
                    address_my = self._user_location()
                    refined = self.chain.run(f"Given this info: {content}\n\nGive information about symptoms and treatments of {disease}? and possible skin  doctors near {address_my}. Give your asnwer in structured way.")
                    return [(disease.title(), "Medium Possibility", refined)]

        # Tier 3: LLM guess + Wikipedia
        # NOTE(review): reasoning-style models may include more than the bare
        # disease name in their reply — confirm the reply is usable as a lookup key.
        guessed_disease = self.llm.invoke(
            f"Based on the following symptoms, what skin disease or any other disease is most likely?\n\nSymptoms: {self.accumulated_symptoms}\n\nOnly return the name of the disease."
        ).content.strip()

        wiki_info = fetch_wikipedia_info(guessed_disease)
        confidence = "Low Possibility"

        # Check against the failure message fetch_wikipedia_info() actually returns,
        # so a failed lookup is not passed to the LLM as if it were reference material.
        if wiki_info.startswith(self._WIKI_FAILURE_PREFIX):
            return [(guessed_disease, confidence, "No relevant information available.")]

        reply = self.llm.invoke(
            f"{self.accumulated_symptoms}\n\nBased on this and the information below, give information about symptoms and treatments for {guessed_disease}?\n\n{wiki_info} . Give your asnwer in structured way."
        )
        # Return the reply text, like the other tiers, not the message object
        return [(guessed_disease, confidence, getattr(reply, "content", reply))]

In [None]:
# Initialize chatbot with sample skin disease image
# Note: Image path should point to actual uploaded file
bot = SkinDiseaseChatBot("/content/Actinic-Keratosis-01.jpg")


def _print_diagnosis(heading, diagnosis):
    """Print `heading`, then one formatted entry per (disease, confidence, info) tuple."""
    print(heading)
    for disease, confidence, info in diagnosis:
        print(f"\nDisease: {disease}")
        print(f"Confidence: {confidence}")
        print(f"Information:\n{info}")
        print("-"*50)


# --- First Interaction ---
# Send the first symptom description and show what comes back
response1 = bot.update_symptoms("heart pain and cough")
_print_diagnosis("\n=== First Diagnosis ===", response1)

# --- Second Interaction ---
# The follow-up text is appended to the symptoms gathered so far
response2 = bot.update_symptoms("what disease do you think i have")
_print_diagnosis("\n=== Refined Diagnosis ===", response2)