In [None]:
# Databricks notebook source
# MAGIC %md # Distributed ABSA & Product Summary
# MAGIC Widgets: product_id, reviews_blob_path, raw_container, raw_results_prefix, summary_prefix

# COMMAND ----------
# Declare the notebook parameters (all default to an empty string) and read
# their current values.
_WIDGET_NAMES = (
    "product_id",
    "reviews_blob_path",
    "raw_container",
    "raw_results_prefix",
    "summary_prefix",
)
for _widget_name in _WIDGET_NAMES:
    dbutils.widgets.text(_widget_name, "")

product_id          = dbutils.widgets.get("product_id")
reviews_blob_path   = dbutils.widgets.get("reviews_blob_path")
RAW_CONTAINER       = dbutils.widgets.get("raw_container")
RAW_RESULTS_PREFIX  = dbutils.widgets.get("raw_results_prefix")
SUMMARY_PREFIX      = dbutils.widgets.get("summary_prefix")

# Fail fast on unset parameters: every value is interpolated into an abfss://
# URI further down, and the result writers run in "overwrite" mode, so an
# empty product_id would point them at a container-level path.
_missing_widgets = [
    _widget_name
    for _widget_name in _WIDGET_NAMES
    if not dbutils.widgets.get(_widget_name).strip()
]
if _missing_widgets:
    raise ValueError(
        "Missing required notebook parameter(s): " + ", ".join(_missing_widgets)
    )



In [None]:
# COMMAND ----------
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *
import mlflow
import json, re
from gensim.summarization import summarize
import sqlite3

# Build (or reuse) the Spark session used by every later cell.
# NOTE(review): getOrCreate() returns an already-running session when one
# exists; the executor instance/memory/core settings are presumably only
# honoured when this call actually starts the application — confirm on the
# target cluster.
spark = (
    SparkSession.builder
    .appName("ABSA_Product_Summary")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.executor.instances", "10")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "4")
    .getOrCreate()
)


In [None]:

# COMMAND ----------
# 1. Load raw reviews JSONL from ADLS Gen2 via ABFSS
storage_account = "reviewsenseproject"  # replace with your account

# Location of the input file: <container>@<account> plus the blob path widget.
raw_path = f"abfss://{RAW_CONTAINER}@{storage_account}.dfs.core.windows.net/{reviews_blob_path}"

# Keep only the review id and its text; the text column is renamed so the
# rest of the notebook can refer to it as `review_text`.
df_raw = spark.read.json(raw_path).select(
    F.col("review_id"),
    F.col("text").alias("review_text"),
)

In [None]:

# COMMAND ----------
# 2. Load models from MLflow (Azure ML Registry via MLflow Tracking URI)
# Registry models, all taken from the "Production" stage.
aspect_model = mlflow.pytorch.load_model("models:/bert-aspect-extraction/Production")
sent_model = mlflow.pytorch.load_model("models:/bert-sentiment-classification/Production")
scoring_pyfunc = mlflow.pyfunc.load_model("models:/gpt2-scoring-justification/Production")

from transformers import BertTokenizer, GPT2Tokenizer

# Tokenizers are read from fixed DBFS paths (assumed to be packaged alongside
# the model artifacts).
aspect_tok = BertTokenizer.from_pretrained("/dbfs/models/bert-aspect-extraction/tokenizer")
sent_tok = BertTokenizer.from_pretrained("/dbfs/models/bert-sentiment-classification/tokenizer")
gpt2_tok = GPT2Tokenizer.from_pretrained("/dbfs/models/gpt2-scoring-justification/tokenizer")

# Broadcast each (model, tokenizer) pair so the UDF can read it through
# `.value` on the workers.
# NOTE(review): this relies on the model objects being picklable — confirm
# for the pyfunc scoring model in particular.
_spark_context = spark.sparkContext
bc_aspect = _spark_context.broadcast((aspect_model, aspect_tok))
bc_sent = _spark_context.broadcast((sent_model, sent_tok))
bc_score = _spark_context.broadcast((scoring_pyfunc, gpt2_tok))


