# Medical EHR + MRI Summarization & Enhancement Notebook

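This notebook is pre-configured to: load your merged EHR CSV and MRI ZIP (uploaded in Colab, or read from the working directory), summarize each patient record with a Hugging Face summarization model, caption MRI images with BLIP (falling back to ViT-GPT2), display a sample MRI image inline, and produce a report per patient. Note that the captioning models are general-purpose rather than trained on medical images, so treat the captions as illustrative, not as clinical findings.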

How to use:
1. Run the first cell to install required libraries.
2. Run the upload cell to upload `Merged_EHR_Data.csv` (or `ALL IN ONE.csv`) and your `mri_images.zip`. Outside Colab there is no upload widget; place both files in the working directory instead.
3. Run the subsequent cells step-by-step.

In [None]:
# Install required libraries (run once per runtime).
# Only `transformers` is version-pinned here; every other package resolves to
# whatever version pip picks at install time.
# `|| true` forces a zero exit status, so a failed install does NOT stop the
# cell -- read the pip log above the final message to spot failures.
!pip install -q transformers==4.40.0 accelerate torch torchvision pillow pandas numpy datasets nbformat scikit-image ipywidgets --quiet || true
# Installs the Salesforce BLIP repository from its `main` branch.
# NOTE(review): the captioning cell loads BLIP through `transformers`
# (BlipProcessor / BlipForConditionalGeneration), so this extra install may be
# unnecessary -- confirm before relying on it.
!pip install -q git+https://github.com/salesforce/BLIP.git@main --quiet || true
# NOTE(review): sentence-transformers is not imported by any cell visible in this notebook.
!pip install -q sentence-transformers --quiet || true

# Printed unconditionally: because of `|| true` this line is not proof that the installs succeeded.
print('Install step finished. If any install fails, check logs above and re-run the cell.')

In [None]:
# Upload files (works in Google Colab). If running locally, place files in working directory.
# IN_COLAB records whether the google.colab helpers could be imported.
try:
    from google.colab import files, drive
    IN_COLAB = True
except Exception:
    IN_COLAB = False

print('Running in Colab:', IN_COLAB)

if IN_COLAB:
    print('Mounting Google Drive (optional). If you do not want to mount, skip by pressing Ctrl+C when prompted.')
    try:
        drive.mount('/content/drive')
    except (Exception, KeyboardInterrupt) as e:
        # KeyboardInterrupt derives from BaseException, not Exception, so it must be
        # named explicitly: otherwise the advertised "Ctrl+C to skip" aborts the
        # whole cell instead of skipping the mount.
        print('Drive mount skipped or failed:', e)

    print('\nUse the file-upload widget to upload your files. Upload your merged EHR CSV (e.g., Merged_EHR_Data.csv) and mri_images.zip.')
    # files.upload() returns a dict keyed by the uploaded file names.
    uploaded = files.upload()
    for name in uploaded.keys():
        print('Uploaded:', name)
else:
    print('Not in Colab. Make sure your files are in the working directory (or update the paths manually).')

In [None]:
# Set file paths - adjust if needed.
import os, glob, zipfile, io, json
from pathlib import Path
# Working directory for inputs: /content on Colab (where uploads land), else the CWD.
# Colab is detected through sys.modules rather than the `get_ipython` builtin, which
# is undefined (NameError) when this code runs outside an IPython kernel.
import sys
BASE = Path('/content') if 'google.colab' in sys.modules else Path.cwd()
# Common filenames we expect; if uploaded via Colab these will be in /content/
CSV_OPTIONS = ['Merged_EHR_Data.csv', 'ALL IN ONE.csv', 'ALL_IN_ONE.csv', 'Final_EHR_1200.csv']
ZIP_OPTIONS = ['mri_images.zip', 'enhanced_mri_results.zip', 'TRAIN.zip', 'Images of MRI.zip']

def find_first_existing(options, base=None):
    """Return the first candidate path that exists on disk, or None.

    Args:
        options: iterable of file names (or relative paths), checked in order.
        base: directory the names are resolved against. When omitted, the
            notebook-level ``BASE`` is used, which matches the previous behaviour.

    Returns:
        ``base / name`` as a ``Path`` for the first name that exists,
        otherwise ``None`` (also for an empty ``options``).
    """
    root = BASE if base is None else Path(base)
    for name in options:
        candidate = root / name
        if candidate.exists():
            return candidate
    return None

# Resolve the input files (None when no candidate name exists) and the extraction folder.
CSV_PATH, ZIP_PATH = (find_first_existing(names) for names in (CSV_OPTIONS, ZIP_OPTIONS))
EXTRACT_PATH = BASE.joinpath('extracted_mri_images')

# Echo what was detected so a missing upload is visible immediately.
print(
    f'Detected CSV path: {CSV_PATH}',
    f'Detected ZIP path: {ZIP_PATH}',
    f'Extract path: {EXTRACT_PATH}',
    sep='\n',
)

# Safe CSV read
import pandas as pd
def safe_read_csv(path, encodings=('utf-8', 'latin1')):
    """Read a CSV into a DataFrame, trying several text encodings in turn.

    Args:
        path: CSV location (``str`` or ``Path``); ``None`` means no file was detected.
        encodings: encodings to try, in order. The default no longer lists
            ``'ISO-8859-1'`` after ``'latin1'``: both names refer to the same
            codec, so that extra attempt could never succeed where the previous
            one had failed.

    Returns:
        The loaded DataFrame, or an empty DataFrame when ``path`` is ``None`` or
        every attempt fails. Failures are printed, never raised.
    """
    if path is None:
        print('CSV not found. Please upload one of:', CSV_OPTIONS)
        return pd.DataFrame()
    for enc in encodings:
        try:
            frame = pd.read_csv(path, encoding=enc, low_memory=False)
        except Exception as e:
            # Best effort by design: report the failure and move on to the next encoding.
            print(f'Failed to read with {enc}:', e)
            continue
        print(f'Loaded {path} using encoding {enc}; shape={frame.shape}')
        return frame
    print('All attempts failed. You may need to pre-process the CSV.')
    return pd.DataFrame()

# Load the EHR table and trim stray whitespace from the header names.
df = safe_read_csv(CSV_PATH)
df.columns = list(map(str.strip, df.columns))
print('\nColumns found:\n', list(df.columns))

In [None]:
# Setup summarization pipeline (Hugging Face)
from transformers import pipeline
import torch

# Device index handed to the pipeline: 0 when CUDA is available, otherwise -1.
device = -1 if not torch.cuda.is_available() else 0
print('Using device (torch.cuda.is_available):', torch.cuda.is_available(), 'device index:', device)

# Prefer BART-large-CNN; if it cannot be loaded, fall back to t5-small.
# The fallback load is not guarded, so a failure there propagates.
try:
    summarizer = pipeline(
        'summarization',
        model='facebook/bart-large-cnn',
        tokenizer='facebook/bart-large-cnn',
        device=device,
    )
except Exception as e:
    print('Failed to load BART summarizer, falling back to t5-small summarizer:', e)
    summarizer = pipeline(
        'summarization',
        model='t5-small',
        tokenizer='t5-small',
        device=device,
    )
else:
    print('Loaded facebook/bart-large-cnn summarizer.')

In [None]:
# Functions to create clinical text and summarize
def create_patient_summary(row, colmap=None):
    """Render one EHR record as a fixed-layout block of clinical text.

    Args:
        row: record exposing ``.get(key, default)`` (a pandas Series row or a dict).
        colmap: optional dict translating the field names used here to the column
            names actually present in ``row``; names without a mapping are used as-is.

    Returns:
        A multi-line string with one labelled line per field. A field that is
        absent, ``None`` or a float NaN renders as its default (``'N/A'``, or an
        empty string for the clinical description).
    """
    mapping = colmap or {}

    def get(k, default='N/A'):
        # The default is honoured with or without a colmap (previously a non-empty
        # colmap dropped it, so missing fields were rendered as "None").
        value = row.get(mapping.get(k, k), default)
        # NaN is the only float that differs from itself; blank CSV cells arrive
        # as NaN and would otherwise be rendered as the text "nan".
        if value is None or (isinstance(value, float) and value != value):
            return default
        return value

    clinical_text = f"""Patient ID: {get('Patient_ID','N/A')}
Age: {get('Age','N/A')}
Sex: {get('Sex','N/A')}
Clinical Description: {get('Clinical_Description','')}
Date of Diagnosis: {get('Date_of_Diagnosis','N/A')}
ICD10 Code: {get('ICD10_Code','N/A')}
Laboratory Findings:
- Creatinine: {get('Creatinine_mg_dL','N/A')}
- BNP: {get('BNP_pg_mL','N/A')}
Imaging Findings: {get('Imaging_Findings','N/A')}
Treatment: {get('Treatment','N/A')}
Outcome: {get('Outcome','N/A')}
"""
    return clinical_text

def summarize_patient_data(row_text, max_len=120):
    """Summarize clinical text with the notebook-level ``summarizer`` pipeline.

    Args:
        row_text: text to summarize; converted with ``str()``.
        max_len: ``max_length`` passed to the summarizer, and the number of
            characters kept when the text is returned truncated instead.

    Returns:
        - the text itself, cut to ``max_len`` characters, when it is shorter than 80 characters;
        - otherwise the model summary;
        - if summarization raises, the first ``max_len`` characters followed by ``'...'``.
    """
    text = str(row_text)
    # Very short inputs are not worth a model call.
    if len(text) < 80:
        return text[:max_len]
    try:
        out = summarizer(
            text,
            max_length=max_len,
            # min_length must not exceed max_length when a small max_len is requested.
            min_length=min(30, max_len),
            do_sample=False,
            # Clip inputs longer than the model's context instead of failing inside the model.
            truncation=True,
        )
        return out[0]['summary_text']
    except Exception as e:
        # Best-effort fallback, but make the failure visible rather than silent.
        print('Summarization failed, returning truncated text instead:', e)
        return text[:max_len] + '...'

In [None]:
# Image captioning: try BLIP first, then fallback to ViT-GPT2
from PIL import Image
import os, glob

# Load an image-captioning model: BLIP first, ViT-GPT2 as the fallback.
# NOTE: despite its name, `have_blip` is True whenever *either* model loaded.
have_blip = False
processor = None
image_model = None
try:
    from transformers import BlipProcessor, BlipForConditionalGeneration
    processor = BlipProcessor.from_pretrained('Salesforce/blip-image-captioning-base')
    image_model = BlipForConditionalGeneration.from_pretrained('Salesforce/blip-image-captioning-base')
    # analyze_mri_image sends its inputs to CUDA when available, so the weights
    # must live on the same device or generate() fails with a device mismatch.
    image_model = image_model.to('cuda' if torch.cuda.is_available() else 'cpu')
    have_blip = True
    print('Loaded BLIP model from Salesforce.')
except Exception as e:
    # Drop any half-loaded state so the fallback starts from a clean slate.
    processor = None
    image_model = None
    print('BLIP load failed, will try fallback. Error:', e)

if not have_blip:
    try:
        from transformers import VisionEncoderDecoderModel, ViTImageProcessor, AutoTokenizer
        processor = ViTImageProcessor.from_pretrained('nlpconnect/vit-gpt2-image-captioning')
        image_model = VisionEncoderDecoderModel.from_pretrained('nlpconnect/vit-gpt2-image-captioning')
        # Same device placement as the BLIP path.
        image_model = image_model.to('cuda' if torch.cuda.is_available() else 'cpu')
        have_blip = True
        print('Loaded ViT-GPT2 fallback model.')
    except Exception as e:
        processor = None
        image_model = None
        have_blip = False
        print('Fallback image caption model failed too:', e)

# Tokenizers already loaded for the fallback captioner, keyed by checkpoint name.
_caption_tokenizers = {}


def _get_caption_tokenizer(name='nlpconnect/vit-gpt2-image-captioning'):
    """Load the fallback model's tokenizer once and reuse it on later calls."""
    if name not in _caption_tokenizers:
        from transformers import AutoTokenizer
        _caption_tokenizers[name] = AutoTokenizer.from_pretrained(name)
    return _caption_tokenizers[name]


def analyze_mri_image(image_path):
    """Caption one image with whichever captioning model the notebook loaded.

    Args:
        image_path: path of the image file to open.

    Returns:
        The generated caption; ``'No image-captioning model available'`` when no
        model was loaded; or ``'Image analysis error: ...'`` if anything raises
        (errors are reported in the return value, never raised).
    """
    if not have_blip:
        return 'No image-captioning model available'
    try:
        # Context manager closes the file handle once the RGB copy exists.
        with Image.open(image_path) as source:
            img = source.convert('RGB')
        # Send inputs to wherever the weights actually are; moving them to CUDA
        # unconditionally fails when the model was left on the CPU.
        target = image_model.device
        # Branch on the capability that is actually needed (`decode`) rather than
        # on a `feature_extractor` attribute, which is not guaranteed to exist on
        # a BLIP processor; when that probe was False, BLIP output ended up being
        # decoded with the GPT-2 tokenizer.
        if hasattr(processor, 'decode'):
            inputs = processor(images=img, return_tensors='pt').to(target)
            out = image_model.generate(**inputs, max_length=64, num_beams=4)
            caption = processor.decode(out[0], skip_special_tokens=True)
        else:
            # ViT-GPT2 path: the image processor has no decoder, so a separate tokenizer is used.
            pixel_values = processor(images=img, return_tensors='pt').pixel_values.to(target)
            generated_ids = image_model.generate(pixel_values, max_length=64, num_beams=4)
            caption = _get_caption_tokenizer().decode(generated_ids[0], skip_special_tokens=True)
        return caption
    except Exception as e:
        return f'Image analysis error: {e}'


In [None]:
# Extract MRI ZIP (if present) and list images
import zipfile, os
# Normalise to a Path, then unpack the archive if one was detected.
EXTRACT_PATH = Path(EXTRACT_PATH)
if not (ZIP_PATH and ZIP_PATH.exists()):
    print('No ZIP file found at', ZIP_PATH)
else:
    EXTRACT_PATH.mkdir(parents=True, exist_ok=True)
    try:
        with zipfile.ZipFile(ZIP_PATH, mode='r') as archive:
            archive.extractall(path=EXTRACT_PATH)
    except Exception as e:
        # A bad archive is reported but does not stop the notebook.
        print('Failed to extract ZIP:', e)
    else:
        print('Extracted ZIP to', EXTRACT_PATH)

# find images
# Extensions are compared case-insensitively so files such as SCAN.PNG or scan.JPG
# are found too. macOS archive metadata (`__MACOSX/` folders, `._*` files) carries
# image extensions without being image data, so it is skipped.
img_patterns = ['**/*.png','**/*.jpg','**/*.jpeg','**/*.bmp']
_img_suffixes = {Path(pattern).suffix for pattern in img_patterns}
mri_images = sorted(
    candidate
    for candidate in EXTRACT_PATH.rglob('*')
    if candidate.is_file()
    and candidate.suffix.lower() in _img_suffixes
    and not candidate.name.startswith('._')
    and '__MACOSX' not in candidate.parts
)
print('Found', len(mri_images), 'images. Sample:', [str(x.name) for x in mri_images[:5]])

# Display first image inline (if available)
from IPython.display import display
# Preview the first discovered image, resized to 512x512, when there is one.
if not mri_images:
    print('No images to display.')
else:
    sample = mri_images[0]
    print('Displaying sample image:', sample.name)
    display(Image.open(sample).resize(size=(512, 512)))

In [None]:
# Process top N patients and generate reports + optional image analysis
from IPython.display import display, HTML
import html
import re

# Outputs live under BASE: that is /content on Colab (unchanged locations) and the
# working directory elsewhere, where a hard-coded /content is usually not writable.
output_reports_dir = BASE/'patient_reports'
output_reports_dir.mkdir(exist_ok=True, parents=True)
summary_csv_path = BASE/'patient_reports_summary.csv'

N = min(5, len(df))  # small demo; change as needed (you asked for 5 samples earlier)
reports = []
colmap = {}  # optional mapping from expected names to your CSV columns; leave empty to use raw names

# `pos` is the 0-based row position; `idx` is the DataFrame index label, which is
# not guaranteed to be an integer and so cannot safely be used with `%` below.
for pos, (idx, row) in enumerate(df.head(N).iterrows()):
    pid = row.get('Patient_ID') or row.get('PatientId') or row.get('PatientID') or f'row_{idx}'
    print('\n' + '='*60)
    print('Processing Patient:', pid)
    clinical_raw = create_patient_summary(row, colmap=colmap)
    clinical_sum = summarize_patient_data(clinical_raw, max_len=120)
    # image - try to pick a corresponding image by PID substring, otherwise use sample by position
    # NOTE(review): substring matching means pid "1" also matches "img_10.png", and the
    # positional fallback pairs a patient with an unrelated image -- confirm this is intended.
    chosen_img = next((img for img in mri_images if str(pid) in img.name), None)
    if chosen_img is None and mri_images:
        chosen_img = mri_images[pos % len(mri_images)]
    if chosen_img:
        caption = analyze_mri_image(str(chosen_img))
    else:
        caption = 'No MRI available'
    report_text = f"""PATIENT REPORT - {pid}
Clinical summary: {clinical_sum}
Image caption: {caption}
"""
    # save report; the ID comes from the CSV, so keep only filename-safe characters
    # (prevents path separators or other odd characters from ending up in the path)
    safe_pid = re.sub(r'[^A-Za-z0-9._-]+', '_', str(pid))
    outp = output_reports_dir/f'report_{safe_pid}.txt'
    outp.write_text(report_text, encoding='utf-8')
    reports.append({'Patient_ID': pid, 'summary': clinical_sum, 'image_caption': caption, 'image_path': str(chosen_img) if chosen_img else ''})
    # display minimal card; values originate from the CSV / model output, so escape
    # them before embedding in HTML
    display(HTML(f"""<div style='border:1px solid #ccc;padding:10px;margin:8px;border-radius:8px;'>
    <h4>Patient {html.escape(str(pid))}</h4><b>Summary:</b><p>{html.escape(str(clinical_sum))}</p><b>Image caption:</b><p>{html.escape(str(caption))}</p></div>"""))

# Save combined reports CSV
import pandas as pd
pd.DataFrame(reports).to_csv(summary_csv_path, index=False)
print(f'Saved reports to {output_reports_dir} and {summary_csv_path}')

In [None]:
# Final tips & optional copy to Drive
print('If you mounted Google Drive earlier, you can copy outputs there:')
try:
    # Importing google.colab doubles as the "are we in Colab?" check.
    from google.colab import drive, files
    import shutil
    # Without this check, mkdir(parents=True) would quietly create the folder on the
    # runtime's local disk when Drive is not mounted, and the copy would report a
    # success that never reached Drive.
    drive_mount = Path('/content/drive/MyDrive')
    if not drive_mount.is_dir():
        raise FileNotFoundError(f'{drive_mount} not found; Google Drive is not mounted')
    drive_root = drive_mount/'Medical_Reports'
    drive_root.mkdir(parents=True, exist_ok=True)
    # copy files
    shutil.copy('/content/patient_reports_summary.csv', drive_root/'patient_reports_summary.csv')
    print('Copied patient_reports_summary.csv to', drive_root)
except Exception as e:
    print('Drive copy skipped (not in Colab or mount failed):', e)

print('Notebook complete. If you want the .ipynb file downloaded, use the file browser or download from the runtime.')