This notebook is available at https://github.com/databricks-industry-solutions/review-summarisation. For more information about this solution accelerator, check out our [website](https://www.databricks.com/solutions/accelerators/large-language-models-retail) and [blog post](https://www.databricks.com/blog/automated-analysis-product-reviews-using-large-language-models-llms).

# Presentation

Our model has run and the summarisation task is complete. The last, short step is to turn our dataframe into an easily presentable format.

We are aiming for a dataframe with one row per book per week. Each row should hold metadata such as the book's title and author, along with the average rating for the week, the positive summary, and the negative summary. This layout is convenient if we need to build a dashboard.

----

**Setup Used:**

- Runtime: 13.2 ML
- Cluster:
  - Machine: 16 CPU + 64 GB RAM (For Driver & Worker)
  - 8 Workers

#### Data Defaults
Specify catalog and schema.

In [0]:
# Imports
from config import CATALOG_NAME, SCHEMA_NAME, USE_UC

# If UC is enabled
if USE_UC:
    _ = spark.sql(f"USE CATALOG {CATALOG_NAME};")

# Sets the standard database to be used in this notebook
_ = spark.sql(f"USE SCHEMA {SCHEMA_NAME};")

#### Read Data
Read the summarised and condensed dataframe.

In [0]:
# 4x total core count (8 workers x 16 cores = 128 cores)
spark.conf.set("spark.sql.shuffle.partitions", 512)

# Read the table
reviews_df = spark.read.table("book_reviews_condensed")

### Build Meta DF
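This dataframe will hold the per-book, per-week metadata: the number of reviews, the number of review tokens, and the average star rating weighted by review count.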

In [0]:
# Imports
from pyspark.sql import functions as SF

# Build meta reviews df
meta_reviews_df = (
    reviews_df
    .withColumn(
        "weighted_star_rating", 
        SF.col("n_reviews") * SF.col("avg_star_rating")
    )
    .groupBy("asin", "title", "author", "week_start")
    .agg(
        SF.sum("n_reviews").alias("n_reviews"),
        SF.sum("n_review_tokens").alias("n_review_tokens"),
        SF.sum("weighted_star_rating").alias("weighted_star_rating"),
    )
    .withColumn(
        "avg_star_rating", 
        SF.round(SF.col("weighted_star_rating") / SF.col("n_reviews"), 2),
    )
    .drop("weighted_star_rating")
    .orderBy("asin", "title", "author", "week_start")
)
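
To see why the ratings are multiplied by `n_reviews` before being averaged, here is a minimal plain-Python sketch of the same arithmetic. The sample numbers are made up, and the sketch assumes the condensed table holds one row per rating class for a given book and week.

```python
# Hypothetical rows for one book in one week: (n_reviews, avg_star_rating)
rows = [
    (8, 4.5),  # e.g. the "high" rating class
    (2, 1.5),  # e.g. the "low" rating class
]

def weighted_avg_rating(rows):
    """Same arithmetic as the Spark cell: sum(n * avg) / sum(n)."""
    total_reviews = sum(n for n, _ in rows)
    weighted_sum = sum(n * avg for n, avg in rows)
    return round(weighted_sum / total_reviews, 2)

def naive_avg_rating(rows):
    """Unweighted mean of the per-row averages, shown only for comparison."""
    return round(sum(avg for _, avg in rows) / len(rows), 2)

weekly_avg = weighted_avg_rating(rows)  # (8 * 4.5 + 2 * 1.5) / 10
naive_avg = naive_avg_rating(rows)      # (4.5 + 1.5) / 2
print(weekly_avg, naive_avg)
```

The weighted figure reflects that most reviewers rated the book highly, whereas the naive mean would treat the two rating classes as equally sized.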

### Build Summary Reviews
This dataframe will have the positive and negative review summaries side by side in the same row, rather than in separate rows. We will use a pivot for this.

In [0]:
# Imports
from pyspark.sql import functions as SF

# Build summary reviews df
summary_reviews_df = (
    reviews_df.groupBy("asin", "title", "author", "week_start")
    .pivot("star_rating_class")
    .agg(SF.first("final_review_summary"))
    .withColumnRenamed("high", "positive_reviews_summary")
    .withColumnRenamed("low", "negative_reviews_summary")
    .orderBy("asin", "title", "author", "week_start")
)
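
As a rough illustration of what the pivot does, the plain-Python sketch below reshapes a few made-up long-format rows into one row per book and week. The ASINs, dates, and summaries are hypothetical, and `pivot_summaries` is an illustrative helper, not part of the notebook.

```python
# Hypothetical long-format rows: (asin, week_start, star_rating_class, summary)
long_rows = [
    ("B001", "2023-01-02", "high", "Readers loved the pacing."),
    ("B001", "2023-01-02", "low", "Some found the ending rushed."),
    ("B002", "2023-01-02", "high", "Praised for clear explanations."),
]

# Target column name for each rating class, matching the renames above
column_for_class = {
    "high": "positive_reviews_summary",
    "low": "negative_reviews_summary",
}

def pivot_summaries(rows):
    """Collapse one row per rating class into one row per (asin, week_start)."""
    wide = {}
    for asin, week_start, rating_class, summary in rows:
        # Start every group with both columns empty, like nulls after a pivot
        row = wide.setdefault(
            (asin, week_start),
            {column: None for column in column_for_class.values()},
        )
        column = column_for_class[rating_class]
        # Keep the first summary seen, similar in spirit to SF.first
        if row[column] is None:
            row[column] = summary
    return wide

wide_rows = pivot_summaries(long_rows)
for key, row in wide_rows.items():
    print(key, row)
```

A book and week with only one rating class ends up with `None` in the other column, which the HTML conversion step later turns into an empty string.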



### Join Dataframes
Join the two dataframes we just created on the book and week columns.

In [0]:
summary_df = meta_reviews_df.join(
    summary_reviews_df, 
    how="inner", 
    on=["asin", "title", "author", "week_start"]
)

### Parse as HTML 
Convert the summary cells to HTML so that they display nicely on our dashboard.

In [0]:
# Imports
from pyspark.sql import functions as SF
import html

# Build a UDF to convert to HTML
@SF.udf("string")
def convert_to_html(text):
    # Missing summaries arrive as None (for example, when the pivot found
    # no summary for that rating class); return an empty string for those
    if text is None:
        return ""
    # Escape any existing HTML characters
    escaped_string = html.escape(text)
    # Replace newline characters with HTML line breaks
    return escaped_string.replace("\n", "<br>")

# Apply 
summary_df = (
    summary_df
    .withColumn("positive_reviews_summary", convert_to_html("positive_reviews_summary"))
    .withColumn("negative_reviews_summary", convert_to_html("negative_reviews_summary"))
)
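
The effect of the conversion is easiest to see on a single string. The sketch below repeats the UDF's logic as an ordinary Python function so it can be tried without Spark. The name `to_html` and the sample text are made up for illustration.

```python
import html

def to_html(text):
    """Plain-Python stand-in for the UDF body (hypothetical helper name)."""
    # Missing summaries become an empty string
    if text is None:
        return ""
    # Escape HTML-special characters first, then turn newlines into <br>
    return html.escape(text).replace("\n", "<br>")

sample = "Plot: 5/5 & great <fun>\nPacing: a bit slow"
converted = to_html(sample)
print(converted)
```

The order matters: escaping first means any markup already present in a summary is shown literally, while the `<br>` tags added afterwards stay intact as real line breaks.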

### Build Display ID
Two different books can share the same title, so we want a unique display ID built from the book's title, the author's name, and the book's ID (`asin`).

In [0]:
# Imports
from pyspark.sql import functions as SF

# Build UDF 
@SF.udf("string")
def build_display_id(title, author, asin):
    display_id = f"{title} by {author} ({asin})"
    return display_id

# Apply
summary_df = summary_df.withColumn(
    "display_id", build_display_id(SF.col("title"), SF.col("author"), SF.col("asin"))
)
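
As a quick check of the format, the plain-Python sketch below builds display IDs for two made-up books that share a title. The titles, authors, and ASINs are hypothetical, and `make_display_id` is an illustrative stand-in for the UDF body.

```python
def make_display_id(title, author, asin):
    """Same f-string format as the UDF above (hypothetical helper name)."""
    return f"{title} by {author} ({asin})"

# Two made-up books that share a title but differ in author and ID
first = make_display_id("Legacy", "A. Author", "B000000001")
second = make_display_id("Legacy", "B. Writer", "B000000002")
print(first)
print(second)

# A missing value is formatted as the literal text "None"
missing_author = make_display_id("Legacy", None, "B000000003")
print(missing_author)
```

Because the ASIN is part of the string, the IDs stay distinct even when both title and author match. A missing author or title shows up as the text `None`, so those columns may be worth checking for nulls before the ID is shown on a dashboard.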

### Save Finalised Dataframe

Our final product is ready, so we can go ahead and save it.

In [0]:
(
    summary_df
    .write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("book_reviews_finalised")
)

In [0]:
display(summary_df.limit(5))