In [4]:
# ML system for classifying the quality and relevance of Google location reviews
# Labels:
# 0: Low Quality
# 1: High Quality
# 2: Fake
# 3: Irrelevant
# 4: Advertisement

import warnings
warnings.filterwarnings("ignore")

import os
import re
import math
import pandas as pd
import numpy as np
from typing import List, Optional, Tuple

from urllib.parse import urlparse

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# Optional deep models: sentence-transformers is not required. The flag below
# records whether CrossEncoder could be imported so later cells can fall back.
try:
    from sentence_transformers import CrossEncoder
    CROSSENCODER_AVAILABLE = True
except Exception:  # any import-time failure, not only a missing package, disables the deep model
    CROSSENCODER_AVAILABLE = False

# Input CSV read by the loading cell.
DATA_PATH = "dataset/final_df.csv"
# Destination CSV for the exported predictions.
OUTPUT_PATH = "dataset/predictions.csv"



ModuleNotFoundError: No module named 'pandas'

In [None]:
# Load the review dataset and derive the helper columns later cells rely on

df = pd.read_csv(DATA_PATH)

# Column names used throughout the notebook
TEXT_COL = "Review Text"
PLACE_WEBSITE_COL = "website"
BUSINESS_NAME_COL = "Business Name"
ADDRESS_COL = "Address"
CATEGORY_COL = "Category"


def _column_if_present(name: str) -> Optional[str]:
    """Return ``name`` when the loaded frame has that column, otherwise ``None``."""
    return name if name in df.columns else None


# Optional columns: each constant is None when the CSV does not provide it
LANG_COL = _column_if_present("lang_code")
REVIEW_LEN_COL = _column_if_present("review_length")
EMPTY_COL = _column_if_present("empty_review")
EXISTING_TOPIC_REL_COL = _column_if_present("topic_relevance")

# Missing review text becomes an empty string; everything is coerced to str
df[TEXT_COL] = df[TEXT_COL].fillna("").astype(str)

# Derive a whitespace-token count when the CSV has no length column
if REVIEW_LEN_COL is None:
    REVIEW_LEN_COL = "_auto_review_length"
    df[REVIEW_LEN_COL] = df[TEXT_COL].str.split().map(len)

# Derive a 0/1 flag for blank reviews when the CSV has no such column
if EMPTY_COL is None:
    EMPTY_COL = "_auto_empty_review"
    df[EMPTY_COL] = (df[TEXT_COL].str.strip() == "").astype(int)

# Quick preview
df.head(3)


In [None]:
# Rule-based detectors: Low Quality and Advertisement

# Links written with an explicit http(s) scheme or a bare "www." prefix.
URL_REGEX = re.compile(r"https?://\S+|www\.\S+", re.IGNORECASE)

# One-word verdicts that say nothing specific about the place.
SHORT_GENERIC_WORDS = [
    "good", "nice", "ok", "okay", "bad", "meh",
    "great", "awesome", "terrible", "worst", "best",
]
# Each pattern must cover the whole review. Surrounding non-word characters
# (punctuation, emoji, spaces) are tolerated so "Good!" counts like "good".
SHORT_GENERIC_PATTERNS = [rf"^\W*{re.escape(w)}\W*$" for w in SHORT_GENERIC_WORDS]
SHORT_GENERIC_RE = re.compile("|".join(SHORT_GENERIC_PATTERNS), re.IGNORECASE)

AD_KEYWORDS = [
    "promo", "promotion", "discount", "sale", "deal", "sponsored", "visit our website",
    "click here", "buy now", "limited time", "offer", "coupon", "voucher"
]
# Keywords are matched as whole words/phrases with an optional plural "s".
# Without the word boundaries "deal" fired on "ideal", "sale" on "wholesale"
# and "offer" on "offered", flagging ordinary reviews as advertisements.
AD_RE = re.compile(
    r"\b(?:" + "|".join(re.escape(k) for k in AD_KEYWORDS) + r")s?\b",
    re.IGNORECASE,
)


def extract_urls(text: str) -> List[str]:
    """Return every ``URL_REGEX`` match found in ``text``, in order of appearance.

    A falsy ``text`` (``None`` or ``""``) is searched as the empty string.
    """
    haystack = text if text else ""
    return URL_REGEX.findall(haystack)


