# Generate Product Articles from Product Categories
## Take the clusters and make a summary of all the reviews of each group
---
**Objective:** create a model that generates a short article (like a blog post) for each product category.

**Approach:**
1. Define the Evidence Layer
2. Use the classification
3. Use the clusters
4. Determine Top 3 Products
5. Determine Worst Product
6. Compute Key Differences
7. Build the Category Evidence Pack
8. Generate `.csv`
9. Create a Gradio app

## 1. Imports

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
# Report whether PyTorch can use a GPU. The device name is only queried when
# CUDA is available, because torch.cuda.get_device_name raises on CPU-only
# machines and would stop a fresh "Run All" even though the model cell below
# falls back to the CPU.
cuda_available = torch.cuda.is_available()
print(cuda_available)
if cuda_available:
    print(torch.cuda.get_device_name(0))

True
NVIDIA GeForce RTX 4050 Laptop GPU


In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from collections import defaultdict
from sklearn.feature_extraction.text import TfidfVectorizer
import warnings

# Install a blanket "ignore" filter so no warning of any category is shown in cell output.
warnings.filterwarnings('ignore')
# Marker line confirming that the import cell ran to completion.
print('Libraries loaded')

Libraries loaded


## 2. Load & Merge the Data from the models

In [4]:
# Per-review sentiment predictions and per-review cluster assignments
sentiment_path = "data_with_predictions_v2.csv"
clusters_path = "data_with_clusters.csv"
df_sentiment = pd.read_csv(sentiment_path)
df_clusters = pd.read_csv(clusters_path)

# Sanity report: row count and number of distinct labels in each file
n_clusters = df_clusters["cluster"].nunique()
print(f'Dataset: {len(df_clusters):,} reviews, {n_clusters} unique clusters')
n_sentiments = df_sentiment["predicted_label"].nunique()
print(f'Dataset: {len(df_sentiment):,} reviews, {n_sentiments} unique sentiments')

Dataset: 28,332 reviews, 6 unique clusters
Dataset: 28,332 reviews, 3 unique sentiments


In [5]:
# Columns that identify one review in both files
merge_cols = ["id", "reviews.text", "reviews.date", "reviews.username"]

# Model outputs to carry over from the sentiment file
sentiment_cols = [
    "predicted_label",
    "predicted_score",
    "score_negative",
    "score_neutral",
    "score_positive",
]

# Keep a single sentiment row per review key. When the same review key occurs
# k times in both files, a plain left merge emits k*k rows for it, which would
# inflate every per-product count and mean computed from `df` afterwards.
sentiment_unique = df_sentiment[merge_cols + sentiment_cols].drop_duplicates(subset=merge_cols)

df = df_clusters.merge(
    sentiment_unique,
    on=merge_cols,
    how="left",
)

# A left merge against unique right-hand keys must preserve the left row count
assert len(df) == len(df_clusters), "merge changed the number of reviews"

In [None]:
# Canonicalise sentiment labels: upper-case first, then trim surrounding whitespace
upper_labels = df["predicted_label"].str.upper()
df["predicted_label"] = upper_labels.str.strip()

In [39]:
# Display the column index of the merged frame as this cell's output
df.columns

Index(['id', 'name', 'asins', 'brand', 'categories', 'primaryCategories',
       'manufacturer', 'reviews.date', 'reviews.doRecommend',
       'reviews.numHelpful', 'reviews.rating', 'reviews.text', 'reviews.title',
       'reviews.username', 'sentiment', 'review_length', 'review_word_count',
       'dateAdded_parsed', 'dateUpdated_parsed', 'reviews.date_parsed',
       'reviews.dateSeen_parsed', 'cluster', 'cluster_name', 'predicted_label',
       'predicted_score', 'score_negative', 'score_neutral', 'score_positive'],
      dtype='object')

## 3. Obtaining the top 3 and the worst products plus briefing

In [None]:
# Column names shared by the helpers in this notebook
CAT_COL = "cluster_name"
PROD_COL = "name"
TEXT_COL = "reviews.text"

