In [1]:
sc

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1717213352991_0001,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


<SparkContext master=yarn appName=livy-session-0>

### Lab 8: Spark Streaming For Log Processing

This is a simple exercise in log processing.  The log files come from various servers at various time points.
Each record in a log file has the form ```serverID,severity,timestamp```, where
- `serverID` is a string unique to the server
- `severity` is one of 2 (`SEV2`: no error, just a service call), 1 (`SEV1`: a minor error), or 0 (`SEV0`: a fatal/severe error)
- `timestamp` is an integer starting at 1 (larger numbers mean later)
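
As a minimal sketch of this record format, the plain-Python snippet below parses one line into its three fields. The names `LogRecord`, `parse_record`, and `SEVERITY_NAMES` are hypothetical and are not part of the lab; the Spark code later in the notebook uses a schema instead.

```python
from typing import NamedTuple

# Severity codes as described above: 2 = service call, 1 = minor error, 0 = fatal error.
SEVERITY_NAMES = {2: "SEV2", 1: "SEV1", 0: "SEV0"}


class LogRecord(NamedTuple):
    server_id: str
    severity: int
    timestamp: int


def parse_record(line: str) -> LogRecord:
    """Parse one 'serverID,severity,timestamp' line into a LogRecord."""
    server_id, severity, timestamp = line.strip().split(",")
    return LogRecord(server_id, int(severity), int(timestamp))


record = parse_record("s1,2,3")
print(record, SEVERITY_NAMES[record.severity])
```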

For this lab, the four log files (on Canvas and Teams) will be "delivered" to Spark by being placed in a "live data" S3 bucket, for example `s3://hankssteven-week9/data/LogDataLive/`.

There are two servers in the log files, `s1` and `s2`, and the log records range from `t1` to `t10`.  
The files are delivered with one file per server for five time units. For example, the file `s115.csv` has records for server `s1` for times `t1` to `t5`.

You want to process these new records incrementally, and are interested in these two "reports":

1. The *volume report*: for each server, the number of `SEV2` events divided by the number of time units. For our purposes the number of time units is `max(timestamp) - min(timestamp) + 1`. This report is not append-only: every time new log data comes in, each server's value is recomputed and replaces the previous one.
2. The *SEV0 log*: this is a sequence of records of the form ```serverID timestamp``` recording the timestamp of a `SEV0` event reported by a server. This report grows over time, i.e., each time a new log file is processed, new records are appended to the end.
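
To make the two definitions concrete, here is a small plain-Python sketch over a handful of made-up records. It assumes the time span is measured over a server's `SEV2` records; if the intended span covers all of a server's records, the min/max update would move outside the severity check. The sample data and the function names are hypothetical.

```python
# Hypothetical sample records: (serverID, severity, timestamp).
records = [
    ("s1", 2, 1), ("s1", 2, 1), ("s1", 2, 3), ("s1", 0, 2),
    ("s2", 2, 1), ("s2", 1, 2), ("s2", 2, 4), ("s2", 0, 4),
]


def volume_report(rows):
    """Map each server to (#SEV2 events) / (max(timestamp) - min(timestamp) + 1)."""
    stats = {}  # serverID -> (count, min_ts, max_ts), over SEV2 rows only
    for server, severity, ts in rows:
        if severity != 2:
            continue
        count, lo, hi = stats.get(server, (0, ts, ts))
        stats[server] = (count + 1, min(lo, ts), max(hi, ts))
    return {s: c / (hi - lo + 1) for s, (c, lo, hi) in stats.items()}


def sev0_log(rows):
    """List (serverID, timestamp) for every SEV0 event, in input order."""
    return [(server, ts) for server, severity, ts in rows if severity == 0]


print(volume_report(records))
print(sev0_log(records))
```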

Your final reports will be produced by two streaming queries:
1. One that *modifies* the `SEV2` volume report, which is stored in memory and will be queried in this Spark notebook
2. One that *appends* to the `SEV0` log report, which will be stored as CSV records on S3
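
The difference between a report that is modified and one that is appended to can be sketched without Spark. The simulation below is only an illustration under simplifying assumptions: each inner list stands for one newly delivered file, the volume table is rebuilt in full after every batch, and the SEV0 sink only ever grows. `process_batches` and the sample batches are hypothetical.

```python
def process_batches(batches):
    """Simulate the two queries over a sequence of micro-batches (plain Python, not Spark)."""
    state = {}      # serverID -> [sev2_count, min_ts, max_ts], kept across batches
    sev0_sink = []  # grows only: each batch appends its SEV0 rows
    snapshots = []  # the full volume table as it stands after each batch
    for batch in batches:
        for server, severity, ts in batch:
            if severity == 2:
                entry = state.setdefault(server, [0, ts, ts])
                entry[0] += 1
                entry[1] = min(entry[1], ts)
                entry[2] = max(entry[2], ts)
            elif severity == 0:
                sev0_sink.append((server, ts))
        # The whole table is re-emitted, so earlier values are replaced.
        snapshots.append({s: c / (hi - lo + 1) for s, (c, lo, hi) in state.items()})
    return snapshots, sev0_sink


batch1 = [("s1", 2, 1), ("s1", 2, 2), ("s1", 0, 2), ("s2", 2, 1)]
batch2 = [("s1", 2, 4), ("s2", 0, 3), ("s2", 2, 2)]
snapshots, sev0_sink = process_batches([batch1, batch2])
print(snapshots)
print(sev0_sink)
```

Note how the value for `s1` changes between the two snapshots, while the SEV0 list keeps its first entry and gains a second.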


#### Set up the Following Buckets and Folders to Hold your Streaming Input and Output

1. Create a bucket *yourname-week9* in S3.  Remember the convention that *yourname* is your SeattleU username.
2. Create a folder *data* within that bucket
3. Create a folder *LogData* within the data folder
4. Upload the four log files from the Teams folder to your *LogData* folder
5. Create an empty folder *LogDataLive* (for simulating log streams)
6. Create an empty folder *StreamingOutput* (for saving your results)

The cell below will list your S3 folders.  Replace the bucket name with your own.

In [1]:
%%bash
aws s3 ls s3://mbhavanagarwala-week9/data/
echo "---"
aws s3 ls s3://mbhavanagarwala-week9/data/LogData/
echo "---"
aws s3 ls s3://mbhavanagarwala-week9/data/LogDataLive/

                           PRE LogData/
                           PRE LogDataLive/
                           PRE StreamingOutput/
2024-06-01 03:52:49          0 
---
2024-06-01 03:53:50          0 
2024-06-01 03:54:31      16000 s115.csv
2024-06-01 03:54:30        820 s1610.csv
2024-06-01 03:54:32      40000 s215.csv
2024-06-01 03:54:30       8210 s2610.csv
---
2024-06-01 03:57:09          0 


In [2]:
# Create the schema for the log files based on the above description of the data 
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

logSchema = StructType([
    StructField("serverID", StringType(), True),
    StructField("severity", IntegerType(), True),
    StructField("timestamp", IntegerType(), True)
])

In [3]:
# Create the streaming DataFrame (readStream) on your log directory, using the schema you just created
streamingLogData = spark.readStream\
.schema(logSchema)\
.option("maxFilesPerTrigger",1)\
.csv("s3://mbhavanagarwala-week9/data/LogDataLive/")

## Part 1: Get the SEV2 volume report

In [4]:
# Use the data frame you just created to create another data frame with the 
# sev2 volume report.  It should have columns 'serverID' and 'avgVolume'

# Import the functions module under an alias so that Spark's min/max
# do not shadow Python's built-in min/max in later cells.
from pyspark.sql import functions as F

# Calculate the SEV2 volume report.
# Note: min/max are taken over the SEV2 rows only, because the filter runs first.
volumeReportDataFrame = (
    streamingLogData
    .filter(F.col("severity") == 2)
    .groupBy("serverID")
    .agg(
        F.count("severity").alias("SEV2_count"),
        F.min("timestamp").alias("min_timestamp"),
        F.max("timestamp").alias("max_timestamp"),
    )
    .withColumn("time_units", F.col("max_timestamp") - F.col("min_timestamp") + 1)
    .withColumn("avgVolume", F.col("SEV2_count") / F.col("time_units"))
    .select("serverID", "avgVolume")
)



In [5]:
# Create and start a query (writeStream) that generates the sev2 report;  it is an in-memory sink.
volumeReportQuery = volumeReportDataFrame.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("volumeReport") \
    .start()

In [6]:
# Write a (very simple) Spark SQL query to show the contents of your sev2 report
# (the in-memory table volumeReport). It should initially be empty.
volumeReportDF = spark.sql("SELECT * FROM volumeReport")
volumeReportDF.show()

+--------+---------+
|serverID|avgVolume|
+--------+---------+
+--------+---------+

Copy the files s115 and s215 from LogData to LogDataLive. Replace the bucket name below with your own.

In [7]:
%%bash
aws s3 cp s3://mbhavanagarwala-week9/data/LogData/s115.csv s3://mbhavanagarwala-week9/data/LogDataLive/s115.csv
aws s3 cp s3://mbhavanagarwala-week9/data/LogData/s215.csv s3://mbhavanagarwala-week9/data/LogDataLive/s215.csv


copy: s3://mbhavanagarwala-week9/data/LogData/s115.csv to s3://mbhavanagarwala-week9/data/LogDataLive/s115.csv
copy: s3://mbhavanagarwala-week9/data/LogData/s215.csv to s3://mbhavanagarwala-week9/data/LogDataLive/s215.csv


In [10]:
# Rerun the same query to show that the sev2 volume report has been updated.
# Wait a while.  You should see rows for both s1 and s2
spark.sql("SELECT * FROM volumeReport").show()


+--------+---------+
|serverID|avgVolume|
+--------+---------+
|      s2|    920.0|
|      s1|    379.4|
+--------+---------+
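
Instead of waiting a fixed amount of time before rerunning the query, one option is to poll until the report has the expected rows. The helper below is a generic sketch; `wait_until` and its parameters are hypothetical names, and the timeout values are arbitrary.

```python
import time


def wait_until(predicate, timeout_s=60.0, poll_s=1.0):
    """Poll predicate() until it returns True or timeout_s elapses; return the final result."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(poll_s)
    # One last check so a result that arrives right at the deadline is not missed.
    return bool(predicate())
```

In this notebook the predicate could be something like `lambda: spark.sql("SELECT * FROM volumeReport").count() >= 2`, which becomes true once both servers appear in the report.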

Now copy the two log files s1610 and s2610

In [11]:
%%bash
aws s3 cp s3://mbhavanagarwala-week9/data/LogData/s1610.csv s3://mbhavanagarwala-week9/data/LogDataLive/s1610.csv
aws s3 cp s3://mbhavanagarwala-week9/data/LogData/s2610.csv s3://mbhavanagarwala-week9/data/LogDataLive/s2610.csv


copy: s3://mbhavanagarwala-week9/data/LogData/s1610.csv to s3://mbhavanagarwala-week9/data/LogDataLive/s1610.csv
copy: s3://mbhavanagarwala-week9/data/LogData/s2610.csv to s3://mbhavanagarwala-week9/data/LogDataLive/s2610.csv


In [17]:
# Run the query again to verify that the report was updated. 
# Be sure to wait for a little while to make sure the query is updated.
spark.sql("SELECT * FROM volumeReport").show()

+--------+---------+
|serverID|avgVolume|
+--------+---------+
|      s2|    519.9|
|      s1|    199.7|
+--------+---------+
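
As a sanity check on the two outputs, the averages can be inverted to recover the implied `SEV2` counts. This sketch assumes that the `SEV2` timestamps span `t1` to `t5` after the first pair of files and `t1` to `t10` after the second pair, which the outputs alone do not confirm. `implied_sev2_count` is a hypothetical helper.

```python
def implied_sev2_count(avg_volume, time_units):
    """Invert avgVolume = count / time_units to recover the SEV2 count."""
    return round(avg_volume * time_units)


# avgVolume values copied from the two query outputs above.
after_first = {"s1": 379.4, "s2": 920.0}
after_second = {"s1": 199.7, "s2": 519.9}

for server in ("s1", "s2"):
    first = implied_sev2_count(after_first[server], 5)    # assumes t1..t5
    total = implied_sev2_count(after_second[server], 10)  # assumes t1..t10
    print(server, first, total, total - first)
```

Under that assumption, the second pair of files would contribute 100 `SEV2` events for `s1` and 599 for `s2`.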

## Part 2. Get the SEV0 log report

Delete all files from your "live" directory before working on this part.  (Note, deleting all the files will make the directory go away.  Don't worry, we'll put it back!)

In [24]:
%%bash
aws s3 rm s3://mbhavanagarwala-week9/data/LogDataLive/ --recursive


Verify the LogDataLive folder is gone

In [25]:
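%%bash
# Without a trailing slash this lists the data/ prefix itself, not its contents;
# use s3://mbhavanagarwala-week9/data/ to confirm that LogDataLive/ is gone.
aws s3 ls s3://mbhavanagarwala-week9/data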

                           PRE data/


In [26]:
# Create a data frame on top of the original raw-data frame for the sev0 report.
# Each row is just <serverID> <timestamp>. Sorting is not supported on a streaming
# DataFrame in append mode, so rows are written in arrival order; order them by
# timestamp (and by server ID within timestamp) when reading the report back.

from pyspark.sql.functions import col

# Filter SEV0 events and select the relevant columns
sev0 = streamingLogData.filter(col("severity") == 0) \
    .select("serverID", "timestamp")

In [27]:
# Create a query on your sev0 data frame that writes the table (data frame) to a csv file in your data/StreamingOutput folder

sev0SaveQuery = sev0.writeStream \
    .outputMode("append") \
    .format("csv") \
    .option("path", "s3://mbhavanagarwala-week9/data/StreamingOutput/") \
    .option("checkpointLocation", "s3://mbhavanagarwala-week9/data/checkpoints/") \
    .start()

In [None]:
# Copy the two files s115 and s215 into the live folder

In [28]:
%%bash
aws s3 cp s3://mbhavanagarwala-week9/data/LogData/s115.csv s3://mbhavanagarwala-week9/data/LogDataLive/s115.csv
aws s3 cp s3://mbhavanagarwala-week9/data/LogData/s215.csv s3://mbhavanagarwala-week9/data/LogDataLive/s215.csv


copy: s3://mbhavanagarwala-week9/data/LogData/s115.csv to s3://mbhavanagarwala-week9/data/LogDataLive/s115.csv
copy: s3://mbhavanagarwala-week9/data/LogData/s215.csv to s3://mbhavanagarwala-week9/data/LogDataLive/s215.csv


Check the contents of the output folder. Make a note of the non-empty part files generated. These make up your SEV0 report after processing times 1 through 5.

In [29]:
%%bash
aws s3 ls s3://mbhavanagarwala-week9/data/StreamingOutput/

2024-06-01 03:57:25          0 
2024-06-01 06:29:42          0 _spark_metadata_$folder$


In [None]:
# Copy the s1610 and s2610 files into the live folder

In [30]:
%%bash
aws s3 cp s3://mbhavanagarwala-week9/data/LogData/s1610.csv s3://mbhavanagarwala-week9/data/LogDataLive/s1610.csv
aws s3 cp s3://mbhavanagarwala-week9/data/LogData/s2610.csv s3://mbhavanagarwala-week9/data/LogDataLive/s2610.csv


copy: s3://mbhavanagarwala-week9/data/LogData/s1610.csv to s3://mbhavanagarwala-week9/data/LogDataLive/s1610.csv
copy: s3://mbhavanagarwala-week9/data/LogData/s2610.csv to s3://mbhavanagarwala-week9/data/LogDataLive/s2610.csv


Check the contents of the output folder again.  Make a note of the NEW non-empty part files generated

In [31]:
%%bash
aws s3 ls s3://mbhavanagarwala-week9/data/StreamingOutput/

                           PRE _spark_metadata/
2024-06-01 03:57:25          0 
2024-06-01 06:29:42          0 _spark_metadata_$folder$
2024-06-01 06:37:12          0 part-00000-7baff181-1bfe-4aae-abe7-6f4c6c1e17e4-c000.csv
2024-06-01 06:37:09         15 part-00000-9c376fb5-f84a-4e58-87f4-3d451757ce64-c000.csv


In [32]:
# Now you're done with the lab
# clean up / stop all running streaming jobs
# Stop all active streaming queries
for stream in spark.streams.active:
    print(f"Stopping stream: {stream.name}")
    stream.stop()

# Report completion; spark.streams.active should now be empty
print("All streaming queries stopped.")

Stopping stream: None
Stopping stream: volumeReport
All streaming queries stopped.

### Put Your Sev 0 Report Here

Your Sev 0 report(s) exist as CSV part files on S3. The first version of the report is the contents of the part files written after adding the log files for times 1 through 5; the second version is the contents of all the part files (i.e., after adding both sets of log files). In the next two markdown cells, copy in the contents of each version: download the CSV files from S3 and paste their contents into the respective cell. Note that the file contents are wrapped in a ```pre``` tag (preformatted output).
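
Once the part files have been downloaded, assembling a report by hand can be sketched as follows. This is only an illustration: `read_sev0_report` and `local_dir` are hypothetical names, the files are assumed to be in a local directory already, and the final sort (by timestamp, then by server ID) is an added convenience rather than something Spark does when writing.

```python
import csv
import glob
import os


def read_sev0_report(local_dir):
    """Collect (serverID, timestamp) rows from all non-empty part-*.csv files in local_dir."""
    rows = []
    for path in sorted(glob.glob(os.path.join(local_dir, "part-*.csv"))):
        if os.path.getsize(path) == 0:
            continue  # the output folder can contain empty part files; skip them
        with open(path, newline="") as handle:
            for row in csv.reader(handle):
                if row:  # ignore blank lines
                    server_id, timestamp = row
                    rows.append((server_id, int(timestamp)))
    # Order by timestamp, then by server ID within a timestamp.
    return sorted(rows, key=lambda r: (r[1], r[0]))
```

Calling `read_sev0_report` on the download directory returns the rows in a form that can be pasted into the cells below as `serverID,timestamp` lines.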

**Important Note**: Make sure your notebook can be executed from beginning to end without error. You should check that before you hand it in. Simply putting results into a non-working notebook will not be considered as a valid submission.

##### Contents of the sev 0 report after processing times 1 to 5
<pre>
s1,5
s1,5
s1,5
</pre>

##### Contents of the sev 0 report after processing times 1 to 5 and 6 through 10
<pre>
s2,9
</pre>