<a href="https://colab.research.google.com/github/herndoch/dermopath-ai-hub/blob/main/Knowledge_Pipeline_v4_0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Block 0

In [2]:
# @title {display-mode: "form"}
# ==============================================================================
# BLOCK 0: UNIVERSAL SETUP (Textbooks + Lectures)
# ==============================================================================
import os
import shutil
from google.colab import drive, userdata, auth
from google.cloud import storage
import google.generativeai as genai

print("--- STEP 0: INITIALIZATION ---")

# 1. Install & Configure System (Textbooks + Whisper/Video tools)
print("üì¶ Installing dependencies (PDF, Video, AI)...")
!sudo apt-get update -qq && sudo apt-get install -y ffmpeg > /dev/null 2>&1
!pip install -q -U google-generativeai PyMuPDF scikit-image aiohttp tqdm openai-whisper opencv-python-headless

# 2. Authentication
print("üîë Authenticating with Google Cloud...")
try:
    auth.authenticate_user()
    GEMINI_API_KEY = userdata.get('GEMINI_API_KEY')
    genai.configure(api_key=GEMINI_API_KEY)
except Exception as e:
    raise SystemExit(f"‚ùå Authentication Failed: {e}")

# 3. Mount Drive (Source Storage)
try:
    drive.mount('/content/drive', force_remount=True)
except:
    pass

# 4. Universal Configuration
GCS_BUCKET_NAME = 'pathology-hub-0'
DRIVE_ROOT = '/content/drive/MyDrive/1-Projects/Knowledge_Pipeline'

# Shared GCS handles: a single client and bucket object for the notebook.
storage_client = storage.Client()
bucket = storage_client.bucket(GCS_BUCKET_NAME)

# --- THE MASTER PATH MAP ---
# Routing table used by both the textbook and the lecture workflows.
PATHS = dict(
    # Sources: folders on the mounted Google Drive.
    source_pdfs=os.path.join(DRIVE_ROOT, '_source_materials', 'pdfs'),
    source_videos=os.path.join(DRIVE_ROOT, '_source_materials', 'videos'),

    # Destinations: the bucket name plus object prefixes inside it.
    gcs_bucket=GCS_BUCKET_NAME,
    gcs_tags="Tags",  # Where the _Tags.txt files live

    # Textbook pipeline prefixes.
    gcs_asset_textbooks="_asset_library/textbooks",
    gcs_content_textbooks="_content_library/textbooks",

    # Lecture pipeline prefixes.
    gcs_asset_lectures="_asset_library/lectures",
    gcs_content_lectures="_content_library/lectures",
)

# 5. Verification
# Echo the resolved bucket and source folders so a wrong path is visible before any work starts.
print(f"\n‚úÖ Connected to Bucket: gs://{GCS_BUCKET_NAME}")
for label, key in (("Source PDFs:  ", "source_pdfs"), ("Source Videos:", "source_videos")):
    print(f"‚úÖ {label} {PATHS[key]}")
print("\nüöÄ SYSTEM READY. You can now run Block 1 (Textbook) or Block 1 (Lecture).")

--- STEP 0: INITIALIZATION ---
📦 Installing dependencies (PDF, Video, AI)...
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
🔑 Authenticating with Google Cloud...
Mounted at /content/drive

✅ Connected to Bucket: gs://pathology-hub-0
✅ Source PDFs:   /content/drive/MyDrive/1-Projects/Knowledge_Pipeline/_source_materials/pdfs
✅ Source Videos: /content/drive/MyDrive/1-Projects/Knowledge_Pipeline/_source_materials/videos

🚀 SYSTEM READY. You can now run Block 1 (Textbook) or Block 1 (Lecture).


# PDF BLOCK 1: TEXTBOOK EXTRACTOR (Text + Figures)

In [None]:
# @title {display-mode: "form"}
# ==============================================================================
# BLOCK 1: TEXTBOOK EXTRACTOR (Gemini 3 Flash - Panel Aware)
# ==============================================================================
import base64
import fitz  # PyMuPDF
import json
import asyncio
import aiohttp
import re
import os
from tqdm.asyncio import tqdm_asyncio
from google.cloud import storage

# --- CONFIGURATION ---
# Semaphore sizes for the text-cleaning and the vision requests respectively.
TEXT_CONCURRENCY = 20
VISION_CONCURRENCY = 20
# Model id used for figure analysis; the text endpoint below targets the same model id.
VISION_MODEL_NAME = "gemini-3-flash-preview"
TEXT_MODEL_URL = (
    "https://generativelanguage.googleapis.com/v1beta/models/"
    f"gemini-3-flash-preview:generateContent?key={GEMINI_API_KEY}"
)

# --- HELPER: GCS UTILS ---
def gcs_exists(blob_path):
    """Return True when an object exists at ``blob_path`` in the pipeline bucket."""
    target = bucket.blob(blob_path)
    return target.exists()

def gcs_upload_bytes(data, blob_path, content_type):
    """Write ``data`` to ``blob_path`` in the pipeline bucket with the given content type."""
    bucket.blob(blob_path).upload_from_string(data, content_type=content_type)

def gcs_upload_json(data, blob_path):
    """Serialize ``data`` as indented JSON and store it at ``blob_path`` in the bucket."""
    serialized = json.dumps(data, indent=2)
    bucket.blob(blob_path).upload_from_string(serialized, content_type='application/json')

def gcs_load_json(blob_path):
    """Load and parse a JSON object stored in the pipeline bucket.

    Args:
        blob_path: Object path inside the bucket.

    Returns:
        The parsed JSON value, or an empty list when no object exists at ``blob_path``.
    """
    blob = bucket.blob(blob_path)
    if not blob.exists():
        return []
    # download_as_bytes() replaces the deprecated download_as_string() alias;
    # json.loads accepts UTF-8 encoded bytes directly.
    return json.loads(blob.download_as_bytes())

# --- AI HELPERS ---
async def clean_text_async(session, text, page_num, sem):
    async with sem:
        if not text.strip(): return page_num, ""
        prompt = (
            "Clean this medical text. Fix OCR errors. Keep structure. "
            "Preserve Figure Captions exactly. Return JSON: {\"markdown\": \"...\"}"
            f"\n\nRAW TEXT:\n{text}"
        )
        payload = {"contents": [{"parts": [{"text": prompt}]}]}
        try:
            async with session.post(TEXT_MODEL_URL, json=payload) as res:
                if res.status == 200:
                    dat = await res.json()
                    raw = dat['candidates'][0]['content']['parts'][0]['text']
                    match = re.search(r'\{.*\}', raw, re.DOTALL)
                    if match: return page_num, json.loads(match.group(0)).get("markdown", text)
                return page_num, text
        except: return page_num, text

def _guess_image_mime(b64_img):
    """Infer an image MIME type from the magic-number prefix of its base64 text."""
    signatures = (
        ("iVBORw0KGgo", "image/png"),
        ("/9j/", "image/jpeg"),
        ("UklGR", "image/webp"),
        ("R0lGOD", "image/gif"),
    )
    for prefix, mime in signatures:
        if b64_img.startswith(prefix):
            return mime
    # Unknown signature: keep the value that was previously hard-coded.
    return "image/png"


async def analyze_figure_async(session, b64_img, context, sem):
    """
    Panel-Aware Vision Analysis.
    Tries to distinguish if the image is just Panel A or Panel B of a multipart figure.

    Args:
        session: Client session whose ``post`` is used for the request.
        b64_img: Base64-encoded image bytes in their original format.
        context: Page text used to locate the matching caption.
        sem: Semaphore limiting the number of concurrent requests.

    Returns:
        The JSON object parsed from the model reply, or ``None`` when the request fails,
        returns a non-200 status, or the reply contains no JSON object.
    """
    async with sem:
        url = f"https://generativelanguage.googleapis.com/v1beta/models/{VISION_MODEL_NAME}:generateContent?key={GEMINI_API_KEY}"

        prompt = f"""
        PAGE CONTEXT:
        {context}

        TASK: Analyze the image below.
        1. Identify the Figure ID (e.g. "Fig 2.1") from the context that matches this image.
        2. **MULTI-PANEL CHECK:**
           - Does the caption describe multiple parts (e.g. "(A) ... (B) ...")?
           - If yes, determine if THIS specific image is Panel A, Panel B, etc.
           - If this image is ONLY Panel A, try to extract ONLY the caption text for (A).
           - If you cannot split the text, return the full caption but add "(Panel A)" to the ID.

        Return JSON: {{"figure_id": "Fig X.X (Panel A)", "matched_caption": "Specific caption..."}} or null.
        """

        # The image bytes are forwarded in whatever format they were extracted in, so the
        # declared MIME type is derived from the data instead of always claiming PNG.
        parts = [
            {"text": prompt},
            {"inline_data": {"mime_type": _guess_image_mime(b64_img), "data": b64_img}}
        ]

        try:
            async with session.post(url, json={"contents": [{"parts": parts}]}) as res:
                if res.status != 200:
                    return None
                dat = await res.json()
            raw = dat['candidates'][0]['content']['parts'][0]['text']
            match = re.search(r'\{.*\}', raw, re.DOTALL)
            return json.loads(match.group(0)) if match else None
        except Exception:
            # Best-effort per figure; Exception (not a bare except) keeps task
            # cancellation and KeyboardInterrupt working.
            return None

