# StyleSync Ingestion Pipeline (Colab)

This notebook ingests roughly 44k fashion product images from S3, generates an embedding for each one with OpenCLIP, and stores the embeddings in a Chroma vector store. The persisted store is then zipped and uploaded back to S3.

In [None]:
!pip install boto3 pandas pillow open_clip_torch langchain-chroma tqdm

In [None]:
import os
import boto3
import pandas as pd
import io
from PIL import Image
import torch
import open_clip
from langchain_chroma import Chroma
from tqdm.notebook import tqdm
import concurrent.futures

# Configuration (hardcoded for Colab).
# The YOUR_* values are placeholders: fill them in before running and do not
# commit real credentials with the notebook.
os.environ.update(
    {
        "AWS_ACCESS_KEY_ID": "YOUR_AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY": "YOUR_AWS_SECRET_ACCESS_KEY",
        "S3_BUCKET_NAME": "stylesync-mlops-data",
        "HF_TOKEN": "YOUR_HF_TOKEN",
    }
)

# Bucket holding both the raw dataset and the uploaded artifact.
S3_BUCKET = os.environ["S3_BUCKET_NAME"]
# Directory where the Chroma store is persisted.
CHROMA_DB_DIR = "./chroma_db"
# OpenCLIP architecture name and pretrained checkpoint tag.
MODEL_NAME, CHECKPOINT = "ViT-B-32", "laion2b_s34b_b79k"
# Rows handled per ingestion batch, and size of the download thread pool.
BATCH_SIZE, MAX_WORKERS = 32, 8

In [None]:
# Shared S3 client used by every cell below (CSV load, image downloads and
# the final artifact upload). No credentials are passed explicitly here;
# presumably boto3 picks them up from the AWS_* environment variables set in
# the configuration cell.
s3 = boto3.client('s3')

def load_styles_csv():
    """Download ``styles.csv`` from the S3 bucket and parse it.

    Lines that pandas cannot parse are skipped instead of aborting the load.

    Returns:
        A ``pandas.DataFrame`` holding the parsed rows.
    """
    print("Loading styles.csv from S3...")
    response = s3.get_object(
        Bucket=S3_BUCKET,
        Key="style-sync/raw/fashion/styles.csv",
    )
    styles = pd.read_csv(response["Body"], on_bad_lines="skip")
    row_count = len(styles)
    print(f"Loaded {row_count} rows.")
    return styles

def process_metadata(df):
    """Add a ``rich_caption`` column describing each product in plain text.

    The caption joins the product display name, usage, season and base colour
    with single spaces. Missing values (NaN/None) are left out of the caption
    so the literal text "nan" never ends up in it.

    Args:
        df: DataFrame containing the columns ``productDisplayName``,
            ``usage``, ``season`` and ``baseColour``.

    Returns:
        The same DataFrame object, modified in place with the new
        ``rich_caption`` column.

    Raises:
        KeyError: If any of the four source columns is absent.
    """
    print("Processing metadata...")
    caption_columns = ["productDisplayName", "usage", "season", "baseColour"]
    # Plain tuples avoid building a Series per row (as a row-wise apply does)
    # and also behave correctly when the frame is empty.
    df['rich_caption'] = [
        " ".join(str(value) for value in values if pd.notna(value))
        for values in df[caption_columns].itertuples(index=False, name=None)
    ]
    return df

# Load the catalogue and attach the caption column in one step.
df = process_metadata(load_styles_csv())

In [None]:
print(f"Loading OpenCLIP model: {MODEL_NAME}...")
# create_model_and_transforms returns three values; the middle one is not
# used by this notebook.
model, _, preprocess = open_clip.create_model_and_transforms(
    MODEL_NAME,
    pretrained=CHECKPOINT,
)
model.eval()  # inference only
tokenizer = open_clip.get_tokenizer(MODEL_NAME)

# Move the model to the GPU when one is present and report the choice.
gpu_available = torch.cuda.is_available()
if gpu_available:
    model = model.cuda()
print("Using GPU" if gpu_available else "Using CPU")

In [None]:
class OpenCLIPEmbedder:
    """Text embedder backed by an OpenCLIP model.

    Exposes ``embed_documents`` and ``embed_query`` so an instance can be
    handed to the vector store as its embedding function.
    """

    def __init__(self, model, tokenizer):
        """Keep the model/tokenizer pair and remember the model's device.

        Args:
            model: Model object providing ``parameters()`` and
                ``encode_text(tokens)``.
            tokenizer: Callable turning a list of strings into a token tensor.
        """
        self.model = model
        self.tokenizer = tokenizer
        # Token tensors are moved to wherever the model weights live.
        self.device = next(model.parameters()).device

    def embed_documents(self, texts):
        """Return one L2-normalised embedding per input text.

        Args:
            texts: A string, or any iterable of strings.

        Returns:
            A list of embeddings, each a list of floats. An empty input
            yields an empty list without invoking the model.
        """
        # A bare string is handed to the tokenizer unchanged (one text), not
        # split into characters; any other iterable is materialised so that
        # generators work and emptiness can be checked.
        if not isinstance(texts, str):
            texts = list(texts)
            if not texts:
                return []
        with torch.no_grad():
            tokens = self.tokenizer(texts).to(self.device)
            text_features = self.model.encode_text(tokens)
            text_features = text_features / text_features.norm(dim=-1, keepdim=True)
        return text_features.cpu().tolist()

    def embed_query(self, text):
        """Return the embedding of a single query string."""
        return self.embed_documents([text])[0]