In [None]:
# COMMAND ----------
# 3. UDF: Per-review ABSA
def split_sentences(text):
    """Split ``text`` into sentences.

    A boundary is one or more space characters that directly follow ``.``,
    ``!`` or ``?``. The punctuation stays attached to the sentence it ends;
    other whitespace (tabs, newlines) is not treated as a boundary.

    Returns a list of strings; text without any boundary comes back as a
    single-element list.
    """
    sentence_boundary = re.compile(r'(?<=[.!?]) +')
    return sentence_boundary.split(text)

# Fixed list of aspect categories; analyze_review emits one row per entry.
ASPECTS = [
    "Product Quality","Content/Performance","User Experience","Value for Money",
    "Customer Service","Aesthetics/Design","Functionality/Features",
    "Ease of Use/Accessibility","Durability/Longevity","Shipping and Packaging"
]


def _merge_tagged_tokens(tokens, tags, special_tokens=()):
    """Collapse BIO-tagged word pieces into a list of phrases.

    A ``B-`` tag opens a new phrase, an ``I-`` tag extends the open one and
    any other tag closes it. ``##`` continuation pieces are glued to the
    previous piece without a space. Tokens listed in ``special_tokens`` never
    become part of a phrase and close any phrase that is open.
    """
    phrases = []
    cur = ""
    for tok, tag in zip(tokens, tags):
        if tok in special_tokens:
            if cur:
                phrases.append(cur.strip())
                cur = ""
            continue
        piece = tok.replace("##", "")
        if tag.startswith("B-"):
            if cur:
                phrases.append(cur.strip())
            cur = piece
        elif tag.startswith("I-"):
            cur += piece if tok.startswith("##") else " " + piece
        elif cur:
            phrases.append(cur.strip())
            cur = ""
    if cur:
        phrases.append(cur.strip())
    return phrases


def analyze_review(review_text):
    """Run aspect-based sentiment analysis on a single review.

    Reads the broadcast (model, tokenizer) pairs from ``bc_aspect``,
    ``bc_sent`` and ``bc_score``. The aspect model tags the review's tokens;
    tagged spans are merged into phrases. For every entry of ``ASPECTS`` whose
    name appears among those phrases, each sentence that contains the aspect
    name is classified by the sentiment model and scored by the scoring model.

    Args:
        review_text: The review text; may be ``None`` or blank.

    Returns:
        A list with one tuple per entry of ``ASPECTS``, in that order:
        ``(review_text, aspect, sentiment, score, justifications)``.
        For an aspect with at least one scored sentence, ``score`` is the mean
        sentence score and ``sentiment`` is "Positive" (mean >= 20),
        "Negative" (mean <= -20) or "Mixed". Every other aspect gets
        ``("Neutral", 0.0, [])``.
    """
    # Nothing to analyse: report every aspect as not mentioned instead of
    # failing inside the tokenizer.
    if review_text is None or not review_text.strip():
        return [(review_text, asp, "Neutral", 0.0, []) for asp in ASPECTS]

    a_model, a_tok = bc_aspect.value
    s_model, s_tok = bc_sent.value
    score_model, _ = bc_score.value  # the GPT-2 tokenizer is not needed here

    # Aspect Extraction (BERT TokenClassification)
    # encode() adds special tokens, so the token list is derived from the very
    # same ids; this keeps tokens and predicted tags aligned one-to-one.
    ids    = a_tok.encode(review_text, return_tensors="pt", truncation=True)
    mask   = ids.ne(a_tok.pad_token_id).long()
    outs   = a_model(ids, attention_mask=mask)
    preds  = outs.logits.argmax(dim=2).squeeze(0).tolist()
    tags   = [a_model.config.id2label[p] for p in preds]
    tokens = a_tok.convert_ids_to_tokens(ids.squeeze(0).tolist())

    # Compared case-insensitively, like the sentence filter below.
    extracted = {
        phrase.lower()
        for phrase in _merge_tagged_tokens(tokens, tags, set(a_tok.all_special_tokens))
    }

    results = []
    for asp in ASPECTS:
        asp_lc = asp.lower()
        scores, justs = [], []
        if asp_lc in extracted:
            for s in split_sentences(review_text):
                if asp_lc not in s.lower():
                    continue
                # Sentiment Classification
                # NOTE(review): encode() adds its own special tokens on top of
                # the literal [CLS]/[SEP] in this string; kept unchanged
                # because the format the model was trained on is not visible
                # here — confirm.
                inp = f"[CLS] {s_tok.sep_token} {asp} {s_tok.sep_token} {s} [SEP]"
                iids = s_tok.encode(inp, return_tensors="pt", truncation=True)
                msk  = iids.ne(s_tok.pad_token_id).long()
                out  = s_model(iids, attention_mask=msk)
                lbl  = out.logits.argmax(dim=1).item()
                sentiment = s_model.config.id2label[lbl]
                # Scoring & Justification via pyfunc
                payload = json.dumps({"aspect":asp,"sentiment":sentiment,"sentence":s})
                sj = score_model.predict(payload)
                scores.append(float(sj["score"]))
                justs.append(sj["justification"])

        if not scores:
            # Aspect not detected, or no sentence mentions it by name.
            results.append((review_text, asp, "Neutral", 0.0, []))
            continue

        avg = sum(scores)/len(scores)
        overall = "Positive" if avg>=20 else "Negative" if avg<=-20 else "Mixed"
        # Report the aggregated label, not the last sentence's raw label.
        results.append((review_text, asp, overall, avg, justs))
    return results

