Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions examples/spark/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target/**

4 changes: 4 additions & 0 deletions examples/spark/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Bullet-Spark Example

This example generates fake Bullet Records for use with the Bullet on Spark backend.

77 changes: 77 additions & 0 deletions examples/spark/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.yahoo.bullet</groupId>
<artifactId>bullet-spark-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<scala.version>2.11.7</scala.version>
<scala.dep.version>2.11</scala.dep.version>
<spark.version>2.3.0</spark.version>
<bullet.spark.version>0.1.0</bullet.spark.version>
</properties>

<repositories>
<repository>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>jcenter</id>
<name>bintray</name>
<url>https://jcenter.bintray.com</url>
</repository>
</repositories>

<dependencies>
<!-- Provided -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.dep.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.dep.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.yahoo.bullet</groupId>
<artifactId>bullet-spark</artifactId>
<version>${bullet.spark.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.8</arg>
</args>
</configuration>
</plugin>
</plugins>
</build>
</project>

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2018, Oath Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.spark.examples

import com.yahoo.bullet.record.BulletRecord
import com.yahoo.bullet.spark.DataProducer
import com.yahoo.bullet.spark.examples.receiver.RandomReceiver
import com.yahoo.bullet.spark.utils.BulletSparkConfig
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

/**
  * A [[DataProducer]] for the example that emits randomly generated Bullet records.
  */
