In [None]:
pip install transformers datasets tokenizers torch

: 

In [None]:
pip install notebook

In [None]:
pip install transformers[torch]

In [None]:
pip install -U ipywidgets

In [None]:

pip install numpy==1.26.4 pandas scipy transformers torch datasets

In [None]:
import os
import pandas as pd

def load_dataset(csv_path, required_columns=None, sep="\t"):
    """
    Loads a delimited text file as strings, validates required columns, and fills missing values.

    Column names are stripped of surrounding whitespace and lower-cased after
    loading. The names in ``required_columns`` are normalized the same way
    before the check, so ``"Condition"`` and ``" condition "`` both match a
    ``condition`` column.

    Args:
        csv_path (str): Path to the CSV/TSV file.
        required_columns (list, optional): Column names that must be present.
        sep (str, optional): Field delimiter passed to ``pd.read_csv``.
            Defaults to a tab, which is what this function always used.

    Returns:
        pd.DataFrame: All columns as strings, with missing values replaced by "".

    Raises:
        FileNotFoundError: If ``csv_path`` is not an existing regular file.
        KeyError: If any required column is absent from the file.
    """
    # 🔹 Check that the path points at a file (a directory would fail later with a less clear error)
    if not os.path.isfile(csv_path):
        raise FileNotFoundError(f"Error: File not found at {csv_path}")

    # 🔹 Load CSV with all columns as strings
    df = pd.read_csv(csv_path, dtype=str, sep=sep)
    df.columns = df.columns.str.strip().str.lower()  # Normalize column names
    print("🔹 Available columns in CSV:", df.columns.tolist())  # Debugging

    # 🔹 Validate required columns (if provided), using the same normalization as the header
    if required_columns:
        wanted = [str(col).strip().lower() for col in required_columns]
        missing_columns = [col for col in wanted if col not in df.columns]
        if missing_columns:
            raise KeyError(f"Missing columns in CSV: {missing_columns}")

    # 🔹 Handle missing values (replace NaNs with empty strings)
    return df.fillna("")

# ✅ Example usage: load the medical dataset and preview it.
# NOTE(review): this path uses "D:\\train\\..." while the other cells in this
# notebook read from "D:\\trail\\..." — confirm which directory is correct.
csv_path = "D:\\train\\csv_output\\med.csv"
# Columns the rest of the notebook relies on.
required_columns = ["condition", "symptoms", "duration", "severity", "risk_factors", "suggested_action"]

# Any failure (missing file, missing columns, parse error) is reported as a
# printed message instead of a traceback; `dataset` stays undefined in that case.
try:
    dataset = load_dataset(csv_path, required_columns)
    print("✅ Dataset loaded successfully!")
    print(dataset.head())  # Show the first five rows
except Exception as e:
    print(f"❌ Error: {e}")


In [None]:
import pandas as pd

# ✅ Load the tab-separated source CSV
csv_path = "D:\\trail\\csv_output\\med.csv"
df = pd.read_csv(csv_path, delimiter="\t")  # Adjust delimiter if needed

# ✅ Define required columns
required_columns = ["condition", "symptoms", "duration", "severity", "risk_factors", "suggested_action"]

# ✅ Check if required columns are present (column names are compared as-is, without normalization)
if not set(required_columns).issubset(df.columns):
    raise ValueError(f"Missing required columns: {set(required_columns) - set(df.columns)}")

# ✅ Print total number of rows
print(f"Total rows in dataset: {len(df)}")

# ✅ Sample up to 11 rows (but don't fail if fewer rows exist); the fixed seed makes the sample repeatable
df_test = df.sample(n=min(11, len(df)), random_state=42)

# ✅ Save only required columns.
# The file is written to the current working directory.
# NOTE(review): the evaluation cell reads "D:\\trail\\csv_output\\eval_texts.csv" —
# confirm the working directory matches, or the evaluation will read a stale/missing file.
df_test[required_columns].to_csv("eval_texts.csv", index=False, encoding="utf-8")

print("✅ eval_texts.csv saved successfully with required columns!")


In [None]:
df = pd.read_csv(csv_path, sep="\t")

In [None]:
pip install seaborn

In [None]:
tokenizer_path = "D:\\trail\\custom_model\\tokenizer.json"


In [None]:
import json

tokenizer_path = "D:\\trail\\custom_model\\tokenizer_config.json"

