# Explore Data

## 0. Preprocess Raw Data (Optional)

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame

# Load the tab-separated raw review file; its first line holds the column names
raw_csv_path = "Files/saved_datafiles/movie-reviews.csv"
df_raw = (
    spark.read
    .option("sep", "\t")
    .option("header", True)
    .csv(raw_csv_path)
)

# Persist the untouched data as a Delta table, replacing any earlier version and its schema
(
    df_raw.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("df_movies_raw")
)

StatementMeta(, 4e3b9f42-4db2-4b7d-bd6b-a2fcd00c4194, 7, Finished, Available, Finished)

In [8]:
# Count and show the rows that are missing at least one required field

required_columns = ["Movie", "Publish", "Review", "Date", "Score"]

# OR together one isNull() test per required column
has_missing_field = df_raw[required_columns[0]].isNull()
for column_name in required_columns[1:]:
    has_missing_field = has_missing_field | df_raw[column_name].isNull()

df_null = df_raw.filter(has_missing_field)

print(df_null.count(), " corrupt rows exist.\n")
display(df_null)

StatementMeta(, 4e3b9f42-4db2-4b7d-bd6b-a2fcd00c4194, 10, Finished, Available, Finished)

23  corrupt rows exist.



SynapseWidget(Synapse.DataFrame, 89978b5f-f629-459d-b350-856d8fbae71c)

### Save Clean DataFrame as Table

In [12]:
from pyspark.sql.functions import regexp_replace, col

# Cleaning order matters: every step that can create a null or empty value runs
# BEFORE dropna, so the saved table never contains rows with missing fields.

# 1. Strip double quotes from the review text to improve readability
df_clean = df_raw.withColumn("Review", regexp_replace(col("Review"), '"', ''))

# 2. Treat empty strings and textual placeholders as missing values
#    (this also catches reviews that became empty after the quotes were removed)
df_clean = df_clean.replace("", None).replace("NULL", None).replace("N/A", None)

# 3. Store the score as an integer (assumes whole-number scores; TODO confirm against the raw file)
df_clean = df_clean.withColumn("Score", col("Score").cast("int"))

# 4. Drop every row that is still missing a required field, including scores
#    that could not be converted to an integer in step 3
df_clean = df_clean.dropna(subset=["Movie", "Publish", "Review", "Date", "Score"])

df_clean.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("df_movies_clean")

# Any warning messages shown here may be due to Spark's lazy execution...
# Corrupt records are removed successfully

StatementMeta(, 599ba93e-9972-412d-af94-8b1a005eceb8, 14, Finished, Available, Finished)

### Query from Table

Why do we save this data as a table?

Once the data is saved as a table, we can run SQL queries against it directly, without first loading it into a DataFrame!

In [13]:
# The saved lakehouse table can be queried by name with SQL;
# no DataFrame has to be loaded beforehand

harry_potter_query = "SELECT Movie, Review, Score FROM df_movies_clean WHERE Movie LIKE '%HARRY POTTER%' ORDER BY FLOAT(Score) DESC"
harry_potter_reviews = spark.sql(harry_potter_query)

display(harry_potter_reviews)

StatementMeta(, 599ba93e-9972-412d-af94-8b1a005eceb8, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, cab0aaa2-2eb8-4b59-92fe-35b95cc14aed)

## 1. Prepare Data

### Load Data

In [14]:
# Load clean raw data of 400k movie reviews

# Imported here so that this cell also works as a starting point on a fresh
# kernel, i.e. when section 0 (which imports `col`) has been skipped.
from pyspark.sql.functions import col

try:
    # If you started from section0 and already have a clean table
    df_clean = spark.sql("SELECT * FROM Lakehouse_MovieData.df_movies_clean")