# Minimum number of reviews a product needs before it can be picked as "worst"
min_reviews = 20

# One row per (category, product): review volume, mean star rating and
# mean negative / positive sentiment scores
product_stats = df.groupby([CAT_COL, PROD_COL]).agg(
    review_count=(TEXT_COL, "count"),
    avg_rating=("reviews.rating", "mean"),
    neg_rate=("score_negative", "mean"),
    pos_rate=("score_positive", "mean"),
).reset_index()

# Ranking score: mean rating scaled by log(1 + review_count), so products with
# more reviews rank above equally rated products with fewer reviews
volume_weight = np.log1p(product_stats["review_count"])
product_stats["rank_score"] = product_stats["avg_rating"] * volume_weight

# Three highest-scoring products in every category
ranked_stats = product_stats.sort_values(
    by=[CAT_COL, "rank_score"],
    ascending=[True, False],
)
top3 = ranked_stats.groupby(CAT_COL).head(3)

# Lowest mean rating per category among products with at least min_reviews reviews
eligible_stats = product_stats[product_stats["review_count"] >= min_reviews]
worst = (
    eligible_stats
    .sort_values(by=[CAT_COL, "avg_rating"], ascending=[True, True])
    .groupby(CAT_COL)
    .head(1)
)



In [None]:
def top_complaints_for_product(texts, topn=8):
    '''Return the terms with the highest mean TF-IDF weight across review texts.

    Args:
        texts: iterable of review texts; entries that are not strings or are
            blank are ignored.
        topn: maximum number of terms to return.

    Returns:
        A list of unigrams/bigrams ordered from highest to lowest mean TF-IDF
        weight. The list is empty when fewer than 5 usable texts are supplied
        or when the vectorizer ends up with no vocabulary.
    '''
    docs = [t for t in texts if isinstance(t, str) and t.strip()]
    if len(docs) < 5:
        return []

    vec = TfidfVectorizer(
        lowercase=True,
        stop_words="english",
        ngram_range=(1, 2),
        min_df=2,
        max_df=0.95
    )
    try:
        X = vec.fit_transform(docs)
    except ValueError:
        # scikit-learn raises ValueError when stop-word removal or min_df/max_df
        # pruning leaves no terms (e.g. near-identical or stop-word-only
        # reviews); treat that as "no complaint terms" instead of crashing.
        return []

    # np.asarray(...).ravel() flattens the column means whether the sparse mean
    # comes back as an np.matrix or as a plain ndarray
    scores = np.asarray(X.mean(axis=0)).ravel()
    terms = np.array(vec.get_feature_names_out())
    top_idx = scores.argsort()[::-1][:topn]
    return terms[top_idx].tolist()




In [None]:
def get_top_product_complaints(df, category, product, topn=6):
    '''Complaint terms for one product, extracted from its NEGATIVE-labelled reviews.'''
    in_category = df[CAT_COL] == category
    is_product = df[PROD_COL] == product
    is_negative = df["predicted_label"] == "NEGATIVE"

    negative_reviews = df.loc[in_category & is_product & is_negative, TEXT_COL]
    return top_complaints_for_product(negative_reviews.tolist(), topn=topn)


In [None]:
def build_category_brief(category):
    '''Render the plain-text brief for one category.

    Reads the module-level ``top3`` and ``worst`` tables and ``df``. The brief
    lists every top product with its rating, review count and complaint terms,
    then (when the category has a worst product) that product and the reason
    to avoid it.
    '''
    top_rows = top3.loc[top3[CAT_COL] == category]
    worst_rows = worst.loc[worst[CAT_COL] == category]

    out = [f"CATEGORY: {category}", "", "TOP 3 PRODUCTS:"]

    ranked = zip(top_rows[PROD_COL], top_rows["avg_rating"], top_rows["review_count"])
    for position, (product, rating, n_reviews) in enumerate(ranked, start=1):
        terms = get_top_product_complaints(df, category, product, topn=5)
        out.append(f"{position}) {product} | rating={rating:.2f} | reviews={int(n_reviews)}")
        detail = "; ".join(terms) if terms else "(not enough negative reviews)"
        out.append("   complaints: " + detail)

    if not worst_rows.empty:
        loser = worst_rows.iloc[0]
        loser_name = loser[PROD_COL]
        reasons = get_top_product_complaints(df, category, loser_name, topn=6)
        reason_text = "; ".join(reasons) if reasons else "low ratings / frequent negatives"
        out.extend([
            "",
            f"WORST PRODUCT: {loser_name} | rating={loser['avg_rating']:.2f} | reviews={int(loser['review_count'])}",
            "avoid because: " + reason_text,
        ])

    return "\n".join(out)


