In [1]:
import os
os.environ["HF_TOKEN"] = "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"

from huggingface_hub import login

hf_token = os.getenv("HF_TOKEN")
if hf_token:
    login(hf_token)
    print("Hugging Face login successful.")
else:
    raise EnvironmentError("HF_TOKEN not found.")

Note: Environment variable`HF_TOKEN` is set and is the current active token independently from the token you've just configured.


Hugging Face login successful.


In [2]:
import os
import time
import json
import logging
import requests
from bs4 import BeautifulSoup

In [3]:
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)

# -------------------------------
# Medical Data Scraper Class
# -------------------------------
class MedicalDataScraper:
    def __init__(self):
        self.headers = {
            'User-Agent': 'Medical Research Bot'
        }
        self.data_dir = 'medical_data'
        os.makedirs(self.data_dir, exist_ok=True)

    def scrape_pubmed(self, query, max_results=100):
        """Scrape article titles and abstracts from PubMed."""
        logging.info(f"Scraping PubMed for query: {query}")
        
        base_url = "https://pubmed.ncbi.nlm.nih.gov"
        search_url = f"{base_url}/?term={query.replace(' ', '+')}"
        
        articles = []
        page = 1
        results_count = 0
        filename = os.path.join(self.data_dir, f"{query.replace(' ', '_')}_pubmed.json")

        while results_count < max_results:
            try:
                response = requests.get(f"{search_url}&page={page}", headers=self.headers)
                response.raise_for_status()
                soup = BeautifulSoup(response.text, 'html.parser')
                
                article_elements = soup.select('.docsum-content')
                if not article_elements:
                    break

                for article in article_elements:
                    if results_count >= max_results:
                        break
                    
                    title_element = article.select_one('.docsum-title')
                    title = title_element.text.strip() if title_element else "No title"
                    abstract_link = title_element.get('href') if title_element else None
                    abstract_url = base_url + abstract_link if abstract_link else None
                    abstract = self._get_abstract(abstract_url) if abstract_url else "No abstract"

                    articles.append({
                        'title': title,
                        'abstract': abstract,
                        'source': abstract_url or "N/A",
                        'query': query,
                        'scraped_at': time.strftime("%Y-%m-%d %H:%M:%S")
                    })

                    results_count += 1

                    # Save every 10 results as a backup
                    if results_count % 10 == 0:
                        with open(filename, 'w') as f:
                            json.dump(articles, f, indent=2)

                page += 1
                time.sleep(2)

            except Exception as e:
                logging.error(f"Error scraping page {page}: {e}")
                break

        with open(filename, 'w') as f:
            json.dump(articles, f, indent=2)

        logging.info(f"Saved {len(articles)} articles to {filename}")
        return articles

    def _get_abstract(self, url):
        """Helper to get abstract from article detail page."""
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            abstract_div = soup.select_one('.abstract-content')
            return abstract_div.text.strip() if abstract_div else "Abstract not available"
        except Exception as e:
            logging.warning(f"Error fetching abstract from {url}: {e}")
            return "Error fetching abstract"

    def scrape_medical_dictionary(self, letters=None):
        """Scrape medical terms and definitions from MedicineNet."""
        if letters is None:
            letters = list('abcdefghijklmnopqrstuvwxyz')

        base_url = "https://www.medicinenet.com/script/main/alphaidx.asp?p="
        all_terms = {}

        for letter in letters:
            logging.info(f"Scraping medical terms for letter: {letter}")
            try:
                response = requests.get(f"{base_url}{letter}", headers=self.headers)
                response.raise_for_status()
                soup = BeautifulSoup(response.text, 'html.parser')

                term_elements = soup.select('.AZ_results a')
                terms = {}

                for element in term_elements:
                    term = element.text.strip()
                    link = element.get('href')
                    full_link = link if link.startswith("http") else "https://www.medicinenet.com" + link

                    if term and full_link:
                        definition = self._get_term_definition(full_link)
                        terms[term] = {
                            'definition': definition,
                            'source': full_link,
                            'scraped_at': time.strftime("%Y-%m-%d %H:%M:%S")
                        }

                all_terms.update(terms)
                time.sleep(2)

            except Exception as e:
                logging.error(f"Error scraping terms for letter '{letter}': {e}")

        filename = os.path.join(self.data_dir, "medical_terminology.json")
        with open(filename, 'w') as f:
            json.dump(all_terms, f, indent=2)

        logging.info(f"Saved {len(all_terms)} medical terms to {filename}")
        return all_terms

    def _get_term_definition(self, url):
        """Helper to get a term's definition from its page."""
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            definition_div = soup.select_one('.main-content article')
            return definition_div.text.strip() if definition_div else "Definition not available"
        except Exception as e:
            logging.warning(f"Error fetching definition from {url}: {e}")
            return "Error fetching definition"

