# Download Datasets

In [0]:
%sh
# Download the sample datasets into the shell's current working directory.
# `set -e` stops at the first failed download, and `curl -f` turns an HTTP error
# (e.g. 404) into a non-zero exit status instead of saving the error page as data.
set -e
base_url='https://raw.githubusercontent.com/masfworld/datahack_docker/master/zeppelin/data'
for name in el_quijote.txt frankenstein.txt characters.csv species.csv; do
  curl -fsSL -O "${base_url}/${name}"
done

In [0]:
# Start from an empty streaming folder, then stage the downloaded files
# from the driver's local disk into DBFS.
streaming_dir = "/dataset/streaming"
dbutils.fs.rm(streaming_dir, True)  # second argument: delete recursively
dbutils.fs.mkdirs(streaming_dir)

# (local file name, DBFS destination). Only el_quijote.txt goes straight into
# the streaming folder; the other files are staged under /dataset.
staged_files = [
    ("el_quijote.txt", "/dataset/streaming/el_quijote.txt"),
    ("frankenstein.txt", "/dataset/frankenstein.txt"),
    ("characters.csv", "/dataset/characters.csv"),
    ("species.csv", "/dataset/species.csv"),
]
for local_name, dbfs_path in staged_files:
    dbutils.fs.cp(f"file:/databricks/driver/{local_name}", f"dbfs:{dbfs_path}")

# Structured Streaming
Structured Streaming is a high-level API for stream processing that became production-ready in Spark 2.2. It is scalable and fault-tolerant, and it is built on the Spark SQL engine. Structured Streaming allows you to take the same operations that you perform in batch mode using Spark’s structured APIs, and run them in a streaming fashion. This can reduce latency and allow for incremental processing. For more information, visit [Databricks - Structured Streaming](https://www.databricks.com/glossary/what-is-structured-streaming).



## Example 1 - Read a streaming folder

Read a streaming folder

In [0]:
from pyspark.sql.functions import *

# Streaming source: text files in the folder are read line by line into the "value" column
lines = (
    spark.readStream
    .format("text")
    .load("/dataset/streaming/")
)

# One output row per space-separated token, exposed as the column "word"
tokens = explode(split(col("value"), " "))
words = lines.select(tokens.alias("word"))

# Running count per word, most frequent first
groupedWords = (
    words
    .groupBy("word")
    .count()
    .orderBy(col("count").desc())
)

# Writer configuration only; nothing runs until start() is called.
# The complete result is written to the in-memory table "testquijote".
query = (
    groupedWords.writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("testquijote")
)

# Launch the first streaming query
query.start()

In [0]:
# Same per-word count, but unsorted: sorting a streaming DataFrame is only supported
# on an aggregated result in Complete output mode, and this query uses Update mode.
groupedWords2 = words.groupBy("word").count()

# Writer that sends only the updated rows to the in-memory table "testquijote2"
query2 = (
    groupedWords2.writeStream
    .outputMode("update")
    .format("memory")
    .queryName("testquijote2")
)

# Launch the second streaming query
query2.start()

In [0]:
# Peek at the first 10 rows of the in-memory table "testquijote".
# The streaming query runs in the background, so the table may still be empty
# right after start(); re-run this cell if nothing is shown.
first_rows_sql = "select * from testquijote limit 10"
spark.sql(first_rows_sql).show()

**Now let's copy a new file into the streaming directory to see that the in-memory table captures the changes.** 

In [0]:
# Drop a second book into the watched folder so the running queries pick it up
source_file = '/dataset/frankenstein.txt'
watched_dir = '/dataset/streaming/'
dbutils.fs.cp(source_file, watched_dir)

# Show the folder contents to confirm the new file is there
dbutils.fs.ls('/dataset/streaming')

In [0]:
# Query "testquijote" again (first 10 rows) to see the effect of the newly added file
refreshed_rows_sql = "select * from testquijote limit 10"
spark.sql(refreshed_rows_sql).show()

For example, we see that the word "the" appears, which did not appear before.

## Exercise 1 - Filter words with less than 4 characters

Using the code from Example 1, filter out all words with fewer than 4 characters.




In [0]:
# Exercise 1 (SQL variant): query the in-memory table "testquijote", dropping every word
# with fewer than 4 characters, and show the first 10 results.
# The predicate is `>= 4` (not `> 4`) so that 4-character words are kept, as the exercise asks.
spark.sql("select * from testquijote where length(word) >= 4 limit 10").show()

In [0]:
# Exercise 1 (streaming variant): drop words with fewer than 4 characters before counting.
# The predicate is `>= 4` (not `> 4`) so that 4-character words are kept, as the exercise asks.
# The counts are sorted in descending order, which is allowed because the query
# below runs in Complete output mode.
groupedWords_2 = (
    words
    .filter(length("word") >= 4)
    .groupBy("word")
    .count()
    .sort(col("count").desc())
)

# Writer that sends the complete filtered result to the in-memory table "testquijote_2"
query_2 = (
    groupedWords_2.writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("testquijote_2")
)

# Start the streaming query
query_2.start()

In [0]:
# Show the first 10 rows of the filtered word counts stored in "testquijote_2"
filtered_rows_sql = "select * from testquijote_2 limit 10"
spark.sql(filtered_rows_sql).show()

## Example 2 - Read a CSV file and apply a schema

Reading a CSV file, applying a schema

In [0]:
from pyspark.sql.types import *

# Schema for the characters CSV: every column is read as a nullable string.
# Column order matters, because CSV fields are matched to the schema by position.
character_columns = [
    "name",
    "height",
    "hair_color",
    "skin_color",
    "eye_color",
    "birth_year",
    "gender",
    "homeworld",
    "species",
]
schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in character_columns]
)