In [14]:
# Alphabetically sorted category names that occur in the data (missing values skipped)
categories = sorted(df[CAT_COL].dropna().unique())

# One brief per category, keyed by category name
briefs = dict(zip(categories, map(build_category_brief, categories)))

# quick check: the first few category names, then the brief of the first category
print(categories[:5])
print("\n--- SAMPLE BRIEF ---\n")
print(briefs[categories[0]])


['Accessories', 'Batteries & Household', 'E-Readers', 'Fire Tablets', 'Media & Home']

--- SAMPLE BRIEF ---

CATEGORY: Accessories

TOP 3 PRODUCTS:
1) Amazon 9W PowerFast Official OEM USB Charger and Power Adapter for Fire Tablets and Kindle eReaders | rating=4.67 | reviews=39
   complaints: (not enough negative reviews)
2) AmazonBasics 15.6-Inch Laptop and Tablet Bag | rating=4.52 | reviews=21
   complaints: (not enough negative reviews)
3) AmazonBasics Ventilated Adjustable Laptop Stand | rating=4.33 | reviews=24
   complaints: (not enough negative reviews)

WORST PRODUCT: AmazonBasics Backpack for Laptops up to 17-inches | rating=4.16 | reviews=25
avoid because: low ratings / frequent negatives


## 4. Using T5 to create our summaries with a little prompt engineering

In [None]:
# Checkpoint used for generation (flan-t5-large is an option if you have GPU RAM)
gen_model_name = "google/flan-t5-base"

# Run on the GPU when PyTorch can see one, otherwise on the CPU
device = "cuda" if torch.cuda.is_available() else "cpu"

gen_tokenizer = AutoTokenizer.from_pretrained(gen_model_name)
gen_model = AutoModelForSeq2SeqLM.from_pretrained(gen_model_name).to(device)

def _split_article_sections(text):
    '''Split generated text into its four marker-delimited sections.

    Returns a dict with keys "title", "summary", "top3" and "avoid", or None
    when a marker is missing or the markers do not appear in that order.
    '''
    markers = (
        ("title", "===TITLE==="),
        ("summary", "===SUMMARY==="),
        ("top3", "===TOP3==="),
        ("avoid", "===AVOID==="),
    )
    sections = {}
    previous_key = None
    remainder = text
    for key, marker in markers:
        # Each marker must appear after the previous one; the text before it
        # is the body of the previous section.
        before, found, remainder = remainder.partition(marker)
        if not found:
            return None
        if previous_key is not None:
            sections[previous_key] = before.strip()
        previous_key = key
    sections[previous_key] = remainder.strip()
    return sections


