In [None]:
# %% [markdown]
# ─── 1 │ Imports & configuration (hardened) ─────────────────────────────────────

import asyncio, json, os, math
from typing import List, Dict

import aiohttp, backoff, chromadb, nest_asyncio, openai, pandas as pd
from chromadb.config import Settings
from openai import AsyncOpenAI
from tqdm.asyncio import tqdm

nest_asyncio.apply()            # let Jupyter nest event loops

# ── Config knobs ───────────────────────────────────────────────────────────────
API_BATCH_SIZE        = 8        # ↓ to keep prompt+JSON < 8 k tokens
CONCURRENT_LIMIT      = 100      # simultaneous OpenAI calls
WINDOW_SIZE_BATCHES   = 2_000    # batches kept in memory at once
MODEL_NAME            = "gpt-4.1-nano-2025-04-14"
TIMEOUT_SECONDS       = 60
MAX_RETRIES           = 10

CSV_PATH              = "data/Books.csv"
OUTPUT_CSV            = "Enriched_Books_Metadata.csv"
CHROMA_DIR            = "chroma_books"
CHROMA_COLLECTION     = "books_metadata"

# ── OpenAI client & retry helpers ──────────────────────────────────────────────
client = AsyncOpenAI(api_key=OPENAI_API_KEY)

RETRYABLE_EXCEPTIONS = (
    openai.RateLimitError,
    openai.APIConnectionError,
    openai.APITimeoutError,
    aiohttp.ClientError,
    asyncio.TimeoutError,
)

def _log_backoff(details):
    """Pretty back‑off logger that won’t KeyError."""
    session_id = details["args"][1]
    wait       = details["wait"]
    tries      = details["tries"]
    max_tries  = details.get("max_tries", MAX_RETRIES)
    print(f"🔁 Retry batch {session_id}: sleeping {wait:.1f}s "
          f"(attempt {tries}/{max_tries})")

@backoff.on_exception(
    backoff.expo,
    RETRYABLE_EXCEPTIONS,
    max_tries=MAX_RETRIES,
    max_time=300,
    jitter=backoff.full_jitter,
    on_backoff=_log_backoff,
)
async def call_openai_with_timeout(book_batch: List[Dict], session_id: int):
    """
    Send one batch to OpenAI with both soft & hard timeouts, plus JSON
    validation so bad payloads go through the retry loop.
    """
    # Build user message
    user_lines = [
        f"id: {b['row_id']}, title: {b['Book_Title']}, author: {b['Book_Author']}"
        for b in book_batch
    ]
    input_text = "\n".join(user_lines)

    system_prompt = (
        "You are a helpful librarian assistant. For each book provided "
        "generate a JSON object with keys: summary, genre, category, theme, "
        "tone, audience, and the SAME id you received. Return the list under "
        "the top‑level key 'data'. Example: {\"data\": [{\"id\": 123, ...}]}"
    )

    response = await asyncio.wait_for(
        client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user",   "content": input_text},
            ],
            response_format={"type": "json_object"},
            temperature=0.2,
            max_tokens=2048,
            timeout=TIMEOUT_SECONDS - 5,
        ),
        timeout=TIMEOUT_SECONDS,
    )

    # Validate JSON – invalid → raise so back‑off retries
    try:
        content = json.loads(response.choices[0].message.content)
    except json.JSONDecodeError as e:
        raise openai.APIError(f"Bad JSON: {e}") from e

    return content.get("data", [])

async def process_batch(book_batch: List[Dict], session_id: int, sem: asyncio.Semaphore):
    """Concurrency‑guarded wrapper with alignment + fault placeholders."""
    async with sem:
        try:
            raw_result = await call_openai_with_timeout(book_batch, session_id)
        except Exception as exc:            # survived all retries
            print(f"❌ Batch {session_id} final failure: {exc}")
            raw_result = []

    # Safeguard: only dicts with an id
    result_map = {
        item["id"]: item
        for item in raw_result
        if isinstance(item, dict) and "id" in item
    }

    aligned = [
        result_map.get(
            row["row_id"],
            {"id": row["row_id"], "genre": "Error", "theme": "Missing", "tone": ""}
        )
        for row in book_batch
    ]
    return aligned