In [0]:
# Streaming CSV reader over every file matching "/dataset/charac*.csv", parsed with
# the explicit schema defined above.
# NOTE(review): no "header" option is set, so a header row in the file (if any) would be
# read as a data row — confirm against the actual CSV.
characters_stream = (
    spark.readStream
    .format("csv")
    .schema(schema)
    .load("/dataset/charac*.csv")
)

# Tag each row with the processing time
lines = characters_stream.withColumn("current_timestamp", current_timestamp())

In [0]:
# Writer that sends the parsed rows to the in-memory table "charac" (Update output mode).
# Parentheses are used for the multi-line chain, so no trailing backslash is needed.
query = (
    lines.writeStream
    .outputMode("update")
    .format("memory")
    .queryName("charac")
)

# Start the query; its results go to the in-memory table, not to the console
query.start()

In [0]:
# Show the first 10 rows collected so far in the in-memory table "charac"
charac_rows_sql = "select * from charac limit 10"
spark.sql(charac_rows_sql).show()

# Windowing
**Windowing** is a powerful feature in Apache Spark that allows for set-based computations (such as aggregations) or other operations over subsets of events within a specified time period. This is particularly useful for processing time-series data, streaming data, and real-time analytics, where operations need to be performed over a continuous stream of data points.

## Example 3 - 5-second fixed window

Read the files `el_quijote.txt` and `frankenstein.txt` as a stream, applying a 5-second fixed window.

In [0]:
# Recreate "/dataset/books" from scratch: delete it recursively if it exists, then create it empty
books_dir = "/dataset/books"
dbutils.fs.rm(books_dir, True)
dbutils.fs.mkdirs(books_dir)

In [0]:
# Put the first book into the folder that the windowing example reads from
quijote_source = '/dataset/streaming/el_quijote.txt'
books_folder = '/dataset/books/'
dbutils.fs.cp(quijote_source, books_folder)

# Show the folder contents to confirm the copy
dbutils.fs.ls(books_folder)

In [0]:
# Read the text files in "/dataset/books/" as a stream, split each line into words,
# and stamp every word with the processing time.
# The text source only exposes the "value" column, so the timestamp is added with
# withColumn() rather than selected as if it were an existing input column.
words = (
    spark.readStream
    .format("text")
    .load("/dataset/books/")
    .select(explode(split(col("value"), " ")).alias("word"))
    .withColumn("current_timestamp", current_timestamp())  # used as the time column for windowing
)

`explode(split(col("value"), " ")).alias("word")`: Split lines into words and add a "word" column

In [0]:
# Bucket rows into 5-second windows on the "current_timestamp" column
five_second_window = window(col("current_timestamp"), "5 seconds")

# Count each word per window, highest counts first
windowedCounts = (
    words
    .groupBy(five_second_window, col("word"))
    .count()
    .orderBy(col("count").desc())
)

# Writer that sends the complete windowed result to the in-memory table "test_windowing_books"
query = (
    windowedCounts.writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("test_windowing_books")
)