try:
    with open(tokenizer_path, "r", encoding="utf-8") as file:
        json.load(file)  # Try loading JSON
    print("✅ JSON is valid!")
except json.JSONDecodeError as e:
    print(f"❌ JSON format error: {e}")


In [None]:
pip install scikit-learn


In [None]:
import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from sklearn.metrics import confusion_matrix, classification_report
from datasets import Dataset
from tokenizers import Tokenizer

# ✅ Step 1: Load the Custom Tokenizer
tokenizer_path = "D:\\trail\\custom_model\\tokenizer.json"
if not os.path.exists(tokenizer_path):
    raise FileNotFoundError(f"Tokenizer file not found: {tokenizer_path}")

custom_tokenizer = Tokenizer.from_file(tokenizer_path)

# ✅ Step 2: Load Evaluation Texts from CSV
eval_csv_path = "D:\\trail\\csv_output\\eval_texts.csv"
if not os.path.exists(eval_csv_path):
    raise FileNotFoundError(f"CSV file not found: {eval_csv_path}")

df_eval = pd.read_csv(eval_csv_path)

# ✅ Ensure required columns exist
required_columns = ["condition", "symptoms", "duration", "severity", "risk_factors", "suggested_action"]
missing_columns = [col for col in required_columns if col not in df_eval.columns]

if missing_columns:
    raise ValueError(f"Missing required columns: {missing_columns}")

# ✅ Combine columns into a single text input
def combine_text(row):
    """Render one record as a single ' | '-separated line of 'Label: value' pairs.

    `row` only needs to support lookup by the six column names used below
    (a pandas row or a plain dict both work).
    """
    labelled_fields = (
        ("Condition", "condition"),
        ("Symptoms", "symptoms"),
        ("Duration", "duration"),
        ("Severity", "severity"),
        ("Risk Factors", "risk_factors"),
        ("Suggested Action", "suggested_action"),
    )
    return " | ".join(f"{label}: {row[key]}" for label, key in labelled_fields)

# One combined text string per evaluation row.
eval_texts = df_eval.apply(combine_text, axis=1).tolist()
# Labels are simply the row positions 0..N-1, not class ids derived from the data.
# NOTE(review): comparing model predictions against row indices does not measure
# classification quality — confirm where the true class labels should come from.
eval_labels = list(range(len(eval_texts)))  # Sequential labels for evaluation

# ✅ Step 3: Tokenize Evaluation Texts
MAX_LEN = 128

def preprocess_text(text):
    """Encode `text` with the custom tokenizer and return exactly MAX_LEN token ids.

    Longer sequences are cut off at MAX_LEN; shorter ones are right-padded with 0.
    NOTE(review): 0 is assumed to be the padding id — confirm against the tokenizer's vocabulary.
    """
    token_ids = custom_tokenizer.encode(text).ids
    clipped = list(token_ids[:MAX_LEN])
    filler = [0] * (MAX_LEN - len(clipped))
    return clipped + filler

tokenized_eval = [preprocess_text(text) for text in eval_texts]

# ✅ Step 4: Convert Tokenized Data into a Dataset
tokenized_eval_dataset = Dataset.from_dict({
    "input_ids": tokenized_eval,
    "labels": eval_labels
})

# ✅ Step 5: Define and Load the Trained Model
class SimpleLSTMModel(nn.Module):
    """Token embedding -> single LSTM layer -> linear classifier.

    The embedding width (128) and LSTM hidden size (256) are fixed; only the
    vocabulary size and the number of output labels are configurable.
    """

    def __init__(self, vocab_size, num_labels):
        super().__init__()
        embed_dim, hidden_dim = 128, 256
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, num_labels)

    def forward(self, input_ids):
        """Return logits computed from the LSTM's final hidden state for a batch of token ids."""
        embedded = self.embedding(input_ids)
        _, (final_hidden, _) = self.lstm(embedded)
        # final_hidden[-1] is the last layer's hidden state, one vector per batch item.
        return self.fc(final_hidden[-1])

# ✅ Step 6: Load Model Weights
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 🛠 Determine `vocab_size`
try:
    vocab_size = len(custom_tokenizer.get_vocab())  # Preferred way
except AttributeError:
    vocab_size = 30522  # Default to BERT vocab size if `get_vocab()` fails