def generate_article(brief):
    '''Generate a short blog-style article for one category brief.

    Args:
        brief: plain-text brief that is embedded verbatim in the prompt.

    Returns:
        A dict with keys "title", "summary", "top3" and "avoid" when the model
        output contains all four section markers in order; otherwise the raw
        generated text as a string, so callers must handle both types.
    '''
    # The template uses plain ASCII hyphens: the previous literal contained
    # mis-encoded dash characters that were sent to the model as noise.
    prompt = f"""
You are writing a short shopper-friendly blog post.

Use ONLY the facts in the brief.
Do NOT invent products or features.

BRIEF:
{brief}

Return your answer in EXACTLY this format:

===TITLE===
<one line title>

===SUMMARY===
<2 short paragraphs, 3-5 sentences total>

===TOP3===
- Product - Rating - Reviews - Complaints
- Product - Rating - Reviews - Complaints
- Product - Rating - Reviews - Complaints

===AVOID===
- Worst product - Rating - Reviews
- Reason: <from brief>

Keep under 220 words.
Do NOT omit any section.
"""

    # Prompts longer than 1024 tokens are truncated by the tokenizer
    inputs = gen_tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
        max_length=1024
    ).to(device)

    # do_sample=False: deterministic decoding, at most 320 new tokens
    out_ids = gen_model.generate(
        **inputs,
        max_new_tokens=320,
        do_sample=False,
        repetition_penalty=1.2
    )

    text = gen_tokenizer.decode(out_ids[0], skip_special_tokens=True)

    # Section parsing; local names deliberately avoid `top3`, which is also a
    # module-level table in this notebook.
    sections = _split_article_sections(text)
    if sections is None:
        # fallback if the model did not follow the requested structure
        return text

    return sections



Loading weights: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 282/282 [00:01<00:00, 188.33it/s, Materializing param=shared.weight]                                                       


## 5. Helper functions for cleaner code

In [None]:
# Memo of complaint-term lists keyed by (category, product, topn)
complaint_cache = {}

def get_top_product_complaints_cached(df, category, product, topn=6):
    '''Memoised variant of ``get_top_product_complaints``.

    Results are stored in the module-level ``complaint_cache``. The cache key
    is (category, product, topn) and does not include ``df``, so a later call
    with a different frame but the same key reuses the earlier result; clear
    ``complaint_cache`` after changing the data.

    Returns:
        A fresh list of complaint terms; mutating it does not affect the cache.
    '''
    key = (category, product, topn)
    if key not in complaint_cache:
        # Delegate to the uncached helper so the filtering logic lives in one place
        complaint_cache[key] = get_top_product_complaints(df, category, product, topn=topn)

    # Hand out a copy: returning the stored list would let a caller's in-place
    # edit silently change every later cache hit.
    return list(complaint_cache[key])




def get_negative_snippets(df, category, product, max_snippets=2, max_len=120):
    '''Collect short excerpts from a product's NEGATIVE-labelled reviews.

    Only the first 10 non-null negative reviews are inspected. Each is stripped
    and has its newlines replaced by spaces; texts of 20 characters or fewer
    are skipped, and longer ones are cut to ``max_len`` characters with "..."
    appended when something was cut off.
    '''
    selector = (
        (df[CAT_COL] == category)
        & (df[PROD_COL] == product)
        & (df["predicted_label"] == "NEGATIVE")
    )
    candidates = df.loc[selector, TEXT_COL].dropna().tolist()

    picked = []
    for raw in candidates[:10]:
        flat = raw.strip().replace("\n", " ")
        if len(flat) > 20:
            picked.append(flat if len(flat) <= max_len else flat[:max_len] + "...")
        # Checked after every inspected review, whether or not it was kept
        if len(picked) >= max_snippets:
            break

    return picked


