# 01 — Data Pipeline: OpenSearch Indexing & Embedding

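This notebook stubs out the data ingestion pipeline for Picosearch. Every external call (OpenSearch, embedding models) is a placeholder that prints a `[STUB]` message, so the notebook runs top to bottom without any services available.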

## Pipeline stages
1. **Ingest** — load raw assets from S3 / local storage
2. **Caption** — generate text descriptions (e.g. BLIP-2, GPT-4V); not stubbed yet — the sample assets below ship with hand-written descriptions
3. **Embed text** — dense vectors via a text embedding model (e.g. `text-embedding-3-small`)
4. **Embed image** — CLIP vectors for visual similarity
5. **Index** — upsert into OpenSearch with both kNN fields


In [None]:
# Dependencies (install once per environment; `%pip` installs into the kernel's own environment, unlike `!pip`)
# %pip install opensearch-py sentence-transformers Pillow boto3 python-dotenv

In [None]:
import os
from dotenv import load_dotenv

# Pull connection settings from the backend's .env file. The path is resolved
# relative to the kernel's working directory. load_dotenv returns a falsy value
# when it finds nothing to load (e.g. the file is missing), so warn rather than
# silently falling back to the localhost defaults below.
if not load_dotenv("../backend/.env"):
    print("[WARN] ../backend/.env not loaded — using process environment or defaults")

# Target cluster and index; the second argument is the fallback used when the
# variable is absent from the environment.
OPENSEARCH_HOST = os.getenv("OPENSEARCH_HOST", "http://localhost:9200")
OPENSEARCH_INDEX = os.getenv("OPENSEARCH_INDEX", "picosearch-assets")
print(f"Target index: {OPENSEARCH_HOST}/{OPENSEARCH_INDEX}")

In [None]:
# ── STUB: OpenSearch client ──────────────────────────────────────────────────
# Placeholder for the real connection; uncomment once a cluster is reachable.
# Host and credentials are read from the environment populated by the config
# cell's load_dotenv call. SSL is enabled only for https:// hosts.
# NOTE(review): if OPENSEARCH_USER / OPENSEARCH_PASSWORD are unset, os.getenv
# returns None and http_auth becomes (None, None) — confirm the cluster allows
# that, or set both variables before uncommenting.
# from opensearchpy import OpenSearch
#
# client = OpenSearch(
#     hosts=[OPENSEARCH_HOST],
#     http_auth=(os.getenv("OPENSEARCH_USER"), os.getenv("OPENSEARCH_PASSWORD")),
#     use_ssl=OPENSEARCH_HOST.startswith("https"),
# )
# print(client.info())
print("[STUB] OpenSearch client — replace with real connection")

In [None]:
# ── STUB: Index mapping ──────────────────────────────────────────────────────
# Index body for OpenSearch: kNN enabled at the index level, plain metadata
# fields, and two knn_vector fields. The vector fields share identical HNSW /
# faiss settings and differ only in width, so they are generated from a
# (field name, dimension) table below.
INDEX_MAPPING = {
    "settings": {"index.knn": True},
    "mappings": {
        "properties": {
            "id": {"type": "keyword"},
            "url": {"type": "keyword"},
            "description": {"type": "text"},
            "tags": {"type": "keyword"},
            **{
                field_name: {
                    "type": "knn_vector",
                    "dimension": width,
                    "method": {"name": "hnsw", "engine": "faiss"},
                }
                for field_name, width in (
                    ("text_vector", 1536),  # text-embedding-3-small
                    ("clip_vector", 512),   # CLIP ViT-B/32
                )
            },
        }
    },
}

# client.indices.create(index=OPENSEARCH_INDEX, body=INDEX_MAPPING, ignore=400)
print("[STUB] Index mapping defined — uncomment to create")

In [None]:
# ── STUB: Ingest & embed ─────────────────────────────────────────────────────
# Hard-coded assets standing in for the real S3 / local-storage listing. Each
# record carries the metadata fields declared in the index mapping.
SAMPLE_ASSETS = [
    {
        "id": "asset-001",
        "url": "s3://my-bucket/photo1.jpg",
        "description": "Team meeting",
        "tags": ["team", "office"],
    },
    {
        "id": "asset-002",
        "url": "s3://my-bucket/photo2.jpg",
        "description": "Product launch",
        "tags": ["product", "marketing"],
    },
]

def embed_text(text: str) -> list[float]:
    """Return a placeholder text embedding.

    STUB: replace with a real embedding call. ``text`` is ignored; the result
    is always a fresh list of 1536 zeros, the width the index mapping declares
    for ``text_vector``.
    """
    return [0.0 for _ in range(1536)]

def embed_image(url: str) -> list[float]:
    """Return a placeholder CLIP image embedding.

    STUB: replace with a CLIP embedding call. ``url`` is not fetched; the
    result is always a fresh list of 512 zeros, the width the index mapping
    declares for ``clip_vector`` (CLIP ViT-B/32).
    """
    zeros = (0.0,) * 512
    return list(zeros)

# Build one OpenSearch document per asset: the asset's own fields plus both
# embeddings (text from the description, image from the URL). The vector keys
# come after **asset, so they win over any same-named key in the raw record.
docs = [
    {
        **asset,
        "text_vector": embed_text(asset["description"]),
        "clip_vector": embed_image(asset["url"]),
    }
    for asset in SAMPLE_ASSETS
]

# Fail fast on a width mismatch between the embedders and INDEX_MAPPING.
# OpenSearch rejects knn_vector values whose length differs from the mapped
# `dimension`, and without this check that error would only surface at bulk time.
_knn_properties = INDEX_MAPPING["mappings"]["properties"]
for _doc in docs:
    for _field in ("text_vector", "clip_vector"):
        _expected = _knn_properties[_field]["dimension"]
        assert len(_doc[_field]) == _expected, (
            f"{_doc['id']}: {_field} has {len(_doc[_field])} dims, "
            f"index mapping expects {_expected}"
        )

print(f"[STUB] Prepared {len(docs)} documents for indexing")

In [None]:
# ── STUB: Bulk index ─────────────────────────────────────────────────────────
# Write every prepared document, using the asset id as the document _id so
# re-running the notebook overwrites documents instead of duplicating them.
# helpers.bulk defaults to raise_on_error=True and raises BulkIndexError when
# any document fails, so `errors` would always be empty in the print below.
# raise_on_error=False makes it return the per-document failures instead.
# from opensearchpy.helpers import bulk
# actions = [{"_index": OPENSEARCH_INDEX, "_id": doc["id"], "_source": doc} for doc in docs]
# success, errors = bulk(client, actions, raise_on_error=False)
# print(f"Indexed {success} docs, {len(errors)} errors")
print("[STUB] Bulk index — uncomment when OpenSearch is connected")