# 🛠 Locate the checkpoint. The "metadata" and the weights live in the same file,
# so it is read from disk once and reused for both purposes.
model_weights_path = "D:\\trail\\custom_model.pth"
metadata_path = model_weights_path

if not os.path.exists(model_weights_path):
    raise FileNotFoundError(f"Model weights file not found: {model_weights_path}")

# map_location lets a checkpoint saved on GPU load on a CPU-only machine.
checkpoint = torch.load(model_weights_path, map_location=device)
metadata = checkpoint  # kept under the old name for any cell that still reads it

# 🛠 Work out num_labels: explicit metadata wins, otherwise read it from the shape
# of the saved classifier layer, and only then fall back to the old default of 8.
if "num_labels" in checkpoint:
    num_labels = checkpoint["num_labels"]
elif "fc.weight" in checkpoint:
    num_labels = checkpoint["fc.weight"].shape[0]
else:
    num_labels = 8

# ✅ Initialize Model
model = SimpleLSTMModel(vocab_size, num_labels).to(device)

# 🛠 Load Model Weights. strict=False is kept so extra/missing keys do not abort the
# notebook, but they are reported: silently skipped keys mean randomly initialized layers.
load_result = model.load_state_dict(checkpoint, strict=False)
if load_result.missing_keys or load_result.unexpected_keys:
    print(
        "⚠️ Checkpoint/model key mismatch - "
        f"missing: {list(load_result.missing_keys)}, unexpected: {list(load_result.unexpected_keys)}"
    )
model.eval()

# ✅ Step 7: Get Predictions on Evaluation Set
input_tensor = torch.tensor(tokenized_eval, dtype=torch.long).to(device)

with torch.no_grad():
    logits = model(input_tensor)

predicted_labels = torch.argmax(logits, dim=1).cpu().numpy()

# ✅ Step 8: Compute Confusion Matrix
true_labels = np.array(eval_labels)
# Fix the label set explicitly so the matrix rows/columns and the plot ticks agree.
# (range(num_labels) does not necessarily match the labels that actually occur.)
label_ids = np.union1d(true_labels, predicted_labels)
conf_matrix = confusion_matrix(true_labels, predicted_labels, labels=label_ids)

# ✅ Step 9: Plot Confusion Matrix
plt.figure(figsize=(4, 2))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=label_ids, yticklabels=label_ids)
plt.xlabel("Predicted Labels")
plt.ylabel("True Labels")
plt.title("Confusion Matrix")
plt.show()

# ✅ Step 10: Print Classification Report
# zero_division=0 reports 0 for labels that are never predicted instead of warning per label.
print(classification_report(true_labels, predicted_labels, labels=label_ids, zero_division=0))


In [None]:
import torch
import torch.nn as nn
from transformers import BertForSequenceClassification

class CustomBertForSequenceClassification(BertForSequenceClassification):
    """BERT sequence classifier trained with class-weighted cross-entropy.

    Args:
        config: Model configuration forwarded to ``BertForSequenceClassification``.
        class_weights (torch.Tensor): One weight per class, in class-id order.

    The weights are held by ``self.loss_fn``. Because the loss is a sub-module,
    its weight tensor follows the model through ``.to(device)`` / ``.cuda()``;
    no device needs to be known at construction time (the config object does
    not carry one).
    """

    def __init__(self, config, class_weights):
        super().__init__(config)
        # Copy so later in-place edits of the caller's tensor cannot change the loss.
        self.loss_fn = nn.CrossEntropyLoss(weight=class_weights.detach().clone().float())

    @property
    def class_weights(self):
        """The class-weight tensor currently used by the loss (on the model's device)."""
        return self.loss_fn.weight

    def forward(self, input_ids, attention_mask=None, labels=None, **kwargs):
        """Run BERT and, when ``labels`` is given, attach the weighted loss.

        Extra keyword arguments (for example ``token_type_ids`` produced by the
        tokenizer) are forwarded to the parent model instead of raising.

        Returns:
            SequenceClassifierOutput: supports both ``out["logits"]`` and
            ``out.logits``; ``loss`` is present only when ``labels`` was passed.
        """
        from transformers.modeling_outputs import SequenceClassifierOutput

        # The attribute access below needs the structured output, so a caller-supplied
        # return_dict flag is dropped and the parent's default is used.
        kwargs.pop("return_dict", None)
        outputs = super().forward(input_ids=input_ids, attention_mask=attention_mask, **kwargs)
        logits = outputs.logits

        loss = None
        if labels is not None:
            # Labels are deliberately not passed to the parent: its unweighted loss is not wanted.
            loss = self.loss_fn(logits, labels)

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )


