# Product1 Vector Collection (Weaviate + Gemini)

This notebook prepares the **product collection** used by the RAG system.

The goal is to:

1. Load the raw product metadata from disk.
2. Turn each product into an **embedding** (vector representation).
3. Insert products + embeddings into a Weaviate collection (`Product1`).
4. Verify that **every product** is stored exactly once (no collisions).

Once this notebook has been run, other notebooks / apps only need to **connect to Weaviate** and query `Product1`; they don't need to re-create the collection.


## What this notebook does

We run a **one-time ingestion job** that prepares the product catalog for semantic search.

| Step | What it does | In simple terms |
|------|--------------|-----------------|
| 1. Data preparation | Build a text description from each product's fields (gender, category, colour, name, etc.). | "Turn each product into a meaningful sentence." |
| 2. Embedding generation | Send the text to Gemini to get a dense numeric vector. | "Convert that sentence into a vector that captures its meaning." |
| 3. Insertion into Weaviate | Store both the structured fields **and** the vector in the `Product1` collection. | "Save the data and its meaning so we can do semantic search later." |

> This ingestion needs to be run only **once**.  
> Afterwards, the RAG app simply connects to Weaviate and queries `Product1`.


## 1. Environment & imports

In [None]:
import json
import time
import uuid
from collections import Counter

import joblib
import weaviate
from weaviate.classes.config import Configure, DataType, Property
from weaviate.classes.data import DataObject

## 2. Connect to Weaviate

In [14]:
client = weaviate.connect_to_local(host="localhost", port=8080, grpc_port=50051)

## 3. Load and inspect product data

In [None]:
# Load preprocessed product data
PRODUCTS_DATA = joblib.load('../dataset/clothes_json.joblib')
len(PRODUCTS_DATA)

44424

In [None]:
# Quick peek at one product record to understand the schema
PRODUCTS_DATA[0]

{'gender': 'Men',
 'masterCategory': 'Apparel',
 'subCategory': 'Topwear',
 'articleType': 'Shirts',
 'baseColour': 'Navy Blue',
 'season': 'Fall',
 'year': 2011.0,
 'usage': 'Casual',
 'productDisplayName': 'Turtle Check Men Navy Blue Shirt',
 'price': 67,
 'product_id': 15970}

In [None]:
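# Drop (if needed) the Weaviate collection
# client.collections.delete("Product1")

# Check whether the 'Product1' collection already exists.
# This helper only checks; the collection itself is created in section 5 if it is missing.
def ensure_products_collection() -> bool:
    # quick "already exists" check: fetching the config fails if the collection is absent
    try:
        client.collections.get("Product1").config.get()
    except Exception:
        print("ℹ️ 'Product1' not found yet; it is created in section 5.")
        return False
    print("✅ 'Product1' already exists.")
    return True

if ensure_products_collection():
    products_collection = client.collections.get("Product1")
    print("OK ->", products_collection.config.get().name)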

✅ 'Product1' already exists.
OK -> Product1


In [21]:
from utils import generate_embedding
v = generate_embedding("hello world")
print(len(v), "dims")  # should print a long vector length (e.g., 768 or 1024)


768 dims


## 4. Define collection schema (Product1)

### Design choices

- **product_id as primary key**  
  The original dataset provides a `product_id`. We derive the Weaviate object UUID from this field only.  
  This guarantees:
  - No collisions: one product → one UUID.
  - Idempotency: re-running the ingestion will not create duplicates.

- **BYO embeddings (Gemini)**  
  The collection uses `vectorizer_config=Configure.Vectorizer.none()`, so Weaviate does not embed automatically.  
  Instead, we call our own `generate_embedding(...)` function from `utils.py`, using the same model as the rest of the RAG system.

- **Data sanitization & retry**  
  Numeric fields like `year` and `price` are sanitized before insert, and each insert is wrapped in a small retry loop.  
  At the end, we compare Weaviate's `total_count` with the number of successful inserts.
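
A minimal sketch of what the sanitization step could look like, assuming that missing, `NaN`, or unparseable numeric values should simply be omitted and that `NUMBER` properties are sent as floats. `sanitize_properties` and `NUMERIC_FIELDS` are hypothetical names used for illustration; they are not defined elsewhere in this notebook.

```python
import math

# Assumption: these are the fields declared as NUMBER in the collection schema.
NUMERIC_FIELDS = ("product_id", "year", "price")

def sanitize_properties(p: dict) -> dict:
    """Return a copy of p with numeric fields coerced to float.

    Numeric values that are missing, NaN, infinite, or not parseable are dropped,
    as are non-numeric properties whose value is None.
    """
    clean = {}
    for key, value in p.items():
        if key in NUMERIC_FIELDS:
            try:
                number = float(value)
            except (TypeError, ValueError):
                continue  # missing or non-numeric value: omit the property
            if math.isnan(number) or math.isinf(number):
                continue  # NaN / inf cannot be stored meaningfully
            clean[key] = number
        elif value is not None:
            clean[key] = value
    return clean

example = {"gender": "Men", "year": float("nan"), "price": "67",
           "product_id": 15970, "usage": None}
print(sanitize_properties(example))
# {'gender': 'Men', 'price': 67.0, 'product_id': 15970.0}
```

In `build_dobj`, such a helper could be applied as `properties=sanitize_properties(p)` instead of passing the raw record.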



Next, we ingest `PRODUCTS_DATA` into the Weaviate `Product1` collection.

## 5. Embedding helpers (Gemini) and design choices

In [None]:
# === Load PRODUCTS_DATA into a fresh 'Product1' collection using product_id as the primary key ===

NEW_COLLECTION = "Product1"
USE_EMBEDDINGS = True
EMBEDDINGS_BACKEND = "utils"   # or "http"


# ---------- helpers ----------
def _to_text(v):
    if v is None: return ""
    if isinstance(v, (list, tuple, set)): return " ".join(_to_text(x) for x in v)
    if isinstance(v, dict): return json.dumps(v, sort_keys=True, ensure_ascii=False)
    return str(v)

def product_text(p: dict) -> str:
    # cast year safely (often a float such as 2011.0 in CSV-derived data)
    year = p.get("year")
    year = int(year) if isinstance(year, (int, float)) and float(year).is_integer() else year
    fields = ["gender","masterCategory","subCategory","articleType","baseColour","season",
              "usage","productDisplayName","price"]
    # append the normalized year rather than the raw p["year"] value
    parts = [_to_text(p.get(f)) for f in fields] + [_to_text(year)]
    return " ".join(parts).strip()

def embed_utils(text: str):
    from utils import generate_embedding
    return generate_embedding(text)

def embed_http(text: str, url="http://localhost:5000/vectors"):
    import requests
    r = requests.post(url, json={"text": text}, timeout=30)
    r.raise_for_status()
    return r.json()["vector"]

def build_vector(text: str):
    if not USE_EMBEDDINGS: return None
    try:
        return embed_utils(text) if EMBEDDINGS_BACKEND == "utils" else embed_http(text)
    except Exception:
        return None

# UUID strictly from product_id (no collisions, idempotent)
def stable_uuid(p: dict) -> str:
    pid = p.get("product_id")
    if pid is None:
        raise ValueError("Missing product_id in record; cannot generate stable UUID.")
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, f"product_id:{pid}"))

In [35]:
# Demo of the embedding pipeline on a single product (for illustration)

demo = PRODUCTS_DATA[0]
print("Raw product:", demo)

txt = product_text(demo)
print("\nEmbedding text:", txt)

vec = build_vector(txt)
print("\nVector length:", len(vec) if vec is not None else "no vector")

print("UUID:", stable_uuid(demo))


Raw product: {'gender': 'Men', 'masterCategory': 'Apparel', 'subCategory': 'Topwear', 'articleType': 'Shirts', 'baseColour': 'Navy Blue', 'season': 'Fall', 'year': 2011.0, 'usage': 'Casual', 'productDisplayName': 'Turtle Check Men Navy Blue Shirt', 'price': 67, 'product_id': 15970}

Embedding text: Men Apparel Topwear Shirts Navy Blue Fall Casual Turtle Check Men Navy Blue Shirt 67 2011.0

Vector length: 768
UUID: 036b4598-3199-58af-8efb-25892b9f164d
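
The idempotency claim rests on `uuid.uuid5` being a pure function of its namespace and name. The sketch below illustrates that property in isolation; `stable_uuid_for` is a hypothetical stand-in that uses the same derivation as `stable_uuid` but takes the id directly.

```python
import uuid

def stable_uuid_for(product_id) -> str:
    """Derive a version-5 UUID from the product_id alone."""
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, f"product_id:{product_id}"))

a = stable_uuid_for(15970)
b = stable_uuid_for(15970)
c = stable_uuid_for(15971)

print(a == b)                        # True: same id, same UUID on every run
print(a == c)                        # False: different id, different UUID
print(uuid.UUID(a).version)          # 5
print(stable_uuid_for(15970.0) == a) # False: "15970.0" is a different name string
```

The last line is the one caveat worth keeping in mind: the UUID depends on the string form of the id, so `15970` and `15970.0` map to different objects. Keeping `product_id` as an integer in the source data avoids that.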


In [None]:
# Data quality check: product_id must be unique
pids = [p.get("product_id") for p in PRODUCTS_DATA]
if any(pid is None for pid in pids):
    missing = sum(1 for pid in pids if pid is None)
    raise AssertionError(f"{missing} rows missing product_id; please fix your source.")
dups = [pid for pid, c in Counter(pids).items() if c > 1]
if dups:
    raise AssertionError(f"Duplicate product_id values in source (e.g., {dups[:5]}). Must be unique.")

print(f"✅ product_id uniqueness confirmed: {len(pids)} rows, {len(set(pids))} unique.")


✅ product_id uniqueness confirmed: 44424 rows, 44424 unique.


In [None]:
# ⚠️ DANGER: Drop Product1 and all its data
# Uncomment the next line ONLY if you really want to wipe and rebuild the collection.
# client.collections.delete("Product1")


In [38]:
# ---------- 1) create NEW collection 'Product1' (keeps old 'Products' untouched) ----------
# collections.get() is lazy and does not fail for a missing collection,
# so we test for existence explicitly before deciding whether to create it.
if client.collections.exists(NEW_COLLECTION):
    print(f"✅ '{NEW_COLLECTION}' already exists. Schema creation skipped.")
else:
    print(f"ℹ️ '{NEW_COLLECTION}' not found. Creating new collection...")
    client.collections.create(
        name=NEW_COLLECTION,
        vectorizer_config=Configure.Vectorizer.none(),  # BYO vectors
        properties=[
            Property(name="product_id",      data_type=DataType.NUMBER),  # keep it stored too
            Property(name="gender",          data_type=DataType.TEXT),
            Property(name="masterCategory",  data_type=DataType.TEXT),
            Property(name="subCategory",     data_type=DataType.TEXT),
            Property(name="articleType",     data_type=DataType.TEXT),
            Property(name="baseColour",      data_type=DataType.TEXT),
            Property(name="season",          data_type=DataType.TEXT),
            Property(name="year",            data_type=DataType.NUMBER),
            Property(name="usage",           data_type=DataType.TEXT),
            Property(name="productDisplayName", data_type=DataType.TEXT),
            Property(name="price",           data_type=DataType.NUMBER),
        ],
    )
    print(f"✅ Created '{NEW_COLLECTION}'.")
product1 = client.collections.get(NEW_COLLECTION)



✅ 'Product1' already exists. Schema creation skipped.


## 6. Ingest data into Product1 (with retries & integrity checks)
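
The retry helper in this section (`insert_one_with_retry`) sleeps `sleep_base ** attempt` seconds after each failed attempt except the last. As a small worked example of that schedule, the sketch below lists the delays for the values used here (`retries=3`, `sleep_base=1.3`); `backoff_delays` is a hypothetical helper written only for this illustration.

```python
def backoff_delays(retries: int = 3, sleep_base: float = 1.3) -> list[float]:
    """Delays (in seconds) slept between attempts.

    After failed attempt number `attempt` the loop sleeps sleep_base ** attempt,
    except after the final attempt, where it gives up instead of sleeping.
    """
    return [sleep_base ** attempt for attempt in range(1, retries)]

delays = backoff_delays()
print([round(d, 2) for d in delays])  # [1.3, 1.69]
print(round(sum(delays), 2))          # 2.99
```

So a product that fails all three attempts costs roughly three seconds of waiting, on top of the time spent in the failed requests themselves.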

In [None]:
RUN_FULL_INGEST = False  # set True only when you really want to insert all 44k again
print("RUN_FULL_INGEST =", RUN_FULL_INGEST)


RUN_FULL_INGEST = True


In [40]:
BATCH = 256
RETRIES = 3  # defined before the helpers because it is used as a default argument below

def build_dobj(p: dict) -> DataObject:
    # Pair the record with its stable UUID and the embedding of its text description
    return DataObject(
        uuid=stable_uuid(p),
        properties=p,
        vector=build_vector(product_text(p))
    )

def insert_one_with_retry(collection, dobj: DataObject, retries=RETRIES, sleep_base=1.3) -> bool:
    # Try the insert up to `retries` times, sleeping sleep_base ** attempt seconds in between
    for attempt in range(1, retries+1):
        try:
            collection.data.insert(uuid=dobj.uuid, properties=dobj.properties, vector=dobj.vector)
            return True
        except Exception:
            if attempt == retries:
                return False
            time.sleep(sleep_base ** attempt)
    return False  # only reached when retries < 1

def total_count(collection) -> int:
    # Number of objects currently stored in the collection
    return collection.aggregate.over_all(total_count=True).total_count



# ---------- 3) bulk insert with true success accounting ----------
if not RUN_FULL_INGEST:
    print("⏭ Skipping full ingest (RUN_FULL_INGEST=False).")
else:
    print(f"🔥 Ingesting {len(PRODUCTS_DATA)} products into '{NEW_COLLECTION}' …")
    start = time.time()
    inserted = 1  # counts one object inserted before this loop; use 0 for an empty collection
    failed = 0
    start_idx = 0

    print(f"⏳ Inserting remaining {len(PRODUCTS_DATA)-start_idx} rows into '{NEW_COLLECTION}' …")
    for i in range(start_idx, len(PRODUCTS_DATA), BATCH):
        chunk = PRODUCTS_DATA[i:i+BATCH]
        payloads = [build_dobj(p) for p in chunk]
        # objects are inserted one by one; BATCH only sets the progress-report granularity
        ok = sum(1 for dobj in payloads if insert_one_with_retry(product1, dobj))
        fail = len(payloads) - ok
        inserted += ok
        failed += fail
        if (i + len(chunk)) % max(50, BATCH) == 0:
            print(f"… {i + len(chunk)} processed | OK so far: {inserted} | Failed so far: {failed}")

    dur = time.time() - start
    print(f"✅ Insert done. OK: {inserted} | Failed: {failed} | Time: {dur:.1f}s")



🔥 Ingesting 44424 products into 'Product1' …
⏳ Inserting remaining 44424 rows into 'Product1' …
… 256 processed | OK so far: 256 | Failed so far: 1
… 512 processed | OK so far: 512 | Failed so far: 1
… 768 processed | OK so far: 768 | Failed so far: 1
… 1024 processed | OK so far: 1024 | Failed so far: 1
… 1280 processed | OK so far: 1280 | Failed so far: 1
… 1536 processed | OK so far: 1536 | Failed so far: 1
… 1792 processed | OK so far: 1792 | Failed so far: 1
… 2048 processed | OK so far: 2048 | Failed so far: 1
… 2304 processed | OK so far: 2304 | Failed so far: 1
… 2560 processed | OK so far: 2560 | Failed so far: 1
… 2816 processed | OK so far: 2816 | Failed so far: 1
… 3072 processed | OK so far: 3072 | Failed so far: 1
… 3328 processed | OK so far: 3328 | Failed so far: 1
… 3584 processed | OK so far: 3584 | Failed so far: 1
… 3840 processed | OK so far: 3839 | Failed so far: 2
… 4096 processed | OK so far: 4095 | Failed so far: 2
… 4

### Verify that all products were ingested

We use `total_count` to confirm that the number of objects stored in Weaviate
matches the number of successful inserts in the Python loop. This ensures the
collection is complete and consistent.
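
A count match does not say *which* products failed. Because every UUID is derived from `product_id`, the missing ones can be recovered by checking each derived UUID. The sketch below assumes an `exists` callable that reports whether a UUID is stored; `find_missing` is a hypothetical helper, and a plain set stands in for the collection so the example runs without a Weaviate instance.

```python
import uuid
from typing import Callable, Iterable

def stable_uuid(p: dict) -> str:
    # Same derivation as the notebook's helper: UUID from product_id only
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, f"product_id:{p['product_id']}"))

def find_missing(products: Iterable[dict], exists: Callable[[str], bool]) -> list:
    """Return the product_ids whose derived UUID is not reported as stored."""
    return [p["product_id"] for p in products if not exists(stable_uuid(p))]

# Stand-in for the collection: the set of UUIDs that were "stored".
products = [{"product_id": 1}, {"product_id": 2}, {"product_id": 3}]
stored = {stable_uuid(products[0]), stable_uuid(products[2])}

print(find_missing(products, stored.__contains__))  # [2]
```

Against the real collection, `exists` could be `product1.data.exists`, assuming the v4 Python client's `data.exists(uuid)` method. That issues one request per product, so it is better suited to spot checks or to re-trying a short list of failures than to scanning all 44k rows routinely.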



In [41]:
tc = total_count(product1)
expected = inserted                 # not len(PRODUCTS_DATA) if you had failures
print(f"📦 Total objects now in '{NEW_COLLECTION}': {tc} (expected {expected})")
assert tc == expected, "Count mismatch: check Weaviate logs or network timeouts."
print("✅ Integrity check passed.")

📦 Total objects now in 'Product1': 44406 (expected 44406)
✅ Integrity check passed.


### How this notebook fits into the RAG project

- This notebook prepares the `Product1` collection in Weaviate.
- The **RAG chat notebook / app** only needs to:
  1. Connect to the same Weaviate instance.
  2. Query the `Product1` collection using semantic search.
  3. Use the retrieved products as context for Gemini when answering user questions.

In other words:  
👉 **Run this notebook once to build the index**, then focus on the chat / retrieval logic in the other notebooks.
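
For orientation, the retrieval side might look roughly like the sketch below. It assumes the v4 Python client's `collection.query.near_vector(near_vector=..., limit=...)` call and an `embed` function that uses the same model as ingestion (for example `generate_embedding` from `utils.py`). `retrieve_products` and `format_context` are hypothetical names, and the stub objects at the end only stand in for a live Weaviate collection so the example runs on its own.

```python
from types import SimpleNamespace

def retrieve_products(collection, embed, question: str, k: int = 5) -> list[dict]:
    """Embed the question and return the properties of the k nearest products."""
    query_vector = embed(question)
    response = collection.query.near_vector(near_vector=query_vector, limit=k)
    return [obj.properties for obj in response.objects]

def format_context(products: list[dict]) -> str:
    """Render retrieved products as a bullet list to place in the LLM prompt."""
    return "\n".join(
        f"- {p.get('productDisplayName')} (price: {p.get('price')})" for p in products
    )

# --- stand-ins so the sketch runs without Weaviate or Gemini ---
fake_hit = SimpleNamespace(
    properties={"productDisplayName": "Turtle Check Men Navy Blue Shirt", "price": 67}
)
fake_collection = SimpleNamespace(
    query=SimpleNamespace(
        near_vector=lambda near_vector, limit: SimpleNamespace(objects=[fake_hit][:limit])
    )
)

hits = retrieve_products(fake_collection, embed=lambda text: [0.0] * 768,
                         question="navy shirt for men", k=3)
print(format_context(hits))
# - Turtle Check Men Navy Blue Shirt (price: 67)
```

In the real app, `fake_collection` would be `client.collections.get("Product1")` and `embed` the project's own embedding function; queries must be embedded with the same model as the stored vectors for the distances to be meaningful.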