# --- MAIN PROCESSOR ---
async def process_textbook(pdf_path, start_p=1, end_p=None):
    """Extract cleaned page text and captioned figures from one PDF into GCS.

    Phase 1 cleans the text of every page in range and stores it as
    ``<book>_CONTENT.json``; it is skipped when that file already holds content.
    Phase 2 uploads each embedded image of at least 5000 bytes, asks the vision model
    to match it to a caption, and appends the matches to ``<book>_FIGURES.json``.
    Pages that already have an entry in the figures file are not analysed again.

    Args:
        pdf_path: Local path of the PDF to process.
        start_p: First page to process (1-based).
        end_p: Last page to process (inclusive); ``None`` means the last page of the PDF.
    """
    fname = os.path.basename(pdf_path)
    book_name = os.path.splitext(fname)[0].replace(' ', '_')

    base_asset = f"{PATHS['gcs_asset_textbooks']}/{book_name}"
    path_fig_imgs = f"{base_asset}/figure_images"
    path_content = f"{PATHS['gcs_content_textbooks']}/{book_name}_CONTENT.json"
    path_figures = f"{PATHS['gcs_content_textbooks']}/{book_name}_FIGURES.json"

    print(f"\n{'='*60}\nüìò PROCESSING: {book_name}\n{'='*60}")

    vision_tasks = []
    new_figures = []
    sem_vis = asyncio.Semaphore(VISION_CONCURRENCY)

    doc = fitz.open(pdf_path)
    try:
        total = len(doc)
        final_p = min(end_p or total, total)

        # 1. TEXT (Skip if done)
        existing_content = gcs_load_json(path_content)
        if not existing_content:
            print(f"üìù Phase 1: Cleaning Text...")
            sem = asyncio.Semaphore(TEXT_CONCURRENCY)
            async with aiohttp.ClientSession() as sess:
                tasks = [
                    clean_text_async(sess, doc.load_page(p).get_text("text"), p + 1, sem)
                    for p in range(start_p - 1, final_p)
                ]
                results = await tqdm_asyncio.gather(*tasks)
            content_data = sorted(
                [{"source": fname, "page_number": p, "content": t} for p, t in results],
                key=lambda x: x['page_number'],
            )
            gcs_upload_json(content_data, path_content)
        else:
            content_data = existing_content

        content_map = {c['page_number']: c['content'] for c in content_data}

        # 2. FIGURES
        print("üñºÔ∏è Phase 2: Figures & Vision (Panel-Aware)...")
        existing_figs = gcs_load_json(path_figures)
        processed_pages = {f['source_page'] for f in existing_figs}

        for p_idx in range(start_p - 1, final_p):
            p_num = p_idx + 1
            if p_num in processed_pages:
                continue

            page = doc.load_page(p_idx)
            images = page.get_images(full=True)
            if not images:
                continue

            md_ctx = content_map.get(p_num, "")

            for i, img in enumerate(images):
                try:
                    xref = img[0]
                    base = doc.extract_image(xref)
                    # Skip images under 5000 bytes.
                    if len(base["image"]) < 5000:
                        continue

                    img_name = f"{book_name}_page_{p_num}_img_{i+1}.{base['ext']}"
                    blob_path = f"{path_fig_imgs}/{img_name}"
                    full_uri = f"gs://{GCS_BUCKET_NAME}/{blob_path}"

                    if not gcs_exists(blob_path):
                        gcs_upload_bytes(base["image"], blob_path, f"image/{base['ext']}")

                    b64 = base64.b64encode(base["image"]).decode('utf-8')
                    vision_tasks.append({
                        "b64": b64, "ctx": md_ctx,
                        "meta": {"source_page": p_num, "gcs_path": full_uri}
                    })
                except Exception as err:
                    # One unreadable image must not abort the book, but the skip is reported
                    # so missing figures can be traced.
                    print(f"   -> Skipped image {i+1} on page {p_num}: {err}")
    finally:
        # The PDF is only needed for text and image extraction; release the handle
        # before the vision phase, and also when extraction fails.
        doc.close()

    if vision_tasks:
        print(f"   -> Analyzing {len(vision_tasks)} figures...")
        async with aiohttp.ClientSession() as sess:
            tasks = [analyze_figure_async(sess, t['b64'], t['ctx'], sem_vis) for t in vision_tasks]
            results = await tqdm_asyncio.gather(*tasks)

        for task, res in zip(vision_tasks, results):
            if res and res.get('figure_id'):
                meta = task['meta']
                new_figures.append({
                    "source_document": fname,
                    "source_page": meta['source_page'],
                    "figure_id": res['figure_id'],
                    # .get(): a reply without a caption must not abort saving all figures.
                    "description": res.get('matched_caption'),
                    "gcs_path": meta['gcs_path']
                })

        final_list = existing_figs + new_figures
        final_list.sort(key=lambda x: x['source_page'])
        gcs_upload_json(final_list, path_figures)
        print(f"   -> Added {len(new_figures)} figures.")

# --- RUNNER ---
async def main():
    """List the source PDFs, ask which ones to process, and run the extractor on each.

    The selection is a comma-separated list of 1-based menu numbers; tokens that are not
    digits and numbers outside the menu are ignored.
    """
    source_dir = PATHS['source_pdfs']
    pdfs = sorted(name for name in os.listdir(source_dir) if name.endswith('.pdf'))
    if not pdfs:
        print("‚ùå No PDFs found.")
        return

    print("\n--- AVAILABLE TEXTBOOKS ---")
    for number, name in enumerate(pdfs, start=1):
        print(f"[{number}] {name}")

    sel = input("\nSelect book(s) (e.g. 1, 3): ")
    chosen = []
    for token in sel.split(','):
        if token.strip().isdigit():
            chosen.append(int(token) - 1)

    for idx in chosen:
        if idx < 0 or idx >= len(pdfs):
            continue
        await process_textbook(os.path.join(source_dir, pdfs[idx]))

# Cell entry point. Top-level ``await`` is not valid in a plain Python script; it relies on
# the notebook runtime running the cell inside an event loop.
await main()


--- AVAILABLE TEXTBOOKS ---
[1] BST Horvai.pdf
[2] Bone Atlas.pdf
[3] Bone Dorfman.pdf
[4] Bone Pattern.pdf
[5] Breast Atlas.pdf
[6] Breast Biopsy.pdf
[7] Breast FAQ.pdf
[8] Breast Pattern.pdf
[9] Cyto Breast Yokohama.pdf
[10] Cyto Cibas.pdf
[11] Cyto Comprehensive Part One.pdf
[12] Cyto Comprehensive Part Two.pdf
[13] Cyto GU Paris.pdf
[14] Cyto Gyn Bethesda.pdf
[15] Cyto Milan.pdf
[16] Cyto PSC Lung.pdf
[17] Cyto Pattern.pdf
[18] Cyto Serous Fluids.pdf
[19] Cyto Thyroid Bethesda.pdf
[20] Derm Dermoscopy Atlas.pdf
[21] Derm McKee.pdf
[22] Derm_Elston.pdf
[23] Derm_Levers.pdf
[24] Derm_Patterson.pdf
[25] Derm_Weedon.pdf
[26] Endo Atlas.pdf
[27] GI Atlas.pdf
[28] GI Biopsy Interpretation (Neoplastic).pdf
[29] GI Biopsy Interpretation (Non Neoplastic).pdf
[30] GI Intestinal Atlas.pdf
[31] GI Liver Atlas Chan.pdf
[32] GI Liver Gonzalez.pdf
[33] GI Liver Lefkowitch.pdf
[34] GI Liver Macsween.pdf
[35] GI Liver Mounajjed.pdf
[36] GI Non-Neoplastic Zhang.pdf
[37] GI Odze.pdf
[38] GU Biopsy I

 11%|█▏        | 88/781 [01:50<19:27,  1.69s/it]