def is_low_quality(text: str, review_len: int, empty_flag: int, min_words: int = 8) -> bool:
    """Rule-based check for reviews that carry little or no content.

    Args:
        text: The review text.
        review_len: Length value compared against ``min_words``.
        empty_flag: ``1`` marks the review as empty, which is always low quality.
        min_words: Reviews at or above this length are never flagged by the
            content checks.

    Returns:
        ``True`` when the review is flagged empty, or when it is shorter than
        ``min_words`` and is either a lone generic word (``SHORT_GENERIC_RE``)
        or has fewer than 20% alphanumeric characters.
    """
    if empty_flag == 1:
        return True

    # Only short reviews are inspected any further.
    if not (review_len < min_words):
        return False

    # A lone generic verdict such as "good" or "ok".
    if SHORT_GENERIC_RE.search(text.strip()) is not None:
        return True

    # Mostly punctuation or emoji.
    alnum_count = 0
    for ch in text:
        if ch.isalnum():
            alnum_count += 1
    return alnum_count / max(1, len(text)) < 0.2


def url_domain(url: str) -> Optional[str]:
    """Return the network location (``host[:port]``) of ``url``.

    ``urlparse`` only recognises a netloc that is introduced by ``//``, so a
    bare ``www.example.com/menu`` (which ``URL_REGEX`` captures) or a website
    stored without a scheme would otherwise yield an empty domain. Such
    values are re-parsed with ``//`` prepended.

    Args:
        url: A URL or bare host string. Non-string values (``None``, NaN)
            are accepted and produce ``None``.

    Returns:
        The netloc as written in the URL (``""`` if none can be found), or
        ``None`` when ``url`` is not a string, is blank, or cannot be parsed.
    """
    if not isinstance(url, str):
        return None
    # Drop sentence punctuation that a \S+ match drags along ("www.site.com.").
    candidate = url.strip().rstrip(".,;:!?)]}>'\"")
    if not candidate:
        return None
    try:
        parsed = urlparse(candidate)
        if not parsed.scheme and not parsed.netloc and not candidate.startswith("/"):
            # Schemeless host: make the netloc explicit for urlparse.
            parsed = urlparse("//" + candidate)
        return parsed.netloc
    except ValueError:
        # urlparse rejects malformed netlocs such as an unterminated "[".
        return None


def normalize_domain(domain: Optional[str]) -> Optional[str]:
    """Lower-case ``domain`` and drop a leading ``www.``.

    ``None`` is passed through unchanged; any other value is returned as a
    lower-cased string without the ``www.`` prefix.
    """
    if domain is None:
        return None
    lowered = domain.lower()
    return lowered[4:] if lowered.startswith("www.") else lowered


def is_advertisement(text: str, place_website: Optional[str]) -> bool:
    """Rule-based advertisement detector.

    Args:
        text: The review text.
        place_website: Website of the reviewed place, if known; used to tell
            the business's own links apart from external ones.

    Returns:
        * No links in the text: ``True`` only if ``AD_RE`` finds promotional
          wording.
        * Links present: ``True`` as soon as one link is not on the place's
          own domain (or a subdomain of it); ``False`` when every link is.
          A link whose domain cannot be determined, or an unknown place
          domain, counts as external.
    """
    urls = extract_urls(text)
    if not urls:
        # Without links only the wording can mark the review as an ad.
        return AD_RE.search(text or "") is not None

    place_domain = normalize_domain(url_domain(place_website)) if place_website else None
    for u in urls:
        d = normalize_domain(url_domain(u))
        # Exact host or a true subdomain. A plain endswith() would accept
        # look-alikes such as "notexample.com" for "example.com".
        in_domain = bool(d and place_domain) and (
            d == place_domain or d.endswith("." + place_domain)
        )
        if not in_domain:
            # One external link is enough; an in-domain link elsewhere in the
            # text must not hide it.
            return True
    return False


# Run both rule detectors row by row
def _row_is_low_quality(row: pd.Series) -> bool:
    """Evaluate the low-quality rule for a single row of ``df``."""
    return is_low_quality(row[TEXT_COL], int(row[REVIEW_LEN_COL]), int(row[EMPTY_COL]))


