<a href="https://colab.research.google.com/github/MSadimba/milvusdb-test/blob/main/ingestion_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Upgrade pip itself before installing the notebook's dependencies.
!pip install --upgrade pip

# Python 3.12-compatible packages (pymilvus is the only pinned version here)
!pip install pymilvus==2.4.4 pypdf arxiv tqdm python-dotenv

# LangChain + HuggingFace embeddings (safe)
!pip install langchain langchain-community langchain-huggingface

# PyTorch CPU (Python 3.12 compatible); wheels come from the CPU-only index
!pip install torch --index-url https://download.pytorch.org/whl/cpu

# Transformers (safe for Python 3.12)
!pip install transformers


Looking in indexes: https://download.pytorch.org/whl/cpu


In [None]:
# Smoke test: confirm that the packages installed above can be imported.
try:
    from langchain_huggingface import HuggingFaceEmbeddings
    from transformers import pipeline
    from pypdf import PdfReader
    import arxiv
    from pymilvus import connections
    print("Environment OK")
except Exception as e:
    # Broad catch on purpose: any import-time failure is printed as a single
    # line instead of raising a traceback in the cell.
    print("ERROR:", e)


Environment OK


In [None]:
# Remove TensorFlow and Keras distributions (-y skips the confirmation prompt).
# NOTE(review): presumably done so transformers only sees the PyTorch backend - confirm.
!pip uninstall -y tensorflow tensorflow-cpu tensorflow-intel tensorflow-macos keras keras-nightly keras-preprocessing