# ─── 2 │ Main orchestration coroutine ─────────────────────────────────────────
async def main():
    print("📚 Loading dataset …")
    books_df = (
        pd.read_csv(CSV_PATH, encoding="latin-1", low_memory=False)
          .loc[:, ["ISBN", "Book-Title", "Book-Author"]]
          .dropna().drop_duplicates().reset_index(drop=True)
          .rename(columns={"Book-Title": "Book_Title", "Book-Author": "Book_Author"})
    )
    # Add a unique row_id column that persists through batching
    books_df["row_id"] = books_df.index

    batches = [
        books_df.iloc[i : i + API_BATCH_SIZE].to_dict("records")
        for i in range(0, len(books_df), API_BATCH_SIZE)
    ]
    total_batches = len(batches)
    semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)

    print(f"🚀 Processing {len(books_df):,} books "
          f"in {total_batches:,} batches (≤{CONCURRENT_LIMIT} concurrent)…")

    all_metadata: List[Dict] = []
    failed_batches = 0

    # Feed the loop in windows so we never hold 27 k tasks at once
    for w_start in range(0, total_batches, WINDOW_SIZE_BATCHES):
        w_end  = min(w_start + WINDOW_SIZE_BATCHES, total_batches)
        window = [process_batch(batches[i], i, semaphore)
                  for i in range(w_start, w_end)]

        for fut in tqdm(asyncio.as_completed(window),
                        total=len(window),
                        desc=f"Batches {w_start}–{w_end-1}"):
            result = await fut
            if result and result[0].get("genre") == "Error":
                failed_batches += 1
            all_metadata.extend(result)

    print(f"✅ OpenAI enrichment done. Failed batches: {failed_batches}")

    # ── Merge & save CSV ───────────────────────────────────────────────────────
    metadata_df = pd.DataFrame(all_metadata).set_index("id")
    final_df = (
        books_df.set_index("row_id")
        .join(metadata_df, how="left")
        .reset_index(drop=True)
    )
    final_df.to_csv(OUTPUT_CSV, index=False)
    print(f"💾 Saved enriched metadata → {OUTPUT_CSV}")

    # ── Persist to ChromaDB ───────────────────────────────────────────────────
    chroma_client = chromadb.Client(Settings(
        persist_directory=CHROMA_DIR,
        anonymized_telemetry=False
    ))
    collection = chroma_client.get_or_create_collection(CHROMA_COLLECTION)

    valid_rows = final_df.dropna(subset=["genre"]).query("genre != 'Error'")
    if valid_rows.empty:
        print("⚠️ No valid rows to add to ChromaDB.")
        return

    collection.add(
        ids=valid_rows["ISBN"].tolist(),
        documents=[
            (f"Title: {r.Book_Title} by {r.Book_Author}. "
             f"Genre: {r.genre}. Theme: {r.theme}. Tone: {r.tone}.")
            for r in valid_rows.itertuples()
        ],
        metadatas=[
            {
                "title":   r.Book_Title,
                "author":  r.Book_Author,
                "genre":   r.genre,
                "theme":   r.theme,
                "tone":    r.tone,
            }
            for r in valid_rows.itertuples()
        ],
    )
    print(f"✅ Added {len(valid_rows):,} rows to ChromaDB.")

# %% [markdown]
# ─── 3 │ Kick off the async workflow (⇧⏎) ──────────────────────────────────────

# %%
await main()


In [9]:
import pandas as pd
from openai import OpenAI
import json
from tqdm import tqdm
import time
OPENAI_API_KEY = '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'


# Load enriched metadata
df = pd.read_csv("Enriched_Books_Metadata.csv").head(100)
# Load your OpenAI API key
client = OpenAI(api_key=OPENAI_API_KEY)
# Select and clean relevant columns
df = df[['ISBN', 'Book_Title', 'Book_Author', 'summary', 'genre', 'category', 'theme', 'tone', 'audience']]
df = df.dropna(subset=['summary', 'genre', 'category', 'theme', 'tone', 'audience'])

# Prepare batch processing
batch_size = 1000
results = []

# Normalization function using OpenAI
def normalize_metadata_batch(batch):
    prompt = (
        "You are a metadata normalization engine. Given a list of book metadata in JSON, return the same list with "
        "each field ('genre', 'category', 'theme', 'tone', 'audience') standardized across books. Merge similar values "
        "to unified tags for better recommendation use.\n\n"
        f"Metadata:\n{json.dumps(batch, ensure_ascii=False)}"
    )
    try:
        response = client.responses.create(
            model="gpt-4.1-nano-2025-04-14",
            instructions="Return JSON list with normalized metadata.",
            input=prompt
        )
        return json.loads(response.output_text)
    except Exception as e:
        return [{"error": str(e)} for _ in batch]

