From 7bfe003a703ffdc48c63533aa2e966c52d0731bf Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Fri, 25 May 2018 15:47:30 -0700 Subject: [PATCH 1/4] add spark random receiver --- examples/spark/.gitignore | 2 + examples/spark/README.md | 4 + examples/spark/pom.xml | 77 +++++++++++ .../yahoo/bullet/spark/RandomProducer.scala | 21 +++ .../spark/receiver/RandomReceiver.scala | 126 ++++++++++++++++++ 5 files changed, 230 insertions(+) create mode 100644 examples/spark/.gitignore create mode 100644 examples/spark/README.md create mode 100644 examples/spark/pom.xml create mode 100644 examples/spark/src/main/scala/com/yahoo/bullet/spark/RandomProducer.scala create mode 100644 examples/spark/src/main/scala/com/yahoo/bullet/spark/receiver/RandomReceiver.scala diff --git a/examples/spark/.gitignore b/examples/spark/.gitignore new file mode 100644 index 00000000..850262ce --- /dev/null +++ b/examples/spark/.gitignore @@ -0,0 +1,2 @@ +/target/** + diff --git a/examples/spark/README.md b/examples/spark/README.md new file mode 100644 index 00000000..61120612 --- /dev/null +++ b/examples/spark/README.md @@ -0,0 +1,4 @@ +# Bullet-Spark Example + +This example contains a Spark Streaming receiver that generates random, fake Bullet records to use as sample input data for Bullet on Spark. 
+ diff --git a/examples/spark/pom.xml b/examples/spark/pom.xml new file mode 100644 index 00000000..593c17e3 --- /dev/null +++ b/examples/spark/pom.xml @@ -0,0 +1,77 @@ + + 4.0.0 + com.yahoo.bullet + bullet-spark-example + 0.0.1-SNAPSHOT + jar + + 2.11.7 + 2.11 + 2.3.0 + 0.1.0 + + + + + + false + + jcenter + bintray + http://jcenter.bintray.com + + + + + + + org.scala-lang + scala-library + ${scala.version} + provided + + + org.apache.spark + spark-streaming_${scala.dep.version} + ${spark.version} + provided + + + org.apache.spark + spark-core_${scala.dep.version} + ${spark.version} + provided + + + com.yahoo.bullet + bullet-spark + ${bullet.spark.version} + + + + src/main/scala + + + org.scala-tools + maven-scala-plugin + + + + compile + testCompile + + + + + ${scala.version} + + -target:jvm-1.8 + + + + + + + diff --git a/examples/spark/src/main/scala/com/yahoo/bullet/spark/RandomProducer.scala b/examples/spark/src/main/scala/com/yahoo/bullet/spark/RandomProducer.scala new file mode 100644 index 00000000..1129f4b8 --- /dev/null +++ b/examples/spark/src/main/scala/com/yahoo/bullet/spark/RandomProducer.scala @@ -0,0 +1,21 @@ +/* + * Copyright 2018, Oath Inc. + * Licensed under the terms of the Apache License, Version 2.0. + * See the LICENSE file associated with the project for terms. + */ +package com.yahoo.bullet.spark + +import com.yahoo.bullet.record.BulletRecord +import com.yahoo.bullet.spark.receiver.RandomReceiver +import com.yahoo.bullet.spark.utils.BulletSparkConfig +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream.DStream + +class RandomProducer extends DataProducer { + override def getBulletRecordStream(ssc: StreamingContext, config: BulletSparkConfig): DStream[BulletRecord] = { + // Bullet record input stream. 
+ val bulletReceiver = new RandomReceiver(config) + ssc.receiverStream(bulletReceiver) + } +} + diff --git a/examples/spark/src/main/scala/com/yahoo/bullet/spark/receiver/RandomReceiver.scala b/examples/spark/src/main/scala/com/yahoo/bullet/spark/receiver/RandomReceiver.scala new file mode 100644 index 00000000..a91a9913 --- /dev/null +++ b/examples/spark/src/main/scala/com/yahoo/bullet/spark/receiver/RandomReceiver.scala @@ -0,0 +1,126 @@ +/* + * Copyright 2018, Oath Inc. + * Licensed under the terms of the Apache License, Version 2.0. + * See the LICENSE file associated with the project for terms. + */ +package com.yahoo.bullet.spark.receiver + +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.util.Random + +import com.yahoo.bullet.record.BulletRecord +import com.yahoo.bullet.spark.utils.{BulletSparkConfig, BulletSparkLogger} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver + + +object RandomReceiver { + // Fields in BulletRecord + private val STRING = "uuid" + private val LONG = "tuple_number" + private val DOUBLE = "probability" + private val BOOLEAN_MAP = "tags" + private val STATS_MAP = "stats" + private val LIST = "classifiers" + private val DURATION = "duration" + private val TYPE = "type" + private val RANDOM_MAP_KEY_A = "field_A" + private val RANDOM_MAP_KEY_B = "field_B" + private val PERIOD_COUNT = "period_count" + private val RECORD_NUMBER = "record_number" + private val NANO_TIME = "nano_time" + private val TIMESTAMP = "timestamp" + // Some static values in BulletRecord for the fields + private val STRING_POOL = Array("foo", "bar", "baz", "qux", "quux", "norf") + private val INTEGER_POOL = Array(2057, 13, 10051, 2, 1059, 187) +} + +/** + * Constructor that takes a configuration to use. + * + * @param config The BulletSparkConfig to load settings from. 
+ */ +class RandomReceiver(val config: BulletSparkConfig) + extends Receiver[BulletRecord](StorageLevel.MEMORY_AND_DISK_SER) with BulletSparkLogger { + // Number of tuples to emit + private val maxPerPeriod = 100L + // Period in milliseconds. Default 1000 ms + private val period = 1000 + private var periodCount = 0L + private var generatedThisPeriod = 0L + private var nextIntervalStart = 0L + + override def onStart(): Unit = { + new Thread() { + override def run(): Unit = { + receive() + } + }.start() + logger.info("Random receiver started.") + } + + override def onStop(): Unit = { + logger.info("Random receiver stopped.") + } + + private def receive(): Unit = { + nextIntervalStart = System.currentTimeMillis() + while (!isStopped) { + val timeNow = System.currentTimeMillis() + // Only emit if we are still in the interval and haven't gone over our per period max + if (timeNow <= nextIntervalStart && generatedThisPeriod < maxPerPeriod) { + store(generateRecord()) + generatedThisPeriod += 1 + } + if (timeNow > nextIntervalStart) { + logger.info("Generated {} tuples out of {}", generatedThisPeriod, maxPerPeriod) + nextIntervalStart = timeNow + period + generatedThisPeriod = 0 + periodCount += 1 + } + // It is courteous to sleep for a short time if you're not emitting anything... 
+ try + Thread.sleep(1) + catch { + case e: InterruptedException => logger.error("Error: ", e) + } + } + } + + private def generateRecord(): BulletRecord = { + val record = new BulletRecord() + val uuid = UUID.randomUUID().toString + record.setString(RandomReceiver.STRING, uuid) + record.setLong(RandomReceiver.LONG, generatedThisPeriod) + record.setDouble(RandomReceiver.DOUBLE, Random.nextDouble()) + record.setString(RandomReceiver.TYPE, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))) + record.setLong(RandomReceiver.DURATION, System.nanoTime() % RandomReceiver.INTEGER_POOL(Random.nextInt(RandomReceiver.INTEGER_POOL.length))) + val booleanMap = Map[java.lang.String, java.lang.Boolean]( + uuid.substring(0, 8) -> Random.nextBoolean(), + uuid.substring(9, 13) -> Random.nextBoolean(), + uuid.substring(14, 18) -> Random.nextBoolean(), + uuid.substring(19, 23) -> Random.nextBoolean() + ) + record.setBooleanMap(RandomReceiver.BOOLEAN_MAP, booleanMap.asJava) + val statsMap = Map[java.lang.String, java.lang.Long]( + RandomReceiver.PERIOD_COUNT -> periodCount, + RandomReceiver.RECORD_NUMBER -> (periodCount * maxPerPeriod + generatedThisPeriod), + RandomReceiver.NANO_TIME -> System.nanoTime(), + RandomReceiver.TIMESTAMP -> System.currentTimeMillis() + ) + record.setLongMap(RandomReceiver.STATS_MAP, statsMap.asJava) + val randomMapA = Map[java.lang.String, java.lang.String]( + RandomReceiver.RANDOM_MAP_KEY_A -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)), + RandomReceiver.RANDOM_MAP_KEY_B -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)) + ) + val randomMapB = Map[java.lang.String, java.lang.String]( + RandomReceiver.RANDOM_MAP_KEY_A -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)), + RandomReceiver.RANDOM_MAP_KEY_B -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)) + ) + record.setListOfStringMap(RandomReceiver.LIST, 
List(randomMapA.asJava, randomMapB.asJava).asJava) + record + } +} + From ccf36315cb2e3b16103ff560aa8ccd2fa9d0fe06 Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Fri, 25 May 2018 15:58:36 -0700 Subject: [PATCH 2/4] remove sleeping and change dir structure --- .../bullet/spark/{ => examples}/RandomProducer.scala | 5 +++-- .../spark/{ => examples}/receiver/RandomReceiver.scala | 8 +------- 2 files changed, 4 insertions(+), 9 deletions(-) rename examples/spark/src/main/scala/com/yahoo/bullet/spark/{ => examples}/RandomProducer.scala (82%) rename examples/spark/src/main/scala/com/yahoo/bullet/spark/{ => examples}/receiver/RandomReceiver.scala (94%) diff --git a/examples/spark/src/main/scala/com/yahoo/bullet/spark/RandomProducer.scala b/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/RandomProducer.scala similarity index 82% rename from examples/spark/src/main/scala/com/yahoo/bullet/spark/RandomProducer.scala rename to examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/RandomProducer.scala index 1129f4b8..531ecfee 100644 --- a/examples/spark/src/main/scala/com/yahoo/bullet/spark/RandomProducer.scala +++ b/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/RandomProducer.scala @@ -3,10 +3,11 @@ * Licensed under the terms of the Apache License, Version 2.0. * See the LICENSE file associated with the project for terms. 
*/ -package com.yahoo.bullet.spark +package com.yahoo.bullet.spark.examples import com.yahoo.bullet.record.BulletRecord -import com.yahoo.bullet.spark.receiver.RandomReceiver +import com.yahoo.bullet.spark.DataProducer +import com.yahoo.bullet.spark.examples.receiver.RandomReceiver import com.yahoo.bullet.spark.utils.BulletSparkConfig import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.DStream diff --git a/examples/spark/src/main/scala/com/yahoo/bullet/spark/receiver/RandomReceiver.scala b/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala similarity index 94% rename from examples/spark/src/main/scala/com/yahoo/bullet/spark/receiver/RandomReceiver.scala rename to examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala index a91a9913..732c565c 100644 --- a/examples/spark/src/main/scala/com/yahoo/bullet/spark/receiver/RandomReceiver.scala +++ b/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala @@ -3,7 +3,7 @@ * Licensed under the terms of the Apache License, Version 2.0. * See the LICENSE file associated with the project for terms. */ -package com.yahoo.bullet.spark.receiver +package com.yahoo.bullet.spark.examples.receiver import java.util.UUID @@ -80,12 +80,6 @@ class RandomReceiver(val config: BulletSparkConfig) generatedThisPeriod = 0 periodCount += 1 } - // It is courteous to sleep for a short time if you're not emitting anything... 
- try - Thread.sleep(1) - catch { - case e: InterruptedException => logger.error("Error: ", e) - } } } From 2c3dbd6f24fceff8c5a14a173039dfcc581e8e75 Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Fri, 25 May 2018 16:07:50 -0700 Subject: [PATCH 3/4] add sleep to yield CPU --- .../bullet/spark/examples/receiver/RandomReceiver.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala b/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala index 732c565c..a98f7f8a 100644 --- a/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala +++ b/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala @@ -80,6 +80,12 @@ class RandomReceiver(val config: BulletSparkConfig) generatedThisPeriod = 0 periodCount += 1 } + // It is courteous to sleep for a short time. + try + Thread.sleep(1) + catch { + case e: InterruptedException => logger.error("Error: ", e) + } } } From 4d50e6617dae08dfd0d6aeb051c0ac2aed3422e5 Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Fri, 25 May 2018 16:08:59 -0700 Subject: [PATCH 4/4] minor change --- .../yahoo/bullet/spark/examples/receiver/RandomReceiver.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala b/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala index a98f7f8a..e696cbbe 100644 --- a/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala +++ b/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala @@ -81,9 +81,9 @@ class RandomReceiver(val config: BulletSparkConfig) periodCount += 1 } // It is courteous to sleep for a short time. 
- try + try { Thread.sleep(1) - catch { + } catch { case e: InterruptedException => logger.error("Error: ", e) } }