In [None]:
import pandas as pd

df = pd.read_csv("eval_texts.csv")  # Load again with correct column names
print(df.columns)  # Check if the original names are back


In [None]:
# Restore correct column names based on the actual dataset
df.columns = ["condition", "symptoms", "duration", "severity", "risk_factors", "suggested_action"]

# Verify columns are correctly restored
print(df.columns)


In [None]:
from sklearn.utils.class_weight import compute_class_weight
import numpy as np
import torch

# Ensure the 'condition' column (acting as labels) is present
if "condition" not in df.columns:
    raise KeyError(f"🚨 Column 'condition' not found! Check column names: {list(df.columns)}")

# Drop rows without a label; .copy() gives an independent frame so the column
# assignment below does not write into a view of the previous `df`.
df = df.dropna(subset=["condition"]).copy()

# Convert conditions to numerical labels, keeping the id -> name lookup so that
# predicted ids can be translated back. Skipped when the column is already
# integer-coded (e.g. the cell is re-run), which would otherwise lose the names.
if df["condition"].dtype.kind not in "iu":
    condition_as_category = df["condition"].astype("category")
    condition_categories = list(condition_as_category.cat.categories)  # index == label id
    df["condition"] = condition_as_category.cat.codes

# Compute class weights ("balanced": inversely proportional to class frequency)
class_weights = compute_class_weight(
    class_weight="balanced",
    classes=np.unique(df["condition"]),
    y=df["condition"]
)

class_weights = torch.tensor(class_weights, dtype=torch.float)
print("✅ Fixed & computed class weights:", class_weights)


In [None]:
from sklearn.utils.class_weight import compute_class_weight
from transformers import TrainingArguments, Trainer
import torch.nn as nn
import torch
import numpy as np

# Define correct labels column based on dataset
label_column = "condition"  # Adjust if needed

# Compute class weights
unique_classes = np.unique(df[label_column])
class_weights = compute_class_weight(
    class_weight="balanced",
    classes=unique_classes,
    y=df[label_column]
)

# Convert to tensor
class_weights = torch.tensor(class_weights, dtype=torch.float)
print("Fixed & computed class weights:", class_weights)

# Define training arguments
training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=10,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    eval_strategy="epoch"
)

# Define loss function with class weights
loss_fn = nn.CrossEntropyLoss(weight=class_weights)

# Custom Trainer class to override compute_loss
class WeightedLossTrainer(Trainer):
    """Trainer that replaces the model's own loss with class-weighted cross-entropy.

    The class weights are read from the module-level ``loss_fn`` defined in this cell.
    """

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute weighted cross-entropy for one batch.

        Args:
            model: The model being trained.
            inputs (dict): Batch tensors; must contain ``"labels"``.
            return_outputs (bool): When True, also return the model outputs.
            **kwargs: Extra arguments passed by newer Trainer versions; ignored.

        Returns:
            The loss tensor, or ``(loss, outputs)`` when ``return_outputs`` is True.
        """
        labels = inputs["labels"]
        # Build the forward-pass arguments without mutating the caller's batch dict.
        model_inputs = {name: value for name, value in inputs.items() if name != "labels"}
        outputs = model(**model_inputs)

        # Accept both dict-style outputs and objects exposing `.logits`.
        logits = outputs["logits"] if isinstance(outputs, dict) else outputs.logits

        # The weights were created on CPU; move them next to the logits so the
        # loss also works when the model runs on GPU.
        weight = loss_fn.weight
        if weight is not None:
            weight = weight.to(logits.device)

        # Logits stay (batch, num_classes): squeezing a trailing dimension of size 1
        # would turn them into a shape cross-entropy cannot pair with class-index labels.
        loss = nn.functional.cross_entropy(logits, labels, weight=weight)

        return (loss, outputs) if return_outputs else loss

# Initialize trainer with custom loss function
trainer = WeightedLossTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset
)

# Start training
trainer.train()


In [None]:
import matplotlib.pyplot as plt

# Training and validation loss values
epochs = list(range(1, 11))  # Number of epochs (1 to 10)
training_loss = [1.9648204803466798] * 10  # Repeat last reported training loss
validation_loss = [2.465202, 2.690884, 2.940101, 2.937016, 3.000245, 3.014100, 3.037950, 3.019710, 3.039859, 3.041418]

# Plot graph
plt.figure(figsize=(4, 2))
plt.plot(epochs, training_loss, label="Training Loss", marker='o', linestyle='--', color='blue')
plt.plot(epochs, validation_loss, label="Validation Loss", marker='s', linestyle='-', color='red')

# Labels & title
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.title("Training vs. Validation Loss")
plt.legend()
plt.grid(True)

# Show plot
plt.show()


In [None]:
# Only the target column becomes "label". Renaming every column to "label" would
# leave the frame with six identically named columns and no usable features.
# NOTE(review): "condition" is taken as the target because the class weights in this
# notebook are computed from it — confirm.
df = df.rename(columns={"condition": "label"})


In [None]:
pip install matplotlib

In [None]:
import torch

# ✅ Save the trained model using torch.save()
torch.save(model.state_dict(), "custom_model.pth")

# ✅ Save the tokenizer (if using a custom tokenizer)
custom_tokenizer.save("saved_tokenizer.json")

print("Model and tokenizer saved successfully!")


In [None]:
import torch
import torch.nn as nn

class SimpleModel(nn.Module):
    """Embedding -> single-layer LSTM -> linear head, returning ``{"logits": ...}``."""

    def __init__(self, vocab_size, embedding_dim=128, hidden_dim=256, output_dim=2):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, input_ids):
        """Classify a batch of token-id sequences from the LSTM's final hidden state."""
        _, (last_hidden, _) = self.lstm(self.embedding(input_ids))
        # last_hidden[-1]: final hidden state of the last LSTM layer, one row per batch item.
        return {"logits": self.fc(last_hidden[-1])}