# Batch processing with progress
for i in tqdm(range(0, len(df), batch_size), desc="Normalizing metadata"):
    batch = df.iloc[i:i + batch_size].to_dict(orient='records')
    normalized = normalize_metadata_batch(batch)
    for original, updated in zip(batch, normalized):
        results.append({**original, **updated})
    time.sleep(1)  # be kind to rate limits

# Save result
pd.DataFrame(results).to_csv("Normalized_Enriched_Metadata.csv", index=False)
print("✅ Normalized data saved to 'Normalized_Enriched_Metadata.csv'")


  df = pd.read_csv("Enriched_Books_Metadata.csv").head(100)
Normalizing metadata: 100%|██████████| 1/1 [02:47<00:00, 167.56s/it]

✅ Normalized data saved to 'Normalized_Enriched_Metadata.csv'





In [12]:
import pandas as pd
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
from sklearn.cluster import KMeans
import numpy as np
import json
import os

# Load and clean dataset
df = pd.read_csv("Enriched_Books_Metadata.csv", low_memory=False)
df = df[['ISBN', 'Book_Title', 'Book_Author', 'summary', 'genre', 'category', 'theme', 'tone', 'audience']]
df.dropna(subset=['genre', 'category', 'theme', 'tone', 'audience'], inplace=True)
df.reset_index(drop=True, inplace=True)

# Combine all metadata columns
text_columns = ['genre', 'category', 'theme', 'tone', 'audience']

# Initialize model
model = SentenceTransformer("all-MiniLM-L6-v2")

# Create batches
batch_size = 1000
batches = [df[i:i+batch_size] for i in range(0, len(df), batch_size)]

# Track results
clustered_data = []

# Create output directory
os.makedirs("clustered_outputs", exist_ok=True)

# Clustering settings per column
n_clusters_dict = {
    'genre': 20,
    'category': 20,
    'theme': 25,
    'tone': 20,
    'audience': 15,
}

# Process each metadata column
for col in text_columns:
    print(f"\n🔍 Clustering column: {col}")
    encoded_batches = []

    # Encode texts
    for batch in tqdm(batches, desc="Embedding Batches"):
        texts = batch[col].astype(str).tolist()
        embeddings = model.encode(texts, show_progress_bar=False)
        encoded_batches.append(embeddings)

    # Stack all embeddings
    all_embeddings = np.vstack(encoded_batches)

    # Fit KMeans
    kmeans = KMeans(n_clusters=n_clusters_dict[col], random_state=42, n_init=10)
    labels = kmeans.fit_predict(all_embeddings)

    # Assign back to df
    df[f"{col}_cluster"] = labels

    # Save partial clustered outputs
    df[['ISBN', col, f"{col}_cluster"]].to_csv(f"clustered_outputs/clustered_{col}.csv", index=False)

# Save final full clustered metadata
df.to_csv("clustered_outputs/clustered_books_metadata.csv", index=False)
print("\n✅ All metadata columns clustered and saved to 'clustered_outputs/'.")



🔍 Clustering column: genre


Embedding Batches: 100%|██████████| 271/271 [05:07<00:00,  1.14s/it]



🔍 Clustering column: category


Embedding Batches: 100%|██████████| 271/271 [04:53<00:00,  1.08s/it]



🔍 Clustering column: theme


Embedding Batches: 100%|██████████| 271/271 [06:52<00:00,  1.52s/it]



🔍 Clustering column: tone


Embedding Batches: 100%|██████████| 271/271 [06:39<00:00,  1.47s/it]



🔍 Clustering column: audience


Embedding Batches: 100%|██████████| 271/271 [06:47<00:00,  1.50s/it]



✅ All metadata columns clustered and saved to 'clustered_outputs/'.


In [15]:
import pandas as pd

# Load enriched book metadata
df = pd.read_csv("Enriched_Books_Metadata.csv", low_memory=False)

# Load clustered data
clustered_df = pd.read_csv("clustered_outputs/clustered_books_metadata.csv")

# Ensure matching ISBNs (safe merge)
merged_df = df.merge(
    clustered_df[['ISBN', 'genre_cluster', 'category_cluster', 'theme_cluster', 'tone_cluster', 'audience_cluster']],
    on='ISBN',
    how='left'
)

# Replace original metadata with cluster labels
merged_df['genre'] = merged_df['genre_cluster']
merged_df['category'] = merged_df['category_cluster']
merged_df['theme'] = merged_df['theme_cluster']
merged_df['tone'] = merged_df['tone_cluster']
merged_df['audience'] = merged_df['audience_cluster']

