# ETL: Metrics Calculation and Issue Identification

This notebook uses SQL and PySpark to compute daily metrics and flag issues from the review aspects extracted in [1_Review_Apspect_Extraction_Agent](/Workspace/Users/cindy.wu@databricks.com/voc_industry_demo/1_Review_Apspect_Extraction_Agent).

Data Flow: 
raw reviews -> **review aspect extractions -> location aspect daily -> flag all issues** -> issue diagnosis and recommendations

In [0]:
%sql
SELECT
 *
FROM lakehouse_inn_catalog.voc.review_extractions
LIMIT 200;

In [0]:
RAW_ASPECTS   = "lakehouse_inn_catalog.voc.review_aspect_details"  # one row per (review_id, aspect): sentiment, evidence, opinion_terms + review metadata
RAW_REVIEWS   = "lakehouse_inn_catalog.voc.raw_reviews"            # review_id, location, review_date, review_text, ...
ASPECTS_DAILY = "lakehouse_inn_catalog.voc.loc_aspect_daily"       # location, aspect, date, neg_share_7d, volume_7d, delta_pp, ...
ISSUES_DAILY  = "lakehouse_inn_catalog.voc.issues_daily"           # one row per flagged (location, aspect, date)
RAW_EXTRACTS  = "lakehouse_inn_catalog.voc.review_extractions"     # review_id, review_text, response (JSON from the extraction agent)

In [0]:
current_date_param = "2025-10-15"  # run date (yyyy-MM-dd), used in issue_id and status

### Explode extractions by aspect

`review_aspect_details` Table Schema

Each row contains one aspect of one review.

| **Column** | **Type** | **Description** |
|-------------|-----------|-----------------|
| `review_id` | `STRING` | Unique review identifier. |
| `aspect` | `STRING` | Aspect extracted from the review (e.g., `cleanliness`, `staff_friendliness`). |
| `sentiment` | `STRING` | Sentiment toward the aspect (`very_negative`, `negative`, `neutral`, `positive`, `very_positive`). |
| `evidence` | `ARRAY<STRING>` | Text snippets supporting the sentiment classification. |
| `opinion_terms` | `ARRAY<STRING>` | Opinion words or short phrases describing the aspect (e.g., "dirty", "friendly"). |
| `location` | `STRING` | City or region where the hotel is located. |
| `latitude` | `DECIMAL(6,4)` | Latitude coordinate of the hotel. |
| `longitude` | `DECIMAL(7,4)` | Longitude coordinate of the hotel. |
| `channel` | `STRING` | Source channel where the review was posted (e.g., Google, TripAdvisor). |
| `star_rating` | `LONG` | Numeric star rating provided by the customer. |
| `review_date` | `DATE` | Date when the review was posted. |
| `review_text` | `STRING` | Full text of the customer review. |
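
A minimal plain-Python sketch of the explode step in the next cell, assuming each `response` value is a JSON string shaped like the `response_schema` defined there (`{"aspect_terms": [...]}`). It shows how one extraction becomes one row per aspect; `explode_response` and the sample review are hypothetical and not part of the pipeline.

```python
import json
from typing import Any, Dict, List, Optional


def explode_response(review_id: str, review_text: str, response: Optional[str]) -> List[Dict[str, Any]]:
    """Hypothetical helper: one output row per entry in response["aspect_terms"].

    Unparseable or empty responses produce no rows, similar to how explode()
    emits nothing when from_json yields a null array.
    """
    try:
        parsed = json.loads(response) if response else {}
    except json.JSONDecodeError:
        parsed = {}
    if not isinstance(parsed, dict):
        parsed = {}
    terms = parsed.get("aspect_terms") or []
    return [
        {
            "review_id": review_id,
            "review_text": review_text,
            "aspect": term.get("aspect"),
            "sentiment": term.get("sentiment"),
            "evidence": term.get("evidence"),
            "opinion_terms": term.get("opinion_terms"),
        }
        for term in terms
    ]


# Hypothetical extraction for a single review
sample_response = json.dumps({
    "aspect_terms": [
        {"aspect": "cleanliness", "sentiment": "negative",
         "evidence": ["the bathroom was dirty"], "opinion_terms": ["dirty"]},
        {"aspect": "staff_friendliness", "sentiment": "positive",
         "evidence": ["front desk staff were friendly"], "opinion_terms": ["friendly"]},
    ]
})
rows = explode_response("r1", "The bathroom was dirty, but the front desk staff were friendly.", sample_response)
for row in rows:
    print(row["review_id"], row["aspect"], row["sentiment"])
```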

In [0]:

from pyspark.sql.functions import from_json, col, explode
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Read the raw extraction results
df = spark.table(RAW_EXTRACTS)

# Define schema for the response column
aspect_term_schema = StructType([
    StructField("aspect", StringType()),
    StructField("evidence", ArrayType(StringType())),
    StructField("opinion_terms", ArrayType(StringType())),
    StructField("sentiment", StringType())
])

response_schema = StructType([
    StructField("aspect_terms", ArrayType(aspect_term_schema))
])


# Parse the response column
parsed_df = df.withColumn(
    "js",
    from_json(col("response").cast("string"), response_schema)
)

# Explode aspect_terms
exploded_df = parsed_df.select(
    "review_id",
    "review_text",
    explode(col("js.aspect_terms")).alias("a")
).select(
    "review_id",
    "review_text",
    col("a.aspect").alias("aspect"),
    col("a.sentiment").alias("sentiment"),
    col("a.evidence").alias("evidence"),
    col("a.opinion_terms").alias("opinion_terms")
)


In [0]:
display(exploded_df)

In [0]:
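raw_reviews_df = spark.table(RAW_REVIEWS)

# Row counts: exploded aspects vs. raw reviews (print, since only the last expression is rendered)
print(exploded_df.count(), raw_reviews_df.count())
display(raw_reviews_df)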

In [0]:
# The join in the next cell keys on (review_id, review_text); if a pair repeats
# in raw_reviews, the join would duplicate aspect rows.
n_rows = raw_reviews_df.count()
n_pairs = raw_reviews_df.select("review_id", "review_text").distinct().count()
print(f"raw_reviews rows: {n_rows}, distinct (review_id, review_text): {n_pairs}")

In [0]:
# Load the raw reviews table
raw_reviews_df = spark.table(RAW_REVIEWS)

# Attach review metadata to each exploded aspect, joining on review_id and review_text
merged_df = exploded_df.join(
    raw_reviews_df.select(
        "location", "latitude", "longitude", "review_id",
        "channel", "star_rating", "review_date", "review_text",
    ),
    on=["review_id", "review_text"],
    how="left",
)

# Show the merged result
display(merged_df)

In [0]:
merged_df.count()

In [0]:
merged_df.write.mode("overwrite").saveAsTable(RAW_ASPECTS)

## Daily Aggregated Metrics by (location, aspect)

- `lakehouse_inn_catalog.voc.loc_aspect_daily`

| Column                   | Description                                                  |
|--------------------------|--------------------------------------------------------------|
| `location`               | City or hotel region                                         |
| `aspect`                 | Review aspect (e.g., cleanliness, wifi_connectivity)         |
| `date`                   | Review date                                                  |
| `total_mentions`         | Total mentions that day                                      |
| `neg_mentions`           | Count of `negative` + `very_negative` mentions                |
| `pos_mentions`           | Count of `positive` + `very_positive` mentions                |
| `neu_mentions`           | Count of `neutral` mentions                                  |
| `neg_share`              | Daily negative share (`neg_mentions / total_mentions`)       |
| `date_dt`                | Copy of `date`                                               |
| `baseline_neg_share_21d` | Pooled negative share (`sum(neg_mentions) / sum(total_mentions)`) over the previous 21 days, excluding the current day |
| `neg_share_7d`           | Pooled negative share over the 7 days ending on (and including) the current day |
| `volume_7d`              | Total mentions over the same 7 days                          |
| `delta_pp`               | `neg_share_7d` minus `baseline_neg_share_21d`, in percentage points |
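
The rolling columns above can be sanity-checked with a small plain-Python sketch (no Spark). It assumes pooled ratios (summed negative mentions divided by summed mentions over the window, as the PySpark cell below computes) and calendar-day windows, as the column descriptions state: the baseline covers the 21 days before `date`, the short-term window the 7 days ending on `date`. The helper names and sample counts are hypothetical.

```python
from datetime import date, timedelta
from typing import Dict, Optional, Tuple

# Hypothetical daily counts for one (location, aspect): date -> (neg_mentions, total_mentions)
DailyCounts = Dict[date, Tuple[int, int]]


def pooled_share(counts: DailyCounts, start: date, end: date) -> Tuple[Optional[float], int]:
    """Return (sum(neg) / sum(total), sum(total)) over start..end inclusive.

    Days without mentions add zero to both sums; the share is None when the
    window has no mentions, like the otherwise(NULL) branch in PySpark.
    """
    neg = tot = 0
    day = start
    while day <= end:
        n, t = counts.get(day, (0, 0))
        neg += n
        tot += t
        day += timedelta(days=1)
    return (neg / tot if tot > 0 else None), tot


def rolling_metrics(counts: DailyCounts, day: date) -> dict:
    """neg_share_7d, volume_7d, baseline_neg_share_21d and delta_pp for one day."""
    neg_share_7d, volume_7d = pooled_share(counts, day - timedelta(days=6), day)
    baseline, _ = pooled_share(counts, day - timedelta(days=21), day - timedelta(days=1))
    delta_pp = None
    if neg_share_7d is not None and baseline is not None:
        delta_pp = (neg_share_7d - baseline) * 100.0
    return {
        "neg_share_7d": neg_share_7d,
        "volume_7d": volume_7d,
        "baseline_neg_share_21d": baseline,
        "delta_pp": delta_pp,
    }


run_day = date(2025, 10, 15)
counts: DailyCounts = {}
for offset in range(7, 21):   # 14 quieter days: 1 negative out of 5 mentions
    counts[run_day - timedelta(days=offset)] = (1, 5)
for offset in range(0, 7):    # last 7 days: 4 negative out of 5 mentions
    counts[run_day - timedelta(days=offset)] = (4, 5)

m = rolling_metrics(counts, run_day)
print({k: (round(v, 2) if isinstance(v, float) else v) for k, v in m.items()})
```

Because the baseline window ends the day before `date`, it shares six days with the 7-day window, so a sustained spike also lifts the baseline and narrows `delta_pp`: in the sample, the baseline comes out at 0.38 rather than the 0.20 of the quieter days alone.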

In [0]:
from pyspark.sql import functions as F, Window as W

daily = (
    merged_df.withColumn("date", F.col("review_date").cast("date"))
    .groupBy("location", "aspect", "date")
    .agg(
        F.count("*").alias("total_mentions"),
        F.sum(F.when(F.col("sentiment").isin("negative", "very_negative"), 1).otherwise(0)).alias("neg_mentions"),
        F.sum(F.when(F.col("sentiment").isin("positive", "very_positive"), 1).otherwise(0)).alias("pos_mentions"),
        F.sum(F.when(F.col("sentiment") == "neutral", 1).otherwise(0)).alias("neu_mentions")
    )
    .withColumn(
        "neg_share",
        F.when(F.col("total_mentions") > 0,
               F.col("neg_mentions").cast("double") / F.col("total_mentions").cast("double"))
         .otherwise(F.lit(None).cast("double"))
    )
)

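# Rolling window definitions. rangeBetween over a day index counts calendar days,
# so days without mentions still occupy their slot in the window
# (rowsBetween would count the last N rows, i.e. the last N days that had mentions).
day_idx = F.datediff(F.col("date"), F.lit("1970-01-01").cast("date"))
w_prev21 = W.partitionBy("location", "aspect").orderBy(day_idx).rangeBetween(-21, -1)  # baseline: previous 21 days (excludes current)
w_last7  = W.partitionBy("location", "aspect").orderBy(day_idx).rangeBetween(-6, 0)    # short-term: last 7 days (includes current)

# Compute rolling metrics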
final = (
    daily
    .withColumn("sum_neg_prev21", F.sum("neg_mentions").over(w_prev21))
    .withColumn("sum_tot_prev21", F.sum("total_mentions").over(w_prev21))
    .withColumn(
        "baseline_neg_share_21d",
        F.when(F.col("sum_tot_prev21") > 0,
               F.col("sum_neg_prev21").cast("double") / F.col("sum_tot_prev21").cast("double"))
         .otherwise(F.lit(None).cast("double"))
    )
    .withColumn("sum_neg_last7", F.sum("neg_mentions").over(w_last7))
    .withColumn("sum_tot_last7", F.sum("total_mentions").over(w_last7))
    .withColumn(
        "neg_share_7d",
        F.when(F.col("sum_tot_last7") > 0,
               F.col("sum_neg_last7").cast("double") / F.col("sum_tot_last7").cast("double"))
         .otherwise(F.lit(None).cast("double"))
    )
    .withColumn("volume_7d", F.col("sum_tot_last7").cast("double"))
    .withColumn("date_dt", F.col("date"))
    .withColumn(
        "delta_pp",
        F.when(
            F.col("baseline_neg_share_21d").isNotNull() & F.col("neg_share_7d").isNotNull(),
            (F.col("neg_share_7d") - F.col("baseline_neg_share_21d")) * F.lit(100.0)
        ).otherwise(F.lit(None).cast("double"))
    )
    .select(
        "location",
        "aspect",
        "date",
        "total_mentions",
        "neg_mentions",
        "pos_mentions",
        "neu_mentions",
        "neg_share",
        "date_dt",
        "baseline_neg_share_21d",
        "neg_share_7d",
        "volume_7d",
        "delta_pp",
    )
)

In [0]:
final.write.mode("overwrite").saveAsTable(ASPECTS_DAILY)
display(spark.table(ASPECTS_DAILY).orderBy("location", "aspect", "date").limit(50))

## Flag Issues: issues_daily

Open question: should this table link all reviews for the aspect, or only negative ones?
### issues_daily Table Schema

| **Column** | **Type** | **Description** |
|-------------|-----------|-----------------|
| `issue_id` | `STRING` | Unique issue identifier of the form `RUN-<yyyyMMdd>-<6-digit sequence>` (e.g., `RUN-20251015-000001`). |
| `opened_at` | `DATE` | Date when the issue was detected (based on daily metrics). |
| `location` | `STRING` | Hotel city or region where the issue occurred. |
| `aspect` | `STRING` | Review aspect associated with the issue (e.g., `cleanliness`, `wifi_connectivity`). |
| `severity` | `STRING` | Issue severity level (`Critical`, `Warning`) determined from thresholds on sentiment trends. |
| `status` | `STRING` | Issue status (`Open`, `Closed`); only the most recent issue per `(location, aspect)` is left `Open`. |
| `open_reason` | `STRING` | Explanation of why the issue was opened (e.g., “Neg share 78% (+16.2pp vs 21d)”). |
| `delta_pp_open` | `DOUBLE` | Change in negative share (percentage points) vs 21-day baseline. |
| `nms_open` | `DOUBLE` | 7-day average negative share (normalized metric). |
| `volume_open` | `BIGINT` | 7-day total number of mentions contributing to this issue. |
| `relevant_reviews` | `ARRAY<STRUCT>` | Up to 5 most relevant reviews from the 7 days up to `opened_at` (most negative first, then most recent). Each struct contains: |
| → `review_id` | `STRING` | Unique review identifier. |
| → `sentiment` | `STRING` | Sentiment for this aspect (`very_negative` → `very_positive`). |
| → `evidence` | `ARRAY<STRING>` | Extracted snippets supporting the sentiment classification. |
| → `opinion_terms` | `ARRAY<STRING>` | Opinion keywords or phrases about the aspect (e.g., “dirty”, “outdated”). |
| → `review_date` | `DATE` | Date of the review. |
| → `review_text` | `STRING` | Full text of the customer review. |
| → `channel` | `STRING` | Source channel where the review was posted. |
| → `star_rating` | `BIGINT` | Star rating provided by the customer. |
| `relevant_reviews_text` | `STRING` | The aspect plus each relevant review's fields (except `review_text`) flattened into one string. |
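
A small sketch of the two formatted columns, assuming the `RUN-<yyyyMMdd>-<6-digit sequence>` id pattern built by the code below and the `open_reason` wording from the example in the table. The helper names are hypothetical; the sketch prints the sign of `delta_pp` explicitly and falls back to a short note when there is no 21-day baseline.

```python
from datetime import date
from typing import Optional


def make_issue_id(run_date: date, seq: int) -> str:
    """Hypothetical id builder: RUN-<yyyyMMdd>-<zero-padded 6-digit sequence>."""
    return f"RUN-{run_date:%Y%m%d}-{seq % 1_000_000:06d}"


def make_open_reason(neg_share_7d: float, delta_pp: Optional[float]) -> str:
    """Describe why an issue opened; the '+' format flag always shows the sign."""
    if delta_pp is None:
        return f"Neg share {neg_share_7d * 100:.1f}% (no 21d baseline)"
    return f"Neg share {neg_share_7d * 100:.1f}% ({delta_pp:+.1f}pp vs 21d)"


print(make_issue_id(date(2025, 10, 15), 42))   # RUN-20251015-000042
print(make_open_reason(0.78, 16.2))            # Neg share 78.0% (+16.2pp vs 21d)
print(make_open_reason(0.90, -3.0))            # Neg share 90.0% (-3.0pp vs 21d)
```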

---

#### Threshold Logic (original)
- **Critical:** `neg_share_7d ≥ 0.85` or `delta_pp ≥ 30.0`
- **Warning:** `neg_share_7d ≥ 0.70` or `delta_pp ≥ 20.0`
- **Minimum volume:** only triggered if `volume_7d ≥ 10`

#### Threshold Logic (revised 10/22, used in the code below)
`delta_pp` is the change in negative share (percentage points) vs. the 21-day baseline.

- **Critical:** `neg_share_7d ≥ 0.85` or `delta_pp ≥ 15.0`
- **Warning:** `neg_share_7d ≥ 0.70` or `delta_pp ≥ 10.0`
- **Minimum volume:** only triggered if `volume_7d ≥ 10`
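
The revised thresholds can be expressed as a small classification function. This is a plain-Python sketch of the same rules rather than the notebook's implementation; it assumes a missing (`None`) metric never meets its threshold, which is how a NULL comparison behaves inside the PySpark `when` conditions below.

```python
from typing import Optional

# Revised (10/22) thresholds
CRITICAL_NEG_SHARE = 0.85
CRITICAL_DELTA_PP = 15.0
WARNING_NEG_SHARE = 0.70
WARNING_DELTA_PP = 10.0
MIN_VOLUME = 10


def _at_least(value: Optional[float], threshold: float) -> bool:
    """None never satisfies a threshold (SQL NULL comparisons are not true)."""
    return value is not None and value >= threshold


def classify_severity(neg_share_7d: Optional[float], delta_pp: Optional[float],
                      volume_7d: float) -> Optional[str]:
    """Return "Critical", "Warning", or None when no issue should be flagged."""
    if volume_7d < MIN_VOLUME:
        return None  # too few mentions in the last 7 days
    if _at_least(neg_share_7d, CRITICAL_NEG_SHARE) or _at_least(delta_pp, CRITICAL_DELTA_PP):
        return "Critical"
    if _at_least(neg_share_7d, WARNING_NEG_SHARE) or _at_least(delta_pp, WARNING_DELTA_PP):
        return "Warning"
    return None


print(classify_severity(0.90, None, 20))   # Critical: high share, no baseline yet
print(classify_severity(0.50, 12.0, 20))   # Warning: 12pp rise
print(classify_severity(0.95, 40.0, 5))    # None: below minimum volume
```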

#### Granularity
- One row per `(location, aspect, date)` combination that meets or exceeds alert thresholds.  
- Relevant reviews are drawn from the **7 days** ending on `opened_at` for the same `(location, aspect)`.
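
The selection of relevant reviews for one issue can be sketched in plain Python: keep reviews from the 7 days ending on `opened_at`, order them most negative first, then most recent, then by `review_id`, and keep five. The priority values mirror the `priority_map` in the code below; the function and sample data are hypothetical.

```python
from datetime import date, timedelta
from typing import Dict, List

# Higher value = more relevant; mirrors priority_map in the notebook code.
PRIORITY = {"very_negative": 5, "negative": 4, "neutral": 3, "positive": 2, "very_positive": 1}


def relevant_reviews(reviews: List[Dict], opened_at: date, limit: int = 5) -> List[Dict]:
    """Hypothetical helper: up to `limit` reviews from the 7 days ending on opened_at."""
    window_start = opened_at - timedelta(days=6)
    picked = [r for r in reviews if window_start <= r["review_date"] <= opened_at]
    # Python's sort is stable, so sort by the least significant key first.
    picked.sort(key=lambda r: r["review_id"])                                 # tie-breaker: review_id ascending
    picked.sort(key=lambda r: r["review_date"], reverse=True)                 # then most recent first
    picked.sort(key=lambda r: PRIORITY.get(r["sentiment"], 0), reverse=True)  # primary: most negative first
    return picked[:limit]


run_day = date(2025, 10, 15)
reviews = [
    {"review_id": "r1", "sentiment": "positive", "review_date": run_day},
    {"review_id": "r2", "sentiment": "negative", "review_date": run_day - timedelta(days=1)},
    {"review_id": "r3", "sentiment": "very_negative", "review_date": run_day - timedelta(days=3)},
    {"review_id": "r4", "sentiment": "negative", "review_date": run_day},
    {"review_id": "r5", "sentiment": "very_negative", "review_date": run_day - timedelta(days=10)},  # outside window
    {"review_id": "r6", "sentiment": "neutral", "review_date": run_day - timedelta(days=2)},
    {"review_id": "r7", "sentiment": "negative", "review_date": run_day},
]
top = relevant_reviews(reviews, run_day)
print([r["review_id"] for r in top])
```

Unknown sentiments get priority 0 and sort last, roughly matching `desc_nulls_last()` on a missing priority.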

In [0]:
from pyspark.sql import functions as F, Window as W

# ------------------------------------------------------
# Assumes ASPECTS_DAILY is already defined in your env.
# ------------------------------------------------------

# 1) Load daily metrics and classify severity
df = spark.table(ASPECTS_DAILY)

THRESHOLDS = {
    "critical_neg_share": 0.85,  # >= 85% negative sentiment over 7 days
    "critical_delta_pp": 15.0,   # >= 15pp rise vs. 21-day baseline
    "warning_neg_share": 0.70,   # >= 70% negative sentiment over 7 days
    "warning_delta_pp": 10.0,    # >= 10pp rise vs. 21-day baseline
    "min_volume": 10             # only consider if volume_7d >= 10 mentions
}

df_issues = (
    df.filter(F.col("volume_7d") >= THRESHOLDS["min_volume"])
      .withColumn(
          "severity",
          F.when(
              (F.col("neg_share_7d") >= THRESHOLDS["critical_neg_share"]) |
              (F.col("delta_pp") >= THRESHOLDS["critical_delta_pp"]),
              F.lit("Critical")
          )
          .when(
              (F.col("neg_share_7d") >= THRESHOLDS["warning_neg_share"]) |
              (F.col("delta_pp") >= THRESHOLDS["warning_delta_pp"]),
              F.lit("Warning")
          )
          .otherwise(F.lit(None))
      )
      .filter(F.col("severity").isNotNull())
)


df_final = (
    df_issues
    .withColumn(
        "issue_id",
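        F.concat(
            F.lit("RUN-"),
            F.date_format(F.lit(current_date_param), "yyyyMMdd"),
            F.lit("-"),
            # Deterministic sequence: monotonically_increasing_id() can change if
            # df_final is recomputed, which would break the later joins on issue_id
            F.lpad(
                F.row_number().over(W.orderBy("date", "location", "aspect")).cast("string"),
                6, "0"
            )
        )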
    )
    .withColumn("opened_at", F.col("date"))
    .withColumn(
        "status",
        F.when(
            F.col("date") < F.add_months(F.lit(current_date_param), -1),
            F.lit("Closed")
        ).otherwise(F.lit("Open"))
    )
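    .withColumn(
        "open_reason",
        # format_string keeps the sign of delta_pp; concat() would return NULL
        # whenever delta_pp is NULL (no 21-day baseline yet)
        F.when(
            F.col("delta_pp").isNull(),
            F.format_string("Neg share %.1f%% (no 21d baseline)", F.col("neg_share_7d") * 100)
        ).otherwise(
            F.format_string("Neg share %.1f%% (%+.1fpp vs 21d)", F.col("neg_share_7d") * 100, F.col("delta_pp"))
        )
    )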
    .select(
        "issue_id",
        "opened_at",
        "location",
        "aspect",
        "severity",
        "status",
        "open_reason",
        F.round("delta_pp", 1).alias("delta_pp_open"),
        F.round("neg_share_7d", 3).alias("nms_open"),
        F.round("volume_7d", 0).cast("bigint").alias("volume_open"),
    )
)
# 2) Reviews: use review_aspect_details (already has all fields)
RAW_ASPECTS = "lakehouse_inn_catalog.voc.review_aspect_details"

review_aspects = spark.table(RAW_ASPECTS).select(
    "review_id",
    "aspect",
    "sentiment",
    "evidence",
    "opinion_terms",
    "location",
    "review_date",
    "review_text",
    "channel",
    "star_rating"
)

# 3) Join issues with reviews (matching location, aspect, date range)
joined = (
    df_final.alias("i")
    .join(
        review_aspects.alias("x"),
        on=[F.col("i.location") == F.col("x.location"),
            F.col("i.aspect")   == F.col("x.aspect")],
        how="left"
    )
    .where(
        (F.col("x.review_date") >= F.date_sub(F.col("i.opened_at"), 6)) &
        (F.col("x.review_date") <= F.col("i.opened_at"))
    )
    .select(
        F.col("i.issue_id").alias("issue_id"),
        F.col("i.opened_at").alias("opened_at"),
        F.col("i.location").alias("location"),
        F.col("i.aspect").alias("aspect"),
        F.col("i.severity").alias("severity"),
        F.col("i.status").alias("status"),
        F.col("i.open_reason").alias("open_reason"),
        F.col("i.delta_pp_open").alias("delta_pp_open"),
        F.col("i.nms_open").alias("nms_open"),
        F.col("i.volume_open").alias("volume_open"),
        F.col("x.review_id").alias("review_id"),
        F.col("x.sentiment").alias("review_sentiment"),
        F.col("x.evidence").alias("evidence"),
        F.col("x.opinion_terms").alias("opinion_terms"),
        F.col("x.review_date").alias("review_date"),
        F.col("x.review_text").alias("review_text"),
        F.col("x.channel").alias("channel"),
        F.col("x.star_rating").alias("star_rating"),
    )
    # # Deduplicate: one review per issue
    # .dropDuplicates(["issue_id", "review_id"])
)

# ======================================================
# Prioritize very_negative > negative > neutral > positive > very_positive, then most recent
# ======================================================
priority_map = F.create_map(
    F.lit("very_negative"), F.lit(5),
    F.lit("negative"),      F.lit(4),
    F.lit("neutral"),       F.lit(3),
    F.lit("positive"),      F.lit(2),
    F.lit("very_positive"), F.lit(1)
)

ranked = joined.withColumn("priority", priority_map[F.col("review_sentiment")])

w = (
    W.partitionBy("issue_id")
     .orderBy(
         F.col("priority").desc_nulls_last(),
         F.col("review_date").desc_nulls_last(),
         F.col("review_id")
     )
)

# ======================================================
# Collect the top 5 reviews per issue (ranked by window `w`)
# ======================================================
top_reviews = (
    ranked
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") <= 5)
    .groupBy(
        "issue_id","opened_at","location","aspect",
        "severity","status","open_reason",
        "delta_pp_open","nms_open","volume_open"
    )
    .agg(
        # collect_list does not guarantee order: collect rn with each review,
        # sort the array by it, then drop it below
        F.sort_array(
            F.collect_list(
                F.struct(
                    F.col("rn"),
                    F.col("review_id"),
                    F.col("review_sentiment").alias("sentiment"),
                    F.col("evidence"),
                    F.col("opinion_terms"),
                    F.col("review_date"),
                    F.col("review_text"),
                    F.col("channel"),
                    F.col("star_rating")
                )
            )
        ).alias("ranked_reviews")
    )
    .withColumn("relevant_reviews", F.transform("ranked_reviews", lambda r: r.dropFields("rn")))
    .drop("ranked_reviews")
)
# ======================================================
# Left join back to issues table (keep empty arrays)
# ======================================================
final_issues = (
    df_final.alias("i")
    .join(
        top_reviews.alias("t"),
        on=["issue_id","opened_at","location","aspect","severity","status",
            "open_reason","delta_pp_open","nms_open","volume_open"],
        how="left"
    )
    .withColumn(
        "relevant_reviews",
        F.coalesce(
            F.col("relevant_reviews"),
            F.array().cast(
                "array<struct<review_id:string,sentiment:string,evidence:array<string>,opinion_terms:array<string>,review_date:date,review_text:string,channel:string,star_rating:bigint>>"
            )
        )
    )
)
display(final_issues)

In [0]:
final_issues.count()

Mark older issues as closed, keeping only the latest issue per `(aspect, location)` open.
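
A plain-Python sketch of the same rule, assuming each issue is a dict with `aspect`, `location` and `opened_at`: the latest issue in each `(aspect, location)` group stays `Open` and the rest become `Closed`. As with `row_number()` in the cell below, exactly one issue per group stays open even if two share an `opened_at`. The function name and sample issues are hypothetical.

```python
from datetime import date
from typing import Dict, List, Tuple


def mark_latest_open(issues: List[Dict]) -> List[Dict]:
    """Return copies of `issues` with status Open for the latest per group, else Closed."""
    latest_idx: Dict[Tuple[str, str], int] = {}
    for i, issue in enumerate(issues):
        key = (issue["aspect"], issue["location"])
        j = latest_idx.get(key)
        # Strict '>' keeps the first-seen issue when opened_at ties
        if j is None or issue["opened_at"] > issues[j]["opened_at"]:
            latest_idx[key] = i
    open_idx = set(latest_idx.values())
    return [{**issue, "status": "Open" if i in open_idx else "Closed"}
            for i, issue in enumerate(issues)]


issues = [
    {"issue_id": "RUN-20251015-000001", "aspect": "wifi_connectivity", "location": "Austin, TX", "opened_at": date(2025, 10, 10)},
    {"issue_id": "RUN-20251015-000002", "aspect": "wifi_connectivity", "location": "Austin, TX", "opened_at": date(2025, 10, 14)},
    {"issue_id": "RUN-20251015-000003", "aspect": "cleanliness", "location": "Austin, TX", "opened_at": date(2025, 10, 12)},
]
for issue in mark_latest_open(issues):
    print(issue["issue_id"], issue["status"])
```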

In [0]:

# Window to get the latest opened_at per (aspect, location)
w = W.partitionBy("aspect", "location").orderBy(F.col("opened_at").desc())

# Add row number to identify the latest issue per group
issues_ranked = final_issues.withColumn("rn", F.row_number().over(w))

# Mark only the latest as Open, others as Closed
issues_final = (
    issues_ranked.withColumn(
        "status",
        F.when(F.col("rn") == 1, F.lit("Open")).otherwise(F.lit("Closed"))
    )
    .drop("rn")
)
issues_final.filter(F.col("status") == "Open").count()
# display(issues_final.filter(F.col("status") == "Open"))

In [0]:
from pyspark.sql import functions as F

issues_with_text = issues_final.withColumn(
    "relevant_reviews_text",
    F.expr("""
        concat(
            'Aspect: ', aspect, '\n\n',
            'Relevant Review Extracts: ', '\n',
            concat_ws(
                '\n\n',
                transform(
                    relevant_reviews,
                    x -> concat(
                        'review_id:', x.review_id, 
                        '; sentiment:', x.sentiment, 
                        '; evidence:', array_join(x.evidence, '|'), 
                        '; opinion_terms:', array_join(x.opinion_terms, '|'), 
                        '; review_date:', cast(x.review_date as string), 
                        '; channel:', x.channel, 
                        '; star_rating:', cast(x.star_rating as string)
                    )
                )
            )
        )
    """)
)

display(issues_with_text)

In [0]:
issues_with_text.count()

In [0]:
issues_with_text.write.mode("overwrite").saveAsTable(ISSUES_DAILY)

In [0]:
from pyspark.sql import functions as F

# Example: filter the final issues dataframe for a single location (Austin, TX)
austin_issues = issues_with_text.filter(F.col("location") == "Austin, TX")

display(austin_issues)

In [0]:
from pyspark.sql import functions as F

duplicate_count = (
    spark.table(ISSUES_DAILY).groupBy(
        "aspect",
        "location",
        "opened_at"
    )
    .count()
    .filter(
        F.col("count") > 1
    )
    .agg(
        F.sum("count").alias("duplicate_rows")
    )
    .collect()[0]["duplicate_rows"]
)

duplicate_count = duplicate_count if duplicate_count is not None else 0
duplicate_count

In [0]:
ISSUES_DAILY = "lakehouse_inn_catalog.voc.issues_daily"
row_count = spark.table(ISSUES_DAILY).count()
row_count

## Next Step: Diagnosis and Recommendations for Each Issue

Hand each flagged issue to Agent Bricks for diagnosis and recommendations.

## Scratch

In [0]:
from pyspark.sql import functions as F

# Map sentiment to numerical severity weight
sentiment_weights = {
    "very_positive": -2,
    "positive": -1,
    "neutral": 0,
    "negative": 1,
    "very_negative": 2
}

sentiment_expr = F.create_map([F.lit(x) for kv in sentiment_weights.items() for x in kv])

# Assign numeric weight per aspect
df_weighted = merged_df.withColumn("severity_weight", sentiment_expr[F.col("sentiment")])

# Focus only on negative aspects
issues_df = df_weighted.filter(F.col("severity_weight") > 0)

# Compute issue frequency and severity per aspect
issue_summary = (
    issues_df.groupBy("location", "aspect")
    .agg(
        F.countDistinct("review_id").alias("issue_count"),
        F.avg("severity_weight").alias("avg_severity"),
    )
    .withColumn("severity_score", F.col("issue_count") * F.col("avg_severity"))
    .orderBy(F.desc("severity_score"))
)

display(issue_summary)

In [0]:

# Write the scratch issue summary to a table
issue_summary.write.mode("overwrite").saveAsTable("lakehouse_inn_catalog.voc.issue_summary")