# Spark Structured Streaming - Sources and Sinks

This notebook explores the different options for querying streaming data with the <b>Spark Structured Streaming API</b>.

As you may remember, we treat data streams as if they were Spark DataFrames when transforming and aggregating the data. The big differences appear when reading and writing the data.

The main focus of this notebook is to show examples of these read and write commands and the situations in which you need each of them.

You can find the full API documentation in the [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html).

**WARNING** -- This notebook is not meant for launching queries/scripts, as Spark is not properly set up here for running code. It is only meant to show the different script options for the Spark Structured Streaming API.

## Sections
* [Sources](#0)
    * [File Source](#0.1)
    * [Socket Source](#0.2)
    * [Kafka Source](#0.3)
* [Sinks](#1)
    * [File Sinks](#1.1)
    * [Kafka Sinks](#1.2)
    * [Memory Sinks](#1.3)
    * [Console Sinks](#1.4)

<a id='0'></a>
## <u>Sources</u>

In this section we will look at the different ways to read input data using Spark Structured Streaming.

Depending on the type of file, its location and the data inside it, you will need to set different options so that Spark can read it correctly.

The input can be either unstructured data or structured DataFrames stored in HDFS.

Common options for file-based sources:

* <b>maxFilesPerTrigger</b> --> The maximum number of new files to read in each trigger (micro-batch).
* <b>latestFirst</b> --> When set to true, the newest files are processed first, which is useful when there is a large backlog of files.
* <b>maxFileAge</b> --> The maximum age of a file that will still be read; older files are ignored. The age is measured relative to the timestamp of the latest file, not the current system time.
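
To make the interaction between these options concrete, here is a simplified pure-Python model of how the files for one trigger could be chosen. It is not Spark's implementation: `select_batch` and its arguments are hypothetical names, and files are represented as `(path, modification_time_in_seconds)` tuples. It assumes the documented behaviour that file age is measured against the newest file, and that the age limit is ignored when newest-first ordering is combined with a per-trigger cap.

```python
def select_batch(files, max_files_per_trigger=None, latest_first=False, max_file_age_s=None):
    """Return the (path, mtime) tuples that one trigger would process.

    Simplified model for illustration only; Spark also tracks which files
    it has already processed, which is left out here.
    """
    # Assumed rule: the age limit is ignored when latestFirst is combined
    # with a cap on the number of files per trigger.
    if latest_first and max_files_per_trigger is not None:
        max_file_age_s = None

    if max_file_age_s is not None and files:
        # Age is measured relative to the newest file, not the wall clock.
        newest = max(mtime for _, mtime in files)
        files = [f for f in files if f[1] >= newest - max_file_age_s]

    # Oldest first by default, newest first when latest_first is set.
    ordered = sorted(files, key=lambda f: f[1], reverse=latest_first)

    # Slicing with None keeps every file, i.e. no cap.
    return ordered[:max_files_per_trigger]


# Hypothetical directory listing
listing = [("a.csv", 100), ("b.csv", 200), ("c.csv", 300)]
first_batch = select_batch(listing, max_files_per_trigger=2)
```

With the listing above, a cap of 2 files picks the two oldest files unless `latest_first=True` is passed, in which case the two newest are picked.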

<a id='0.1'></a>
### File Source

You use this kind of source to read directly from files that are stored either in your file system or in HDFS. 

It can read different kinds of formats:

* CSV
* JSON
* Parquet
* ORC
* text


Examples:

- Reading CSV files from a path.

In [None]:
from pyspark.sql.types import StructType

## Set up the schema used to read the dataframe (streaming file sources need one by default)
csv_schema = StructType().add("name", "string").add("age", "integer")

## Path to the directory that contains the CSV files
path_to_csv = "/././"

## Read the stream from CSV files located in your file system
## (the chain is wrapped in parentheses so that each step can carry a comment)
df_csv = (
    spark.readStream
    .option("sep", ",")  # column separator used inside the CSV files
    .option("header", "false")  # whether the first line of each file is a header
    .schema(csv_schema)  # schema declared up front
    .csv(path_to_csv)
)

- Reading text files from an HDFS path, a limited number of files per trigger.

In [None]:
## Path to the HDFS directory where the text files are
path_to_hdfs = "hdfs://././"

## Read the stream from text files located in HDFS
df_text = (
    spark.readStream
    .option("maxFilesPerTrigger", 50)  # read at most 50 new files per trigger
    .option("latestFirst", "true")  # process the newest files first
    # maxFileAge takes a duration, not a boolean ("7d" is an example value);
    # it is ignored when latestFirst is combined with maxFilesPerTrigger
    .option("maxFileAge", "7d")
    .text(path_to_hdfs)
)


- Reading JSON files from an HDFS path.

In [None]:
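from pyspark.sql.types import StructType

## Path to the HDFS directory where the JSON files are
path_to_hdfs = "hdfs://././"

## Example schema (streaming file sources need a schema unless schema inference is enabled)
json_schema = StructType().add("name", "string").add("age", "integer")

## Read the stream from JSON files located in HDFS
df_json = (
    spark.readStream
    .option("allowComments", "true")  # accept comments inside the JSON files
    .option("multiLine", "true")  # a JSON record may span several lines (default: one record per line)
    .schema(json_schema)
    .json(path_to_hdfs)  # use the JSON reader, not the text reader
)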

<a id='0.2'></a>
### Socket Source

The Transmission Control Protocol (TCP) is a protocol that enables bidirectional communication between a client and a server.

The socket source is a TCP socket client (running in the driver) that connects to a TCP server offering a UTF-8 encoded, text-based data stream.

You need to specify the host and port of the TCP server to connect to.

This kind of connection is not used in production, only for testing streaming queries/scripts. It also cannot replay uncommitted offsets, so it gives no fault-tolerance guarantee: in case of failure, data that was not yet processed is lost.
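
To try the socket source locally you need something listening on the chosen port. A tool such as netcat is a common choice; the sketch below is a minimal Python alternative that uses only the standard library. `serve_lines` is a hypothetical helper: it accepts a single client, sends each line as UTF-8 text and then closes, so it is intended for experiments only.

```python
import socket
import threading


def serve_lines(lines, host="localhost", port=9999):
    """Start a one-shot TCP server that sends `lines` to the first client.

    Returns the port actually bound (useful when port=0) and the
    background thread that handles the client.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    bound_port = server.getsockname()[1]

    def handle():
        # Wait for one client, send every line newline-terminated, then close.
        conn, _ = server.accept()
        with conn:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))
        server.close()

    thread = threading.Thread(target=handle, daemon=True)
    thread.start()
    return bound_port, thread
```

Calling `serve_lines(["hello world", "hello spark"])` before starting a streaming query would provide a server on `localhost:9999` for the socket source to connect to.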

In [None]:
# Simple TCP server connection
df_socket = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()  # load() returns the streaming DataFrame; without it you only have a reader
)

<a id='0.3'></a>
### Kafka Source

With this method we read the events that a **Kafka producer** has published to specific topics.

You need to define the host and port of the Kafka brokers to connect to (the bootstrap servers).

You also need to specify the topics from which to read the events.
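
The broker addresses and topic names are passed to Spark as comma-separated strings. A small helper can build them from Python lists; this is only a sketch, and `kafka_source_options`, the broker addresses and the topic names are hypothetical.

```python
def kafka_source_options(brokers, topics, starting_offsets="latest"):
    """Build the option dictionary for a Kafka streaming source.

    brokers: iterable of (host, port) pairs
    topics:  iterable of topic names
    """
    return {
        "kafka.bootstrap.servers": ",".join(f"{host}:{port}" for host, port in brokers),
        "subscribe": ",".join(topics),
        "startingOffsets": starting_offsets,
    }


# Hypothetical broker addresses and topic names
options = kafka_source_options([("host1", 9092), ("host2", 9092)], ["topic1", "topic2"])
```

The resulting dictionary could then be applied in one call with `spark.readStream.format("kafka").options(**options).load()`.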

Each row you obtain has the following fields:

- key
- value
- topic
- partition
- offset
- timestamp
- timestampType
- headers (optional)
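
The `key` and `value` fields arrive as binary columns, so they usually need to be cast before they can be used as text. A minimal sketch, assuming `df_kafka` is a streaming DataFrame read from Kafka and that the payload is UTF-8 text (`decode_kafka_records` is a hypothetical helper name):

```python
def decode_kafka_records(df_kafka):
    """Cast the binary key and value columns to strings and keep the metadata columns."""
    return df_kafka.selectExpr(
        "CAST(key AS STRING) AS key",
        "CAST(value AS STRING) AS value",
        "topic",
        "partition",
        "offset",
        "timestamp",
    )
```

If the value contains JSON, the resulting string column can then be parsed with the usual Spark SQL functions.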

In [None]:
## Kafka-Spark Integration API

df_kafka = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")  # brokers to connect to (comma-separated host:port list)
    .option("subscribe", "topic1,topic2")  # topics to read from
    .option("includeHeaders", "true")  # add the Kafka headers as a column of the resulting dataframe
    .option("startingOffsets", "earliest")  # where to start reading | default: "earliest" for batch, "latest" for streaming
    # "endingOffsets" is left out: it only applies to batch queries, not to streaming ones
    .option("maxTriggerDelay", "3m")  # maximum wait between triggers; only applies when minOffsetsPerTrigger is set
    .option("minPartitions", "2")  # desired minimum number of Spark partitions (default: 1:1 with topic partitions)
    .load()
)

<a id='1'></a>
## <u>Sinks</u>

Once we have read the data from the sources, the DataFrames undergo different transformations or aggregations made with the Spark SQL API.

After these operations, the resulting DataFrames can be either stored in a local file system or HDFS, or sent to another scalable streaming service such as Kafka.

There are two kinds of sinks:

- Reliable sinks for application processes
    - File Sink
    - Kafka Sink
- Unreliable sinks for experimentation
    - Memory sink
    - Console sink
    
Common options:

* <b>path</b> --> The directory in your filesystem where the output is saved.
* <b>checkpointLocation</b> --> The directory in your filesystem where the checkpoint metadata is stored.
* <b>compression</b> --> The codec used to compress the output; the default depends on the format (for example, CSV output is not compressed unless you set it).

<a id='1.1'></a>
### File Sinks

You use this kind of sink to write directly to files that are stored either in your file system or in HDFS. 

It can write different kinds of formats:

* CSV
* JSON
* Parquet
* ORC
* text


Examples:

* Writing a csv file.

In [None]:
path_of_csv = 'hdfs://./.'

query = (
    df.writeStream
    .format("csv")  # format of the output
    .option("checkpointLocation", "path/to/checkpoint/dir")  # checkpoint location used to recover after a failure
    .option("header", "true")  # also write the header line
    .option("sep", ";")  # column separator used in the CSV files
    .option("path", path_of_csv)  # directory where the dataframe is written
    .start()  # start() launches the streaming query
)

query.awaitTermination()  # block until the query stops or fails

* Writing a table in parquet in HDFS.

In [None]:
path_to_parquet = 'hdfs://./.'

query = (
    df.writeStream
    .format("parquet")  # format of the output
    .option("checkpointLocation", "path/to/checkpoint/dir")  # checkpoint location used to recover after a failure
    .option("path", path_to_parquet)  # directory where the dataframe is written
    .outputMode("append")  # file sinks only support append mode, not update
    .start()  # start() launches the streaming query
)

query.awaitTermination()  # block until the query stops or fails

<a id='1.2'></a>
### Kafka Sinks

With this method we send the data to a Kafka topic, from which a **Kafka consumer** can read it.

You need to define the host and port of the Kafka brokers to connect to (the bootstrap servers).

You also need to specify the topic to which the events are written.

Alternatively, you can remove the "topic" option from the query and include a column named "topic" in the DataFrame. In that case, each row is sent to the topic given by its own "topic" column.

The DataFrame you write must have the following columns:

- key (optional)
- value

The value column must contain the data that you want to send to Kafka, serialized as a string or binary value.
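
One way to prepare such a DataFrame is to serialize every row as a JSON string. The sketch below assumes that JSON is an acceptable wire format and that column names are plain identifiers; `to_kafka_records`, `key_column` and `topic_column` are hypothetical names.

```python
def to_kafka_records(df, key_column=None, topic_column=None):
    """Return a DataFrame shaped for the Kafka sink.

    value: every column of the row serialized as one JSON string
    key:   optional, taken from key_column
    topic: optional, taken from topic_column (per-row routing)
    """
    exprs = ["to_json(struct(*)) AS value"]
    if key_column is not None:
        exprs.insert(0, f"CAST({key_column} AS STRING) AS key")
    if topic_column is not None:
        exprs.append(f"CAST({topic_column} AS STRING) AS topic")
    return df.selectExpr(*exprs)
```

Note that in this sketch the key and topic columns are also part of the JSON value, because `struct(*)` includes every column of the row.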

In [None]:
## Kafka-Spark Integration API

query = (
    df.writeStream  # df must already contain a "value" column (and optionally "key")
    .queryName("KafkaQuery")
    .format("kafka")  # write to Kafka
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")  # brokers to connect to
    .option("topic", "topic1")  # topic to write to
    .outputMode("append")  # how the result rows are written
    .option("checkpointLocation", "path/to/checkpoint/dir")  # checkpoint location used to recover after a failure
    # "failOnDataLoss" is a Kafka source option (testing only, never production); set it on readStream, not here
    .start()
)

query.awaitTermination()  # block until the query stops or fails

<a id='1.3'></a>
### Memory Sinks

This sink stores the output as a temporary in-memory table, which you can then query as needed.

In [None]:
query = (
    df.writeStream
    .format("memory")  # keep the output in memory
    .queryName("memory_table")  # name of the in-memory table
    .outputMode("complete")  # rewrite the whole result on every trigger (requires an aggregation)
    .option("checkpointLocation", "path/to/HDFS/dir")
    .start()
)

# Now we query the table stored in memory
spark.sql("SELECT * FROM memory_table").count()

<a id='1.4'></a>
### Console Sinks

Prints the results of the query to the console, which is useful for seeing what the DataFrame looks like.

In [None]:
# Start running the query that prints the running counts to the console
query = (
    df.writeStream
    .format("console")  # show the result in the console
    .outputMode("complete")  # print the whole result table on every trigger (requires an aggregation)
    .option("checkpointLocation", "path/to/HDFS/dir")
    .option("numRows", 50)  # show 50 rows of the dataframe in the console
    .option("truncate", "false")  # show all the characters inside the columns
    .start()
)

query.awaitTermination()  # block until the query stops or fails