# Start the streaming query
query.start()

In [0]:
# First 10 rows of "test_windowing_books", printed without truncating the cell contents
windowed_rows_sql = "select * from test_windowing_books limit 10"
spark.sql(windowed_rows_sql).show(20, truncate=False)

In [0]:
# Number of result rows in each window, printed without truncating the cell contents
rows_per_window_sql = "select window, count(*) from test_windowing_books group by window"
spark.sql(rows_per_window_sql).show(20, truncate=False)

In [0]:
# Add the same book under a new name so the stream treats it as a new input file
quijote_source = '/dataset/streaming/el_quijote.txt'
second_copy_path = '/dataset/books/el_quijote2.txt'
dbutils.fs.cp(quijote_source, second_copy_path)

# Show the folder contents to confirm the copy
dbutils.fs.ls('/dataset/books/')

In [0]:
# Re-run the per-window row count now that a second file has arrived
rows_per_window_after_copy_sql = "select window, count(*) from test_windowing_books group by window"
spark.sql(rows_per_window_after_copy_sql).show(20, truncate=False)

We can see that after copying a new file to the path, we now have two windows.

## Exercise 2 - 2-second fixed window

Get the number of different species classifications from `species.csv`
- Split the file into multiple parts, create a new folder, and copy the parts into that folder one by one
- Group the results into 2-second fixed windows



In [0]:
# Delete any previous split output (recursively), so that the CSV write
# into the same folder does not find an existing directory
species_split_dir = '/dataset/species_splitted'
dbutils.fs.rm(species_split_dir, True)

In [0]:
# Batch-read the species CSV, taking column names from the header row and inferring column types
data_species = (
    spark.read
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("/dataset/species.csv")
)

# Split the data into 3 partitions and write it as CSV part files.
# No header option is passed to the writer here.
data_species.repartition(3).write.csv("/dataset/species_splitted")

In [0]:
# Inspect the part files produced by the split
split_output_dir = '/dataset/species_splitted'
dbutils.fs.ls(split_output_dir)

In [0]:
# Recreate "/dataset/species" from scratch: delete it recursively if it exists, then create it empty.
# This is the folder the species stream reads from.
species_dir = '/dataset/species'
dbutils.fs.rm(species_dir, True)
dbutils.fs.mkdirs(species_dir)

In [0]:
from pyspark.sql.types import *

# Schema for the species data: every column is read as a nullable string.
# Column order matters, because CSV fields are matched to the schema by position.
species_columns = [
    "name",
    "classification",
    "designation",
    "skin_colors",
    "hair_colors",
    "eye_colors",
    "average_lifespan",
    "language",
    "homeworld",
]
schema_species = StructType(
    [StructField(column_name, StringType(), True) for column_name in species_columns]
)

In [0]:
# Turn on schema inference for streaming queries.
# NOTE(review): the species stream in this notebook is given an explicit schema, so this
# setting looks unnecessary — confirm before removing it.
enable_inference_sql = "set spark.sql.streaming.schemaInference=true"
spark.sql(enable_inference_sql)

In [0]:
# Streaming CSV reader over "/dataset/species/", parsed with the explicit species schema
species_stream = (
    spark.readStream
    .format("csv")
    .schema(schema_species)
    .load("/dataset/species/")
)

# Tag each row with the processing time
rows = species_stream.withColumn("current_timestamp", current_timestamp())

# Count rows per classification within 2-second windows
two_second_window = window(col("current_timestamp"), "2 seconds")
windowedCounts_species = rows.groupBy(two_second_window, col("classification")).count()

In [0]:
# Writer that sends the updated windowed counts to the in-memory table "windowing_species"
query_species = (
    windowedCounts_species.writeStream
    .outputMode("update")
    .format("memory")
    .queryName("windowing_species")
)

# Start the streaming query
query_species.start()

In [0]:
# Show up to 30 rows of "windowing_species" without truncating the cell contents.
# No part file has been copied into the input folder yet at this point.
species_counts_sql = "select * from windowing_species"
spark.sql(species_counts_sql).show(30, truncate=False)

In [0]:
# Copy one partition file from "/dataset/species_splitted/" to "/dataset/species/".
# Part-file names contain an id that changes on every write, so they are found by
# listing the folder instead of being hard-coded.
species_parts = sorted(
    f.path
    for f in dbutils.fs.ls('/dataset/species_splitted')
    if f.name.startswith('part-') and f.name.endswith('.csv')
)

