## DSCC202-402 Data Science at Scale Final Project
### Tracking Tweet sentiment at scale using a pretrained transformer (classifier)
<p>Consider the following illustration of the end-to-end system that you will be building.  Each student should do their own work.  The project will demonstrate your understanding of Spark Streaming, the medallion data architecture using Delta Lake, Spark Inference at Scale using an MLflow packaged model, as well as Exploratory Data Analysis and System Tracking and Monitoring.</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/pipeline.drawio.png">

<p>
You will be pulling an updated copy of the course GitHub repository: <a href="https://github.com/lpalum/dscc202-402-spring2025">The Repo</a>.  

Once you have updated your fork of the repository you should see the following template project that is resident in the final_project directory.
</p>

<img src="https://data-science-at-scale.s3.amazonaws.com/images/notebooks.drawio.png">

<p>
You can then pull your project into the Databricks Workspace using the <a href="https://github.com/apps/databricks">Databricks App on GitHub</a> or by cloning the repo to your laptop and then uploading the final_project directory and its contents to your workspace using file imports.  Your choice.
</p>

<p>
Work your way through this notebook, which will give you the steps required to submit a complete and compliant project.  The following illustration and associated data dictionary specify the transformations and data that you are to generate for each step in the medallion pipeline.
</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/dataframes.drawio.png">

#### Bronze Data - raw ingest
- date - string in the source json
- user - string in the source json
- text - tweet string in the source json
- sentiment - the given sentiment of the text as determined by an unknown model that is provided in the source json
- source_file - the path of the source json file that this row of data was read from
- processing_time - a timestamp of when you read this row from the source json

#### Silver Data - Bronze Preprocessing
- timestamp - convert date string in the bronze data to a timestamp
- mention - every @username mentioned in the text string in the bronze data gets a row in this silver data table.
- cleaned_text - the bronze text data with the mentions (@username) removed.
- sentiment - the given sentiment that was associated with the text in the bronze table.

#### Gold Data - Silver Table Inference
- timestamp - the timestamp from the silver data table rows
- mention - the mention from the silver data table rows
- cleaned_text - the cleaned_text from the silver data table rows
- sentiment - the given sentiment from the silver data table rows
- predicted_score - score out of 100 from the Hugging Face Sentiment Transformer
- predicted_sentiment - string representation of the sentiment
- sentiment_id - 0 for negative and 1 for positive associated with the given sentiment
- predicted_sentiment_id - 0 for negative and 1 for positive associated with the Hugging Face Sentiment Transformer

#### Application Data - Gold Table Aggregation
- min_timestamp - the oldest timestamp on a given mention (@username)
- max_timestamp - the newest timestamp on a given mention (@username)
- mention - the user (@username) that this row pertains to.
- negative - total negative tweets directed at this mention (@username)
- neutral - total neutral tweets directed at this mention (@username)
- positive - total positive tweets directed at this mention (@username)

When you are designing your approach, one of the main decisions that you will need to make is how you are going to orchestrate the streaming data processing in your pipeline.  There are several valid approaches to triggering your streams and gating the execution of your pipeline.  Think through how you want to proceed and ask questions if you need guidance. The following references may be helpful:
- [Spark Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Databricks Autoloader - Cloudfiles](https://docs.databricks.com/en/ingestion/auto-loader/index.html)
- [In class examples - Spark Structured Streaming Performance](https://dbc-f85bdc5b-07db.cloud.databricks.com/editor/notebooks/2638424645880316?o=1093580174577663)

### Be sure your project runs end to end when *Run all* is executed on this notebook! (7 points)

### This project is worth 25% of your final grade.
- DSCC-202 Students have 55 possible points on this project (see points above and the instructions below)
- DSCC-402 Students have 60 possible points on this project (one extra section to complete)

In [0]:
%run ./includes/includes

In [0]:
"""
Add widgets to the notebook that control clearing a previous run,
stopping the active streams, and optimizing the Delta tables,
using routines defined in the utilities notebook.
"""
dbutils.widgets.removeAll()

dbutils.widgets.dropdown("clear_previous_run", "No", ["No","Yes"])
if (getArgument("clear_previous_run") == "Yes"):
    clear_previous_run()
    print("Cleared all previous data.")

dbutils.widgets.dropdown("stop_streams", "No", ["No","Yes"])
if (getArgument("stop_streams") == "Yes"):
    stop_all_streams()
    print("Stopped all active streams.")

dbutils.widgets.dropdown("optimize_tables", "No", ["No","Yes"])
if (getArgument("optimize_tables") == "Yes"):
    # Suck up those small files that we have been appending.
    # Optimize the tables
    optimize_table(BRONZE_DELTA)
    optimize_table(SILVER_DELTA)
    optimize_table(GOLD_DELTA)
    print("Optimized all of the Delta Tables")

## 1.0 Import your libraries here (2 points)
- Are your shuffle partitions consistent with your cluster and your workload?
- Do you have the necessary libraries to perform the required operations in the pipeline/application?
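
One way to reason about the shuffle partition question is a simple sizing heuristic. The sketch below is a rule of thumb only, not a Spark API: the function name `suggest_shuffle_partitions` and the 128 MiB target partition size are assumptions chosen for illustration. It derives a partition count from the shuffle volume and rounds it up to a multiple of the core count, so that every core has work in each wave of tasks.

```python
import math

# Assumed target size per shuffle partition (a commonly quoted rule of thumb).
TARGET_PARTITION_BYTES = 128 * 1024**2


def suggest_shuffle_partitions(total_cores, shuffle_bytes,
                               target_partition_bytes=TARGET_PARTITION_BYTES):
    """Suggest a shuffle partition count as a multiple of the core count."""
    # Partitions needed so that each one holds at most the target size.
    by_size = max(1, math.ceil(shuffle_bytes / target_partition_bytes))
    # Round up to a whole number of task waves across all cores.
    return math.ceil(by_size / total_cores) * total_cores


# Hypothetical workloads: a small tweet sample and a much larger shuffle.
print(suggest_shuffle_partitions(total_cores=8, shuffle_bytes=20 * 1024**2))
print(suggest_shuffle_partitions(total_cores=8, shuffle_bytes=10 * 1024**3))
```

For a shuffle of only a few tens of MiB, the heuristic collapses to one partition per core, so a value far above the core count mostly adds scheduling overhead. Adaptive query execution may coalesce partitions further at run time.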

In [0]:
# System utilities
import os 
import re as regex
import time as timer

# Data manipulation
import pandas as dr  
import numpy as num  

# Visualization tools
import seaborn as sns  
import matplotlib.pyplot as plt  
import plotly.express as px_lib  
import plotly.graph_objects as go_lib  
import plotly.figure_factory as ff_lib  

# Spark core & SQL
from pyspark.sql import SparkSession as SPBuilder
from pyspark.sql import functions as pysf
from pyspark.sql.functions import col as column, lit, when, regexp_replace
from pyspark.sql.types import (
    IntegerType as IntType,
    StringType as StrType,
    StructType as Schema,
    StructField as Field
)

# Spark streaming
from pyspark.streaming import StreamingContext as StreamCtx

# Delta Lake
from delta.tables import DeltaTable as DeltaTbl

# ML and tracking
import mlflow as mlf
from pyspark.ml.evaluation import MulticlassClassificationEvaluator as MCCEval

# Scikit‑learn metrics
from sklearn.metrics import (precision_score, recall_score, f1_score, confusion_matrix)

In [0]:
# Spark tuning
session = spark
session.conf.set("spark.sql.shuffle.partitions", "8")
session.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
session.conf.set("spark.sql.adaptive.enabled", "true")

print("Spark configuration applied.")

## 2.0 Define and execute utility functions (3 points)
- Read the source file directory listing
- Count the source files (how many are there?)
- print the contents of one of the files

In [0]:
# List and count raw tweet files via Databricks dbutils
tweets_dir = TWEET_SOURCE_PATH
files_info = dbutils.fs.ls(tweets_dir)
print(f"Found {len(files_info)} files in {tweets_dir}")

In [0]:
# Contents of the first file
first_path = files_info[0].path
print(f"\n--- Contents of {first_path} ---")
session.read.text(first_path).show(truncate=False, n=50)

## 3.0 Transform the Raw Data to Bronze Data using a stream  (8 points)
- define the schema for the raw data
- setup a read stream using cloudfiles and the source data format
- setup a write stream using delta lake to append to the bronze delta table
- enforce schema
- allow a new schema to be merged into the bronze delta table
- Use the BRONZE_CHECKPOINT and BRONZE_DELTA paths defined in the includes
- name your raw to bronze stream as bronze_stream
- transform the raw data to the bronze data using the data definition at the top of the notebook
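
To make the bronze data definition concrete, here is a minimal plain-Python sketch of what the raw-to-bronze transform does to a single record. It is not the streaming implementation (that uses Auto Loader and Delta Lake); the helper name `to_bronze_row`, the sample record, and the file path are hypothetical, and it assumes each source record is a JSON object carrying the four source fields.

```python
import json
from datetime import datetime, timezone

# The four fields expected in the source JSON, per the data dictionary.
BRONZE_SOURCE_FIELDS = ("date", "user", "text", "sentiment")


def to_bronze_row(json_line, source_file):
    """Build one bronze row (as a dict) from one raw JSON record."""
    record = json.loads(json_line)
    # Enforce the schema: keep only the expected fields, None when missing.
    row = {field: record.get(field) for field in BRONZE_SOURCE_FIELDS}
    # Add the two lineage columns required by the bronze definition.
    row["source_file"] = source_file
    row["processing_time"] = datetime.now(timezone.utc)
    return row


# Hypothetical sample record and path, for illustration only.
sample = ('{"date": "Mon Apr 06 22:19:45 PDT 2009", "user": "some_user", '
          '"text": "hello @friend", "sentiment": "positive"}')
print(to_bronze_row(sample, "/tmp/tweets/0.json"))
```

In the stream, the same two lineage columns are added with `withColumn` after the schema is applied to the incoming files.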

In [0]:
# Raw to bronze schema
raw_to_bronze_schema = Schema([
    Field("date", StrType(), True),
    Field("user", StrType(), True),
    Field("text", StrType(), True),
    Field("sentiment", StrType(), True),
    Field("source_file", StrType(), True),
])

In [0]:
# Ingest raw JSON files as a streaming DataFrame
bronze_input_df = (
    session.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("maxFilesPerTrigger", 10)
        .schema(raw_to_bronze_schema)           
        .load(TWEET_SOURCE_PATH)                
        .withColumn("source_file", pysf.input_file_name())
        .withColumn("processing_time", pysf.current_timestamp())  # column name from the Bronze data definition
)

In [0]:
# Write to Bronze Delta
bronze_stream = (
    bronze_input_df.writeStream
        .outputMode("append")
        .format("delta")
        .option("mergeSchema", "true")
        .option("checkpointLocation", BRONZE_CHECKPOINT)
        .queryName("bronze_stream")
        .trigger(once=True)
        .start(BRONZE_DELTA)
)

# wait for the Bronze stream to finish
bronze_stream.awaitTermination()

## 4.0 Transform the Bronze Data to Silver Data using a stream (5 points)
- setup a read stream on your bronze delta table
- setup a write stream to append to the silver delta table
- Use the defined SILVER_CHECKPOINT and SILVER_DELTA paths in the includes
- name your bronze to silver stream as silver_stream
- transform the bronze data to the silver data using the data definition at the top of the notebook
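
The silver definition asks for one row per `@username` and a `cleaned_text` with the mentions removed. The following plain-Python sketch shows that logic on a single tweet, as a way to check expectations before writing the Spark transform. The helper name `to_silver_rows` and the sample text are hypothetical, and collapsing the leftover whitespace is an extra step assumed here, not something the data definition requires.

```python
import re

# A mention is "@" followed by one or more word characters.
MENTION_PATTERN = re.compile(r"@\w+")


def to_silver_rows(text, sentiment):
    """Return one (mention, cleaned_text, sentiment) tuple per @mention."""
    mentions = MENTION_PATTERN.findall(text)
    # Remove the mentions, then collapse the whitespace they leave behind.
    cleaned = re.sub(r"\s+", " ", MENTION_PATTERN.sub("", text)).strip()
    return [(mention, cleaned, sentiment) for mention in mentions]


for row in to_silver_rows("@alice thanks! cc @bob see you soon", "positive"):
    print(row)

# A tweet without mentions yields no silver rows at all.
print(to_silver_rows("no mentions here", "negative"))
```

Note the consequence of the one-row-per-mention rule: tweets without any mention drop out of the silver table, and a tweet with two mentions appears twice.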

In [0]:
# Sample df
bronze_df_sample = session.read.format("delta").load(BRONZE_DELTA)
display(bronze_df_sample.limit(5))

In [0]:
# Read Bronze Delta as a stream
silver_input_df = (
    session.readStream
           .format("delta")
           .option("maxFilesPerTrigger", 10)
           .load(BRONZE_DELTA)
)

In [0]:
# Apply Silver level transforms
silver_transformed_df = (
    silver_input_df
      .withColumn("timestamp", pysf.to_timestamp(pysf.col("date"), "EEE MMM dd HH:mm:ss zzz yyyy"))
      .withColumn("mention", pysf.explode(
           pysf.split(
               pysf.regexp_replace(pysf.col("text"), "[^@\\w]", " "),  # escaped backslash avoids Python's invalid-escape warning
               " "
           )
      ))
      .filter(
          pysf.col("mention").startswith("@") & pysf.col("mention").rlike("^@\\w+")
      )
      .withColumn("cleaned_text", pysf.regexp_replace(pysf.col("text"), "@\\w+", ""))
      .select("timestamp", "mention", "cleaned_text", "sentiment")
)

In [0]:
# Write to Silver Delta (append only; no schema evolution option is set here)
silver_stream = (
    silver_transformed_df.writeStream
        .outputMode("append")
        .format("delta")
        .option("checkpointLocation", SILVER_CHECKPOINT)
        .queryName("silver_stream")
        .trigger(once=True)
        .start(SILVER_DELTA)
)

silver_stream.awaitTermination()
print("Silver ingestion complete.")

## 5.0 Transform the Silver Data to Gold Data using a stream (7 points)
- setup a read stream on your silver delta table
- setup a write stream to append to the gold delta table
- Use the GOLD_CHECKPOINT and GOLD_DELTA paths defined in the includes
- name your silver to gold stream as gold_stream
- transform the silver data to the gold data using the data definition at the top of the notebook
- Load the pretrained transformer sentiment classifier from the MODEL_NAME at the production level from the MLflow registry
- Use a spark UDF to parallelize the inference across your silver data
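
The derived gold columns can be sketched in plain Python for a single row. This assumes the model returns a label such as `POS`, `NEU`, or `NEG` together with a score between 0 and 1, and that the given sentiment is the string `positive` or `negative`; both are assumptions about the data rather than confirmed facts, and the helper name `to_gold_fields` is hypothetical.

```python
def to_gold_fields(given_sentiment, model_label, model_score):
    """Derive the four gold-only columns for one silver row."""
    return {
        # Score out of 100, per the gold data definition.
        "predicted_score": round(model_score * 100, 2),
        "predicted_sentiment": model_label,
        # Binary ids: 1 for positive, 0 otherwise.
        "sentiment_id": 1 if given_sentiment == "positive" else 0,
        "predicted_sentiment_id": 1 if model_label == "POS" else 0,
    }


print(to_gold_fields("positive", "POS", 0.75))
print(to_gold_fields("negative", "NEU", 0.5))
```

One design consequence worth noticing: with a binary id, a neutral prediction is folded into the 0 class, which affects the precision and recall computed later from these ids.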

In [0]:
# Read Silver Delta as a streaming source
gold_input_df = (
    session.readStream
           .format("delta")
           .option("maxFilesPerTrigger", 10)
           .load(SILVER_DELTA)
           .repartition(8)                
)

In [0]:
# Load the model
sentiment_analysis = mlf.pyfunc.spark_udf(spark, model_uri = f"models:/{MODEL_NAME}/production")

In [0]:
# Apply Gold level transforms using the UDF
gold_transformed_df = (
    gold_input_df
      .withColumn("prediction_results", sentiment_analysis(pysf.col("cleaned_text")))
      .withColumn("predicted_score", pysf.col("prediction_results.score") * 100)
      .withColumn("predicted_sentiment", pysf.col("prediction_results.label"))
      .withColumn("sentiment_id", pysf.when(pysf.col("sentiment")=="positive", 1).otherwise(0))
      .withColumn("predicted_sentiment_id", pysf.when(pysf.col("predicted_sentiment")=="POS", 1).otherwise(0))
      .drop("prediction_results")
)

In [0]:
# Start the Gold stream 
gold_stream = (
    gold_transformed_df.writeStream
        .format("delta")
        .option("checkpointLocation", GOLD_CHECKPOINT)
        .outputMode("append")
        .queryName("gold_stream")
        .trigger(once=True)
        .start(GOLD_DELTA)
)
print("Gold stream started.")
gold_stream.awaitTermination()

In [0]:
# Read Gold as a batch DataFrame
gold_df_sample = session.read.format("delta").load(GOLD_DELTA)
display(gold_df_sample.limit(5))

## 6.0 Monitor your Streams (5 points)
- Setup a loop that runs at least every 10 seconds
- Print a timestamp of the monitoring query along with the list of streams, rows processed on each, and the processing time on each
- Run the loop until all of the data is processed (0 rows read on each active stream)
- Plot a line graph that shows the data processed by each stream over time
- Plot a line graph that shows the average processing time on each stream over time
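
Since the plots are requested per stream, it helps to keep one history per stream name rather than a single running total. The sketch below shows one possible bookkeeping structure in plain Python. The helper names `record_progress` and `all_idle` and the sample progress dictionaries are hypothetical; only the `numInputRows` and `durationMs.addBatch` keys mirror what a streaming query's last progress report contains.

```python
from collections import defaultdict


def record_progress(history, monitor_time, stream_name, progress):
    """Append one monitoring sample for a single stream."""
    progress = progress or {}  # a stream may not have reported progress yet
    history[stream_name].append({
        "monitor_time": monitor_time,
        "rows_processed": progress.get("numInputRows", 0),
        "processing_time_ms": progress.get("durationMs", {}).get("addBatch"),
    })


def all_idle(history):
    """True when the latest sample of every tracked stream read 0 rows."""
    return bool(history) and all(
        samples[-1]["rows_processed"] == 0 for samples in history.values()
    )


history = defaultdict(list)
# Hypothetical samples taken 10 seconds apart (monitor_time in seconds).
record_progress(history, 0, "bronze_stream",
                {"numInputRows": 500, "durationMs": {"addBatch": 1200}})
record_progress(history, 0, "silver_stream", None)
print(all_idle(history))
record_progress(history, 10, "bronze_stream",
                {"numInputRows": 0, "durationMs": {"addBatch": 90}})
record_progress(history, 10, "silver_stream",
                {"numInputRows": 0, "durationMs": {"addBatch": 75}})
print(all_idle(history))
```

Each entry of `history` can then be drawn as its own line, one per stream, for both the rows-processed and the processing-time graphs.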

In [0]:
from datetime import datetime
import time
import pandas as pd
import matplotlib.pyplot as plt

# Gold stream 
gold_stream = (
    gold_transformed_df.writeStream
        .format("delta")
        .option("checkpointLocation", GOLD_CHECKPOINT)
        .outputMode("append")
        .queryName("gold_stream")
        .trigger(once=True)
        .start(GOLD_DELTA)
)
print("Gold stream started.")

# Monitor Streams 
metrics = []

while True:
    ts = datetime.now()
    print(f"\n=== Monitor @ {ts.strftime('%Y-%m-%d %H:%M:%S')} ===")
    
    # loop through all active streams
    total_rows = 0
    batch_time = None  # avoids a NameError below when no stream is active
    for q in session.streams.active:
        prog = q.lastProgress or {}
        rows_in = prog.get("numInputRows", 0)
        batch_time = prog.get("durationMs", {}).get("addBatch", None)
        total_rows += rows_in
        
        print(f"Stream `{q.name}` | Rows In: {rows_in} | Batch Time: {batch_time} ms")
    
    # record snapshot
    metrics.append({
        "monitor_time": ts,
        "rows_processed": total_rows,
        "processing_time": batch_time
    })
    
    # stop when stream has finished and processed 0 rows
    if not any(q.isActive for q in session.streams.active) and total_rows == 0:
        print("All data processed and no active streams—exiting monitor loop.")
        break
    
    time.sleep(10)

# ensure stream is stopped
if gold_stream.isActive:
    gold_stream.stop()
print("Gold stream stopped.")

# Build DataFrame for plotting 
history_df = pd.DataFrame(metrics)

# Rows processed over time (summed across all active streams)
plt.figure()
plt.plot(history_df['monitor_time'], history_df['rows_processed'], marker='o')
plt.xlabel('Time')
plt.ylabel('Rows Processed')
plt.title('Rows Processed Over Time')
plt.tight_layout()
plt.show()

# Processing time over time (addBatch duration of the last stream polled)
plt.figure()
plt.plot(history_df['monitor_time'], history_df['processing_time'], marker='o')
plt.xlabel('Time')
plt.ylabel('Processing Time (ms)')
plt.title('Processing Time Over Time')
plt.tight_layout()
plt.show()

## 7.0 Bronze Data Exploratory Data Analysis (5 points)
- How many tweets are captured in your Bronze Table?
- Are there any columns that contain NaN or Null values?  If so, how many, and what will you do in your silver transforms to address this?
- Count the number of tweets by each unique user handle and sort the data by descending count.
- How many tweets have at least one mention (@)? How many tweets have no mentions (@)?
- Plot a bar chart that shows the top 20 tweeters (users)


In [0]:
# Loading bronze table
bronze_table = spark.read.format("delta").load(BRONZE_DELTA)
display(bronze_table.limit(5))

# Total number of tweets
tweet_count = bronze_table.count()
print("The number of tweets captured in the Bronze Table: ", tweet_count)

In [0]:
# Null values in each column
null_value_counts = bronze_table.select([
    pysf.count(pysf.when(pysf.col(c).isNull(), c)).alias(c)
    for c in bronze_table.columns
])

null_value_counts.show()

In [0]:
# Number of tweets count by each unique user handle and sorted in descending order
user_tweet_counts_unique_user = bronze_table.groupBy("user").count().orderBy("count", ascending=False)
print("Total number of tweets by each unique user handle:")
user_tweet_counts_unique_user.show()

In [0]:
# Number of tweets with @mention and no @ mention
tweets_with_mention_symbol = bronze_table.filter(pysf.col("text").contains("@")).count()
tweets_without_mention_symbol = tweet_count - tweets_with_mention_symbol
print(f"Tweets with mentions symbol: {tweets_with_mention_symbol}")
print(f"Tweets without mentions symbol: {tweets_without_mention_symbol}")

In [0]:
# Top 20 tweeters (users)
top_twitter_users = user_tweet_counts_unique_user.limit(20).toPandas()
fig = px_lib.bar(
    top_twitter_users,
    x="user",
    y="count",
    title="20 Top Users"
)
fig.show()

## 8.0 Capture the accuracy metrics from the gold table in MLflow  (4 points)
Store the following in an MLflow experiment run:
- Store the precision, recall, and F1-score as MLflow metrics
- Store an image of the confusion matrix as an MLflow artifact
- Store the model name and the MLflow version that was used as MLflow parameters
- Store the version of the Delta Table (input-silver) as an MLflow parameter
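
As a reminder of what the three metrics measure, here is a small worked computation in plain Python from the binary ids. It is an illustration only; the helper name `binary_metrics` is hypothetical, and returning 0.0 when a denominator is zero is an assumed convention.

```python
def binary_metrics(y_true, y_pred):
    """Return (precision, recall, f1) for binary labels where 1 is positive."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)  # true positives
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)  # false positives
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)  # false negatives
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denominator = precision + recall
    f1 = 2 * precision * recall / denominator if denominator else 0.0
    return precision, recall, f1


# Hypothetical ids: the model predicts positive for every tweet.
precision, recall, f1 = binary_metrics([1, 1, 0, 0], [1, 1, 1, 1])
print(f"precision={precision:.3f} recall={recall:.3f} f1={f1:.3f}")
```

In this example every true positive is found (recall 1.0), but half of the positive predictions are wrong (precision 0.5), which is why the F1-score is reported alongside both.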

In [0]:
# Load Gold results
gold_df = session.read.format("delta").load(GOLD_DELTA)

In [0]:
# Collect true & predicted labels
results_pd = gold_df.select("sentiment_id", "predicted_sentiment_id").toPandas()
y_true = results_pd["sentiment_id"]
y_pred = results_pd["predicted_sentiment_id"]

In [0]:
# Compute metrics
prec = precision_score(y_true, y_pred)
rec = recall_score(y_true, y_pred)
f1  = f1_score(y_true, y_pred)
cm  = confusion_matrix(y_true, y_pred)

In [0]:
# Retrieve Silver table version
silver_history = DeltaTbl.forPath(session, SILVER_DELTA).history(1)  # DeltaTable was imported under the alias DeltaTbl
silver_version = silver_history.select("version").collect()[0][0]

In [0]:
# Log to MLflow
with mlf.start_run() as run:
    # a) Metrics
    mlf.log_metric("precision", prec)
    mlf.log_metric("recall", rec)
    mlf.log_metric("f1_score", f1)

    # b) Confusion matrix artifact
    plt.figure(figsize=(4, 4))
    sns.heatmap(cm, annot=True, fmt="d", cbar=False)
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.tight_layout()
    cm_path = "confusion_matrix.png"
    plt.savefig(cm_path)
    mlf.log_artifact(cm_path)

    # c) Parameters
    mlf.log_param("model_name", MODEL_NAME)
    mlf.log_param("mlflow_version", mlf.__version__)
    mlf.log_param("silver_table_version", str(silver_version))

print("Metrics, confusion matrix, and parameters logged to MLflow.")

## 9.0 Application Data Processing and Visualization (6 points)
- How many mentions are there in the gold data total?
- Count the number of neutral, positive and negative tweets for each mention in new columns
- Capture the total for each mention in a new column
- Sort the mention count totals in descending order
- Plot a bar chart of the top 20 mentions with positive sentiment (the people who are in favor)
- Plot a bar chart of the top 20 mentions with negative sentiment (the people who are the villains)

*Note: A mention is a specific Twitter user that has been "mentioned" in a tweet with an @user reference.*
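
The application table described at the top of the notebook also carries the oldest and newest timestamp per mention. A plain-Python sketch of that aggregation is shown below as a way to reason about the expected output shape. The helper name `aggregate_mentions` and the sample rows are hypothetical, and the `POS`/`NEU`/`NEG` labels are an assumption about the model's output.

```python
from datetime import datetime

# Assumed model labels mapped to the application table's count columns.
LABEL_TO_COLUMN = {"NEG": "negative", "NEU": "neutral", "POS": "positive"}


def aggregate_mentions(gold_rows):
    """Aggregate (timestamp, mention, predicted_sentiment) rows per mention."""
    summary = {}
    for ts, mention, label in gold_rows:
        row = summary.setdefault(mention, {
            "min_timestamp": ts, "max_timestamp": ts,
            "negative": 0, "neutral": 0, "positive": 0,
        })
        row["min_timestamp"] = min(row["min_timestamp"], ts)
        row["max_timestamp"] = max(row["max_timestamp"], ts)
        row[LABEL_TO_COLUMN[label]] += 1
    return summary


# Hypothetical gold rows for illustration.
rows = [
    (datetime(2009, 4, 6, 22, 19), "@alice", "POS"),
    (datetime(2009, 4, 7, 8, 5), "@alice", "NEG"),
    (datetime(2009, 4, 6, 23, 0), "@bob", "NEU"),
]
for mention, stats in aggregate_mentions(rows).items():
    total = stats["negative"] + stats["neutral"] + stats["positive"]
    print(mention, total, stats)
```

The per-mention total is simply the sum of the three count columns, which is what the sort in descending order operates on.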

In [0]:
# Loading Gold Delta Table
gold_df = session.read.format("delta").load(GOLD_DELTA)

# How many mentions are there in the gold data total?
total_mentions = gold_df.count()
print(f"Total mentions in gold data: {total_mentions}")

In [0]:
# Count the number of neutral, positive, and negative tweets for each mention
mention_counts_df = (
    gold_df
      .groupBy("mention")
      .pivot("predicted_sentiment", ["POS", "NEU", "NEG"])
      .count()
      .na.fill(0)
      .withColumnRenamed("POS", "positive")
      .withColumnRenamed("NEU", "neutral")
      .withColumnRenamed("NEG", "negative")
      .withColumn("total", 
                  pysf.col("positive") + pysf.col("neutral") + pysf.col("negative"))
      .orderBy(pysf.col("total").desc())
)

In [0]:
# Convert to Pandas for plotting
mention_counts_pd = mention_counts_df.toPandas()
mention_counts_pd.head(10)

In [0]:
# Plot top 20 mentions by positive sentiment
top_positive = mention_counts_pd.nlargest(20, "positive")
fig_pos = px_lib.bar(
    top_positive,
    x="mention",
    y="positive",
    title="Top 20 Mentions by Positive Sentiment (In Favor)"
)
fig_pos.show()

In [0]:
# Plot top 20 mentions by negative sentiment
top_negative = mention_counts_pd.nlargest(20, "negative")
fig_neg = px_lib.bar(
    top_negative,
    x="mention",
    y="negative",
    title="Top 20 Mentions by Negative Sentiment (Villains)"
)
fig_neg.show()

## 10.0 Clean up and completion of your pipeline (3 points)
- Using the utilities, list which streams (if any) are still running.
- Stop all active streams
- print out the elapsed time of your notebook. Note: In the includes there is a variable START_TIME that captures the starting time of the notebook.

In [0]:
# Stop any remaining streams
active = session.streams.active
if active:
    print("Stopping active streams:")
    for q in active:
        print(f" • {q.name}")
    stopped = stop_all_streams()
    print(f"stop_all_streams() stopped anything? {stopped}")
else:
    print("No active streams to stop.")

# Print elapsed time
try:
    end_time = time.time()
    elapsed = end_time - START_TIME
    mins, secs = divmod(elapsed, 60)
    print(f"Total elapsed time: {int(mins)}m {secs:.1f}s ({elapsed:.1f} seconds)")
except NameError:
    print("START_TIME not defined. Please add `START_TIME = time.time()` at the top of your notebook.")

## 11.0 How Optimized is your Spark Application (Grad Students Only) (5 points)
Graduate students (registered for the DSCC-402 section of the course) are required to do this section.  This is a written analysis using the Spark UI (link to screenshots) that supports your analysis of your pipeline's execution and what is driving its performance.
Recall that Spark Optimization has 5 significant dimensions of consideration:
- Spill: write to executor disk due to lack of memory
- Skew: imbalance in partition size
- Shuffle: network io moving data between executors (wide transforms)
- Storage: inefficiency due to disk storage format (small files, location)
- Serialization: distribution of code segments across the cluster

Comment on each of the dimensions of performance and how your implementation is or is not being affected.  Use specific information in the Spark UI to support your description.  

Note: you can take screenshots of the Spark UI from your project runs in Databricks and then link to those pictures by storing them as publicly accessible files on your cloud drive (Google Drive, OneDrive, etc.).

References:
- [Spark UI Reference](https://spark.apache.org/docs/latest/web-ui.html#web-ui)
- [Spark UI Simulator](https://www.databricks.training/spark-ui-simulator/index.html)

**Screenshots from Spark UI**

- [Stages](https://drive.google.com/file/d/1yl82Rtc8QPn6B1pdrjFKsMhNE4Ngyzew/view?usp=drive_link)

- [Stages-2](https://drive.google.com/file/d/1k4W_FMdF6J73JQxFTubBsS26FOnQE3_y/view?usp=drive_link)

- [Executors](https://drive.google.com/file/d/1JJs38Ns4OLOzA4CWZZohryjIsWkAbYF5/view?usp=sharing)

- [Classpath Entries](https://drive.google.com/file/d/1IrpC73nZa3sq2chMnb_ShI1eTc8vzW5N/view?usp=sharing)

**Spill: write to executor disk due to lack of memory**

When we examine the “Spill” metrics in the Spark UI, specifically the “Disk Bytes Spilled” and “Memory Bytes Spilled” columns under each stage’s task summary, we see zero values across the board. Likewise, on the Executors page, the “Disk Used” and “Shuffle Spill” bytes remain at 0 B. This tells us that none of our tasks ever ran out of memory and had to write intermediate data out to disk. In other words, our partition sizes and UDF computations fit comfortably within each executor’s memory allocation. Because no spilling occurred, we don’t need to adjust executor memory settings or break up our workloads further for memory reasons.

**Skew: imbalance in partition size**

Data skew arises when a small number of partitions carry a disproportionately large amount of work, causing some tasks to lag far behind others. Take Stage 111 as an example: it ran 79 tasks with a total input of 21.4 MiB. The UI’s blue bars are remarkably uniform, indicating that each task processed roughly the same volume and completed in a similar time. There are no outliers where one task took significantly longer or read far more data. This uniformity demonstrates that our data is evenly distributed across partitions, so skew-induced hotspots are not impacting performance.
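
A visual check of the task bars can be backed by a simple number. The sketch below computes the ratio of the largest task input to the median task input; values near 1 indicate balanced partitions. The helper name `skew_ratio` and the sample task sizes are hypothetical (they are not the Stage 111 measurements), and what counts as a worrying ratio is a judgment call.

```python
import statistics


def skew_ratio(task_sizes):
    """Ratio of the largest task input to the median task input."""
    median = statistics.median(task_sizes)
    return max(task_sizes) / median if median else float("inf")


# Hypothetical per-task input sizes in KiB, for illustration only.
balanced = [270, 280, 275, 290, 265]
skewed = [270, 280, 275, 290, 2650]

print(f"balanced: {skew_ratio(balanced):.2f}")
print(f"skewed:   {skew_ratio(skewed):.2f}")
```

The per-task sizes can be read from the stage detail page's task table; the summary metrics there (min, median, max) are enough to compute the same ratio by hand.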


**Shuffle: network io moving data between executors (wide transforms)**

Wide transformations like repartition, groupBy, or pivoting naturally incur network traffic as data moves between executors. Our Spark UI highlights that some stages read around 4–5 MiB of shuffle data and wrote under 2 MiB back. Across all executors, the total shuffle read was about 15.8 MiB and shuffle write about 13.4 MiB. These volumes are modest relative to the cluster’s capacity, so shuffling is not a major bottleneck today. However, as our dataset grows, we should monitor those shuffle metrics, because excessive shuffle can saturate the network and slow down our pipeline. Techniques like broadcast joins or minimizing unnecessary repartitions can help keep shuffle costs low.


**Storage: inefficiency due to disk storage format (small files, location)**

Our pipeline doesn’t rely on in-memory caching, which is what the Storage tab in the UI reports. Directory listings of the Delta tables, however, reveal that each micro-batch produces a handful of relatively small Delta files, often under 1 MiB each. Handling many small files can degrade performance, since the driver and executors must manage more metadata and file-open operations. To remedy this, we can compact our Bronze, Silver, and Gold Delta tables with the OPTIMIZE command in Databricks (optionally with a ZORDER BY clause) to consolidate small Parquet files into larger ones. This will improve read throughput and reduce overhead in subsequent streaming and batch reads.

**Serialization: distribution of code segments across the cluster**

Our pipeline leverages a Python UDF for sentiment scoring, which could introduce serialization overhead as data crosses the JVM↔Python boundary. However, the Spark UI shows negligible “Task Time (GC Time)” (only about six seconds of garbage collection over 38 minutes of total task time) and uniform task durations. The Environment tab’s Classpath Entries confirm that the Python UDF artifacts were shipped once per executor, not per row. These observations suggest serialization costs are already minimal. Note that mlflow.pyfunc.spark_udf already wraps the model in a vectorized pandas UDF, so rows cross the boundary in Arrow batches rather than one at a time. If we want to squeeze out even more efficiency, the remaining lever is batch sizing, for example tuning spark.sql.execution.arrow.maxRecordsPerBatch so that each batch handed to Python is large enough to amortize invocation overhead without exhausting executor memory.

# END