In [None]:
from transformers import TrainingArguments

# ✅ Fine-tuning with adjusted hyperparameters
training_args = TrainingArguments(
    output_dir="./fine_tuned_model",
    per_device_train_batch_size=32,  # Increase batch size if you have enough memory
    per_device_eval_batch_size=32,
    learning_rate=2e-5,  # Lower learning rate for better fine-tuning
    num_train_epochs=5,  # Train for more epochs
    weight_decay=0.01,  # Helps prevent overfitting
    eval_strategy="epoch",  # Evaluate after every epoch
    save_strategy="epoch",  # Save model checkpoints
    logging_dir="./logs",  # Log training data
    load_best_model_at_end=True,  # Use best model based on evaluation loss
)

print("Hyperparameters updated!")


In [None]:
pip install transformers[torch]

In [None]:
from transformers import TrainingArguments

# ✅ Fine-tuning with adjusted hyperparameters
training_args = TrainingArguments(
    output_dir="./fine_tuned_model",
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    learning_rate=2e-5,
    num_train_epochs=5,
    weight_decay=0.01,
    eval_strategy="epoch",  # ✅ Updated from evaluation_strategy
    save_strategy="epoch",
    logging_dir="./logs",
    load_best_model_at_end=True,
)

print("Hyperparameters updated!")


In [None]:
import os

model_dir = "custom_model"
if os.path.exists(model_dir):
    print("Directory exists:", os.listdir(model_dir))
else:
    print("Model directory not found!")


In [None]:
import pandas as pd

# Load CSV into Pandas DataFrame
df = pd.read_csv("D:\\trail\\csv_output\\med.csv")

# Check if df is loaded correctly
print(df.head())


In [None]:
from datasets import Dataset

# Convert Pandas DataFrame to Hugging Face Dataset
dataset = Dataset.from_pandas(df)

# Split into train/test (80-20 split)
dataset = dataset.train_test_split(test_size=0.2, seed=42)

# Now you should have `dataset["train"]` and `dataset["test"]`
print(dataset)