# Shape of one element returned by analyze_review; the field order matches
# the tuples that function builds.
_analysis_fields = [
    StructField("review_text", StringType()),
    StructField("aspect", StringType()),
    StructField("sentiment", StringType()),
    StructField("score", FloatType()),
    StructField("justifications", ArrayType(StringType())),
]

# The UDF returns an array of those structs for each review.
schema = ArrayType(StructType(_analysis_fields))

analyze_udf = F.udf(analyze_review, schema)

In [None]:
# COMMAND ----------
# 4. Apply UDF in parallel across all reviews
# Attach the per-review analysis (array of per-aspect structs) as a new column.
_analysis_col = analyze_udf(F.col("review_text"))
df_absa = df_raw.withColumn("analysis", _analysis_col)




In [None]:

# 5. Explode per-review results & write raw results to Blob
# One output row per (review, aspect) element of the `analysis` array.
# The element position is not used downstream, so a plain explode suffices.
df_flat = (
    df_absa
    .select("review_id", F.explode("analysis").alias("item"))
    .select(
        "review_id",
        F.col("item.aspect").alias("aspect"),
        F.col("item.sentiment").alias("sentiment"),
        F.col("item.score").alias("score"),
        F.col("item.justifications").alias("justifications"),
    )
    # df_flat feeds both the Parquet write below and the later aggregation;
    # caching keeps the model-inference UDF from running once per action.
    .cache()
)

# NOTE(review): the "raw_results_prefix" widget value is used as the
# container part of the URI here — confirm that is the intended layout.
raw_out_path = f"abfss://{RAW_RESULTS_PREFIX}@{storage_account}.dfs.core.windows.net/{product_id}/"
df_flat.write.mode("overwrite").parquet(raw_out_path)

# COMMAND ----------
# 6. Aggregate to product‑level
# Rows labelled "Neutral" are the placeholders emitted for aspects a review
# did not mention (score 0.0, no justifications). They are excluded from the
# mention count and the average so they do not dilute the statistics.
_is_mention = F.col("sentiment") != F.lit("Neutral")


df_grouped = df_flat.groupBy("aspect").agg(
    # Mean score over mentions only; 0.0 when the aspect was never mentioned.
    F.coalesce(F.avg(F.when(_is_mention, F.col("score"))), F.lit(0.0)).alias("avg_score"),
    F.count(F.when(_is_mention, F.lit(1))).alias("mention_count"),
    F.sum(F.when(F.col("sentiment") == "Positive", 1).otherwise(0)).alias("cnt_pos"),
    F.sum(F.when(F.col("sentiment") == "Mixed", 1).otherwise(0)).alias("cnt_mixed"),
    F.sum(F.when(F.col("sentiment") == "Negative", 1).otherwise(0)).alias("cnt_neg"),
    F.collect_list("justifications").alias("all_justs"),
)

