In [10]:
from huggingface_hub import login
import os
from dotenv import load_dotenv

# Load variables from a .env file (if one is found) into the process environment.
load_dotenv()

# Authenticate with the Hugging Face Hub using the HF_TOKEN environment variable.
# hf_token is None when the variable is unset.
hf_token = os.environ.get("HF_TOKEN")
login(token=hf_token)

Note: Environment variable`HF_TOKEN` is set and is the current active token independently from the token you've just configured.


In [11]:
import torch
from transformers import pipeline, AutoProcessor

# Prefer Apple's Metal backend when it is available, otherwise run on CPU.
if torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

# NOTE(review): the original author flagged bf16 on MPS as potentially flaky;
# the dtype is still bf16 for every device - confirm this is intended.
dtype = torch.bfloat16

# NOTE(review): the recorded run of this cell failed with an OSError resolving
# this repo id; presumably the active token lacks access to it - confirm.
model_id = "google/gemma-3-4b-it"

# Load the processor explicitly (fast variant) and hand it to the pipeline.
processor = AutoProcessor.from_pretrained(model_id, use_fast=True)

pipe = pipeline(
    task="image-text-to-text",
    model=model_id,
    processor=processor,
    torch_dtype=dtype,
    device=device,
)

OSError: google/gemma-3-4b-it is not a local folder and is not a valid model identifier listed on 'https://huggingface.co/models'
If this is a private repository, make sure to pass a token having permission to this repo either by logging in with `hf auth login` or by passing `token=<your_token>`

In [None]:
import pandas as pd

# ======================
# 1. Load Dataset
# ======================
# The review text is read from the "text" column by the pipeline loop and the final preview.
df = pd.read_csv("test.csv")   # must have a "text" column

# ======================
# 2. Rule-based Filters
# ======================
# Regexes that suggest advertising / promotional content.
AD_PATTERNS = [
    r"(https?://\S+)",                              # http(s) URLs
    r"www\.\S+",                                    # bare www links
    r"\b\d{3}[-\s]?\d{3}[-\s]?\d{4}\b",             # 3-3-4 digit phone numbers
    r"\b(code|coupon|discount|promo|referral)\b",   # promotional vocabulary
    r"@[A-Za-z0-9_]+",                              # @handles
]

# Phrases suggesting the reviewer did not actually visit; each phrase is
# wrapped in word boundaries. The character class accepts both the straight
# and the typographic apostrophe.
NO_VISIT_PATTERNS = [
    rf"\b{phrase}\b"
    for phrase in (
        r"never been",
        r"haven['’]t visited",
        r"didn['’]t go in",
        r"just walked past",
        r"heard",
        r"rumou?r",
    )
]

# Off-topic subjects, likewise wrapped in word boundaries.
IRRELEVANT_PATTERNS = [
    rf"\b{topic}\b"
    for topic in (r"bitcoin", r"the government", r"politics?")
]

In [None]:
import pandas as pd
from keywords_examples import spam_keywords, irrelevant_keywords, non_visitor_keywords
import re

# ======================
# 1. Load Dataset
# ======================
# The review text is read from the "text" column by the pipeline loop and the final preview.
df = pd.read_csv("test.csv")   # must have a "text" column

# ======================
# 2. Rule-based Filters
# ======================
# Convert keyword lists to regex patterns
def _keywords_to_patterns(keywords):
    """Turn literal keywords into regex patterns that match whole keywords only.

    - Keywords are lowercased because rule_based_check lowercases the review
      text before searching, so a keyword containing capitals could never match.
    - Lookarounds are used instead of ``\\b``: for keywords that start and end
      with a word character they are equivalent, but they also work for
      keywords that start or end with punctuation (where ``\\b`` cannot match
      next to whitespace).
    - Blank keywords are skipped; an empty pattern would match almost any text.

    Args:
        keywords: iterable of keyword strings.

    Returns:
        list[str]: one regex pattern per non-blank keyword, in input order.
    """
    patterns = []
    for keyword in keywords:
        literal = str(keyword).lower()
        if not literal.strip():
            continue
        patterns.append(r"(?<!\w)" + re.escape(literal) + r"(?!\w)")
    return patterns


AD_PATTERNS = _keywords_to_patterns(spam_keywords)

NO_VISIT_PATTERNS = _keywords_to_patterns(non_visitor_keywords)

IRRELEVANT_PATTERNS = _keywords_to_patterns(irrelevant_keywords)