In [None]:
def preprocess_function(examples):
    """Tokenize a dataset example (or batch) built from several text columns.

    A mapping cannot be indexed with several column names at once, so the
    columns are read one by one and joined into a single " | "-separated
    string per example before tokenization. Works both with ``batched=True``
    (each column is a list) and with single examples (each column is a scalar).
    Columns from the list that are absent from ``examples`` are skipped.

    NOTE(review): 'condition' / 'condition_id' are kept because the original
    column list names them; if 'condition' is the prediction target, feeding it
    to the model as input leaks the label — confirm.

    Raises:
        KeyError: If none of the expected columns is present.
    """
    text_columns = (
        'condition', 'symptoms', 'duration', 'severity',
        'risk_factors', 'suggested_action', 'condition_id',
    )
    present = [name for name in text_columns if name in examples]
    if not present:
        raise KeyError(
            f"None of the expected columns {text_columns} found. "
            f"Available columns: {list(examples.keys())}"
        )

    def _join(values):
        # Missing cells become empty strings rather than the text "None".
        return " | ".join("" if value is None else str(value) for value in values)

    columns = [examples[name] for name in present]
    if isinstance(columns[0], (list, tuple)):
        texts = [_join(row) for row in zip(*columns)]  # batched: one string per row
    else:
        texts = _join(columns)  # single example

    return tokenizer(
        texts,
        truncation=True,
        padding="max_length",
        max_length=512
    )


In [None]:
def preprocess_function(examples):
    """Tokenize the single merged text column of a dataset example or batch.

    The presence check, the error message and the lookup all use the same
    column name, so a missing column is reported accurately instead of
    passing the check and failing on the lookup.

    NOTE(review): the column name is the comma-joined header that the lookup in
    this function originally used — confirm it matches the loaded dataset.

    Raises:
        KeyError: If the merged text column is not present in ``examples``.
    """
    text_column = "condition,symptoms,duration,severity,risk_factors,suggested_action"
    if text_column not in examples:
        raise KeyError(
            f"Expected column '{text_column}' not found. Available columns: {list(examples.keys())}"
        )
    return tokenizer(
        examples[text_column],
        truncation=True,
        padding="max_length",
        max_length=512
    )


In [None]:
import os

model_path = "D:\\trail\\custom_model"
print(os.listdir(model_path))


In [None]:
pip install torch

In [None]:
import torch

import os

save_path = "D:\\trail\\custom_model"
# torch.save does not create missing directories, so make sure the target exists first.
os.makedirs(save_path, exist_ok=True)
torch.save(model.state_dict(), os.path.join(save_path, "pytorch_model.bin"))
print(f"Custom model saved to: {save_path}")


In [None]:
model.load_state_dict(torch.load("D:\\trail\\custom_model/pytorch_model.bin"))


In [None]:
def predict_symptoms(user_input):
    """Predict the class id for one piece of user text.

    Uses the notebook-level ``tokenizer``, ``model`` and ``device``.

    Args:
        user_input (str): Free-text description to classify.

    Returns:
        int: Index of the highest-scoring class.

    NOTE(review): every tensor the tokenizer returns is passed to the model as a
    keyword argument — confirm the active model's ``forward`` accepts them all.
    """
    # ✅ Tokenize input text
    inputs = tokenizer(user_input, return_tensors="pt", truncation=True, padding=True, max_length=512)

    # ✅ Move inputs to the same device as the model
    inputs = {key: value.to(device) for key, value in inputs.items()}

    # ✅ Inference mode: disables dropout so repeated calls give the same answer
    model.eval()
    with torch.no_grad():
        outputs = model(**inputs)

    # ✅ Models in this notebook return a raw tensor, a {"logits": ...} dict,
    # or an object with `.logits`; accept all three.
    if torch.is_tensor(outputs):
        logits = outputs
    elif isinstance(outputs, dict):
        logits = outputs["logits"]
    else:
        logits = outputs.logits

    # ✅ Get the predicted label
    return torch.argmax(logits, dim=1).item()


In [None]:
from transformers import AutoTokenizer

# ✅ Load tokenizer from a working model (e.g., 'bert-base-uncased' or your trained tokenizer)
model_checkpoint = "bert-base-uncased"  # Change this if you trained a custom tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

# ✅ Re-save tokenizer with all required files
save_path = "D:\\trail\\custom_tokenizer"
tokenizer.save_pretrained(save_path)

print(f"✅ Tokenizer re-saved to: {save_path}")


In [None]:
tokenizer = AutoTokenizer.from_pretrained("D:\\trail\\custom_tokenizer")
print("✅ Tokenizer loaded successfully!")

In [None]:
df = pd.read_csv(csv_path, encoding='utf-8')


In [None]:
print(df.head())


In [None]:
pip install pyaudio

In [None]:
print(df.columns)

In [None]:
df.rename(columns=lambda x: x.strip(), inplace=True)  # Remove leading/trailing spaces
df.rename(columns={'symptoms': 'Symptoms', 'condition': 'Condition'}, inplace=True)  # Adjust based on actual names

