## DSCC202-402 Data Science at Scale Final Project
### Tracking Tweet sentiment at scale using a pretrained transformer (classifier)
<p>Consider the following illustration of the end-to-end system that you will be building.  Each student should do their own work.  The project will demonstrate your understanding of Spark Streaming, the medallion data architecture using Delta Lake, Spark inference at scale using an MLflow-packaged model, as well as exploratory data analysis and system tracking and monitoring.</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/pipeline.drawio.png">

<p>
You will be pulling an updated copy of the course GitHub repository: <a href="https://github.com/lpalum/dscc202-402-spring2025">The Repo</a>.  

Once you have updated your fork of the repository you should see the following template project that is resident in the final_project directory.
</p>

<img src="https://data-science-at-scale.s3.amazonaws.com/images/notebooks.drawio.png">

<p>
You can then pull your project into the Databricks workspace using the <a href="https://github.com/apps/databricks">Databricks App on GitHub</a> or by cloning the repo to your laptop and then uploading the final_project directory and its contents to your workspace using file imports.  Your choice.

<p>
Work your way through this notebook, which will give you the steps required to submit a complete and compliant project.  The following illustration and associated data dictionary specify the transformations and data that you are to generate for each step in the medallion pipeline.
</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/dataframes.drawio.png">

#### Bronze Data - raw ingest
- date - string in the source json
- user - string in the source json
- text - tweet string in the source json
- sentiment - the given sentiment of the text as determined by an unknown model that is provided in the source json
- source_file - the path of the source json file that this row of data was read from
- processing_time - a timestamp of when you read this row from the source json
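
As a rough illustration of the bronze row shape described above, the following plain-Python sketch parses one JSON line and attaches the two tracking columns. The helper `to_bronze_row`, the sample line, and the file path are hypothetical and exist only for this example; the notebook itself builds these columns with a Spark stream.

```python
import json
from datetime import datetime, timezone

# The four fields expected in each source JSON record (from the data dictionary)
BRONZE_FIELDS = ("date", "user", "text", "sentiment")

def to_bronze_row(json_line: str, source_file: str) -> dict:
    """Parse one JSON line and add the source_file and processing_time columns."""
    record = json.loads(json_line)
    # Missing fields become None, mirroring nullable string columns
    row = {name: record.get(name) for name in BRONZE_FIELDS}
    row["source_file"] = source_file
    row["processing_time"] = datetime.now(timezone.utc)
    return row

# Hypothetical sample record and path, for illustration only
sample_line = (
    '{"date": "Mon Apr 06 22:19:45 PDT 2009", "user": "example_user", '
    '"text": "@someone hello", "sentiment": "positive"}'
)
bronze_row = to_bronze_row(sample_line, "/hypothetical/path/0.json")
print(sorted(bronze_row))
```

Fields that are absent from a record come back as `None`, which is one simple way to see how many nulls the later exploratory analysis should expect.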

#### Silver Data - Bronze Preprocessing
- timestamp - convert date string in the bronze data to a timestamp
- mention - every @username mentioned in the text string in the bronze data gets a row in this silver data table.
- cleaned_text - the bronze text data with the mentions (@username) removed.
- sentiment - the given sentiment that was associated with the text in the bronze table.
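
The silver rules above can be sketched in plain Python, outside Spark, to make the "one row per mention" requirement concrete. This is only a sketch under assumptions: the date layout in the sample is assumed, the timezone token is dropped rather than converted, and the helper names (`parse_tweet_date`, `to_silver_rows`) are hypothetical.

```python
import re
from datetime import datetime

MENTION_PATTERN = re.compile(r"@\w+")

def parse_tweet_date(date_str: str) -> datetime:
    """Parse an assumed layout like 'Mon Apr 06 22:19:45 PDT 2009'."""
    parts = date_str.split()
    # Skip the weekday (index 0) and the timezone token (index 4)
    without_zone = " ".join([parts[1], parts[2], parts[3], parts[5]])
    return datetime.strptime(without_zone, "%b %d %H:%M:%S %Y")

def to_silver_rows(bronze_row: dict) -> list:
    """Return one silver row for every @username found in the tweet text."""
    text = bronze_row["text"]
    # Remove the mentions, then collapse the leftover whitespace
    cleaned_text = " ".join(MENTION_PATTERN.sub("", text).split())
    timestamp = parse_tweet_date(bronze_row["date"])
    return [
        {
            "timestamp": timestamp,
            "mention": mention,
            "cleaned_text": cleaned_text,
            "sentiment": bronze_row["sentiment"],
        }
        for mention in MENTION_PATTERN.findall(text)
    ]

# Hypothetical bronze row, for illustration only
sample_bronze = {
    "date": "Mon Apr 06 22:19:45 PDT 2009",
    "user": "example_user",
    "text": "@alice and @bob great game",
    "sentiment": "positive",
}
silver_rows = to_silver_rows(sample_bronze)
for row in silver_rows:
    print(row["mention"], "|", row["cleaned_text"])
```

A tweet with two mentions yields two silver rows that share the same `cleaned_text`, and a tweet with no mentions yields none.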

#### Gold Data - Silver Table Inference
- timestamp - the timestamp from the silver data table rows
- mention - the mention from the silver data table rows
- cleaned_text - the cleaned_text from the silver data table rows
- sentiment - the given sentiment from the silver data table rows
- predicted_score - score out of 100 from the Hugging Face Sentiment Transformer
- predicted_sentiment - string representation of the sentiment
- sentiment_id - 0 for negative and 1 for positive associated with the given sentiment
- predicted_sentiment_id - 0 for negative and 1 for positive associated with the Hugging Face Sentiment Transformer
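
The derived gold columns can be illustrated with a small plain-Python mapping. This sketch assumes the model returns a label and a score between 0 and 1, and that the labels are `POS` and `NEG`; both are assumptions to check against the registered model's actual output. The helper `to_gold_fields` is hypothetical.

```python
# Assumed label vocabulary; verify against the registered model's output
LABEL_TO_ID = {"NEG": 0, "POS": 1}

def to_gold_fields(given_sentiment: str, prediction: dict) -> dict:
    """Derive the gold columns from the given sentiment and one model prediction."""
    label = prediction["label"]
    return {
        # Rescale an assumed 0..1 score to a score out of 100
        "predicted_score": round(prediction["score"] * 100.0, 2),
        "predicted_sentiment": label,
        "sentiment_id": 1 if given_sentiment == "positive" else 0,
        # Labels outside the binary vocabulary are flagged with -1
        "predicted_sentiment_id": LABEL_TO_ID.get(label, -1),
    }

example_gold = to_gold_fields("positive", {"label": "POS", "score": 0.5})
print(example_gold)
```

Flagging non-binary labels with `-1` keeps them visible so they can be filtered out before computing binary metrics.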

#### Application Data - Gold Table Aggregation
- min_timestamp - the oldest timestamp on a given mention (@username)
- max_timestamp - the newest timestamp on a given mention (@username)
- mention - the user (@username) that this row pertains to.
- negative - total negative tweets directed at this mention (@username)
- neutral - total neutral tweets directed at this mention (@username)
- positive - total positive tweets directed at this mention (@username)
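
To make the aggregation concrete, here is a plain-Python sketch of the per-mention rollup. It assumes the sentiment values are the lowercase words `negative`, `neutral`, and `positive`, and the helper `aggregate_mentions` and the sample rows are hypothetical; in the notebook the same result would come from a Spark `groupBy`.

```python
from datetime import datetime

SENTIMENTS = ("negative", "neutral", "positive")

def aggregate_mentions(gold_rows: list, sentiment_key: str = "sentiment") -> dict:
    """Roll gold rows up to one summary entry per mention."""
    summary = {}
    for row in gold_rows:
        entry = summary.setdefault(
            row["mention"],
            {
                "min_timestamp": row["timestamp"],
                "max_timestamp": row["timestamp"],
                "negative": 0,
                "neutral": 0,
                "positive": 0,
            },
        )
        entry["min_timestamp"] = min(entry["min_timestamp"], row["timestamp"])
        entry["max_timestamp"] = max(entry["max_timestamp"], row["timestamp"])
        if row[sentiment_key] in SENTIMENTS:
            entry[row[sentiment_key]] += 1
    return summary

# Hypothetical gold rows, for illustration only
sample_gold = [
    {"mention": "@alice", "timestamp": datetime(2009, 4, 6, 10, 0), "sentiment": "positive"},
    {"mention": "@alice", "timestamp": datetime(2009, 4, 7, 9, 0), "sentiment": "negative"},
    {"mention": "@bob", "timestamp": datetime(2009, 4, 6, 12, 0), "sentiment": "neutral"},
    {"mention": "@alice", "timestamp": datetime(2009, 4, 5, 8, 0), "sentiment": "positive"},
]
mention_summary = aggregate_mentions(sample_gold)
for mention, entry in mention_summary.items():
    print(mention, entry["negative"], entry["neutral"], entry["positive"])
```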

When you are designing your approach, one of the main decisions that you will need to make is how you are going to orchestrate the streaming data processing in your pipeline.  There are several valid approaches to triggering your streams and gating the execution of your pipeline.  Think through how you want to proceed and ask questions if you need guidance. The following references may be helpful:
- [Spark Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Databricks Autoloader - Cloudfiles](https://docs.databricks.com/en/ingestion/auto-loader/index.html)
- [In class examples - Spark Structured Streaming Performance](https://dbc-f85bdc5b-07db.cloud.databricks.com/editor/notebooks/2638424645880316?o=1093580174577663)

### Be sure your project runs end to end when *Run all* is executed on this notebook! (7 points)

### This project is worth 25% of your final grade.
- DSCC-202 Students have 55 possible points on this project (see points above and the instructions below)
- DSCC-402 Students have 60 possible points on this project (one extra section to complete)

In [0]:
%run ./includes/includes

In [0]:
"""
Add widgets to the notebook that control clearing a previous run, stopping the
active streams, and optimizing the Delta tables, using routines defined in the utilities notebook.
"""
dbutils.widgets.removeAll()

dbutils.widgets.dropdown("clear_previous_run", "No", ["No","Yes"])
if (getArgument("clear_previous_run") == "Yes"):
    clear_previous_run()
    print("Cleared all previous data.")

dbutils.widgets.dropdown("stop_streams", "No", ["No","Yes"])
if (getArgument("stop_streams") == "Yes"):
    stop_all_streams()
    print("Stopped all active streams.")

dbutils.widgets.dropdown("optimize_tables", "No", ["No","Yes"])
if (getArgument("optimize_tables") == "Yes"):
    # Suck up those small files that we have been appending.
    # Optimize the tables
    optimize_table(BRONZE_DELTA)
    optimize_table(SILVER_DELTA)
    optimize_table(GOLD_DELTA)
    print("Optimized all of the Delta Tables")

## 1.0 Import your libraries here (2 points)
- Are your shuffle partitions consistent with your cluster and your workload?
- Do you have the necessary libraries to perform the required operations in the pipeline/application?
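
One common rule of thumb, offered here as an assumption rather than a course requirement, is to size the shuffle partitions as a small multiple of the cores available to the cluster. The helper `suggest_shuffle_partitions` below is hypothetical and only shows the arithmetic.

```python
def suggest_shuffle_partitions(total_cores: int, multiplier: int = 2) -> int:
    """Return a shuffle-partition count that is a multiple of the core count."""
    if total_cores < 1:
        raise ValueError("total_cores must be at least 1")
    if multiplier < 1:
        raise ValueError("multiplier must be at least 1")
    return total_cores * multiplier

# Example: a hypothetical cluster with 4 cores in total
print(suggest_shuffle_partitions(4))
print(suggest_shuffle_partitions(4, multiplier=4))
```

In the notebook, the resulting value could be passed to `spark.conf.set("spark.sql.shuffle.partitions", ...)`, with the core count taken from `sc.defaultParallelism`.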

In [0]:
# ENTER YOUR CODE HERE
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pandas as pd
import numpy as np
import seaborn as sns
from mlflow.tracking.client import MlflowClient
import mlflow
import mlflow.pyfunc
from sklearn.metrics import recall_score, precision_score, f1_score, confusion_matrix
from pyspark.sql import functions as F
from transformers import pipeline

import time
from datetime import datetime
import matplotlib.pyplot as plt

In [0]:
# Are your shuffle partitions consistent with your cluster and your workload?
spark.conf.set("spark.sql.adaptive.enabled", "true")
print(spark.conf.get("spark.sql.shuffle.partitions"))
print(sc.defaultParallelism)

In [0]:
# The default of 200 shuffle partitions is more than this small cluster and workload need,
# so set the shuffle partitions to 4x the default parallelism (total cores) instead.
spark.conf.set("spark.sql.shuffle.partitions", 4*sc.defaultParallelism)

## 2.0 Define and execute utility functions (3 points)
- Read the source file directory listing
- Count the source files (how many are there?)
- print the contents of one of the files

In [0]:
TWEET_SOURCE_PATH	

In [0]:
# ENTER YOUR CODE HERE
# Read the source file directory listing
source_files = dbutils.fs.ls(TWEET_SOURCE_PATH)

# Count the source files
file_count = len(source_files)

# Print the contents of one of the files
sample_file_path = source_files[0].path
sample_file_content = dbutils.fs.head(sample_file_path)

# Display the results
print(f"Number of source files: {file_count}")
print(f"Contents of one file ({sample_file_path}):\n{sample_file_content}")

## 3.0 Transform the Raw Data to Bronze Data using a stream  (8 points)
- define the schema for the raw data
- setup a read stream using cloudfiles and the source data format
- setup a write stream using delta lake to append to the bronze delta table
- enforce schema
- allow a new schema to be merged into the bronze delta table
- Use the defined BRONZE_CHECKPOINT and BRONZE_DELTA paths defined in the includes
- name your raw to bronze stream as bronze_stream
- transform the raw data to the bronze data using the data definition at the top of the notebook

In [0]:
# ENTER YOUR CODE HERE
raw_schema = (StructType(
                [StructField("date", StringType(),nullable=True),
                StructField("user", StringType(), nullable=True),
                StructField("text", StringType(), nullable=True),
                StructField("sentiment", StringType(),nullable=True)])
              )

def start_bronze_stream():
    # Read the raw JSON files with an enforced schema and source tracking,
    # then append the rows to the bronze Delta table.
    bronze_stream = (
        spark.readStream
                .format("cloudFiles")
                .option("cloudFiles.format", "json")
                .option("path", TWEET_SOURCE_PATH)
                .schema(raw_schema)                                  # Enforce the schema on read
                .load()
                .withColumn("source_file", input_file_name())        # Track the originating file
                .withColumn("processing_time", current_timestamp())  # Add ingestion timestamp
            .writeStream
                .format("delta")
                .outputMode("append")                                # Only add new data
                .option("mergeSchema", "true")                       # Allow a new schema to merge into bronze
                .trigger(processingTime="10 seconds")                # Start a micro-batch every 10 seconds
                .option("checkpointLocation", BRONZE_CHECKPOINT)     # Enable recovery
                .queryName("bronze_stream")                          # Name the query for monitoring
                .start(BRONZE_DELTA)                                 # Output path for Delta table
    )

    return bronze_stream

In [0]:
# --- Start Bronze Stream ---
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "bronze_pool")
bronze_query = start_bronze_stream()

In [0]:
if wait_for_delta_table(BRONZE_DELTA, 10000):
    print("Bronze table is ready.")
else:
    print("Bronze table did not finish setting up.")

In [0]:
# query.awaitTermination(30000)

In [0]:
# # Load the Delta table
# delta_table = DeltaTable.forPath(spark, BRONZE_DELTA)

# # Delete all rows from the table
# delta_table.delete(condition="true")

In [0]:
# bronze_table = spark.read.format("delta").load(BRONZE_DELTA)
# bronze_table.printSchema()

## 4.0 Transform the Bronze Data to Silver Data using a stream (5 points)
- setup a read stream on your bronze delta table
- setup a write stream to append to the silver delta table
- Use the defined SILVER_CHECKPOINT and SILVER_DELTA paths in the includes
- name your bronze to silver stream as silver_stream
- transform the bronze data to the silver data using the data definition at the top of the notebook

In [0]:
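from delta.tables import DeltaTable
if DeltaTable.isDeltaTable(spark, SILVER_DELTA):  # the table is absent on a clean run
    silver_table = spark.read.format("delta").load(SILVER_DELTA)
    silver_table.printSchema()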

In [0]:
# silver_schema = StructType([
#     StructField("timestamp", TimestampType(), True),
#     StructField("mention", StringType(), True),
#     StructField("cleaned_text", StringType(), True),
#     StructField("sentiment", StringType(), True)
# ])

# Read stream from bronze delta table
def start_silver_stream():
    silver_stream = (
        spark.readStream
                .format("delta")
                .load(BRONZE_DELTA)
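                # One output row per @username: extract every mention, then explode the array
                # (tweets without any mention produce no silver rows)
                .withColumn("mention", explode(expr("regexp_extract_all(text, '@[A-Za-z0-9_]+', 0)")))
                .withColumn("cleaned_text", regexp_replace(col("text"), r'@\w+', "" ))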
                .withColumn("date_str", expr("substring(date, 5, length(date))"))
                .withColumn("timestamp", to_timestamp(col("date_str"),"MMM dd HH:mm:ss z yyyy"))
                # .drop("date", "user", "text", "source_file", "processing_time", "date_str")
                .select("timestamp", "mention", "cleaned_text", "sentiment")
            # Write stream query
            .writeStream
                .format("delta")
                .outputMode("append")
                .trigger(processingTime="1 minute")
                .queryName("silver_stream") 
                .option("checkpointLocation", SILVER_CHECKPOINT)
                .start(SILVER_DELTA)
    )

    return silver_stream

In [0]:
# --- Start Silver Stream ---
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "silver_pool")
silver_query = start_silver_stream()

In [0]:
if wait_for_delta_table(SILVER_DELTA, 10000):
    print("Silver table is ready.")
else:
    print("Silver table did not finish setting up.")

In [0]:
# silver_stream.awaitTermination(30000)

## 5.0 Transform the Silver Data to Gold Data using a stream (7 points)
- setup a read stream on your silver delta table
- setup a write stream to append to the gold delta table
- Use the defined GOLD_CHECKPOINT and GOLD_DELTA paths defined in the includes
- name your silver to gold stream as gold_stream
- transform the silver data to the gold data using the data definition at the top of the notebook
- Load the pretrained transformer sentiment classifier from the MODEL_NAME at the production level from the MLflow registry
- Use a spark UDF to parallelize the inference across your silver data

In [0]:
# MODEL_NAME

In [0]:
# # MODEL
# model = mlflow.pyfunc.load_model(f"models:/{MODEL_NAME}/production")
# model

In [0]:
# HF_model = pipeline(model=HF_MODEL_NAME)
# HF_model

In [0]:
# def classify_sentiment(text):
#     try:
#         # Predict sentiment using the model (assuming it returns a dictionary with 'label' and 'score')
#         result = HF_model(text)
#         sentiment = result[0]['label']
#         score = result[0]['score']
#         return sentiment, score
#     except Exception as e:
#         return None, None 

In [0]:
# classify_sentiment_udf = \
# udf(classify_sentiment, returnType=
#             StructType([StructField("predicted_sentiment", StringType(), True),
#                         StructField("predicted_score", FloatType(), True)]))

In [0]:
model_uri = f"models:/{MODEL_NAME}/Production"
classify_sentiment_udf = mlflow.pyfunc.spark_udf(spark, model_uri)

In [0]:
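from delta.tables import DeltaTable
if DeltaTable.isDeltaTable(spark, GOLD_DELTA):  # the table is absent on a clean run
    spark.read.format("delta").load(GOLD_DELTA).printSchema()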

In [0]:
# spark.read.format("delta").load(SILVER_DELTA).printSchema()

In [0]:
# display(spark.read.format("delta").load(SILVER_DELTA).limit(1))

In [0]:
# # ENTER YOUR CODE HERE
# # Write the transformed data to the Gold Delta table
# # # def start_gold_stream():
# spark.sparkContext.setLocalProperty("spark.scheduler.pool", "gold_pool")
# gold_stream = (
#         spark.readStream.format("delta")
#         .option("maxOffsetsPerTrigger", 550)
#         .load(SILVER_DELTA)
#                 .withColumn("predicted_sentiment_and_score", classify_sentiment_udf(col("cleaned_text")))
#                 .withColumn("predicted_sentiment", col("predicted_sentiment_and_score.label"))
#                 .withColumn("predicted_score", col("predicted_sentiment_and_score.score").cast("float"))
#                 .withColumn("sentiment_id", when(col("sentiment") == "positive", 1).otherwise(0))
#                 .withColumn("predicted_sentiment_id", when(col("predicted_sentiment") == "positive", 1).otherwise(0))
#         .select("timestamp", "mention", "cleaned_text", "sentiment", "predicted_score", 
#                         "predicted_sentiment", "sentiment_id", "predicted_sentiment_id")
#         .writeStream
#                 .format("delta")
#                 .outputMode("append")
#                 .option("mergeSchema", "true")
#                 .trigger(processingTime="20 seconds")
#                 .option("checkpointLocation", GOLD_CHECKPOINT)
#                 .option("path", GOLD_DELTA)
#                 .queryName("gold_stream") 
#                 .start()
# )

# # return gold_stream
# wait_for_delta_table(GOLD_DELTA)

In [0]:
# Load model as a Spark UDF
sentiment_model_udf = mlflow.pyfunc.spark_udf(
    spark,
    model_uri = f"models:/{MODEL_NAME}/production"
    )

In [0]:

# ENTER YOUR CODE HERE
# Set up a read stream on silver delta table
silver_read_stream_df = (spark.readStream
                         .format("delta")
                         .load(SILVER_DELTA))

gold_df = (silver_read_stream_df.withColumn("prediction", sentiment_model_udf(col("cleaned_text")))
           .withColumn("predicted_score", col("prediction.score").cast("double"))
           .withColumn("predicted_sentiment", col("prediction.label"))
           .withColumn("sentiment_id", 
                       when(col("sentiment") == "positive", 1)
                       .otherwise(0))
           .withColumn("predicted_sentiment_id", 
                       when(col("predicted_sentiment") == "POS", 1)
                       .when(col("predicted_sentiment") == "NEG", 0)
                       .otherwise(-1))
           .select("timestamp", "mention", "cleaned_text", "sentiment", "predicted_score", "predicted_sentiment", "sentiment_id", "predicted_sentiment_id")
           )  


spark.sparkContext.setLocalProperty("spark.scheduler.pool", "gold_pool")

# Set up a write stream to append to the gold delta table
gold_stream = (gold_df.writeStream
               .format("delta")
               .option("mergeSchema", "true")
               .outputMode("append")
               .option("checkpointLocation", GOLD_CHECKPOINT)
               .trigger(processingTime="30 seconds")
               .queryName("gold_stream")
               .start(GOLD_DELTA)
)

wait_for_delta_table(GOLD_DELTA)

In [0]:
# # --- Start Gold Stream ---
# spark.sparkContext.setLocalProperty("spark.scheduler.pool", "gold_pool")
# gold_query = start_gold_stream()

In [0]:
if wait_for_delta_table(GOLD_DELTA):
    print("Gold table is ready.")
else:
    print("Gold table did not finish setting up.")

In [0]:
# gold_stream.status

In [0]:
# # Read a small sample (one row) from SILVER_DELTA
# sample_df = spark.read.format("delta").load(SILVER_DELTA).limit(1)
# sample_df.show()  # Inspect the row


In [0]:
# # Apply the transformations manually to the sample row
# transformed_df = sample_df \
#     .withColumn("predicted_sentiment_and_score", classify_sentiment_udf(F.col("cleaned_text"))) \
#     .withColumn("predicted_sentiment", F.col("predicted_sentiment_and_score.predicted_sentiment")) \
#     .withColumn("predicted_score", F.col("predicted_sentiment_and_score.predicted_score")) \
#     .withColumn("sentiment_id", F.when(F.col("sentiment") == "positive", 1).otherwise(0)) \
#     .withColumn("predicted_sentiment_id", F.when(F.col("predicted_sentiment") == "positive", 1).otherwise(0)) \
#     .select("timestamp", "mention", "cleaned_text", "sentiment", "predicted_score", 
#             "predicted_sentiment", "sentiment_id", "predicted_sentiment_id")

# # Show the result to validate the transformation
# transformed_df.show(truncate=False)


In [0]:
# display(transformed_df)

## 6.0 Monitor your Streams (5 points)
- Setup a loop that runs at least every 10 seconds
- Print a timestamp of the monitoring query along with the list of streams, rows processed on each, and the processing time on each
- Run the loop until all of the data is processed (0 rows read on each active stream)
- Plot a line graph that shows the data processed by each stream over time
- Plot a line graph that shows the average processing time on each stream over time
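
The monitoring requirements above can be sketched as a polling loop that is independent of Spark. In this sketch, `monitor_until_idle` and its `read_progress` callback are hypothetical: the callback is assumed to return a dictionary that maps each stream name to a pair of rows read and processing time in milliseconds. The canned snapshots and the no-op `sleep` exist only so the example finishes instantly.

```python
import time
from datetime import datetime

def monitor_until_idle(read_progress, interval_seconds=10, sleep=time.sleep, max_polls=None):
    """Poll stream progress until every stream reports 0 input rows."""
    history = []
    polls = 0
    while True:
        sleep(interval_seconds)
        snapshot = read_progress()  # {stream_name: (input_rows, processing_ms)}
        stamp = datetime.now()
        for name, (rows, millis) in snapshot.items():
            print(f"{stamp:%H:%M:%S} {name}: {rows} rows, {millis} ms")
            history.append(
                {"time": stamp, "query": name, "input_rows": rows, "processing_time": millis}
            )
        polls += 1
        # Stop once at least one stream reported and all of them read 0 rows
        if snapshot and all(rows == 0 for rows, _ in snapshot.values()):
            break
        # Optional safety valve so the loop cannot run forever
        if max_polls is not None and polls >= max_polls:
            break
    return history

# Canned progress snapshots stand in for real streaming metrics
canned = iter([
    {"bronze_stream": (100, 850), "silver_stream": (40, 300)},
    {"bronze_stream": (0, 20), "silver_stream": (60, 410)},
    {"bronze_stream": (0, 15), "silver_stream": (0, 12)},
])
history = monitor_until_idle(lambda: next(canned), sleep=lambda seconds: None)
print(len(history))
```

In the notebook, the callback could be built from `spark.streams.active` and each query's `lastProgress` (for example its `numInputRows` and `durationMs` fields), and the returned `history` list can be loaded into a pandas DataFrame for the two line plots.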

In [0]:
# # Inspect schema
# df = spark.read.format("delta").load(SILVER_DELTA)
# df = df.withColumn("predicted_sentiment_and_score", classify_sentiment_udf(F.col("cleaned_text")))
# df.printSchema()


In [0]:
# dbutils.fs.rm(BRONZE_CHECKPOINT, recurse=True) 
# dbutils.fs.rm(SILVER_CHECKPOINT, recurse=True) 
# dbutils.fs.rm(GOLD_CHECKPOINT, recurse=True) 

In [0]:
# RUN EVERYTHING
# # --- Start Bronze Stream ---
# spark.sparkContext.setLocalProperty("spark.scheduler.pool", "bronze_pool")
# bronze_query = start_bronze_stream()

# # --- Start Silver Stream ---
# spark.sparkContext.setLocalProperty("spark.scheduler.pool", "silver_pool")
# silver_query = start_silver_stream()

# # --- Start Gold Stream ---
# spark.sparkContext.setLocalProperty("spark.scheduler.pool", "gold_pool")
# gold_query = start_gold_stream()

# Start monitoring in a separate loop
print(f"Monitoring streams with USE_POOLS = TRUE")

"""
Monitor the streams until all of the data is processed
"""
row_count = 1
i = 0
while row_count != 0:
    time.sleep(10)
    
    # Get streaming stats
    stats = get_streaming_stats()
    
    # Display current status
    if not stats.empty:
        # Group by query name and get the latest stats
        latest_stats = stats.sort_values("elapsed_time").groupby("query").last().reset_index()
        print(f"\nStatus at {datetime.now().strftime('%H:%M:%S')} (Elapsed: {i*10}s):")
        for _, row in latest_stats.iterrows():
            print(f"  {row['query']}: {row['input_rows']} rows, {row['processing_time']}ms processing time")
        row_count = latest_stats["input_rows"].sum()
        i += 1

# Stop the streams when all rows are processed
if row_count == 0:
    print("No more data to process. Stopping streams.")
    bronze_query.stop()
    silver_query.stop()
    gold_stream.stop()  # the gold query handle is named gold_stream in this notebook

# Collect final streaming metrics
df = get_streaming_stats()

# Plot processing time and input rows over elapsed time by query
if not df.empty:
    # Create a figure with two subplots
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10), sharex=True)
    
    # Plot processing time
    for query in df["query"].unique():
        subset = df[df["query"] == query]
        ax1.plot(subset["elapsed_time"], subset["processing_time"], marker='o', linestyle='-', label=query)
    
    ax1.set_ylabel("Processing Time (ms)")
    ax1.set_title(f"Spark Streaming Processing Time - USE_POOLS = {USE_POOLS}")
    ax1.legend()
    ax1.grid(True)
    
    # Plot input rows
    for query in df["query"].unique():
        subset = df[df["query"] == query]
        ax2.plot(subset["elapsed_time"], subset["input_rows"], marker='o', linestyle='-', label=query)
    
    ax2.set_xlabel("Elapsed Time (seconds)")
    ax2.set_ylabel("Input Rows")
    ax2.set_title(f"Spark Streaming Input Rows - USE_POOLS = {USE_POOLS}")
    ax2.legend()
    ax2.grid(True)
    
    plt.tight_layout()
    plt.show()
    
    # Generate statistics summary
    print("\nStreaming Performance Summary:")
    summary = df.groupby("query").agg({
        "processing_time": ["mean", "max", "min", "std"],
        "input_rows": ["sum", "mean", "max"]
    }).reset_index()
    display(summary)

In [0]:
SILVER_DELTA

In [0]:
# Select one row from SILVER_DELTA
silver_df = spark.read.format("delta").load(SILVER_DELTA).limit(1)

# Apply transformation
transformed_df = silver_df.withColumn("predicted_sentiment_and_score", classify_sentiment_udf(F.col("cleaned_text"))) \
                        .withColumn("predicted_sentiment", col("predicted_sentiment_and_score").label) \
                        .withColumn("predicted_score", col("predicted_sentiment_and_score").score.cast("float")) \
                        .withColumn("sentiment_id", when(col("sentiment") == "positive", 1).otherwise(0)) \
                        .withColumn("predicted_sentiment_id", when(col("predicted_sentiment") == "positive", 1).otherwise(0)) \
                        .select("timestamp", "mention", "cleaned_text", "sentiment", "predicted_score", 
                                "predicted_sentiment", "sentiment_id", "predicted_sentiment_id")

In [0]:
gold_df = spark.read.format("delta").load(GOLD_DELTA).limit(1)
print("Schema of gold_df:")
gold_df.printSchema()

In [0]:
# # Insert the transformed row into GOLD_DELTA
# transformed_df.write.format("delta").mode("append").save(GOLD_DELTA)

In [0]:
# # Count the number of rows in GOLD_DELTA
# gold_row_count = spark.read.format("delta").load(GOLD_DELTA).count()
# print(f"Number of rows in GOLD_DELTA: {gold_row_count}")

In [0]:
# # Select one row from SILVER_DELTA
# silver_df = spark.read.format("delta").load(SILVER_DELTA).limit(1)

# # Apply transformation
# transformed_df = silver_df.select("cleaned_text", classify_sentiment_udf(F.col("cleaned_text")).alias("sentiment"))

# # Print schemas to compare
# print("Schema of transformed_df:")
# transformed_df.printSchema()

# gold_df = spark.read.format("delta").load(GOLD_DELTA).limit(1)
# print("Schema of gold_df:")
# gold_df.printSchema()

# # Insert the transformed row into GOLD_DELTA
# transformed_df.write.format("delta").mode("append").save(GOLD_DELTA)

In [0]:
# df = spark.read.format("delta").load(SILVER_DELTA)
# df.select("cleaned_text", classify_sentiment_udf(F.col("cleaned_text"))).show()

In [0]:
# # --- Start Gold Stream ---
# spark.sparkContext.setLocalProperty("spark.scheduler.pool", "gold_pool")
# gold_query = start_gold_stream()

In [0]:
# silver_query.status

In [0]:
# for s in spark.streams.active:
#     print(f"Name: {s.name}, ID: {s.id}, Status: {s.status['message']}")


## 7.0 Bronze Data Exploratory Data Analysis (5 points)
- How many tweets are captured in your Bronze Table?
- Are there any columns that contain NaN or Null values?  If so, how many, and what will you do in your silver transforms to address this?
- Count the number of tweets by each unique user handle and sort the data by descending count.
- How many tweets have at least one mention (@), and how many tweets have no mentions (@)?
- Plot a bar chart that shows the top 20 tweeters (users)


In [0]:
df_bronze_delta = spark.read.format("delta").load(BRONZE_DELTA)

In [0]:
# ENTER YOUR CODE HERE
# Count the number of tweets in the Bronze Table
tweet_count = df_bronze_delta.count()

# Check every column for Null values; NaN can only occur in float/double columns
float_cols = [f.name for f in df_bronze_delta.schema.fields
              if isinstance(f.dataType, (DoubleType, FloatType))]
null_counts = df_bronze_delta.select([
    F.count(
        F.when(
            (F.col(c).isNull() | F.isnan(F.col(c))) if c in float_cols else F.col(c).isNull(),
            c
        )
    ).alias(c)
    for c in df_bronze_delta.columns
])

# Count the number of tweets by each unique user handle and sort by descending count
user_tweet_counts = df_bronze_delta.groupBy("user").count().orderBy(F.desc("count"))

# Count tweets with at least one mention (@) and tweets with no mentions (@)
tweets_with_mentions = df_bronze_delta.filter(F.col("text").contains("@")).count()
tweets_without_mentions = df_bronze_delta.filter(~F.col("text").contains("@")).count()

# Plot a bar chart showing the top 20 tweeters (users)
top_20_tweeters = user_tweet_counts.limit(20)
display(top_20_tweeters)

# Display results
display(null_counts)
display(user_tweet_counts)
print(f"Total tweets: {tweet_count}")
print(f"Tweets with mentions: {tweets_with_mentions}")
print(f"Tweets without mentions: {tweets_without_mentions}")

In [0]:
# Convert top_20_tweeters to Pandas DataFrame for plotting
top_20_tweeters_pd = top_20_tweeters.toPandas()

# Plotting the bar graph
plt.figure(figsize=(10, 6))
plt.bar(top_20_tweeters_pd['user'], top_20_tweeters_pd['count'], color='skyblue')
plt.xlabel('User')
plt.ylabel('Tweet Count')
plt.title('Top 20 Tweeters')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

## 8.0 Capture the accuracy metrics from the gold table in MLflow  (4 points)
Store the following in an MLflow experiment run:
- Store the precision, recall, and F1-score as MLflow metrics
- Store an image of the confusion matrix as an MLflow artifact
- Store the model name and the MLflow version that was used as MLflow parameters
- Store the version of the Delta Table (input-silver) as an MLflow parameter
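
For reference, the three metrics can be derived by hand from the four cells of a binary confusion matrix. The sketch below is plain Python with made-up labels, and `binary_metrics` is a hypothetical helper; it is meant for checking intuition, while the notebook itself uses scikit-learn for the same quantities.

```python
def binary_metrics(y_true, y_pred, positive=1):
    """Compute precision, recall, F1, and a 2x2 confusion matrix for binary labels."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    tn = sum(1 for t, p in pairs if t != positive and p != positive)
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    # Rows are true labels, columns are predicted labels, negative class first
    return {"precision": precision, "recall": recall, "f1": f1,
            "confusion": [[tn, fp], [fn, tp]]}

# Made-up labels, for illustration only
y_true_demo = [1, 1, 1, 0, 0, 0, 1, 0]
y_pred_demo = [1, 1, 0, 0, 1, 0, 1, 0]
demo_metrics = binary_metrics(y_true_demo, y_pred_demo)
print(demo_metrics)
```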

In [0]:
# ENTER YOUR CODE HERE
gold_df = spark.read.format("delta").load(GOLD_DELTA)
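# Keep only rows with a binary prediction; the gold transform maps any other label to -1
pred = (gold_df.select("sentiment_id", "predicted_sentiment_id").dropna()
        .filter(F.col("predicted_sentiment_id").isin(0, 1)).toPandas())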
y_true = pred["sentiment_id"]
y_pred = pred["predicted_sentiment_id"]

# Compute metrics
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)
cm = confusion_matrix(y_true, y_pred)

with mlflow.start_run():
    mlflow.log_metric("precision", precision)
    mlflow.log_metric("recall", recall)
    mlflow.log_metric("f1_score", f1)

    plt.figure(figsize=(5, 4))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=["Negative", "Positive"], yticklabels=["Negative", "Positive"])
    plt.xlabel("Pred")
    plt.ylabel("True")
    plt.title("Confusion Matrix")

    plt.tight_layout()
    plt.savefig("/tmp/confusion_matrix.png")
    mlflow.log_artifact("/tmp/confusion_matrix.png")

    mlflow.log_param("model_name", MODEL_NAME)
    mlflow.log_param("model_version", "production")
    mlflow.log_param("mlflow_version", mlflow.__version__)

    silver_version = spark.sql(f"DESCRIBE HISTORY delta.`{SILVER_DELTA}`").select("version").first()[0]
    mlflow.log_param("silver_table_version", silver_version)


## 9.0 Application Data Processing and Visualization (6 points)
- How many mentions are there in the gold data total?
- Count the number of neutral, positive and negative tweets for each mention in new columns
- Capture the total for each mention in a new column
- Sort the mention count totals in descending order
- Plot a bar chart of the top 20 mentions with positive sentiment (the people who are in favor)
- Plot a bar chart of the top 20 mentions with negative sentiment (the people who are the villains)

Note: A mention is a specific Twitter user that has been "mentioned" in a tweet with an @user reference.

In [0]:
# ENTER YOUR CODE HERE
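# Count the gold rows that carry a mention (each such row is one @username reference)
mention_count = gold_df.filter(col("mention").isNotNull() & (col("mention") != "")).count()
print(f"Number of mentions in the gold data: {mention_count}")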

# count(*) group by mention, sentiment
mentions_sentiment_count = (
    gold_df.groupBy("mention")
        .agg(
            count(when(col("sentiment") == "neutral", True)).alias("neutral_count"),
            count(when(col("sentiment") == "positive", True)).alias("positive_count"),
            count(when(col("sentiment") == "negative", True)).alias("negative_count")
            )
        .withColumn("total_count", 
                    col("neutral_count") + col("positive_count") + col("negative_count"))
        .orderBy(col("total_count").desc())
)

# to pandas
mentions_sentiment_count_pd = mentions_sentiment_count.toPandas()

# top 20 positive comments
t20_pos_mention = mentions_sentiment_count_pd.nlargest(20, 'positive_count')

t20_pos_mention.plot.bar(x="mention", y="positive_count", legend=False, figsize=(15, 9), title="Top 20 Mentions with Positive Sentiment")
plt.ylabel("Number of Positive Tweets")
plt.xlabel("Mention")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# top 20 negative comments
t20_neg_mention = mentions_sentiment_count_pd.nlargest(20, 'negative_count')
t20_neg_mention.plot.bar(x="mention", y="negative_count", legend=False, figsize=(15, 9), title="Top 20 Mentions with Negative Sentiment")
plt.ylabel("Number of Negative Tweets")
plt.xlabel("Mention")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

## 10.0 Clean up and completion of your pipeline (3 points)
- Using the utilities, list which streams are running, if any.
- Stop all active streams
- print out the elapsed time of your notebook. Note: In the includes there is a variable START_TIME that captures the starting time of the notebook.

In [0]:
# ENTER YOUR CODE HERE

## 11.0 How Optimized is your Spark Application (Grad Students Only) (5 points)
Graduate students (registered for the DSCC-402 section of the course) are required to do this section.  This is a written analysis that uses the Spark UI (link to screenshots) to support your analysis of your pipeline's execution and what is driving its performance.
Recall that Spark optimization has 5 significant dimensions of consideration:
- Spill: write to executor disk due to lack of memory
- Skew: imbalance in partition size
- Shuffle: network io moving data between executors (wide transforms)
- Storage: inefficiency due to disk storage format (small files, location)
- Serialization: distribution of code segments across the cluster

Comment on each of the dimensions of performance and how your implementation is or is not being affected.  Use specific information in the Spark UI to support your description.  

Note: you can take screenshots of the Spark UI from your project runs in Databricks and then link to those pictures by storing them as a publicly accessible file on your cloud drive (Google Drive, OneDrive, etc.)

References:
- [Spark UI Reference](https://spark.apache.org/docs/latest/web-ui.html#web-ui)
- [Spark UI Simulator](https://www.databricks.training/spark-ui-simulator/index.html)

### ENTER YOUR MARKDOWN HERE