## DSCC202-402 Data Science at Scale Final Project
### Tracking Tweet sentiment at scale using a pretrained transformer (classifier)
<p>Consider the following illustration of the end-to-end system that you will be building.  Each student should do their own work.  The project will demonstrate your understanding of Spark Streaming, the medallion data architecture using Delta Lake, Spark Inference at Scale using an MLflow-packaged model, as well as Exploratory Data Analysis and System Tracking and Monitoring.</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/pipeline.drawio.png">

<p>
You will be pulling an updated copy of the course GitHub repository: <a href="https://github.com/lpalum/dscc202-402-spring2025">The Repo</a>.  

Once you have updated your fork of the repository you should see the following template project that is resident in the final_project directory.
</p>

<img src="https://data-science-at-scale.s3.amazonaws.com/images/notebooks.drawio.png">

<p>
You can then pull your project into the Databricks workspace using the <a href="https://github.com/apps/databricks">Databricks App on GitHub</a>, or clone the repo to your laptop and upload the final_project directory and its contents to your workspace using file imports.  Your choice.

<p>
Work your way through this notebook, which gives you the steps required to submit a complete and compliant project.  The following illustration and associated data dictionary specify the transformations and data that you are to generate for each step in the medallion pipeline.
</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/dataframes.drawio.png">

#### Bronze Data - raw ingest
- date - string in the source json
- user - string in the source json
- text - tweet string in the source json
- sentiment - the given sentiment of the text as determined by an unknown model that is provided in the source json
- source_file - the path of the source json file that this row of data was read from
- processing_time - a timestamp of when you read this row from the source json

#### Silver Data - Bronze Preprocessing
- timestamp - convert date string in the bronze data to a timestamp
- mention - every @username mentioned in the text string in the bronze data gets a row in this silver data table.
- cleaned_text - the bronze text data with the mentions (@username) removed.
- sentiment - the given sentiment that was associated with the text in the bronze table.

#### Gold Data - Silver Table Inference
- timestamp - the timestamp from the silver data table rows
- mention - the mention from the silver data table rows
- cleaned_text - the cleaned_text from the silver data table rows
- sentiment - the given sentiment from the silver data table rows
- predicted_score - score out of 100 from the Hugging Face Sentiment Transformer
- predicted_sentiment - string representation of the sentiment
- sentiment_id - 0 for negative and 1 for positive associated with the given sentiment
- predicted_sentiment_id - 0 for negative and 1 for positive associated with the Hugging Face Sentiment Transformer

#### Application Data - Gold Table Aggregation
- min_timestamp - the oldest timestamp on a given mention (@username)
- max_timestamp - the newest timestamp on a given mention (@username)
- mention - the user (@username) that this row pertains to.
- negative - total negative tweets directed at this mention (@username)
- neutral - total neutral tweets directed at this mention (@username)
- positive - total positive tweets directed at this mention (@username)

When you are designing your approach, one of the main decisions that you will need to make is how you are going to orchestrate the streaming data processing in your pipeline.  There are several valid approaches to triggering your streams and gating the execution of your pipeline.  Think through how you want to proceed and ask questions if you need guidance. The following references may be helpful:
- [Spark Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Databricks Autoloader - Cloudfiles](https://docs.databricks.com/en/ingestion/auto-loader/index.html)
- [In class examples - Spark Structured Streaming Performance](https://dbc-f85bdc5b-07db.cloud.databricks.com/editor/notebooks/2638424645880316?o=1093580174577663)
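One way to gate the pipeline, instead of relying on fixed `time.sleep(30)` delays between streams, is to poll a readiness condition with a timeout. The sketch below is a hypothetical, framework-agnostic helper (`wait_until` is not part of the course includes or any Databricks API); the demo uses a simulated readiness check in place of a real table check.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool], timeout_s: float = 120.0, interval_s: float = 5.0) -> bool:
    """Poll `predicate` every `interval_s` seconds until it returns True.

    Returns True as soon as the predicate holds, or False once `timeout_s`
    has elapsed without it holding. Hypothetical helper for gating a stream.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)


# Demo: a simulated upstream check that becomes ready on the third poll.
calls = {"n": 0}


def upstream_ready() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


print(wait_until(upstream_ready, timeout_s=5, interval_s=0.01))
```

In the notebook, the predicate could be something like `lambda: DeltaTable.isDeltaTable(spark, BRONZE_DELTA)`, so the silver stream starts only once the bronze table exists. This is an assumption about how you might wire it up, not a requirement of the project.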

### Be sure your project runs end to end when *Run all* is executed on this notebook! (7 points)

### This project is worth 25% of your final grade.
- DSCC-202 Students have 55 possible points on this project (see points above and the instructions below)
- DSCC-402 Students have 60 possible points on this project (one extra section to complete)

In [0]:
%run ./includes/includes

In [0]:
"""
Add widgets to the notebook that control clearing a previous run,
stopping the active streams, and optimizing the Delta tables, using routines defined in the utilities notebook.
"""
dbutils.widgets.removeAll()

dbutils.widgets.dropdown("clear_previous_run", "No", ["No","Yes"])
if (getArgument("clear_previous_run") == "Yes"):
    clear_previous_run()
    print("Cleared all previous data.")

dbutils.widgets.dropdown("stop_streams", "No", ["No","Yes"])
if (getArgument("stop_streams") == "Yes"):
    stop_all_streams()
    print("Stopped all active streams.")

dbutils.widgets.dropdown("optimize_tables", "No", ["No","Yes"])
if (getArgument("optimize_tables") == "Yes"):
    # Suck up those small files that we have been appending.
    # Optimize the tables
    optimize_table(BRONZE_DELTA)
    optimize_table(SILVER_DELTA)
    optimize_table(GOLD_DELTA)
    print("Optimized all of the Delta Tables")

## 1.0 Import your libraries here (2 points)
- Are your shuffle partitions consistent with your cluster and your workload?
- Do you have the necessary libraries to perform the required operations in the pipeline/application?

In [0]:
import os
import re
import time
from datetime import datetime

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from delta.tables import DeltaTable
import mlflow
import mlflow.pyfunc

from pyspark.sql.functions import (
    input_file_name,
    current_timestamp,
    regexp_replace,
    to_timestamp,
    expr,
    explode,
    col,
    when,
    rand,
    lit,
    sum as spark_sum,
    size,
    count,
    desc
)

from pyspark.sql.types import StructType, StructField, StringType
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix


# Next I set the number of shuffle partitions: the cluster has 2 nodes with 8 cores in total, so 16 partitions (2 per core) keeps the workers occupied without overloading them.
spark.conf.set("spark.sql.shuffle.partitions", 16)

## 2.0 Define and execute utility functions (3 points)
- Read the source file directory listing
- Count the source files (how many are there?)
- print the contents of one of the files
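The three steps can also be checked outside Databricks with the standard library, for example on a local copy of a few source files. This is an illustrative stand-in for `dbutils.fs.ls` and `spark.read.text`; the function name `summarize_source_dir` and the `.json` suffix filter are assumptions.

```python
from pathlib import Path
from typing import Tuple


def summarize_source_dir(directory: str, suffix: str = ".json") -> Tuple[int, str]:
    """Count files ending in `suffix` and return (count, text of the first file).

    Files are sorted by name so "the first file" is deterministic.
    """
    files = sorted(p for p in Path(directory).iterdir() if p.is_file() and p.suffix == suffix)
    first_text = files[0].read_text(encoding="utf-8") if files else ""
    return len(files), first_text


# Usage (hypothetical local path):
# count, text = summarize_source_dir("/path/to/local/tweets")
# print(f"There are {count} files"); print(text)
```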

In [0]:
# I read the source files
file_directory = dbutils.fs.ls(TWEET_SOURCE_PATH)

# I count the files using a simple len function
total_num = len(file_directory)
print(f"There are {total_num} files")

# I display the contents of any single file (in this case the first one)
first_file_path = file_directory[0].path
print("\nContents of the first file:")
spark.read.text(first_file_path).display()

In [0]:
# just checking to make sure the data is right
sample = spark.read.text(first_file_path) 
sample.show(n=1, truncate=False)

## 3.0 Transform the Raw Data to Bronze Data using a stream  (8 points)
- define the schema for the raw data
- setup a read stream using cloudfiles and the source data format
- setup a write stream using delta lake to append to the bronze delta table
- enforce schema
- allow a new schema to be merged into the bronze delta table
- Use the defined BRONZE_CHECKPOINT and BRONZE_DELTA paths defined in the includes
- name your raw to bronze stream as bronze_stream
- transform the raw data to the bronze data using the data definition at the top of the notebook
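To make the raw-to-bronze mapping concrete, here is a plain-Python sketch of what happens to one record: the four schema fields are kept (extra JSON keys are ignored, as with an enforced schema), `source_file` and `processing_time` are added, and records with a missing field are dropped, mirroring `na.drop()` in the cell below. `to_bronze_row` is a hypothetical name; the real transform runs as a Spark stream.

```python
from datetime import datetime, timezone
from typing import Optional

RAW_FIELDS = ("date", "user", "text", "sentiment")


def to_bronze_row(record: dict, source_file: str) -> Optional[dict]:
    """Map one raw JSON record to a bronze row, or return None if a field is null/missing."""
    if any(record.get(field) is None for field in RAW_FIELDS):
        return None  # same effect as dropping rows that contain nulls
    row = {field: record[field] for field in RAW_FIELDS}  # keep only schema fields
    row["source_file"] = source_file                      # where the row came from
    row["processing_time"] = datetime.now(timezone.utc)   # when it was read
    return row
```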

In [0]:
# First I define the schema of the raw JSON data (all four fields are strings)
Schema_raw  = StructType([
    StructField("date",      StringType(), True),    
    StructField("user",      StringType(), True),   
    StructField("text",      StringType(), True),   
    StructField("sentiment", StringType(), True)  
    # source_file and processing_time are added later, since they are derived from the stream rather than read from the JSON
])

# Next I set up a read stream with cloudFiles (Auto Loader) that enforces the schema
raw_stream = (
    spark.readStream
         .format("cloudFiles")                
         .option("cloudFiles.format", "json")  
         .schema(Schema_raw)            # enforcing the schema        
         .load(TWEET_SOURCE_PATH)             # using the source directory path here
)

# transforming the raw data to the bronze data (also adding the two new columns)
bronze_df = raw_stream.select(
    "date", 
    "user", 
    "text", 
    "sentiment").withColumn("source_file", input_file_name()).withColumn("processing_time", current_timestamp())

bronze_clean = bronze_df.na.drop() # drop rows containing nulls at ingest; the Bronze EDA section below explains this choice

# lastly I setup a write stream (named it bronze stream)
bronze_stream = (

    bronze_clean.writeStream
             .format("delta")  # just using the delta format
             .option("checkpointLocation", BRONZE_CHECKPOINT)  # using checkpoint
             .option("mergeSchema", "true")          # as required above, allow a new schema to be merged into the table
             .queryName("bronze")                      #  I gave this name so that I can easily query later for monitoring
             .outputMode("append")                   # specifying to be in append mode  
             .start(BRONZE_DELTA)                    # using the defined Bronze Delta path

)

## 4.0 Transform the Bronze Data to Silver Data using a stream (5 points)
- setup a read stream on your bronze delta table
- setup a write stream to append to the silver delta table
- Use the defined SILVER_CHECKPOINT and SILVER_DELTA paths in the includes
- name your bronze to silver stream as silver_stream
- transform the bronze data to the silver data using the data definition at the top of the notebook
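The bronze-to-silver logic in the cell below can be checked on a single tweet in plain Python. This sketch reuses the same two regular expressions as the Spark code and assumes the date format shown in its comments (`"Mon May 12 22:19:49 PDT 2025"`); `to_silver_rows` is a hypothetical name. Like `explode`, it returns one row per mention and no rows for a tweet without mentions, and, like the Spark code, it discards the time-zone token.

```python
import re
from datetime import datetime
from typing import Dict, List

MENTION_RE = re.compile(r"@[A-Za-z0-9_]+")   # same pattern as regexp_extract_all
TZ_TOKEN_RE = re.compile(r" [A-Z]{3} ")      # e.g. " PDT " between time and year


def to_silver_rows(date: str, text: str, sentiment: str) -> List[Dict]:
    """Return one silver row per @mention in `text` (no mentions -> no rows)."""
    no_token_date = TZ_TOKEN_RE.sub(" ", date)
    timestamp = datetime.strptime(no_token_date, "%a %b %d %H:%M:%S %Y")
    cleaned_text = MENTION_RE.sub("", text)  # mentions replaced by an empty string
    return [
        {"timestamp": timestamp, "mention": m, "cleaned_text": cleaned_text, "sentiment": sentiment}
        for m in MENTION_RE.findall(text)
    ]


rows = to_silver_rows("Mon May 12 22:19:49 PDT 2025", "@alice thanks @bob_2!", "positive")
print([r["mention"] for r in rows])
```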

In [0]:
# Use the legacy time parser: with the Spark 3.0+ parser, the string-to-timestamp conversion raised errors and the silver timestamp column came out null.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [0]:
time.sleep(30) # delay the start of the silver stream so the bronze table exists before it is read


# setting up a read stream from the Bronze Delta table
bronze_df = (

    spark.readStream
         .format("delta")        # setting source and just loading from the pre given variable where i did a write stream before
         .load(BRONZE_DELTA)    

)


# Transforming the Bronze into Silver 
silver_df = (

    # So I need to do 4 main things:
    # a) Convert the date strings to timestamps
    # b) Extract the mentions
    # c) Remove the mentions from the text
    # d) Keep the sentiment as it is

    bronze_df

      # remove the three-letter time-zone token between the time and the year
      .withColumn(
          "no_token_date",
          regexp_replace("date", r" [A-Z]{3} ", " ")  
          # e.g. "Mon May 12 22:19:49 PDT 2025" becomes "Mon May 12 22:19:49 2025" (checked on intermediate output)
      )
      # Then I convert it to a timestamp using the matching format
      .withColumn(
          "timestamp",
          to_timestamp("no_token_date", "EEE MMM dd HH:mm:ss yyyy")
      )



      # I use expr function to get an array of mentions (because there can be more than 1), using regex pattern to identify anything with @
      .withColumn(
          "mentions_array",
          expr("regexp_extract_all(text, '@[A-Za-z0-9_]+', 0)")
      )
      # then I just explode the mentions array to make multiple rows for each mention
      .withColumn("mention", explode("mentions_array"))



      # remove every @username from the text (replaced with an empty string)
      .withColumn(
          "cleaned_text",
          regexp_replace("text", "@[A-Za-z0-9_]+", "")
      )


      # lastly I just select the columns I want cleaned/transformed and the sentiments column
      .select(
          "timestamp",    # parsed event time
          "mention",      # single @username per row
          "cleaned_text", # text with mentions removed
          "sentiment"     # sentiment label from Bronze
      )
)



# setting up the write stream with the paths from the includes, and naming the query silver
silver_stream = (

    silver_df.writeStream
             .format("delta")                               
             .option("checkpointLocation", SILVER_CHECKPOINT)  # Silver checkpoint path
             .option("mergeSchema", "true")                 # again allowing for future merging
             .outputMode("append")                          # setting output mode
             .queryName("silver")                           # naming for monitoring later on
             .start(SILVER_DELTA)                           # path to Silver Delta

)

## 5.0 Transform the Silver Data to Gold Data using a stream (7 points)
- setup a read stream on your silver delta table
- setup a write stream to append to the gold delta table
- Use the defined GOLD_CHECKPOINT and GOLD_DELTA paths defines in the includes
- name your silver to gold stream as gold_stream
- transform the silver data to the gold data using the data definition at the top of the notebook
- Load the pretrained transformer sentiment classifier from the MODEL_NAME at the production level from the MLflow registry
- Use a spark UDF to parallelize the inference across your silver data

In [0]:
time.sleep(30) # again delaying so that gold stream can get a silver table

# load the Production-stage model from the MLflow registry and wrap it in a Spark UDF to parallelize inference
production_lvl_model = f"models:/{MODEL_NAME}/production"
sentiment_udf = mlflow.pyfunc.spark_udf(spark, production_lvl_model)


# will be chaining the read, transformation and write into one 
gold_stream = (

    spark.readStream # standard read stream just getting the silver data from that table.
         .format("delta")                         
         .load(SILVER_DELTA)


        # next I use the model to infer and get the next columns for the gold table  

      .withColumn("predictions", sentiment_udf(col("cleaned_text")))  # running the inference with cleaned text as the input

      .withColumn("predicted_score", col("predictions").getField("score") * 100) # from the predictions I get the score and scale it
      .withColumn("predicted_sentiment", col("predictions").getField("label")) # I also get the prediction label


        # the model returns 3 labels (POS, NEG, NEU), so I map them onto the two ids in the data definition


      .withColumn("predicted_sentiment_id",
          when(col("predicted_sentiment") == "POS", lit(1)) # if it's "POS" I map to 1
         .when(col("predicted_sentiment") == "NEG", lit(0)) # if it's "NEG" I map to 0
         .when(col("predicted_sentiment") == "NEU",

               # if it's neutral I just flip a coin to go either way
               when(rand() < 0.5, lit(1)).otherwise(lit(0)))
      )

        # same for the given sentiment_id
      .withColumn("sentiment_id",
          when(col("sentiment") == "positive", lit(1)) # if it's "positive" I map to 1
         .when(col("sentiment") == "negative", lit(0)) # if it's "negative" I map to 0
         .when(col("sentiment") == "neutral",

               # if it's neutral I just flip a coin to go either way
               when(rand() < 0.5, lit(1)).otherwise(lit(0)))
      )


        # now that i have everything I want I just select the columns I want
      .select(                                                     
          "timestamp",
          "mention",
          "cleaned_text",
          "sentiment",
          "predicted_score",
          "predicted_sentiment",
          "sentiment_id",
          "predicted_sentiment_id"
      )

        # lastly I setup a write stream
      .writeStream                                                  
         .format("delta")
         .option("checkpointLocation", GOLD_CHECKPOINT) 
         .option("mergeSchema", "true")
         .outputMode("append")
         .queryName("gold")
         .start(GOLD_DELTA)


)


## 6.0 Monitor your Streams (5 points)
- Setup a loop that runs at least every 10 seconds
- Print a timestamp of the monitoring query along with the list of streams, rows processed on each, and the processing time on each
- Run the loop until all of the data is processed (0 rows read on each active stream)
- Plot a line graph that shows the data processed by each stream over time
- Plot a line graph that shows the average processing time on each stream over time
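The stopping rule and the "average processing time" plot can be expressed as two small pure functions. The sketch below is hypothetical (the names are not from the includes): `is_drained` stops only after some data has flowed and every active stream's last batch read 0 rows, which avoids stopping before the streams have started; `cumulative_average` turns per-poll processing times into a running mean that can be plotted over time.

```python
from typing import Iterable, List


def is_drained(batch_rows: Iterable[int], seen_nonzero: bool) -> bool:
    """True once data has flowed and every active stream's last batch read 0 rows."""
    rows = list(batch_rows)
    return seen_nonzero and len(rows) > 0 and all(r == 0 for r in rows)


def cumulative_average(values: Iterable[float]) -> List[float]:
    """Running mean after each poll, e.g. of one stream's processing times (ms)."""
    out: List[float] = []
    total = 0.0
    for i, v in enumerate(values, start=1):
        total += v
        out.append(total / i)
    return out


print(is_drained([0, 0, 0], seen_nonzero=True), cumulative_average([100, 200, 300]))
```

With the pandas DataFrame built in the monitoring cell, something like `df.groupby("stream")["processing time (ms)"].expanding().mean()` computes the same per-stream running mean.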

In [0]:
time.sleep(30) # give the upstream streams time to start before monitoring

records = []
seen_nonzero = False  # becomes True once any stream has reported input rows

while True:

    time_right_now = datetime.now()
    timestamp = time_right_now.strftime("%Y-%m-%d %H:%M:%S")
    active = spark.streams.active

    print(f"\n[{timestamp}] Status of Streams:")

    # stop if nothing is running, otherwise the loop would never end
    if not active:
        print(f"[{timestamp}] No active streams to monitor")
        break

    all_idle = True  # stays True only if every active stream read 0 rows in its last batch

    for q in active:

        prog = q.lastProgress or {}

        # number of input rows in the most recent micro-batch
        num_rows = prog.get("numInputRows", 0)

        # micro-batch duration (ms) from the stream progress
        durations = prog.get("durationMs", {})
        processing_time = durations.get("triggerExecution", prog.get("batchDurationMs", 0))

        print(f" – {q.name}: rows input = {num_rows}, processing time (ms) = {processing_time}")

        # keeping the measurements for the plots below
        records.append({
            "timestamp": time_right_now,
            "stream": q.name,
            "num_rows": num_rows,
            "processing time (ms)": processing_time
        })

        # any stream that still read rows means the pipeline is not drained yet
        if num_rows > 0:
            seen_nonzero = True
            all_idle = False

    # done once data has flowed and every active stream now reports 0 rows
    if seen_nonzero and all_idle:
        print(f"[{timestamp}] All data has been processed by streams")
        break

    # otherwise poll again in 10 seconds
    time.sleep(10)




# converting the records into a pandas DataFrame for plotting
df = pd.DataFrame(records)
df["timestamp"] = pd.to_datetime(df["timestamp"])



# rows processed by time
plt.figure(figsize=(15, 8))

for name, grp in df.groupby("stream"):
    plt.plot(grp["timestamp"], grp["num_rows"], marker="o", label=name)
# -------------------------------------------------------------------------
plt.title("Data Processed Over Time")
plt.xlabel("Timestamps")
plt.ylabel("Input rows (last micro-batch)")
plt.legend(loc="best")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()





# processing time graph
plt.figure(figsize=(15, 8))

for name, grp in df.groupby("stream"):
    plt.plot(grp["timestamp"], grp["processing time (ms)"], marker="o", label=name)
# -------------------------------------------------------------------------
plt.title("Processing Time Over Time")
plt.xlabel("Timestamp")
plt.ylabel("Processing time (ms)")
plt.legend(loc="best")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

## 7.0 Bronze Data Exploratory Data Analysis (5 points)
- How many tweets are captured in your Bronze Table?
- Are there any columns that contain Nan or Null values?  If so how many and what will you do in your silver transforms to address this?
- Count the number of tweets by each unique user handle and sort the data by descending count.
- How many tweets have at least one mention (@) how many tweet have no mentions (@)
- Plot a bar chart that shows the top 20 tweeters (users)
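The per-user counts and the mention split can be sanity-checked on a small in-memory sample before running them on the full table. The sample data and function names below are hypothetical; the mention test uses the same `@[A-Za-z0-9_]+` pattern as the Spark code, so "has at least one match" is equivalent to "the mentions array is non-empty".

```python
import re
from collections import Counter
from typing import List, Tuple

MENTION_RE = re.compile(r"@[A-Za-z0-9_]+")


def tweets_per_user(users: List[str]) -> List[Tuple[str, int]]:
    """(user, count) pairs sorted by descending count."""
    return Counter(users).most_common()


def mention_split(texts: List[str]) -> Tuple[int, int]:
    """Return (tweets with at least one mention, tweets with no mentions)."""
    with_mentions = sum(1 for t in texts if MENTION_RE.search(t))
    return with_mentions, len(texts) - with_mentions


sample_users = ["ann", "bob", "ann"]
sample_texts = ["@bob hi", "no mention", "cc @ann and @cat"]
print(tweets_per_user(sample_users), mention_split(sample_texts))
```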


In [0]:
bronze_table = spark.read.format("delta").load(BRONZE_DELTA)

# How many tweets are captured in the bronze table?
total_tweets = bronze_table.count()
print(f"Total tweets in Bronze table are {total_tweets}")


# Null columns?
null_counts = bronze_table.agg(
    *[
        spark_sum(
            (col(c).isNull() | (col(c) == "")).cast("int")
        ).alias(c)
        for c in bronze_table.columns
    ]
)

print("Null counts per column:")
null_counts.show()

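print("Rows with null values are dropped during the raw-to-bronze ingest (bronze_df.na.drop()), so any nonzero counts above come from empty strings, which na.drop() does not remove. Dropping these rows is reasonable here: there is plenty of data, and because we use a pretrained model rather than training one, losing a few rows costs little. In the silver transform, tweets without any @mention also produce no rows, because explode() emits nothing for an empty mentions array.")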



#  Tweets by unique user handle in descending count
unique_user = bronze_table.groupBy("user").count().orderBy(col("count").desc())
print("Top users by tweet count (descending order):")
unique_user.show()






# Tweets with and without mentions

# again with the regex pattern to get an array of mentions
mentions_df = bronze_table.withColumn("mentions_array", expr("regexp_extract_all(text, '@[A-Za-z0-9_]+', 0)")).withColumn("num_mentions", size(col("mentions_array")))

# classify each tweet as with or without mentions based on whether its mentions array is empty
tweets_with = mentions_df.filter(col("num_mentions") > 0).count()
tweets_without = mentions_df.filter(col("num_mentions") == 0).count()

# print the counts
print(f"Tweets with at least one mention: {tweets_with}")
print(f"Tweets with no mentions: {tweets_without}")


# Plot a bar chart of the top 20 users (the same aggregation as above)
top20 = unique_user.limit(20).toPandas()

plt.figure(figsize=(15, 8))
plt.bar(top20['user'], top20['count'])
plt.title("Top 20 Tweeters by Tweet Count")
plt.xticks(rotation=90)
plt.xlabel("User's handle")
plt.ylabel("Number of Tweets")
plt.tight_layout()
plt.show()


## 8.0 Capture the accuracy metrics from the gold table in MLflow  (4 points)
Store the following in an MLflow experiment run:
- Store the precision, recall, and F1-score as MLflow metrics
- Store an image of the confusion matrix as an MLflow artifact
- Store the model name and the MLflow version that was used as an MLflow parameters
- Store the version of the Delta Table (input-silver) as an MLflow parameter
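For reference, the three metrics reduce to counts from the binary confusion matrix: precision = TP / (TP + FP), recall = TP / (TP + FN), and F1 = 2 · precision · recall / (precision + recall). The sketch below is a hypothetical stdlib-only version that can be used to cross-check the `sklearn.metrics` values logged in the cell below; it returns 0.0 when a denominator is zero, which is what sklearn returns by default (with a warning).

```python
from typing import Sequence, Tuple


def binary_metrics(y_true: Sequence[int], y_pred: Sequence[int], positive: int = 1) -> Tuple[float, float, float]:
    """Return (precision, recall, f1) for the given positive class."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1


# TP=2, FP=1, FN=1 in this toy example, so all three metrics equal 2/3.
print(binary_metrics([1, 0, 1, 1, 0], [1, 0, 0, 1, 1]))
```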

In [0]:
# first I load the actual gold table so I can calculate the metrics
gold_df = spark.read.format("delta").load(GOLD_DELTA)


# getting both the given label and what the UDF predicted
cost_inputs = gold_df.select("sentiment_id", "predicted_sentiment_id").collect()
y_true = [r["sentiment_id"]            for r in cost_inputs]
y_pred = [r["predicted_sentiment_id"]  for r in cost_inputs]



# now that i have the two I compute the actual metrics
precision = precision_score(y_true, y_pred, average="binary")
recall    = recall_score(y_true, y_pred, average="binary")
f1        = f1_score(y_true, y_pred, average="binary")

# also need to log these params
mlflow_version = mlflow.__version__
model_name     = MODEL_NAME   
silver_table   = DeltaTable.forPath(spark, SILVER_DELTA)
silver_version = silver_table.history(1).select("version").first()[0]




# start the actual MLflow run
with mlflow.start_run() as run:

    # logging the 3 metrics
    mlflow.log_metric("precision", precision)
    mlflow.log_metric("recall", recall)
    mlflow.log_metric("f1_score", f1)

    # logging the params
    mlflow.log_param("model_name", model_name)
    mlflow.log_param("mlflow_version", mlflow_version)
    mlflow.log_param("silver_delta_version", silver_version)


    # making the confusion matrix to log
    cm = confusion_matrix(y_true, y_pred)

    # making the axis
    fig, ax = plt.subplots()
    im = ax.imshow(cm, cmap="Blues")
    fig.colorbar(im, ax=ax)

    # annotating each cell with its count from the confusion matrix
    for (i, j), v in np.ndenumerate(cm):
        ax.text(j, i, v, ha="center", va="center")

    ax.set_xlabel("Predicted")
    ax.set_ylabel("Actual")

    # save to file and showing in notebook
    out = "/tmp/confusion_matrix.png"
    fig.savefig(out, bbox_inches="tight")
    plt.show()

    # logging
    mlflow.log_artifact(out, artifact_path="confusion_matrix")
    plt.close(fig)

    print("MLflow run params and artifacts logged")


## 9.0 Application Data Processing and Visualization (6 points)
- How many mentions are there in the gold data total?
- Count the number of neutral, positive and negative tweets for each mention in new columns
- Capture the total for each mention in a new column
- Sort the mention count totals in descending order
- Plot a bar chart of the top 20 mentions with positive sentiment (the people who are in favor)
- Plot a bar chart of the top 20 mentions with negative sentiment (the people who are the villains)

*note: A mention is a specific twitter user that has been "mentioned" in a tweet with an @user reference.
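The Application Data dictionary at the top of the notebook also lists `min_timestamp` and `max_timestamp` per mention. The sketch below shows the full per-mention aggregation on plain Python rows; the function name and sample rows are hypothetical, and in Spark the same result would come from `groupBy("mention").agg(...)` with `min`, `max`, and conditional counts. Here `total` counts every row for the mention, like `count("*")`.

```python
from datetime import datetime
from typing import Dict, Iterable, List


def aggregate_mentions(rows: Iterable[Dict]) -> List[Dict]:
    """Per-mention min/max timestamp and sentiment counts, sorted by total descending."""
    stats: Dict[str, Dict] = {}
    for r in rows:
        s = stats.setdefault(r["mention"], {
            "mention": r["mention"],
            "min_timestamp": r["timestamp"],
            "max_timestamp": r["timestamp"],
            "negative": 0, "neutral": 0, "positive": 0, "total": 0,
        })
        s["min_timestamp"] = min(s["min_timestamp"], r["timestamp"])
        s["max_timestamp"] = max(s["max_timestamp"], r["timestamp"])
        if r["sentiment"] in ("negative", "neutral", "positive"):
            s[r["sentiment"]] += 1
        s["total"] += 1
    return sorted(stats.values(), key=lambda s: s["total"], reverse=True)


sample = [
    {"mention": "@a", "timestamp": datetime(2025, 5, 1), "sentiment": "positive"},
    {"mention": "@a", "timestamp": datetime(2025, 5, 3), "sentiment": "negative"},
    {"mention": "@b", "timestamp": datetime(2025, 5, 2), "sentiment": "neutral"},
]
print([(s["mention"], s["total"]) for s in aggregate_mentions(sample)])
```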

In [0]:
# Reading the Gold Delta table
gold_df = spark.read.format("delta").load(GOLD_DELTA)


# How many mentions in total?

# the mentions were exploded in the silver step, so each gold row holds exactly one mention
total_mentions = gold_df.count()
print(f"Total mentions in Gold: {total_mentions}")



# Counting neutral, positive, and negative tweets per mention, plus the total count for each
mention_stats = (

    # group the gold rows by mention
    gold_df.groupBy("mention")

      .agg( # for each mention, count all of its rows and how many carry each given sentiment

          count("*").alias("total_count"),
          count(when(col("sentiment") == "positive", True)).alias("positive_count"),
          count(when(col("sentiment") == "neutral", True)).alias("neutral_count"),
          count(when(col("sentiment") == "negative", True)).alias("negative_count")
      )

)



# Sort by total_count descending
mention_stats = mention_stats.orderBy(desc("total_count"))
mention_stats.show(10, truncate=False)



# Plotting top 20 mentions by positive sentiment (sorted by positive count, not by total)
top20_pos = mention_stats.orderBy(desc("positive_count")).select("mention", "positive_count").limit(20).toPandas()

plt.figure(figsize=(15,8))
plt.bar(top20_pos["mention"], top20_pos["positive_count"])
plt.xticks(rotation=90, ha="right")
plt.title("Top 20 Mentions by Positive Tweet Count")
plt.xlabel("Mentions")
plt.ylabel("+ve Tweet Count")
plt.tight_layout()
plt.show()



# Plotting top 20 mentions by negative sentiment (sorted by negative count, not by total)
top20_neg = mention_stats.orderBy(desc("negative_count")).select("mention", "negative_count").limit(20).toPandas()

plt.figure(figsize=(15,8))
plt.bar(top20_neg["mention"], top20_neg["negative_count"])
plt.xticks(rotation=90, ha="right")
plt.title("Top 20 Mentions by Negative Tweet Count")
plt.xlabel("Mentions")
plt.ylabel("-ve Tweet Count")
plt.tight_layout()
plt.show()

## 10.0 Clean up and completion of your pipeline (3 points)
- using the utilities what streams are running? If any.
- Stop all active streams
- print out the elapsed time of your notebook. Note: In the includes there is a variable START_TIME that captures the starting time of the notebook.
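The elapsed-time printout is easier to read when the seconds are rounded. The helper below is a hypothetical sketch that assumes, as the cell below does, that `START_TIME` is compared against `time.time()` (seconds since the epoch). It rounds before splitting so a value such as 59.96 s prints as `1 min 0.0 sec` instead of `0 min 60.0 sec`.

```python
def format_elapsed(seconds_spent: float) -> str:
    """Format a duration in seconds as 'M min S.s sec'."""
    minutes, seconds = divmod(round(seconds_spent, 1), 60)
    return f"{int(minutes)} min {seconds:.1f} sec"


# In the notebook: print(f"Elapsed time: {format_elapsed(time.time() - START_TIME)}")
print(format_elapsed(125.5))
```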

In [0]:
# Listing any active streams
active_streams = spark.streams.active

if active_streams:
    print("Active streams:")
    for i in active_streams:
        print(f" -> {i.name} ")

else:
    print("No stream is active.")



# stopping streams
stop_all_streams()
print("All streams have stopped")
 

# elapsed time since START_TIME (set in the includes)
seconds_spent = time.time() - START_TIME
minutes = int(seconds_spent // 60)
seconds = seconds_spent % 60
print(f"Elapsed time: {minutes} mins {seconds:.1f} secs")

## 11.0 How Optimized is your Spark Application (Grad Students Only) (5 points)
Graduate students (registered for the DSCC-402 section of the course) are required to do this section.  This is a written analysis of your pipeline's execution and what is driving its performance, supported by evidence from the Spark UI (link to screenshots).
Recall that Spark Optimization has 5 significant dimensions of consideration:
- Spill: write to executor disk due to lack of memory
- Skew: imbalance in partition size
- Shuffle: network io moving data between executors (wide transforms)
- Storage: inefficiency due to disk storage format (small files, location)
- Serialization: distribution of code segments across the cluster
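Skew, for example, can be quantified from the task table of a stage in the Spark UI (such as per-task duration or records read) by comparing the largest task to the median task. The helper below is a hypothetical sketch of that arithmetic; the sample values are made up, and the ratio at which a stage counts as "skewed" is a judgment call, not a Spark-defined threshold.

```python
from statistics import median
from typing import Iterable


def skew_ratio(task_values: Iterable[float]) -> float:
    """Max task value divided by the median task value (1.0 means perfectly balanced)."""
    values = list(task_values)
    if not values:
        raise ValueError("need at least one task value")
    mid = median(values)
    return float("inf") if mid == 0 else max(values) / mid


# Made-up task durations (ms) copied from a stage's task table:
print(round(skew_ratio([100, 110, 90, 400]), 2))
```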

Comment on each of the dimensions of performance and how your implementation is or is not being affected.  Use specific information in the Spark UI to support your description.  

Note: you can take screenshots of the Spark UI from your project runs in Databricks and then link to those pictures by storing them as a publicly accessible file on your cloud drive (Google Drive, OneDrive, etc.)

References:
- [Spark UI Reference](https://spark.apache.org/docs/latest/web-ui.html#web-ui)
- [Spark UI Simulator](https://www.databricks.training/spark-ui-simulator/index.html)

### ENTER YOUR MARKDOWN HERE