except Exception as load_error:
    # If you are starting from this cell
    # Report why the table could not be read instead of hiding the error
    print("Could not read Lakehouse_MovieData.df_movies_clean:", type(load_error).__name__)
    print("Rebuilding the table from the clean CSV file instead.\n")

    df_clean = spark.read.csv("Files/saved_datafiles/movie-reviews-clean.csv", sep = "\t", header = True)
    # CSV columns are read as strings; cast Score so the table has the same
    # schema as the one saved in section 0
    df_clean = df_clean.withColumn("Score", col("Score").cast("int"))
    df_clean.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("df_movies_clean")
    df_clean = spark.sql("SELECT * FROM Lakehouse_MovieData.df_movies_clean")

# Check results
print(df_clean.count(), "rows loaded.\n")
display(df_clean.filter(col("Movie").contains("HARRY POTTER")))

StatementMeta(, 599ba93e-9972-412d-af94-8b1a005eceb8, 16, Finished, Available, Finished)

417046 rows loaded.



SynapseWidget(Synapse.DataFrame, ca32981c-b1d8-45bf-b5c8-16b8d92ef13b)

### Scale Down

Default for `num_review_min` is 300. Movies with at least `num_review_min` reviews will be kept.

Default for `num_review_keep` is 10. For each remaining movie, only its `num_review_keep` longest reviews will be kept.

For example, `num_review_min`=300 and `num_review_keep`=10 will leave 250 rows (25 movies × 10 reviews each).

You can modify the value as you want, but
* (MUST) `num_review_keep` <= `num_review_min`
* (MUST) `num_review_keep` >= 5

In [15]:
num_review_min = 300    # Decreasing this value will result in more movies left.
num_review_keep = 10    # Increasing this value will result in more reviews left.

# Fail fast if the values violate the constraints stated in the notes above this cell
assert num_review_keep >= 5, "num_review_keep must be at least 5"
assert num_review_keep <= num_review_min, "num_review_keep must not exceed num_review_min"

StatementMeta(, 599ba93e-9972-412d-af94-8b1a005eceb8, 17, Finished, Available, Finished)

In [16]:
# Scale the data down entirely in Spark: nothing is collected onto the driver,
# so this cell also works when the filtered data does not fit in driver memory.
from pyspark.sql import Window
from pyspark.sql.functions import col, length, row_number

# Keep popular movies only (at least `num_review_min` reviews)
review_counts = df_clean.groupBy("Movie").count()
popular_movies = review_counts.filter(col("count") >= num_review_min).select("Movie")
df_filtered = df_clean.join(popular_movies, on="Movie", how="inner")

# Keep longest reviews only: rank each movie's reviews by character length.
# The review text is a secondary sort key so that reviews of equal length are
# ranked the same way on every run.
longest_first = Window.partitionBy("Movie").orderBy(length(col("Review")).desc(), col("Review").asc())
df_ranked = df_filtered.withColumn("_review_rank", row_number().over(longest_first))
df_longest = df_ranked.filter(col("_review_rank") <= num_review_keep).drop("_review_rank")

# Save as table, then read it back so that later cells work on exactly the saved rows
df_longest.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("df_movies_popular")
df_movies_popular = spark.table("df_movies_popular")

# Check results
print(df_movies_popular.count(), "records present.\n")
display(df_movies_popular)

StatementMeta(, 599ba93e-9972-412d-af94-8b1a005eceb8, 18, Finished, Available, Finished)

250 records present.



SynapseWidget(Synapse.DataFrame, 265d3a05-61ae-4017-96ae-a4a31836fa23)

## 2. Use Fabric AI Skills

### Sentiment Analysis

In [17]:
# FEATURE 1. Use Built-In AI Services with SDK: Sentiment Analysis

import synapse.ml.core
from synapse.ml.services import AnalyzeText
from pyspark.sql.functions import col

# Configure the built-in sentiment analysis service: it reads the "Review"
# column and writes its response to "SentimentAnalysis" (failures go to "Error")
sentimentClient = (
    AnalyzeText()
    .setKind("SentimentAnalysis")
    .setTextCol("Review")
    .setOutputCol("SentimentAnalysis")
    .setErrorCol("Error")
)

result_sentAnalysis = sentimentClient.transform(df_movies_popular)