# Percentages are relative to the number of mentions; an aspect with no
# mentions gets 0.0 instead of a division by zero.
_has_mentions = F.col("mention_count") > 0
df_grouped = df_grouped.withColumn(
    "pct_positive",
    F.when(_has_mentions, 100*F.col("cnt_pos")/F.col("mention_count")).otherwise(F.lit(0.0)),
).withColumn(
    "pct_mixed",
    F.when(_has_mentions, 100*F.col("cnt_mixed")/F.col("mention_count")).otherwise(F.lit(0.0)),
).withColumn(
    "pct_negative",
    F.when(_has_mentions, 100*F.col("cnt_neg")/F.col("mention_count")).otherwise(F.lit(0.0)),
)


In [None]:

# 7. Extractive summarization of justifications
def extractive_summary(just_lists):
    """Condense per-review justification lists into one short text.

    Args:
        just_lists: Iterable of lists of justification strings. ``None`` for
            the whole argument, ``None`` inner lists and empty/``None``
            strings are tolerated and skipped.

    Returns:
        All justifications joined by newlines when there are fewer than
        three; otherwise the summarizer's output (requested at 60 words).
        If the summarizer fails or returns nothing, the first 300 characters
        of the joined text are returned instead.
    """
    sents = [s for sub in (just_lists or []) for s in (sub or []) if s]
    doc   = "\n".join(sents)
    if len(sents)<3:
        return doc
    try:
        summary = summarize(doc, word_count=60)
    except Exception:
        # Best-effort: any summarizer failure falls back to truncation.
        # (Exception rather than a bare except, so KeyboardInterrupt and
        # SystemExit still propagate.)
        summary = ""
    # An empty summary would lose the justifications entirely.
    return summary or doc[:300]

summ_udf = F.udf(extractive_summary, StringType())

# Product-level label derived from the average score:
# >= 20 is "Positive", <= -20 is "Negative", anything in between is "Mixed".
_avg_score = F.col("avg_score")
_overall_sentiment = (
    F.when(_avg_score >= 20, "Positive")
    .when(_avg_score <= -20, "Negative")
    .otherwise("Mixed")
)

# Final per-aspect summary: add the summarised justification and the overall
# label, then keep only the reporting columns, tagged with the product id.
df_summary = (
    df_grouped
    .withColumn("product_justification", summ_udf(F.col("all_justs")))
    .withColumn("overall_sentiment", _overall_sentiment)
    .select(
        F.lit(product_id).alias("product_id"),
        "aspect",
        "avg_score",
        "mention_count",
        "pct_positive",
        "pct_mixed",
        "pct_negative",
        "overall_sentiment",
        "product_justification",
    )
)

In [None]:
# 8. Write summary JSONL to Blob
# NOTE(review): Spark writers normally create a directory of part files at
# the target path, so "summary.jsonl" is presumably a directory name rather
# than a single file — confirm downstream readers expect that.
summary_out = f"abfss://{SUMMARY_PREFIX}@{storage_account}.dfs.core.windows.net/{product_id}/summary.jsonl"

# Collapse to a single partition before writing JSON lines, replacing any
# previous output at the same path.
_summary_writer = df_summary.coalesce(1).write.mode("overwrite")
_summary_writer.json(summary_out)


In [None]:
# 9. Store summary in a SQLite database under /dbfs/tmp (written from the driver)
# Collect first so a Spark failure cannot leave a database connection open.
df_pd = df_summary.toPandas()

# NOTE(review): SQLite needs random-access writes; confirm the /dbfs mount on
# this cluster supports them, otherwise write to local disk and copy the file.
# NOTE(review): if_exists="replace" drops the table on every run, so the
# database only ever holds the most recent product — confirm that is intended.
conn = sqlite3.connect("/dbfs/tmp/absa_summary.db")
try:
    df_pd.to_sql("product_summary", conn, if_exists="replace", index=False)
finally:
    # Always release the file handle, even when the insert fails.
    conn.close()

# COMMAND ----------
# Report where each artefact of this run was written.
_output_messages = (
    f"Raw per‑review results at: {raw_out_path}",
    f"Product summary JSON at: {summary_out}",
    "SQLite DB stored at /dbfs/tmp/absa_summary.db",
)
for _message in _output_messages:
    print(_message)