# -------------------------------
# Example Usage 
# -------------------------------
scraper = MedicalDataScraper()
articles = scraper.scrape_pubmed("diabetes treatment", max_results=50)
terms = scraper.scrape_medical_dictionary(letters=['a', 'b', 'c'])


2025-04-16 12:54:17,110 [INFO] Scraping PubMed for query: diabetes treatment
2025-04-16 12:54:58,678 [INFO] Saved 50 articles to medical_data/diabetes_treatment_pubmed.json
2025-04-16 12:54:58,681 [INFO] Scraping medical terms for letter: a
2025-04-16 12:55:00,515 [ERROR] Error scraping terms for letter 'a': 404 Client Error: Not Found for url: https://www.medicinenet.com/script/main/alphaidx.asp?p=a
2025-04-16 12:55:00,516 [INFO] Scraping medical terms for letter: b
2025-04-16 12:55:01,710 [ERROR] Error scraping terms for letter 'b': 404 Client Error: Not Found for url: https://www.medicinenet.com/script/main/alphaidx.asp?p=b
2025-04-16 12:55:01,711 [INFO] Scraping medical terms for letter: c
2025-04-16 12:55:02,983 [ERROR] Error scraping terms for letter 'c': 404 Client Error: Not Found for url: https://www.medicinenet.com/script/main/alphaidx.asp?p=c
2025-04-16 12:55:02,987 [INFO] Saved 0 medical terms to medical_data/medical_terminology.json


In [4]:
# -------------------------------
# Imports
# -------------------------------
import os
import re
import json
import nltk
import numpy as np
import pandas as pd
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from sklearn.model_selection import train_test_split

# -------------------------------
# Setup NLTK Resources (run once)
# -------------------------------
nltk.download('punkt', quiet=True)
nltk.download('wordnet')
nltk.download('stopwords')

# -------------------------------
# Medical Data Processor Class
# -------------------------------
class MedicalDataProcessor:
    def __init__(self):
        self.lemmatizer = WordNetLemmatizer()
        self.stop_words = set(stopwords.words('english')).union({
            'patient', 'patients', 'doctor', 'doctors', 'hospital', 'treatment'
        })
        self.data_dir = 'processed_data'
        os.makedirs(self.data_dir, exist_ok=True)

    def load_data(self, file_path):
        """Load raw JSON data from file."""
        with open(file_path, 'r') as f:
            return json.load(f)

    def preprocess_text(self, text):
        """Lowercase, remove punctuation, stop words, and lemmatize."""
        text = re.sub(r'[^\w\s]', '', text.lower())
        words = word_tokenize(text)
        processed_words = [
            self.lemmatizer.lemmatize(word)
            for word in words if word not in self.stop_words
        ]
        return ' '.join(processed_words)

    def process_medical_articles(self, file_path):
        """Clean titles and abstracts from scraped articles."""
        articles = self.load_data(file_path)
        processed_data = []

        for article in articles:
            processed_data.append({
                'title': self.preprocess_text(article['title']),
                'abstract': self.preprocess_text(article['abstract']),
                'original_title': article['title'],
                'original_abstract': article['abstract']
            })

        output_file = os.path.join(self.data_dir, os.path.basename(file_path))
        with open(output_file, 'w') as f:
            json.dump(processed_data, f, indent=2)

        print(f" Processed and saved {len(processed_data)} articles to {output_file}")
        return processed_data

    def extract_medical_entities(self, text):
        """Use regex to find basic disease/drug/symptom patterns."""
        patterns = {
            'diseases': [
                r'\b(?:[A-Z][a-z]+ (?:disease|disorder|syndrome))\b',
                r'\b(?:[A-Z][a-z]+ (?:cancer|tumor|carcinoma))\b'
            ],
            'drugs': [
                r'\b[A-Z][a-z]+(?:in|en)\b',
                r'\b[A-Z][a-z]+(?:ol|al)\b',
                r'\b[A-Z][a-z]+ide\b'
            ],
            'symptoms': [
                r'\b(?:pain|ache|discomfort|fever|swelling) (?:in|of) [a-z]+\b',
                r'\b(?:chronic|acute|severe) [a-z]+ (?:pain|inflammation)\b'
            ]
        }

        entities = {'diseases': [], 'drugs': [], 'symptoms': []}
        for entity_type, regexes in patterns.items():
            for pattern in regexes:
                matches = re.findall(pattern, text)
                if matches:
                    entities[entity_type].extend(matches)

        return entities

    def prepare_training_data(self, processed_articles, split_ratio=0.2):
        """Label abstracts, split for training, and save to JSON."""
        texts = [article['original_abstract'] for article in processed_articles]
        processed_texts = [article['abstract'] for article in processed_articles]

        # Labeling: 1 if text contains 'treatment' or 'therapy'
        labels = [
            1 if 'treatment' in text.lower() or 'therapy' in text.lower() else 0
            for text in texts
        ]

        X_train, X_val, y_train, y_val = train_test_split(
            processed_texts, labels, test_size=split_ratio, random_state=42
        )

        train_data = {'texts': X_train, 'labels': y_train}
        val_data = {'texts': X_val, 'labels': y_val}

        with open(os.path.join(self.data_dir, 'train_data.json'), 'w') as f:
            json.dump(train_data, f, indent=2)
        with open(os.path.join(self.data_dir, 'val_data.json'), 'w') as f:
            json.dump(val_data, f, indent=2)

        print(f"Training samples: {len(X_train)}, Validation samples: {len(X_val)}")
        return train_data, val_data


