In [None]:
# Install dependencies
!pip -q install pandas numpy scikit-learn sentence-transformers nltk tqdm gradio pyyaml

# Avoid TensorFlow imports from transformers
import os
os.environ["TRANSFORMERS_NO_TF"] = "1"
os.environ["USE_TF"] = "0"
os.environ["TOKENIZERS_PARALLELISM"] = "false"



In [None]:
# Imports and helpers
from typing import Iterable, List, Tuple
import numpy as np
import pandas as pd
import nltk
from nltk.sentiment import SentimentIntensityAnalyzer
from sentence_transformers import SentenceTransformer
from tqdm import tqdm

# Make sure the VADER lexicon is available: look it up on the NLTK data path
# first and download it (quietly) only when that lookup fails.
try:
    _ = nltk.data.find("sentiment/vader_lexicon.zip")
except LookupError:
    nltk.download("vader_lexicon", quiet=True)


def l2_normalize(matrix: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    """Scale every row of a 2-D array to unit Euclidean length.

    Args:
        matrix: Array whose rows (axis 1) are the vectors to normalize.
        eps: Lower bound applied to each row norm before dividing, so an
            all-zero row comes back as zeros instead of NaN.

    Returns:
        An array of the same shape with each row divided by its floored norm.
    """
    row_lengths = np.maximum(np.linalg.norm(matrix, axis=1, keepdims=True), eps)
    return matrix / row_lengths

class VaderSentiment:
    """Thin wrapper that scores texts with NLTK's SentimentIntensityAnalyzer."""

    def __init__(self):
        self.analyzer = SentimentIntensityAnalyzer()

    def score_texts(self, texts: Iterable[str]) -> List[float]:
        """Return one sentiment score per input text, in input order.

        Each score is the "compound" entry of the analyzer's polarity scores
        (0.0 when that key is absent). Falsy inputs such as None or "" are
        scored as the empty string.
        """
        polarity = self.analyzer.polarity_scores
        return [float(polarity(text or "").get("compound", 0.0)) for text in texts]

class EmbeddingModel:
    """Sentence-embedding wrapper that always returns L2-normalized vectors."""

    def __init__(self, model_name: str = "sentence-transformers/all-mpnet-base-v2"):
        self.model = SentenceTransformer(model_name)

    def encode(self, texts: Iterable[str], batch_size: int = 256) -> np.ndarray:
        """Embed `texts` and return one unit-length row per text.

        The iterable is materialized into a list before encoding. The model's
        own normalization is turned off; rows are normalized afterwards with
        `l2_normalize`, so a dot product of two rows is their cosine similarity.
        """
        sentences = list(texts)
        raw_vectors = self.model.encode(
            sentences,
            batch_size=batch_size,
            convert_to_numpy=True,
            normalize_embeddings=False,
            show_progress_bar=True,
        )
        return l2_normalize(raw_vectors)


def aggregate_product_scores(
    df: pd.DataFrame,
    retrieved_indices: np.ndarray,
    retrieved_scores: np.ndarray,
    review_sentiments: np.ndarray,
    k_recommendations: int = 10,
    sentiment_weight: float = 0.6,
    rating_weight: float = 0.4,
    sentiment_min: float = 0.0,
    rating_min: float = 4.0,
) -> List[Tuple[str, float]]:
    """Rank products by the mean boosted similarity of their retrieved reviews.

    Each retrieved review contributes
    ``sim * (1 + sentiment_weight * sent) * (1 + rating_weight * (rating - 3) / 2)``
    to its product; a product's final score is the mean of its contributions.

    Args:
        df: Review table with an "asin" column (product key) and an "overall"
            column (numeric rating); row positions match the retrieved indices.
        retrieved_indices: One row of review positions per query.
        retrieved_scores: Similarities aligned element-wise with
            `retrieved_indices`.
        review_sentiments: Sentiment score per review, indexed by position.
        k_recommendations: Maximum number of products to return.
        sentiment_weight: Strength of the sentiment boost.
        rating_weight: Strength of the rating boost.
        sentiment_min: Reviews with lower sentiment are ignored.
        rating_min: Reviews with a lower rating are ignored.

    Returns:
        Up to `k_recommendations` ``(asin, score)`` pairs, best first. When no
        review survives the filters, falls back to the products with the
        highest mean "overall" rating (the score is then that mean rating).
    """
    asins = df["asin"].to_numpy()
    ratings = df["overall"].to_numpy(dtype=float)
    product_to_score = {}
    product_to_hits = {}
    for row_scores, row_idx in zip(retrieved_scores, retrieved_indices):
        for s, i in zip(row_scores, row_idx):
            sim = float(s)
            # NaN compares False against every threshold, so a non-finite
            # similarity would otherwise slip past this filter.
            if not np.isfinite(sim) or sim <= 0.0:
                continue
            asin = asins[i]
            rating = ratings[i]
            sent = float(review_sentiments[i])
            # A review with a missing rating or sentiment cannot be shown to
            # meet the minimums; letting it through would turn the product's
            # mean into NaN and make the final sort order undefined.
            if not (np.isfinite(sent) and np.isfinite(rating)):
                continue
            if sent < sentiment_min or rating < rating_min:
                continue
            review_score = sim * (1.0 + sentiment_weight * sent) * (1.0 + rating_weight * (rating - 3.0) / 2.0)
            product_to_score[asin] = product_to_score.get(asin, 0.0) + review_score
            product_to_hits[asin] = product_to_hits.get(asin, 0) + 1
    # Average per product so items with many retrieved reviews are not favored
    # merely for being reviewed more often.
    items = [
        (asin, total / float(product_to_hits.get(asin, 1)))
        for asin, total in product_to_score.items()
    ]
    if not items:
        # Nothing matched the filters: fall back to the globally best-rated products.
        rated = df[["asin", "overall"]].groupby("asin").mean()["overall"].sort_values(ascending=False)
        return [(a, float(score)) for a, score in rated.head(k_recommendations).items()]
    items.sort(key=lambda x: x[1], reverse=True)
    return items[:k_recommendations]


In [None]:
# Load your CSV from Google Drive or local upload
# Option A: Upload via Colab UI (left sidebar > Files)
# csv_path = "/content/amazon_reviews.csv"

# Option B: Mount Drive and set path
#from google.colab import drive
#drive.mount('/content/drive')
#csv_path = "/content/drive/MyDrive/amazon_reviews.csv"

# For demo, create a tiny sample if you haven't uploaded yet.
# The CSV_PATH environment variable overrides the default Colab location.
import os
csv_path = os.environ.get("CSV_PATH", "/content/amazon_reviews.csv")
if not os.path.exists(csv_path):
    # Create the parent directory first: it may not exist (for example the
    # default /content outside Colab, or a custom CSV_PATH), and open(..., "w")
    # would otherwise raise FileNotFoundError.
    os.makedirs(os.path.dirname(csv_path) or ".", exist_ok=True)
    sample_csv = """userName,itemName,description,image,brand,feature,category,price,rating,reviewTime,summary,reviewText,vote
u1,Sample Table,desc,,BrandA,,Furniture,49.9,5,2024-01-01,Great,Sturdy table and looks nice,10
u2,Study Chair,desc,,BrandB,,Furniture,39.9,4,2024-01-02,Good,Comfortable for long hours,5
u3,Table Lamp,desc,,BrandC,,Lighting,19.9,4,2024-01-03,Nice,Soft light and elegant,3
"""
    with open(csv_path, "w", encoding="utf-8") as f:
        f.write(sample_csv)

# Column map: logical field name -> header name used in your CSV.
# Edit the right-hand values to match your file.
column_map = dict(
    id="asin",  # if you don't have an ID, we'll fallback to itemName
    title="itemName",
    rating="rating",
    review="reviewText",
    user="userName",
    summary="summary",
    category="category",
    brand="brand",
)

# Raw, untouched copy of the CSV; remapping and cleaning happen below.
raw = pd.read_csv(csv_path)

def remap_columns(df: pd.DataFrame, column_map: dict) -> pd.DataFrame:
    """Project a raw review table onto the columns the rest of the notebook uses.

    Args:
        df: Raw table as read from the CSV.
        column_map: Logical name -> CSV header. The keys read here are "id",
            "title", "review" and "rating".

    Returns:
        A frame sharing `df`'s index with four columns:
        - "asin": product key as a string, taken from the mapped ID column,
          or from the title column when the ID column is absent.
        - "title": title as a string; missing cells become "".
        - "reviewText": review as a string; missing cells stay NaN so callers
          can drop them with `dropna` instead of embedding the text "nan".
        - "overall": numeric rating; unparseable or missing values become 0.
        A "title", "reviewText" or "overall" column absent from `df` is
        filled with its default ("" / "" / 0.0) instead of raising.

    Raises:
        KeyError: if neither the ID column nor the title column exists, since
            no product key can then be derived.
    """

    def _source(key, default_name):
        # Series configured for `key`, or None when the CSV lacks that column.
        name = column_map.get(key, default_name)
        if not name or name not in df.columns:
            return None
        return df[name]

    def _as_text(series, missing):
        # astype(str) alone would turn missing cells into the literal "nan".
        return series.astype(str).mask(series.isna(), missing)

    id_src = _source("id", None)
    title_src = _source("title", "itemName")
    review_src = _source("review", "reviewText")
    rating_src = _source("rating", "rating")

    # Pre-seeding the index lets the scalar defaults below broadcast to every row.
    mapped = pd.DataFrame(index=df.index)
    if id_src is not None:
        mapped["asin"] = id_src.astype(str)
    elif title_src is not None:
        mapped["asin"] = title_src.astype(str)
    else:
        raise KeyError(
            "Cannot derive product IDs: neither the mapped 'id' column nor the "
            "mapped 'title' column exists in the input frame"
        )
    mapped["title"] = _as_text(title_src, "") if title_src is not None else ""
    mapped["reviewText"] = _as_text(review_src, np.nan) if review_src is not None else ""
    if rating_src is not None:
        mapped["overall"] = pd.to_numeric(rating_src, errors="coerce").fillna(0)
    else:
        mapped["overall"] = 0.0
    return mapped

print(raw.head(3))
# Remap to the canonical columns, then clean in a single chain: drop rows
# without review text, coerce ratings to numbers (invalid -> 0), cap reviews
# at 512 characters, blank out missing titles, and renumber rows from 0 so row
# positions can be used directly as review indices later on.
df = (
    remap_columns(raw, column_map)
    .dropna(subset=["reviewText"])
    .assign(
        overall=lambda frame: pd.to_numeric(frame["overall"], errors="coerce").fillna(0),
        reviewText=lambda frame: frame["reviewText"].astype(str).str.slice(0, 512),
        title=lambda frame: frame["title"].fillna(""),
    )
    .reset_index(drop=True)
)
print(df.head(3))


In [None]:
# Sentiment + Embeddings + Simple NumPy index
# One sentiment score per review, stored next to the text it was computed from.
sent_model = VaderSentiment()
df["sentiment"] = sent_model.score_texts(df["reviewText"].to_list())

# The "index" is just the embedding matrix: row i of `embs` is the normalized
# embedding of review i in `df`.
emb_model = EmbeddingModel(model_name="sentence-transformers/all-mpnet-base-v2")
embs = emb_model.encode(texts=df["reviewText"].to_list(), batch_size=256)

# Query function
def search_reviews(query: str, top_k: int = 50):
    """Return the reviews most similar to `query`.

    The query is embedded with `emb_model` and compared against every row of
    `embs`; both are L2-normalized, so the dot product is cosine similarity.

    Returns:
        (scores, idx): similarities and the matching row positions in `embs`,
        both ordered from most to least similar and at most `top_k` long.
    """
    query_vec = emb_model.encode([query], batch_size=1)
    similarities = (query_vec @ embs.T)[0]
    negated = -similarities  # ascending order on the negation == descending similarity
    # Partial selection first: cheaper than fully sorting every review.
    pivot = min(top_k, len(similarities) - 1)
    candidates = np.argpartition(negated, kth=pivot)[:top_k]
    # argpartition leaves the selected slice unordered, so sort just that slice.
    ranked = candidates[np.argsort(negated[candidates])]
    return similarities[ranked], ranked

# Recommend function
def recommend(query: str, top_k: int = 50, n_recs: int = 10, sent_min: float = 0.1, rating_min: float = 4.0):
    """Retrieve the reviews closest to `query` and turn them into product picks.

    Returns:
        (scores, idx, recs): the similarities and row positions from
        `search_reviews`, plus the (asin, score) list produced by
        `aggregate_product_scores`.
    """
    scores, idx = search_reviews(query, top_k=top_k)
    # The aggregator iterates over one row per query, so the single result is
    # passed as a batch of one.
    aggregation_args = dict(
        df=df,
        retrieved_indices=idx[np.newaxis, :],
        retrieved_scores=scores[np.newaxis, :],
        review_sentiments=df["sentiment"].to_numpy(),
        k_recommendations=n_recs,
        sentiment_weight=0.6,
        rating_weight=0.4,
        sentiment_min=sent_min,
        rating_min=rating_min,
    )
    recs = aggregate_product_scores(**aggregation_args)
    return scores, idx, recs

print("Ready. Try recommend('table')")


In [None]:
# Gradio UI for search + recommendations
import gradio as gr

def ui_recommend(query, top_k, n_recs, sent_min, rating_min):
    """Gradio callback: run `recommend` and format both result panes as text.

    Returns:
        (reviews_text, recommendations_text). The first lists up to 20 of the
        retrieved reviews that meet the rating and sentiment minimums and have
        positive similarity; the second is a numbered list of recommended
        products. An empty query short-circuits with a prompt message.
    """
    if not query:
        return "", "Enter a query"

    min_sentiment = float(sent_min)
    min_rating = float(rating_min)
    scores, idx, recs = recommend(query, int(top_k), int(n_recs), min_sentiment, min_rating)

    # Top reviews block: walk the hits best-first and keep those passing the filters.
    display_cap = min(20, len(idx))
    review_lines = []
    for score, row in zip(scores, idx):
        if len(review_lines) >= display_cap:
            break
        passes_filters = (
            df.loc[row, "overall"] >= min_rating
            and df.loc[row, "sentiment"] >= min_sentiment
            and float(score) > 0
        )
        if passes_filters:
            review_lines.append(f"- [{df.loc[row, 'title']}] {df.loc[row, 'reviewText']}")
    if not review_lines:
        review_lines.append("No positive reviews matched filters. Try lowering thresholds.")

    # Recommendations block: one numbered line per product.
    rec_lines = []
    for rank, (asin, score) in enumerate(recs, start=1):
        rec_lines.append(f"{rank}. {asin} | score={score:.4f}")
    return "\n".join(review_lines), "\n".join(rec_lines)

# Build the UI. Components are created inside nested context managers, and
# their creation order is what defines the layout, so keep statements in order.
with gr.Blocks() as demo:
    gr.Markdown("# Amazon Reviews IR + Recommender (Colab)")
    # Row 1: free-text query, pre-filled with an example.
    with gr.Row():
        query = gr.Textbox(label="Query", value="table for study")
    # Row 2: retrieval depth and number of products to return.
    # NOTE(review): the two positional Slider arguments are presumably
    # (minimum, maximum) — confirm against the Gradio version in use.
    with gr.Row():
        top_k = gr.Slider(10, 200, value=50, step=10, label="Top K Reviews")
        n_recs = gr.Slider(3, 20, value=10, step=1, label="Number of recommendations")
    # Row 3: filter thresholds; defaults match the defaults of recommend().
    with gr.Row():
        sent_min = gr.Slider(-1.0, 1.0, value=0.1, step=0.05, label="Minimum sentiment")
        rating_min = gr.Slider(1, 5, value=4, step=1, label="Minimum rating")
    run_btn = gr.Button("Search & Recommend")
    reviews_out = gr.Textbox(label="Top Reviews", lines=12)
    recs_out = gr.Textbox(label="Recommended Products", lines=12)
    # The inputs are passed to ui_recommend in list order; its two return
    # values fill reviews_out and recs_out respectively.
    run_btn.click(ui_recommend, inputs=[query, top_k, n_recs, sent_min, rating_min], outputs=[reviews_out, recs_out])

# NOTE(review): share=False presumably means no public share link is created — confirm.
demo.launch(share=False)