class RandomProducer extends DataProducer {
  /**
    * Creates the stream of randomly generated BulletRecords for this example.
    *
    * @param ssc The StreamingContext to attach the receiver to.
    * @param config The BulletSparkConfig to load settings from.
    * @return A DStream of randomly generated BulletRecords.
    */
  override def getBulletRecordStream(ssc: StreamingContext, config: BulletSparkConfig): DStream[BulletRecord] = {
    // Bullet record input stream. receiverStream already returns a
    // ReceiverInputDStream[BulletRecord], which is a DStream[BulletRecord],
    // so no asInstanceOf cast is needed.
    val bulletReceiver = new RandomReceiver(config)
    ssc.receiverStream(bulletReceiver)
  }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2018, Oath Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.spark.examples.receiver

import java.util.UUID

import scala.collection.JavaConverters._
import scala.util.Random

import com.yahoo.bullet.record.BulletRecord
import com.yahoo.bullet.spark.utils.{BulletSparkConfig, BulletSparkLogger}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver


/**
  * Companion object holding the field names written into each generated
  * BulletRecord, along with the static pools of values drawn from at random.
  */
object RandomReceiver {
  // Scalar fields in the generated BulletRecord.
  private val STRING = "uuid"
  private val LONG = "tuple_number"
  private val DOUBLE = "probability"
  private val DURATION = "duration"
  private val TYPE = "type"
  // Map and list fields in the generated BulletRecord.
  private val BOOLEAN_MAP = "tags"
  private val STATS_MAP = "stats"
  private val LIST = "classifiers"
  // Keys used inside the stats map.
  private val PERIOD_COUNT = "period_count"
  private val RECORD_NUMBER = "record_number"
  private val NANO_TIME = "nano_time"
  private val TIMESTAMP = "timestamp"
  // Keys used inside the random string maps of the list field.
  private val RANDOM_MAP_KEY_A = "field_A"
  private val RANDOM_MAP_KEY_B = "field_B"
  // Static pools of values that generated records pick from at random.
  private val STRING_POOL = Array("foo", "bar", "baz", "qux", "quux", "norf")
  private val INTEGER_POOL = Array(2057, 13, 10051, 2, 1059, 187)
}

/**
  * A Spark Streaming receiver that generates random BulletRecords for the example.
  *
  * It emits up to a fixed number of records per fixed-length period on a
  * background thread until the receiver is stopped.
  *
  * @param config The BulletSparkConfig to load settings from.
  */
class RandomReceiver(val config: BulletSparkConfig)
  extends Receiver[BulletRecord](StorageLevel.MEMORY_AND_DISK_SER) with BulletSparkLogger {
  // Number of tuples to emit per period
  private val maxPerPeriod = 100L
  // Period in milliseconds. Default 1000 ms
  private val period = 1000
  // Number of completed periods since the receiver started.
  private var periodCount = 0L
  // Number of records emitted in the current period.
  private var generatedThisPeriod = 0L
  // Wall-clock time (ms) at which the current period ends.
  private var nextIntervalStart = 0L

  override def onStart(): Unit = {
    // Generate records on a dedicated thread so that onStart returns
    // immediately, as the Receiver contract requires.
    new Thread() {
      override def run(): Unit = {
        receive()
      }
    }.start()
    logger.info("Random receiver started.")
  }

  override def onStop(): Unit = {
    logger.info("Random receiver stopped.")
  }

  /**
    * Loops until the receiver is stopped, storing up to maxPerPeriod randomly
    * generated records per period and rolling the period over when it expires.
    */
  private def receive(): Unit = {
    nextIntervalStart = System.currentTimeMillis()
    while (!isStopped) {
      val timeNow = System.currentTimeMillis()
      // Only emit if we are still in the interval and haven't gone over our per period max
      if (timeNow <= nextIntervalStart && generatedThisPeriod < maxPerPeriod) {
        store(generateRecord())
        generatedThisPeriod += 1
      }
      if (timeNow > nextIntervalStart) {
        logger.info("Generated {} tuples out of {}", generatedThisPeriod, maxPerPeriod)
        nextIntervalStart = timeNow + period
        generatedThisPeriod = 0
        periodCount += 1
      }
      // It is courteous to sleep for a short time.
      try {
        Thread.sleep(1)
      } catch {
        case e: InterruptedException => logger.error("Error: ", e)
      }
    }
  }

  /**
    * Builds one BulletRecord with random values for every field declared in
    * the companion object.
    *
    * @return A freshly populated BulletRecord.
    */
  private def generateRecord(): BulletRecord = {
    val record = new BulletRecord()
    val uuid = UUID.randomUUID().toString
    record.setString(RandomReceiver.STRING, uuid)
    record.setLong(RandomReceiver.LONG, generatedThisPeriod)
    record.setDouble(RandomReceiver.DOUBLE, Random.nextDouble())
    record.setString(RandomReceiver.TYPE, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)))
    record.setLong(RandomReceiver.DURATION, System.nanoTime() % RandomReceiver.INTEGER_POOL(Random.nextInt(RandomReceiver.INTEGER_POOL.length)))
    // Random booleans keyed by the four hyphen-separated segments of the UUID.
    val booleanMap = Map[java.lang.String, java.lang.Boolean](
      uuid.substring(0, 8) -> Random.nextBoolean(),
      uuid.substring(9, 13) -> Random.nextBoolean(),
      uuid.substring(14, 18) -> Random.nextBoolean(),
      uuid.substring(19, 23) -> Random.nextBoolean()
    )
    record.setBooleanMap(RandomReceiver.BOOLEAN_MAP, booleanMap.asJava)
    val statsMap = Map[java.lang.String, java.lang.Long](
      RandomReceiver.PERIOD_COUNT -> periodCount,
      RandomReceiver.RECORD_NUMBER -> (periodCount * maxPerPeriod + generatedThisPeriod),
      RandomReceiver.NANO_TIME -> System.nanoTime(),
      // Fixed: the timestamp field used System.nanoTime(), which has an
      // arbitrary origin and duplicated NANO_TIME; a timestamp should be
      // wall-clock epoch milliseconds.
      RandomReceiver.TIMESTAMP -> System.currentTimeMillis()
    )
    record.setLongMap(RandomReceiver.STATS_MAP, statsMap.asJava)
    val randomMapA = Map[java.lang.String, java.lang.String](
      RandomReceiver.RANDOM_MAP_KEY_A -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)),
      RandomReceiver.RANDOM_MAP_KEY_B -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))
    )
    val randomMapB = Map[java.lang.String, java.lang.String](
      RandomReceiver.RANDOM_MAP_KEY_A -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)),
      RandomReceiver.RANDOM_MAP_KEY_B -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))
    )
    record.setListOfStringMap(RandomReceiver.LIST, List(randomMapA.asJava, randomMapB.asJava).asJava)
    record
  }
}