2025-04-16 12:55:03,925 [INFO] NumExpr defaulting to 8 threads.
[nltk_data] Downloading package wordnet to /Users/aditya/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/aditya/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [5]:
import os
import json
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForTokenClassification,
    AutoModelForSeq2SeqLM,
    pipeline
)
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from datetime import datetime

In [6]:
class MedicalNLPSystem:
    def __init__(self, ehr_path='ehr_data.json'):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.models_dir = 'nlp_models'
        os.makedirs(self.models_dir, exist_ok=True)

        self.ehr_path = ehr_path
        if not os.path.exists(self.ehr_path):
            with open(self.ehr_path, 'w') as f:
                json.dump({}, f)

        self._load_models()

    def _load_models(self):
        print("Loading models...")
        self.stt_pipeline = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-medium",
            device=0 if torch.cuda.is_available() else -1
        )

        self.ner_tokenizer = AutoTokenizer.from_pretrained("d4data/biomedical-ner-all")
        self.ner_model = AutoModelForTokenClassification.from_pretrained("d4data/biomedical-ner-all").to(self.device)
        self.ner_pipeline = pipeline(
            "ner",
            model=self.ner_model,
            tokenizer=self.ner_tokenizer,
            aggregation_strategy="simple",
            device=0 if torch.cuda.is_available() else -1
        )

        self.summarizer_tokenizer = AutoTokenizer.from_pretrained("google/pegasus-pubmed")
        self.summarizer_model = AutoModelForSeq2SeqLM.from_pretrained("google/pegasus-pubmed").to(self.device)

    def transcribe_audio(self, audio_path):
        print(f"Transcribing: {audio_path}")
        result = self.stt_pipeline(audio_path)
        return result["text"]

    def extract_entities(self, text):
        print("Extracting medical entities...")
        raw_entities = self.ner_pipeline(text)
        entities = {}
        for item in raw_entities:
            label = item['entity_group']
            word = item['word']
            if label not in entities:
                entities[label] = []
            entities[label].append(word)
        return entities

    def summarize_text(self, text):
        print("Summarizing text...")
        inputs = self.summarizer_tokenizer(
            text, return_tensors="pt", truncation=True, max_length=1024
        ).to(self.device)
        summary_ids = self.summarizer_model.generate(
            inputs.input_ids,
            max_length=150,
            min_length=50,
            num_beams=4,
            early_stopping=True
        )
        return self.summarizer_tokenizer.decode(summary_ids[0], skip_special_tokens=True)

    def update_ehr(self, patient_id, transcript, summary, entities):
        with open(self.ehr_path, 'r') as f:
            ehr_data = json.load(f)

        if patient_id not in ehr_data:
            ehr_data[patient_id] = {
                "records": []
            }

        ehr_data[patient_id]["records"].append({
            "timestamp": datetime.now().isoformat(),
            "transcript": transcript,
            "summary": summary,
            "entities": entities
        })

        with open(self.ehr_path, 'w') as f:
            json.dump(ehr_data, f, indent=2)

        print(f"Updated EHR for patient {patient_id}.")

    def process_patient_audio(self, patient_id, audio_path):
        transcript = self.transcribe_audio(audio_path)
        summary = self.summarize_text(transcript)
        entities = self.extract_entities(transcript)
        self.update_ehr(patient_id, transcript, summary, entities)
        return transcript, summary, entities

    def chatbot_response(self, patient_id, question):
        with open(self.ehr_path, 'r') as f:
            ehr_data = json.load(f)

        if patient_id not in ehr_data:
            return "No data found for this patient."

        latest_record = ehr_data[patient_id]["records"][-1]
        entities = latest_record['entities']

        question_lower = question.lower()
        response = "I'm here to help. "

        if "symptom" in question_lower:
            response += f"Noted symptoms: {', '.join(entities.get('SYMPTOM', [])) or 'No symptoms recorded.'}"
        elif "treatment" in question_lower:
            response += f"Ongoing treatments: {', '.join(entities.get('TREATMENT', [])) or 'No treatments recorded.'}"
        elif "diagnosis" in question_lower:
            response += f"Diagnosis details: {', '.join(entities.get('DISEASE', [])) or 'No diagnosis found.'}"
        else:
            response += "You can ask about symptoms, treatments, or diagnosis."

        return response


