64 changes: 64 additions & 0 deletions docs/backend/spark-architecture.md
# Spark architecture

This section describes how the [Backend architecture](../index.md#backend) is implemented in Spark Streaming.

## Data Flow Graph

Bullet Spark implements the backend piece from the full [Architecture](../index.md#architecture). It is implemented with Spark Streaming:

![Bullet Spark DAG](../img/spark-dag.png)

The components in the [Architecture](../index.md#architecture) have direct counterparts here. The Query Receiver, which reads from the PubSub layer using the plugged-in PubSub consumers, and the Query Unioning together make up the Request Processor. The Filter Streaming and your plugin for your source of data make up the Data Processor. The Join Streaming and the Result Emitter make up the Combiner.

The red lines are the path for the queries that come in through the PubSub, the orange lines are the path for the signals, and the blue lines are the path for the data from your data source. The shapes of the boxes denote the type of transformation or action being executed in them.

### Data processing

Bullet can accept arbitrary sources of data as long as they can be ingested by Spark, such as Kafka, Flume, Kinesis, and TCP sockets. In order to hook up your data to Bullet Spark, you just need to implement the [Data Producer Trait](https://github.com/bullet-db/bullet-spark/blob/master/src/main/scala/com/yahoo/bullet/spark/DataProducer.scala). In your implementation, you can either:
* Use [Spark Streaming built-in sources](https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers) to receive data. Below is a quick example of a direct Kafka source in Scala, with the conversion to `BulletRecord` left as a placeholder. You can also write it in Java:

```scala
import com.yahoo.bullet.spark.DataProducer
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
// import BulletRecord, BulletSparkConfig and any other classes you need

class DirectKafkaProducer extends DataProducer {
  override def getBulletRecordStream(ssc: StreamingContext, config: BulletSparkConfig): DStream[BulletRecord] = {
    val topics = Array("test")
    val kafkaParams = Map[String, AnyRef](
      "bootstrap.servers" -> "server1, server2",
      "group.id" -> "mygroup",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[ByteArrayDeserializer]
      // Other Kafka params
    )

    val directKafkaStream = KafkaUtils.createDirectStream[String, Array[Byte]](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, Array[Byte]](topics, kafkaParams))

    directKafkaStream.map(record => toBulletRecord(record.value()))
  }

  // Placeholder: build a BulletRecord out of the raw Kafka bytes
  private def toBulletRecord(value: Array[Byte]): BulletRecord = ???
}
```

* Write a [custom receiver](https://spark.apache.org/docs/latest/streaming-custom-receivers.html) to receive data from any arbitrary data source beyond the ones Spark Streaming supports out of the box (Flume, Kafka, Kinesis, files, sockets, etc.). See this [example](https://github.com/bullet-db/bullet-db.github.io/tree/src/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples) for a full implementation, or the sketch right after this list.
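
The following is a minimal sketch of the custom receiver approach. The receiver, its polling logic (`readFromMySource`), and the record conversion (`toBulletRecord`) are illustrative placeholders, not part of Bullet Spark or the linked example:

```scala
import com.yahoo.bullet.spark.DataProducer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
// import BulletRecord, BulletSparkConfig and any other classes you need

// Hypothetical receiver that polls some custom source and hands raw messages to Spark.
class MyCustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  override def onStart(): Unit = {
    new Thread("Custom Source Poller") {
      override def run(): Unit = {
        while (!isStopped()) {
          store(readFromMySource()) // Hand each raw message to Spark Streaming
        }
      }
    }.start()
  }

  override def onStop(): Unit = { }

  // Placeholder: replace with the logic that reads from your actual source
  private def readFromMySource(): String = ???
}

class CustomReceiverProducer extends DataProducer {
  override def getBulletRecordStream(ssc: StreamingContext, config: BulletSparkConfig): DStream[BulletRecord] = {
    ssc.receiverStream(new MyCustomReceiver()).map(message => toBulletRecord(message))
  }

  // Placeholder: convert each raw message into a BulletRecord
  private def toBulletRecord(message: String): BulletRecord = ???
}
```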

After receiving your data, you can do any transformations like joins or type conversions in your implementation before emitting to the Filter Streaming stage.

The Filter Streaming stage checks every record from your data source against every query from the Query Unioning stage to see if it matches, and emits partial results to the Join Streaming stage.

### Request processing

The Query Receiver fetches Bullet queries and signals through the PubSub layer using the Subscribers provided by the plugged-in PubSub layer. Each query received through the PubSub also carries information such as its unique identifier and potentially other metadata and signals. The Query Unioning collects all active queries with the stateful transformation [updateStateByKey](https://spark.apache.org/docs/latest/streaming-programming-guide.html#updatestatebykey-operation) and broadcasts the collected queries to every executor for the Filter Streaming stage.

The Query Unioning also sends all active queries and signals to the Join Streaming stage.
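
As a rough illustration of that pattern (and not Bullet Spark's actual implementation), the sketch below shows how `updateStateByKey` can fold queries keyed by id into a running state every micro-batch; the key and value types and the update logic are placeholders:

```scala
import org.apache.spark.streaming.dstream.DStream

// Illustrative: the "state" for a query id is simply the latest query payload seen for it.
def updateActiveQueries(newQueries: Seq[String], current: Option[String]): Option[String] = {
  // Keep the newest payload; returning None instead would drop the query from the state.
  newQueries.lastOption.orElse(current)
}

// queryStream is keyed by query id, e.g. as produced by the Query Receiver.
// Note: updateStateByKey requires a checkpoint directory to be set on the StreamingContext.
def collectActiveQueries(queryStream: DStream[(String, String)]): DStream[(String, String)] = {
  queryStream.updateStateByKey(updateActiveQueries _)
}
```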

### Combining

The Join Streaming combines all the partial results from the Filter Streaming stage with the stateful transformation [mapWithState](https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions@mapWithState[StateType,MappedType](spec:org.apache.spark.streaming.StateSpec[K,V,StateType,MappedType])(implicitevidence$2:scala.reflect.ClassTag[StateType],implicitevidence$3:scala.reflect.ClassTag[MappedType]):org.apache.spark.streaming.dstream.MapWithStateDStream[K,V,StateType,MappedType]) and produces the final results.
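
A minimal sketch of that pattern is shown below; the types, the merge logic, and the function names are placeholders rather than Bullet Spark's actual code:

```scala
import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

// Illustrative: fold each partial result for a query id into a running combined result.
def combinePartials(queryId: String, partial: Option[String], state: State[String]): Option[String] = {
  val combined = state.getOption().getOrElse("") + partial.getOrElse("")
  state.update(combined) // Keep the running result for the next micro-batch
  Some(combined)
}

// partials is keyed by query id; mapWithState also requires checkpointing to be enabled.
def joinResults(partials: DStream[(String, String)]): DStream[Option[String]] = {
  partials.mapWithState(StateSpec.function(combinePartials _))
}
```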

The Result Emitter uses the Publisher from the plugged-in PubSub layer to send results and loop signals back through the PubSub layer.
93 changes: 93 additions & 0 deletions docs/backend/spark-setup.md
# Bullet on Spark

This section explains how to set up and run Bullet on Spark.

## Configuration

Bullet is configured at run-time using settings defined in a file. Settings not overridden will default to the values in [bullet_spark_defaults.yaml](https://github.com/bullet-db/bullet-spark/blob/master/src/main/resources/bullet_spark_defaults.yaml). The comments in that file explain what each setting does.
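
For example, the settings file you pass in when you launch Bullet Spark might look like the snippet below. The setting names here are illustrative only; consult the defaults file for the actual keys and their meanings.

```yaml
# Illustrative overrides; check bullet_spark_defaults.yaml for the real setting names.
bullet.spark.data.producer.class.name: "com.yourcompany.bullet.YourDataProducer"
bullet.spark.batch.duration.ms: 1000
bullet.pubsub.class.name: "com.yahoo.bullet.kafka.KafkaPubSub"
```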

## Installation

Download the Bullet Spark standalone jar from [JCenter](http://jcenter.bintray.com/com/yahoo/bullet/bullet-spark/).

If you are using Bullet Kafka as the pluggable PubSub, you can download the fat jar from [JCenter](http://jcenter.bintray.com/com/yahoo/bullet/bullet-kafka/). Otherwise, you need to plug in your own PubSub jar or use the RESTPubSub that is built into bullet-core and turned on in the API.

To use Bullet Spark, you need to implement your own [Data Producer Trait](https://github.com/bullet-db/bullet-spark/blob/master/src/main/scala/com/yahoo/bullet/spark/DataProducer.scala) in a JVM-based project. There are two ways to implement it, as described in the [Spark Architecture](spark-architecture.md#data-processing) section. Include the Bullet Spark artifact and the Spark dependencies in your pom.xml or the equivalent for your build tool. The artifacts are available through JCenter. Here is an example if you use Scala and Maven:

```xml
<repositories>
    <repository>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
        <id>central</id>
        <name>bintray</name>
        <url>http://jcenter.bintray.com</url>
    </repository>
</repositories>
```

```xml
<properties>
    <scala.version>2.11.7</scala.version>
    <scala.dep.version>2.11</scala.dep.version>
    <spark.version>2.3.0</spark.version>
    <bullet.spark.version>0.1.1</bullet.spark.version>
</properties>

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.dep.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.dep.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>com.yahoo.bullet</groupId>
    <artifactId>bullet-spark</artifactId>
    <version>${bullet.spark.version}</version>
</dependency>
```

You can also add ```<classifier>sources</classifier>``` or ```<classifier>javadoc</classifier>``` if you want the sources or javadoc.
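
For instance, to also pull down the sources jar for Bullet Spark, you could declare the dependency with the classifier like this:

```xml
<dependency>
    <groupId>com.yahoo.bullet</groupId>
    <artifactId>bullet-spark</artifactId>
    <version>${bullet.spark.version}</version>
    <classifier>sources</classifier>
</dependency>
```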

## Launch

After you have implemented your own data producer and built a jar, you can launch your Bullet Spark application. Here is an example command for a [YARN cluster](https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html):

```bash
./bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--class com.yahoo.bullet.spark.BulletSparkStreamingMain \
--queue <your queue> \
--executor-memory 12g \
--executor-cores 2 \
--num-executors 200 \
--driver-cores 2 \
--driver-memory 12g \
--conf spark.streaming.backpressure.enabled=true \
--conf spark.default.parallelism=20 \
... # other Spark settings
--jars /path/to/your-data-producer.jar,/path/to/your-pubsub.jar \
/path/to/downloaded-bullet-spark-standalone.jar \
--bullet-spark-conf /path/to/your-settings.yaml
```

You can pass other Spark settings by adding ```--conf key=value``` pairs to the command. For more settings, refer to the [Spark Configuration](https://spark.apache.org/docs/latest/configuration.html) documentation.

For other platforms, you can find the launch commands in the [Spark documentation](https://spark.apache.org/docs/latest/submitting-applications.html).
Binary file added docs/img/spark-dag.png