def _row_is_advertisement(row: pd.Series) -> bool:
    """Evaluate the advertisement rule; a missing website column reads as None."""
    return is_advertisement(row[TEXT_COL], row.get(PLACE_WEBSITE_COL, None))


rule_low_quality = df.apply(_row_is_low_quality, axis=1)
rule_ad = df.apply(_row_is_advertisement, axis=1)

# Store the flags as 0/1 integers
df["rule_low_quality"] = rule_low_quality.astype(int)
df["rule_advertisement"] = rule_ad.astype(int)

# Share of reviews flagged by each rule
df[["rule_low_quality", "rule_advertisement"]].mean()


In [None]:
# Relevance scoring: CrossEncoder (semantic relevance to location)

# cross_encoder stays None when sentence-transformers could not be imported
if not CROSSENCODER_AVAILABLE:
    cross_encoder = None
else:
    model_name = "cross-encoder/ms-marco-MiniLM-L-6-v2"
    cross_encoder = CrossEncoder(model_name)


def build_relevance_queries(row: pd.Series) -> List[str]:
    """Build the query describing the reviewed place for one row.

    The business name, address and category columns are used when they are
    present in ``row``, non-null and non-blank, and are joined with ``"; "``.

    Returns:
        A one-element list holding the joined description, or
        ``["the reviewed location"]`` when none of the fields is usable.
    """
    descriptors = [
        str(row[col])
        for col in (BUSINESS_NAME_COL, ADDRESS_COL, CATEGORY_COL)
        if col in row and pd.notnull(row[col]) and str(row[col]).strip()
    ]
    return ["; ".join(descriptors)] if descriptors else ["the reviewed location"]


def score_relevance_batch(texts: List[str], queries: List[str]) -> np.ndarray:
    """Score how relevant each review is to its place description.

    Args:
        texts: Review texts.
        queries: Place descriptions, aligned one-to-one with ``texts``.

    Returns:
        A float array with one score per (query, text) pair. With a loaded
        cross-encoder the values are whatever ``cross_encoder.predict``
        returns for the ``(query, text)`` pairs. Without one, the score is
        the fraction of the review's distinct word tokens that also occur in
        the query.
    """
    if len(texts) == 0:
        # Nothing to score; skip the model call entirely.
        return np.array([], dtype=float)
    if cross_encoder is None:
        # Fallback heuristic: word-token overlap ratio. Tokens are extracted
        # with \w+ rather than split on whitespace, because the query joins
        # its fields with "; " and a token like "cafe;" would never match.
        scores = []
        for t, q in zip(texts, queries):
            review_tokens = set(re.findall(r"\w+", t.lower()))
            query_tokens = set(re.findall(r"\w+", q.lower()))
            shared = len(review_tokens & query_tokens)
            scores.append(shared / max(1, len(review_tokens)))
        return np.array(scores, dtype=float)
    pairs = list(zip(queries, texts))
    return np.array(cross_encoder.predict(pairs), dtype=float)


# Score relevance in mini-batches of BATCH rows
BATCH = 256
relevance_scores = []
for start in range(0, len(df), BATCH):
    batch_df = df.iloc[start:start + BATCH]
    # One place description per review in this batch
    batch_queries = []
    for _, review_row in batch_df.iterrows():
        batch_queries.append(build_relevance_queries(review_row)[0])
    batch_scores = score_relevance_batch(batch_df[TEXT_COL].tolist(), batch_queries)
    relevance_scores.extend(batch_scores)

df["semantic_relevance"] = np.array(relevance_scores)
df[["semantic_relevance"]].describe()


In [None]:
# Fake detection heuristic
# Signals (weak but helpful): mentions of not visiting, generic praise without specifics, contradictions

# Phrases suggesting the reviewer never visited, or that the review is bought.
# ['’] accepts both the ASCII and the typographic apostrophe.
# "looks nice" was previously written "looks nice*", which quantified the
# final "e" and therefore matched any text containing "looks nic".
FAKE_PATTERNS = [
    r"didn['’]t visit", r"did not visit", r"haven['’]t been", r"never been", r"not been there",
    r"paid review", r"sponsored review", r"fake review", r"bot review", r"looks nice", r"seems good",
]
# The leading \b keeps a phrase from matching inside a longer word
# ("robot review" no longer hits "bot review"). There is deliberately no
# trailing \b, so plurals such as "fake reviews" still match.
FAKE_RE = re.compile(r"\b(?:" + "|".join(FAKE_PATTERNS) + r")", re.IGNORECASE)