# Add some additional regex patterns that are better as regex.
# list.extend is a method and must be *called* with the new items;
# subscripting it (``extend[...]``) raises TypeError.
AD_PATTERNS.extend([
    r"(https?://\S+)",           # URLs
    r"www\.\S+",                 # www links
    r"\b\d{3}[-\s]?\d{3}[-\s]?\d{4}\b",  # phone numbers
    r"@[A-Za-z0-9_]+",           # Instagram/Twitter handles
])

NO_VISIT_PATTERNS.extend([
    r"\bnever been\b",
    r"\bhaven['’]t visited\b",   # straight or typographic apostrophe
    r"\bdidn['’]t go in\b",
    r"\bjust walked past\b",
    r"\bheard\b",
    r"\brumou?r\b",              # rumor / rumour
])

IRRELEVANT_PATTERNS.extend([
    r"\bbitcoin\b",
    r"\bthe government\b",
    r"\bpolitics?\b",            # politic / politics
])

In [None]:
from transformers import AutoTokenizer
import re
import json

# Tokenizer for `model_id` (assigned in the model-loading cell); its chat
# template is what build_chat_prompt uses to render the prompt string.
tokenizer = AutoTokenizer.from_pretrained(model_id)

# System instruction sent with every review: lists the categories and pins the
# JSON response format that _extract_first_json / llm_classify expect.
# Built line by line; every line (including the last) ends with a newline and
# the empty entry produces the blank line before the format instructions.
SYSTEM_PROMPT = "\n".join([
    "You are a moderation system for business reviews.",
    "Classify the given review into one or more violation categories:",
    "- ads (advertisement or promotional content)",
    "- irrelevant (talks about unrelated topics)",
    "- no_visit_rant (complaints/rants without actual visit)",
    "- no_violation (valid review)",
    "",
    "Respond with ONLY a JSON object in this exact format:",
    '{"violation": ["category1", "category2"], "rationale": "one sentence explanation of why you chose these categories"}',
    'If no violations, use: {"violation": [], "rationale": "one sentence explanation of why this is a valid review"}',
]) + "\n"

def build_chat_prompt(review_text: str) -> str:
    """Render the moderation conversation for one review as a prompt string.

    The conversation has two turns: a system turn carrying SYSTEM_PROMPT and a
    user turn carrying the review. It is rendered with the tokenizer's chat
    template, untokenized, with the generation prompt appended so the model
    continues as the assistant.

    Args:
        review_text: the review to classify.

    Returns:
        Whatever ``tokenizer.apply_chat_template`` returns for
        ``tokenize=False`` (a single prompt string).
    """

    def _text_turn(role, text):
        # Each turn carries a single text-typed content part.
        return {"role": role, "content": [{"type": "text", "text": text}]}

    conversation = [
        _text_turn("system", SYSTEM_PROMPT),
        _text_turn("user", f"Review:\n{review_text}"),
    ]
    return tokenizer.apply_chat_template(
        conversation,
        tokenize=False,
        add_generation_prompt=True,
    )

_JSON_PATTERN = re.compile(r"\{.*?\}", flags=re.DOTALL)

def _extract_first_json(s: str):
    m = _JSON_PATTERN.search(s)
    if not m:
        return {}
    try:
        return json.loads(m.group(0))
    except json.JSONDecodeError:
        return {}


In [None]:

CATEGORIES = ["ads", "irrelevant", "no_visit_rant", "no_violation"]

def _bool_map_from_list(labels):
    """Turn a list like ['ads', 'irrelevant'] into a full bool map with exclusivity rule for no_violation."""
    flags = {k: False for k in CATEGORIES}
    for lab in labels:
        if lab in flags and lab != "no_violation":
            flags[lab] = True
    # no_violation is True only if none of the violation flags are True
    flags["no_violation"] = not (flags["ads"] or flags["irrelevant"] or flags["no_visit_rant"])
    return flags