# Wrap the already-loaded OpenCLIP model and tokenizer so the vector store can
# embed text through embed_documents / embed_query.
embedding_function = OpenCLIPEmbedder(model, tokenizer)

# Chroma collection persisted under CHROMA_DB_DIR; that directory is what gets
# zipped and uploaded at the end of the notebook.
vectorstore = Chroma(
    collection_name="style_sync",
    embedding_function=embedding_function,
    persist_directory=CHROMA_DB_DIR
)

In [None]:
def process_image_s3(row):
    """Download and decode the product image belonging to one metadata row.

    Args:
        row: Mapping-like row with an ``'id'`` entry naming the image.

    Returns:
        On success, a dict with keys ``id``, ``image`` (RGB PIL image),
        ``row`` and ``status == "success"``. If the download or decode raises,
        a dict with ``id``, ``status == "failed"`` and the ``error`` text.
    """
    image_id = row['id']
    object_key = "style-sync/raw/fashion/images/{}.jpg".format(image_id)
    try:
        payload = s3.get_object(Bucket=S3_BUCKET, Key=object_key)['Body'].read()
        rgb_image = Image.open(io.BytesIO(payload)).convert("RGB")
    except Exception as exc:
        # Best effort: report the failure to the caller instead of raising.
        return {"id": image_id, "status": "failed", "error": str(exc)}
    return {"id": image_id, "image": rgb_image, "row": row, "status": "success"}

print("Starting ingestion...")
device = next(model.parameters()).device

# Ingestion loop: for every batch of rows, download the images concurrently,
# encode them with the CLIP image tower, and store the image embeddings next
# to the caption text and row metadata.
#
# One thread pool is shared by all batches instead of being rebuilt each time.
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    for i in tqdm(range(0, len(df), BATCH_SIZE), desc="Ingesting Batches"):
        batch_df = df.iloc[i:i+BATCH_SIZE]

        # executor.map returns results in row order, which keeps runs
        # reproducible.
        results = list(
            executor.map(process_image_s3, (row for _, row in batch_df.iterrows()))
        )

        ids = []
        metadatas = []
        texts = []
        image_tensors = []

        for res in results:
            if res['status'] != 'success':
                # Report failed downloads instead of dropping them silently.
                print(f"Error downloading {res['id']}: {res['error']}")
                continue
            try:
                image_tensor = preprocess(res['image'])
                metadata = res['row'].to_dict()
            except Exception as e:
                print(f"Error embedding {res['id']}: {e}")
                continue
            image_tensors.append(image_tensor)
            ids.append(str(res['id']))
            metadatas.append(metadata)
            texts.append(res['row']['rich_caption'])

        if not image_tensors:
            continue

        try:
            # Encode the whole batch in one forward pass rather than one image
            # at a time.
            with torch.no_grad():
                image_features = model.encode_image(torch.stack(image_tensors).to(device))
                image_features = image_features / image_features.norm(dim=-1, keepdim=True)
            embeddings = image_features.cpu().tolist()
        except Exception as e:
            print(f"Error embedding batch starting at row {i}: {e}")
            continue

        # NOTE(review): Chroma.add_texts appears to ignore an `embeddings=`
        # keyword and re-embed `texts` with the embedding function, which
        # would store caption embeddings instead of the image embeddings
        # computed above -- confirm against the installed langchain-chroma
        # version. Writing through the underlying collection stores the image
        # vectors explicitly.
        vectorstore._collection.upsert(
            ids=ids,
            embeddings=embeddings,
            metadatas=metadatas,
            documents=texts,
        )


In [None]:
# Package the persisted Chroma directory (CHROMA_DB_DIR, "./chroma_db") into
# chroma_db.zip in the current working directory so it can be uploaded as a
# single artifact.
print("Ingestion complete. Zipping database...")
!zip -r chroma_db.zip chroma_db

In [None]:
# Publish the zipped vector store to the artifacts prefix of the bucket.
print("Uploading to S3...")
s3.upload_file(
    Filename="chroma_db.zip",
    Bucket=S3_BUCKET,
    Key="style-sync/artifacts/chroma_db.zip",
)
print("Upload complete! You can now run download_db.py locally.")