<h3> Listing data files </h3>

In [0]:
# List the sample event files used as the streaming source (Python form of the `%fs ls` magic)
display(dbutils.fs.ls("/databricks-datasets/structured-streaming/events/"))

path,name,size
dbfs:/databricks-datasets/structured-streaming/events/file-0.json,file-0.json,72530
dbfs:/databricks-datasets/structured-streaming/events/file-1.json,file-1.json,72961
dbfs:/databricks-datasets/structured-streaming/events/file-10.json,file-10.json,73025
dbfs:/databricks-datasets/structured-streaming/events/file-11.json,file-11.json,72999
dbfs:/databricks-datasets/structured-streaming/events/file-12.json,file-12.json,72987
dbfs:/databricks-datasets/structured-streaming/events/file-13.json,file-13.json,73006
dbfs:/databricks-datasets/structured-streaming/events/file-14.json,file-14.json,73003
dbfs:/databricks-datasets/structured-streaming/events/file-15.json,file-15.json,73007
dbfs:/databricks-datasets/structured-streaming/events/file-16.json,file-16.json,72978
dbfs:/databricks-datasets/structured-streaming/events/file-17.json,file-17.json,73008


<h3> Defining Schema</h3>

In [0]:
from pyspark.sql.types import *

from pyspark.sql.functions import *

# Directory holding the JSON event files that the stream reads from
inputPath = "/databricks-datasets/structured-streaming/events/"

# Explicit schema for the events: a nullable timestamp `time` and a nullable string `action`
jsonSchema = (
  StructType()
    .add("time", TimestampType(), True)
    .add("action", StringType(), True)
)

<h3> Initializing the stream </h3>

In [0]:
# Build a streaming DataFrame over the JSON files in `inputPath` (note `readStream`, not `read`)
streamingInputDF = (
  spark
    .readStream
    .format("json")                 # the source files are JSON
    .options(maxFilesPerTrigger=1)  # Treat the files as a stream by picking up one file per trigger
    .schema(jsonSchema)             # Apply the explicit schema defined for the events
    .load(inputPath)
)

In [0]:
# Count events per action within 1-hour windows of the event time
streamingCountsDF = (
  streamingInputDF
    .groupBy(
      streamingInputDF["action"],
      window(streamingInputDF["time"], "1 hour"))
    .count()
)

# Last expression of the cell: shows whether the DataFrame is a streaming DataFrame
streamingCountsDF.isStreaming

<h3>Starting the Streaming Job</h3>

In [0]:
# Use only 2 shuffle partitions so the shuffles stay small
spark.conf.set("spark.sql.shuffle.partitions", "2")

# Start the streaming query, handing the sink settings directly to start()
query = streamingCountsDF.writeStream.start(
  format="memory",         # memory = store the results in an in-memory table
  queryName="counts",      # counts = name of that in-memory table
  outputMode="complete",   # complete = the table holds all the counts
)

In [0]:
# Block until all data currently available in the source has been processed and committed to the sink.
# This method is intended for testing.
query.processAllAvailable() 

# Print the (logical and physical) plans to the console for debugging purposes.
query.explain()

In [0]:
# Last expression of the cell: the StreamingQueryException that terminated the query, or None if there was none.
query.exception()

<h3> Properties of a query </h3>

In [0]:
# Print each descriptive property of the query next to its label
for _label, _value in (
    ("id: ", query.id),              # unique id that persists across restarts from checkpoint data
    ("isActive: ", query.isActive),  # boolean: whether the query is currently active
    ("name: ", query.name),          # user-specified name of the query, or None if not specified
    ("runId: ", query.runId),        # unique id that does not persist across restarts
    ("status: ", query.status),      # current status of the query
):
    print(_label, _value)

In [0]:
# Print the most recent StreamingQueryProgress update of this query, or None if there were no progress updates yet.
print("lastProgress:", query.lastProgress)

In [0]:
# Print the collection of the most recent StreamingQueryProgress updates retained for this query.
print("recentProgress:", query.recentProgress)

<h3> Streaming Query Manager </h3>

In [0]:
# List the streaming queries that are currently active on `spark`
print("active: ", spark.streams.active)

# Look up an active query by its id (query.id), not by its name.
# NOTE(review): what happens when no active query has this id (None vs. an error) depends on the Spark version - confirm.
print("get active query:", spark.streams.get(query.id))

# Forget about past terminated queries so that awaitAnyTermination() can be used again to wait for new terminations.
# resetTerminated() returns nothing, so it is called directly rather than printed (printing it only ever showed "None").
spark.streams.resetTerminated()

<h3>Data Stream Writer</h3>

In [0]:
# Stop the streaming query.
query.stop()

In [0]:
# foreach(f)
# Sets the output of the streaming query to be processed using the provided writer f. This is often used to write the output of a streaming query to arbitrary storage systems. 

# Variant 1: a plain function that receives one output row per call
def print_row(row):
    """Print a single row to standard output."""
    print(row)

# Configure a writer that passes each output row to print_row; start() is not called here, so nothing runs yet
writerFunc = streamingCountsDF.writeStream.foreach(print_row)

# Last expression of the cell: display the configured writer object
writerFunc

In [0]:
# Variant 2: print every row using an object with open(), process() and close() methods
class RowPrinter:
    """Row writer for `foreach` that prints everything it is given."""

    def open(self, partition_id, epoch_id):
        """Print the partition and epoch ids, then return True.

        NOTE(review): presumably the True return tells Spark to go on and
        process the rows - confirm against the foreach documentation.
        """
        opened_msg = "Opened %d, %d" % (partition_id, epoch_id)
        print(opened_msg)
        return True

    def process(self, row):
        """Print one row."""
        print(row)

    def close(self, error):
        """Print the error passed in (rendered with str(), so None prints as 'None')."""
        closed_msg = "Closed with error: %s" % str(error)
        print(closed_msg)

# Configure a writer that passes each output row to a RowPrinter instance; start() is not called here, so nothing runs yet
writerObj = streamingCountsDF.writeStream.foreach(RowPrinter())

# Last expression of the cell: display the configured writer object
writerObj

In [0]:
# foreachBatch(func)
# Sets the output of the streaming query to be processed using the provided function. This is supported only in the micro-batch execution modes (that is, when the trigger is not continuous).

def func(batch_df, batch_id):
    """Collect the rows of one batch and discard the result.

    batch_df: object exposing collect(); batch_id: accepted but not used here.
    """
    batch_df.collect()
    return None

# Configure a writer that passes each micro-batch to func; start() is not called here, so nothing runs yet
writerBatch = streamingCountsDF.writeStream.foreachBatch(func)

# Last expression of the cell: display the configured writer object
writerBatch

In [0]:
# trigger
# Set the trigger for the stream query. If this is not set it will run the query as fast as possible, which is equivalent to setting the trigger to processingTime='0 seconds'.

# Processing-time trigger: run the query every 5 seconds
writerP5 = streamingCountsDF.writeStream.trigger(processingTime='5 seconds')

# One-time trigger: process just one batch of data
writerOnce = streamingCountsDF.writeStream.trigger(once=True)

# Continuous trigger: per the PySpark docs, '5 seconds' is the checkpoint interval of a continuously running query, not a batch period
# NOTE(review): streamingCountsDF is an aggregation; confirm continuous mode supports it before calling start() on this writer
writerC5 = streamingCountsDF.writeStream.trigger(continuous='5 seconds')