# Drop the old cluster columns
merged_df.drop(columns=[
    'summary','genre_cluster', 'category_cluster', 'theme_cluster', 'tone_cluster', 'audience_cluster'
], inplace=True)

# Save final normalized dataset
merged_df.to_csv("final_normalized_data.csv", index=False)
print("✅ Final normalized metadata saved as 'final_normalized_data.csv'")


✅ Final normalized metadata saved as 'final_normalized_data.csv'


In [16]:
merged_df

Unnamed: 0,ISBN,Book_Title,Book_Author,genre,category,theme,tone,audience,title,author,Ab 12 J.,Kim Fupz Aakeson,"A story about Ulla's adventures and everyday life, exploring friendship, family, and personal growth through relatable experiences for young readers. : 1.3,",Juvenile Fiction / Coming-of-Age,Friendship / Family / Personal Growth,Light-hearted / Reflective,Young Readers / Pre-teens,67841,67843,json
0,0195153448,Classical Mythology,Mark P. O. Morford,14.0,14.0,23.0,1.0,8.0,,,,,,,,,,,,
1,0002005018,Clara Callan,Richard Bruce Wright,10.0,2.0,4.0,12.0,0.0,,,,,,,,,,,,
2,0060973129,Decision in Normandy,Carlo D'Este,6.0,0.0,3.0,8.0,3.0,,,,,,,,,,,,
3,0374157065,Flu: The Story of the Great Influenza Pandemic...,Gina Bari Kolata,14.0,14.0,8.0,1.0,3.0,,,,,,,,,,,,
4,0393045218,The Mummies of Urumchi,E. J. W. Barber,14.0,14.0,23.0,1.0,3.0,,,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
271353,0440400988,There's a Bat in Bunk Five,Paula Danziger,2.0,18.0,7.0,10.0,2.0,,,,,,,,,,,,
271354,0525447644,From One to One Hundred,Teri Sloat,14.0,14.0,21.0,5.0,2.0,,,,,,,,,,,,
271355,006008667X,Lily Dale : The True Story of the Town that Ta...,Christine Wicker,14.0,3.0,16.0,17.0,4.0,,,,,,,,,,,,
271356,0192126040,Republic (World's Classics),Plato,16.0,5.0,2.0,19.0,8.0,,,,,,,,,,,,


In [None]:
import pandas as pd
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.cluster import KMeans
from tqdm import tqdm
import os

# Load data
df = pd.read_csv("clustered_outputs/clustered_books_metadata.csv", low_memory=False)
original_df = pd.read_csv("Enriched_Books_Metadata.csv", low_memory=False)

# Clean columns
columns = ['genre', 'category', 'theme', 'tone', 'audience']
df.dropna(subset=columns, inplace=True)
df.reset_index(drop=True, inplace=True)

# Init model
model = SentenceTransformer("all-MiniLM-L6-v2")

# Map clusters to labels
cluster_name_maps = {}

for col in columns:
    print(f"\n🔍 Generating names for {col} clusters")
    texts = df[col].astype(str).tolist()
    embeddings = model.encode(texts, show_progress_bar=True)
    
    # Cluster again to find representatives
    n_clusters = df[f"{col}_cluster"].nunique()
    kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    labels = kmeans.fit_predict(embeddings)
    
    # Map cluster index to closest label
    cluster_names = {}
    for i in range(n_clusters):
        cluster_points = np.array(texts)[np.array(labels) == i]
        cluster_names[i] = ", ".join(cluster_points[:2])  # use top 2 representative phrases
    
    cluster_name_maps[col] = cluster_names
    df[f"{col}_normalized"] = df[f"{col}_cluster"].map(cluster_names)

# Replace original columns in the enriched metadata
original_df = original_df.merge(
    df[['ISBN'] + [f"{col}_normalized" for col in columns]],
    on="ISBN",
    how="left"
)

# Replace old metadata columns with normalized ones
for col in columns:
    original_df[col] = original_df[f"{col}_normalized"]
    original_df.drop(columns=[f"{col}_normalized"], inplace=True)

# Save the final dataset
original_df.to_csv("final_normalized_metadata.csv", index=False)
print("\n✅ Saved as 'final_normalized_metadata.csv'")



🔍 Generating names for genre clusters


Batches: 100%|██████████| 8469/8469 [04:48<00:00, 29.39it/s]



🔍 Generating names for category clusters


Batches:   6%|▌         | 506/8469 [01:00<19:41,  6.74it/s]

In [None]:
original_df