# PDF BLOCK 2: TEXTBOOK ARCHITECT (High-Fidelity Monolith)


In [None]:
# @title {display-mode: "form"}
# ==============================================================================
# BLOCK 2: TEXTBOOK ARCHITECT (High-Fidelity Pro + Auto-Save)
# ==============================================================================
import json
import asyncio
import aiohttp
import re
import difflib
import random
import os
from typing import List, Dict, Set, Any
from google.cloud import storage
from tqdm.asyncio import tqdm_asyncio

# --- CONFIGURATION ---
# Model id used for entity extraction ("Our Most Intelligent Model").
MODEL_NAME = "gemini-3-pro-preview"
# Strict limit to prevent 429s on Pro.
CONCURRENCY_LIMIT = 2
# 40 pages per request = optimal context window.
PAGES_PER_CHUNK = 40
# Extra trailing pages per chunk; prevents cutting entities in half.
PAGE_OVERLAP = 2
# Attempts per chunk; resilience against API timeouts.
MAX_RETRIES = 10
# Auto-save to GCS every ~200 pages.
SAVE_EVERY_N_CHUNKS = 5

# --- HELPERS ---
def gcs_read_text(blob_path: str) -> str:
    """Return the UTF-8 text stored at ``blob_path`` in the bucket, or ``""`` if it is absent."""
    blob = bucket.blob(blob_path)
    if not blob.exists():
        return ""
    # download_as_bytes() replaces the deprecated download_as_string() alias.
    return blob.download_as_bytes().decode('utf-8')

def gcs_load_json(blob_path: str) -> List:
    """Return the parsed JSON stored at ``blob_path`` in the bucket, or ``[]`` if it is absent."""
    blob = bucket.blob(blob_path)
    if not blob.exists():
        return []
    # download_as_bytes() replaces the deprecated download_as_string() alias;
    # json.loads accepts UTF-8 encoded bytes directly.
    return json.loads(blob.download_as_bytes())

def gcs_upload_json(data: Any, blob_path: str):
    """Serialize ``data`` as indented JSON and store it at ``blob_path`` in the bucket."""
    serialized = json.dumps(data, indent=2)
    bucket.blob(blob_path).upload_from_string(serialized, content_type='application/json')

def validate_tag(tag: str, valid_set: Set[str]) -> str:
    """Normalise a model-supplied tag against the set of valid tags.

    Args:
        tag: Tag text returned by the model; may be empty, ``None`` or not a string.
        valid_set: The allowed tag strings.

    Returns:
        ``"Skin::Unclassified"`` when no usable tag text is supplied; the stripped tag when
        it is valid; the closest valid tag when difflib finds one with a similarity of at
        least 0.7; otherwise the stripped tag unchanged.
    """
    # Null or non-string tags are treated like a missing tag instead of raising
    # AttributeError on .strip().
    if not isinstance(tag, str):
        return "Skin::Unclassified"
    clean = tag.strip()
    # A whitespace-only tag carries no information either.
    if not clean:
        return "Skin::Unclassified"
    if clean in valid_set:
        return clean
    matches = difflib.get_close_matches(clean, list(valid_set), n=1, cutoff=0.7)
    return matches[0] if matches else clean

# ------------------------------------------------------------------------------
# 2. PROMPT ENGINEERING (THE MENU METHOD)
# ------------------------------------------------------------------------------
def construct_textbook_prompt(text_content, figure_menu, valid_tags_list):
    """Build the extraction prompt sent to the model for one chunk of textbook pages.

    Args:
        text_content: Page text of the chunk, inserted under "TEXTBOOK CONTENT".
        figure_menu: Pre-formatted figure listing, inserted under the figure menu heading.
        valid_tags_list: Reference tag text, inserted under "REFERENCE TAGS".

    Returns:
        The complete prompt string. The page count quoted in the prompt is taken from the
        module-level ``PAGES_PER_CHUNK`` constant.
    """
    return f"""
Role: You are a Senior Dermatopathologist and Data Engineer.
Objective: Convert the provided TEXTBOOK CONTENT into a standardized Knowledge Base.

INPUT CONTEXT:
This input represents a ~{PAGES_PER_CHUNK} page section of a medical textbook.

INSTRUCTIONS:
1. **Identify Entities:** Extract every distinct disease/pathology entity discussed.
2. **High-Fidelity Extraction:**
   - **Microscopic:** Extract detailed histological descriptions.
   - **Ancillary Studies:** List specific stains (CD45+, S100-) and genetics (t(11;22)).
3. **Figure Linking (THE MENU RULE):**
   - Below is a "MENU OF AVAILABLE FIGURES" found on these pages.
   - Each entry has an ID (e.g., Fig 1.2) and a GOLDEN LINK (gs://...).
   - If the text mentions "Figure 1.2 shows Lichen Planus", you MUST find Fig 1.2 in the menu and copy the **GOLDEN LINK** exactly into the 'src' and 'gcs_path' fields.
   - Do NOT invent paths. Only use paths from the menu.

REQUIRED JSON SCHEMA (List of Objects):
[
  {{
    "entity_name": "Disease Name",
    "definition": "...",
    "tags": ["Exact_Tag_From_List"],
    "html_gcs_path": null,
    "clinical": "...",
    "pathogenesis": "...",
    "macroscopic": "...",
    "microscopic": "Detailed histology...",
    "cytology": "...",
    "ancillary_studies": "List specific IHC/Molecular.",
    "differential_diagnosis": "...",
    "staging": "...",
    "prognosis_and_prediction": "...",

    "related_figures": [
        {{
            "id": "Figure ID (e.g. Fig 1.2)",
            "src": "gs://... (COPY FROM MENU)",
            "gcs_path": "gs://... (COPY FROM MENU)",
            "diagnosis": "Disease Name",
            "legend": "Full caption from text."
        }}
    ]
  }}
]

REFERENCE TAGS:
{valid_tags_list}

--- MENU OF AVAILABLE FIGURES (GOLDEN LINKS) ---
{figure_menu}

--- TEXTBOOK CONTENT ---
{text_content}
"""

