### Spark Streaming uses micro-batching: data arrives in batches, and executors run on each batch. If the executor idle timeout is shorter than the time it takes to process a batch, executors are constantly added and removed. If the executor idle timeout is longer than the batch duration, executors are never removed. Either way, dynamic allocation brings little benefit, so we recommend that you disable it by setting spark.dynamicAllocation.enabled to false when running streaming applications.

In [1]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11",
        "spark.dynamicAllocation.enabled": false
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1595680205259_0008,pyspark,idle,Link,Link,
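One way to confirm that the setting reached the running application is to read it back from the Spark configuration. This is a minimal sketch, assuming a Scala cell in which the notebook's predefined `spark` session is available; the variable name `dynAlloc` and the fallback string `"not set"` are only illustrative.

```scala
// Read the value from the SparkConf of the running application.
// "not set" is a fallback used by this sketch when the key is absent.
val dynAlloc = spark.sparkContext.getConf.get("spark.dynamicAllocation.enabled", "not set")

println(s"spark.dynamicAllocation.enabled = $dynAlloc")
```

If the configuration cell was applied, the printed value should be `false`.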


### Create the Kafka topic. Edit the command below by replacing the value of KafkaZookeepers with the Zookeeper host information you extracted in the first step. Run the edited command in your Jupyter Notebook to create the tripdata topic.

In [2]:
%%bash
export KafkaZookeepers="zk0-kafkah.uwguulvqr1kevfnztmsfrc5rqc.bx.internal.cloudapp.net:2181,zk1-kafkah.uwguulvqr1kevfnztmsfrc5rqc.bx.internal.cloudapp.net:2181,zk6-kafkah.uwguulvqr1kevfnztmsfrc5rqc.bx.internal.cloudapp.net:2181"

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic tripdata --zookeeper $KafkaZookeepers

Created topic "tripdata".


### Retrieve data on taxi trips. Enter the command in the next cell to load data on taxi trips in New York City. The data is loaded into a dataframe and then the dataframe is displayed as the cell output.

In [3]:
import spark.implicits._

// Load the data from the New York City Taxi data REST API for 2016 Green Taxi Trip Data
val url="https://data.cityofnewyork.us/resource/pqfs-mqru.json"
val result = scala.io.Source.fromURL(url).mkString

// Create a dataframe from the JSON data
val taxiDF = spark.read.json(Seq(result).toDS)

// Display the dataframe containing trip data
taxiDF.show()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
5,application_1595680205259_0009,spark,idle,Link,Link,✔


SparkSession available as 'spark'.
+------------------+-------------------+-----+-----------+---------------------+---------------------+--------------------+-------+---------------+------------+------------------+-------------------+----------+------------------+----------+------------+------------+-------------+---------+--------+
|  dropoff_latitude|  dropoff_longitude|extra|fare_amount|improvement_surcharge|lpep_dropoff_datetime|lpep_pickup_datetime|mta_tax|passenger_count|payment_type|   pickup_latitude|   pickup_longitude|ratecodeid|store_and_fwd_flag|tip_amount|tolls_amount|total_amount|trip_distance|trip_type|vendorid|
+------------------+-------------------+-----+-----------+---------------------+---------------------+--------------------+-------+---------------+------------+------------------+-------------------+----------+------------------+----------+------------+------------+-------------+---------+--------+
|40.698043823242188|-73.924278259277344|  0.5|          8|       

### Set the Kafka broker hosts information. Replace the value of kafkaBrokers with the broker hosts information you extracted in step 1. Enter the edited command in the next Jupyter Notebook cell.

In [4]:
// The Kafka broker hosts and topic used to write to Kafka
val kafkaBrokers="wn0-kafkah.uwguulvqr1kevfnztmsfrc5rqc.bx.internal.cloudapp.net:9092,wn1-kafkah.uwguulvqr1kevfnztmsfrc5rqc.bx.internal.cloudapp.net:9092,wn2-kafkah.uwguulvqr1kevfnztmsfrc5rqc.bx.internal.cloudapp.net:9092"
val kafkaTopic="tripdata"

println("Finished setting Kafka broker and topic configuration.")

Finished setting Kafka broker and topic configuration.

### Send the data to Kafka. In the following command, the vendorid field is used as the key value for the Kafka message. The key is used by Kafka when partitioning data. All of the fields are stored in the Kafka message as a JSON string value. Enter the following command in Jupyter to save the data to Kafka using a batch query.

In [5]:
// Select the vendorid as the key and save the JSON string as the value.
// save() returns Unit, so the result is not assigned to a variable.
taxiDF.selectExpr("CAST(vendorid AS STRING) as key", "to_json(struct(*)) AS value").write.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("topic", kafkaTopic).save()

println("Data sent to Kafka")

Data sent to Kafka
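The effect of using vendorid as the message key can be sketched in plain Scala: records with the same key always map to the same partition. The function `partitionFor` below is hypothetical and deliberately simplified. Kafka's default partitioner hashes the serialized key bytes with murmur2, whereas this sketch uses `String.hashCode`, so the partition numbers it prints will not match what Kafka actually assigns. The partition count of 8 comes from the topic creation command.

```scala
// Simplified stand-in for key-based partitioning (NOT Kafka's murmur2 hash).
// floorMod keeps the result non-negative even when hashCode is negative.
def partitionFor(key: String, numPartitions: Int): Int =
  java.lang.Math.floorMod(key.hashCode, numPartitions)

// Hypothetical sequence of vendorid keys, as they might appear in the trip data.
val vendorIds = Seq("1", "2", "1", "2", "2")

// Pair each key with the partition this sketch assigns to it.
val assignments = vendorIds.map(id => id -> partitionFor(id, 8))

assignments.foreach { case (id, p) => println(s"vendorid=$id -> partition $p") }
```

Because only a few distinct vendorid values exist, most of the topic's 8 partitions would receive no data under any key-based scheme. That is acceptable for a small demo, but a higher-cardinality key spreads load more evenly.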

### Declare a schema. The following command demonstrates how to use a schema when reading JSON data from Kafka. Enter the command in your next Jupyter cell.

In [6]:
// Import bits used for declaring schemas and working with JSON data
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Define a schema for the data
val schema = (new StructType).add("dropoff_latitude", StringType).add("dropoff_longitude", StringType).add("extra", StringType).add("fare_amount", StringType).add("improvement_surcharge", StringType).add("lpep_dropoff_datetime", StringType).add("lpep_pickup_datetime", StringType).add("mta_tax", StringType).add("passenger_count", StringType).add("payment_type", StringType).add("pickup_latitude", StringType).add("pickup_longitude", StringType).add("ratecodeid", StringType).add("store_and_fwd_flag", StringType).add("tip_amount", StringType).add("tolls_amount", StringType).add("total_amount", StringType).add("trip_distance", StringType).add("trip_type", StringType).add("vendorid", StringType)
// Reproduced here for readability
//val schema = (new StructType)
//   .add("dropoff_latitude", StringType)
//   .add("dropoff_longitude", StringType)
//   .add("extra", StringType)
//   .add("fare_amount", StringType)
//   .add("improvement_surcharge", StringType)
//   .add("lpep_dropoff_datetime", StringType)
//   .add("lpep_pickup_datetime", StringType)
//   .add("mta_tax", StringType)
//   .add("passenger_count", StringType)
//   .add("payment_type", StringType)
//   .add("pickup_latitude", StringType)
//   .add("pickup_longitude", StringType)
//   .add("ratecodeid", StringType)
//   .add("store_and_fwd_flag", StringType)
//   .add("tip_amount", StringType)
//   .add("tolls_amount", StringType)
//   .add("total_amount", StringType)
//   .add("trip_distance", StringType)
//   .add("trip_type", StringType)
//   .add("vendorid", StringType)

println("Schema declared")

Schema declared
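Since every field in this schema is a string, the same structure can also be built from a list of field names, which avoids the long single-line chain. The following is a sketch rather than a replacement for the cell above; the names `tripFieldNames` and `tripSchema` are illustrative, and it assumes the Spark SQL types are on the classpath, as they are in the notebook.

```scala
import org.apache.spark.sql.types.{StringType, StructType}

// Field names copied from the schema declared above.
val tripFieldNames = Seq(
  "dropoff_latitude", "dropoff_longitude", "extra", "fare_amount",
  "improvement_surcharge", "lpep_dropoff_datetime", "lpep_pickup_datetime",
  "mta_tax", "passenger_count", "payment_type", "pickup_latitude",
  "pickup_longitude", "ratecodeid", "store_and_fwd_flag", "tip_amount",
  "tolls_amount", "total_amount", "trip_distance", "trip_type", "vendorid")

// Fold the names into a StructType, adding each one as a string column.
val tripSchema = tripFieldNames.foldLeft(new StructType)((acc, name) => acc.add(name, StringType))

println(tripSchema.fieldNames.mkString(", "))
```

Keeping every column as `StringType` mirrors the original cell; numeric columns such as fare_amount could be cast later if calculations are needed.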

### Select data and write it to a file. The following command demonstrates how to retrieve data from Kafka using a batch query and then write the results to HDFS on the Spark cluster. In this example, the select retrieves the message (value field) from Kafka and applies the schema to it. The data is then written to HDFS (WASB or ADL) in Parquet format. Enter the command in your next Jupyter cell.

In [11]:
// Read a batch from Kafka
val kafkaDF = spark.read.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()

// Select data and write to file. save() returns Unit, and a batch write
// does not use a checkpoint location, so neither appears here.
kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip").write.format("parquet").option("path","/nyctaxiraw/batchtripdata").save()

println("Wrote data to file")

Wrote data to file

### You can verify that the files were created by entering the command in your next Jupyter cell. It lists the files in the /nyctaxiraw/batchtripdata directory.

In [12]:
%%bash
hdfs dfs -ls /nyctaxiraw/batchtripdata

Found 9 items
-rw-r--r--   1 livy supergroup          0 2020-07-25 18:35 /nyctaxiraw/batchtripdata/_SUCCESS
-rw-r--r--   1 livy supergroup       2160 2020-07-25 18:35 /nyctaxiraw/batchtripdata/part-00000-e15b37f9-7e6c-4db7-a4c3-1b3902cb720f-c000.snappy.parquet
-rw-r--r--   1 livy supergroup       2160 2020-07-25 18:35 /nyctaxiraw/batchtripdata/part-00001-e15b37f9-7e6c-4db7-a4c3-1b3902cb720f-c000.snappy.parquet
-rw-r--r--   1 livy supergroup       2160 2020-07-25 18:35 /nyctaxiraw/batchtripdata/part-00002-e15b37f9-7e6c-4db7-a4c3-1b3902cb720f-c000.snappy.parquet
-rw-r--r--   1 livy supergroup       2160 2020-07-25 18:35 /nyctaxiraw/batchtripdata/part-00003-e15b37f9-7e6c-4db7-a4c3-1b3902cb720f-c000.snappy.parquet
-rw-r--r--   1 livy supergroup       2160 2020-07-25 18:35 /nyctaxiraw/batchtripdata/part-00004-e15b37f9-7e6c-4db7-a4c3-1b3902cb720f-c000.snappy.parquet
-rw-r--r--   1 livy supergroup       2160 2020-07-25 18:35 /nyctaxiraw/batchtripdata/part-00005-e15b37f9-7e6c-4db7-a4c3-1b3902c
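Listing the directory shows that files exist, but not what they contain. As a complementary check, the Parquet output can be read back into a dataframe. This is a minimal sketch, assuming the notebook's predefined `spark` session and the output path used by the batch query; the names `batchTrips` and `flatTrips` are illustrative.

```scala
// Read the Parquet files written by the batch query back into a dataframe.
val batchTrips = spark.read.parquet("/nyctaxiraw/batchtripdata")

// The parsed JSON sits in a single struct column named "trip";
// "trip.*" expands that struct into one column per field.
val flatTrips = batchTrips.select("trip.*")

flatTrips.printSchema()
flatTrips.select("vendorid", "fare_amount", "trip_distance").show(5)
```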

### While the previous example used a batch query, the following command demonstrates how to do the same thing using a streaming query. Enter the command in your next Jupyter cell.

In [13]:
// Stream from Kafka
val kafkaStreamDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()

// Select data from the stream and write to file.
// awaitTermination(30000) blocks for up to 30 seconds; it does not stop the query.
kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip").writeStream.format("parquet").option("path","/nyctaxiraw/streamingtripdata").option("checkpointLocation", "/streamcheckpoint").start.awaitTermination(30000)
println("Wrote data to file")

java.lang.IllegalStateException: Cannot start query with id 0c1f5828-afbe-4204-9ccc-df128b9afc54 as another query with same id is already active. Perhaps you are attempting to restart a query from checkpoint that is already active.
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:300)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
  ... 58 elided
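The exception above indicates that a query using the same checkpoint is still active, which is what happens when the cell is run a second time: `awaitTermination(30000)` returns after the timeout but leaves the earlier query running. One possible way to recover, sketched here under the assumption that no other streaming queries in the session need to keep running, is to stop every active query before re-running the cell.

```scala
// List every streaming query that is still active in this session and stop it.
spark.streams.active.foreach { q =>
  println(s"Stopping query ${q.id}")
  q.stop()
}

println(s"Active streaming queries: ${spark.streams.active.length}")
```

After this cell completes, re-running the streaming cell should be able to restart from the existing checkpoint.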


### Run the following cell to verify that the files were written by the streaming query.

In [14]:
%%bash
hdfs dfs -ls /nyctaxiraw/streamingtripdata

Found 1 items
drwxr-xr-x   - livy supergroup          0 2020-07-25 18:35 /nyctaxiraw/streamingtripdata/_spark_metadata