def build_category_brief_stronger(category):
    '''Render the richer plain-text brief for one category.

    Reads the module-level ``top3`` and ``worst`` tables and ``df``. Each top
    product line carries rating, review count and mean positive/negative
    scores, followed by complaint terms and up to two quoted negative
    snippets; the worst product (if the category has one) is reported the
    same way under its own heading.
    '''
    def quoted(snippets):
        # One indented, double-quoted bullet per snippet
        return [f'      - "{s}"' for s in snippets]

    top_rows = top3.loc[top3[CAT_COL] == category]
    worst_rows = worst.loc[worst[CAT_COL] == category]

    out = [f"CATEGORY: {category}", "", "TOP 3 PRODUCTS:"]

    for position, rec in enumerate(top_rows.to_dict("records"), start=1):
        product_name = rec[PROD_COL]
        terms = get_top_product_complaints_cached(df, category, product_name, topn=5)
        quotes = get_negative_snippets(df, category, product_name, max_snippets=2)

        header_parts = [
            f"{position}) {product_name}",
            f"rating={rec['avg_rating']:.2f}",
            f"reviews={int(rec['review_count'])}",
            f"pos_rate={rec['pos_rate']:.2f}",
            f"neg_rate={rec['neg_rate']:.2f}",
        ]
        out.append(" | ".join(header_parts))

        term_text = "; ".join(terms) if terms else "(insufficient negative reviews)"
        out.append("   top complaints: " + term_text)

        if quotes:
            out.append("   sample negative feedback:")
            out.extend(quoted(quotes))

        # Blank separator line after every product entry
        out.append("")

    if worst_rows.empty:
        return "\n".join(out)

    loser = worst_rows.iloc[0]
    worst_name = loser[PROD_COL]
    worst_terms = get_top_product_complaints_cached(df, category, worst_name, topn=6)
    worst_quotes = get_negative_snippets(df, category, worst_name, max_snippets=2)

    out.append("WORST PRODUCT:")
    out.append(
        " | ".join([
            f"{worst_name}",
            f"rating={loser['avg_rating']:.2f}",
            f"reviews={int(loser['review_count'])}",
            f"neg_rate={loser['neg_rate']:.2f}",
        ])
    )

    reason_text = "; ".join(worst_terms) if worst_terms else "frequent negative sentiment"
    out.append("   avoid because: " + reason_text)

    if worst_quotes:
        out.append("   example complaints:")
        out.extend(quoted(worst_quotes))

    return "\n".join(out)

## 6. Generating articles

In [None]:
# Generate one article per category, keeping each brief next to its article
articles = [
    {"category": cat, "brief": briefs[cat], "article": generate_article(briefs[cat])}
    for cat in categories
]

articles_df = pd.DataFrame(articles)
articles_df.head()

Unnamed: 0,category,brief,article
0,Accessories,CATEGORY: Accessories\n\nTOP 3 PRODUCTS:\n1) A...,Amazon 9W PowerFast Official OEM USB Charger a...
1,Batteries & Household,CATEGORY: Batteries & Household\n\nTOP 3 PRODU...,AmazonBasics AAA Performance Alkaline Batterie...
2,E-Readers,CATEGORY: E-Readers\n\nTOP 3 PRODUCTS:\n1) Kin...,"Kindle Voyage E-reader, 6 High-Resolution Disp..."
3,Fire Tablets,CATEGORY: Fire Tablets\n\nTOP 3 PRODUCTS:\n1) ...,Fire Tablets: 3 products: Fire HD 8 Tablet wit...
4,Media & Home,CATEGORY: Media & Home\n\nTOP 3 PRODUCTS:\n1) ...,The top 3 products in the Media & Home category.


## 7. Saving into a .csv

In [None]:
# Write the category / brief / article table to disk without the index column
output_path = "category_blog_posts.csv"
articles_df.to_csv(output_path, index=False)
print(f"Saved {output_path}")

Saved category_blog_posts.csv


## 8. Gradio Test

##### To try the app, uncomment the cell below and run it, or run the equivalent standalone `.py` script

In [None]:
# import re
# import json
# import tempfile
# from pathlib import Path
# import gradio as gr
# import pandas as pd
# import torch
# from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

# # ---------------- Model ----------------
# GEN_MODEL_NAME = "google/flan-t5-base"
# device = "cuda" if torch.cuda.is_available() else "cpu"

# dtype = torch.float16 if device == "cuda" else torch.float32
# tokenizer = AutoTokenizer.from_pretrained(GEN_MODEL_NAME)
# model = AutoModelForSeq2SeqLM.from_pretrained(
#     GEN_MODEL_NAME,
#     torch_dtype=dtype
# ).to(device)
# model.eval()

# print("Device:", device)

# STATE = {"df": None}

# # ---------------- Clean complaint keywords ----------------
# STOP = {"don", "didn", "doesn", "dont", "isn", "wasn", "weren",
#         "cant", "couldn", "wouldn", "buy"}

