In [None]:

import os
import json
from typing import List, Dict, Any

import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer


In [None]:

# --- Ingestion settings: edit these to match your environment ---
SOPS_FOLDER = "./sops_json"    # directory scanned for SOP .json files
PERSIST_DIR = "./chroma_sops"  # storage path handed to the Chroma client
COLLECTION_NAME = "sops"       # Chroma collection that holds the SOP chunks

# Keys every SOP JSON document must provide; a file lacking any of them
# is rejected at load time.
REQUIRED_FIELDS = {
    "sop_id",
    "db_type",
    "error_category",
    "problem_statement",
    "symptoms",
    "resolution_summary",
    "auto_fix_supported",
    "requires_human_approval",
    "risk_level",
    "tags",
}

def load_sops_from_folder(folder: str) -> List[Dict[str, Any]]:
    """Load and validate every ``.json`` SOP file located directly in ``folder``.

    Files are processed in sorted filename order so that repeated runs see
    the SOPs in the same sequence.

    Args:
        folder: Directory to scan (not recursive).

    Returns:
        One dict per SOP file, in sorted filename order.

    Raises:
        ValueError: If a file's top-level JSON value is not an object, or if
            it lacks any key listed in ``REQUIRED_FIELDS``.
        FileNotFoundError: If no ``.json`` file was loaded from ``folder``.
    """
    sops = []
    # os.listdir returns entries in arbitrary order; sort for reproducibility.
    for file in sorted(os.listdir(folder)):
        path = os.path.join(folder, file)
        # Ignore other file types and directories that merely end in ".json".
        if not file.endswith(".json") or not os.path.isfile(path):
            continue
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        # A top-level array/scalar has no keys; fail with the file name
        # instead of an opaque AttributeError.
        if not isinstance(data, dict):
            raise ValueError(
                f"{file} must contain a JSON object, got {type(data).__name__}"
            )
        missing = REQUIRED_FIELDS - set(data.keys())
        if missing:
            raise ValueError(f"{file} missing fields: {missing}")
        sops.append(data)
    if not sops:
        raise FileNotFoundError(f"No JSON files found in {folder}")
    print(f"Loaded {len(sops)} SOPs from {folder}")
    return sops

def sop_to_text(sop: Dict[str, Any]) -> str:
    """Render one SOP dict as a single plain-text document.

    The output lists the identifying fields first, then the problem
    statement, a bulleted symptom list, the resolution summary and the tags.

    ``symptoms`` and ``tags`` are normally lists, but a missing key, a JSON
    ``null`` or a single scalar value are accepted as well so that a slightly
    malformed SOP neither crashes nor gets split into single characters.

    Args:
        sop: SOP record; the non-list fields used below must be present.

    Returns:
        The formatted text with leading/trailing whitespace stripped.

    Raises:
        KeyError: If one of the directly indexed fields is absent.
    """

    def _as_str_list(value: Any) -> List[str]:
        # Normalize a list / scalar / None into a list of strings.
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return [str(item) for item in value]
        # A bare string (or other scalar) is one entry, not a sequence of chars.
        return [str(value)]

    symptoms_str = "\n".join(f"- {s}" for s in _as_str_list(sop.get("symptoms")))
    tags_str = ", ".join(_as_str_list(sop.get("tags")))
    text = (
        f"SOP ID: {sop['sop_id']}\n"
        f"DB Type: {sop['db_type']}\n"
        f"Error Category: {sop['error_category']}\n"
        f"Risk Level: {sop['risk_level']}\n"
        f"Auto Fix Supported: {sop['auto_fix_supported']}\n"
        f"Requires Human Approval: {sop['requires_human_approval']}\n\n"
        f"Problem Statement:\n{sop['problem_statement']}\n\n"
        f"Symptoms:\n{symptoms_str}\n\n"
        f"Resolution Summary:\n{sop['resolution_summary']}\n\n"
        f"Tags: {tags_str}\n"
    )
    return text.strip()

