# Lab 04 - Streaming Pattern - Processing events from Kafka using Spark and MLlib

AdventureWorks has asked for the ability to extend their product recommendations feature, integrating the trained Alternating Least Squares (ALS) recommendation model to make predictions against streaming weblog data from Kafka.

In this lab, you will upload and run a Java .jar application to add sample weblog data into a Kafka topic, and use the same application to view the data added. You will then create a simple Kafka producer using Spark to add a few more records to the topic. Next, you will use Spark Structured Streaming to query the data, and run the streamed data against the ALS recommendation model, getting product recommendations for a given user.

## Requirements

* An Azure Virtual Network
* A Spark 2.1 on HDInsight 3.6 cluster, inside the virtual network
* A Kafka on HDInsight 3.6 cluster, inside the virtual network

## Environment setup
The first thing you need to do is prepare the environment for the tasks ahead.

### Load required packages
To use Spark Structured Streaming with Kafka, you must load the appropriate packages. The package versions must match the versions of both Kafka and Spark that you are using, so for this setup you need packages that work with Kafka on HDInsight 3.6 and Spark 2.1 on HDInsight 3.6.

In [None]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-streaming_2.11:2.1.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0",
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"
    }
}
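
If you are unsure which artifact versions apply to your cluster, a quick check like the following can help. This is a minimal sketch that assumes it is run in a later notebook cell, where the `spark` session already exists. The suffix of each artifact name (for example `_2.11`) should match the Scala version printed, and the trailing version (for example `2.1.0`) should match the Spark version.

```scala
// Print the Spark and Scala versions of the running session so they can be
// compared with the Maven coordinates listed in spark.jars.packages.
val sparkVersion = spark.version
val scalaVersion = scala.util.Properties.versionNumberString
println(s"Spark version: $sparkVersion")
println(s"Scala version: $scalaVersion")
```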

### Install `jq` on the head node of the Spark cluster
For several steps of this lab you will be working with JSON data from the command line. `jq` is a lightweight and flexible command-line JSON processor, which you will use to parse the JSON returned by the `curl` command below. To install `jq` do the following:

1. Open a bash shell prompt, and connect to the head node of your Spark cluster using the following command, replacing SPARKCLUSTERNAME with the name of your Spark cluster.
```bash
ssh sshuser@SPARKCLUSTERNAME-ssh.azurehdinsight.net
```
2. You will be presented with a prompt that the authenticity of the host can't be established. Type `yes` to continue.
3. Enter your admin password when prompted.
4. Once your connection is established, enter the following at the ssh command prompt.
```bash
sudo apt -y install jq
```

Leave the bash shell open, as you will be using it again below.

## Create a Kafka topic
Now that the environment is ready, the next thing you need to do when working with Kafka is create a topic. A topic is a category or feed name to which records are published. This will be where your streaming data resides within the Kafka cluster.

### Get ZooKeeper hosts
Topics are registered in ZooKeeper, which means you must provide the **ZooKeeper host** information for your Kafka cluster. To find the ZooKeeper host information for your Kafka HDInsight cluster, you can use the Ambari REST API. The following cell retrieves this information with the `curl` and `jq` utilities, run through a `%%bash` shell magic command.

> While there may be more than two ZooKeeper hosts for your cluster, you do not need to provide a full list of all hosts to clients. One or two is enough. In this case, we return two.

The following cell generates a comma-delimited list containing two hosts, similar to the following example:
```
zk0-kafka.rwlvi5egublulm0bp55vont2af.xx.internal.cloudapp.net:2181,zk1-kafka.rwlvi5egublulm0bp55vont2af.xx.internal.cloudapp.net:2181
```

Before running the cell below:
1. Replace the value of `KAFKACLUSTERNAME` with the name of your Kafka cluster.
2. Replace the value of `PASSWORD` with the admin password of your Kafka cluster (default value assigned was "Abc!1234567890").

In [None]:
%%bash
KAFKACLUSTERNAME="<replace with your kafka cluster name>"
PASSWORD="Abc!1234567890"
curl -su "admin:$PASSWORD" -G "https://$KAFKACLUSTERNAME.azurehdinsight.net/api/v1/clusters/$KAFKACLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2
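
To make the `jq` and `cut` steps in the cell above easier to follow, here is the same transformation sketched in Scala. The host names are made-up placeholders, not values from a real cluster, and the sketch does not call Ambari.

```scala
// Hypothetical host names, standing in for the host_name values returned by Ambari.
val hostNames = Seq(
  "zk0-kafka.example.internal.cloudapp.net",
  "zk1-kafka.example.internal.cloudapp.net",
  "zk2-kafka.example.internal.cloudapp.net")

// Append the ZooKeeper client port to each host (the jq step), then keep only
// the first two entries (the cut step) and join them with commas.
val zookeeperHosts = hostNames.map(host => s"$host:2181").take(2).mkString(",")
println(zookeeperHosts)
```

The broker list retrieved later in this lab is built the same way, with port 9092 in place of 2181.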

### Create the topic
Now that you have your ZooKeeper host list, you can use a `%%bash` cell to create a topic on Kafka called `weblogs`.

Copy the output from the previous step, and use it to replace the value of the `ZOOKEEPER_HOSTS` variable in the next cell.

> For example, ZOOKEEPER_HOSTS="zk1-kafka.0qmwcwuospvenlxdqylbdkt1jc.xx.internal.cloudapp.net:2181,zk3-kafka.0qmwcwuospvenlxdqylbdkt1jc.xx.internal.cloudapp.net:2181"

In [None]:
%%bash
TOPIC="weblogs"
ZOOKEEPER_HOSTS="<replace with output from previous command>"
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic $TOPIC --zookeeper $ZOOKEEPER_HOSTS

## Upload and run a Kafka producer/consumer application
The next step is to upload an application that can be used to write data to your Kafka topic. For this, you will use a Java application which produces sample weblog data, and writes it to the Kafka cluster.

### Copy the `jar` file to the Spark cluster head node
As part of the package you downloaded for this lab, you will find a file named kafka-producer-consumer.jar. This file needs to be uploaded to the head node of your Spark cluster. 

From a **new** bash shell prompt, upload the compiled `jar` file to the local storage of your Spark HDInsight cluster head node using the `scp` command below. As before, replace SPARKCLUSTERNAME with the name of your Spark cluster, and replace `./kafka-producer-consumer.jar` with the path to this file in the Lab04 folder. When prompted, enter the password for the SSH user.

```bash
scp ./kafka-producer-consumer.jar sshuser@SPARKCLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
```

### Verify the upload on the head node
Return to the bash shell where you performed the `jq` installation, which still has the SSH connection to your Spark cluster head node, and verify that the `kafka-producer-consumer.jar` file was uploaded by listing the files.
```bash
ll
```
You should see output similar to the following image:
![Head Node Files](https://raw.githubusercontent.com/ZoinerTejada/hdi-labs/master/Labs/Lab04/images/cluster-file-list.png)

### Get your Kafka brokers
Before attempting to run the Kafka producer, you need to retrieve your **Kafka brokers**. These brokers provide the connection information needed for the kafka-producer-consumer command-line app to write and read records to and from your Kafka cluster. To find the Kafka broker information for your Kafka HDInsight cluster, you can use the Ambari REST API. The following cell retrieves this information with the `curl` and `jq` utilities, run through a `%%bash` shell magic command.

> While there may be more than two broker hosts for your cluster, you do not need to provide a full list of all hosts to clients. One or two is enough. In this case, we return two.

The following cell generates a comma-delimited list containing two hosts, similar to the following example:
```
wn0-kafka.liftazhqudlunpo4tkvapo234g.dx.internal.cloudapp.net:9092,wn1-kafka.liftazhqudlunpo4tkvapo234g.dx.internal.cloudapp.net:9092
```

Before running the cell below:
1. Replace the value of `KAFKACLUSTERNAME` with the name of your Kafka cluster.
2. Replace the value of `PASSWORD` with the admin password of your Kafka cluster (default value assigned was "Abc!1234567890").

In [None]:
%%bash
KAFKACLUSTERNAME="<replace with your kafka cluster name>"
PASSWORD="Abc!1234567890"
curl -su "admin:$PASSWORD" -G "https://$KAFKACLUSTERNAME.azurehdinsight.net/api/v1/clusters/$KAFKACLUSTERNAME/services/KAFKA/components/KAFKA_BROKER" | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2

### Execute the Kafka Producer
Return to your bash shell with the SSH connection to your Spark cluster head node.

You will now execute a producer command in the application to write records to Kafka. The following command will write 100,000 abbreviated weblog records to your topic in Kafka. The records are in JSON format, and look like the following:

```json
{"ProductId" : 33, "UserId" : 37}
{"ProductId" : 95, "UserId" : 2208}
{"ProductId" : 83, "UserId" : 9316}
{"ProductId" : 1, "UserId" : 7418}
{"ProductId" : 92, "UserId" : 10569}
```

Replace the value of `KAFKABROKERS` with the output of the previous cell, then copy and paste the code into your bash shell.

```bash
TOPIC=weblogs
KAFKABROKERS="<replace with your kafka brokers list>"
java -jar kafka-producer-consumer.jar producer $KAFKABROKERS $TOPIC
```

You should see the number of records written count up to 100,000 in the console output.

![Producer output](https://raw.githubusercontent.com/ZoinerTejada/hdi-labs/master/Labs/Lab04/images/kafka-producer-app-output.png)

> You may see a failure message, as is highlighted in the image above. This can be safely ignored.

### Execute the Kafka Consumer
Now that you have successfully used the Kafka producer to write data to your topic, let's use the consumer component of the `jar` application to look at the data that you uploaded.

Copy and paste the following code into your bash shell connected via SSH to your Spark cluster head node. Executing this command outputs the data written by the previous command.

> Note that the `$KAFKABROKERS` and `$TOPIC` variables don't need to be assigned again, as they were set in the same shell session by the previous producer commands.

```bash
java -jar kafka-producer-consumer.jar consumer $KAFKABROKERS $TOPIC
```

You will notice that the command does not return to a command prompt. You can press `CTRL+C` to get the prompt back, but for now leave it as is.

![Consumer output](https://raw.githubusercontent.com/ZoinerTejada/hdi-labs/master/Labs/Lab04/images/kafka-consumer-app-output.png)

## Create a simple Spark producer
In this next section, you will use Spark as a Kafka Producer, and add a few more records to the topic. With the open Consumer prompt, you will be able to observe the new records being added.

### Get Weblogs data
First, retrieve your weblogs data from the `weblogs` Hive table, and store it in a Spark DataFrame.

In [None]:
// Use Spark SQL to retrieve all the records from the "weblogs" Hive table.
val weblogs = spark.sql("SELECT * FROM weblogs")
weblogs.printSchema()

### Create sample dataset
From the weblogs data, create a small sample of data you can send to Kafka.

> The weblogs data contains approximately 90 million records, as well as multiple fields that are not needed for this exercise, so the sample dataset will make things more manageable.

In [None]:
// Use the selectExpr and limit methods of a Spark DataFrame to create a new DataFrame
// containing only 100 records, and only those fields we want to write to Kafka.
val sample = weblogs.selectExpr("ProductId", "UserId").limit(100)
sample.show(5)

### Create a new Spark Kafka Producer
To write records to Kafka from your Spark Dataframe, you first need to create a Kafka Producer. This will be very similar to the code found inside the Java kafka-producer-consumer.jar application executed above.

#### Configure the properties for our Kafka Producer
First, set the properties of the Kafka producer.

Before executing the cell below:
1. Replace the value of `kafkaBrokers` with your comma-delimited list of Kafka brokers from above.
2. Replace the value of `kafkaTopic` with the name you provided for your Kafka topic above. If you left the default value of `weblogs`, you don't need to change this.

In [None]:
// Import required libraries
import org.apache.spark.sql._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.Properties;
import scala.util.parsing.json.JSONObject

val kafkaBrokers = "<replace with your kafka broker list>"
val kafkaTopic = "weblogs"

val props = new Properties()
props.put("bootstrap.servers", kafkaBrokers)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

println("Finished configuring Kafka Producer")
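
Before sending any data, it can be useful to confirm that the broker list and topic name are valid. The following is a minimal sketch, not part of the original lab, that assumes the `props` and `kafkaTopic` values from the cell above. It uses the producer's `partitionsFor` method to ask the cluster for the topic's partition metadata, and the `checkProducer` and `partitions` names are only illustrative.

```scala
import org.apache.kafka.clients.producer.KafkaProducer
import scala.collection.JavaConverters._

// Create a short-lived producer only to look up topic metadata.
val checkProducer = new KafkaProducer[String, String](props)

// Ask the brokers which partitions exist for the topic, then always close the producer.
val partitions = try {
  checkProducer.partitionsFor(kafkaTopic).asScala.toList
} finally {
  checkProducer.close()
}

println(s"Topic '$kafkaTopic' has ${partitions.size} partitions")
partitions.foreach(p => println(s"  partition ${p.partition()} leader: ${p.leader()}"))
```

If the topic was created with the command shown earlier in this lab, the partition count reported here should match the `--partitions` value used there.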

### Start the producer stream
In the next cell, you will start streaming weblog data into Kafka. To accomplish this, you will create a KafkaProducer, and use its `send` method to pass each row of weblog data into Kafka.

Once you start running the cell, return to your bash shell with the open consumer session, and you should see the new records streaming into the topic in Kafka.

In [None]:
// Create an accumulator to track the number of weblog entries emitted to Kafka
val numRecords = sc.longAccumulator("Weblog entries sent to Kafka")

// Convert a Row to a JSON string, using the column names as keys
def convertToJson(row: Row): String = {
    val m = row.getValuesMap(row.schema.fieldNames)
    JSONObject(m).toString()
}

// Create a single producer and reuse it for every record, rather than creating one per record.
val producer = new KafkaProducer[String,String](props)

// Loop through the records in the sample DataFrame, converting each row to JSON and sending it to Kafka.
for (rec <- sample.collect()) {
    val jsonData = convertToJson(rec)
    val message = new ProducerRecord[String,String](kafkaTopic, "spark_demo", jsonData)
    producer.send(message)
    numRecords.add(1)
}

// close() blocks until all previously sent records have completed
producer.close()

println("Finished writing " + numRecords.value + " records to Kafka topic '" + kafkaTopic + "'")

In your bash shell, you should have seen the 100 new records streaming into the topic via the consumer application. The total number of records in the topic should now read 100,100.

![Spark Producer output](https://raw.githubusercontent.com/ZoinerTejada/hdi-labs/master/Labs/Lab04/images/spark-producer-output.png)

You can now press `CTRL+C` in the bash shell to exit the consumer app, and return to the command prompt.

## Create query for the Kafka stream
It is now time to look at reading the data from Kafka into a Spark Structured Streaming Dataframe.

You can access the data stored in the Kafka topic, and read it into a Spark streaming DataFrame, by subscribing to your `weblogs` topic. The `kafka` DataFrame, below, represents an unbounded table containing the streaming data, and is your query for the streaming data.

In [None]:
// Construct a streaming DataFrame that reads from weblogs
val kafka = { spark.readStream.format("kafka")
             .option("kafka.bootstrap.servers", kafkaBrokers)
             .option("subscribe", kafkaTopic)
             .option("startingOffsets", "earliest")
             .load() }

kafka.printSchema()

Looking at the schema output for the `kafka` streaming DataFrame, you can see it includes the fields `key`, `value`, `topic`, `partition`, `offset`, `timestamp`, and `timestampType`. You can pick and choose the columns needed for processing. The `value` field contains the actual data, and `timestamp` is the message arrival timestamp.
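
As an illustration of picking columns, the sketch below keeps some of the message metadata alongside the payload. It assumes the `kafka` streaming DataFrame defined above, and the `kafka_readable` name is only illustrative. Because the DataFrame is streaming, the sketch prints the resulting schema rather than calling `show()`.

```scala
import org.apache.spark.sql.functions.col

// Keep the message metadata alongside the payload, casting the binary
// key and value columns to strings so they are readable.
val kafka_readable = kafka.select(
  col("key").cast("string").alias("key"),
  col("value").cast("string").alias("value"),
  col("topic"),
  col("partition"),
  col("offset"),
  col("timestamp"))

kafka_readable.printSchema()
```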

For this lab, we are only interested in the `value` field. Notice how it is currently displayed as binary data. To make use of this field, you need to convert it to a string.

The final step is to actually start receiving the data. To do this you can set it up to print the `value` field to the console, and then start the stream using `start()`.

In [None]:
// Convert the value column to a string
val kafka_value = kafka.select(col("value").cast("string"))

// Print the values to the console, keeping a handle so the query can be stopped
val consoleQuery = kafka_value.writeStream.format("console").start()
consoleQuery.awaitTermination(10000)
consoleQuery.stop()

When `start()` is called, the streaming computation begins running in the background, and the returned query object is a handle to that active streaming query. The call to `awaitTermination(10000)` blocks the cell for up to 10 seconds while the query runs. It does not end the query by itself, so `stop()` is called afterwards to terminate it.

As you can see, all of the data in the value column is in a JSON format. You will need to convert it to the appropriate data types, and create columns in a DataFrame for the values.

### Create a schema for reading the JSON data
To accomplish this, you will first need to create a schema that can be applied to the JSON data.

In [None]:
// Import libraries used for declaring schemas and working with JSON data
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Define the structure of our weblogs JSON document that is read from Kafka.
val schema = { (new StructType)
                  .add("ProductId", IntegerType)
                  .add("UserId", LongType) }
             
schema.printTreeString

Now that you have an appropriate schema, you can create a new streaming DataFrame which contains your extracted JSON data by using the Spark SQL `from_json` function. This will create an aliased column named "weblog" to store the parsed object. Then, you can select the data into a console stream for viewing.

In [None]:
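// The value column of kafka_value is already a string, so it can be parsed directly with from_json
val weblog_data = kafka_value.select(from_json(col("value"), schema).alias("weblog"))
val weblogs_stream = weblog_data.select("weblog.ProductId", "weblog.UserId")

// Output to the console for up to 10 seconds so you can view the data, then stop the query.
val weblogsConsoleQuery = weblogs_stream.writeStream.format("console").start()
weblogsConsoleQuery.awaitTermination(10000)
weblogsConsoleQuery.stop()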
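
To see how `from_json` applies a schema without waiting on a stream, you can try it on a small static DataFrame. The following is a minimal sketch with made-up input rows. The names `sampleSchema`, `sampleJson`, and `sampleParsed` are illustrative only. A value that cannot be parsed as JSON is expected to yield nulls rather than an error.

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, LongType, StructType}
import spark.implicits._

// Same field layout as the weblogs schema defined above.
val sampleSchema = new StructType().add("ProductId", IntegerType).add("UserId", LongType)

// A static DataFrame with one well-formed record and one malformed record.
val sampleJson = Seq("""{"ProductId" : 33, "UserId" : 37}""", "not valid json").toDF("value")

// Parse the JSON string into a struct, then flatten the struct into columns.
val sampleParsed = sampleJson
  .select(from_json(col("value"), sampleSchema).alias("weblog"))
  .select("weblog.ProductId", "weblog.UserId")

// show() works here because this DataFrame is static, not streaming.
sampleParsed.show()
```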

Now, let's take a quick look at the `isStreaming` property, which indicates whether or not a DataFrame is streaming. For the `weblogs_stream` DataFrame, you should see `true` returned.

In [None]:
weblogs_stream.isStreaming

For comparison, run the same command against the `weblogs` DataFrame you created above from the `weblogs` Hive table. This time, you should see `false` returned.

In [None]:
weblogs.isStreaming

### Create an in-memory query for accessing the streaming data
In this scenario, you will store the input data as an in-memory table. From here, you can query the dataset using SQL. The name of the table is specified by the `queryName` option.

> NOTE: In-memory output should only be used on small datasets, as the entire output is collected and stored in the driver's memory. In a production environment, you would want to write the query to a file or other sink using code like the following (file sinks also require a checkpoint location):
```scala
weblogs_stream.writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .option("checkpointLocation", "path/to/checkpoint/dir")
    .start()
```

In [None]:
val query = { weblogs_stream
        .writeStream
        .format("memory")
        .queryName("streamingLogs")
        .start() }

With the in-memory query, you can now access the data via Spark SQL, using the queryName, `streamingLogs`, as the table name.

In [None]:
val user_product_mapping = spark.sql("select distinct ProductId, UserId from streamingLogs")
user_product_mapping.show(5)
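
The in-memory table is filled as micro-batches complete, so a SQL query issued immediately after `start()` may return no rows. A sketch like the following, which assumes the `query` handle from the in-memory query cell above, can be used to check whether the stream is active and has processed data yet.

```scala
// `query` is the handle returned by start() in the in-memory query cell above.
println(s"Query name: ${query.name}")
println(s"Is active:  ${query.isActive}")
println(s"Status:     ${query.status}")

// lastProgress is null until the first trigger has completed, so wrap it in an Option.
Option(query.lastProgress).foreach(progress => println(progress.prettyJson))
```

If no progress is printed, wait a few seconds and rerun both this cell and the SQL cell above.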

## Operationalize the ML model
Run the streaming DataFrames through the ALS model, so product recommendations based on the streaming datasets can be generated.

### Retrieve the model
First, you need to ensure the model exists in the proper storage location. The output from the command below should resemble the following:
```
Found 3 items
drwxr-xr-x   - livy supergroup          0 2017-10-23 18:16 /models/cfmodel/itemFactors
drwxr-xr-x   - livy supergroup          0 2017-10-23 18:15 /models/cfmodel/metadata
drwxr-xr-x   - livy supergroup          0 2017-10-23 18:16 /models/cfmodel/userFactors
```

In [None]:
%%sh
hdfs dfs -ls /models/cfmodel

After verifying the model exists in storage, load the model.

In [None]:
// Import the spark.ml recommendation model type
import org.apache.spark.ml.recommendation.ALSModel

// Load the model using ALSModel's load method.
val model = ALSModel.load("/models/cfmodel")

### Apply the model to the streamed data from Kafka
Now use the `transform` method on the model to create a new DataFrame that includes all of the columns from the `streamingLogs` query, and adds a new `prediction` column that indicates the "confidence" of the prediction.

In [None]:
val predictions = model.transform(user_product_mapping)
predictions.show(5)

In the prediction column, you may see NaN (not a number) values, which mean the model could not make a prediction, typically because the user or product was not present in the data used to train the model. Let's clean up the predictions DataFrame by omitting rows with NaN predictions, cache the results, and take a peek.

In [None]:
val recommended_products = predictions.where("not isnan(prediction)").orderBy("UserId", "prediction")
recommended_products.cache()
recommended_products.show(5)

### Create a DataFrame for Products data
Next, load and parse the product data from Azure Storage, so it can be joined to the recommended_products data.

In [None]:
val products_schema = { (new StructType)
                       .add("ProductId", IntegerType)
                       .add("ProductName", StringType)
                       .add("Price", FloatType)
                       .add("CategoryId", StringType)
                       .add("Ignore1", StringType)
                       .add("Ignore2", StringType)
                       .add("Ignore3", StringType)
                       .add("Category", StringType)
                       .add("Department", StringType) }

val products_DF = { spark.read.format("com.databricks.spark.csv")
                   .option("header", false)
                   .schema(products_schema)
                   .load("/retaildata/rawdata/ProductFile/part{*}") }

val products = products_DF.select("ProductId", "ProductName", "Price", "CategoryId", "Category", "Department")
products.show(5)

### Get product recommendations
Execute the query below, joining the Products and Recommendation data. 

> If you don't get any results, try entering a different UserId in the where clause. You can select one from the results of the recommended_products.show() operation above.

In [None]:
recommended_products.join(products, "ProductId").where("UserId = 807").orderBy(col("prediction").desc).show(10)
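
The query above only scores the user and product pairs that appeared in the stream. As a variation, the sketch below scores every known product for a single user. It is an illustration rather than part of the original lab: it assumes the `model` and `products` values defined above, reuses the same user ID as the query above, and introduces the illustrative names `targetUserId`, `candidates`, and `topProducts`.

```scala
import org.apache.spark.sql.functions.{col, lit}

// Same user ID as in the query above; replace it with one that appears in your data.
val targetUserId = 807

// Pair the user with every known product so the model can score each one.
val candidates = products.select("ProductId").distinct().withColumn("UserId", lit(targetUserId))

// Score the candidates, drop pairs the model cannot predict, and add the product details.
val topProducts = model.transform(candidates)
  .where("not isnan(prediction)")
  .join(products, "ProductId")
  .orderBy(col("prediction").desc)

topProducts.show(10)
```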

## Clean up

In [None]:
// Stop the query
query.stop()
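
If other streaming queries are still running in the session (for example, a console query that was started without being stopped), a sketch like the following stops them all. It relies on `spark.streams.active`, which lists the active streaming queries for the current session.

```scala
// List any streaming queries still running in this session, then stop each one.
spark.streams.active.foreach { activeQuery =>
  println(s"Stopping query: ${activeQuery.name} (${activeQuery.id})")
  activeQuery.stop()
}
```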

## Conclusion
In this lab, you have learned how to use Spark Structured Streaming and Kafka to apply a trained machine learning model to streaming data.

Specifically you:
* Configured a Spark cluster to use Kafka
* Created a Kafka topic
* Used a Java application to add records to a Kafka topic, and to consume the records added
* Created a simple Kafka Producer using Spark
* Operationalized a trained ALS model to get product recommendations using streamed data