[0m

In [None]:
# Upgrade transformers only; --no-deps tells pip not to install or upgrade its dependencies.
!pip install --upgrade transformers --no-deps




In [None]:
# Install torch from the CPU-only wheel index (same command as in the first setup cell).
!pip install torch --index-url https://download.pytorch.org/whl/cpu


Looking in indexes: https://download.pytorch.org/whl/cpu


In [None]:
# Second smoke test, run after the uninstall/upgrade cells above.
# Unlike the first check, this one does not import pymilvus.
try:
    from transformers import pipeline
    from langchain_huggingface import HuggingFaceEmbeddings
    from pypdf import PdfReader
    import arxiv
    print("Environment OK")
except Exception as e:
    # Broad catch on purpose: report any import-time failure as one line.
    print("ERROR:", e)


Environment OK


In [None]:
import os, io, re, uuid, logging, textwrap
from typing import List, Dict, Tuple
from tqdm import tqdm

import requests
import arxiv
from pypdf import PdfReader

from transformers import pipeline
from langchain_huggingface import HuggingFaceEmbeddings

from pymilvus import (
    connections, FieldSchema, CollectionSchema, DataType, Collection, utility
)

# ====== CONFIG ======
# Milvus / Zilliz Cloud connection details.
# You can find these in your Zilliz Cloud console or Milvus standalone setup.
# Supply the real values through the MILVUS_URI and MILVUS_TOKEN environment
# variables so credentials are never saved inside the notebook. The literals
# below are placeholders, used only when the variable is unset or empty.
MILVUS_URI   = os.environ.get("MILVUS_URI") or "tx"      # e.g. "https://xxx.api.gcp-us-west1.zillizcloud.com"
MILVUS_TOKEN = os.environ.get("MILVUS_TOKEN") or "ty"    # Your API key or token

# Collection names: one for arXiv paper chunks, one for the curated values.
RAI_COLLECTION    = "rai_docs_v1"
VALUES_COLLECTION = "values_hq"

EMB_MODEL_NAME    = "sentence-transformers/all-MiniLM-L6-v2"  # 384-dim

# Hybrid classifier thresholds (all three must pass for a paper to be kept)
ZS_ACCEPT_THRESHOLD = 0.60   # zero-shot minimum score
REQ_POS_KW = 1               # at least N positive keywords
MAX_NEG_KW = 2               # at most N negative keywords

# Text chunking (all sizes are in characters)
MAX_TEXT_CHARS   = 4000      # hard cap applied to every stored chunk
CHUNK_SIZE_CHARS = 1800      # length of one chunk window
CHUNK_OVERLAP    = 200       # characters shared by consecutive chunks

logging.basicConfig(level=logging.INFO)
print("Config loaded.")

Config loaded.


In [None]:
import torch

from pymilvus import connections, utility
from langchain_huggingface import HuggingFaceEmbeddings

# 1. Connect to Zilliz / Milvus
# Registers the connection under the "default" alias, which the Collection(...)
# calls later in the notebook rely on implicitly.
connections.connect(
    alias="default",
    uri=MILVUS_URI,
    token=MILVUS_TOKEN,
)

print("Connected to Milvus. Server version:", utility.get_server_version())

# 2. Initialize embedding model
# NOTE(review): a later cell builds a second HuggingFaceEmbeddings instance
# named `embeddings` from the same EMB_MODEL_NAME; embed_texts() uses that one.
emb_model = HuggingFaceEmbeddings(model_name=EMB_MODEL_NAME)

# 3. Infer embedding dimension dynamically
# Embeds one throwaway sentence and measures the vector length.
test_vec = emb_model.embed_query("test sentence about responsible AI")
EMB_DIM = len(test_vec)

print("Embedding dimension:", EMB_DIM)


Connected to Milvus. Server version: Zilliz Cloud Vector Database(Compatible with Milvus 2.6)


ValueError: Your currently installed version of Keras is Keras 3, but this is not yet supported in Transformers. Please install the backwards-compatible tf-keras package with `pip install tf-keras`.

In [None]:
# Install the backwards-compatible tf-keras package that the transformers
# error message ("Keras 3 ... not yet supported") asks for.
!pip install tf-keras

# 3.1 Embeddings model
# Module-level model used by embed_texts() below.
# NOTE(review): the connection cell already builds `emb_model` from the same
# EMB_MODEL_NAME, so the model is instantiated twice in this notebook.
embeddings = HuggingFaceEmbeddings(model_name=EMB_MODEL_NAME)
print("Embeddings model loaded.")

def embed_texts(texts: List[str]) -> List[List[float]]:
    """Embed a batch of strings with the module-level ``embeddings`` model.

    Args:
        texts: Strings to embed, passed through unchanged.

    Returns:
        Whatever ``embeddings.embed_documents`` returns for the batch; the
        annotation declares one list of floats per input string.
    """
    vectors = embeddings.embed_documents(texts)
    return vectors

# 3.2 Zero-shot classifier (force PyTorch backend)
# Module-level pipeline used by zero_shot_score(); framework="pt" selects the
# PyTorch implementation of the model.
zs_classifier = pipeline(
    "zero-shot-classification",
    model="facebook/bart-large-mnli",
    framework="pt"
)
print("Zero-shot classifier loaded.")

# 3.3 Keywords
POS_KEYWORDS = {
    "responsible ai","ai safety","ai governance","ai ethics","trustworthy ai",
    "fairness","bias","discrimination","transparency","explainability","accountability",
    "privacy","gdpr","oecd","nist","eu ai act","iso","robustness","reliability",
    "risk management","harm","incident","audit","human oversight","evaluation","benchmark",
    "policy","standard","compliance","governance","safety","safeguard"
}   # presence increases the likelihood of the paper being added to the DB

NEG_KEYWORDS = {
    "resnet","vgg","yolo","gan","segmentation","transformer architecture",
    "image classification","object detection","sota accuracy","bleu score","perplexity",
    "fpga","asic","throughput","compression ratio","quantization aware training",
    "wavelet","svm kernel","k-means clustering","neural architecture search",
    "convolutional","backpropagation tricks"
}   # presence decreases the likelihood of the paper being added to the DB

# Inflection endings tolerated after a keyword, so "bias" still counts
# "biases"/"biased" and "audit" still counts "audits"/"auditing".
_KW_SUFFIX = r"(?:s|es|ed|ing)?"

def _count_keyword_hits(text: str, keywords) -> int:
    """Count how many distinct keywords occur in `text` as whole words/phrases."""
    hits = 0
    for kw in keywords:
        # Words of a multi-word keyword may be separated by any whitespace
        # (abstracts often contain line breaks inside a phrase).
        phrase = r"\s+".join(re.escape(part) for part in kw.split())
        # Look-arounds instead of plain substring search: "gan" must not match
        # inside "organization", nor "iso" inside "comparison".
        if re.search(r"(?<!\w)" + phrase + _KW_SUFFIX + r"(?!\w)", text):
            hits += 1
    return hits

def keyword_stats(text: str) -> Tuple[int, int]:
    '''
    Counts the number of pos / neg keywords in a document.

    Matching is case-insensitive and on whole words or phrases; a keyword is
    counted at most once no matter how often it appears.

    Args:
        text: Document text (e.g. title plus abstract). None or "" gives (0, 0).

    Returns:
        (pos, neg): number of distinct POS_KEYWORDS and NEG_KEYWORDS found.
    '''
    t = (text or "").lower()
    pos = _count_keyword_hits(t, POS_KEYWORDS)
    neg = _count_keyword_hits(t, NEG_KEYWORDS)
    return pos, neg

# Candidate labels handed to the zero-shot classifier in zero_shot_score().
ZS_LABELS = [
    "responsible AI", "AI safety", "AI governance", "AI ethics",
    "fairness", "transparency", "accountability", "privacy",
    "robustness", "reliability", "risk management", "safety evaluation"
]   # classes / types of papers. NOTE(review): earlier comment said these are stored as DB metadata, but the visible ingestion code never writes them

def zero_shot_score(title: str, abstract: str) -> float:
    '''
    Best zero-shot label score for a paper's title and abstract.

    The title and abstract are joined with a newline, cut to the first 2000
    characters, and classified against ZS_LABELS in multi-label mode.

    Returns:
        The highest score across all labels as a float, or 0.0 when the
        classifier result carries no "scores" entry.
    '''
    combined = "\n".join([title or "", abstract or ""])
    snippet = combined[:2000]  # keep the classifier input short
    result = zs_classifier(snippet, candidate_labels=ZS_LABELS, multi_label=True)
    if "scores" not in result:
        return 0.0
    return float(max(result["scores"]))

def is_relevant(title: str, abstract: str) -> Tuple[bool, Dict]:
    """Decide whether a paper passes the hybrid relevance filter.

    A paper is accepted only when all three checks hold: the zero-shot score
    reaches ZS_ACCEPT_THRESHOLD, at least REQ_POS_KW positive keywords are
    present, and at most MAX_NEG_KW negative keywords are present.

    Returns:
        (decision, stats) where stats holds the raw numbers under the keys
        "zs", "pos" and "neg".
    """
    score = zero_shot_score(title, abstract)
    combined = " ".join([title or "", abstract or ""])
    n_pos, n_neg = keyword_stats(combined)
    checks = [
        score >= ZS_ACCEPT_THRESHOLD,
        n_pos >= REQ_POS_KW,
        n_neg <= MAX_NEG_KW,
    ]
    stats = {"zs": score, "pos": n_pos, "neg": n_neg}
    return all(checks), stats


In [None]:
# Search expressions sent to arXiv one after another by search_arxiv();
# results are merged and de-duplicated there.
# NOTE(review): only the first phrase of the first query carries the `ti:`
# field prefix; the other phrases have no field prefix - confirm this is intended.
ARXIV_QUERIES = [
    'ti:"responsible AI" OR "responsible artificial intelligence"',
    '"AI governance" OR "AI safety" OR "AI ethics" OR "trustworthy AI"',
    '"AI risk management" OR "AI standards" OR "harm mitigation" OR "algorithmic fairness"'
]

def search_arxiv(max_results: int = 150):
    """Run every query in ARXIV_QUERIES and return the de-duplicated results.

    Args:
        max_results: Upper bound passed to each individual query (not a total).

    Returns:
        List of arXiv results, unique by ``entry_id``; when a paper is returned
        by several queries the first occurrence is kept.
    """
    # A delay between API requests helps to avoid being rate limited.
    client = arxiv.Client(delay_seconds=20)
    unique_by_id = {}
    for query in ARXIV_QUERIES:
        request = arxiv.Search(
            query=query,
            max_results=max_results,
            sort_by=arxiv.SortCriterion.Relevance,
        )
        for result in client.results(request):
            unique_by_id.setdefault(result.entry_id, result)
    return list(unique_by_id.values())

def fetch_pdf_text(pdf_url: str) -> str:
    """Download a PDF and return the text of all its pages.

    Best effort: any failure while downloading or parsing is logged as a
    warning and an empty string is returned instead of raising.

    Args:
        pdf_url: URL of the PDF (request timeout is 60 seconds).

    Returns:
        Page texts joined by newlines; pages without extractable text
        contribute an empty string.
    """
    try:
        response = requests.get(pdf_url, timeout=60)
        response.raise_for_status()
        document = PdfReader(io.BytesIO(response.content))
        return "\n".join((page.extract_text() or "") for page in document.pages)
    except Exception as exc:
        logging.warning(f"PDF fetch/parse failed: {exc}")
        return ""

def extract_text_for_record(rec) -> Tuple[str, str, int, str, str]:
    """Collect metadata and full text for one arXiv search result.

    Args:
        rec: Result object exposing ``title``, ``summary``, ``published``,
            ``entry_id`` and optionally ``pdf_url``.

    Returns:
        (title, abstract, year, full_text, url). Missing title/abstract become
        "", a missing publication date gives year -1, and full_text is "" when
        no PDF URL can be determined.
    """
    paper_title = rec.title or ""
    paper_abstract = rec.summary or ""
    pub_year = rec.published.year if rec.published else -1
    entry_url = rec.entry_id

    pdf_link = getattr(rec, "pdf_url", None)
    if not pdf_link and "/abs/" in entry_url:
        # Derive the PDF location from the abstract page URL.
        pdf_link = entry_url.replace("/abs/", "/pdf/") + ".pdf"

    if pdf_link:
        body_text = fetch_pdf_text(pdf_link)
    else:
        body_text = ""
    return paper_title, paper_abstract, pub_year, body_text, entry_url

def chunk_text(text: str, chunk_size=None, overlap=None, max_chars=None) -> List[str]:
    """Split text into overlapping, whitespace-normalized character chunks.

    All whitespace runs are collapsed to a single space first. Consecutive
    chunks start ``chunk_size - overlap`` characters apart (at least 1).
    Chunking stops as soon as a chunk reaches the end of the text, so no
    trailing chunk is produced that lies entirely inside the previous one.

    Args:
        text: Input text; None or whitespace-only input gives [].
        chunk_size: Window length in characters (int); defaults to CHUNK_SIZE_CHARS.
        overlap: Characters shared by neighbouring chunks (int); defaults to
            CHUNK_OVERLAP.
        max_chars: Hard cap applied to every chunk (int); defaults to
            MAX_TEXT_CHARS.

    Returns:
        List of chunk strings in document order.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE_CHARS
    if overlap is None:
        overlap = CHUNK_OVERLAP
    if max_chars is None:
        max_chars = MAX_TEXT_CHARS
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")

    normalized = re.sub(r"\s+", " ", text or "").strip()
    if not normalized:
        return []

    # Never step by less than one character, even if overlap >= chunk_size.
    step = max(chunk_size - overlap, 1)
    total = len(normalized)
    chunks = []
    start = 0
    while start < total:
        chunks.append(normalized[start:start + chunk_size][:max_chars])
        if start + chunk_size >= total:
            # This window already covers the tail; another window would only
            # repeat text that is part of this chunk.
            break
        start += step
    return chunks


In [None]:
import time
from typing import List
from pymilvus import Collection
import logging
import uuid
from tqdm import tqdm
from pymilvus import connections

# Same value as the RAI_COLLECTION assigned in the config cell; keep the two in sync.
RAI_COLLECTION = "rai_docs_v1"

def upsert_rai_chunks(
    col: Collection,
    doc_id: str,
    title: str,
    year: int,
    url: str,
    chunks: List[str],
    embeddings: List[List[float]]
):
    """Insert all chunks of one document into the collection and flush.

    Despite the name this calls ``col.insert``; nothing here removes or
    replaces rows that were stored earlier for the same ``doc_id``.

    Document-level values are repeated once per chunk. Four columns are fixed
    literals ("arxiv", "general", "unspecified", "unspecified"), and the chunk
    position is stored as 0..n-1.
    NOTE(review): the column order presumably has to match the collection
    schema, which is not defined in this part of the notebook - confirm.

    Does nothing when ``chunks`` is empty.
    """
    count = len(chunks)
    if count == 0:
        return

    def per_chunk(value):
        # One copy of a document-level value for every chunk row.
        return [value] * count

    columns = [
        per_chunk(doc_id),
        per_chunk(title),
        per_chunk(int(year)),
        per_chunk("arxiv"),
        per_chunk(url),
        per_chunk("general"),
        per_chunk("unspecified"),
        per_chunk("unspecified"),
        list(range(count)),
        chunks,
        embeddings,
    ]
    col.insert(columns)
    col.flush()

def ingest_rai_from_arxiv(limit: int = 150):
    """Search arXiv, keep relevant papers, and store their chunks in Milvus.

    For every search result the relevance filter runs first, using only the
    title and abstract that come with the search result. The PDF is downloaded
    and parsed only for papers that pass, so rejected papers cost no download.

    Args:
        limit: Maximum number of results requested per arXiv query.
    """
    # Ensure Milvus connection is active before proceeding
    if not connections.has_connection("default"):
        logging.info("Milvus connection not found, re-establishing...")
        connections.connect(alias="default", uri=MILVUS_URI, token=MILVUS_TOKEN)
        logging.info("Milvus connection re-established.")

    col = Collection(RAI_COLLECTION)
    col.load()
    records = search_arxiv(max_results=limit)
    kept, skipped = 0, 0

    for rec in tqdm(records, desc="Processing arXiv"):
        # Same title/abstract normalization as extract_text_for_record, but
        # without triggering the PDF download.
        ok, _stats = is_relevant(rec.title or "", rec.summary or "")
        if not ok:
            skipped += 1
            continue

        title, abstract, year, full_text, url = extract_text_for_record(rec)

        # Fall back to the abstract (or title) when the PDF text is missing or
        # too short to be a real paper body.
        base_text = full_text if len(full_text) > 800 else (abstract or title)
        chunks = chunk_text(base_text)
        if not chunks:
            skipped += 1
            continue

        vecs = embed_texts(chunks)
        # Deterministic id: the same arXiv entry always maps to the same UUID.
        doc_id = str(uuid.uuid5(uuid.NAMESPACE_URL, rec.entry_id))
        upsert_rai_chunks(col, doc_id, title, year, url, chunks, vecs)
        kept += 1

    print(f"Ingest complete. Kept {kept} high-quality RAI/AI-safety papers; skipped {skipped} as irrelevant.")

# Run the ingestion, retrying the whole run when the error text mentions HTTP 429.
# NOTE(review): a retry calls ingest_rai_from_arxiv from the start and
# upsert_rai_chunks uses col.insert, so papers stored before the failure may be
# inserted again - confirm against the collection schema.
MAX_ATTEMPTS = 3
for attempt in range(MAX_ATTEMPTS):
    try:
        ingest_rai_from_arxiv(limit=30)
        print(f"Ingestion successful after {attempt+1} attempts.")
        break
    except Exception as e:
        # Only rate-limit errors are retried, and only while attempts remain.
        if "HTTP 429" in str(e) and attempt < MAX_ATTEMPTS - 1:
            logging.warning(f"Rate limit hit (attempt {attempt+1}/{MAX_ATTEMPTS}). Retrying in 60 seconds...")
            time.sleep(60)
        else:
            # Any other error, or the last failed attempt, is logged and re-raised.
            logging.error(f"Ingestion failed after {attempt+1} attempts: {e}")
            raise

In [None]:
# Hand-written responsible-AI values. ingest_values() embeds each entry as
# "<value_name>: <description>" and inserts it into VALUES_COLLECTION.
CURATED_VALUES = [
    {
        "value_name": "Fairness",
        "description": "Avoid unjust bias and discrimination; design, measure, and mitigate disparate impact across groups when using AI models."
    },
    {
        "value_name": "Reliability",
        "description": "Ensure consistent, stable, and robust AI behavior under expected conditions; implement monitoring, stress tests, and safe fallbacks."
    },
    {
        "value_name": "Transparency",
        "description": "Make system capabilities and limitations visible to users and stakeholders; document data sources, model behavior, and decision boundaries."
    },
    {
        "value_name": "Explainability",
        "description": "Provide meaningful, understandable explanations for predictions and decisions appropriate to the domain and user needs."
    },
    {
        "value_name": "Accountability",
        "description": "Assign responsibility for AI outcomes; maintain audit trails; enable oversight and redress when harm or errors occur."
    },
    {
        "value_name": "Privacy",
        "description": "Protect personal data in AI workflows using minimization, security controls, and lawful processing; reduce risk of data leakage."
    },
    {
        "value_name": "Security",
        "description": "Safeguard AI systems and underlying data from attacks, tampering, and misuse through secure engineering practices."
    },
    {
        "value_name": "Human Oversight",
        "description": "Design meaningful human control and escalation paths for high-risk decisions; ensure humans can intervene and override AI outputs."
    },
    {
        "value_name": "Safety",
        "description": "Prevent harmful, dangerous, or destabilizing AI behaviors; conduct safety evaluations, red-teaming, and implement guardrails."
    }
]

def ingest_values():
    """Embed every entry of CURATED_VALUES and insert it into VALUES_COLLECTION.

    Each value is embedded as the string "<value_name>: <description>". The
    inserted columns are, in order: a list of zeros, the names, the
    descriptions, and the embedding vectors. The collection is flushed and a
    summary line is printed afterwards.
    """
    col = Collection(VALUES_COLLECTION)
    col.load()

    value_names = [entry["value_name"] for entry in CURATED_VALUES]
    value_descs = [entry["description"] for entry in CURATED_VALUES]
    embed_inputs = []
    for name, desc in zip(value_names, value_descs):
        embed_inputs.append(f"{name}: {desc}")
    vectors = embed_texts(embed_inputs)

    # First column is a placeholder for the 'id' field (auto_id=True).
    id_placeholders = [0] * len(value_names)
    col.insert([id_placeholders, value_names, value_descs, vectors])
    col.flush()
    print(f"Ingested {len(CURATED_VALUES)} curated values into {VALUES_COLLECTION}.")

# Populate the values collection when this cell runs.
# NOTE(review): every run inserts the curated values again; whether that leaves
# duplicates depends on the collection schema, which is not shown here.
ingest_values()


In [None]:
import textwrap # Import textwrap for shortening text

def search_rai(query: str, k: int = 5):
    """Vector-search the RAI paper collection and print the top hits.

    The query is embedded with ``embed_texts`` and searched against the
    "embedding" field using the inner-product metric with ef=128.

    Args:
        query: Free-text search query.
        k: Maximum number of hits to return.

    Returns:
        The hits for the single query. For every hit the score, title, year,
        URL and a snippet shortened to 160 characters are printed.
    """
    collection = Collection(RAI_COLLECTION)
    collection.load()
    query_vector = embed_texts([query])[0]
    search_params = {"metric_type": "IP", "params": {"ef": 128}}
    results = collection.search(
        data=[query_vector],
        anns_field="embedding",
        param=search_params,
        limit=k,
        output_fields=["title", "year", "url", "text", "chunk_index"],
    )

    # Value used for a field when the hit entity does not expose it.
    fallbacks = {
        'title': 'N/A',
        'year': -1,
        'url': 'N/A',
        'text': 'N/A',
        'chunk_index': -1,
    }
    hits = results[0]
    for hit in hits:
        entity = hit.entity
        record = {field: getattr(entity, field, default) for field, default in fallbacks.items()}
        print(f"[score={hit.distance:.3f}] {record['title']} ({record['year']})")
        print(f"  url: {record['url']}")
        print("  snippet:", textwrap.shorten(record["text"], width=160))
        print()
    return hits

def search_values(query: str, k: int = 5):
    """Vector-search the curated values collection and print the top hits.

    The query is embedded with ``embed_texts`` and searched against the
    "embedding" field using the inner-product metric with ef=128.

    Args:
        query: Free-text search query.
        k: Maximum number of hits to return.

    Returns:
        The hits for the single query. For every hit the score, value name and
        a description shortened to 160 characters are printed.
    """
    collection = Collection(VALUES_COLLECTION)
    collection.load()
    query_vector = embed_texts([query])[0]
    search_params = {"metric_type": "IP", "params": {"ef": 128}}
    results = collection.search(
        data=[query_vector],
        anns_field="embedding",
        param=search_params,
        limit=k,
        output_fields=["value_name", "description"],
    )

    hits = results[0]
    for hit in hits:
        entity = hit.entity
        # 'N/A' stands in for a field the hit entity does not expose.
        value_name = getattr(entity, 'value_name', 'N/A')
        description = getattr(entity, 'description', 'N/A')
        print(f"[score={hit.distance:.3f}] {value_name}")
        print("  ", textwrap.shorten(description, width=160))
        print()
    return hits


In [None]:
# Operational requirements per responsible-AI value, looked up by exact key in
# get_requirements_for_values().
# Four keys ("Inclusiveness & Accessibility", "Environmental Sustainability",
# "Integrity & Misuse Prevention", "Well-Being & Social Benefit") have no
# matching entry in CURATED_VALUES.
VALUE_REQUIREMENTS_MAP = {
    "Fairness": [
        "Run demographic parity and disparate impact analysis.",
        "Audit training data for representational bias.",
        "Implement mitigation strategies for identified bias.",
        "Document fairness risks and remediation steps.",
        "Provide explainability for groups affected by decisions."
    ],
    "Reliability": [
        "Conduct stress tests under expected and unexpected loads.",
        "Implement monitoring of system uptime and error rates.",
        "Use fallback responses when the model is uncertain.",
        "Design robust data validation and anomaly detection.",
        "Track system performance drift over time."
    ],
    "Transparency": [
        "Provide model cards and data sheets.",
        "Expose system capabilities and limitations to users.",
        "Document training data sources and assumptions.",
        "Maintain clear logs of decisions and model behavior."
    ],
    "Privacy": [
        "Minimize personal data collection.",
        "Apply anonymization or differential privacy where possible.",
        "Ensure encrypted data storage and transfer.",
        "Provide users access, deletion, and data rights mechanisms."
    ],
    "Security": [
        "Implement adversarial robustness testing.",
        "Protect model endpoints from injection attacks.",
        "Secure training pipeline and data sources.",
        "Monitor for misuse and anomalous access."
    ],
    "Human Oversight": [
        "Maintain a human-in-the-loop for high-impact decisions.",
        "Allow escalation to a human supervisor.",
        "Enable override mechanisms for incorrect predictions.",
        "Define clear human accountability during deployment."
    ],
    "Safety": [
        "Conduct red-teaming and safety evaluations.",
        "Identify potential harms and unintended consequences.",
        "Implement guardrails and response filtering.",
        "Monitor for emergent harmful behaviors.",
        "Implement shutdown or safe-mode fallback mechanisms."
    ],
    "Accountability": [
        "Assign responsibility for AI outputs.",
        "Maintain audit trails for important decisions.",
        "Define redress paths for user grievances.",
        "Document model lineage and versioning for traceability."
    ],
    "Inclusiveness & Accessibility": [
        "Ensure system usability for diverse user groups, including persons with disabilities.",
        "Test system outputs for linguistic, cultural, and contextual inclusion.",
        "Provide alternative interaction formats for users with access challenges.",
        "Avoid exclusionary data assumptions that reduce model applicability."
    ],
    "Environmental Sustainability": [
        "Document energy consumption during model training and deployment.",
        "Prefer efficient models when performance trade-offs are acceptable.",
        "Monitor carbon footprint of large-scale inference workloads.",
        "Use infrastructure providers with renewable-energy commitments."
    ],
    "Explainability": [
        "Provide user-friendly explanation interfaces.",
        "Ensure explanations match the technical level of the audience.",
        "Trace predictions to influential features or evidence.",
        "Evaluate explanation consistency across similar cases."
    ],
    "Integrity & Misuse Prevention": [
        "Implement safeguards against manipulation and disinformation.",
        "Restrict model capabilities that enable fraud, impersonation, or deception.",
        "Monitor for misuse patterns and unauthorized system behavior.",
        "Apply watermarking or provenance signals where applicable."
    ],
    "Well-Being & Social Benefit": [
        "Assess the societal impact of the system's deployment.",
        "Ensure outputs do not cause psychological, economic, or social harm.",
        "Embed constraints that prioritize user welfare in decision-making.",
        "Evaluate long-term externalities of system behavior."
    ]
}

In [None]:
def get_requirements_for_values(selected_values, requirements_map=None):
    """Look up the requirement lists for the selected value names.

    Names are matched exactly first; if that fails, string names are matched
    ignoring case and surrounding whitespace. Names with no match are skipped
    with a logged warning instead of disappearing silently.

    Args:
        selected_values: Iterable of value names; None is treated as empty.
        requirements_map: Mapping of value name to list of requirements.
            Defaults to VALUE_REQUIREMENTS_MAP.

    Returns:
        Dict keyed by the map's own spelling of each matched name. Each value
        is a copy of the requirement list, so callers can modify the result
        without changing the shared map.
    """
    if requirements_map is None:
        requirements_map = VALUE_REQUIREMENTS_MAP

    # Normalized spelling -> key as written in the map, for the fallback match.
    by_folded_name = {
        name.strip().casefold(): name
        for name in requirements_map
        if isinstance(name, str)
    }

    results = {}
    for value in selected_values or []:
        if value in requirements_map:
            canonical = value
        elif isinstance(value, str):
            canonical = by_folded_name.get(value.strip().casefold())
        else:
            canonical = None

        if canonical is None:
            logging.warning("No requirements defined for value %r; skipping.", value)
            continue
        results[canonical] = list(requirements_map[canonical])
    return results


In [None]:
# Keywords that assign a use-case description to a domain. Domains are tried
# in the order written here and the first domain with a hit wins.
USE_CASE_KEYWORDS = {
    "finance": ["loan", "credit", "bank", "risk score", "approval"],
    "health": ["diagnosis", "medical", "triage", "therapy", "symptom"],
    "hiring": ["recruitment", "cv", "resume", "job", "candidate"],
    "education": ["exam", "grading", "assignment", "student"],
    "surveillance": ["face recognition", "biometric", "tracking"],
    "legal": ["court", "legal advice", "evidence"],
    "general": []
}

# Whether each domain is treated as high-risk in the generated report.
EU_HIGHRISK_MAP = {
    "finance": True,
    "health": True,
    "hiring": True,
    "surveillance": True,
    "education": True,
    "legal": True,
    "general": False
}

def _mentions_keyword(text: str, keyword: str) -> bool:
    """Return True if `keyword` occurs in `text` as a whole word or phrase."""
    # Words of a multi-word keyword may be separated by any whitespace.
    phrase = r"\s+".join(re.escape(part) for part in keyword.split())
    # Look-arounds stop "exam" from matching inside "example"; the optional
    # ending keeps simple inflections such as "loans" or "banking".
    pattern = r"(?<!\w)" + phrase + r"(?:s|es|ed|ing)?(?!\w)"
    return re.search(pattern, text) is not None

def classify_use_case(description: str):
    """Classify a use-case description into a domain and a high-risk flag.

    Matching is case-insensitive and on whole words or phrases.

    Args:
        description: Free-text description; None or "" gives ("general", False).

    Returns:
        (domain, is_high_risk) for the first domain in USE_CASE_KEYWORDS with
        a matching keyword, or ("general", False) when nothing matches.
    """
    text = (description or "").lower()
    for domain, kws in USE_CASE_KEYWORDS.items():
        if any(_mentions_keyword(text, kw) for kw in kws):
            # A domain missing from the risk map is reported as not high-risk.
            return domain, EU_HIGHRISK_MAP.get(domain, False)
    return "general", False


In [None]:
def merge_profile(values, use_case_description):
    """Build the structured responsible-AI profile for one use case.

    Args:
        values: Value names whose requirements should be included.
        use_case_description: Free-text description of the planned system.

    Returns:
        Dict with the keys "domain", "is_high_risk", "value_requirements",
        "recommended_guardrails" and "next_steps". The last two are fixed
        lists that do not depend on the inputs.

    Side effects:
        Prints a heading and the hits of ``search_rai`` for the description;
        the search result itself is not part of the returned profile.
    """
    # Domain and risk flag derived from the description.
    detected_domain, flagged_high_risk = classify_use_case(use_case_description)

    # Requirements belonging to the selected values.
    requirements = get_requirements_for_values(values)

    # Related papers are only printed, not captured.
    print("\n--- Relevant RAI Insights (DB Search) ---")
    search_rai(use_case_description, k=5)

    guardrails = [
        "Implement human oversight for critical decisions.",
        "Perform safety monitoring & drift detection.",
        "Document decisions and preserve audit trails.",
        "Apply privacy + security controls consistently.",
    ]
    follow_ups = [
        "Complete risk assessment documentation.",
        "Run fairness + robustness + privacy evaluations.",
        "Prepare guardrails for deployment.",
        "Map values to operational controls.",
    ]
    return dict(
        domain=detected_domain,
        is_high_risk=flagged_high_risk,
        value_requirements=requirements,
        recommended_guardrails=guardrails,
        next_steps=follow_ups,
    )


In [None]:
def generate_report(profile):
    """Print a plain-text responsible-AI report for a profile.

    Args:
        profile: Dict with the keys "domain", "is_high_risk",
            "value_requirements" (name -> list of strings),
            "recommended_guardrails" and "next_steps" (lists of strings).

    Returns:
        None; the report is written to standard output.
    """
    def print_bullets(items, prefix):
        # One line per item, each starting with the given bullet prefix.
        for item in items:
            print(f"{prefix}{item}")

    print("\n===== RESPONSIBLE AI REPORT =====\n")

    print(f"Domain: {profile['domain']}")
    print(f"High-Risk (EU AI Act): {profile['is_high_risk']}")

    print("\n--- Value Requirements ---")
    for value_name, requirements in profile["value_requirements"].items():
        print(f"\n{value_name}:")
        print_bullets(requirements, "  - ")

    print("\n--- Recommended Guardrails ---")
    print_bullets(profile["recommended_guardrails"], " - ")

    print("\n--- Next Steps ---")
    print_bullets(profile["next_steps"], " - ")

    print("\n(Additional insights shown above in RAI DB search output.)")


In [None]:
# The project uses HuggingFaceEmbeddings via embed_texts, making SentenceTransformer and embed_text redundant.
# This cell is cleared to avoid redundancy.


In [None]:
# This search_rai function is now defined in cell Beez1OmHFlab and uses embed_texts.
# This cell is cleared to avoid redundancy.


In [None]:
# NOTE(review): merge_profile is already defined in an earlier cell with the
# same body; this later definition is the one in effect once this cell runs.
def merge_profile(values, use_case_description):
    """Build the structured responsible-AI profile for one use case.

    Returns a dict with the keys "domain", "is_high_risk",
    "value_requirements", "recommended_guardrails" and "next_steps".
    As a side effect it prints the hits of ``search_rai`` for the description;
    the search result itself is not stored in the profile.
    """
    # 1. Classification
    domain, highrisk = classify_use_case(use_case_description)

    # 2. Requirements from selected values
    value_reqs = get_requirements_for_values(values)

    # 3. Retrieve insights from RAI DB
    print("\n--- Relevant RAI Insights (DB Search) ---")
    search_rai(use_case_description, k=5)

    # 4. Return structured profile
    return {
        "domain": domain,
        "is_high_risk": highrisk,
        "value_requirements": value_reqs,
        "recommended_guardrails": [
            "Implement human oversight for critical decisions.",
            "Perform safety monitoring & drift detection.",
            "Document decisions and preserve audit trails.",
            "Apply privacy + security controls consistently."
        ],
        "next_steps": [
            "Complete risk assessment documentation.",
            "Run fairness + robustness + privacy evaluations.",
            "Prepare guardrails for deployment.",
            "Map values to operational controls."
        ]
    }


In [None]:
# Demo 1: the description contains "loan" and "approval", both finance keywords
# in USE_CASE_KEYWORDS.
profile = merge_profile(
    values=["Fairness", "Reliability", "Privacy"],
    use_case_description="We are building a loan approval chatbot for consumer microfinance."
)

In [None]:
# Print the report for the profile built in the previous cell.
generate_report(profile)

In [None]:
# Demo 2: the description contains "job", a hiring keyword in USE_CASE_KEYWORDS.
profile = merge_profile(
    values=["Fairness"],
    use_case_description="We are building AI for screening job applicants."
)
generate_report(profile)

In [None]:
# Demo 3: the description contains "medical" and "diagnosis", both health
# keywords in USE_CASE_KEYWORDS.
profile = merge_profile(
    values=["Safety", "Human Oversight"],
    use_case_description="We are creating an AI system to assist in medical diagnosis."
)
generate_report(profile)

In [None]:
# Demo 4: the value name is written exactly like its key in
# VALUE_REQUIREMENTS_MAP ("Well-Being & Social Benefit"); the short form
# "Well-Being" is not a key there and would give an empty requirements section.
# With the current USE_CASE_KEYWORDS this description matches no domain keyword,
# so it is classified as "general".
profile = merge_profile(
    values=["Well-Being & Social Benefit"],
    use_case_description="We are creating an AI system for ensuring mental health wellness."
)
generate_report(profile)