In [None]:
print(df.head())

In [None]:
df = pd.read_csv(csv_path, delimiter=",")  # Change delimiter if necessary

In [None]:
import os
print(os.path.exists(csv_path))  # Should print True if the file exists


In [None]:
if 'Symptoms' not in df.columns:
    print("Symptoms column is missing! Available columns:", df.columns)


In [None]:
print(df.isnull().sum())


In [None]:
print(df.columns)

In [None]:
pip install speechrecognition gtts pandas


In [None]:
pip install playsound==1.2.2


In [None]:
import speech_recognition as sr
from gtts import gTTS
import pandas as pd
import re
import os
import threading
from playsound import playsound

# ✅ Load dataset
csv_path = "D:\\trail\\csv_output\\eval_texts.csv"

# Read CSV and handle delimiter issues
df = pd.read_csv(csv_path, delimiter=",")  # Try "\t" if comma doesn't work

# Fix merged column issue
if len(df.columns) == 1:
    df = df[df.columns[0]].str.split(",", expand=True)  # Split into columns
    df.columns = df.iloc[0]  # Set first row as column names
    df = df[1:].reset_index(drop=True)  # Remove the first row

# Clean column names
df.columns = df.columns.str.strip()

# Required columns check
required_columns = ["condition", "symptoms", "duration", "severity", "risk_factors", "suggested_action"]
missing_columns = set(required_columns) - set(df.columns)
if missing_columns:
    raise ValueError(f"❌ Missing required columns: {missing_columns}. Available columns: {df.columns}")

# Convert symptoms column to lowercase for better matching
df["symptoms"] = df["symptoms"].astype(str).str.lower()

# ✅ Speak function using gTTS
def speak(text):
    """Print `text`, convert it to speech with gTTS, and play it.

    The audio is written to a temporary mp3 in the working directory. The
    file is removed even when synthesis or playback raises, so a failed call
    does not leave a stale file behind for the next one.
    """
    print(f"\nMira 👩‍⚕️: {text}")
    temp_audio = "temp_audio.mp3"
    try:
        tts = gTTS(text=text, lang='en')
        tts.save(temp_audio)
        playsound(temp_audio)
    finally:
        # save() may have failed before the file was created.
        if os.path.exists(temp_audio):
            os.remove(temp_audio)

# ✅ Initialize speech recognition
recognizer = sr.Recognizer()
mic = sr.Microphone() if sr.Microphone.list_microphone_names() else None

# ✅ Listen function (Continuous mode)
def listen():
    """Capture one utterance from the microphone and return it as lower-case text.

    Returns "" (after speaking an explanation) when no microphone is available,
    when the speech could not be understood, or when the recognition request fails.
    """
    if not mic:
        speak("No microphone detected. Please check your device settings.")
        return ""

    with mic as source:
        recognizer.adjust_for_ambient_noise(source)
        print("\n🎤 Listening... (Say 'exit' to quit)")
        captured = recognizer.listen(source)

    try:
        transcript = recognizer.recognize_google(captured).lower()
    except sr.UnknownValueError:
        speak("Sorry, I didn’t catch that. Could you repeat?")
        return ""
    except sr.RequestError:
        speak("I'm having trouble understanding right now. Please try again.")
        return ""

    print(f"User 🎤: {transcript}")
    return transcript

# ✅ Extract numeric duration from user input
def extract_duration(text):
    """Return the first run of digits in `text` as an int, or None when there is none."""
    found = re.search(r"\d+", text)
    if found is None:
        return None
    return int(found.group())

# ✅ Check if duration is valid
def is_duration_valid(user_duration, duration_range):
    """Check whether the user's duration matches a dataset duration entry.

    Args:
        user_duration: Number of days reported by the user (int or int-like).
            ``None`` or non-numeric input yields False instead of raising.
        duration_range: Dataset value. Accepted forms are a range such as
            "3-5", "3 - 5 days", "3–5" or "3 to 5" (inclusive bounds), or a
            single value such as "3" or "3 days".

    Returns:
        bool: True when the duration falls inside the range or equals the
        single value; False otherwise, including for unparseable entries.
        Units other than days on a single value (e.g. "2 weeks") are not
        interpreted and yield False.
    """
    try:
        days = int(user_duration)
    except (TypeError, ValueError):
        return False

    text = str(duration_range).strip().lower()

    # Range: two numbers separated by a hyphen, en/em dash or "to", spaces allowed.
    span = re.match(r"(\d+)\s*(?:-|\u2013|\u2014|to)\s*(\d+)", text)
    if span:
        lower, upper = map(int, span.groups())
        return lower <= days <= upper

    # Single value, optionally followed by "day"/"days".
    single = re.fullmatch(r"(\d+)\s*(?:days?)?", text)
    if single:
        return days == int(single.group(1))

    return False