# Feed the highest-numbered part first; the remaining parts are copied in a later cell
dbutils.fs.cp(species_parts[-1], '/dataset/species/')

In [0]:
# Show up to 30 rows of "windowing_species" after the first part file has been added
species_counts_after_first_sql = "select * from windowing_species"
spark.sql(species_counts_after_first_sql).show(30, truncate=False)

In [0]:
# Copy the remaining partition files from "/dataset/species_splitted/" to "/dataset/species/".
# Part-file names contain an id that changes on every write, so they are found by
# listing the folder instead of being hard-coded.
species_parts = sorted(
    f.path
    for f in dbutils.fs.ls('/dataset/species_splitted')
    if f.name.startswith('part-') and f.name.endswith('.csv')
)

# Every part except the highest-numbered one, which an earlier cell already copied
for part_path in species_parts[:-1]:
    dbutils.fs.cp(part_path, '/dataset/species/')

In [0]:
# Show up to 30 rows of "windowing_species" once the remaining part files have been added
species_counts_final_sql = "select * from windowing_species"
spark.sql(species_counts_final_sql).show(30, truncate=False)

# Stream - Stream Join

## Exercise 3 - Inner Join

Update the following code to build an inner join between the `df_left` and `df_right` DataFrames.
- Set the output mode to `append`
- Set a 2-hour watermark on both streams

In [0]:
# Left input: synthetic "rate" source emitting 3 rows per second
df_left = spark.readStream.format("rate").option("rowsPerSecond", 3).load()

In [0]:
# Right input: a second, independent "rate" source, also emitting 3 rows per second
df_right = spark.readStream.format("rate").option("rowsPerSecond", 3).load()

In [0]:
from pyspark.sql.functions import *

# Add two random integer columns to the left stream, each computed as ceil(rand() * 10).
# rand() is called separately for each column so the two columns get independent random values.
left_with_key = df_left.withColumn("left_key", ceil(rand() * 10))
left_with_value = left_with_key.withColumn("left_value", ceil(rand() * 10))

# Declare a watermark on "timestamp" with a 2-hour delay
df_left_modified = left_with_value.withWatermark("timestamp", "2 hours")

In [0]:
# Add two random integer columns to the right stream, each computed as ceil(rand() * 10).
# rand() is called separately for each column so the two columns get independent random values.
# No watermark is declared on this side in this cell.
right_with_key = df_right.withColumn("right_key", ceil(rand() * 10))
df_right_modified = right_with_key.withColumn("right_value", ceil(rand() * 10))

In [0]:
# Stream-stream join using the default join type, matching rows whose "left_key" equals "right_key"
keys_match = df_left_modified.left_key == df_right_modified.right_key
df_joined = df_left_modified.join(df_right_modified, on=keys_match)

In [0]:
# Render the joined stream in the notebook; this starts a streaming query
df_joined.display()

We execute this SQL query to see the joined values:

In [0]:
# Register the joined stream as a temporary view so it can be queried from a %sql cell
joined_view_name = "test_joined"
df_joined.createOrReplaceTempView(joined_view_name)

In [0]:
%sql
-- Show every row of the joined stream exposed through the temporary view
SELECT * FROM test_joined

In [0]:
# Declare a 2-hour watermark on "timestamp" for both sides of the join
df_left_watermarked = df_left_modified.withWatermark("timestamp", "2 hours")
df_right_watermarked = df_right_modified.withWatermark("timestamp", "2 hours")

# Rename the left "timestamp" to "timestamp_left" so the join condition can tell
# the two event-time columns apart
left_renamed = df_left_watermarked.withColumnRenamed("timestamp", "timestamp_left")

# Match on equal keys, with the right-side "timestamp" no later than 1 hour after "timestamp_left"
join_condition = expr("""
            left_key = right_key AND
            timestamp <= timestamp_left + interval 1 hour
        """)

# Left outer join, rendered as a streaming display
(
    left_renamed
    .join(df_right_watermarked, join_condition, how='left')
    .display()
)

I added watermarking to both dataframes `df_left_watermarked` and `df_right_watermarked` because otherwise I got an error in the join:

`AnalysisException: Stream-stream LeftOuter join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition;`