# Praise adjective directly followed by a generic place noun ("great place").
GENERIC_PRAISE = re.compile(
    r"\b(?:great|nice|good|amazing|awesome) (?:place|spot|location|shop|store)\b",
    re.IGNORECASE,
)


def fake_score(text: str) -> float:
    """Heuristic score in ``[0.0, 1.0]`` for how likely a review is fake.

    Blank or missing text scores ``0.0``. Otherwise the score is the sum of
    0.7 for a ``FAKE_RE`` match, 0.2 for a ``GENERIC_PRAISE`` match and 0.1
    for three or more exclamation marks, capped at 1.0.
    """
    if not (text and text.strip()):
        return 0.0

    total = 0.0
    total += 0.7 if FAKE_RE.search(text) else 0.0
    total += 0.2 if GENERIC_PRAISE.search(text) else 0.0
    # Heavy use of exclamation marks is treated as a weak signal.
    total += 0.1 if text.count("!") >= 3 else 0.0
    return min(1.0, total)


# Score every review, then summarise the distribution
df["fake_score"] = df[TEXT_COL].map(fake_score)
df["fake_score"].to_frame().describe()


In [None]:
# Final combiner: produce labels 0-4
# Priority: Advertisement (4) > Irrelevant (3) > Fake (2) > Low Quality (0) > High Quality (1)

# Thresholds
# Minimum fake_score for a review to be labelled Fake
FAKE_THR = 0.6
# Relevance cut-off: 0.35 when the cross-encoder import succeeded, 0.25 otherwise
SEMANTIC_REL_THR = 0.35 if CROSSENCODER_AVAILABLE else 0.25


def assign_label(row: pd.Series) -> int:
    """Combine the per-row signals into a single label id (0-4).

    The checks run in priority order and the first one that applies wins:
    Advertisement (4), Irrelevant (3), Fake (2), Low Quality (0). A row that
    triggers none of them is High Quality (1). Missing fields fall back to
    ``0`` / ``0.0``.
    """
    # Each check is wrapped in a lambda so it is evaluated only when reached.
    checks = (
        (lambda: int(row.get("rule_advertisement", 0)) == 1, 4),
        (lambda: float(row.get("semantic_relevance", 0.0)) < SEMANTIC_REL_THR, 3),
        (lambda: float(row.get("fake_score", 0.0)) >= FAKE_THR, 2),
        (lambda: int(row.get("rule_low_quality", 0)) == 1, 0),
    )
    for applies, label in checks:
        if applies():
            return label
    return 1


# Label every row with the combiner
df["pred_label"] = df.apply(assign_label, axis=1)

# Human-readable names; the list position is the label id
ID2LABEL = dict(enumerate(["Low Quality", "High Quality", "Fake", "Irrelevant", "Advertisement"]))
df["pred_label_name"] = df["pred_label"].map(ID2LABEL)

# Share of reviews per predicted label
df[["pred_label", "pred_label_name"]].value_counts(normalize=True)


In [None]:
# Write the predictions to OUTPUT_PATH

# Keep only the columns that actually exist in df
output_cols = [
    col
    for col in [
        BUSINESS_NAME_COL, TEXT_COL, PLACE_WEBSITE_COL, ADDRESS_COL, CATEGORY_COL,
        "rule_low_quality", "rule_advertisement", "fake_score", "semantic_relevance",
        "pred_label", "pred_label_name",
    ]
    if col in df.columns
]

pred_df = df[output_cols].copy()
pred_df.to_csv(OUTPUT_PATH, index=False)

OUTPUT_PATH


NameError: name 'BUSINESS_NAME_COL' is not defined

In [None]:
# Robustness fixes: safe URL/text handling and improved ad detection

def safe_text(x) -> str:
    """Return ``x`` unchanged when it is a string, otherwise the empty string."""
    if isinstance(x, str):
        return x
    return ""


def is_nonempty_str(x) -> bool:
    """Tell whether ``x`` is a string with at least one non-whitespace character."""
    if not isinstance(x, str):
        return False
    return bool(x.strip())