# ------------------------------------------------------------------------------
# 3. CHUNK PROCESSOR
# ------------------------------------------------------------------------------
async def process_textbook_chunk(session, chunk_data, figures_in_chunk, valid_tags_text, valid_tags_set, sem):
    """Send one chunk of pages to the model and return the extracted entities.

    Args:
        session: aiohttp client session used for the request.
        chunk_data: Page records (``page_number`` / ``content``) forming the chunk.
        figures_in_chunk: Figure records whose link and caption form the figure menu.
        valid_tags_text: Reference tag text inserted into the prompt.
        valid_tags_set: Valid tags used to normalise the tags returned by the model.
        sem: Semaphore limiting the number of concurrent requests.

    Returns:
        A list of entity dicts with normalised ``tags``, ``html_gcs_path`` set to ``None``
        and the core text fields present. An empty list is returned when the reply holds
        no JSON list, on a non-retryable HTTP status, or once all retries are used up.
    """
    async with sem:
        full_text = "\n\n".join(f"--- Page {p['page_number']} ---\n{p['content']}" for p in chunk_data)

        fig_menu = "\n".join(
            f"[ID: {f.get('figure_id', 'Unknown')}] -> GOLDEN LINK: {f['gcs_path']} | Caption: {f.get('description', '')}"
            for f in figures_in_chunk
        )

        url = f"https://generativelanguage.googleapis.com/v1beta/models/{MODEL_NAME}:generateContent?key={GEMINI_API_KEY}"
        payload = {"contents": [{"parts": [{"text": construct_textbook_prompt(full_text, fig_menu, valid_tags_text)}]}]}

        chunk_label = f"p{chunk_data[0]['page_number']}" if chunk_data else "p?"
        # 10 minute timeout for deep reasoning, expressed with aiohttp's ClientTimeout
        # rather than the legacy bare number.
        timeout = aiohttp.ClientTimeout(total=600)
        required_keys = ("clinical", "microscopic", "ancillary_studies", "differential_diagnosis", "pathogenesis")

        for attempt in range(MAX_RETRIES):
            try:
                async with session.post(url, json=payload, timeout=timeout) as response:
                    if response.status == 200:
                        data = await response.json()
                        raw_txt = data['candidates'][0]['content']['parts'][0]['text']
                        raw_txt = raw_txt.replace("```json", "").replace("```", "")

                        match = re.search(r'\[.*\]', raw_txt, re.DOTALL)
                        if not match:
                            return []

                        valid_entities = []
                        for ent in json.loads(match.group(0)):
                            # Skip malformed items instead of raising and re-sending the chunk.
                            if not isinstance(ent, dict) or not ent.get('entity_name'):
                                continue
                            raw_tags = ent.get('tags') or []
                            if isinstance(raw_tags, str):
                                raw_tags = [raw_tags]
                            ent['tags'] = [validate_tag(t, valid_tags_set) for t in raw_tags]
                            ent['html_gcs_path'] = None

                            # Enforce Schema Nulls
                            for k in required_keys:
                                ent.setdefault(k, None)

                            valid_entities.append(ent)
                        return valid_entities
                    elif response.status == 429:
                        wait = (2 ** attempt) + random.uniform(5, 15)
                        print(f"  ‚è≥ Rate Limit (Chunk {chunk_label})... Waiting {wait:.1f}s")
                        await asyncio.sleep(wait)
                        continue
                    else:
                        print(f"‚ùå Error {response.status} on chunk {chunk_label}")
                        return []
            except Exception as e:
                # Timeouts, connection errors and unparsable replies are retried, but are
                # reported so that a chunk that ends up empty can be explained.
                print(f"  Attempt {attempt + 1}/{MAX_RETRIES} failed on chunk {chunk_label}: {e!r}")
                if attempt < MAX_RETRIES - 1:
                    await asyncio.sleep(15)

        print(f"  Giving up on chunk {chunk_label} after {MAX_RETRIES} attempts.")
        return []

# ------------------------------------------------------------------------------
# 4. MAIN WORKFLOW
# ------------------------------------------------------------------------------
def _read_menu_choice(option_count):
    """Read a 1-based menu number from stdin; return its 0-based index, or None if invalid."""
    raw = input("Choice: ").strip()
    if not raw.isdecimal() or not 1 <= int(raw) <= option_count:
        print(f"Invalid choice {raw!r}: expected a number from 1 to {option_count}.")
        return None
    return int(raw) - 1


async def main_textbook_monolith():
    """Interactive driver: turn one extracted textbook into a ``_MASTER.json`` knowledge base.

    Steps: pick a tag list and a ``_CONTENT.json`` file from the bucket, split the pages into
    overlapping chunks, send the chunks to the model in batches (saving to GCS after every
    batch that produced entities), then de-duplicate and save the final list.
    """
    # A. Select Tags
    tag_prefix = f"{PATHS['gcs_tags']}/"
    tag_files = [b.name for b in bucket.list_blobs(prefix=tag_prefix) if b.name.endswith('.txt')]
    if not tag_files:
        print("‚ùå No tags found.")
        return

    print("\n--- SELECT TAG LIST ---")
    for i, f in enumerate(tag_files):
        print(f"[{i+1}] {f.split('/')[-1]}")
    # Validated: a raw int(input()) - 1 turns "0" into index -1, i.e. the last entry.
    t_idx = _read_menu_choice(len(tag_files))
    if t_idx is None:
        return
    tags_text = gcs_read_text(tag_files[t_idx])
    tags_set = set(l.strip() for l in tags_text.splitlines() if l.strip())

    # B. Select Textbook
    content_files = [b.name for b in bucket.list_blobs(prefix=PATHS['gcs_content_textbooks']) if "_CONTENT.json" in b.name]
    if not content_files:
        print("‚ùå No CONTENT files found.")
        return

    print("\n--- SELECT TEXTBOOK ---")
    for i, f in enumerate(content_files):
        print(f"[{i+1}] {f.split('/')[-1]}")
    c_idx = _read_menu_choice(len(content_files))
    if c_idx is None:
        return

    content_path = content_files[c_idx]
    book_base = content_path.split('/')[-1].replace("_CONTENT.json", "")
    fig_path = content_path.replace("_CONTENT.json", "_FIGURES.json")
    final_path = f"{PATHS['gcs_content_textbooks']}/{book_base}_MASTER.json"

    print(f"\nüöÄ Processing: {book_base}")

    # C. RESUME LOGIC
    # NOTE(review): an existing MASTER file is loaded and kept, but every chunk below is
    # still processed again from the first page; repeats are only removed by the final
    # de-duplication step.
    master_kb = []

    if bucket.blob(final_path).exists():
        print("üîÑ Found existing MASTER file. Resuming...")
        master_kb = gcs_load_json(final_path)

    raw_content = gcs_load_json(content_path)
    raw_figures = gcs_load_json(fig_path)
    raw_content.sort(key=lambda x: x['page_number'])

    # D. Prepare Chunks
    # Each chunk starts PAGES_PER_CHUNK pages after the previous one and carries
    # PAGE_OVERLAP extra trailing pages.
    total_pages = len(raw_content)
    chunks = [
        raw_content[i:min(i + PAGES_PER_CHUNK + PAGE_OVERLAP, total_pages)]
        for i in range(0, total_pages, PAGES_PER_CHUNK)
    ]

    print(f"üì¶ Total Chunks: {len(chunks)} (~{PAGES_PER_CHUNK} pages each)")

    # E. Batch Processing
    # We group chunks into batches (e.g. 5 chunks) to save intermittently
    chunk_batches = [chunks[i:i + SAVE_EVERY_N_CHUNKS] for i in range(0, len(chunks), SAVE_EVERY_N_CHUNKS)]

    sem = asyncio.Semaphore(CONCURRENCY_LIMIT)

    async with aiohttp.ClientSession() as session:
        for batch_idx, batch in enumerate(chunk_batches):
            print(f"\n--- Processing Batch {batch_idx + 1}/{len(chunk_batches)} ---")

            tasks = []
            for chunk in batch:
                page_nums = {p['page_number'] for p in chunk}
                chunk_figs = [f for f in raw_figures if f['source_page'] in page_nums]
                tasks.append(process_textbook_chunk(session, chunk, chunk_figs, tags_text, tags_set, sem))

            results = await tqdm_asyncio.gather(*tasks)

            # Add to Master List
            new_count = 0
            for res_list in results:
                master_kb.extend(res_list)
                new_count += len(res_list)

            # --- AUTO SAVE ---
            if new_count > 0:
                print(f"üíæ Saving progress... (+{new_count} entities)")
                gcs_upload_json(master_kb, final_path)
            else:
                print("   (No new entities in this batch)")

    # F. Final Deduplication & Save
    print("\nüßπ Final Deduplication...")
    unique_kb = []
    seen_keys = set()
    for ent in master_kb:
        # Dedupe key: Name + first 50 chars of definition.
        # "definition" may be present but null; slicing None would crash here, after all
        # the model calls have already been paid for.
        key = (ent.get('entity_name'), str(ent.get('definition') or '')[:50])
        if key not in seen_keys:
            unique_kb.append(ent)
            seen_keys.add(key)

    gcs_upload_json(unique_kb, final_path)
    print(f"\n‚úÖ DONE: gs://{GCS_BUCKET_NAME}/{final_path}")
    print(f"üìä Final Count: {len(unique_kb)} Entities")

# Cell entry point. Top-level ``await`` is not valid in a plain Python script; it relies on
# the notebook runtime running the cell inside an event loop.
await main_textbook_monolith()

# PDF BLOCK 3: THE CONSOLIDATOR (Map-Reduce Merge)

In [None]:
# @title {display-mode: "form"}
# ==============================================================================
# BLOCK 3: THE CONSOLIDATOR (Map-Reduce Merge)
# ==============================================================================
import json
import asyncio
import aiohttp
import re
from collections import defaultdict
from typing import List, Dict, Any  # <--- Added missing imports
from google.cloud import storage
from tqdm.asyncio import tqdm_asyncio

# --- CONFIGURATION ---
# Merging is plain text work, so the Flash model is used: it is fast and respects the data.
MODEL_NAME = "gemini-3-flash-preview"
# Number of merge requests allowed in flight at once.
CONCURRENCY_LIMIT = 15