# def prettify_reason(reason: str) -> str:
#     r = (reason or "").strip()
#     if not r:
#         return "No clear reason provided."

#     if ";" not in r and len(r.split()) > 5:
#         return r

#     toks = [t.strip().lower() for t in r.split(";") if t.strip()]
#     toks = [t for t in toks if len(t) >= 3 and t not in STOP]

#     seen = set()
#     cleaned = []
#     for t in toks:
#         if t not in seen:
#             seen.add(t)
#             cleaned.append(t)

#     cleaned = cleaned[:6]

#     if not cleaned:
#         return "Negative feedback appears general without a clear recurring issue."

#     return "Negative reviews repeatedly mention: " + ", ".join(cleaned) + "."

# # ---------------- Brief parsing ----------------
# def parse_brief(brief: str):
#     b = (brief or "").replace("\r\n", "\n").replace("\r", "\n").strip()

#     cat = "Category"
#     m = re.search(r"^\s*CATEGORY:\s*(.+?)\s*$", b, flags=re.MULTILINE)
#     if m:
#         cat = m.group(1).strip()

#     top3 = []
#     top_match = re.search(
#         r"TOP 3 PRODUCTS:\s*(.*?)(?:\n\s*WORST PRODUCT:|\Z)",
#         b,
#         flags=re.DOTALL | re.IGNORECASE
#     )
#     top_block = top_match.group(1).strip() if top_match else ""

#     if top_block:
#         chunks = re.split(r"\n\s*(?=\d+\)\s)", "\n" + top_block)
#         for ch in chunks:
#             ch = ch.strip()
#             if not ch:
#                 continue
#             lines = [ln.strip() for ln in ch.split("\n") if ln.strip()]
#             header = re.sub(r"^\d+\)\s*", "", lines[0]).strip()
#             name = header.split("|")[0].strip()

#             rating = None
#             reviews = None
#             mr = re.search(r"rating\s*=\s*([0-9.]+)", header, flags=re.IGNORECASE)
#             mv = re.search(r"reviews\s*=\s*(\d+)", header, flags=re.IGNORECASE)
#             if mr: rating = float(mr.group(1))
#             if mv: reviews = int(mv.group(1))

#             complaints = ""
#             for ln in lines[1:]:
#                 if ln.lower().startswith("complaints:"):
#                     complaints = ln.split(":", 1)[1].strip()
#                     break

#             top3.append({
#                 "name": name,
#                 "rating": rating,
#                 "reviews": reviews,
#                 "complaints": complaints
#             })

#     worst = {"name": "", "rating": None, "reviews": None, "reason": ""}
#     w = re.search(r"WORST PRODUCT:\s*(.+)", b, flags=re.IGNORECASE)
#     if w:
#         wline = w.group(1).strip().split("\n")[0]
#         worst["name"] = wline.split("|")[0].strip()
#         mr = re.search(r"rating\s*=\s*([0-9.]+)", wline, flags=re.IGNORECASE)
#         mv = re.search(r"reviews\s*=\s*(\d+)", wline, flags=re.IGNORECASE)
#         if mr: worst["rating"] = float(mr.group(1))
#         if mv: worst["reviews"] = int(mv.group(1))

#     r = re.search(r"avoid because:\s*(.+)\s*$", b, flags=re.IGNORECASE | re.MULTILINE)
#     if r:
#         worst["reason"] = r.group(1).strip()

#     return cat, top3, worst

# # ---------------- Summary generation ----------------
# @torch.inference_mode()
# def generate_summary_from_brief(brief: str) -> str:
#     cat, top3, worst = parse_brief(brief)

#     lines = [f"Category: {cat}."]
#     for p in top3:
#         lines.append(
#             f"{p['name']} has rating {p['rating']} from {p['reviews']} reviews."
#         )

#     if worst["name"]:
#         lines.append(
#             f"The lowest rated is {worst['name']} with rating {worst['rating']} from {worst['reviews']} reviews."
#         )

#     facts = "\n".join(lines)

#     prompt = f"""Write 2 short natural paragraphs for shoppers.