def url_domain(url: Optional[str]) -> Optional[str]:
    """Return the network location (``host[:port]``) of ``url``.

    ``urlparse`` only recognises a netloc that is introduced by ``//``, so a
    bare ``www.example.com/menu`` (which ``URL_REGEX`` captures) or a website
    stored without a scheme would otherwise yield an empty domain. Such
    values are re-parsed with ``//`` prepended.

    Args:
        url: A URL or bare host string; ``None``, NaN and other non-string
            values are accepted.

    Returns:
        The netloc as written in the URL (``""`` if none can be found), or
        ``None`` when ``url`` is not a non-blank string or cannot be parsed.
    """
    if not isinstance(url, str):
        return None
    # Drop sentence punctuation that a \S+ match drags along ("www.site.com.").
    candidate = url.strip().rstrip(".,;:!?)]}>'\"")
    if not candidate:
        return None
    try:
        parsed = urlparse(candidate)
        if not parsed.scheme and not parsed.netloc and not candidate.startswith("/"):
            # Schemeless host: make the netloc explicit for urlparse.
            parsed = urlparse("//" + candidate)
        return parsed.netloc
    except ValueError:
        # urlparse rejects malformed netlocs such as an unterminated "[".
        return None


def normalize_domain(domain: Optional[str]) -> Optional[str]:
    """Lower-case ``domain`` and drop a leading ``www.``.

    Returns ``None`` when ``domain`` is not a non-blank string.
    """
    if is_nonempty_str(domain):
        lowered = domain.lower()
        return lowered[4:] if lowered.startswith("www.") else lowered
    return None


def extract_urls(text: str) -> List[str]:
    """Return every ``URL_REGEX`` match in ``text``; non-string input yields ``[]``."""
    searchable = safe_text(text)
    return URL_REGEX.findall(searchable)


def is_advertisement(text: str, place_website: Optional[str]) -> bool:
    """Rule-based advertisement detector.

    Args:
        text: The review text; non-string values are treated as empty.
        place_website: Website of the reviewed place, if known; used to tell
            the business's own links apart from external ones.

    Returns:
        * No links in the text: ``True`` only if ``AD_RE`` finds promotional
          wording.
        * Links present: ``True`` as soon as one resolvable link is outside
          the place's own domain (or the place domain is unknown); ``False``
          when every resolvable link is on that domain or a subdomain of it.
        * Links present but none resolves to a domain: falls back to the
          wording check instead of reporting "not an ad".
    """
    urls = extract_urls(text)
    has_ad_words = AD_RE.search(safe_text(text)) is not None

    if not urls:
        # No links: treat strong promotional wording as ad
        return has_ad_words

    place_domain = normalize_domain(url_domain(place_website))
    resolved = (normalize_domain(url_domain(u)) for u in urls)
    link_domains = [d for d in resolved if d is not None]

    if not link_domains:
        # Nothing to compare against the place domain; let the wording decide.
        return has_ad_words
    if place_domain is None:
        # Without a known business domain every link counts as external.
        return True

    for d in link_domains:
        # Exact host or a true subdomain. A plain endswith() would accept
        # look-alikes such as "notexample.com" for "example.com".
        if d != place_domain and not d.endswith("." + place_domain):
            return True
    # Only in-domain links → not an ad by rule
    return False


# Re-run the ad rule with the definitions above; rule_low_quality is left untouched
ad_flags = df.apply(
    lambda row: is_advertisement(row[TEXT_COL], row.get(PLACE_WEBSITE_COL, None)),
    axis=1,
)
df["rule_advertisement"] = ad_flags.astype(int)


In [None]:
# Re-label every row with the updated rule columns and export again

df["pred_label"] = df.apply(assign_label, axis=1)
df["pred_label_name"] = df["pred_label"].map(ID2LABEL)

export_candidates = [
    BUSINESS_NAME_COL, TEXT_COL, PLACE_WEBSITE_COL, ADDRESS_COL, CATEGORY_COL,
    "rule_low_quality", "rule_advertisement", "fake_score", "semantic_relevance",
    "pred_label", "pred_label_name",
]
# Export only the columns that exist in df
export_cols = [col for col in export_candidates if col in df.columns]
pred_df = df[export_cols].copy()

pred_df.to_csv(OUTPUT_PATH, index=False)
OUTPUT_PATH
