## DSCC202-402 Data Science at Scale Final Project
### Tracking Tweet sentiment at scale using a pretrained transformer (classifier)
<p>Consider the following illustration of the end-to-end system that you will be building.  Each student should do their own work.  The project will demonstrate your understanding of Spark Streaming, the medallion data architecture using Delta Lake, Spark inference at scale using an MLflow-packaged model, as well as Exploratory Data Analysis and system tracking and monitoring.</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/pipeline.drawio.png">

<p>
You will be pulling an updated copy of the course GitHub repository: <a href="https://github.com/lpalum/dscc202-402-spring2024">The Repo</a>.  If you are unclear on how to pull an updated copy using the GitHub command line, the following <a href="https://techwritingmatters.com/how-to-update-your-forked-repository-on-github">document</a> is helpful.  Be sure to add the professors and TAs as collaborators on your project. 

- lpalum@gmail.com GitHub ID: lpalum
- ajay.anand@rochester.edu GitHub ID: ajayan12
- divyamunot1999@gmail.com GitHub ID: divyamunot
- ylong6@u.Rochester.edu GitHub ID: NinaLong2077

Once you have updated your fork of the repository, you should see the following template project in the final_project directory.
</p>

<img src="https://data-science-at-scale.s3.amazonaws.com/images/notebooks.drawio.png">

<p>
You can then pull your project into the Databricks Workspace using the <a href="https://www.databricks.training/step-by-step/importing-courseware-from-github/index.html">Repos</a> feature.
Each student is expected to submit the URL of their project on GitHub with their code checked in on the main/master branch.  This illustration highlights the branching scheme that you may use to work on your code in steps and then merge your submission into your master branch before submitting.
</p>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/github.drawio.png">
<p>
Work your way through this notebook which will give you the steps required to submit a complete and compliant project.  The following illustration and associated data dictionary specifies the transformations and data that you are to generate for each step in the medallion pipeline.
</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/dataframes.drawio.png">

#### Bronze Data - raw ingest
- date - string in the source json
- user - string in the source json
- text - tweet string in the source json
- sentiment - the given sentiment of the text as determined by an unknown model that is provided in the source json
- source_file - the path of the source json file that this row of data was read from
- processing_time - a timestamp of when you read this row from the source json

#### Silver Data - Bronze Preprocessing
- timestamp - convert date string in the bronze data to a timestamp
- mention - every @username mentioned in the text string in the bronze data gets a row in this silver data table.
- cleaned_text - the bronze text data with the mentions (@username) removed.
- sentiment - the given sentiment that was associated with the text in the bronze table.

#### Gold Data - Silver Table Inference
- timestamp - the timestamp from the silver data table rows
- mention - the mention from the silver data table rows
- cleaned_text - the cleaned_text from the silver data table rows
- sentiment - the given sentiment from the silver data table rows
- predicted_score - score out of 100 from the Hugging Face Sentiment Transformer
- predicted_sentiment - string representation of the sentiment
- sentiment_id - 0 for negative and 1 for positive associated with the given sentiment
- predicted_sentiment_id - 0 for negative and 1 for positive associated with the Hugging Face Sentiment Transformer

#### Application Data - Gold Table Aggregation
- min_timestamp - the oldest timestamp on a given mention (@username)
- max_timestamp - the newest timestamp on a given mention (@username)
- mention - the user (@username) that this row pertains to.
- negative - total negative tweets directed at this mention (@username)
- neutral - total neutral tweets directed at this mention (@username)
- positive - total positive tweets directed at this mention (@username)

When you are designing your approach, one of the main decisions that you will need to make is how you are going to orchestrate the streaming data processing in your pipeline.  There are several valid approaches.  First, you may choose to start the bronze_stream and let it complete (read and append all of the source data) before proceeding and starting up the silver_stream.  This approach has latency associated with it, but it will allow your code to proceed in a linear fashion and process all the data by the end of your notebook execution.  Another potential approach is to start all the streams and have a "watch" method to determine when the pipeline has processed sufficient or all of the source data before stopping and displaying results.  Both of these approaches are valid and have different implications on how you will trigger your streams and how you will gate the execution of your pipeline.  Think through how you want to proceed and ask questions if you need guidance. The following references may be helpful:
- [Spark Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Databricks Autoloader - Cloudfiles](https://docs.databricks.com/en/ingestion/auto-loader/index.html)
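If you choose the second approach, the "watch" step can be a small polling loop. The sketch below shows one hypothetical way to write it. `wait_until_idle` is not part of the course utilities. It assumes the query behaves like a PySpark 3.x `StreamingQuery`, where `lastProgress` is a dict or `None`. It also assumes that `lastProgress` reports `numInputRows == 0` once the source has been drained (Structured Streaming typically emits idle progress updates when no new data arrives).

```python
import time
from typing import Any


def wait_until_idle(
    query: Any,
    poll_seconds: float = 10.0,
    idle_polls: int = 3,
    timeout_seconds: float = 3600.0,
) -> bool:
    """Hypothetical helper: block until a streaming query looks drained.

    Returns True when the query has stopped on its own or has reported zero
    input rows for `idle_polls` consecutive polls; returns False on timeout.
    `query` needs `isActive` and `lastProgress` attributes, like a
    pyspark StreamingQuery.
    """
    deadline = time.monotonic() + timeout_seconds
    consecutive_idle = 0
    while time.monotonic() < deadline:
        if not query.isActive:
            return True  # e.g. an availableNow trigger has finished
        progress = query.lastProgress  # None until the first micro-batch completes
        if progress is not None and progress.get("numInputRows", 0) == 0:
            consecutive_idle += 1
            if consecutive_idle >= idle_polls:
                return True
        else:
            consecutive_idle = 0  # new data arrived, restart the idle count
        time.sleep(poll_seconds)
    return False
```

For example, you could call `wait_until_idle(bronze_stream)` before reading the bronze table downstream. Once every stream is idle, you could call `stop_all_streams()` and display the results.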

### Be sure that your project runs end to end when *Run all* is executed on this notebook! (15 Points out of 60)

In [0]:
%run ./includes/includes

In [0]:
"""
Adding a widget to the notebook to control the clearing of a previous run.
or stopping the active streams using routines defined in the utilities notebook
"""
dbutils.widgets.removeAll()

dbutils.widgets.dropdown("clear_previous_run", "No", ["No","Yes"])
if (getArgument("clear_previous_run") == "Yes"):
    clear_previous_run()
    print("Cleared all previous data.")

dbutils.widgets.dropdown("stop_streams", "No", ["No","Yes"])
if (getArgument("stop_streams") == "Yes"):
    stop_all_streams()
    print("Stopped all active streams.")

from delta.tables import DeltaTable
dbutils.widgets.dropdown("optimize_tables", "No", ["No","Yes"])
if (getArgument("optimize_tables") == "Yes"):
    # Compact the many small files produced by the streaming appends.
    for delta_path in (BRONZE_DELTA, SILVER_DELTA, GOLD_DELTA):
        DeltaTable.forPath(spark, delta_path).optimize().executeCompaction()
    print("Optimized all of the Delta Tables")

## 1.0 Import your libraries here...
- Are your shuffle partitions consistent with your cluster and your workload?
- Do you have the necessary libraries to perform the required operations in the pipeline/application?

In [0]:
# Are your shuffle partitions consistent with your cluster and your workload?
spark.conf.set("spark.sql.shuffle.partitions", spark.sparkContext.defaultParallelism)

# Do you have the necessary libraries to perform the required operations in the pipeline/application?
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import count, isnan, col, when, current_timestamp, input_file_name, desc, explode, expr, to_timestamp, regexp_replace
import time  # used for the elapsed-time calculation at the end of the notebook
import matplotlib.pyplot as plt
import mlflow
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

## 2.0 Use the utility functions to ...
- Read the source file directory listing
- Count the source files (how many are there?)
- print the contents of one of the files

In [0]:
# Read the source file directory listing
df = get_source_listing_df()
# Count the source files (how many are there?) - one row per file in the listing
num_files = len(df)
print(f"There are {num_files} source files.")

# print the contents of one of the files (the first row of the listing)
first_filename = df.iloc[0]['File Name']
show_s3_file_contents(first_filename)

## 3.0 Transform the Raw Data to Bronze Data using a stream
- define the schema for the raw data
- setup a read stream using cloudfiles and the source data format
- setup a write stream in delta format to append to the bronze delta table
- enforce schema
- allow a new schema to be merged into the bronze delta table
- Use the BRONZE_CHECKPOINT and BRONZE_DELTA paths defined in the includes
- name your raw to bronze stream as bronze_stream
- transform the raw data to the bronze data using the data definition at the top of the notebook

In [0]:
# define the schema for the raw data: only the fields present in the source json.
# source_file and processing_time are added as new columns after the read.
schema = StructType([
    StructField("date", StringType(), True),
    StructField("user", StringType(), True),
    StructField("text", StringType(), True),
    StructField("sentiment", StringType(), True)
])

In [0]:

# setup a read stream using cloudfiles and the source data format
df_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaLocation", "/mnt/delta/schema")
    # .option("cloudFiles.schemaEvolutionMode", "fail") 
    .schema(schema) # enforce schema
    .load(TWEET_SOURCE_PATH)
    # .load('/mnt/test-tweet-sample')
)

# Add processing_time and source_file columns
df_stream = df_stream.withColumn("processing_time", current_timestamp())
df_stream = df_stream.withColumn("source_file", input_file_name())

bronze_stream = (
    df_stream.writeStream
    .format("delta")
    .option("mergeSchema", "true") # allow a new schema to be merged into the bronze delta table
    .outputMode("append")
    .option("checkpointLocation", BRONZE_CHECKPOINT) # Use the defined BRONZE_CHECKPOINT
    .queryName("bronze_stream")
    .trigger(availableNow=True)
    .start(BRONZE_DELTA) # Use the defined BRONZE_DELTA
)

In [0]:
bronze_stream.awaitTermination()

In [0]:
# %sql
# OPTIMIZE delta.`dbfs:/tmp/labuser104917-3018615/bronze.delta`;

## 4.0 Bronze Data Exploratory Data Analysis
- How many tweets are captured in your Bronze Table?
- Are there any columns that contain NaN or Null values?  If so, how many, and what will you do in your silver transforms to address this?
- Count the number of tweets by each unique user handle and sort the data by descending count.
- How many tweets have at least one mention (@) and how many tweets have no mentions (@)?
- Plot a bar chart that shows the top 20 tweeters (users)


In [0]:
# How many tweets are captured in your Bronze Table?
bronze_df = spark.read.format("delta").load(BRONZE_DELTA)

# Count the number of tweets
tweet_count = bronze_df.count()
print(f"Total number of tweets: {tweet_count}")

# Are there any columns that contain Nan or Null values? If so how many and what will you do in your silver transforms to address this?
print('+----+----+----+\n')
print('Total Nulls for each column')
bronze_df.select([count(when(col(c).isNull(), c)).alias(c) for c in bronze_df.columns]).show()

print('+----+----+----+\n')

print('Total NaNs for each column')
bronze_df.select([
    count(when(isnan(col(c)), c)).alias(c) 
    for c in bronze_df.columns if c != 'processing_time'
]).show()
print('+----+----+----+')

# As seen below, there are no Null or NaN values in the bronze table columns. If there were,
# how we handled them would depend on the column: the user column is relevant in the next tasks
# to aggregate the number of tweets, and the sentiment column is relevant in later tasks that
# visualize the mentions with the most negative or positive tweets.


In [0]:
# display(bronze_df)

In [0]:
# Count the number of tweets by each unique user handle and sort the data by descending count.
user_tweet_counts = bronze_df.groupBy("user").count().orderBy(desc("count"))
display(user_tweet_counts)

In [0]:
# How many tweets have at least one mention (@) and how many tweets have no mentions (@)?

df_with_mentions = bronze_df.withColumn("has_mention", when(col("text").contains("@"), 1).otherwise(0))

# Count tweets with at least one mention
tweets_with_mentions = df_with_mentions.filter(col("has_mention") == 1).count()

# Count tweets with no mentions
tweets_without_mentions = df_with_mentions.filter(col("has_mention") == 0).count()

print(f"Tweets with mentions: {tweets_with_mentions}")
print(f"Tweets without mentions: {tweets_without_mentions}")

In [0]:
# Plot a bar chart that shows the top 20 tweeters (users)

# Convert Spark DataFrame to Pandas DataFrame for plotting
top_tweeters_pd = user_tweet_counts.limit(20).toPandas()

plt.figure(figsize=(12, 8))
plt.bar(top_tweeters_pd['user'], top_tweeters_pd['count'], color='blue')
plt.xlabel('Username')
plt.ylabel('# of Tweets')
plt.title('Top 20 Tweeters')
plt.xticks(rotation=45)
plt.show()

## 5.0 Transform the Bronze Data to Silver Data using a stream
- setup a read stream on your bronze delta table
- setup a write stream to append to the silver delta table
- Use the defined SILVER_CHECKPOINT and SILVER_DELTA paths in the includes
- name your bronze to silver stream as silver_stream
- transform the bronze data to the silver data using the data definition at the top of the notebook
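The silver transform can be checked on a single record outside Spark. The pure-Python sketch below mirrors the three steps in the next cell: parse `date`, put every `@username` in its own row, and strip mentions from the text.

Two assumptions apply:

- Python's `re` only supports fixed-width lookbehind, so the Spark pattern's `(?<=^|...)` alternation is rewritten as a negative lookbehind. The two are intended to be equivalent for ASCII text.
- `TZ_OFFSETS` is a hypothetical lookup table, because `strptime` cannot resolve abbreviations such as `PDT`.

As with Spark's `explode`, a tweet with no mentions produces no silver rows (`explode_outer` would keep it with a null mention).

```python
import re
from datetime import datetime, timedelta, timezone
from typing import Dict, List

# Hypothetical lookup table: tz abbreviations are ambiguous in general,
# so only the zones assumed to appear in the source data are listed.
TZ_OFFSETS: Dict[str, int] = {"PDT": -7, "PST": -8, "UTC": 0, "GMT": 0}

# '@' not preceded by a word-like character, then a letter and at least one
# more letter/digit/underscore (same shape as the Spark pattern).
MENTION_RE = re.compile(r"(?<![A-Za-z0-9_.-])@[A-Za-z][A-Za-z0-9_]+")
# Same removal pattern as the silver cell's regexp_replace; ASCII \w, like Java.
CLEAN_RE = re.compile(r"@\w+", re.ASCII)


def parse_tweet_date(raw: str) -> datetime:
    """Parse e.g. 'Mon Apr 06 22:19:45 PDT 2009' into a UTC datetime."""
    parts = raw.split()
    tz_abbrev = parts[4]  # the 'z' field of "E MMM dd HH:mm:ss z yyyy"
    naive = datetime.strptime(" ".join(parts[:4] + parts[5:]), "%a %b %d %H:%M:%S %Y")
    local_tz = timezone(timedelta(hours=TZ_OFFSETS[tz_abbrev]))
    return naive.replace(tzinfo=local_tz).astimezone(timezone.utc)


def extract_mentions(text: str) -> List[str]:
    """Return every @mention including the '@' (like regexp_extract_all group 0)."""
    return [m.group(0) for m in MENTION_RE.finditer(text)]


def to_silver_rows(bronze: dict) -> List[dict]:
    """One silver row per mention; tweets without mentions yield no rows (like explode)."""
    timestamp = parse_tweet_date(bronze["date"])
    cleaned_text = CLEAN_RE.sub("", bronze["text"])
    return [
        {"timestamp": timestamp, "mention": mention,
         "cleaned_text": cleaned_text, "sentiment": bronze["sentiment"]}
        for mention in extract_mentions(bronze["text"])
    ]
```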

In [0]:
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY") # new version of spark doesn't correctly identify the day with 'E'

bronze_read_stream = (
    spark.readStream
    .format('delta')
    .load(BRONZE_DELTA)
)

# Transformations
silver_df = bronze_read_stream.withColumn("timestamp", to_timestamp("date", "E MMM dd HH:mm:ss z yyyy"))

# Extract all mentions into a new column as an array
silver_df = silver_df.withColumn("mentions", expr("regexp_extract_all(text, '(?<=^|(?<=[^a-zA-Z0-9-_\\\\.]))@([A-Za-z]+[A-Za-z0-9_]+)', 0)"))
# Explode the mentions array into multiple rows, one for each mention
silver_df = silver_df.withColumn("mention", explode(col("mentions"))).drop("mentions")

silver_df = silver_df.withColumn("cleaned_text", regexp_replace("text", "(@\\w+)", ""))

# drop redundant columns
silver_df = silver_df.drop("date", "user", "text", "source_file", "processing_time")

silver_stream = silver_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", SILVER_CHECKPOINT) \
    .trigger(availableNow=True) \
    .queryName('silver_stream') \
    .start(SILVER_DELTA)


In [0]:
silver_stream.awaitTermination()

## 6.0 Transform the Silver Data to Gold Data using a stream
- setup a read stream on your silver delta table
- setup a write stream to append to the gold delta table
- Use the GOLD_CHECKPOINT and GOLD_DELTA paths defined in the includes
- name your silver to gold stream as gold_stream
- transform the silver data to the gold data using the data definition at the top of the notebook
- Load the pretrained transformer sentiment classifier from the MODEL_NAME at the production level from the MLflow registry
- Use a spark UDF to parallelize the inference across your silver data

In [0]:
silver_read_stream = (
    spark.readStream
    .format('delta')
    .load(SILVER_DELTA)
)

# load model
model_uri = f"models:/{MODEL_NAME}/Production"
model = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

# apply transformations and prediction
gold_df = silver_read_stream.withColumn("temp_predicted_sentiment", model(col("cleaned_text")))
gold_df = gold_df.withColumn("predicted_score", (col("temp_predicted_sentiment.score") * 100).cast("int"))
gold_df = gold_df.withColumn("predicted_sentiment", col("temp_predicted_sentiment.label"))
gold_df = gold_df.withColumn("sentiment_id", when(col("sentiment") == "positive", 1).otherwise(0))
# If we have neutral predicted label we assign it as negative(0).
gold_df = gold_df.withColumn("predicted_sentiment_id", when(col("predicted_sentiment") == "POS", 1).otherwise(0))

# Drop redundant columns
gold_df = gold_df.drop("temp_predicted_sentiment")

gold_stream = (
    gold_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", GOLD_CHECKPOINT)
    .trigger(availableNow=True)
    .queryName("siver_stream")
    .start(GOLD_DELTA)
)


In [0]:

gold_stream.awaitTermination()

In [0]:
# test_df = spark.read.format("delta").load(GOLD_DELTA)
# display(test_df)
# dbutils.fs.rm(GOLD_CHECKPOINT, True)

## 7.0 Capture the accuracy metrics from the gold table in MLflow
Store the following in an MLflow experiment run:
- Store the precision, recall, and F1-score as MLflow metrics
- Store an image of the confusion matrix as an MLflow artifact
- Store the model name and the MLflow version that was used as MLflow parameters
- Store the version of the Delta Table (input-silver) as an MLflow parameter
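The next cells use these metric definitions:

- precision = TP / (TP + FP)
- recall = TP / (TP + FN)
- F1 = 2PR / (P + R)

The plain-Python sketch below computes them from paired labels. It also builds the 2x2 confusion matrix in the `[[TN, FP], [FN, TP]]` layout that scikit-learn's `confusion_matrix` uses for labels `[0, 1]`. `binary_metrics` is a hypothetical helper for sanity-checking the Spark counts on a small sample.

```python
from typing import Dict, Sequence


def binary_metrics(y_true: Sequence[int], y_pred: Sequence[int]) -> Dict[str, object]:
    """Precision, recall, F1 and a [[TN, FP], [FN, TP]] confusion matrix for 0/1 labels."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must be the same length")
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)
    # Guard against division by zero, as the notebook cell does
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"tp": tp, "fp": fp, "fn": fn, "tn": tn,
            "precision": precision, "recall": recall, "f1": f1,
            "confusion_matrix": [[tn, fp], [fn, tp]]}


m = binary_metrics([1, 1, 0, 0, 1], [1, 0, 0, 1, 1])
print(m["confusion_matrix"], round(m["f1"], 3))
```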

In [0]:
final_gold_df = spark.read.format('delta').load(GOLD_DELTA)  # gold table used to compute the accuracy metrics

# Store the precision, recall, and F1-score as MLflow metrics
mlflow.start_run()
model_name = MODEL_NAME
mlflow_version = mlflow.__version__

# Compute TP, FP, FN, TN in a single pass over the gold table
is_pred_pos = col("predicted_sentiment_id") == 1
is_true_pos = col("sentiment_id") == 1
counts = final_gold_df.agg(
    count(when(is_pred_pos & is_true_pos, True)).alias("TP"),
    count(when(is_pred_pos & ~is_true_pos, True)).alias("FP"),
    count(when(~is_pred_pos & is_true_pos, True)).alias("FN"),
    count(when(~is_pred_pos & ~is_true_pos, True)).alias("TN"),
).first()
TP, FP, FN, TN = counts["TP"], counts["FP"], counts["FN"], counts["TN"]

# Calculate precision, recall, and F1 score
precision = TP / (TP + FP) if TP + FP != 0 else 0
recall = TP / (TP + FN) if TP + FN != 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if precision + recall != 0 else 0

# Store the precision, recall, and F1-score as MLflow metrics 
mlflow.log_metric("precision", precision)
mlflow.log_metric("recall", recall)
mlflow.log_metric("f1_score", f1_score)

In [0]:
# mlflow.end_run()

In [0]:
# Store an image of the confusion matrix as an MLflow artifact

############ IMPORTANT ##############
# Because predicted neutral labels were mapped to negative (0) earlier, the confusion matrix will show many more TN and FN.

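# Collect both columns in one query so each label stays paired with its prediction
labels_pd = final_gold_df.select("sentiment_id", "predicted_sentiment_id").toPandas()
y_true = labels_pd["sentiment_id"]
y_pred = labels_pd["predicted_sentiment_id"]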

# Generate the confusion matrix
cm = confusion_matrix(y_true, y_pred)

# Plot and save the confusion matrix
fig, ax = plt.subplots(figsize=(5,5))
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(cmap=plt.cm.Blues, ax=ax)
plt.savefig("confusion_matrix.png")
plt.close()

# Log the confusion matrix image as an artifact
mlflow.log_artifact("confusion_matrix.png")

# Store the model name and the MLflow version that was used as MLflow parameters
mlflow.log_param("model_name", model_name)
mlflow.log_param("mlflow_version", mlflow_version)

# get the current version of the silver Delta table (the input to inference)
deltaTable = DeltaTable.forPath(spark, SILVER_DELTA)

# history(1) returns only the most recent commit
history_df = deltaTable.history(1).select("version")

delta_table_version = history_df.collect()[0][0]
# Store the version of the Delta Table (input-silver) as an MLflow parameter
mlflow.log_param("delta_table_version", delta_table_version)
mlflow.end_run()

## 8.0 Application Data Processing and Visualization
- How many mentions are there in the gold data total?
- Count the number of neutral, positive and negative tweets for each mention in new columns
- Capture the total for each mention in a new column
- Sort the mention count totals in descending order
- Plot a bar chart of the top 20 mentions with positive sentiment (the people who are in favor)
- Plot a bar chart of the top 20 mentions with negative sentiment (the people who are the villains)

You may want to use the "Loop Application" widget to control whether you repeatedly display the latest plots while the data comes in from your streams, before moving on to the next section and cleaning up your run.

*note: A mention is a specific twitter user that has been "mentioned" in a tweet with an @user reference.
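The application table in the data dictionary is a single group-by over the gold rows. For each mention it holds min_timestamp, max_timestamp, and per-sentiment counts. The plain-Python sketch below shows the same aggregation on a handful of made-up rows. `aggregate_mentions` is a hypothetical helper. Like the cell that follows, it counts the given `sentiment`, not the predicted one.

```python
from datetime import datetime
from typing import Dict, Iterable, List


def aggregate_mentions(gold_rows: Iterable[dict]) -> List[dict]:
    """Group gold rows by mention, tracking the time range and sentiment counts."""
    stats: Dict[str, dict] = {}
    for row in gold_rows:
        ts = row["timestamp"]
        entry = stats.setdefault(
            row["mention"],
            {"mention": row["mention"], "min_timestamp": ts, "max_timestamp": ts,
             "negative": 0, "neutral": 0, "positive": 0, "total": 0},
        )
        entry["min_timestamp"] = min(entry["min_timestamp"], ts)
        entry["max_timestamp"] = max(entry["max_timestamp"], ts)
        if row["sentiment"] in ("negative", "neutral", "positive"):
            entry[row["sentiment"]] += 1
        entry["total"] += 1  # counts every row, like count("mention")
    # Most-mentioned users first
    return sorted(stats.values(), key=lambda e: e["total"], reverse=True)


sample = [
    {"mention": "@a", "timestamp": datetime(2009, 4, 6, 22, 0), "sentiment": "negative"},
    {"mention": "@a", "timestamp": datetime(2009, 4, 6, 23, 0), "sentiment": "positive"},
    {"mention": "@b", "timestamp": datetime(2009, 4, 6, 21, 0), "sentiment": "neutral"},
]
summary = aggregate_mentions(sample)
```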

In [0]:
total_mentions = final_gold_df.filter(col("mention").isNotNull()).count()
print(f"mentions in the gold data total: {total_mentions} \n")

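from pyspark.sql import functions as F

aggregated_df = final_gold_df.groupBy("mention").agg(
    F.min("timestamp").alias("min_timestamp"),  # oldest tweet for this mention
    F.max("timestamp").alias("max_timestamp"),  # newest tweet for this mention
    count(when(col("sentiment") == "neutral", True)).alias("neutral_count"),
    count(when(col("sentiment") == "positive", True)).alias("positive_count"),
    count(when(col("sentiment") == "negative", True)).alias("negative_count"),
    count("mention").alias("total_mentions")
).orderBy("total_mentions", ascending=False)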


# Sorting the DataFrame by positive and negative counts
top_positive_mentions = aggregated_df.orderBy("positive_count", ascending=False).limit(20)
top_negative_mentions = aggregated_df.orderBy("negative_count", ascending=False).limit(20)


def plot_mentions(df, title, sentiment):
    data = df.select("mention", sentiment).collect()
    mentions = [x["mention"] for x in data]
    counts = [x[sentiment] for x in data]
    
    plt.figure(figsize=(10, 5))
    plt.bar(mentions, counts, color='blue')
    plt.xlabel('Mentions')
    plt.ylabel('Counts')
    plt.title(title)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

# Plot for top 20 positive mentions
plot_mentions(top_positive_mentions, 'Top 20 Mentions with Positive Sentiment', 'positive_count')

# Plot for top 20 negative mentions
plot_mentions(top_negative_mentions, 'Top 20 Mentions with Negative Sentiment', 'negative_count')

## 9.0 Clean up and completion of your pipeline
- using the utilities what streams are running? If any.
- Stop all active streams
- print out the elapsed time of your notebook.

In [0]:
# using the utilities what streams are running? If any.
show_active_streams()
# Stop all active streams
stop_all_streams()

In [0]:
# Get the notebook's ending time. Note: START_TIME was established in the include file when the notebook started.
END_TIME = time.time()

elapsed_time = END_TIME - START_TIME
minutes = int(elapsed_time // 60)
seconds = elapsed_time % 60
print(f"Elapsed Time: {minutes} minutes and {seconds:.2f} seconds")


## 10.0 How Optimized is your Spark Application (Grad Students Only)
Graduate students (registered for the DSCC-402 section of the course) are required to do this section.  This is a written analysis, supported by Spark UI screenshots (linked), of your pipeline's execution and what is driving its performance.
Recall that Spark optimization has 5 significant dimensions of consideration:
- Spill: write to executor disk due to lack of memory
- Skew: imbalance in partition size
- Shuffle: network io moving data between executors (wide transforms)
- Storage: inefficiency due to disk storage format (small files, location)
- Serialization: distribution of code segments across the cluster

Comment on each of the dimensions of performance and how your implementation is or is not affected by it.  Use specific information in the Spark UI to support your description.  

Note: you can take screenshots of the Spark UI from your project runs in Databricks and then link to those pictures by storing them as publicly accessible files on your cloud drive (Google Drive, OneDrive, etc.)

References:
- [Spark UI Reference](https://spark.apache.org/docs/latest/web-ui.html#web-ui)
- [Spark UI Simulator](https://www.databricks.training/spark-ui-simulator/index.html)

### ENTER YOUR MARKDOWN HERE
1- Spill

<p>Spill occurs when an executor processes a partition that does not fit in its available memory, so Spark writes the excess to disk.
To check for it, we look at "Input Size/Records," "Output Size/Records," and "Shuffle Read/Write Size" in the job execution to see whether any values are too large for an executor.</p>
<p>
If any spill occurs, memory and disk spill columns appear in the task details in the Spark UI.
Below is an image of part of section 4 (task 4), where we count the number of tweets per Twitter handle.
</p>
<p><img src="https://drive.google.com/thumbnail?id=107IyI5GLfd3ECdjpTuNSBwQtnauTzWCT&sz=w1000">
<br>
There is no spill here or in any other task of the pipeline and therefore the columns do not show.
</p>
<p>
If there were a spill, it would look like the example below, taken from this Medium article:<br>
https://medium.com/road-to-data-engineering/spark-performance-optimization-series-2-spill-685126e9d21f
<br>
</p>
<img src="https://drive.google.com/thumbnail?id=1S2-tSk8N9ZUF4NdPo1XZY077Vl_BEMgl&sz=w1000">
<br>
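A rough way to judge whether a partition is likely to spill is to compare its in-memory size with the execution memory one task can use. Spark's unified memory model works as follows:

- The unified execution/storage region is `spark.memory.fraction` (default 0.6) of the JVM heap minus 300 MiB of reserved memory.
- With N concurrently running tasks, each task can acquire between 1/(2N) and 1/N of the execution pool.

The sketch below applies that arithmetic. It is an estimate that assumes cached data occupies none of the unified region. The 8 GiB / 4-core executor is hypothetical, and deserialized partitions are usually larger in memory than on disk.

```python
from typing import Tuple

RESERVED_MIB = 300  # fixed reserved system memory in Spark's unified memory model


def per_task_execution_range_mib(executor_heap_mib: float, concurrent_tasks: int,
                                 memory_fraction: float = 0.6) -> Tuple[float, float]:
    """Return (guaranteed, maximum) execution memory per task, in MiB.

    Assumes storage (cached data) occupies none of the unified region.
    """
    unified = (executor_heap_mib - RESERVED_MIB) * memory_fraction
    return unified / (2 * concurrent_tasks), unified / concurrent_tasks


def likely_to_spill(partition_mib: float, executor_heap_mib: float, concurrent_tasks: int) -> bool:
    """Heuristic: a partition larger than the guaranteed per-task share may spill."""
    guaranteed, _ = per_task_execution_range_mib(executor_heap_mib, concurrent_tasks)
    return partition_mib > guaranteed


# Hypothetical executor: 8 GiB heap, 4 cores (4 concurrent tasks)
low, high = per_task_execution_range_mib(8192, 4)
print(f"per-task execution memory: {low:.1f} to {high:.1f} MiB")
```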

2- Skew
<p>Skew appears when partitions are imbalanced: with many tasks, one or a few of them take much longer than the others because they process larger partitions. <br>For example, in section 4 (task 4), when we count tweets per unique Twitter handle, the partition for the <b>handle tweetpet</b> is much larger than the others.
The Spark UI screenshots below show that a few of the 27 tasks take longer than the others.<br>
Skew can also show up as disk spill: if grouping by a column produces an unusually large partition, the executor running that task may spill data to disk.
Skew can be mitigated manually by salting the key, or by Adaptive Query Execution (AQE), which is enabled by default on Databricks and can automatically split skewed partitions in joins.</p>
<img src="https://drive.google.com/thumbnail?id=1p6uYw8IBxQITy4fcQ87YQjgP3AykHoCQ&sz=w2000">
<br>
<img src="https://drive.google.com/thumbnail?id=1kV7bjCvDLNnzhFDkD2NDrmEfej1uLJWE&sz=w2000">
<br>
<img src="https://drive.google.com/thumbnail?id=1hnzvFP5hl96rHnuE8-qNqHvtrtNm6I4C&sz=w2000">



3-Shuffle

<p>This is the DAG for the part of section 4 (task 4) where we count the number of tweets for each handle. This is a wide transformation and therefore requires a shuffle. We set the number of shuffle partitions equal to the number of cores on our cluster (instead of Spark's default of 200), so each core handles one shuffle partition and we avoid scheduling many tiny tasks for this small dataset.</p>
<img src="https://drive.google.com/thumbnail?id=1C8PuDo4yRM01jF-7X_D4eCpPdVtPGoOP&sz=w1000">
<p>The next image shows that the shuffle write was only 3.1 megabytes, because the Catalyst optimizer prunes the DataFrame to only the necessary columns and Spark computes a partial count on each partition before the shuffle.</p>
<img src="https://drive.google.com/thumbnail?id=1A4ZOGXwyDShkNUFrv-uPdbX-5s-96LWm&sz=w2000">


4- Storage
<br>
If we have too many small files, reading them from storage is slow. To avoid this in Databricks we can use OPTIMIZE delta.`dbfs:/tmp/labuser104917-3018615/bronze.delta`; Databricks also warns when a table has many very small files. When I optimized the files with this method, my pipeline run time increased significantly, which led me to comment out the line.
This may have been due to the up-front overhead and time the command requires, since it compacts many smaller files into fewer larger ones. Also, if our Delta table was previously cached by Spark, running OPTIMIZE would invalidate the cache because the underlying data files change. Subsequent queries then need to reload the data into memory, which could slow down their first execution.
<br>
Other Spark methods that reduce the number of small files by controlling how many partitions (and therefore files) are written are:<br>
1- repartition()
<br>
2- coalesce()
<br>
<img src="https://drive.google.com/thumbnail?id=1tc6IdcziogCHRNEM7ReZl0hlFd3KPR8e&sz=w2000">
<br>
<img src="https://drive.google.com/thumbnail?id=1bfuFlrEuXmxFC5FU96Ni55Av3WVq7m6n&sz=w2000">


5- Serialization

<p>
This consists of: 
<br> 1- The task deserialization time, which the executor needs to deserialize the task sent by the driver.
<br> 2- The result serialization time, needed to send the task's result back to the driver.
<br>
Since we only have one driver and one worker, both of these times should be small because tasks are sent to only one executor node.
As the picture below shows, many tasks have a result serialization time of 0 because they write to the Delta path instead of sending results back to the driver.
</p>
<img src="https://drive.google.com/thumbnail?id=1U22WaU4RJOm6FvJiIG5vV-xMAytCXChs&sz=w1000">
<br>
Here we can also see summary statistics, including serialization and deserialization time, for the tasks in a stage.<br>
<img src="https://drive.google.com/thumbnail?id=1c2vDPlOpHFrA1vuRoF8bjRhS5QBEBxqa&sz=w1000">