# Use ONLY the facts below.

# FACTS:
# {facts}
# """

#     inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=384).to(device)

#     out = model.generate(
#         **inputs,
#         max_new_tokens=140,
#         do_sample=True,
#         temperature=0.6,
#         top_p=0.9,
#         repetition_penalty=1.15,
#         use_cache=False,
#     )

#     return tokenizer.decode(out[0], skip_special_tokens=True).strip()

# # ---------------- Markdown builder ----------------
# def build_full_markdown(brief: str) -> str:
#     cat, top3, worst = parse_brief(brief)
#     summary = generate_summary_from_brief(brief)

#     md = [f"# {cat}", "", summary, "", "## Top Picks"]

#     for p in top3:
#         md.append(
#             f"**{p['name']}**  \n"
#             f"Rating: {p['rating']} • Reviews: {p['reviews']}  \n"
#             f"Complaints: {p['complaints']}"
#         )

#     md.append("\n## Avoid / Lowest Rated")

#     if worst["name"]:
#         md.append(
#             f"**{worst['name']}**  \n"
#             f"Rating: {worst['rating']} • Reviews: {worst['reviews']}  \n"
#             f"Complaint signal: {prettify_reason(worst['reason'])}"
#         )

#     return "\n\n".join(md)

# # ---------------- JSON Export ----------------
# def export_json():
#     df = STATE["df"]
#     if df is None:
#         raise ValueError("No CSV loaded.")

#     cluster_summaries = {}
#     product_summaries = {}

#     for _, row in df.iterrows():
#         category = str(row["category"])
#         brief = str(row["brief"])

#         cluster_summaries[category] = build_full_markdown(brief)

#         cat, top3, worst = parse_brief(brief)

#         for p in top3:
#             product_summaries[p["name"]] = {
#                 "cluster": category,
#                 "stats": {
#                     "total_reviews": p["reviews"],
#                     "avg_rating": p["rating"],
#                     "pct_positive": None,
#                     "pct_negative": None,
#                     "pct_neutral": None
#                 },
#                 "summary": f"{p['name']} has rating {p['rating']} from {p['reviews']} reviews."
#             }

#     export_obj = {
#         "provider": "huggingface",
#         "model": "google/flan-t5-base",
#         "cluster_summaries": cluster_summaries,
#         "product_summaries": product_summaries
#     }

#     tmpdir = tempfile.mkdtemp()
#     path = Path(tmpdir) / "export.json"
#     path.write_text(json.dumps(export_obj, indent=2, ensure_ascii=False), encoding="utf-8")

#     return str(path)

# # ---------------- Gradio UI ----------------
# with gr.Blocks(title="Category Blog Generator") as demo:
#     gr.Markdown("# Category Blog Generator")

#     csv_file = gr.File(file_types=[".csv"])
#     load_btn = gr.Button("Load CSV")

#     category_dd = gr.Dropdown(label="Category")
#     gen_btn = gr.Button("Generate Summary")

#     output_md = gr.Markdown()

#     export_btn = gr.Button("Export JSON")
#     export_file = gr.File()

#     def load_csv(file):
#         df = pd.read_csv(file.name)
#         STATE["df"] = df
#         categories = sorted(df["category"].unique())
#         return gr.Dropdown(choices=categories, value=categories[0])

#     def generate(category):
#         df = STATE["df"]
#         row = df[df["category"] == category].iloc[0]
#         return build_full_markdown(row["brief"])

#     load_btn.click(load_csv, inputs=[csv_file], outputs=[category_dd])
#     gen_btn.click(generate, inputs=[category_dd], outputs=[output_md])
#     export_btn.click(export_json, outputs=[export_file])

# if __name__ == "__main__":
#     demo.launch()

`torch_dtype` is deprecated! Use `dtype` instead!
Loading weights: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 282/282 [00:02<00:00, 132.32it/s, Materializing param=shared.weight]                                                       


Device: cuda
* Running on local URL:  http://127.0.0.1:7881
* To create a public link, set `share=True` in `launch()`.
