Update website for Bahir release 2.1.0
 - add release notes for version 2.1.0
 - add doc for 2.1.0, update current, add link in navbar
 - correct page title and description for release note pages
ckadner committed Feb 22, 2017
1 parent 92ca3c9 commit 0d53ccd
Showing 18 changed files with 582 additions and 26 deletions.
2 changes: 2 additions & 0 deletions site/_data/navigation.yml
@@ -52,6 +52,8 @@ topnav:
   subcategories:
   - title: Bahir Spark Extensions - Current
     url: /docs/spark/current/documentation
+  - title: Bahir Spark Extensions - 2.1.0
+    url: /docs/spark/2.1.0/documentation
   - title: Bahir Spark Extensions - 2.0.2
     url: /docs/spark/2.0.2/documentation
   - title: Bahir Spark Extensions - 2.0.1
4 changes: 2 additions & 2 deletions site/_data/project.yml
@@ -22,8 +22,8 @@ github_project_name: bahir
 description: Apache Bahir provides extensions to distributed analytic platforms such as Apache Spark.

 download: /download
-latest_release: 2.0.2
-latest_release_date: 01/27/2017
+latest_release: 2.1.0
+latest_release_date: 02/22/2017

 dev_list: dev@bahir.apache.org
 dev_list_subscribe: dev-subscribe@bahir.apache.org
3 changes: 3 additions & 0 deletions site/_data/releases.yml
@@ -14,6 +14,9 @@
 # limitations under the License.
 #

+- version: 2.1.0
+  date: 02/22/2017
+
 - version: 2.0.2
   date: 01/27/2017
46 changes: 46 additions & 0 deletions site/docs/spark/2.1.0/documentation.md
@@ -0,0 +1,46 @@
---
layout: page
title: Extensions for Apache Spark
description: Extensions for Apache Spark
group: nav-right
---
<!--
{% comment %}
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to you under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
{% endcomment %}
-->

{% include JB/setup %}

### Apache Bahir Extensions for Apache Spark

<br/>

#### Structured Streaming Data Sources

[MQTT data source](../spark-sql-streaming-mqtt)

<br/>

#### Discretized Streams (DStreams) Connectors

[Akka connector](../spark-streaming-akka)

[MQTT connector](../spark-streaming-mqtt)

[Twitter connector](../spark-streaming-twitter)

[ZeroMQ connector](../spark-streaming-zeromq)
147 changes: 147 additions & 0 deletions site/docs/spark/2.1.0/spark-sql-streaming-mqtt.md
@@ -0,0 +1,147 @@
---
layout: page
title: Spark Structured Streaming MQTT
description: Spark Structured Streaming MQTT
group: nav-right
---
<!--
{% comment %}
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to you under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
{% endcomment %}
-->

{% include JB/setup %}

A library for reading data from MQTT servers using Spark SQL Streaming (or Structured Streaming).

## Linking

Using SBT:

    libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.1.0"

Using Maven:

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-sql-streaming-mqtt_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>

This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
For example, to include it when starting the spark shell:

    $ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.1.0

Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
The `--packages` argument can also be used with `bin/spark-submit`.
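
As a sketch, a `spark-submit` invocation might look like the following (the application JAR path is illustrative):

    $ bin/spark-submit --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.1.0 /path/to/your-app.jar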

This library is compiled for Scala 2.11 only, and is intended to support Spark 2.0 onwards.

## Examples

A SQL stream can be created from data streams received through an MQTT server using:

    sqlContext.readStream
      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
      .option("topic", "mytopic")
      .load("tcp://localhost:1883")

## Enabling recovery from failures

Setting values for the options `localStorage` and `clientId` enables recovery in case of a restart, by restoring the state from where the source left off before the shutdown:

    sqlContext.readStream
      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
      .option("topic", "mytopic")
      .option("localStorage", "/path/to/localdir")
      .option("clientId", "some-client-id")
      .load("tcp://localhost:1883")

## Configuration options

This source uses the [Eclipse Paho Java Client](https://eclipse.org/paho/clients/java/). Client API documentation is located [here](http://www.eclipse.org/paho/files/javadoc/index.html).

* `brokerUrl` The URL the MqttClient connects to. Set this or `path` to the URL of the MQTT server, e.g. tcp://localhost:1883.
* `persistence` By default, incoming messages are persisted on disk. If `memory` is provided as the value for this option, recovery on restart is not supported.
* `topic` The topic the MqttClient subscribes to.
* `clientId` The client ID this client is associated with. Provide the same value to recover a stopped client.
* `QoS` The maximum quality of service at which to subscribe to each topic. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received at the QoS specified on the subscribe.
* `username` The user name to use for the connection to the MQTT server. Do not set it if the server does not require it; setting it to an empty value will lead to errors.
* `password` The password to use for the connection.
* `cleanSession` Setting it to true starts a clean session, removing all messages checkpointed by a previous run of this source. This is set to false by default.
* `connectionTimeout` The connection timeout; a value of 0 is interpreted as waiting until the client connects. See `MqttConnectOptions.setConnectionTimeout` for more information.
* `keepAlive` Same as `MqttConnectOptions.setKeepAliveInterval`.
* `mqttVersion` Same as `MqttConnectOptions.setMqttVersion`.
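
As a sketch, a source combining several of these options might look like the following (all option values are illustrative):

    sqlContext.readStream
      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
      .option("topic", "mytopic")
      .option("clientId", "some-client-id")
      .option("QoS", "1")
      .option("connectionTimeout", "30")
      .option("keepAlive", "60")
      .load("tcp://localhost:1883")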

### Scala API

An example using the Scala API to count words from an incoming message stream:

    // Create DataFrame representing the stream of input lines from connection to mqtt server
    val lines = spark.readStream
      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
      .option("topic", topic)
      .load(brokerUrl).as[(String, Timestamp)]

    // Split the lines into words
    val words = lines.map(_._1).flatMap(_.split(" "))

    // Generate running word count
    val wordCounts = words.groupBy("value").count()

    // Start running the query that prints the running counts to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()

Please see `MQTTStreamWordCount.scala` for a full example.

### Java API

An example using the Java API to count words from an incoming message stream:

    // Create DataFrame representing the stream of input lines from connection to mqtt server.
    Dataset<String> lines = spark
      .readStream()
      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
      .option("topic", topic)
      .load(brokerUrl).select("value").as(Encoders.STRING());

    // Split the lines into words
    Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
      }
    }, Encoders.STRING());

    // Generate running word count
    Dataset<Row> wordCounts = words.groupBy("value").count();

    // Start running the query that prints the running counts to the console
    StreamingQuery query = wordCounts.writeStream()
      .outputMode("complete")
      .format("console")
      .start();

    query.awaitTermination();

Please see `JavaMQTTStreamWordCount.java` for a full example.

89 changes: 89 additions & 0 deletions site/docs/spark/2.1.0/spark-streaming-akka.md
@@ -0,0 +1,89 @@
---
layout: page
title: Spark Streaming Akka
description: Spark Streaming Akka
group: nav-right
---
<!--
{% comment %}
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to you under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
{% endcomment %}
-->

{% include JB/setup %}

A library for reading data from Akka Actors using Spark Streaming.

## Linking

Using SBT:

    libraryDependencies += "org.apache.bahir" %% "spark-streaming-akka" % "2.1.0"

Using Maven:

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-akka_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>

This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
For example, to include it when starting the spark shell:

    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-akka_2.11:2.1.0

Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
The `--packages` argument can also be used with `bin/spark-submit`.

This library is cross-published for Scala 2.10 and Scala 2.11, so users should substitute the appropriate Scala version (2.10 or 2.11) in the commands listed above.
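
For example, with Scala 2.10 the shell invocation above becomes:

    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-akka_2.10:2.1.0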

## Examples

DStreams can be created with data streams received through Akka actors by using `AkkaUtils.createStream(ssc, actorProps, actor-name)`.

### Scala API

You need to extend `ActorReceiver` to store received data into Spark using the `store(...)` methods. The supervisor strategy of
this actor can be configured to handle failures, etc.; a sketch follows the example below.

    class CustomActor extends ActorReceiver {
      def receive = {
        case data: String => store(data)
      }
    }

    // A new input stream can be created with this custom actor as
    val ssc: StreamingContext = ...
    val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver")
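
As a sketch, the supervisor strategy can be customized with standard Akka APIs (the retry limits below are illustrative, not library defaults):

    import akka.actor.OneForOneStrategy
    import akka.actor.SupervisorStrategy.Restart
    import scala.concurrent.duration._

    class ResilientActor extends ActorReceiver {
      // Restart a failing child actor at most 10 times within one minute
      override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
        case _: RuntimeException => Restart
      }
      def receive = {
        case data: String => store(data)
      }
    }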

### Java API

You need to extend `JavaActorReceiver` to store received data into Spark using the `store(...)` methods. The supervisor strategy of
this actor can be configured to handle failures, etc.

    class CustomActor extends JavaActorReceiver {
      @Override
      public void onReceive(Object msg) throws Exception {
        store((String) msg);
      }
    }

    // A new input stream can be created with this custom actor as
    JavaStreamingContext jssc = ...;
    JavaDStream<String> lines = AkkaUtils.<String>createStream(jssc, Props.create(CustomActor.class), "CustomReceiver");

See end-to-end examples at [Akka Examples](https://github.com/apache/bahir/tree/master/streaming-akka/examples).