In [7]:
nlp = MedicalNLPSystem()

sample_text = """
The patient was prescribed 500mg of Amoxicillin for an upper respiratory infection. 
He has a history of hypertension and Type 2 diabetes. 
Vitals are stable. Follow-up scheduled in 2 weeks.
"""

entities = nlp.extract_entities(sample_text)
print("\n🧬 Extracted Entities:")
for k, v in entities.items():
    print(f"{k}: {', '.join(v)}")

summary = nlp.summarize_text(sample_text)
print("\n📄 Generated Summary:")
print(summary)


Loading models...


Device set to use cpu
Device set to use cpu
Some weights of PegasusForConditionalGeneration were not initialized from the model checkpoint at google/pegasus-pubmed and are newly initialized: ['model.decoder.embed_positions.weight', 'model.encoder.embed_positions.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Extracting medical entities...

🧬 Extracted Entities:
Dosage: 500mg
Medication: am, ##oxicillin
Biological_structure: upper respiratory
History: hypertension, type 2 diabetes
Diagnostic_procedure: vital
Clinical_event: follow
Lab_value: 2
Summarizing text...

📄 Generated Summary:
key clinical messagea 77-year - old man was admitted to the intensive care unit ( icu ) with a 5-day history of dyspnea , cough , and chest pain . on admission , <n> a chest computed tomography ( ct ) scan revealed a large left pulmonary artery aneurysm . <n> oxicillin was administered , and the patient made an uneventful recovery .


In [8]:
import os
import json
from datetime import datetime

class EHRIntegration:
    def __init__(self):
        self.data_dir = "ehr_data"
        os.makedirs(self.data_dir, exist_ok=True)

    def _get_file_path(self, patient_id):
        return os.path.join(self.data_dir, f"{patient_id}.json")

    def _init_patient_record(self, patient_id):
        # Create a blank template for new patient
        return {
            "patient_id": patient_id,
            "demographics": {},
            "medical_history": {
                "conditions": [],
                "allergies": [],
            },
            "medications": [],
            "lab_results": {},
            "visits": [],
            "notes": [],
            "timestamps": {
                "created_at": str(datetime.now()),
                "updated_at": str(datetime.now())
            }
        }

    def fetch_patient_data(self, patient_id, fields=None):
        file_path = self._get_file_path(patient_id)
        if not os.path.exists(file_path):
            return {"error": "Patient not found"}
        with open(file_path, 'r') as f:
            data = json.load(f)
        if fields:
            return {k: data.get(k) for k in fields}
        return data

    def save_patient_data(self, patient_id, data):
        file_path = self._get_file_path(patient_id)
        data["timestamps"]["updated_at"] = str(datetime.now())
        with open(file_path, 'w') as f:
            json.dump(data, f, indent=2)

    def get_or_create_patient(self, patient_id):
        file_path = self._get_file_path(patient_id)
        if os.path.exists(file_path):
            return self.fetch_patient_data(patient_id)
        new_data = self._init_patient_record(patient_id)
        self.save_patient_data(patient_id, new_data)
        return new_data

    def update_patient_data(self, patient_id, updates):
        current_data = self.get_or_create_patient(patient_id)

        for key, value in updates.items():
            if key not in current_data:
                current_data[key] = value
            elif isinstance(current_data[key], list) and isinstance(value, list):
                current_data[key].extend(value)
            elif isinstance(current_data[key], dict) and isinstance(value, dict):
                current_data[key].update(value)
            else:
                current_data[key] = value

        self.save_patient_data(patient_id, current_data)
        return {"status": "success", "updated": updates}

    def append_note(self, patient_id, note):
        data = self.get_or_create_patient(patient_id)
        data["notes"].append({"note": note, "timestamp": str(datetime.now())})
        self.save_patient_data(patient_id, data)
        return {"status": "note added"}

    def append_visit(self, patient_id, visit_data):
        data = self.get_or_create_patient(patient_id)
        visit_data["timestamp"] = str(datetime.now())
        data["visits"].append(visit_data)
        self.save_patient_data(patient_id, data)
        return {"status": "visit added"}

    def create_patient_summary(self, patient_id):
        data = self.fetch_patient_data(patient_id)
        if "error" in data:
            return data
        age = self._calculate_age(data["demographics"].get("dob")) if data["demographics"].get("dob") else "N/A"
        return {
            "patient_id": data["patient_id"],
            "age": age,
            "gender": data["demographics"].get("gender", "N/A"),
            "conditions": data["medical_history"].get("conditions", []),
            "allergies": data["medical_history"].get("allergies", []),
            "medications": data.get("medications", []),
            "recent_notes": data.get("notes", [])[-3:],
            "last_updated": data["timestamps"]["updated_at"]
        }

    def _calculate_age(self, dob):
        try:
            birth_date = datetime.fromisoformat(dob)
            today = datetime.now()
            return today.year - birth_date.year - ((today.month, today.day) < (birth_date.month, birth_date.day))
        except:
            return "Invalid DOB"


In [9]:
import os
import json
import torch
import re
import random
from datetime import datetime
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

EHR_FILE = "ehr_records.json"
AUDIT_LOG = "chatbot_audit_log.json"


# ------------------------------
# 🔧 Utility EHR Storage
# ------------------------------

def load_ehr():
    if not os.path.exists(EHR_FILE):
        return {}
    with open(EHR_FILE, "r") as f:
        return json.load(f)

def save_ehr(data):
    with open(EHR_FILE, "w") as f:
        json.dump(data, f, indent=2)

def generate_numeric_id():
    return str(random.randint(100000, 999999))

def enroll_new_patient(name, age, gender, medical_history=None):
    ehr = load_ehr()
    patient_id = generate_numeric_id()
    while patient_id in ehr:
        patient_id = generate_numeric_id()
    ehr[patient_id] = {
        "name": name,
        "age": age,
        "gender": gender,
        "medical_history": medical_history or [],
        "vitals": [],
        "records": [],
    }
    save_ehr(ehr)
    return patient_id

def update_medical_record(patient_id, note):
    ehr = load_ehr()
    if patient_id in ehr:
        ehr[patient_id]["records"].append({"note": note, "timestamp": str(datetime.now())})
        save_ehr(ehr)
        return "✅ Medical record updated."
    return "❌ Patient ID not found."

def update_vitals(patient_id, vitals):
    ehr = load_ehr()
    if patient_id in ehr:
        vitals["timestamp"] = str(datetime.now())
        ehr[patient_id]["vitals"].append(vitals)
        save_ehr(ehr)
        return "✅ Vitals updated."
    return "❌ Patient ID not found."


# ------------------------------
# 📄 EHR Wrapper Class
# ------------------------------

class EHRIntegration:
    def fetch_patient_data(self, patient_id):
        ehr = load_ehr()
        return ehr.get(patient_id, {"error": "Patient not found"})

    def update_patient_data(self, patient_id, updates):
        ehr = load_ehr()
        if patient_id not in ehr:
            return {"error": "Patient not found"}
        for key, value in updates.items():
            if isinstance(ehr[patient_id].get(key), list) and isinstance(value, list):
                ehr[patient_id][key].extend(value)
            else:
                ehr[patient_id][key] = value
        save_ehr(ehr)
        return {"status": "updated", "updates": updates}

    def create_patient_summary(self, patient_id):
        data = self.fetch_patient_data(patient_id)
        if "error" in data:
            return data
        return {
            "summary": f"{data['name']}, {data['age']} y/o, {data['gender']}.",
            "medical_history": data.get("medical_history", []),
            "vitals": data.get("vitals", [])[-3:],
            "records": data.get("records", [])[-3:]
        }

    def log_audit_event(self, event_type, user_id, patient_id, details):
        log_entry = {
            "timestamp": str(datetime.now()),
            "event": event_type,
            "user": user_id,
            "patient_id": patient_id,
            "details": details
        }
        if not os.path.exists(AUDIT_LOG):
            logs = []
        else:
            with open(AUDIT_LOG, 'r') as f:
                logs = json.load(f)
        logs.append(log_entry)
        with open(AUDIT_LOG, 'w') as f:
            json.dump(logs, f, indent=2)


# ------------------------------
# Medical Chatbot Core
# ------------------------------

class MedicalChatbot:
    def __init__(self, knowledge_base_path=None):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        self.tokenizer = AutoTokenizer.from_pretrained("microsoft/BioGPT")
        self.model = AutoModelForCausalLM.from_pretrained("microsoft/BioGPT").to(self.device)

        self.knowledge_base = {}
        if knowledge_base_path and os.path.exists(knowledge_base_path):
            with open(knowledge_base_path, 'r') as f:
                self.knowledge_base = json.load(f)

        self.ehr = EHRIntegration()

    def classify_query_intent(self, query):
        intents = {
            'request_information': ['what is', 'tell me about', 'explain'],
            'documentation': ['note', 'record', 'update'],
            'medication': ['prescribe', 'drug', 'dosage'],
            'diagnosis': ['diagnose', 'symptoms'],
        }
        query_lower = query.lower()
        for intent, keywords in intents.items():
            if any(keyword in query_lower for keyword in keywords):
                return intent
        return 'general_query'

    def extract_medical_terms(self, text):
        prefixes = ['hyper', 'hypo', 'neuro', 'cardio']
        suffixes = ['itis', 'osis', 'oma', 'algia']
        words = re.findall(r'\b\w+\b', text.lower())
        return list({word for word in words if any(word.startswith(p) for p in prefixes) or any(word.endswith(s) for s in suffixes)})

    def process_query(self, query, patient_id=None, user_id="system"):
        intent = self.classify_query_intent(query)
        terms = self.extract_medical_terms(query)
        context = f"Doctor asked: {query}\n\nIntent: {intent}\n"

        if patient_id:
            summary = self.ehr.create_patient_summary(patient_id)
            context += f"Patient Summary:\n{json.dumps(summary, indent=2)}\n"

        # Inject KB terms if found
        for term in terms:
            if term in self.knowledge_base:
                context += f"{term}: {self.knowledge_base[term]}\n"

        inputs = self.tokenizer(context, return_tensors="pt", truncation=True).to(self.device)
        response_ids = self.model.generate(inputs["input_ids"], max_length=150, do_sample=True, top_p=0.9, temperature=0.7)
        response = self.tokenizer.decode(response_ids[0], skip_special_tokens=True)

        # Auto update and audit
        if intent == 'documentation' and patient_id:
            update_medical_record(patient_id, query)
            response += "\n\n📝 Patient record has been updated."
            self.ehr.log_audit_event("documentation", user_id, patient_id, query)

        elif intent == 'medication' and patient_id:
            data = self.ehr.fetch_patient_data(patient_id)
            meds = data.get("medications", [])
            allergies = data.get("medical_history", {}).get("allergies", [])
            response += f"\n\n💊 Medications: {', '.join(meds)}"
            response += f"\n⚠️ Allergies: {', '.join(allergies)}"
            self.ehr.log_audit_event("medication_check", user_id, patient_id, f"Query: {query}")

        else:
            self.ehr.log_audit_event("chat_query", user_id, patient_id or "unknown", query)

        return response, intent, ", ".join(terms)


In [10]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline

class LocalChatBot:
    def __init__(self, model_name="google/flan-t5-base"):
        print("Loading model, please wait...")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
        self.pipeline = pipeline("text2text-generation", model=self.model, tokenizer=self.tokenizer)

    def process_query(self, query, context=""):
        prompt = f"Answer the question based on the context.\nContext: {context}\nQuestion: {query}"
        result = self.pipeline(prompt, max_new_tokens=100, do_sample=False)
        return result[0]["generated_text"]


In [11]:
bot = LocalChatBot()

question = "What are symptoms of high blood pressure?"
context = "Patient is experiencing headaches, blurred vision, and fatigue."

response = bot.process_query(question, context)
print("Bot says:", response)


Loading model, please wait...


Device set to use mps:0


Bot says: headaches, blurred vision, and fatigue


In [12]:
import os
import json
import random
from datetime import datetime

import whisper
import torch
from transformers import pipeline, AutoTokenizer, AutoModelForTokenClassification

EHR_FILE = "ehr_data.json"

class MedicalAIAssistant:
    def __init__(self):
        print("Initializing Medical AI Assistant...")
        self.chatbot = LocalChatBot(model_name="google/flan-t5-base")
        self.transcriber = whisper.load_model("base")

        # Load biomedical NER model
        self.ner_pipeline = pipeline(
            "ner",
            model="d4data/biomedical-ner-all",
            tokenizer="d4data/biomedical-ner-all",
            aggregation_strategy="simple"
        )

        # Summarization pipeline
        self.summarizer = pipeline("summarization", model="facebook/bart-large-cnn")

        os.makedirs("logs", exist_ok=True)
        os.makedirs("audit_logs", exist_ok=True)

    def load_ehr(self):
        if not os.path.exists(EHR_FILE):
            return {}
        with open(EHR_FILE, "r") as f:
            return json.load(f)

    def save_ehr(self, data):
        with open(EHR_FILE, "w") as f:
            json.dump(data, f, indent=2)

    def generate_numeric_id(self):
        return str(random.randint(100000, 999999))

    def enroll_new_patient(self, name, age, gender, medical_history=None):
        ehr = self.load_ehr()
        patient_id = self.generate_numeric_id()
        while patient_id in ehr:
            patient_id = self.generate_numeric_id()
        ehr[patient_id] = {
            "name": name,
            "age": age,
            "gender": gender,
            "medical_history": medical_history or [],
            "vitals": [],
            "records": [],
        }
        self.save_ehr(ehr)
        return patient_id

    def update_patient_record(self, patient_id, note=None, vitals=None):
        ehr = self.load_ehr()
        if patient_id not in ehr:
            return {"error": "Patient ID not found"}
        if note:
            ehr[patient_id]["records"].append({
                "note": note,
                "timestamp": str(datetime.now())
            })
        if vitals:
            vitals["timestamp"] = str(datetime.now())
            ehr[patient_id]["vitals"].append(vitals)
        self.save_ehr(ehr)
        return {"status": "updated", "patient_id": patient_id}

    def view_patient_record(self, patient_id):
        ehr = self.load_ehr()
        return ehr.get(patient_id, {"error": "Patient not found"})

    def process_doctor_query(self, query, patient_id=None, user_id=None):
        if user_id is None:
            return {"error": "User ID required"}

        patient_data = self.view_patient_record(patient_id)
        context = json.dumps(patient_data, indent=2)

        response_text, intent, terms = self.chatbot.process_query(query, context)

        log_entry = {
            "timestamp": str(datetime.now()),
            "user_id": user_id,
            "patient_id": patient_id,
            "query": query,
            "intent": intent,
            "terms": terms,
            "response": response_text
        }

        log_path = f"logs/{datetime.now().strftime('%Y%m%d_%H%M%S')}_{user_id}.json"
        with open(log_path, "w") as f:
            json.dump(log_entry, f, indent=2)

        return {"response": response_text, "intent": intent, "terms": terms}

    def fetch_patient_data(self, patient_id):
        ehr = self.load_ehr()
        return ehr.get(patient_id, {"error": "Patient not found"})

    def update_patient_data(self, patient_id, updates):
        ehr = self.load_ehr()
        if patient_id not in ehr:
            return {"error": "Patient ID not found"}

        if "notes" in updates:
            ehr[patient_id]["records"].append({
                "note": updates["notes"],
                "timestamp": str(datetime.now())
            })

        if "vital_signs" in updates:
            updates["vital_signs"]["timestamp"] = str(datetime.now())
            ehr[patient_id]["vitals"].append(updates["vital_signs"])

        if "medications" in updates:
            ehr[patient_id].setdefault("medications", []).extend(updates["medications"])

        if "lab_results" in updates:
            ehr[patient_id].setdefault("lab_results", {}).update(updates["lab_results"])

        self.save_ehr(ehr)
        return {"status": "updated", "patient_id": patient_id}

    def process_audio_recording(self, audio_file_path, user_id):
        print(f"Processing audio: {audio_file_path}")

        # Step 1: Transcribe audio to text
        result = self.transcriber.transcribe(audio_file_path)
        transcript = result["text"]

        # Step 2: Extract biomedical entities
        ner_results = self.ner_pipeline(transcript)
        entities = list(set(item["word"] for item in ner_results))

        # Step 3: Summarize the transcript
        chunks = [transcript[i:i+1000] for i in range(0, len(transcript), 1000)]
        summaries = [
            self.summarizer(chunk, max_length=100, min_length=25, do_sample=False)[0]["summary_text"]
            for chunk in chunks
        ]
        summary = " ".join(summaries)

        # Step 4: Log the audio processing event
        audit_log = {
            "timestamp": str(datetime.now()),
            "event": "audio_processing",
            "user_id": user_id,
            "file": audio_file_path
        }
        with open(f"audit_logs/audio_{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", "w") as f:
            json.dump(audit_log, f, indent=2)

        return {
            "transcript": transcript,
            "entities": entities,
            "summary": summary
        }


In [13]:
assistant = MedicalAIAssistant()

audio_path = "/Users/aditya/Desktop/65yo-diabete.wav"  # Use your actual audio filename
user_id = "doctor_123"  # Replace with your user ID if needed

result = assistant.process_audio_recording(audio_path, user_id)

print("\n--- Transcript ---")
print(result["transcript"])

print("\n--- Extracted Medical Entities ---")
print(result["entities"])

print("\n--- Summary ---")
print(result["summary"])


Initializing Medical AI Assistant...
Loading model, please wait...


Device set to use mps:0
Device set to use mps:0
Device set to use mps:0


Processing audio: /Users/aditya/Desktop/65yo-diabete.wav


Your max_length is set to 100, but your input_length is only 63. Since this is a summarization task, where outputs shorter than the input are typically wanted, you might consider decreasing max_length manually, e.g. summarizer('...', max_length=31)



--- Transcript ---
 The patient is a 65 year old male with a history of type 2 diabetes and hypertension. He reports increased fatigue, blood vision and elevated blood glucose levels over the past week. Current medications include metformin and lycinopiril. Recommend checking HBA1C and adjusting medication if necessary.

--- Extracted Medical Entities ---
['hyper', 'blood vision', '##y', '##ril', 'medication', 'elevated', '65 year old', '##form', 'met', 'blood glucose', 'l', 'male', 'h', '##in', '##cino', 'type 2 diabetes', 'fatigue', '##pi']

--- Summary ---
The patient is a 65 year old male with a history of type 2 diabetes and hypertension. He reports increased fatigue, blood vision and elevated blood glucose levels over the past week. Current medications include metformin and lycinopiril.


In [14]:
import gradio as gr
import json

assistant = MedicalAIAssistant()

class MedicalBot:
    def __init__(self):
        self.ai = assistant

    def process_query(self, query, patient_id):
        result = self.ai.process_doctor_query(query, patient_id=patient_id, user_id="doc001")
        return result["response"], result["intent"], result["terms"]

    @property
    def ehr(self):
        return self.ai

    def handle_audio(self, audio_file, user_id, patient_id):
        if not audio_file or not user_id.strip():
            return "Missing audio or user ID", "", ""
        if not patient_id:
            return "❌ Patient ID is required", "", ""
        result = self.ai.process_audio_recording(audio_file, user_id)
        return result["transcript"], ", ".join(result["entities"]), result["summary"]

    def view_patient(self, patient_id):
        record = self.ai.view_patient_record(patient_id)
        return json.dumps(record, indent=2)

    def enroll_patient(self, name, age, gender, history):
        pid = self.ai.enroll_new_patient(name, age, gender, history.split(",") if history else [])
        return f"New Patient ID: {pid}", pid

    def update_record(self, patient_id, note, bp, hr, temp):
        vitals = {
            "blood_pressure": bp,
            "heart_rate": hr,
            "temperature": temp
        } if any([bp, hr, temp]) else None
        result = self.ai.update_patient_record(patient_id, note=note, vitals=vitals)
        return json.dumps(result, indent=2)

    def ask_grok(self, query, patient_id):
        if not patient_id:
            return "❌ Patient ID is required."
        
        record = self.ai.view_patient_record(patient_id)
        if "error" in record:
            return f"❌ {record['error']}"
        
        context = json.dumps(record, indent=2)
        response = self.ai.chatbot.process_query(query, context)
        return response

bot = MedicalBot()

with gr.Blocks(title="Medical AI Assistant") as demo:
    gr.Markdown("## 🩺 Medical AI Voice Assistant")

    patient_state = gr.State()

    with gr.Tab("🗣️ Audio Analysis"):
        user_id = gr.Textbox(label="User ID")
        audio_input = gr.Audio(label="Upload Audio", type="filepath")
        transcript = gr.Textbox(label="Transcript")
        entities = gr.Textbox(label="Extracted Medical Entities")
        summary = gr.Textbox(label="Summary")
        gr.Button("Process Audio").click(
            fn=bot.handle_audio,
            inputs=[audio_input, user_id, patient_state],
            outputs=[transcript, entities, summary]
        )

    with gr.Tab("💬 Ask Open AI"):
        gr.Markdown("Ask medical questions based on patient record")
        grok_query = gr.Textbox(label="Your Question")
        grok_response = gr.Textbox(label="AI Response", lines=8)
        gr.Button("Ask Open AI").click(
            fn=bot.ask_grok,
            inputs=[grok_query, patient_state],
            outputs=grok_response
        )

    with gr.Tab("👤 Patient Management"):
        with gr.Row():
            name = gr.Textbox(label="Name")
            age = gr.Textbox(label="Age")
            gender = gr.Dropdown(["Male", "Female", "Other"], label="Gender")
            history = gr.Textbox(label="Medical History (comma separated)")
        output_pid = gr.Textbox(label="Patient ID")
        gr.Button("Enroll New Patient").click(
            fn=bot.enroll_patient,
            inputs=[name, age, gender, history],
            outputs=[output_pid, patient_state]
        )

    with gr.Tab("📋 Update/View Record"):
        pid_input = gr.Textbox(label="Patient ID")
        note = gr.Textbox(label="Doctor Note")
        bp = gr.Textbox(label="Blood Pressure")
        hr = gr.Textbox(label="Heart Rate")
        temp = gr.Textbox(label="Temperature")
        update_output = gr.Textbox(label="Update Result")
        gr.Button("Update Record").click(
            fn=bot.update_record,
            inputs=[pid_input, note, bp, hr, temp],
            outputs=update_output
        )

        gr.Markdown("### 🔍 View Patient Record")
        view_output = gr.Textbox(label="Patient Record", lines=10)
        gr.Button("View Record").click(
            fn=bot.view_patient,
            inputs=[pid_input],
            outputs=view_output
        )

demo.launch(share=True)


Initializing Medical AI Assistant...
Loading model, please wait...


2025-04-16 12:55:52,578 [INFO] HTTP Request: GET https://api.gradio.app/gradio-messaging/en "HTTP/1.1 200 OK"
Device set to use mps:0
Device set to use mps:0
Device set to use mps:0
2025-04-16 12:56:16,171 [INFO] HTTP Request: GET http://127.0.0.1:7860/gradio_api/startup-events "HTTP/1.1 200 OK"
2025-04-16 12:56:16,222 [INFO] HTTP Request: HEAD http://127.0.0.1:7860/ "HTTP/1.1 200 OK"


* Running on local URL:  http://127.0.0.1:7860


2025-04-16 12:56:17,122 [INFO] HTTP Request: GET https://api.gradio.app/pkg-version "HTTP/1.1 200 OK"
2025-04-16 12:56:17,462 [INFO] HTTP Request: GET https://api.gradio.app/v3/tunnel-request "HTTP/1.1 200 OK"


* Running on public URL: https://5efcfa6113625c51e9.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


2025-04-16 12:56:20,163 [INFO] HTTP Request: HEAD https://5efcfa6113625c51e9.gradio.live "HTTP/1.1 200 OK"