def rule_based_check(text: str):
    """Classify a review with the regex rule lists, without calling the model.

    The text is lowercased and searched against AD_PATTERNS, NO_VISIT_PATTERNS
    and IRRELEVANT_PATTERNS; each list that has at least one hit contributes
    its label and a short rationale fragment.

    Missing text (``None`` or a float NaN, which is how pandas represents an
    empty cell) is treated as an empty string; any other non-string value is
    converted with ``str()``.

    Args:
        text: the review text.

    Returns:
        None when no rule matched (the caller falls back to the LLM), otherwise
        a dict with keys ``violations`` (bool map from _bool_map_from_list),
        ``confidence`` (fixed 0.95) and ``rationale`` (fragments joined by "; ").
    """
    # NaN is the only float that is not equal to itself.
    if text is None or (isinstance(text, float) and text != text):
        text = ""
    text_l = str(text).lower()

    # Built at call time so that re-running the pattern cells is picked up.
    checks = (
        ("ads", AD_PATTERNS, "Matched advertisement pattern"),
        ("no_visit_rant", NO_VISIT_PATTERNS, "Matched no-visit rant pattern"),
        ("irrelevant", IRRELEVANT_PATTERNS, "Matched irrelevant pattern"),
    )

    labels = []
    rationale_bits = []
    for label, patterns, reason in checks:
        if any(re.search(p, text_l) for p in patterns):
            labels.append(label)
            rationale_bits.append(reason)

    if not labels:
        return None

    # At this point at least one rule matched, so the rationale is never empty.
    return {
        "violations": _bool_map_from_list(labels),
        "confidence": 0.95,
        "rationale": "; ".join(rationale_bits),
    }

def llm_classify(text: str):
    """Classify a review by prompting the model and parsing its JSON reply.

    The review is rendered with build_chat_prompt, generated greedily (no
    sampling, at most 128 new tokens, prompt excluded from the returned text)
    and the first JSON object in the reply is parsed.

    Args:
        text: the review text.

    Returns:
        dict with keys ``violations`` (bool map from _bool_map_from_list),
        ``confidence`` and ``rationale``. When the reply contains a JSON object
        whose ``violation`` entry is a list, confidence is 1.0 and the
        rationale comes from the reply. Otherwise nothing is flagged,
        confidence is 0.0 and the rationale quotes the start of the raw reply.
    """
    generation = pipe(
        build_chat_prompt(text),
        max_new_tokens=128,
        do_sample=False,
        return_full_text=False,
    )
    raw = generation[0]["generated_text"].strip()
    parsed = _extract_first_json(raw)

    labels = parsed.get("violation") if isinstance(parsed, dict) else None

    # Guard clause: anything without a list-valued "violation" is unusable.
    if not isinstance(labels, list):
        return {
            "violations": _bool_map_from_list([]),
            "confidence": 0.0,
            "rationale": f"Unparseable model output: {raw[:120]}...",
        }

    return {
        "violations": _bool_map_from_list(labels),
        "confidence": 1.0,
        "rationale": parsed.get("rationale", "No rationale provided"),
    }

# ======================
# Full Pipeline
# ======================
# Fail fast when the review column is missing. Silently defaulting every row to
# an empty string would send all rows through the LLM and only fail at the
# final preview.
if "text" not in df.columns:
    raise KeyError(
        f'Expected a "text" column in the dataset; found columns: {list(df.columns)}'
    )

outputs = []
for review in df["text"]:
    # Missing cells arrive as NaN; classify them as empty text.
    review = "" if pd.isna(review) else str(review)
    # Cheap regex rules first; only reviews with no rule hit go to the model.
    rb = rule_based_check(review)
    result = rb if rb else llm_classify(review)
    outputs.append(result)

# Expand four boolean columns (one per category, in this order)
for category in ("ads", "irrelevant", "no_visit_rant", "no_violation"):
    df[f"is_{category}"] = [o["violations"][category] for o in outputs]
# Add rationale column
df["rationale"] = [o["rationale"] for o in outputs]

# Print rationales to screen (rule-based rationales are included as well)
print("=== LLM Rationales ===")
for i, output in enumerate(outputs):
    print(f"Review {i+1}: {output['rationale']}")

# Persist the flagged dataset and show a short preview.
df.to_csv("reviews_with_policy_flags.csv", index=False)
print(df[["text", "is_ads", "is_irrelevant", "is_no_visit_rant", "is_no_violation"]].head(10))

=== LLM Rationales ===
Review 1: The review is a short, promotional statement implying a service is consistently performed well.
Review 2: This review is a positive testimonial about a company's services and staff, lacking any negative or irrelevant content.
Review 3: The mention of 'Contact for delivery!' suggests a promotional or advertising element.
Review 4: This is a positive review praising the business's customer service and doesn't contain any irrelevant or inappropriate content.
Review 5: The review describes a brief observation without actually visiting the establishment and expressing a complaint or positive experience.
                                                text  is_ads  is_irrelevant  \
0      Always done right from wood stove to screens!    True          False   
1  A great company to work with.  Their sales and...   False          False   
2  Great place to do business with staff was grea...    True          False   
3  Awesome Customer service, quick response, 