# --- HELPERS ---
def gcs_load_json(blob_path: str) -> List:
    """Return the parsed JSON stored at ``blob_path`` in the bucket, or ``[]`` if it is absent."""
    blob = bucket.blob(blob_path)
    if not blob.exists():
        return []
    # download_as_bytes() replaces the deprecated download_as_string() alias;
    # json.loads accepts UTF-8 encoded bytes directly.
    return json.loads(blob.download_as_bytes())

def gcs_upload_json(data: Any, blob_path: str):
    """Serialize ``data`` as indented JSON and store it at ``blob_path`` in the bucket."""
    serialized = json.dumps(data, indent=2)
    bucket.blob(blob_path).upload_from_string(serialized, content_type='application/json')

# --- PROMPT ---
def construct_merge_prompt(entity_name, fragments):
    """Build the prompt asking the model to merge several records into one entry.

    Args:
        entity_name: Name inserted into the task description and the required schema.
        fragments: Records to merge; embedded in the prompt as indented JSON.

    Returns:
        The complete prompt string.
    """
    return f"""
Role: Medical Data Editor.
Task: Merge these fragmented records for "{entity_name}" into ONE comprehensive entry.

INPUT FRAGMENTS (from different chapters):
{json.dumps(fragments, indent=2)}

INSTRUCTIONS:
1. **Consolidate Text:** Combine 'clinical', 'microscopic', 'definition', etc. Do not lose details.
   - If Fragment A has "Clinical: Itchy" and Fragment B has "Clinical: Purple papules", the result must be "Itchy, purple papules."
   - **Crucial:** Preserve all specific stains (CD45+, S100) and genetic findings.
2. **Merge Figures:** Combine all 'related_figures' into one list. Remove duplicates if exact same ID.
3. **Preserve Tags:** Use the most specific tag available.
4. **Output:** A single JSON object.

REQUIRED SCHEMA:
{{
    "entity_name": "{entity_name}",
    "definition": "Merged...",
    "tags": ["..."],
    "html_gcs_path": null,
    "clinical": "...",
    "pathogenesis": "...",
    "macroscopic": "...",
    "microscopic": "...",
    "cytology": "...",
    "ancillary_studies": "...",
    "differential_diagnosis": "...",
    "staging": "...",
    "prognosis_and_prediction": "...",
    "related_figures": [...]
}}
"""

# --- AI WORKER ---
def _merge_fragments_locally(group):
    """Combine fragments without the model so that a failed AI merge loses no data."""
    merged = dict(group[0])
    all_tags = []
    all_figures = []
    seen_figures = set()

    for frag in group:
        for tag in frag.get('tags') or []:
            if tag not in all_tags:
                all_tags.append(tag)

        for fig in frag.get('related_figures') or []:
            fig_key = json.dumps(fig, sort_keys=True, default=str)
            if fig_key not in seen_figures:
                seen_figures.add(fig_key)
                all_figures.append(fig)

        for field, value in frag.items():
            if field in ('entity_name', 'tags', 'related_figures', 'html_gcs_path'):
                continue
            current = merged.get(field)
            if isinstance(value, str) and value.strip():
                if not isinstance(current, str) or not current.strip():
                    merged[field] = value
                elif value not in current:
                    # Keep both texts rather than choosing one.
                    merged[field] = f"{current}\n\n{value}"
            elif current in (None, "") and value not in (None, ""):
                merged[field] = value

    merged['tags'] = all_tags
    merged['related_figures'] = all_figures
    return merged


async def merge_entity_group(session, tag, group, sem):
    """Merge the records of one group into a single entry using the model.

    Args:
        session: Client session whose ``post`` is used for the request.
        tag: Grouping key of the records (not used by the merge itself).
        group: Non-empty list of entity dicts to merge.
        sem: Semaphore limiting the number of concurrent requests.

    Returns:
        ``group[0]`` unchanged for a single-record group; otherwise the JSON object
        returned by the model. When the request fails or yields no JSON object, the
        records are combined locally (texts concatenated, tags and figures unioned).
    """
    async with sem:
        # If only 1 entry, no merge needed
        if len(group) == 1:
            return group[0]

        # Construct Prompt
        entity_name = group[0].get('entity_name', 'Unknown')
        prompt = construct_merge_prompt(entity_name, group)

        url = f"https://generativelanguage.googleapis.com/v1beta/models/{MODEL_NAME}:generateContent?key={GEMINI_API_KEY}"
        payload = {"contents": [{"parts": [{"text": prompt}]}]}

        try:
            async with session.post(url, json=payload) as response:
                if response.status == 200:
                    data = await response.json()
                    raw = data['candidates'][0]['content']['parts'][0]['text']
                    match = re.search(r'\{.*\}', raw, re.DOTALL)
                    if match:
                        return json.loads(match.group(0))
        except Exception as e:
            # Exception (not a bare except) keeps task cancellation working; the cause is
            # reported instead of being discarded.
            print(f"   Merge request error for {entity_name}: {e!r}")

        # Fallback: returning only group[0] would silently drop every other record, so the
        # records are combined locally instead.
        print(f"‚ö†Ô∏è Merge failed for {entity_name}, keeping fragments.")
        return _merge_fragments_locally(group)

# --- MAIN ---
def _read_menu_choice(option_count):
    """Read a 1-based menu number from stdin; return its 0-based index, or None if invalid."""
    raw = input("Choice: ").strip()
    if not raw.isdecimal() or not 1 <= int(raw) <= option_count:
        print(f"Invalid choice {raw!r}: expected a number from 1 to {option_count}.")
        return None
    return int(raw) - 1


async def main_consolidator():
    """Interactive driver: merge the records of one ``_MASTER.json`` file by topic.

    Records are grouped by their first tag (or by entity name when no usable tag exists),
    each group is merged into one entry, and the result is written next to the input as
    ``_CONSOLIDATED.json``.
    """
    # 1. Select Content
    content_files = [b.name for b in bucket.list_blobs(prefix=PATHS['gcs_content_textbooks']) if "_MASTER.json" in b.name and "_CONSOLIDATED" not in b.name]
    if not content_files:
        print("‚ùå No MASTER files found. Run Block 2 first.")
        return

    print("\n--- SELECT MASTER FILE TO CONSOLIDATE ---")
    for i, f in enumerate(content_files):
        print(f"[{i+1}] {f.split('/')[-1]}")
    # Validated: a raw int(input()) - 1 turns "0" into index -1, i.e. the last file.
    c_idx = _read_menu_choice(len(content_files))
    if c_idx is None:
        return

    master_path = content_files[c_idx]
    raw_entities = gcs_load_json(master_path)

    print(f"\nüöÄ Consolidating {len(raw_entities)} entities...")

    # 2. Group by Tag (or Name if Tag is generic)
    groups = defaultdict(list)
    for ent in raw_entities:
        # Key strategy: Use the first Tag as the primary key.
        # If tag is missing/generic, fallback to Entity Name.
        tag_list = ent.get('tags') or []
        if isinstance(tag_list, str):
            # A bare string must not be indexed, or its first character becomes the key.
            tag_list = [tag_list]
        first_tag = tag_list[0] if tag_list else None
        if isinstance(first_tag, str) and first_tag:
            key = first_tag
        else:
            key = ent.get('entity_name', 'Unknown')
        groups[key].append(ent)

    print(f"   -> Found {len(groups)} unique topics (Tags/Names).")

    # 3. Process Groups
    sem = asyncio.Semaphore(CONCURRENCY_LIMIT)

    async with aiohttp.ClientSession() as session:
        tasks = [merge_entity_group(session, key, group, sem) for key, group in groups.items()]
        final_kb = list(await tqdm_asyncio.gather(*tasks))

    # 4. Save
    out_path = master_path.replace("_MASTER.json", "_CONSOLIDATED.json")
    gcs_upload_json(final_kb, out_path)
    print(f"\n‚úÖ CONSOLIDATED MASTER SAVED: gs://{GCS_BUCKET_NAME}/{out_path}")
    print(f"üìä Reduced {len(raw_entities)} fragments -> {len(final_kb)} unique entities.")

# Cell entry point. Top-level ``await`` is not valid in a plain Python script; it relies on
# the notebook runtime running the cell inside an event loop.
await main_consolidator()

# VIDEO BLOCK 1