# Extract fields
# Use the document-level result so that the sentiment describes the WHOLE review.
# Reading `sentences[0]` would only reflect the first sentence, which is
# misleading for the long, multi-sentence reviews kept by the scale-down step.
# NOTE(review): assumes the response exposes `sentiment` and `confidenceScores`
# directly under `documents`, as in the Fabric AnalyzeText examples; confirm.
document_result = col("SentimentAnalysis").getItem("documents")
document_scores = document_result.getItem("confidenceScores")

result_sentAnalysis = (
    result_sentAnalysis
    .withColumn("Sentiment", document_result.getItem("sentiment"))
    .withColumn("PositiveScore", document_scores.getItem("positive").cast("float"))
    .withColumn("NegativeScore", document_scores.getItem("negative").cast("float"))
    .withColumn("NeutralScore", document_scores.getItem("neutral").cast("float"))
)

df_analyzed_sentiment = result_sentAnalysis.select("Movie", "Review", "Score", "Sentiment", "PositiveScore", "NegativeScore", "NeutralScore")
df_analyzed_sentiment.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("df_analyzed_sentiment")

display(df_analyzed_sentiment)

StatementMeta(, 599ba93e-9972-412d-af94-8b1a005eceb8, 19, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4020c194-37a2-412e-80b3-2234706bd7c5)

### Keyword Extraction

In [19]:
# FEATURE 2. Use Built-In AI Services with SDK: Key Phrase Extraction

import synapse.ml.core
from synapse.ml.services import AnalyzeText
from pyspark.sql.functions import col, concat_ws

# Configure the built-in key phrase extraction service: it reads the "Review"
# column and writes its response to "KeyPhraseExtraction" (failures go to "Error")
keyPhraseClient = (
    AnalyzeText()
    .setTextCol("Review")
    .setOutputCol("KeyPhraseExtraction")
    .setErrorCol("Error")
    .setKind("KeyPhraseExtraction")
)

result_keyPhrase = keyPhraseClient.transform(df_analyzed_sentiment)

# Extract fields: pull the key phrase collection out of the service response
key_phrases_field = col("KeyPhraseExtraction")["documents"]["keyPhrases"]
result_keyPhrase = result_keyPhrase.withColumn("KeyPhrases", key_phrases_field)

# Join the phrases into one comma-separated string per review for the saved table
key_phrases_text = concat_ws(", ", col("KeyPhrases")).alias("KeyPhrases")
df_analyzed_keyphrase = result_keyPhrase.select("Movie", "Review", "Score", "Sentiment", key_phrases_text)

(
    df_analyzed_keyphrase.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("df_analyzed_keyphrase")
)

display(df_analyzed_keyphrase)

StatementMeta(, 599ba93e-9972-412d-af94-8b1a005eceb8, 21, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 38470999-0f51-4e2c-8928-d52bb2debf77)

### Power BI Embedding

No need to run multiple lines of code for data exploration!

Personalize your report by choosing data, applying filters or personalizing your visuals.

In [20]:
# FEATURE 3. Use Power BI with SDK: Embed a Power BI report

from powerbiclient import QuickVisualize, get_dataset_config

# Pass only the columns of interest to the quick report (the full review text is left out)
report_columns = ["Movie", "Score", "Sentiment", "KeyPhrases"]
report_data = df_analyzed_keyphrase.select(*report_columns)
report_config = get_dataset_config(report_data)

PBI_visualize = QuickVisualize(report_config)
# Bare last expression so the notebook displays the report object as the cell output
PBI_visualize

StatementMeta(, 599ba93e-9972-412d-af94-8b1a005eceb8, 22, Finished, Available, Finished)

QuickVisualize()

StatementMeta(, 599ba93e-9972-412d-af94-8b1a005eceb8, 23, Finished, Available, Finished)

StatementMeta(, 599ba93e-9972-412d-af94-8b1a005eceb8, 24, Finished, Available, Finished)

StatementMeta(, 599ba93e-9972-412d-af94-8b1a005eceb8, 25, Finished, Available, Finished)

StatementMeta(, 599ba93e-9972-412d-af94-8b1a005eceb8, 26, Finished, Available, Finished)