def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 150) -> List[str]:
    """Split ``text`` into fixed-size character windows that overlap.

    Each chunk holds up to ``chunk_size`` characters and starts
    ``chunk_size - overlap`` characters after the previous one, so adjacent
    chunks share ``overlap`` characters. The final chunk may be shorter.

    Args:
        text: String to split; an empty string yields an empty list.
        chunk_size: Maximum characters per chunk; must be positive.
        overlap: Characters shared between consecutive chunks; must satisfy
            ``0 <= overlap < chunk_size``.

    Returns:
        The chunks in document order.

    Raises:
        ValueError: If ``chunk_size`` or ``overlap`` is out of range. Without
            this check the window would never advance (endless loop) or, for
            a negative overlap, would silently skip text.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if not 0 <= overlap < chunk_size:
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < chunk_size, got {overlap}"
        )

    n = len(text)
    step = chunk_size - overlap  # distance between consecutive chunk starts
    chunks = []
    for start in range(0, n, step):
        chunks.append(text[start:start + chunk_size])
        # Stop once a window reaches the end; a further window would only
        # repeat the overlapping tail.
        if start + chunk_size >= n:
            break
    return chunks
``


In [None]:

# Lightweight embedding model that runs locally
EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
embedder = SentenceTransformer(EMBEDDING_MODEL_NAME)


def embed_texts(texts: List[str]) -> List[List[float]]:
    """Embed ``texts`` with the module-level ``embedder``.

    Normalized embeddings are requested so the vectors are suitable for
    cosine similarity, and the progress bar is turned off.

    Args:
        texts: Strings to embed.

    Returns:
        The encoder output converted with ``.tolist()``.
    """
    vectors = embedder.encode(
        texts,
        show_progress_bar=False,
        normalize_embeddings=True,
    )
    return vectors.tolist()


In [None]:

def get_chroma_collection(persist_dir: str = PERSIST_DIR, collection_name: str = COLLECTION_NAME):
    """Open the persistent Chroma store and return the SOP collection.

    The collection is fetched if it exists and created otherwise, using
    Chroma's own get-or-create call. This avoids treating every exception
    from a lookup (bad path, corrupted store, ...) as "collection missing".

    Args:
        persist_dir: Directory in which Chroma persists its data.
        collection_name: Name of the collection to open or create.

    Returns:
        The Chroma collection object.
    """
    client = chromadb.PersistentClient(path=persist_dir, settings=Settings(allow_reset=True))
    # Cosine space matches the normalized embeddings produced for the chunks.
    # NOTE(review): the metadata is expected to apply only when the collection
    # is first created; an existing collection keeps its settings - confirm
    # against the installed chromadb version.
    return client.get_or_create_collection(
        name=collection_name,
        metadata={"hnsw:space": "cosine"},
    )

collection = get_chroma_collection()
collection
``


In [None]:

def upsert_sop_chunks(collection, sop: Dict[str, Any], chunk_texts: List[str]):
    """Embed the chunks of one SOP and upsert them into ``collection``.

    Every chunk gets the id ``"<sop_id>::chunk::<index>"`` and a copy of the
    SOP-level metadata plus its own ``chunk_index``.

    ``tags`` is stored as a single comma-separated string because Chroma
    metadata values are documented as scalars (str/int/float/bool), not lists.

    Args:
        collection: Target Chroma collection.
        sop: SOP record supplying the id and metadata fields.
        chunk_texts: Chunk strings in document order; nothing is written when
            this is empty.

    NOTE(review): chunks left over from an earlier, longer version of the same
    SOP are not removed here - confirm whether stale chunks need cleaning up.
    """
    # An empty batch has nothing to embed or store.
    if not chunk_texts:
        return

    raw_tags = sop.get("tags") or []
    if isinstance(raw_tags, str):
        # A bare string is one tag, not a sequence of characters.
        raw_tags = [raw_tags]

    base_meta = {
        "sop_id": sop["sop_id"],
        "db_type": sop["db_type"],
        "error_category": sop["error_category"],
        "risk_level": sop["risk_level"],
        "auto_fix_supported": sop["auto_fix_supported"],
        "requires_human_approval": sop["requires_human_approval"],
        "tags": ", ".join(str(tag) for tag in raw_tags),
    }

    docs = list(chunk_texts)
    ids = [f"{sop['sop_id']}::chunk::{idx}" for idx in range(len(docs))]
    metas = [{**base_meta, "chunk_index": idx} for idx in range(len(docs))]

    embeddings = embed_texts(docs)
    collection.upsert(ids=ids, embeddings=embeddings, documents=docs, metadatas=metas)

# Ingestion pipeline: load the SOP JSON files, render each one to text,
# split it into overlapping chunks, then embed and upsert those chunks.
sops = load_sops_from_folder(SOPS_FOLDER)
for sop in sops:
    text = sop_to_text(sop)  # flat text rendering of one SOP
    chunks = chunk_text(text, chunk_size=1000, overlap=150)  # character-based windows
    upsert_sop_chunks(collection, sop, chunks)  # chunk ids are built from sop_id + chunk index

print("Upsert complete.")


In [None]:

from typing import Optional

def retrieve(collection, query: str, k: int = 5, where: Optional[Dict[str, Any]] = None):
    """Run a similarity search for ``query`` against ``collection``.

    Args:
        collection: Chroma collection to search.
        query: Free-text query; it is embedded with ``embed_texts``.
        k: Number of results requested.
        where: Optional metadata filter; ``None`` or an empty dict means no
            filtering.

    Returns:
        Whatever ``collection.query`` returns for the single query embedding.
    """
    q_emb = embed_texts([query])[0]
    return collection.query(
        query_embeddings=[q_emb],
        n_results=k,
        # Send None instead of {} when there is no filter: an empty filter
        # dict is rejected by Chroma's where-validation in some versions.
        where=where or None,
    )

# Example query: restrict the search to SQL Server SOPs and print the top hits
query = "Rubrik backup failed because SQL Server transaction log chain is broken."
results = retrieve(
    collection,
    query=query,
    k=5,
    where={"db_type": "SQL_Server"}  # metadata filter; try removing or changing it
)

# Results are nested per query; [0] selects the hits for our single query.
for doc, meta, dist in zip(results["documents"][0], results["metadatas"][0], results["distances"][0]):
    # 1 - dist is shown as a similarity score (assumes the collection uses cosine distance)
    print(f"\nScore={1 - dist:.4f} | sop_id={meta['sop_id']} | chunk={meta['chunk_index']}")
    # Preview at most 500 characters, marking truncation with "..."
    print(doc[:500], "..." if len(doc) > 500 else "")