In [None]:
# @title {display-mode: "form"}
# ==============================================================================
# BLOCK 1: LECTURE EXTRACTOR (Whisper + Gemini 3 Flash)
# ==============================================================================
import shutil, cv2, whisper, json, os, io, base64, re, asyncio, aiohttp
import logging
from skimage.metrics import structural_similarity as ssim
from PIL import Image
from tqdm.notebook import tqdm
from tqdm.asyncio import tqdm_asyncio
from google.cloud import storage

# --- CONFIGURATION ---
# Concurrency value for the slide-analysis requests (its use is outside the visible code).
API_CONCURRENCY_LIMIT = 20
# Model id used for per-slide analysis.
# NOTE(review): the previous comment called this model "Fast & Cheap" and the cell banner
# mentions Gemini 3 Flash, yet the id is the Pro preview -- confirm which one is intended.
VISION_MODEL = "gemini-3-pro-preview"
# Only errors from urllib3 are logged.
logging.getLogger("urllib3").setLevel(logging.ERROR)

# --- HELPERS ---
def gcs_upload_file(local_path, blob_path):
    """Upload the local file at ``local_path`` to ``blob_path`` in the pipeline bucket."""
    bucket.blob(blob_path).upload_from_filename(local_path)

def gcs_upload_json(data, blob_path):
    """Serialize ``data`` as indented JSON and store it at ``blob_path`` in the bucket."""
    serialized = json.dumps(data, indent=2)
    bucket.blob(blob_path).upload_from_string(serialized, content_type='application/json')

def gcs_exists(blob_path):
    """Return True when an object exists at ``blob_path`` in the pipeline bucket."""
    target = bucket.blob(blob_path)
    return target.exists()

def get_comparison_frame(frame):
    """Reduce a frame to a 200-pixel-wide, blurred grayscale thumbnail.

    The height is scaled by the same factor as the width (truncated to an int). The frame
    is converted with ``COLOR_BGR2GRAY`` and smoothed with a 5x5 Gaussian blur.
    """
    height, width = frame.shape[:2]
    target_w = 200
    target_h = int(height * (target_w / width))
    thumb = cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_AREA)
    return cv2.GaussianBlur(cv2.cvtColor(thumb, cv2.COLOR_BGR2GRAY), (5, 5), 0)

# --- AI ANALYST ---
async def analyze_slide_async(session, slide_data, local_img_path, sem):
    """Enrich one slide record with the vision model's reading of its image.

    Args:
        session: Client session whose ``post`` is used for the request.
        slide_data: Slide record; its ``raw_transcript`` (first 1000 characters) is sent
            as context, and the JSON object returned by the model is merged into it.
        local_img_path: Path of the slide image on local disk.
        sem: Semaphore limiting the number of concurrent requests.

    Returns:
        ``slide_data`` (the same object), updated in place when the analysis succeeds and
        left unchanged when the image is missing or the analysis fails.
    """
    async with sem:
        if not os.path.exists(local_img_path):
            return slide_data

        try:
            # Prepare Image: re-encode as JPEG so the declared MIME type always matches.
            with Image.open(local_img_path) as img:
                buf = io.BytesIO()
                img.convert("RGB").save(buf, format="JPEG")
                b64_img = base64.b64encode(buf.getvalue()).decode("utf-8")

            url = f"https://generativelanguage.googleapis.com/v1beta/models/{VISION_MODEL}:generateContent?key={GEMINI_API_KEY}"

            # Prompt: Extract raw visual data. We don't need deep reasoning yet, just "What is on this slide?"
            prompt = (
                f"Transcript Context: \"{slide_data['raw_transcript'][:1000]}...\"\n\n"
                "TASK: Analyze this slide image. \n"
                "1. Extract the Title.\n"
                "2. Extract text labels verbatim (e.g. 'CD45', 'H&E', '40x').\n"
                "3. Summarize the visual content (e.g., 'Histology showing...').\n"
                "Return JSON: {\"slide_title\": \"...\", \"key_points\": [\"...\"], \"visual_desc\": \"...\"}"
            )

            payload = {"contents": [{"parts": [{"text": prompt}, {"inline_data": {"mime_type": "image/jpeg", "data": b64_img}}]}]}

            async with session.post(url, json=payload) as res:
                if res.status != 200:
                    # Report rate limits and similar errors instead of silently producing
                    # a slide without analysis.
                    print(f"   -> Slide analysis skipped ({local_img_path}): HTTP {res.status}")
                    return slide_data
                dat = await res.json()

            txt = dat['candidates'][0]['content']['parts'][0]['text']
            match = re.search(r'\{.*\}', txt, re.DOTALL)
            if match:
                parsed = json.loads(match.group(0))
                if isinstance(parsed, dict):
                    slide_data.update(parsed)
        except Exception as e:
            # Best-effort: the slide is kept without AI fields, but the cause is reported.
            print(f"   -> Slide analysis failed ({local_img_path}): {e!r}")

        return slide_data

# --- PIPELINE ---
async def process_video(video_path, counter, total):
    """Turn one lecture video into a RAW slide/transcript JSON stored in GCS.

    Steps: transcribe the audio with Whisper, grab one frame at the start of
    every transcript segment, merge consecutive segments whose frames look
    alike (SSIM >= 0.85 against the first frame of the current slide), upload
    one JPEG per resulting slide, ask the vision model to annotate each slide,
    and finally upload the combined records as ``<lecture>_RAW.json``.

    Args:
        video_path: Local path of the video file.
        counter: Position of this video in the current run (for display only).
        total: Number of videos in the current run (for display only).

    Returns:
        None. Returns early when the RAW JSON already exists in GCS or when no
        frame could be read from the video.
    """
    fname = os.path.basename(video_path)
    lecture_name = os.path.splitext(fname)[0].replace(" ", "_")

    # GCS Paths
    asset_base = f"{PATHS['gcs_asset_lectures']}/{lecture_name}"
    raw_json_path = f"{PATHS['gcs_content_lectures']}/{lecture_name}_RAW.json"

    print(f"\n{'='*60}\nüé• PROCESSING {counter}/{total}: {lecture_name}\n{'='*60}")

    if gcs_exists(raw_json_path):
        print("‚úÖ Already processed in GCS. Skipping.")
        return

    # 1. WHISPER TRANSCRIPTION
    print("üéôÔ∏è Step 1: Whisper Transcription...")
    model = whisper.load_model("base") # Use 'small' if you have GPU RAM, 'base' is fast
    result = model.transcribe(video_path, fp16=False)

    # 2. FRAME EXTRACTION & MERGING
    print("üéûÔ∏è Step 2: Extracting Slides...")
    cap = cv2.VideoCapture(video_path)
    slides = []
    curr_slide = None
    prev_cmp = None

    try:
        # We use TQDM to track progress through the audio segments
        for seg in tqdm(result['segments'], desc="Scanning", unit="seg"):
            cap.set(cv2.CAP_PROP_POS_MSEC, seg['start'] * 1000)
            ret, frame = cap.read()
            if not ret:
                continue

            curr_cmp = get_comparison_frame(frame)

            if curr_slide is None:
                curr_slide = {**seg, 'frame': frame}
                prev_cmp = curr_cmp
                continue

            # SSIM Check (Merge if > 85% similar)
            # prev_cmp is only refreshed when a new slide starts, so every
            # segment is compared against the slide's first frame.
            if ssim(prev_cmp, curr_cmp, data_range=255) >= 0.85:
                curr_slide['text'] += " " + seg['text']
                curr_slide['end'] = seg['end']
            else:
                slides.append(curr_slide)
                curr_slide = {**seg, 'frame': frame}
                prev_cmp = curr_cmp
    finally:
        # Release the capture handle even if frame comparison raises.
        cap.release()

    if curr_slide:
        slides.append(curr_slide)
    print(f"   -> Consolidated into {len(slides)} unique slides.")

    if not slides:
        # Do not upload an empty RAW file: it would mark an unreadable video
        # as "already processed" on every later run.
        print("   [warn] No frames could be read from the video; nothing saved.")
        return

    # 3. UPLOAD & PREPARE
    print("‚òÅÔ∏è Step 3: Uploading Images...")
    final_data = []
    local_imgs = {} # Map id -> local path for AI step

    try:
        for i, slide in enumerate(slides):
            img_name = f"{lecture_name}_slide_{i+1:04d}.jpg"
            local_p = f"/tmp/{img_name}"
            gcs_p = f"{asset_base}/{img_name}"
            full_uri = f"gs://{GCS_BUCKET_NAME}/{gcs_p}"

            cv2.imwrite(local_p, slide['frame'])
            # Track the temp file before uploading so a failed upload still
            # gets cleaned up.
            local_imgs[i] = local_p

            if not gcs_exists(gcs_p):
                gcs_upload_file(local_p, gcs_p)

            final_data.append({
                "id": i,
                "timestamp_start": slide['start'],
                "timestamp_end": slide['end'],
                "raw_transcript": slide['text'].strip(),
                "image_path": full_uri,
                "gcs_path": full_uri, # Important for Block 2
                "slide_title": "",
                "key_points": [],
                "visual_desc": ""
            })

        # 4. GEMINI ENHANCEMENT
        print("üß† Step 4: Gemini Vision Analysis...")
        sem = asyncio.Semaphore(API_CONCURRENCY_LIMIT)
        async with aiohttp.ClientSession() as sess:
            tasks = [analyze_slide_async(sess, d, local_imgs[d['id']], sem) for d in final_data]
            enhanced_data = await tqdm_asyncio.gather(*tasks)

        # 5. SAVE RAW JSON
        gcs_upload_json(enhanced_data, raw_json_path)
        print(f"‚úÖ Saved RAW data: {raw_json_path}")
    finally:
        # Cleanup runs on success and on failure so /tmp does not fill up.
        for p in local_imgs.values():
            if os.path.exists(p):
                os.remove(p)