# ✅ Live Consultation
def _rows_containing(frame, column, phrase):
    """Return the rows of `frame` whose `column` contains `phrase` as literal, case-insensitive text."""
    # regex=False: spoken text is matched literally, so characters such as "(" or "+" cannot
    # break or change the match.
    mask = frame[column].str.contains(phrase, na=False, case=False, regex=False)
    return frame[mask]


def start_live_consultation():
    """Run the spoken consultation loop until the user says "exit".

    Each round asks for a symptom, a duration in days, a severity and,
    when the remaining candidates list any, a risk factor. The dataset `df` is
    narrowed step by step and the first remaining condition is announced with
    its suggested action. A round restarts when a step yields no candidates or
    the answer could not be understood.
    """
    speak("Hello, I'm Dr. Mira. You can start by saying your symptoms.")

    while True:
        # Step 1: Ask for symptoms
        speak("What symptoms are you experiencing?")
        symptom = listen().strip()
        if symptom == "exit":
            speak("Goodbye! Stay healthy.")
            break
        if not symptom:
            # Nothing was recognised (listen() has already said so). An empty
            # search text would match every row, so ask again instead.
            continue

        matched_conditions = _rows_containing(df, "symptoms", symptom)
        if matched_conditions.empty:
            speak("I couldn't find any condition with that symptom. Please try again.")
            continue

        speak(f"I found {len(matched_conditions)} conditions related to {symptom}. Let's narrow it down.")

        # Step 2: Ask for duration
        speak("For how many days have you been experiencing this?")
        duration_input = listen().strip()
        if duration_input == "exit":
            speak("Goodbye! Stay healthy.")
            break

        duration_number = extract_duration(duration_input)
        if duration_number is None:  # 0 is a legitimate answer ("started today")
            speak("I didn't hear a valid duration. Please try again.")
            continue

        # ✅ Filter conditions based on duration range
        valid_conditions = matched_conditions[
            matched_conditions["duration"].apply(lambda x: is_duration_valid(duration_number, str(x)))
        ]

        if valid_conditions.empty:
            speak("No conditions match that duration range. Please consult a doctor.")
            continue

        speak(f"{len(valid_conditions)} conditions match your symptom and duration.")

        # Step 3: Ask for severity
        speak("Is it mild, moderate, or severe?")
        severity = listen().strip()
        if severity == "exit":
            speak("Goodbye! Stay healthy.")
            break

        # An unrecognised (empty) answer matches every row, i.e. severity is not used to filter.
        valid_conditions = _rows_containing(valid_conditions, "severity", severity)
        if valid_conditions.empty:
            speak("No conditions match that severity level. Please consult a doctor.")
            continue

        speak(f"{len(valid_conditions)} conditions match your symptom, duration, and severity.")

        # Step 4: Ask about risk factors
        risk_factors = valid_conditions["risk_factors"].dropna().unique()
        risk_factors_text = ", ".join(str(item) for item in risk_factors)

        if risk_factors_text:
            speak(f"Based on the conditions found, some possible risk factors are: {risk_factors_text}. Do any of these apply to you?")
            user_risk_factor = listen().strip()
            if user_risk_factor == "exit":
                speak("Goodbye! Stay healthy.")
                break

            if user_risk_factor:
                # Risk factors only narrow the list. An answer such as "yes" or "no"
                # matches no risk-factor text and must not discard every candidate.
                narrowed = _rows_containing(valid_conditions, "risk_factors", user_risk_factor)
                if not narrowed.empty:
                    valid_conditions = narrowed

        # Step 5: Identify the most probable condition (at least one candidate remains here)
        condition_name = valid_conditions.iloc[0]["condition"]
        suggested_action = valid_conditions.iloc[0]["suggested_action"]

        speak(f"It looks like you might have {condition_name}. {suggested_action}")

# ✅ Start live chatbot
start_live_consultation()
