## DSCC202-402 Data Science at Scale Final Project
### Tracking Tweet sentiment at scale using a pretrained transformer (classifier)
<p>Consider the following illustration of the end-to-end system that you will be building.  Each student should do their own work.  The project will demonstrate your understanding of Spark Streaming, the medallion data architecture using Delta Lake, Spark Inference at Scale using an MLflow packaged model, as well as Exploratory Data Analysis and System Tracking and Monitoring.</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/pipeline.drawio.png">

<p>
You will be pulling an updated copy of the course GitHub repository: <a href="https://github.com/lpalum/dscc202-402-spring2024">The Repo</a>.  If you are unclear on how to pull an updated copy using the GitHub command line, the following <a href="https://techwritingmatters.com/how-to-update-your-forked-repository-on-github">document</a> is helpful.  Be sure to add the professors and TAs as collaborators on your project.

- lpalum@gmail.com GitHub ID: lpalum
- ajay.anand@rochester.edu GitHub ID: ajayan12
- divyamunot1999@gmail.com GitHub ID: divyamunot
- ylong6@u.Rochester.edu GitHub ID: NinaLong2077

Once you have updated your fork of the repository, you should see the following template project in the final_project directory.
</p>

<img src="https://data-science-at-scale.s3.amazonaws.com/images/notebooks.drawio.png">

<p>
You can then pull your project into the Databricks Workspace using the <a href="https://www.databricks.training/step-by-step/importing-courseware-from-github/index.html">Repos</a> feature.
Each student is expected to submit the URL of their project on GitHub with their code checked in on the main/master branch.  This illustration highlights the branching scheme that you may use to work on your code in steps and then merge your submission into your master branch before submitting.
</p>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/github.drawio.png">
<p>
Work your way through this notebook, which will give you the steps required to submit a complete and compliant project.  The following illustration and associated data dictionary specify the transformations and data that you are to generate for each step in the medallion pipeline.
</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/dataframes.drawio.png">

#### Bronze Data - raw ingest
- date - string in the source json
- user - string in the source json
- text - tweet string in the source json
- sentiment - the given sentiment of the text as determined by an unknown model that is provided in the source json
- source_file - the path of the source json file that this row of data was read from
- processing_time - a timestamp of when you read this row from the source json

#### Silver Data - Bronze Preprocessing
- timestamp - convert date string in the bronze data to a timestamp
- mention - every @username mentioned in the text string in the bronze data gets a row in this silver data table.
- cleaned_text - the bronze text data with the mentions (@username) removed.
- sentiment - the given sentiment that was associated with the text in the bronze table.

#### Gold Data - Silver Table Inference
- timestamp - the timestamp from the silver data table rows
- mention - the mention from the silver data table rows
- cleaned_text - the cleaned_text from the silver data table rows
- sentiment - the given sentiment from the silver data table rows
- predicted_score - score out of 100 from the Hugging Face Sentiment Transformer
- predicted_sentiment - string representation of the sentiment
- sentiment_id - 0 for negative and 1 for positive associated with the given sentiment
- predicted_sentiment_id - 0 for negative and 1 for positive associated with the Hugging Face Sentiment Transformer

#### Application Data - Gold Table Aggregation
- min_timestamp - the oldest timestamp on a given mention (@username)
- max_timestamp - the newest timestamp on a given mention (@username)
- mention - the user (@username) that this row pertains to.
- negative - total negative tweets directed at this mention (@username)
- neutral - total neutral tweets directed at this mention (@username)
- positive - total positive tweets directed at this mention (@username)

When you are designing your approach, one of the main decisions that you will need to make is how you are going to orchestrate the streaming data processing in your pipeline.  There are several valid approaches.  First, you may choose to start the bronze_stream and let it complete (read and append all of the source data) before proceeding and starting up the silver_stream.  This approach has latency associated with it, but it will allow your code to proceed in a linear fashion and process all the data by the end of your notebook execution.  Another potential approach is to start all the streams and have a "watch" method to determine when the pipeline has processed sufficient or all of the source data before stopping and displaying results.  Both of these approaches are valid and have different implications for how you will trigger your streams and how you will gate the execution of your pipeline.  Think through how you want to proceed and ask questions if you need guidance. The following references may be helpful:
- [Spark Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Databricks Autoloader - Cloudfiles](https://docs.databricks.com/en/ingestion/auto-loader/index.html)
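
The "watch" approach described above could be sketched as a small polling helper. This is only an illustration under stated assumptions: `wait_until_caught_up` is a hypothetical name (it is not one of the course utilities), and `query` is assumed to behave like a PySpark `StreamingQuery` whose `lastProgress` is `None` or a dict-like object with `timestamp` and `numInputRows` keys.

```python
import time


def wait_until_caught_up(query, idle_batches=3, poll_seconds=1.0, timeout_seconds=600.0):
    """Hypothetical helper: return True once `query` reports `idle_batches`
    distinct progress updates in a row with zero input rows, False on timeout.
    """
    deadline = time.monotonic() + timeout_seconds
    idle = 0
    last_seen = None  # timestamp of the most recent progress update already counted
    while time.monotonic() < deadline:
        progress = query.lastProgress  # assumed: None or a dict-like progress report
        if progress is not None and progress["timestamp"] != last_seen:
            last_seen = progress["timestamp"]
            # Any update that carried data resets the idle streak.
            idle = idle + 1 if progress["numInputRows"] == 0 else 0
            if idle >= idle_batches:
                return True
        time.sleep(poll_seconds)
    return False
```

A notebook might call `wait_until_caught_up(bronze_query)` before reading the bronze table for analysis. Progress updates for idle triggers may arrive less often than updates for batches that carry data, so `idle_batches` and `timeout_seconds` would need tuning for a real run.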

### Be sure that your project runs end to end when *Run all* is executed on this notebook! (15 Points out of 60)

In [0]:
pip install pyspark

In [0]:
%run ./includes/includes

In [0]:
"""
Adding a widget to the notebook to control the clearing of a previous run.
or stopping the active streams using routines defined in the utilities notebook
"""
dbutils.widgets.removeAll()

dbutils.widgets.dropdown("clear_previous_run", "No", ["No","Yes"])
if (getArgument("clear_previous_run") == "Yes"):
    clear_previous_run()
    print("Cleared all previous data.")

dbutils.widgets.dropdown("stop_streams", "No", ["No","Yes"])
if (getArgument("stop_streams") == "Yes"):
    stop_all_streams()
    print("Stopped all active streams.")

from delta.tables import DeltaTable
dbutils.widgets.dropdown("optimize_tables", "No", ["No","Yes"])
if (getArgument("optimize_tables") == "Yes"):
    # Compact the many small files produced by the streaming appends in each Delta table.
    for delta_path in (BRONZE_DELTA, SILVER_DELTA, GOLD_DELTA):
        DeltaTable.forPath(spark, delta_path).optimize().executeCompaction()
    print("Optimized all of the Delta Tables")

## 1.0 Import your libraries here...
- Are your shuffle partitions consistent with your cluster and your workload?
- Do you have the necessary libraries to perform the required operations in the pipeline/application?

In [0]:
from pyspark.sql.types import DoubleType, StringType, StructType, StructField
from pyspark.sql.functions import input_file_name, current_timestamp, col, split, explode, regexp_replace, to_timestamp, udf, when, count, sum, min, max
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
import mlflow
import mlflow.pyfunc
import numpy as np
import matplotlib.pyplot as plt

In [0]:
# Setup the partitions and turn on AQE
spark.conf.set("spark.sql.shuffle.partitions", "32")  # size shuffle partitions to the cluster's cores; the default of 200 creates many tiny tasks ("thrash")
spark.conf.set("spark.sql.adaptive.enabled", "true") # Spark 3.0 AQE - coalescing post-shuffle partitions, converting sort-merge join to broadcast join, and skew join optimization

## 2.0 Use the utility functions to ...
- Read the source file directory listing
- Count the source files (how many are there?)
- print the contents of one of the files

In [0]:
# Read the source file directory listing as Dataframe
src_file_dic_listing_df = get_source_listing_df()

# Count the source files
count_files = src_file_dic_listing_df['File Name'].count()      # from 'utilities'
print(f"Number of files in source directory: {count_files}")


In [0]:
# show what that df looks like
src_file_dic_listing_df

In [0]:
# Print the content of one of the files
example_file = src_file_dic_listing_df['File Name'][13] # pick the file name at index 13 (an arbitrary example)
example_file_name = example_file.split("/")[-1] # split the path on "/" and keep the last element (the file name)
example_file_content = show_s3_file_contents(example_file).decode('utf-8')

print(f"Contents of {example_file_name}: \n {example_file_content}")

What is the difference between '.count()' and 'len()' when measuring the length of a DataFrame column?
- **".count()"**: Counts only the non-NA/null values, so any missing entries are excluded.
- **"len()"**: Counts every entry, including NA/null values. It measures the size of the DataFrame column rather than its content.
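
The distinction above can be checked on a tiny example. This is a minimal sketch, assuming the listing is a pandas DataFrame (as the indexing in the cells above suggests); the file names are made up for illustration.

```python
import pandas as pd

# Illustrative listing with one missing file name (values are made up).
listing = pd.DataFrame({"File Name": ["a.json", None, "c.json"]})

non_null_entries = int(listing["File Name"].count())  # skips the missing value
all_entries = len(listing["File Name"])                # counts every row

print(f"count() -> {non_null_entries}, len() -> {all_entries}")
```

When the source listing has no missing file names, both calls return the same number, so either works for counting the source files.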

## 3.0 Transform the Raw Data to Bronze Data using a stream
- define the schema for the raw data
- setup a read stream using cloudfiles and the source data format
- setup a write stream using cloudfiles to append to the bronze delta table
- enforce schema
- allow a new schema to be merged into the bronze delta table
- Use the defined BRONZE_CHECKPOINT and BRONZE_DELTA paths defined in the includes
- name your raw to bronze stream as bronze_stream
- transform the raw data to the bronze data using the data definition at the top of the notebook

In [0]:
# Define the schema for the raw data
# 'StructType' is the class name as defined in the PySpark library for defining DataFrame schemas
# 'StructField' is used to specify the name, data type, and nullability of each column in the DataFrame
rawdata_schema = StructType([
    StructField("date", StringType(), nullable=True),
    StructField("user", StringType(), nullable=True),
    StructField("text", StringType(), nullable=True),
    StructField("sentiment", StringType(), nullable=True)
])

In [0]:
# Setup a read stream using cloudfiles and the source data format
# Transform the raw data to the bronze data using the data definition at the top of the notebook
rawdata_stream = (
    spark
    .readStream                                         # Initialize the read stream
    .format("cloudFiles")                               # Use the Auto Loader (cloudFiles) source for cloud-based storage systems
    .option("cloudFiles.format", "json")                # Declare that the source files are JSON
    .schema(rawdata_schema)                             # Apply the predefined schema to ensure correct data-type
    .load(TWEET_SOURCE_PATH)                            # Where the streaming data source is located
    .withColumn('source_file', input_file_name())       # returns the name of the file currently being processed in PySpark
    .withColumn('processing_time', current_timestamp()) # used in streaming applications to record when data was processed
)

In [0]:
# Setup a write stream using cloudfiles to append to the bronze delta table
bronze_query = (
    rawdata_stream
    .writeStream                                          # Initialize the write stream
    .option("mergeSchema", "true")                        # Allow a new schema to be merged into the bronze delta table
    .outputMode("append")                                 # Set the output mode to append, to ensure data is added
    .format("delta")                                      # Use the "Delta" format for storage
    .queryName("bronze_stream")                           # Name your raw to bronze stream as bronze_stream
    .option("checkpointLocation", BRONZE_CHECKPOINT)      # Specify the checkpoint location for fault tolerance
    .start(BRONZE_DELTA)                                  # Start the bronze stream with the path to the Delta table
)

In [0]:
# To monitor and confirm that "bronze_stream" has successfully started
bronze_started = wait_stream_start(spark, "bronze_stream")

## 4.0 Bronze Data Exploratory Data Analysis
- How many tweets are captured in your Bronze Table?
- Are there any columns that contain NaN or Null values?  If so, how many, and what will you do in your silver transforms to address this?
- Count the number of tweets by each unique user handle and sort the data by descending count.
- How many tweets have at least one mention (@)? How many tweets have no mentions (@)?
- Plot a bar chart that shows the top 20 tweeters (users)


In [0]:
# Load the bronze table
# Reading data from an external storage location ('BRONZE_DELTA') with "delta" format

bronze_eda = (spark
              .read
              .format("delta")
              .load(BRONZE_DELTA)
)

In [0]:
bronze_eda.show()

In [0]:
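# Enable eager evaluation so DataFrames render directly in the notebook (truncate=False below is what shows the full column content)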
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

# Show the full content of the "date" column
bronze_eda.select('date').show(truncate=False)

In [0]:
# Counting how many tweets are captured in Bronze Table
tweet_count = bronze_eda.count()
print(f"{tweet_count} tweets are captured in Bronze Table. ")

In [0]:
# Count the missing (null) values in each column
bronze_missing = {c: bronze_eda.where(col(c).isNull()).count() for c in bronze_eda.columns}

for name, n_missing in bronze_missing.items():  # avoid rebinding the imported `count` function
    print(f"{n_missing} missing values in column '{name}'.")

In [0]:
# Count how many tweets each user posted and sort in descending order by 'count'
user_tweet_counts = (
    bronze_eda
    .groupBy("user")                # Calculate number of tweets each user posted, thus, groupby 'user'
    .count()
    .sort("count", ascending=False)
    .limit(20)                      # Shows the top 20 tweeters (users)
)

user_tweet_counts.show()

In [0]:
# Count how many tweet with at least one mention and no mention
mention_count = bronze_eda.filter(bronze_eda.text.like('%@%')).count()
no_mention_count = bronze_eda.filter(~bronze_eda.text.like('%@%')).count()

print(f"{mention_count} tweets have at least one mention.")
print(f"{no_mention_count} tweets have no mentions.")

In [0]:
# Plot a bar chart that shows the top 20 tweeters (users)
# Collect data from Spark DataFrame and convert it to a list of tuples
data = user_tweet_counts.collect()
# Extract usernames and tweet counts from the collected data
usernames = [row['user'] for row in data]
tweet_counts = [row['count'] for row in data]


# Create the bar chart using Matplotlib
fig, ax = plt.subplots(figsize=(10, 8))  # Create a figure and a single subplot
ax.bar(usernames, tweet_counts, color='blue')  # Create a bar chart

# Labeling the axes
ax.set_xlabel('Username', fontsize=12)  # Set x-axis label
ax.set_ylabel('Tweet Count', fontsize=12)  # Set y-axis label
ax.set_title('Top 20 Tweeters', fontsize=16)  # Set title

# Rotate the x-axis labels to prevent overlap (tick_params avoids relabeling ticks that were never fixed)
ax.tick_params(axis='x', labelrotation=90, labelsize=10)

# Show the plot
plt.tight_layout()
plt.show()


## 5.0 Transform the Bronze Data to Silver Data using a stream
- setup a read stream on your bronze delta table
- setup a write stream to append to the silver delta table
- Use the defined SILVER_CHECKPOINT and SILVER_DELTA paths in the includes
- name your bronze to silver stream as silver_stream
- transform the bronze data to the silver data using the data definition at the top of the notebook

In [0]:
# Setup a silver read stream with bronze table
bronze_to_silver_stream = (
    spark
    .readStream                                         # Initialize the read stream
    .format("delta")                                    # Specify the format of the data source
    .load(BRONZE_DELTA)                                 # Load data from the 'BRONZE_DELTA' table
)

In [0]:
# Transform the bronze data to the silver data using the data definition at the top of the notebook
bronze_to_silver_transform = (
    bronze_to_silver_stream
    .withColumn("timestamp", to_timestamp(col("date"), "yyyy-MM-dd HH:mm:ss"))            # Convert 'date' to timestamp
    .withColumn("mention", explode(split(col('text')," ")))                               # One row per space-separated token in 'text'
    .where("mention like '@%' and mention <> '@'")                                        # Keep only the tokens that are @mentions
    .withColumn("cleaned_text", regexp_replace(col('text'), r"(\s|^)@\w+", ""))           # Remove @mentions from the text (raw string avoids invalid escapes)
    .select("timestamp", "mention", "cleaned_text", "sentiment")
)
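
To see what the mention and cleaned_text logic above does to a single tweet, the same steps can be mirrored in plain Python. This is a rough sketch for reasoning about edge cases, not the streaming code itself: `to_silver_rows` is a hypothetical helper, and Python's `re` is assumed to treat this particular pattern the same way Spark's regex engine does.

```python
import re

# Same pattern as the regexp_replace call: a mention at the start of the
# text or preceded by whitespace.
MENTION_PATTERN = re.compile(r"(\s|^)@\w+")


def to_silver_rows(text):
    """Hypothetical helper: return one (mention, cleaned_text) pair per @mention."""
    # Mirrors split(text, " ") + explode + the "like '@%' and <> '@'" filter.
    mentions = [tok for tok in text.split(" ") if tok.startswith("@") and tok != "@"]
    # Mirrors regexp_replace: strip each mention together with its leading whitespace.
    cleaned = MENTION_PATTERN.sub("", text)
    return [(mention, cleaned) for mention in mentions]


rows = to_silver_rows("@alice hi @bob!")
for mention, cleaned_text in rows:
    print(repr(mention), repr(cleaned_text))
```

Two things are visible in this sketch: a tweet with no mentions yields no silver rows, and a mention taken from a space-separated token keeps any trailing punctuation (`@bob!`), while the regex removes only the `@bob` part from the text.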

In [0]:
# Setup a write stream to append to the silver delta table
bronze_to_silver_query = (
    bronze_to_silver_transform
    .writeStream                                               # Initialize the write stream
    .option("mergeSchema", "true")                             # Allow a new schema to be merged into the silver delta table
    .outputMode("append")                                      # Set the output mode to append, to ensure data is added
    .format('delta')                                           # Use the "Delta" format for storage
    .queryName("silver_stream")                                # Name bronze to silver stream as 'silver_stream'
    .option("checkpointLocation", SILVER_CHECKPOINT)           # Specify the checkpoint location for fault tolerance
    .start(SILVER_DELTA)                                       # Start the silver stream with the path to the Delta table
)

In [0]:
silver_started = wait_stream_start(spark, "silver_stream")

In [0]:
# Show the silver delta table
silver_eda = (spark.read.format("delta").load(SILVER_DELTA))
silver_eda.show()

## 6.0 Transform the Silver Data to Gold Data using a stream
- setup a read stream on your silver delta table
- setup a write stream to append to the gold delta table
- Use the defined GOLD_CHECKPOINT and GOLD_DELTA paths defined in the includes
- name your silver to gold stream as gold_stream
- transform the silver data to the gold data using the data definition at the top of the notebook
- Load the pretrained transformer sentiment classifier from the MODEL_NAME at the production level from the MLflow registry
- Use a spark UDF to parallelize the inference across your silver data

In [0]:
# Setup a gold read stream with silver table
silver_to_gold_stream = (
    spark
    .readStream                                         # Initialize the read stream
    .format("delta")                                    # Specify the format of the data source
    .load(SILVER_DELTA)                                 # Load data from the 'SILVER_DELTA' table
)

In [0]:
# Load the pretrained transformer sentiment classifier from the MODEL_NAME at the production level from the MLflow registry
model_version_uri = f"models:/{MODEL_NAME}/production"
# Use a spark UDF to parallelize the inference across your silver data
HF_UDF = mlflow.pyfunc.spark_udf(spark, model_uri = model_version_uri)

# Transform the silver data to the gold data using the data definition at the top of the notebook
silver_to_gold_transform = (
    silver_to_gold_stream
    .withColumn("predicted", HF_UDF(col('cleaned_text')))                    # Apply the MLflow model to predict sentiment
    .withColumn("predicted_sentiment",
                when(col('predicted')['label'] == 'POS', "positive")
                .when(col('predicted')['label'] == 'NEG', "negative")
                .when(col('predicted')['label'] == 'NEU', "neutral"))        # Map predicted labels to sentiment categories
    .withColumn("predicted_score", (col('predicted')['score']).astype(DoubleType()))    # Extract and cast predicted scores
    .drop("predicted")
    .withColumn("sentiment_id",                      # 0 for negative and 1 for positive associated with the given sentiment
                when(col('sentiment') == "positive", 1)
                .when(col('sentiment') == "negative", 0))
    .withColumn("predicted_sentiment_id",            # 0 for negative and 1 for positive based on HF Sentiment Transformer
                when(col('predicted_sentiment') == "positive", 1)
                .when(col('predicted_sentiment') == "negative", 0))
)

In [0]:
silver_to_gold_query = (
    silver_to_gold_transform
    .writeStream
    .option("mergeSchema", "True")
    .outputMode("append")
    .format("delta")
    .queryName("gold_stream")                              # Name silver to gold stream as gold_stream
    .option("checkpointLocation", GOLD_CHECKPOINT)
    .start(GOLD_DELTA)
)

In [0]:
gold_started = wait_stream_start(spark, "gold_stream")

In [0]:
gold_data = (spark.read.format("delta").load(GOLD_DELTA))
gold_data.show()

In [0]:
# Count occurrences of 'neutral' in the 'predicted_sentiment' column
all_count = gold_data.count()
neutral_count = gold_data.filter(col("predicted_sentiment") == "neutral").count()

# Print the count
print(f"Number of rows in the gold table: {all_count}")
print(f"Number of 'neutral' values in 'predicted_sentiment' column: {neutral_count}")


I decided to drop the 'neutral' predictions generated by the HF model because I think it is hard to classify them as positive or negative based on the sentiment score alone. Our dataset is also large, so I think it can still produce meaningful results after removing the rows that have 'neutral' in the 'predicted_sentiment' column. Thus, in the next step, when I calculate each metric, I do not consider the 'neutral' rows.

## 7.0 Capture the accuracy metrics from the gold table in MLflow
Store the following in an MLflow experiment run:
- Store the precision, recall, and F1-score as MLflow metrics
- Store an image of the confusion matrix as an MLflow artifact
- Store the model name and the MLflow version that was used as MLflow parameters
- Store the version of the Delta Table (input-silver) as an MLflow parameter

In [0]:
results = (
    gold_data
    .select("sentiment_id", "predicted_sentiment_id")                             # select true results and predicted results
    .toPandas()
    .query("predicted_sentiment_id.notnull()")
)


results['predicted_sentiment_id'] = results['predicted_sentiment_id'].astype(int)   # Convert predicted results to integers for scoring

precision_value = precision_score(results['sentiment_id'], results['predicted_sentiment_id'])
recall_value = recall_score(results['sentiment_id'], results['predicted_sentiment_id'])
f1_score_value = f1_score(results['sentiment_id'], results['predicted_sentiment_id'])
confusion_matrix_value = confusion_matrix(results['sentiment_id'], results['predicted_sentiment_id'])

print(f"Precision : {precision_value}")
print(f"Recall : {recall_value}")
print(f"F1-score : {f1_score_value}")
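
For reference, the three scores printed above can be reproduced from raw counts. The following is a small sketch, assuming binary labels where 1 = positive and 0 = negative and where rows with a missing label (for example a 'neutral' prediction) are skipped; `binary_metrics` is a hypothetical helper and the labels are made up.

```python
def binary_metrics(y_true, y_pred):
    """Hypothetical helper: precision, recall and F1 for the positive class (1)."""
    # Skip pairs where either label is missing, as done for 'neutral' predictions.
    pairs = [(t, p) for t, p in zip(y_true, y_pred) if t is not None and p is not None]
    tp = len([1 for t, p in pairs if t == 1 and p == 1])  # true positives
    fp = len([1 for t, p in pairs if t == 0 and p == 1])  # false positives
    fn = len([1 for t, p in pairs if t == 1 and p == 0])  # false negatives
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return precision, recall, f1


# Illustrative labels only; the last prediction is missing and is ignored.
precision, recall, f1 = binary_metrics([1, 1, 0, 0, 1], [1, 0, 0, 1, None])
print(precision, recall, f1)
```

The helper uses `len` rather than the built-in `sum` because this notebook imports `sum` from `pyspark.sql.functions`, which would shadow the built-in if the snippet were pasted into a cell.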

In [0]:
# Creating a heatmap using Matplotlib's imshow function
plt.figure(figsize=(8, 6))
plt.imshow(confusion_matrix_value, interpolation='nearest', cmap='Blues')
plt.colorbar()  # Add a colorbar to a plot

# Adding labels and title for clarity
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix Heatmap')

# Adding annotations inside the squares
threshold = confusion_matrix_value.max() / 2.
for i in range(confusion_matrix_value.shape[0]):
    for j in range(confusion_matrix_value.shape[1]):
        plt.text(j, i, format(confusion_matrix_value[i, j], 'd'),
                 horizontalalignment="center",
                 color="white" if confusion_matrix_value[i, j] > threshold else "black")

# Label the ticks: sklearn orders the classes ascending, so 0 = negative and 1 = positive
plt.xticks(np.arange(confusion_matrix_value.shape[1]), ['negative', 'positive'])
plt.yticks(np.arange(confusion_matrix_value.shape[0]), ['negative', 'positive'])

# Displaying the plot
plt.show()

In [0]:
# Start MLflow run
with mlflow.start_run():
    # Prepare data and compute metrics
    results = (
        gold_data
        .select("sentiment_id", "predicted_sentiment_id")
        .toPandas()
        .query("predicted_sentiment_id.notnull()")
    )

    results['predicted_sentiment_id'] = results['predicted_sentiment_id'].astype(int)

    precision_value = precision_score(results['sentiment_id'], results['predicted_sentiment_id'])
    recall_value = recall_score(results['sentiment_id'], results['predicted_sentiment_id'])
    f1_score_value = f1_score(results['sentiment_id'], results['predicted_sentiment_id'])
    confusion_matrix_value = confusion_matrix(results['sentiment_id'], results['predicted_sentiment_id'])

    # Log metrics
    mlflow.log_metric("precision", precision_value)
    mlflow.log_metric("recall", recall_value)
    mlflow.log_metric("f1_score", f1_score_value)

    # Creating a heatmap using Matplotlib's imshow function
    plt.figure(figsize=(8, 6))
    plt.imshow(confusion_matrix_value, interpolation='nearest', cmap='Blues')
    plt.colorbar()  # Add a colorbar to a plot

    # Adding results and title for clarity
    plt.xlabel('Predicted results')
    plt.ylabel('True results')
    plt.title('Confusion Matrix Heatmap')

    # Adding annotations inside the squares
    threshold = confusion_matrix_value.max() / 2.
    for i in range(confusion_matrix_value.shape[0]):
        for j in range(confusion_matrix_value.shape[1]):
            plt.text(j, i, format(confusion_matrix_value[i, j], 'd'),
                    horizontalalignment="center",
                    color="white" if confusion_matrix_value[i, j] > threshold else "black")

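    # Label the ticks: sklearn orders the classes ascending, so 0 = negative and 1 = positive
    plt.xticks(np.arange(confusion_matrix_value.shape[1]), ['negative', 'positive'])
    plt.yticks(np.arange(confusion_matrix_value.shape[0]), ['negative', 'positive'])

    # Save the confusion matrix plot as an artifact before plt.show(), which can clear the figure
    temp_file = "confusion_matrix.png"
    plt.savefig(temp_file)
    mlflow.log_artifact(temp_file)

    # Displaying the plot
    plt.show()

    # Log parameters
    mlflow.log_param("model_name", MODEL_NAME)
    mlflow.log_param("mlflow_version", mlflow.__version__)

    # Log the latest version of the input silver Delta table
    silver_version = DeltaTable.forPath(spark, SILVER_DELTA).history(1).select("version").collect()[0][0]
    mlflow.log_param("silver_delta_version", silver_version)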



## 8.0 Application Data Processing and Visualization
- How many mentions are there in the gold data total?
- Count the number of neutral, positive and negative tweets for each mention in new columns
- Capture the total for each mention in a new column
- Sort the mention count totals in descending order
- Plot a bar chart of the top 20 mentions with positive sentiment (the people who are in favor)
- Plot a bar chart of the top 20 mentions with negative sentiment (the people who are the villains)

You may want to use the "Loop Application" widget to control whether you repeatedly display the latest plots while the data comes in from your streams before moving on to the next section and cleaning up your run.

*note: A mention is a specific twitter user that has been "mentioned" in a tweet with an @user reference.

In [0]:
# How many mentions are there in the gold data total?
unique_mention_count = gold_data.select('mention').distinct().count()

print(f"There are a total of {unique_mention_count} unique mention users in the gold table.")

In [0]:
# Count the number of neutral, positive and negative tweets for each mention in new columns
# Capture the total for each mention in a new column
# Sort the mention count totals in descending order

app_data = (gold_data
          .groupBy('mention')                                          # all on a given mention
          .agg(min('timestamp').alias('min_timestamp'),                # the oldest timestamp
               max('timestamp').alias('max_timestamp'),                # the newest timestamp
               sum((gold_data.sentiment == 'negative').cast('int')).alias('negative'),
               sum((gold_data.sentiment == 'neutral').cast('int')).alias('neutral'),
               sum((gold_data.sentiment == 'positive').cast('int')).alias('positive'))
          .withColumn('total_count', col('negative') + col('neutral') + col('positive'))
          .select('min_timestamp', 'max_timestamp', 'mention', 'negative', 'neutral', 'positive', 'total_count')
          .sort('total_count', ascending=False)
)
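
The aggregation above can be pictured on a handful of rows in plain Python. This is only a sketch of the intended result shape: `aggregate_by_mention` is a hypothetical helper, the sample rows are made up, and timestamps are assumed to be comparable values (here, ISO-style strings).

```python
def aggregate_by_mention(rows):
    """Hypothetical helper: aggregate (timestamp, mention, sentiment) tuples per mention."""
    stats = {}
    for timestamp, mention, sentiment in rows:
        record = stats.setdefault(mention, {
            "mention": mention,
            "min_timestamp": timestamp,
            "max_timestamp": timestamp,
            "negative": 0, "neutral": 0, "positive": 0,
        })
        # Track the oldest and newest timestamp seen for this mention.
        if timestamp < record["min_timestamp"]:
            record["min_timestamp"] = timestamp
        if timestamp > record["max_timestamp"]:
            record["max_timestamp"] = timestamp
        # Count the tweet under its sentiment label; other labels are ignored.
        if sentiment in ("negative", "neutral", "positive"):
            record[sentiment] += 1
    for record in stats.values():
        record["total_count"] = record["negative"] + record["neutral"] + record["positive"]
    # Highest total first, matching the descending sort on total_count.
    return sorted(stats.values(), key=lambda r: r["total_count"], reverse=True)


# Made-up sample rows for illustration.
sample_rows = [
    ("2009-04-06 22:19:45", "@alice", "negative"),
    ("2009-04-07 08:00:00", "@bob", "positive"),
    ("2009-04-08 10:30:00", "@alice", "positive"),
]
app_rows = aggregate_by_mention(sample_rows)
print(app_rows[0])
```

Each resulting record carries the same fields as the application data dictionary (min_timestamp, max_timestamp, mention, negative, neutral, positive) plus the total used for sorting.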

In [0]:
# Plot a bar chart of the top 20 mentions with positive sentiment
top_20_positive_mentions = (
    app_data
    .select('mention', 'positive')
    .sort('positive', ascending=False)
    .limit(20)
)
display(top_20_positive_mentions)

In [0]:
# Plot a bar chart of the top 20 mentions with positive sentiment
# Extract data for plotting
positive_mentions = top_20_positive_mentions.select('mention').rdd.flatMap(lambda x: x).collect()
positive_counts = top_20_positive_mentions.select('positive').rdd.flatMap(lambda x: x).collect()

# Create bar chart
plt.figure(figsize = (10, 6))
plt.barh(positive_mentions, positive_counts, color='skyblue')
plt.xlabel('Positive Sentiment Count')
plt.ylabel('Positive Mentions')
plt.title('Top 20 Mentions with Positive Sentiment')
plt.gca().invert_yaxis()  # Invert y-axis to have the highest count at the top
plt.tight_layout()

# Show plot
plt.show()

In [0]:
# Select the top 20 mentions with negative sentiment
top_20_negative_mentions = (
    app_data
    .select('mention', 'negative')
    .sort('negative', ascending=False)
    .limit(20)
)
display(top_20_negative_mentions)

In [0]:
# Plot a bar chart of the top 20 mentions with negative sentiment
# Extract data for plotting
negative_mentions = top_20_negative_mentions.select('mention').rdd.flatMap(lambda x: x).collect()
negative_counts = top_20_negative_mentions.select('negative').rdd.flatMap(lambda x: x).collect()

# Create bar chart
plt.figure(figsize = (10, 6))
plt.barh(negative_mentions, negative_counts, color='skyblue')
plt.xlabel('Negative Sentiment Count')
plt.ylabel('Negative Mentions')
plt.title('Top 20 Mentions with Negative Sentiment')
plt.gca().invert_yaxis()  # Invert y-axis to have the highest count at the top
plt.tight_layout()

# Show plot
plt.show()

## 9.0 Clean up and completion of your pipeline
- Using the utilities, determine which streams (if any) are running.
- Stop all active streams
- print out the elapsed time of your notebook.

In [0]:
# Print active streams
for stream in spark.streams.active:
    print(f"Stream {stream.name} was active")
    stop_named_stream(spark, stream.name)

In [0]:
# Stop all active streams
stop_all_streams()

In [0]:
# Record the notebook's end time. Note: START_TIME was established in the include file when the notebook started.
END_TIME = time.time()
TOTAL_TIME = (END_TIME - START_TIME)

In [0]:
# Print out the elapsed time of your notebook.
print(f'The notebook started at {START_TIME}')
print(f'The notebook finished at {END_TIME}')
print(f'The notebook took {TOTAL_TIME} seconds to run end to end')

## 10.0 How Optimized is your Spark Application (Grad Students Only)
Graduate students (registered for the DSCC-402 section of the course) are required to do this section.  This is a written analysis using the Spark UI (link to screenshots) that supports your analysis of your pipeline's execution and what is driving its performance.
Recall that Spark Optimization has 5 significant dimensions of consideration:
- Spill: write to executor disk due to lack of memory
- Skew: imbalance in partition size
- Shuffle: network io moving data between executors (wide transforms)
- Storage: inefficiency due to disk storage format (small files, location)
- Serialization: distribution of code segments across the cluster

Comment on each of the dimensions of performance and how your implementation is or is not being affected.  Use specific information in the Spark UI to support your description.

Note: you can take screenshots of the Spark UI from your project runs in Databricks and then link to those pictures by storing them as a publicly accessible file on your cloud drive (Google Drive, OneDrive, etc.)

References:
- [Spark UI Reference](https://spark.apache.org/docs/latest/web-ui.html#web-ui)
- [Spark UI Simulator](https://www.databricks.training/spark-ui-simulator/index.html)

### ENTER YOUR MARKDOWN HERE