# --- RUNNER ---
def _parse_lecture_selection(sel, count):
    """Translate a selection such as '1, 3-5' or 'all' into sorted 0-based indices below ``count``."""
    sel = sel.strip()
    if sel.lower() == 'all':
        return list(range(count))

    chosen = set()
    for part in sel.split(','):
        part = part.strip()
        if not part:
            continue
        if '-' in part:
            lo, _, hi = part.partition('-')
            lo, hi = lo.strip(), hi.strip()
            if lo.isdigit() and hi.isdigit():
                # Menu numbers are 1-based and the range is inclusive.
                chosen.update(range(int(lo) - 1, int(hi)))
            else:
                print(f"   [warn] Ignoring invalid range: {part!r}")
        elif part.isdigit():
            chosen.add(int(part) - 1)
        else:
            print(f"   [warn] Ignoring invalid entry: {part!r}")

    return sorted(i for i in chosen if 0 <= i < count)


async def main_lectures():
    """List the source videos, ask which ones to process, and process them in order.

    Reads ``.mp4``/``.mov`` files from ``PATHS['source_videos']``, prompts on
    stdin for a selection ("1", "1, 3-5", or "all") and awaits
    ``process_video`` for each valid choice. Invalid or out-of-range entries
    are ignored. The progress counter passed on reflects the position within
    the selection, not within the full listing.
    """
    vid_files = sorted([f for f in os.listdir(PATHS['source_videos']) if f.lower().endswith(('.mp4', '.mov'))])
    if not vid_files:
        print("‚ùå No videos found.")
        return

    print("\n--- AVAILABLE LECTURES ---")
    for i, v in enumerate(vid_files):
        print(f"[{i+1}] {v}")

    sel = input("\nSelect (e.g. 1, 3-5, or 'all'): ")
    selected = _parse_lecture_selection(sel, len(vid_files))
    if not selected:
        print("   [warn] No valid lectures selected.")
        return

    for position, idx in enumerate(selected, start=1):
        await process_video(os.path.join(PATHS['source_videos'], vid_files[idx]), position, len(selected))

# Top-level await: this works in a notebook cell (IPython/Colab), where an
# event loop is already running. A plain Python script would need
# asyncio.run(main_lectures()) instead.
await main_lectures()

# VIDEO BLOCK 2

In [None]:
# @title {display-mode: "form"}
# ==============================================================================
# BLOCK 2: LECTURE ARCHITECT (High-Fidelity Monolith)
# ==============================================================================
import json
import asyncio
import aiohttp
import re
import difflib
import random
from typing import List, Dict, Set, Any
from google.cloud import storage
from tqdm.asyncio import tqdm_asyncio

# --- CONFIGURATION ---
# Use the most capable model you have access to; the whole lecture is sent in a single request, so a long context window matters.
MODEL_NAME = "gemini-3-pro-preview"
MAX_RETRIES = 5

# --- HELPERS ---
def gcs_read_text(blob_path: str) -> str:
    """Return the UTF-8 decoded content of a bucket object, or "" if it does not exist.

    Args:
        blob_path: Object path inside the module-level ``bucket``.
    """
    blob = bucket.blob(blob_path)
    if not blob.exists():
        return ""
    # download_as_string() is a deprecated alias of download_as_bytes().
    return blob.download_as_bytes().decode('utf-8')

def gcs_load_json(blob_path: str) -> List:
    """Return the parsed JSON content of a bucket object, or [] if it does not exist.

    Args:
        blob_path: Object path inside the module-level ``bucket``.

    Raises:
        json.JSONDecodeError: If the object exists but is not valid JSON.
    """
    blob = bucket.blob(blob_path)
    if not blob.exists():
        return []
    # download_as_string() is a deprecated alias of download_as_bytes().
    return json.loads(blob.download_as_bytes())

def gcs_upload_json(data: Any, blob_path: str):
    """Write ``data`` to ``blob_path`` in the module-level bucket as 2-space indented JSON."""
    target = bucket.blob(blob_path)
    serialized = json.dumps(data, indent=2)
    target.upload_from_string(serialized, content_type='application/json')

def validate_tag(tag: str, valid_set: Set[str]) -> str:
    """Map a model-produced tag onto the reference tag list.

    Args:
        tag: Tag as returned by the model. May be empty, padded with
            whitespace, or (for malformed output) not a string at all.
        valid_set: The allowed tags.

    Returns:
        ``"Skin::Unclassified"`` when the tag is missing, blank or not a
        string; the stripped tag when it is in ``valid_set``; otherwise the
        closest entry of ``valid_set`` (similarity cutoff 0.7), or the
        stripped tag itself when nothing is close enough.
    """
    if not isinstance(tag, str):
        return "Skin::Unclassified"
    clean = tag.strip()
    # A whitespace-only tag carries no information; treat it like an empty one.
    if not clean:
        return "Skin::Unclassified"
    if clean in valid_set:
        return clean
    matches = difflib.get_close_matches(clean, list(valid_set), n=1, cutoff=0.7)
    return matches[0] if matches else clean

# ------------------------------------------------------------------------------
# 2. PROMPT ENGINEERING (MONOLITHIC)
# ------------------------------------------------------------------------------
def construct_monolith_prompt(transcript_data, valid_tags_list):
    """Build the single prompt that asks the model to convert a whole lecture into knowledge-base entries.

    Args:
        transcript_data: Pre-formatted lecture content; inserted after the
            ``LECTURE CONTENT:`` heading.
        valid_tags_list: Reference tag text; inserted after the
            ``REFERENCE TAGS:`` heading.

    Returns:
        The full prompt string.
    """
    # Doubled braces are literal braces of the JSON schema; only the two named
    # fields at the bottom are substituted.
    template = """
Role: You are a Senior Dermatopathologist and Data Engineer.
Objective: Convert the ENTIRE LECTURE provided below into a standardized Knowledge Base.

INPUT DATA:
- A chronological sequence of slides and transcript segments.
- Each segment includes: Visual Description (from AI), Key Text on Slide, and Audio Transcript.

INSTRUCTIONS:
1. **Consolidate:** The lecture may discuss one disease (e.g., Merkel Cell) across 10 slides. You must merge all that info into a SINGLE JSON entry.
2. **Detail Extraction (CRITICAL):**
   - **Stains (IHC):** You MUST list every specific stain mentioned in text OR visible on the slide (e.g., "CK20+", "TTF-1 negative", "CD45-").
   - **Do NOT summarize** as "ruled out lymphoma". Explicitly write "CD45 negative".
   - **Genetics:** List specific mutations (e.g., "Polyomavirus", "t(11;19)").
3. **Images:** Select the BEST representative slides for the 'related_figures' field.
   - **Legend:** Must be specific (e.g., "CK20 immunostain showing perinuclear dot-like positivity").

REQUIRED JSON SCHEMA (Do not deviate):
[
  {{
    "entity_name": "Disease Name",
    "definition": "...",
    "related_terminology": "...",
    "subtypes": "...",
    "tags": ["Exact_Tag_From_List"],
    "html_gcs_path": null,

    "localization": "...",
    "clinical": "...",
    "pathogenesis": "...",
    "staging": "...",
    "prognosis_and_prediction": "...",

    "macroscopic": "...",
    "microscopic": "Detailed histology...",
    "cytology": "...",
    "ancillary_studies": "List ALL positive/negative stains and molecular findings.",
    "diagnostic_molecular_pathology": "...",

    "differential_diagnosis": "...",
    "essential_and_desirable_diagnostic_criteria": "...",

    "related_figures": [
        {{
            "id": "Slide_ID",
            "src": "gs://...",
            "gcs_path": "gs://...",
            "diagnosis": "Disease Name",
            "legend": "Specific description including stains/features shown."
        }}
    ]
  }}
]

REFERENCE TAGS:
{valid_tags_list}

LECTURE CONTENT:
{transcript_data}
"""
    return template.format(
        valid_tags_list=valid_tags_list,
        transcript_data=transcript_data,
    )

