
# Spark Streaming Dev

Table of Contents

  1. Introduction
  2. Requirements
  3. Setting up the Kafka Interface
  4. Setting up Spark Streaming
     4.1. Set up your Spark Streaming directory
     4.2. Compile and submit job to Spark

### Introduction

Spark Streaming brings Spark’s language-integrated API to stream processing, letting you write streaming jobs the same way you write batch jobs. It supports Java, Scala and Python. In this example, we will use Spark Streaming to consume messages from Kafka in a distributed manner.

### Requirements

* AWS Cluster (4 Nodes)
* Spark
* HDFS
* Kafka

### Setting up the Kafka Interface

Please go through the Kafka - Advanced DEV page to create a topic with multiple (4) partitions, and to have 8 threads on your local computer act as producers to the Kafka topic broker.
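As a quick reference (the full walkthrough is on that page), creating such a topic with Kafka's CLI looks roughly like the following, assuming Kafka's bin directory is on the PATH and ZooKeeper is running on localhost:2181:

```
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic price_data_part4
```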

Next, we will read each of these partitions in parallel in a Spark Streaming job and compute the average price and total volume for each source over the last 2 seconds.

### Setting up Spark Streaming

#### Set up your Spark Streaming directory

Create a folder for your project called stream-example. Your directory structure should look like the following.
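Based on the files created in the steps below, the layout is roughly:

```
stream-example/
├── build.sbt
├── project/
│   └── plugins.sbt
└── src/
    └── main/
        └── scala/
            └── price_data.scala
```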

Let’s start with the build.sbt file at the top level, which will be used to build the Spark Streaming application. Place the following dependencies in the build.sbt file:

```scala
name := "price_data"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.2.1" % "provided",
"org.apache.spark" %% "spark-sql" % "2.2.1" % "provided",
"org.apache.spark" %% "spark-streaming" % "2.2.1" % "provided",
"org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.1"
)

assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")          => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")      => MergeStrategy.discard
  case "log4j.properties"                                  => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf"                                    => MergeStrategy.concat
  case _                                                   => MergeStrategy.first
}
```

Since we will be using Kafka as our data stream, we must bundle the Kafka dependency into the application jar using sbt-assembly. Place the following into the project/plugins.sbt file:

```scala
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
```

Let’s now create the source file src/main/scala/price_data.scala. This file will compute the average price and total volume traded over the last 2 seconds for each data source by streaming data from the 4 Kafka partitions. Place the following into price_data.scala:

```scala
import kafka.serializer.StringDecoder

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.sql._

object PriceDataStreaming {
  def main(args: Array[String]) {

    val brokers = "ec2-52-26-67-62.us-west-2.compute.amazonaws.com:9092"
    val topics = "price_data_part4"
    val topicsSet = topics.split(",").toSet

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("price_data")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

    // Get the lines and show results
    messages.foreachRDD { rdd =>

        val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
        import sqlContext.implicits._

        val lines = rdd.map(_._2)
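        // Parse each semicolon-delimited message; fields 0, 2 and 3 hold the source, price and volume (field 1 is unused)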
        val ticksDF = lines.map( x => {
                                  val tokens = x.split(";")
                                  Tick(tokens(0), tokens(2).toDouble, tokens(3).toInt)}).toDF()
        val ticks_per_source_DF = ticksDF.groupBy("source")
                                .agg("price" -> "avg", "volume" -> "sum")
                                .orderBy("source")

        ticks_per_source_DF.show()
    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

case class Tick(source: String, price: Double, volume: Int)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {

  @transient  private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
```
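For reference, the parsing above assumes each Kafka message value is a semicolon-delimited record whose fields 0, 2 and 3 are the source, price and volume. A hypothetical message value and the Tick it produces:

```scala
// Hypothetical message value; only fields 0, 2 and 3 are used by the job above.
val line = "NVDA;2018-01-19 10:00:01;195.32;100"
val tokens = line.split(";")
val tick = Tick(tokens(0), tokens(2).toDouble, tokens(3).toInt)   // Tick("NVDA", 195.32, 100)
```
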
#### Compile and submit job to Spark

We can now build the project, bringing in the Kafka dependencies, by calling the following from the top of the project directory. This will create a jar containing the Kafka dependencies under the target/scala-2.11 folder:

```
master-node:~/stream-example$ sbt assembly
```

Lastly, we can build the project with the remaining dependencies with the following:

```
master-node:~/stream-example$ sbt package
```

The project can then be submitted to Spark with the following:

```
master-node:~/stream-example$ spark-submit --class <class name> --master spark://<master hostname>:7077 --jars target/scala-2.11/price_data-assembly-1.0.jar target/scala-2.11/price_data-assembly-1.0.jar
```

For example:

```
master-node:~/stream-example$ spark-submit --class PriceDataStreaming --master spark://ip-171-20-36-249:7077 --jars target/scala-2.11/price_data-assembly-1.0.jar target/scala-2.11/price_data-assembly-1.0.jar
```

After some initialization, you should start seeing results being computed and printed to the screen every 2 seconds.
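With hypothetical sources and values, each 2-second batch prints a small table along these lines (the column names come from the avg/sum aggregations above):

```
+------+----------+-----------+
|source|avg(price)|sum(volume)|
+------+----------+-----------+
|  AAPL|    171.42|      12500|
|  NVDA|    195.87|       9800|
+------+----------+-----------+
```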