# Final Demo – Build Streaming Pipelines

Now that we understand all the moving parts, such as Kafka, Spark Structured Streaming and HBase, and how to use them with Scala, let’s go ahead and build an end-to-end streaming data pipeline.

* Pre-Requisites
* Problem Statement
* Design the Solution
* Setup Project
* Data Ingestion – Kafka Connect
* Data Processing – Spark Structured Streaming
* Integrate with HBase

### Pre-Requisites
Let us review the prerequisites for running this demo on our PC.
* 64-bit computer with a 64-bit operating system
* At least 4 GB RAM and a dual-core processor
* On Windows, Ubuntu set up using Windows Subsystem for Linux
* Set up all the software required to build streaming pipelines. Follow the video to understand how each component is validated.
    * gen_logs – an eCommerce application simulator that appends log messages to a log file
    * Apache Spark – make sure Spark is set up.
    * Apache Kafka – make sure ZooKeeper and Kafka are running without any issues.
    * Apache HBase – make sure HBase is running without any issues.
* If you are using Ubuntu on Windows (set up with Windows Subsystem for Linux), you might run into issues such as corrupted ZooKeeper and Kafka data. Clean up as demonstrated earlier and restart the services.
* Make sure IntelliJ is installed; we will use it as the IDE to develop the applications.
* We will develop the application using IntelliJ and validate it using the terminal.
* Before getting into the execution, we need to understand the fundamentals of Spark, HBase and Kafka and be proficient in programming with Scala.

### Problem Statement
Let us go through the problem statement for the end-to-end data pipeline.
* gen_logs generates log data in a streaming fashion. It simulates traffic on an eCommerce web application.
* Messages are logged using the standard log message format.
* We want department-wise traffic for every minute so that we can understand the performance of each department in real time.
* Data should be stored in a database so that reports can be generated.

### Design the Solution
Let us design a solution that writes department-wise counts into an HBase table at regular intervals.
* Make sure data is being generated into the log files.
* Data Ingestion – ingest data into a Kafka topic using Kafka Connect. We have chosen Kafka Connect because the target is a Kafka topic and Kafka Connect is available with Kafka out of the box.
* Flume or Logstash could also be used for this purpose.
* Data Processing – develop logic to consume and process the data using Spark Structured Streaming. Technologies such as Flink or Storm could serve the same purpose.
* Processed data should be persisted to an HBase table. We have chosen HBase because it is already available in our cluster along with Spark.
* HBase Data Model
    * Row Key: Date:Department
    * Column Key: Date with time up to a minute
    * Column Value: Count for a given minute
* This model lets us use HBase filters to access data quickly for reports ranging from minute-wise to day-wise traffic. Any further processing a report needs is done while building that report, based on its requirements.
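
To make the data model concrete, here is a minimal sketch of how the row key and column qualifier could be derived from the start of a one-minute window. It assumes `yyyy-MM-dd` for the Date part of the row key and `yyyy-MM-dd HH:mm` for the column qualifier; the object name `DepartmentCountKeys` and both formats are illustrative choices, not part of the original design.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper: builds HBase keys for the department_count data model
object DepartmentCountKeys {
  // Assumed format for the Date part of the row key
  private val DateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd")
  // Assumed format for the column qualifier: date with time up to a minute (seconds dropped)
  private val MinuteFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")

  /** Row key = Date:Department, e.g. "2019-01-16:fitness" */
  def rowKey(windowStart: LocalDateTime, department: String): String =
    s"${windowStart.format(DateFormat)}:$department"

  /** Column qualifier = date and minute, e.g. "2019-01-16 10:15" */
  def columnQualifier(windowStart: LocalDateTime): String =
    windowStart.format(MinuteFormat)
}
```

With these formats, all minute columns for one department on one day live in a single row, and every row for a given day shares a prefix such as `2019-01-16:`, which is what makes prefix scans and filters convenient for day-wise reports.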

### Setup Project
We have 2 components for this project – Data Ingestion and Data Processing.
* For Data Ingestion we are going to use Kafka Connect, which is available out of the box.
* For Data Processing we will be using Spark Structured Streaming.
    * Data will be read from Kafka Topic
    * Data will be processed using Spark Structured Streaming APIs
    * Data will be written to HBase
* Create a new sbt project – streaming-pipelines
* Add the sbt-assembly plugin so that we can build a fat jar.
    * Create an **assembly.sbt** file in the **project** directory.
    * Add this line – <mark>addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")</mark>
* Add the Typesafe Config dependency so that we can externalize properties.
* Add the necessary dependencies for Kafka and Spark Structured Streaming.
* Add the necessary dependencies for HBase.
* Define all dependencies in build.sbt, along with a merge strategy for assembly.
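
A sketch of what build.sbt could look like. The Scala, Spark, HBase and Typesafe Config versions below are assumptions; align them with the versions running on your cluster. The merge strategy concatenates `META-INF/services` files instead of discarding them, because Spark discovers the `kafka` data source through those service registration files; everything else under `META-INF` is dropped, and any remaining conflict keeps the first copy.

```scala
name := "streaming-pipelines"
version := "0.1"
scalaVersion := "2.11.12"                 // assumed; Spark 2.3.x is built for Scala 2.11

val sparkVersion = "2.3.0"                // assumed; match your cluster
val hbaseVersion = "1.1.2"                // assumed; match your cluster

libraryDependencies ++= Seq(
  "com.typesafe"     %  "config"               % "1.3.2",      // externalized properties
  "org.apache.spark" %% "spark-sql"            % sparkVersion, // Structured Streaming APIs
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion, // Kafka source for readStream
  "org.apache.hbase" %  "hbase-client"         % hbaseVersion, // Connection, Table, Put
  "org.apache.hbase" %  "hbase-common"         % hbaseVersion  // HBaseConfiguration, Bytes
)

// sbt-assembly 0.14.x merge strategy for conflicting files in the fat jar
assemblyMergeStrategy in assembly := {
  // keep ServiceLoader registrations (needed to resolve format("kafka"))
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
  // drop signatures, manifests and other metadata
  case PathList("META-INF", xs @ _*)             => MergeStrategy.discard
  case _                                         => MergeStrategy.first
}
```

On a cluster, the Spark dependencies are often marked `% "provided"` to keep the fat jar small; leaving them unscoped, as here, keeps the program runnable directly from IntelliJ.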

* We use Typesafe Config (com.typesafe:config) to externalize the properties. It reads files such as application.properties from src/main/resources and builds a Config object.
* Let us define all the properties our application needs to run in dev and prod.

**kafka-structured-streaming-application.properties**

### Data Ingestion – Kafka Connect
Let us understand how to get data from web server logs into a Kafka topic using Kafka Connect.
* The Kafka Connect worker can run in either standalone or distributed mode.
* Example configurations are available under `$KAFKA_CONF_DIR`.
* Kafka Connect supports a number of sources and sinks.
* To get data from log files, we need to use the file source.
* An example file source configuration is available at `$KAFKA_CONF_DIR/connect-file-source.properties`.
* In standalone mode, we start the worker by passing 2 arguments: the worker configuration and the source (connector) configuration.
* We can validate ingestion by consuming data using kafka-console-consumer.sh.
* The code snippet below was created for our labs; however, you can make the minor changes demonstrated and use it for a local setup as well. To run it locally on our PC, use localhost instead of the lab-specific DNS aliases.

### Data Processing – Spark Structured Streaming
Let us understand how we can apply Spark Structured Streaming APIs to get department-wise counts every minute in a streaming fashion.
* A streaming application has 4 discrete components:
    * Creating the context (the SparkSession object is a wrapper for SparkContext)
    * Reading data
    * Processing data
    * Writing data
* Keep these 4 components separate and focus on the relevant area at each step.

***Create SparkSession***

We will see how to read the externalized properties and create the SparkSession object.
* Read the externalized properties using the Typesafe Config APIs.
* Create the SparkSession object (spark).
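
A minimal sketch of the context step. It assumes application.properties in src/main/resources groups keys by environment (for example `dev.execution.mode`) and that the environment name is passed as the first program argument; the key names and the object name `GetStreamingDepartmentTraffic` are hypothetical.

```scala
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.SparkSession

object GetStreamingDepartmentTraffic {
  def main(args: Array[String]): Unit = {
    // ConfigFactory.load() reads application.properties (or .conf/.json) from the classpath.
    // Assumed (hypothetical) layout of application.properties:
    //   dev.execution.mode = local[*]
    //   dev.bootstrap.servers = localhost:9092
    //   prod.execution.mode = yarn
    // args(0) selects the environment block, e.g. "dev" or "prod".
    val envProps: Config = ConfigFactory.load().getConfig(args(0))

    // SparkSession wraps SparkContext; it is the entry point for readStream/writeStream
    val spark = SparkSession.builder()
      .master(envProps.getString("execution.mode"))
      .appName("Get Streaming Department Traffic")
      .getOrCreate()

    // Reduce log noise so console output stays readable while developing
    spark.sparkContext.setLogLevel("ERROR")

    // ... read, process and write steps go here ...

    spark.stop()
  }
}
```

Setting the master from configuration keeps the same jar usable for local runs and cluster runs; note that a master set in code takes precedence over the `--master` flag of spark-submit.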

***Read Data***

Let us explore the relevant APIs to read data in a streaming fashion.
* Read data from the Kafka topic using the Spark Structured Streaming API **spark.readStream**.
* First, let us read the data and print the lines without any processing.
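
A minimal sketch of reading from Kafka and printing the raw lines. The broker address `localhost:9092` and the topic name `retail` are assumptions; in the real program they would come from the externalized properties. The `kafka` format requires the spark-sql-kafka-0-10 dependency on the classpath.

```scala
import org.apache.spark.sql.SparkSession

object ReadRetailLogs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Read Retail Logs")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
      .option("subscribe", "retail")                       // hypothetical topic fed by Kafka Connect
      .load()
      // Kafka delivers key/value as binary; cast the value to a string log line
      .selectExpr("CAST(value AS STRING) AS value")

    // Print each new line to the console without any processing
    lines.writeStream
      .format("console")
      .outputMode("append")
      .option("truncate", "false")
      .start()
      .awaitTermination()
  }
}
```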


***Process Data – Baseline***

Let us explore the relevant Data Frame APIs to process the data. We could also use the Spark SQL approach and write queries to process the data.
* Since we have to kill the Spark context to come out of a running streaming query, we will first develop the processing logic using a batch approach and then add it to the streaming pipeline.
* We will use the date logged in each log message to get the department-wise count for every minute. However, the date is not logged in a standard timestamp format, so we need logic to convert it from string type to timestamp type.
* We will also filter for department records and extract department_name along with the timestamp.
* Finally, we will apply Data Frame APIs to get the count by department for every minute.
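
The baseline logic could look roughly like the following batch sketch. It assumes gen_logs writes Apache-style access log lines such as `1.2.3.4 - - [16/Jan/2019:10:15:32 -0800] "GET /department/fitness/products HTTP/1.1" 200 ...`, so the request path is the seventh space-separated token and the date sits between `[` and the first space. The object and function names are hypothetical, and the time zone offset in the log line is ignored.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, date_format, regexp_extract, split, to_timestamp}

object DepartmentTrafficBatch {
  /** Counts department page visits per minute from a DataFrame of raw log lines ("value"). */
  def departmentTrafficByMinute(logs: DataFrame): DataFrame =
    logs
      // 7th space-separated token is the request path, e.g. /department/fitness/products
      .withColumn("path", split(col("value"), " ")(6))
      // keep department pages only: the first path segment must be "department"
      .filter(split(col("path"), "/")(1) === "department")
      .select(
        // "[16/Jan/2019:10:15:32 -0800]" -> "16/Jan/2019:10:15:32" -> timestamp
        to_timestamp(regexp_extract(col("value"), "\\[([^ ]+) ", 1), "dd/MMM/yyyy:HH:mm:ss")
          .as("visit_time"),
        split(col("path"), "/")(2).as("department_name"))
      // truncate the timestamp to the minute and count visits per department
      .groupBy(
        date_format(col("visit_time"), "yyyy-MM-dd HH:mm").as("visit_minute"),
        col("department_name"))
      .count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Department Traffic Batch")
      .getOrCreate()
    // args(0): path to a sample copy of the gen_logs output (hypothetical)
    val logs = spark.read.text(args(0)) // one string column named "value"
    departmentTrafficByMinute(logs).orderBy("visit_minute", "department_name").show(false)
    spark.stop()
  }
}
```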

***Process Data – Streaming***

* Now let us refactor the code to get department-wise traffic in a streaming fashion. For this, we need window operations.
* We can use the timestamp in a window operation to get department-wise counts at regular intervals.
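
In the streaming version, the parsing can stay the same while the minute bucket comes from a tumbling window over the parsed timestamp. This sketch assumes the same log layout as the batch baseline and that `lines` is the DataFrame returned by spark.readStream with a string column named `value`; the object and function names are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, regexp_extract, split, to_timestamp, window}

object DepartmentTrafficStreaming {
  /** Turns raw log lines (column "value") into department counts per one-minute window. */
  def departmentTrafficPerMinute(lines: DataFrame): DataFrame =
    lines
      // request path, e.g. /department/fitness/products (assumed 7th token)
      .withColumn("path", split(col("value"), " ")(6))
      .filter(split(col("path"), "/")(1) === "department")
      .select(
        // event time parsed from the log line, offset ignored
        to_timestamp(regexp_extract(col("value"), "\\[([^ ]+) ", 1), "dd/MMM/yyyy:HH:mm:ss")
          .as("visit_time"),
        split(col("path"), "/")(2).as("department_name"))
      // tumbling one-minute windows on event time; result columns: window, department_name, count
      .groupBy(window(col("visit_time"), "1 minute"), col("department_name"))
      .count()
}
```

Without a watermark, Spark keeps state for every window it has seen; adding `.withWatermark("visit_time", "2 minutes")` before the groupBy is one way to bound that state, at the cost of dropping events that arrive later than the threshold.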

***Write Data***

Let us look at some of the key aspects of writing the data in a streaming fashion after applying the processing logic.
* Print the count on the screen every 20 seconds.
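
Printing every 20 seconds maps to a processing-time trigger. A minimal sketch, assuming `departmentTraffic` is the windowed count DataFrame from the processing step; the object and method names are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

object PrintDepartmentTraffic {
  /** Prints aggregated counts to the console every 20 seconds; blocks until the query stops. */
  def printEvery20Seconds(departmentTraffic: DataFrame): Unit =
    departmentTraffic.writeStream
      // "update" prints only the windows whose counts changed since the last trigger;
      // "complete" would reprint the whole result table each time
      .outputMode("update")
      .format("console")
      .option("truncate", "false")
      // start a micro-batch every 20 seconds
      .trigger(Trigger.ProcessingTime("20 seconds"))
      .start()
      .awaitTermination()
}
```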

### Integrate with HBase
Let us understand how to load the data into an HBase table instead of printing it to the console. We will start with the code from the previous topic and then integrate the output with HBase.

***Using IDE***

Let us take the code from the previous topic and create a program using the IDE.
* Create the program **GetStreamingDepartmentTrafficHBase**.
* Make sure all the properties defined in application.properties are correct.
* Add the code to the program.
* At this point, we are ready to refactor the code to save the output to the HBase table.

***Implement ForeachWriter***

Let us implement ForeachWriter to integrate with HBase. The same steps can be followed to integrate with any database, as long as the required plugins (client libraries) are available.

* We use foreach to write data into the database with Spark Structured Streaming.
* **foreach** is available as part of df.writeStream.
* We need to pass a custom writer of type **ForeachWriter[Row]**.
* We need to implement 3 methods – **open**, **process** and **close**.
* For each Data Frame partition (in every trigger):
    * **open** – the database connection is established
    * **process** – rows are processed one at a time
    * **close** – the database connection is closed
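
A sketch of an HBase ForeachWriter following the open/process/close contract above. It assumes the incoming rows have the columns `window`, `department_name` and `count` (as produced by grouping on `window(...)` and the department), an existing `department_count` table with column family `cf`, and ZooKeeper connection values passed in by the caller; the class name `HBaseForeachWriter` is hypothetical. The row key and column qualifier follow the data model described earlier (Date:Department and the minute) and are stored as strings, while the count is stored as an 8-byte long.

```scala
import java.sql.Timestamp
import java.time.format.DateTimeFormatter
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.{ForeachWriter, Row}

class HBaseForeachWriter(zkQuorum: String, zkPort: String, tableName: String)
    extends ForeachWriter[Row] {

  // Not serializable: created on the executor in open(), never shipped from the driver
  @transient private var connection: Connection = _
  @transient private var table: Table = _

  // Called once per partition per trigger: establish the HBase connection
  override def open(partitionId: Long, version: Long): Boolean = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
    hbaseConf.set("hbase.zookeeper.property.clientPort", zkPort)
    connection = ConnectionFactory.createConnection(hbaseConf)
    table = connection.getTable(TableName.valueOf(tableName))
    true // true means: go ahead and process this partition
  }

  // Called for each row: one Put per (day:department, minute) cell
  override def process(row: Row): Unit = {
    val windowStart = row.getAs[Row]("window").getAs[Timestamp]("start").toLocalDateTime
    val department = row.getAs[String]("department_name")
    val count = row.getAs[Long]("count")

    val rowKey = s"${windowStart.format(HBaseForeachWriter.DateFormat)}:$department"
    val column = windowStart.format(HBaseForeachWriter.MinuteFormat)

    val put = new Put(Bytes.toBytes(rowKey))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(column), Bytes.toBytes(count))
    table.put(put)
  }

  // Called at the end of the partition (also on failure): release resources
  override def close(errorOrNull: Throwable): Unit = {
    if (table != null) table.close()
    if (connection != null) connection.close()
  }
}

// Formatters live in the companion object so the writer itself stays serializable
object HBaseForeachWriter {
  val DateFormat: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
  val MinuteFormat: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")
}
```

It would be wired in with something like `departmentTraffic.writeStream.outputMode("update").foreach(new HBaseForeachWriter("localhost", "2181", "department_count")).start()`. In update mode the same window can be emitted several times as its count grows, and each Put simply writes the newer count to the same cell. Because the count is written as a long, hbase shell displays it as raw bytes.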

***Validate Locally***

Now that development is done, we can build the jar file and validate it using spark-submit.

* Make the necessary changes to the code that connects to the HBase cluster in distributed mode.
* Go to the project directory and run <mark>sbt assembly</mark> to build a fat jar.
* Ensure that Kafka Connect is running and data is being ingested into the Kafka topic.
* Ensure that HBase is up and running.
* Create the HBase table **department_count** with column family **cf** – <mark>create 'department_count', 'cf'</mark>
* Run the spark-submit command to start the streaming pipeline, which stores the output in the HBase table.
* Go to **hbase shell** and validate that data is flowing in without any issues.

***Run on Cluster***

Let us see the steps for validating the streaming pipeline job on the cluster.
* Ship the jar file to the cluster (using the scp command).
* Ensure that Kafka Connect is running and ingesting data from the web server logs into the Kafka topic.
* Create the HBase table **department_count** with column family **cf** – <mark>create 'department_count', 'cf'</mark>
* Run the spark-submit command to start the streaming pipeline, which stores the output in the HBase table.
* Go to **hbase shell** and validate that data is flowing in without any issues.