# Med-Diagnosis: Semantic Chunking + Embedding Ingestion

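**Input:** `corpus_full_text_raw.jsonl` (raw protocols; the notebook writes the cleaned copy to `corpus_full_text.jsonl`)  
Each line is one JSON object: `{"protocol_id": "p_...", "text": "...", ...}`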

**Outputs:**
- `corpus.json` → copy to `./backend/data/corpus.json`  
- `chroma_export.zip` → extract as `./chroma_data/` in project root

On next app startup, `corpus_loader.py` reads `corpus.json` for SQL records  
and skips Chroma embedding (vectors already present).

In [None]:
!pip install -q chromadb>=0.6.3 langchain-chroma>=0.0.5 langchain-huggingface langchain-openai sentence-transformers tiktoken

In [None]:
# ── CONFIG ────────────────────────────────────────────────────────────────
# All paths and tunables used by the cells below live here.

# Input: raw full-text protocols (before cleaning), one JSON object per line
RAW_JSONL = "/content/corpus_full_text_raw.jsonl"

# Cleaned output (preamble + postamble stripped), written by the cleaning cell.
# Chunking then works on the in-memory cleaned records, not on this file.
CORPUS_FULL_TEXT_JSONL = "/content/corpus_full_text.jsonl"

# Outputs
CHROMA_PATH = "/content/chroma_data"   # Chroma persistence dir → ./chroma_data/ in project
CORPUS_JSON = "/content/corpus.json"   # chunk records → ./backend/data/corpus.json

# Must match CHROMA_COLLECTION_NAME in .env (default: "documents")
COLLECTION_NAME = "documents"

# ── Chunking ──
# Both values are counted in tiktoken "cl100k_base" tokens (the encoder the
# SemanticChunker uses), not in tokens of the embedding model's own tokenizer.
CHUNK_SIZE = 300   # max tokens per chunk
CHUNK_OVERLAP = 50  # tokens shared by consecutive windows of a split section

# ── Embeddings ──
# "huggingface" : local GPU inference (must use the SAME model as TEI)
# "openai"      : OpenAI-compatible API
EMBEDDINGS_MODE  = "huggingface"
EMBEDDINGS_MODEL = "google/embeddinggemma-300m"  # must match TEI MODEL_ID
# Placeholder — replace locally; do not commit a real token with the notebook.
HF_TOKEN         = "your-hf-token-here"

# OpenAI (only if EMBEDDINGS_MODE="openai")
OPENAI_API_KEY   = ""
OPENAI_API_BASE  = "https://api.openai.com/v1"
OPENAI_EMB_MODEL = "text-embedding-ada-002"

In [None]:
# ── Imports ───────────────────────────────────────────────────────────────
import hashlib, json, os, re, shutil
import chromadb
import tiktoken
from typing import List, Dict
from langchain_chroma import Chroma
from langchain_core.documents import Document as LangchainDocument

# Create the Chroma persistence directory up front; exist_ok makes re-running
# this cell harmless when the directory is already there.
os.makedirs(CHROMA_PATH, exist_ok=True)

In [None]:
# ── SemanticChunker ───────────────────────────────────────────────────────
#
# Two distinct protocol formats observed in the corpus:
#
# FORMAT A (2015 style):
#   I. ВВОДНАЯ ЧАСТЬ          ← Roman-numeral major section
#   II. МЕТОДЫ ДИАГНОСТИКИ
#   III. ОРГАНИЗАЦИОННЫЕ АСПЕКТЫ  ← administrative tail → strip
#   Items inside sections are flat-numbered: 1. 2. 3. ... (NOT section boundaries)
#
# FORMAT B (2017 style):
#   1. ВВОДНАЯ ЧАСТЬ          ← top-level decimal section (e.g. "1.")
#   1.1 Код(ы) МКБ-10         ← subsection (NOT a boundary)
#   1.9 МЕТОДЫ, ПОДХОДЫ...    ← subsection, may be ALL-CAPS
#   2.1 Диагностический алгоритм
#   3.1 ТАКТИКА ЛЕЧЕНИЯ...    ← subsection
#   6. ОРГАНИЗАЦИОННЫЕ АСПЕКТЫ  ← administrative tail → strip
#
# Strategy:
#   1. Strip boilerplate preamble ("Одобрен/Рекомендовано ... Протокол №NN")
#   2. Detect format (Roman vs decimal) from text
#   3. For Format A: split on Markdown "#" headings, Roman-numeral section headers, and ALL-CAPS title lines
#   4. For Format B: split on top-level decimal sections ONLY (not subsections like "1.1")
#   5. Strip the administrative tail before chunking (authors, references, reviewers)
#   6. Large sections → sliding-window with overlap; small → keep atomic

import re
from typing import List, Dict
import tiktoken

# ── Regex patterns ─────────────────────────────────────────────────────────

# Roman numeral I … XXXIX.  Every part of the numeral group is optional, so on
# its own it also matches the empty string; the leading lookahead requires at
# least one numeral character.  Without it, "^{_ROMAN}\.\s+" would accept any
# line that merely starts with ". ".
_ROMAN = r"(?=[IVX])(?:X{0,3}(?:IX|IV|V?I{0,3}))"

# Format A boundaries: Markdown headings, Roman-numeral headers (I. II. III. …)
# and ALL-CAPS title lines.
# NOTE(review): "\.\s+" needs whitespace after the dot, but _SAMPLE_A in the
# sanity check writes "I.ВВОДНАЯ ЧАСТЬ" — confirm which form the corpus uses.
_FORMAT_A_RE = re.compile(
    r"(?m)^(?:"
    r"#{1,3}\s+"                                # Markdown heading "# " … "### "
    rf"|{_ROMAN}\.\s+"                          # I. II. III. …
    r"|[А-ЯЁA-Z][А-ЯЁA-Z\s\-\/]{5,}$"         # ALL-CAPS Cyrillic/Latin ≥6 chars
    r")"
)

# Format B: top-level decimal ONLY — "1. " or "2. " at line start,
# but NOT "1.1" / "2.3" (subsections)
_FORMAT_B_RE = re.compile(
    r"(?m)^\d+\.\s+(?!\d)"                     # "3. " yes, "3.1 " no
)

# Preamble: boilerplate header before the protocol title.  The lazy prefix
# stops at the first title marker; matching is case-insensitive, so the first
# occurrence in any letter case ends the preamble.
_PREAMBLE_RE = re.compile(
    r"^.*?(?=КЛИНИЧЕСКИЙ ПРОТОКОЛ|КЛИНИЧЕСКИЕ РЕКОМЕНДАЦИИ|ПРОТОКОЛ ДИАГНОСТИКИ)",
    re.DOTALL | re.IGNORECASE,
)

# Administrative tail markers — strip everything from the first one to the end.
# Only headings whose title starts with "ОРГАНИЗАЦИОННЫЕ" count, and only under
# the section numbers listed below.
_TAIL_RE = re.compile(
    r"(?m)(?:"
    r"^III\.\s+ОРГАНИЗАЦИОННЫЕ"            # Roman III (Format A tail)
    r"|^VI{0,2}\.\s+ОРГАНИЗАЦИОННЫЕ"       # Roman V, VI or VII
    r"|^(?:6|7)\.\s+ОРГАНИЗАЦИОННЫЕ"       # decimal "6." / "7." (Format B tail)
    r")",
    re.IGNORECASE,
)


class SemanticChunker:
    """Split a protocol text into section-aligned chunks within a token budget.

    The text is stripped of its boilerplate preamble and administrative tail,
    split at section headers (Roman-numeral "Format A" or top-level decimal
    "Format B"), and every section longer than ``chunk_size`` tokens is cut
    into overlapping token windows.  Token counts use tiktoken's
    ``cl100k_base`` encoding.
    """

    def __init__(self, chunk_size: int = 300, overlap: int = 50):
        """Store the token budget and load the tokenizer.

        Args:
            chunk_size: Maximum number of tokens in one chunk.
            overlap: Number of tokens shared by consecutive windows when a
                section has to be split.
        """
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.encoder = tiktoken.get_encoding("cl100k_base")

    def chunk_document(self, text: str, metadata: Dict) -> List[Dict]:
        """Chunk one protocol.

        Args:
            text: Full protocol text.
            metadata: Base metadata copied into every produced chunk.

        Returns:
            A list of ``{"text": ..., "metadata": ...}`` dicts in document
            order; empty when nothing is left after stripping.
        """
        text = self._strip_preamble(text)
        text = self._strip_admin_tail(text)
        fmt = self._detect_format(text)
        sections = self._detect_sections(text, fmt)
        chunks = []
        for section in sections:
            if self._is_atomic_section(section):
                # Fits the budget: keep the section whole.
                chunks.append(self._create_chunk(section, metadata))
            else:
                chunks.extend(
                    self._create_chunk(sub, metadata)
                    for sub in self._split_with_overlap(section, self.chunk_size, self.overlap)
                )
        return chunks

    # ── Pre-processing ──────────────────────────────────────────────────────

    def _strip_preamble(self, text: str) -> str:
        """Drop everything before the first protocol-title marker.

        Returns ``text`` unchanged when no marker is found or when the text
        already starts with one.
        """
        m = _PREAMBLE_RE.match(text)
        return text[m.end():].strip() if m and m.end() > 0 else text

    def _strip_admin_tail(self, text: str) -> str:
        """Drop the administrative section (first ``_TAIL_RE`` hit) and all text after it."""
        m = _TAIL_RE.search(text)
        return text[: m.start()].strip() if m else text

    # ── Format detection ────────────────────────────────────────────────────

    def _detect_format(self, text: str) -> str:
        """Return 'A' if Roman-numeral sections found, else 'B' (decimal).

        At least two lines must start with a Roman-numeral header for the
        text to count as Format A.
        """
        roman_hits = re.findall(rf"(?m)^{_ROMAN}\.\s+", text)
        return "A" if len(roman_hits) >= 2 else "B"

    # ── Section detection ───────────────────────────────────────────────────

    def _detect_sections(self, text: str, fmt: str) -> List[str]:
        """Split ``text`` at the section headers of the given format.

        Args:
            text: Pre-processed protocol text.
            fmt: ``"A"`` selects the Format A pattern; any other value
                selects the Format B pattern.

        Returns:
            Non-empty, whitespace-stripped sections in document order.  Text
            before the first header becomes its own leading section; when no
            header matches, the whole text is returned as a single section.
        """
        pattern = _FORMAT_A_RE if fmt == "A" else _FORMAT_B_RE
        matches = list(pattern.finditer(text))
        if not matches:
            return [text.strip()] if text.strip() else []

        sections = []
        preamble = text[: matches[0].start()].strip()
        if preamble:
            sections.append(preamble)

        for i, m in enumerate(matches):
            # A section runs from its header to the next header (or to the end).
            end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
            section = text[m.start() : end].strip()
            if section:
                sections.append(section)

        return sections

    # ── Chunking helpers ────────────────────────────────────────────────────

    def _is_atomic_section(self, section: str) -> bool:
        """Return True when the section fits into a single chunk."""
        return len(self.encoder.encode(section)) <= self.chunk_size

    def _split_with_overlap(self, text: str, size: int, overlap: int) -> List[str]:
        """Cut ``text`` into windows of ``size`` tokens that share ``overlap`` tokens.

        Iteration stops as soon as a window reaches the last token.  Carrying
        on would emit one more window made up solely of tokens that the
        previous window already contains.

        Args:
            text: Section text to split.
            size: Window length in tokens.
            overlap: Tokens shared by consecutive windows.  The step is
                clamped to at least 1, so ``overlap >= size`` cannot stall.

        Returns:
            Decoded window texts in order; empty when ``text`` encodes to no
            tokens.
        """
        tokens = self.encoder.encode(text)
        step = max(size - overlap, 1)
        chunks = []
        # NOTE(review): windows are cut on token boundaries; if the encoder
        # splits a multi-byte character across two tokens, decode() may not
        # round-trip at the cut — confirm for Cyrillic text.
        for start in range(0, len(tokens), step):
            chunks.append(self.encoder.decode(tokens[start : start + size]))
            if start + size >= len(tokens):
                break
        return chunks

    def _create_chunk(self, text: str, metadata: Dict) -> Dict:
        """Wrap chunk text with a copy of ``metadata`` plus its token count and a preview."""
        return {
            "text": text,
            "metadata": {
                **metadata,
                "chunk_size": len(self.encoder.encode(text)),
                "preview": text[:100] + "...",
            },
        }


# Shared chunker instance, configured from the CONFIG cell; the
# "Chunk all protocols" cell calls chunker.chunk_document() on every protocol.
chunker = SemanticChunker(chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP)
print("SemanticChunker ready")

# ── Sanity check samples: Format A (2015) and Format B (2017) ─────────────
# NOTE(review): both samples are single-line strings (no "\n"), so the
# "(?m)^"-anchored header and tail patterns can only match at position 0 here.
# Confirm whether real corpus text contains line breaks; if it does, these
# samples do not exercise section splitting or tail stripping.
_SAMPLE_A = """Рекомендовано Экспертным советом РГП на ПХВ «Республиканский центр» от «20» ноября 2015 года Протокол № 16 КЛИНИЧЕСКИЙ ПРОТОКОЛ ДИАГНОСТИКИ И ЛЕЧЕНИЯ ЭНТЕРОВИРУСНАЯ ИНФЕКЦИЯ У ДЕТЕЙ I.ВВОДНАЯ ЧАСТЬ 1.Название протокола: Энтеровирусная инфекция у детей. 2.Код протокола: 3.Код (коды) по МКБ – 10: А85.0 Энтеровирусный энцефалит II. МЕТОДЫ, ПОДХОДЫ И ПРОЦЕДУРЫ ДИАГНОСТИКИ И ЛЕЧЕНИЯ 8.Определение: Энтеровирусная инфекция – заболевание... III. ОРГАНИЗАЦИОННЫЕ АСПЕКТЫ ВНЕДРЕНИЯ ПРОТОКОЛА: 16.Список разработчиков протокола..."""
_SAMPLE_B = """Одобрен Объединенной комиссией по качеству медицинских услуг от «29» июня 2017 года Протокол № 24 КЛИНИЧЕСКИЙ ПРОТОКОЛ ДИАГНОСТИКИ И ЛЕЧЕНИЯ ЭНТЕРОБИОЗУ ДЕТЕЙ 1. ВВОДНАЯ ЧАСТЬ 1.1 Код(ы) МКБ-10: МКБ-10 Код Название В80 Энтеробиоз 1.7 Определение: Энтеробиоз – гельминтоз... 2. МЕТОДЫ ДИАГНОСТИКИ 2.1 Диагностический алгоритм: схема... 3. ТАКТИКА ЛЕЧЕНИЯ 3.1 ТАКТИКА ЛЕЧЕНИЯ НА АМБУЛАТОРНОМ УРОВНЕ: Лечение энтеробиоза... 6. ОРГАНИЗАЦИОННЫЕ АСПЕКТЫ ПРОТОКОЛА: 6.1 Список разработчиков..."""

# Run the pre-processing and section-detection steps on each sample and print
# what was detected, so a regex regression is visible at a glance.
for label, sample in {"A (Roman)": _SAMPLE_A, "B (Decimal)": _SAMPLE_B}.items():
    tc = SemanticChunker(chunk_size=300, overlap=50)
    # Same order as chunk_document(): preamble first, then administrative tail.
    stripped = tc._strip_admin_tail(tc._strip_preamble(sample))
    fmt = tc._detect_format(stripped)
    sections = tc._detect_sections(stripped, fmt)
    print(f"\nFormat {label} → detected: '{fmt}'")
    print(f"After stripping tail, sections: {len(sections)}")
    for i, s in enumerate(sections):
        # Only the first 80 characters of each section are shown.
        print(f"  [{i}] {s[:80]}...")

In [None]:
# ── Clean corpus_full_text.jsonl ──────────────────────────────────────────
# Reads RAW_JSONL, strips preamble + administrative postamble from each
# protocol's text, writes cleaned records to CORPUS_FULL_TEXT_JSONL.
#
# Uses _PREAMBLE_RE and _TAIL_RE defined in the SemanticChunker cell above.

def _clean_protocol_text(text: str) -> str:
    """Return ``text`` without its boilerplate preamble and administrative tail.

    The preamble is everything before the first title marker matched by
    ``_PREAMBLE_RE``; the tail is everything from the first ``_TAIL_RE`` hit
    onward.  Either step is skipped when its pattern does not apply.
    """
    head = _PREAMBLE_RE.match(text)
    if head is not None and head.end() > 0:
        text = text[head.end():].strip()

    tail = _TAIL_RE.search(text)
    return text if tail is None else text[: tail.start()].strip()


skipped = 0                               # lines that could not be used as protocol records
cleaned_protocols: list[dict] = []        # cleaned records, reused by the chunking cells
total_removed = 0                         # characters stripped across all protocols

with open(RAW_JSONL, encoding="utf-8", errors="replace") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            skipped += 1
            continue

        # A line can be valid JSON without being a protocol record (e.g. a
        # bare list or string); count it as malformed instead of crashing on
        # .get() below.
        if not isinstance(obj, dict):
            skipped += 1
            continue

        original_text = obj.get("text")
        if original_text is None:
            # Missing or null text is treated as empty, as a missing key was before.
            original_text = ""
        elif not isinstance(original_text, str):
            # The cleaning regexes need a string; anything else is malformed.
            skipped += 1
            continue

        cleaned_text = _clean_protocol_text(original_text)
        total_removed += len(original_text) - len(cleaned_text)
        obj["text"] = cleaned_text
        cleaned_protocols.append(obj)

with open(CORPUS_FULL_TEXT_JSONL, "w", encoding="utf-8") as f:
    for obj in cleaned_protocols:
        f.write(json.dumps(obj, ensure_ascii=False) + "\n")

print(f"Cleaned {len(cleaned_protocols)} protocols → {CORPUS_FULL_TEXT_JSONL}")
print(f"Total chars removed: {total_removed:,} ({total_removed / max(len(cleaned_protocols),1):.0f} avg per protocol)")
if skipped:
    print(f"Skipped {skipped} malformed lines")

# Sanity: show the cleaned text of the first protocol
if cleaned_protocols:
    s = cleaned_protocols[0]
    print(f"\nprotocol_id: {s.get('protocol_id')}")
    print(f"Cleaned preview:\n{s['text'][:400]}...")

In [None]:
# ── Use cleaned protocols for chunking ────────────────────────────────────
# Chunking works on the in-memory cleaned records from the previous cell.
protocols = cleaned_protocols
print(f"Loaded {len(protocols)} protocols")
if protocols:
    print("Sample protocol_id:", protocols[0].get("protocol_id"), "| text length:", len(protocols[0].get("text", "")))
else:
    # protocols[0] would raise IndexError on an empty corpus; report it instead.
    print("No protocols loaded — check RAW_JSONL before continuing")

In [None]:
# ── Chunk all protocols ───────────────────────────────────────────────────
# All chunks of all protocols, in corpus order.
raw_chunks: list[dict] = []

for protocol in protocols:
    text = protocol.get("text", "")
    if text.strip():
        # Every chunk carries the owning protocol's id and a fixed source tag.
        raw_chunks.extend(
            chunker.chunk_document(
                text,
                {
                    "protocol_id": protocol.get("protocol_id", ""),
                    "source": "corpus",
                },
            )
        )
    # Protocols whose text is empty or whitespace-only contribute no chunks.

print(f"Total chunks produced: {len(raw_chunks)}")
print(f"Avg chunks per protocol: {len(raw_chunks) / max(len(protocols), 1):.1f}")

In [None]:
# ── Convert to LangChain format + assign chunk IDs ────────────────────────
# chunk_id formula must match corpus_loader.py: SHA256(f"{source}:{page_content}")
seen_ids: set[str] = set()   # chunk ids already emitted
lc_chunks: list[dict] = []   # LangChain-shaped records: page_content + metadata + _id

for c in raw_chunks:
    page_content, chunk_meta = c["text"], c["metadata"]
    source = chunk_meta.get("source", "corpus")
    # The id is a content hash, so identical text from the same source
    # collapses to a single record (the first occurrence wins).
    chunk_id = hashlib.sha256(f"{source}:{page_content}".encode()).hexdigest()

    if chunk_id not in seen_ids:
        seen_ids.add(chunk_id)
        lc_chunks.append(
            {"page_content": page_content, "metadata": chunk_meta, "_id": chunk_id}
        )

print(f"Unique chunks after dedup: {len(lc_chunks)}")

In [None]:
# ── Embeddings model ──────────────────────────────────────────────────────
# Reject an unsupported mode before importing any backend.
if EMBEDDINGS_MODE not in ("huggingface", "openai"):
    raise ValueError(f"Unknown EMBEDDINGS_MODE: {EMBEDDINGS_MODE}")

if EMBEDDINGS_MODE == "openai":
    from langchain_openai import OpenAIEmbeddings

    embeddings = OpenAIEmbeddings(
        api_key=OPENAI_API_KEY,
        base_url=OPENAI_API_BASE,
        model=OPENAI_EMB_MODEL,
    )
else:
    import torch
    from langchain_huggingface import HuggingFaceEmbeddings

    # The token is exported to the environment and also passed to the model loader.
    os.environ["HF_TOKEN"] = HF_TOKEN
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Device: {device}")
    embeddings = HuggingFaceEmbeddings(
        model_name=EMBEDDINGS_MODEL,
        model_kwargs={"device": device, "token": HF_TOKEN},
        encode_kwargs={"normalize_embeddings": True},
    )

# Smoke test: embed one query and report the vector dimensionality.
print(f"Embedding dim: {len(embeddings.embed_query('test'))}")

In [None]:
# ── Chroma setup ──────────────────────────────────────────────────────────
# Open (or create) the persistent collection that the backend will read.
chroma_client = chromadb.PersistentClient(path=CHROMA_PATH)
vector_store  = Chroma(
    client=chroma_client,
    collection_name=COLLECTION_NAME,
    embedding_function=embeddings,
)
# Only the ids are needed to skip already-embedded chunks.  include=[] keeps
# Chroma from loading every stored document and metadata dict into memory;
# ids are always part of the result.
existing_ids = set(vector_store._collection.get(include=[])["ids"])
print(f"Collection '{COLLECTION_NAME}': {len(existing_ids)} existing vectors")

In [None]:
# ── Embed & store (skips already-present chunks) ──────────────────────────
# Chunks whose id is already stored in the collection are not embedded again.
pending = (c for c in lc_chunks if c["_id"] not in existing_ids)
to_embed = [
    LangchainDocument(page_content=c["page_content"], metadata=c["metadata"], id=c["_id"])
    for c in pending
]
print(f"{len(to_embed)} new chunks to embed  ({len(lc_chunks) - len(to_embed)} already in Chroma)")

BATCH = 100  # documents per add_documents() call
total = len(to_embed)
for start in range(0, total, BATCH):
    stop = min(start + BATCH, total)
    batch = to_embed[start:stop]
    # Explicit ids keep the stored id equal to the chunk's content hash.
    vector_store.add_documents(batch, ids=[doc.id for doc in batch])
    print(f"  {stop}/{total}")

print(f"Done. Collection total: {vector_store._collection.count()} vectors")

In [ ]:
# ── Save corpus.json (strip internal _id key) ─────────────────────────────
# Keep only the two public fields; the internal "_id" key is not exported.
_EXPORTED_KEYS = ("page_content", "metadata")
corpus = [{key: record[key] for key in _EXPORTED_KEYS} for record in lc_chunks]

with open(CORPUS_JSON, "w", encoding="utf-8") as f:
    json.dump(corpus, f, ensure_ascii=False, indent=2)

print(f"Saved {len(corpus)} chunks → {CORPUS_JSON}")

## Deploy
```bash
cp corpus.json ./backend/data/corpus.json
unzip -o chroma_export.zip -d .
docker compose -f docker-compose.dev.cpu.yml up -d --force-recreate backend
```

In [None]:
# ── Package & download ────────────────────────────────────────────────────
# Archive the directory Chroma actually wrote to.  Root and directory name are
# derived from CHROMA_PATH rather than repeated as literals, so changing the
# config cannot produce an archive of the wrong (or a missing) directory.
# With the default CHROMA_PATH the zip still unpacks to "chroma_data/".
_chroma_root, _chroma_dir = os.path.split(CHROMA_PATH.rstrip("/"))
_archive_path = shutil.make_archive(
    "/content/chroma_export", "zip", _chroma_root or os.curdir, _chroma_dir
)
print(f"Zipped → {_archive_path}")

try:
    # Inside Colab: trigger browser downloads of both artifacts.
    from google.colab import files
    files.download(CORPUS_JSON)
    files.download(_archive_path)
except ImportError:
    # Outside Colab: just report where the artifacts are.
    print(f"Files ready:\n  {CORPUS_JSON}\n  {_archive_path}")