# ------------------------------------------------------------------------------
# 3. AI PROCESSING (SINGLE SHOT)
# ------------------------------------------------------------------------------
async def process_full_lecture(session, slides, valid_tags_text, valid_tags_set):
    """Send the whole lecture to the model in one request and return the extracted entities.

    Args:
        session: aiohttp client session used for the POST request.
        slides: Slide records (dicts) with ``id``, ``timestamp_start``,
            ``gcs_path`` and ``raw_transcript``; ``visual_desc`` and
            ``key_points`` are optional.
        valid_tags_text: Reference tag list as text, embedded in the prompt.
        valid_tags_set: Allowed tags, used to normalise the tags the model
            returns.

    Returns:
        A list of entity dicts with validated ``tags``, the required keys
        present (``None`` when the model omitted them) and ``html_gcs_path``
        forced to ``None``. Returns ``[]`` when the response contains no JSON
        array or when all ``MAX_RETRIES`` attempts fail.
    """
    # 1. BUILD THE MONOLITH
    # Construct one massive string containing the whole lecture.
    formatted_input = "".join(
        f"\n--- SLIDE {s['id']} (Time: {s['timestamp_start']:.0f}s) ---\n"
        f"IMAGE_PATH: {s['gcs_path']}\n"
        f"VISUAL_CONTEXT: {s.get('visual_desc', '')}\n"
        f"TEXT_SEEN_ON_SLIDE: {s.get('key_points', [])}\n"  # Vital for "CD45"
        f"TRANSCRIPT: {s['raw_transcript']}\n"
        for s in slides
    )

    print(f"üì¶ Payload Size: {len(formatted_input)} characters. Sending to Gemini Pro...")

    url = f"https://generativelanguage.googleapis.com/v1beta/models/{MODEL_NAME}:generateContent?key={GEMINI_API_KEY}"
    payload = {"contents": [{"parts": [{"text": construct_monolith_prompt(formatted_input, valid_tags_text)}]}]}

    # 5-minute timeout for deep thinking. ClientTimeout is the documented way
    # to express it; a bare number is only kept for backward compatibility.
    request_timeout = aiohttp.ClientTimeout(total=300)

    # Keys downstream consumers expect to exist even when the model omits them.
    required_keys = (
        "microscopic", "ancillary_studies", "differential_diagnosis",
        "diagnostic_molecular_pathology", "staging", "cytology",
    )

    # Retry logic for network blips
    for _attempt in range(MAX_RETRIES):
        try:
            async with session.post(url, json=payload, timeout=request_timeout) as response:
                if response.status == 200:
                    data = await response.json()
                    raw_txt = data['candidates'][0]['content']['parts'][0]['text']

                    # Clean markdown code blocks
                    raw_txt = raw_txt.replace("```json", "").replace("```", "")

                    match = re.search(r'\[.*\]', raw_txt, re.DOTALL)
                    if match:
                        # Invalid JSON raises here and is retried by the
                        # except below. Non-object array items are dropped
                        # rather than forcing a retry of the whole request.
                        entities = [e for e in json.loads(match.group(0)) if isinstance(e, dict)]

                        print(f"üß† AI identified {len(entities)} entities.")
                        for ent in entities:
                            # Tag Validation
                            # Tolerate "tags": null and a single tag given as
                            # a bare string.
                            raw_tags = ent.get('tags') or []
                            if isinstance(raw_tags, str):
                                raw_tags = [raw_tags]
                            ent['tags'] = [validate_tag(t, valid_tags_set) for t in raw_tags]

                            # Null Enforcing
                            for key in required_keys:
                                ent.setdefault(key, None)
                            ent['html_gcs_path'] = None

                        return entities
                    else:
                        print("‚ùå Failed to parse JSON from AI response.")
                        return []
                elif response.status == 429:
                    print("  ‚è≥ Rate Limit... Waiting 20s")
                    await asyncio.sleep(20)
                else:
                    print(f"‚ùå API Error {response.status}")
                    await asyncio.sleep(5)
        except Exception as e:
            print(f"‚ùå Exception: {e}")
            await asyncio.sleep(5)

    return []

# ------------------------------------------------------------------------------
# 4. MAIN WORKFLOW
# ------------------------------------------------------------------------------
def _pick_index(count):
    """Prompt for a 1-based menu choice and return it as a 0-based index, or None if invalid."""
    raw = input("Choice: ").strip()
    try:
        idx = int(raw) - 1
    except ValueError:
        print(f"   [warn] Not a number: {raw!r}")
        return None
    # Reject 0 and negatives explicitly: a negative index would otherwise
    # silently select an item from the end of the list.
    if not 0 <= idx < count:
        print(f"   [warn] Choice out of range (1-{count}).")
        return None
    return idx


async def main_lecture_monolith():
    """Interactively pick a tag list and a RAW lecture, then build and save its MASTER JSON.

    Lists ``.txt`` objects under ``Tags/`` and ``*_RAW.json`` objects under
    ``PATHS['gcs_content_lectures']``, asks for one of each on stdin, runs
    ``process_full_lecture`` and uploads the result as
    ``<lecture>_MASTER.json``. Returns without saving when nothing is
    available, a choice is invalid, or no entities were produced.
    """
    # 1. Tags
    tag_files = [b.name for b in bucket.list_blobs(prefix="Tags/") if b.name.endswith('.txt')]
    if not tag_files:
        print("‚ùå No tags found.")
        return

    print("\n--- SELECT TAG LIST ---")
    for i, f in enumerate(tag_files):
        print(f"[{i+1}] {f.split('/')[-1]}")
    t_idx = _pick_index(len(tag_files))
    if t_idx is None:
        return
    tags_text = gcs_read_text(tag_files[t_idx])
    tags_set = set(l.strip() for l in tags_text.splitlines() if l.strip())

    # 2. Lecture Raw Data
    raw_files = [b.name for b in bucket.list_blobs(prefix=PATHS['gcs_content_lectures']) if "_RAW.json" in b.name]
    if not raw_files:
        print("‚ùå No RAW files. Run Block 1 first.")
        return

    print("\n--- SELECT LECTURE ---")
    for i, f in enumerate(raw_files):
        print(f"[{i+1}] {f.split('/')[-1]}")
    c_idx = _pick_index(len(raw_files))
    if c_idx is None:
        return

    raw_path = raw_files[c_idx]
    lecture_name = raw_path.split('/')[-1].replace("_RAW.json", "")
    slides_data = gcs_load_json(raw_path)

    print(f"\nüöÄ Processing: {lecture_name}")
    print(f"   Mode: THE MONOLITH (Full Context Analysis)")

    async with aiohttp.ClientSession() as session:
        master_kb = await process_full_lecture(session, slides_data, tags_text, tags_set)

    if master_kb:
        final_path = f"{PATHS['gcs_content_lectures']}/{lecture_name}_MASTER.json"
        gcs_upload_json(master_kb, final_path)
        print(f"\n‚úÖ MASTER SAVED: gs://{GCS_BUCKET_NAME}/{final_path}")
        print(f"üìä Extracted {len(master_kb)} High-Quality Entities")
    else:
        print("‚ùå Architecture failed.")

# Top-level await: this works in a notebook cell (IPython/Colab), where an
# event loop is already running. A plain Python script would need
# asyncio.run(main_lecture_monolith()) instead.
await main_lecture_monolith()