From ebf35a3a1686bd83c07d15baa2f0109e25a2cf4b Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 15 Mar 2016 13:20:17 -0700 Subject: [PATCH 1/3] Remove docs of streaming-akka, streaming-zeromq, streaming-mqtt, streaming-flume and streaming-twitter --- docs/streaming-custom-receivers.md | 2 +- docs/streaming-flume-integration.md | 172 ------------------ docs/streaming-programming-guide.md | 89 +++------ .../main/python/streaming/flume_wordcount.py | 56 ------ .../main/python/streaming/mqtt_wordcount.py | 59 ------ 5 files changed, 23 insertions(+), 355 deletions(-) delete mode 100644 docs/streaming-flume-integration.md delete mode 100644 examples/src/main/python/streaming/flume_wordcount.py delete mode 100644 examples/src/main/python/streaming/mqtt_wordcount.py diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index a4e17fd24eac2..71c184c28b381 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -4,7 +4,7 @@ title: Spark Streaming Custom Receivers --- Spark Streaming can receive streaming data from any arbitrary data source beyond -the ones for which it has built-in support (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.). +the ones for which it has built-in support (that is, beyond Kafka, Kinesis, files, sockets, etc.). This requires the developer to implement a *receiver* that is customized for receiving data from the concerned data source. This guide walks through the process of implementing a custom receiver and using it in a Spark Streaming application. Note that custom receivers can be implemented diff --git a/docs/streaming-flume-integration.md b/docs/streaming-flume-integration.md deleted file mode 100644 index 8eeeee75dbf40..0000000000000 --- a/docs/streaming-flume-integration.md +++ /dev/null @@ -1,172 +0,0 @@ ---- -layout: global -title: Spark Streaming + Flume Integration Guide ---- - -[Apache Flume](https://flume.apache.org/) is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming to receive data from Flume. There are two approaches to this. - -## Approach 1: Flume-style Push-based Approach -Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts an Avro agent for Flume, to which Flume can push the data. Here are the configuration steps. - -#### General Requirements -Choose a machine in your cluster such that - -- When your Flume + Spark Streaming application is launched, one of the Spark workers must run on that machine. - -- Flume can be configured to push data to a port on that machine. - -Due to the push model, the streaming application needs to be up, with the receiver scheduled and listening on the chosen port, for Flume to be able push data. - -#### Configuring Flume -Configure Flume agent to send data to an Avro sink by having the following in the configuration file. - - agent.sinks = avroSink - agent.sinks.avroSink.type = avro - agent.sinks.avroSink.channel = memoryChannel - agent.sinks.avroSink.hostname = - agent.sinks.avroSink.port = - -See the [Flume's documentation](https://flume.apache.org/documentation.html) for more information about -configuring Flume agents. - -#### Configuring Spark Streaming Application -1. 
**Linking:** In your SBT/Maven project definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information). - - groupId = org.apache.spark - artifactId = spark-streaming-flume_{{site.SCALA_BINARY_VERSION}} - version = {{site.SPARK_VERSION_SHORT}} - -2. **Programming:** In the streaming application code, import `FlumeUtils` and create input DStream as follows. - -
-
- import org.apache.spark.streaming.flume._ - - val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]) - - See the [API docs](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$) - and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala). -
-
-	import org.apache.spark.streaming.flume.*;
-
-	JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
-		FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]);
-
-	See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html)
-	and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java).
-
- from pyspark.streaming.flume import FlumeUtils - - flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]) - - By default, the Python API will decode Flume event body as UTF8 encoded strings. You can specify your custom decoding function to decode the body byte arrays in Flume events to any arbitrary data type. - See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils) - and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/flume_wordcount.py). -
-
- - Note that the hostname should be the same as the one used by the resource manager in the - cluster (Mesos, YARN or Spark Standalone), so that resource allocation can match the names and launch - the receiver in the right machine. - -3. **Deploying:** As with any Spark applications, `spark-submit` is used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications. - - For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). - - For Python applications which lack SBT/Maven project management, `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies can be directly added to `spark-submit` using `--packages` (see [Application Submission Guide](submitting-applications.html)). That is, - - ./bin/spark-submit --packages org.apache.spark:spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... - - Alternatively, you can also download the JAR of the Maven artifact `spark-streaming-flume-assembly` from the - [Maven repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-flume-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`. - -## Approach 2: Pull-based Approach using a Custom Sink -Instead of Flume pushing data directly to Spark Streaming, this approach runs a custom Flume sink that allows the following. - -- Flume pushes data into the sink, and the data stays buffered. -- Spark Streaming uses a [reliable Flume receiver](streaming-programming-guide.html#receiver-reliability) - and transactions to pull data from the sink. Transactions succeed only after data is received and - replicated by Spark Streaming. - -This ensures stronger reliability and -[fault-tolerance guarantees](streaming-programming-guide.html#fault-tolerance-semantics) -than the previous approach. However, this requires configuring Flume to run a custom sink. -Here are the configuration steps. - -#### General Requirements -Choose a machine that will run the custom sink in a Flume agent. The rest of the Flume pipeline is configured to send data to that agent. Machines in the Spark cluster should have access to the chosen machine running the custom sink. - -#### Configuring Flume -Configuring Flume on the chosen machine requires the following two steps. - -1. **Sink JARs**: Add the following JARs to Flume's classpath (see [Flume's documentation](https://flume.apache.org/documentation.html) to see how) in the machine designated to run the custom sink . - - (i) *Custom sink JAR*: Download the JAR corresponding to the following artifact (or [direct link](http://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-flume-sink_{{site.SCALA_BINARY_VERSION}}/{{site.SPARK_VERSION_SHORT}}/spark-streaming-flume-sink_{{site.SCALA_BINARY_VERSION}}-{{site.SPARK_VERSION_SHORT}}.jar)). 
- - groupId = org.apache.spark - artifactId = spark-streaming-flume-sink_{{site.SCALA_BINARY_VERSION}} - version = {{site.SPARK_VERSION_SHORT}} - - (ii) *Scala library JAR*: Download the Scala library JAR for Scala {{site.SCALA_VERSION}}. It can be found with the following artifact detail (or, [direct link](http://search.maven.org/remotecontent?filepath=org/scala-lang/scala-library/{{site.SCALA_VERSION}}/scala-library-{{site.SCALA_VERSION}}.jar)). - - groupId = org.scala-lang - artifactId = scala-library - version = {{site.SCALA_VERSION}} - - (iii) *Commons Lang 3 JAR*: Download the Commons Lang 3 JAR. It can be found with the following artifact detail (or, [direct link](http://search.maven.org/remotecontent?filepath=org/apache/commons/commons-lang3/3.3.2/commons-lang3-3.3.2.jar)). - - groupId = org.apache.commons - artifactId = commons-lang3 - version = 3.3.2 - -2. **Configuration file**: On that machine, configure Flume agent to send data to an Avro sink by having the following in the configuration file. - - agent.sinks = spark - agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink - agent.sinks.spark.hostname = - agent.sinks.spark.port = - agent.sinks.spark.channel = memoryChannel - - Also make sure that the upstream Flume pipeline is configured to send the data to the Flume agent running this sink. - -See the [Flume's documentation](https://flume.apache.org/documentation.html) for more information about -configuring Flume agents. - -#### Configuring Spark Streaming Application -1. **Linking:** In your SBT/Maven project definition, link your streaming application against the `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide). - -2. **Programming:** In the streaming application code, import `FlumeUtils` and create input DStream as follows. - -
-
- import org.apache.spark.streaming.flume._ - - val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]) -
-
-	import org.apache.spark.streaming.flume.*;
-
-	JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
-		FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);
-
- from pyspark.streaming.flume import FlumeUtils - - addresses = [([sink machine hostname 1], [sink port 1]), ([sink machine hostname 2], [sink port 2])] - flumeStream = FlumeUtils.createPollingStream(streamingContext, addresses) - - By default, the Python API will decode Flume event body as UTF8 encoded strings. You can specify your custom decoding function to decode the body byte arrays in Flume events to any arbitrary data type. - See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils). -
-
- - See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala). - - Note that each input DStream can be configured to receive data from multiple sinks. - -3. **Deploying:** This is same as the first approach. - - - diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 6c36b41e78d52..2af05832d563f 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -11,7 +11,7 @@ description: Spark Streaming programming guide and tutorial for Spark SPARK_VERS # Overview Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources -like Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and can be processed using complex +like Kafka, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like `map`, `reduce`, `join` and `window`. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark's @@ -40,7 +40,7 @@ stream of results in batches. Spark Streaming provides a high-level abstraction called *discretized stream* or *DStream*, which represents a continuous stream of data. DStreams can be created either from input data -streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level +streams from sources such as Kafka, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of [RDDs](api/scala/index.html#org.apache.spark.rdd.RDD). @@ -408,7 +408,7 @@ Similar to Spark, Spark Streaming is available through Maven Central. To write y -For ingesting data from sources like Kafka, Flume, and Kinesis that are not present in the Spark +For ingesting data from sources like Kafka, and Kinesis that are not present in the Spark Streaming core API, you will have to add the corresponding artifact `spark-streaming-xyz_{{site.SCALA_BINARY_VERSION}}` to the dependencies. For example, @@ -417,11 +417,7 @@ some of the common ones are as follows. - - - -
<table class="table">
<tr><th>Source</th><th>Artifact</th></tr>
<tr><td> Kafka </td><td> spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}} </td></tr>
-<tr><td> Flume </td><td> spark-streaming-flume_{{site.SCALA_BINARY_VERSION}} </td></tr>
<tr><td> Kinesis<br/></td><td> spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} [Amazon Software License] </td></tr>
-<tr><td> Twitter </td><td> spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}} </td></tr>
-<tr><td> ZeroMQ </td><td> spark-streaming-zeromq_{{site.SCALA_BINARY_VERSION}} </td></tr>
-<tr><td> MQTT </td><td> spark-streaming-mqtt_{{site.SCALA_BINARY_VERSION}} </td></tr>
<tr><td></td><td></td></tr>
</table>
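
For instance, with SBT the Kafka artifact above maps to a one-line dependency roughly like the following (an illustrative sketch, reusing the guide's version placeholder; `%%` appends the Scala binary version):

    // Sketch of an SBT dependency for one of the artifacts listed above.
    // "%%" appends the Scala binary version, giving e.g. spark-streaming-kafka_2.11.
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "{{site.SPARK_VERSION_SHORT}}"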
@@ -595,7 +591,7 @@ Spark Streaming provides two categories of built-in streaming sources. - *Basic sources*: Sources directly available in the StreamingContext API. Examples: file systems, and socket connections. -- *Advanced sources*: Sources like Kafka, Flume, Kinesis, Twitter, etc. are available through +- *Advanced sources*: Sources like Kafka, Kinesis, etc. are available through extra utility classes. These require linking against extra dependencies as discussed in the [linking](#linking) section. @@ -615,7 +611,7 @@ as well as to run the receiver(s). - When running a Spark Streaming program locally, do not use "local" or "local[1]" as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using - a input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will + a input DStream based on a receiver (e.g. sockets, Kafka, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use "local[*n*]" as the master URL, where *n* > number of receivers to run (see [Spark Properties](configuration.html#spark-properties) for information on how to set @@ -672,38 +668,12 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea {:.no_toc} Python API As of Spark {{site.SPARK_VERSION_SHORT}}, -out of these sources, Kafka, Kinesis, Flume and MQTT are available in the Python API. +out of these sources, Kafka and Kinesis are available in the Python API. This category of sources require interfacing with external non-Spark libraries, some of them with -complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts +complex dependencies (e.g., Kafka). Hence, to minimize issues related to version conflicts of dependencies, the functionality to create DStreams from these sources has been moved to separate -libraries that can be [linked](#linking) to explicitly when necessary. For example, if you want to -create a DStream using data from Twitter's stream of tweets, you have to do the following: - -1. *Linking*: Add the artifact `spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}}` to the - SBT/Maven project dependencies. -1. *Programming*: Import the `TwitterUtils` class and create a DStream with - `TwitterUtils.createStream` as shown below. -1. *Deploying*: Generate an uber JAR with all the dependencies (including the dependency - `spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}}` and its transitive dependencies) and - then deploy the application. This is further explained in the [Deploying section](#deploying-applications). - -
-
-{% highlight scala %} -import org.apache.spark.streaming.twitter._ - -TwitterUtils.createStream(ssc, None) -{% endhighlight %} -
-
-{% highlight java %} -import org.apache.spark.streaming.twitter.*; - -TwitterUtils.createStream(jssc); -{% endhighlight %} -
-
+libraries that can be [linked](#linking) to explicitly when necessary. Note that these advanced sources are not available in the Spark shell, hence applications based on these advanced sources cannot be tested in the shell. If you really want to use them in the Spark @@ -714,19 +684,8 @@ Some of these advanced sources are as follows. - **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kafka 0.8.2.1. See the [Kafka Integration Guide](streaming-kafka-integration.html) for more details. -- **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Flume 1.6.0. See the [Flume Integration Guide](streaming-flume-integration.html) for more details. - - **Kinesis:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kinesis Client Library 1.2.1. See the [Kinesis Integration Guide](streaming-kinesis-integration.html) for more details. -- **Twitter:** Spark Streaming's TwitterUtils uses Twitter4j to get the public stream of tweets using - [Twitter's Streaming API](https://dev.twitter.com/docs/streaming-apis). Authentication information - can be provided by any of the [methods](http://twitter4j.org/en/configuration.html) supported by - Twitter4J library. You can either get the public stream, or get the filtered stream based on a - keywords. See the API documentation ([Scala](api/scala/index.html#org.apache.spark.streaming.twitter.TwitterUtils$), - [Java](api/java/index.html?org/apache/spark/streaming/twitter/TwitterUtils.html)) and examples - ([TwitterPopularTags]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala) - and [TwitterAlgebirdCMS]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala)). - ### Custom Sources {:.no_toc} @@ -741,7 +700,7 @@ Guide](streaming-custom-receivers.html) for details. {:.no_toc} There can be two kinds of data sources based on their *reliability*. Sources -(like Kafka and Flume) allow the transferred data to be acknowledged. If the system receiving +(like Kafka) allow the transferred data to be acknowledged. If the system receiving data from these *reliable* sources acknowledges the received data correctly, it can be ensured that no data will be lost due to any kind of failure. This leads to two kinds of receivers: @@ -1721,7 +1680,7 @@ operations on the same data). For window-based operations like `reduceByWindow` Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling `persist()`. -For input streams that receive data over the network (such as, Kafka, Flume, sockets, etc.), the +For input streams that receive data over the network (such as, Kafka, sockets, etc.), the default persistence level is set to replicate the data to two nodes for fault-tolerance. Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in @@ -1927,10 +1886,10 @@ To run a Spark Streaming applications, you need to have the following. - *Package the application JAR* - You have to compile your streaming application into a JAR. If you are using [`spark-submit`](submitting-applications.html) to start the application, then you will not need to provide Spark and Spark Streaming in the JAR. However, - if your application uses [advanced sources](#advanced-sources) (e.g. Kafka, Flume, Twitter), + if your application uses [advanced sources](#advanced-sources) (e.g. 
Kafka), then you will have to package the extra artifact they link to, along with their dependencies, - in the JAR that is used to deploy the application. For example, an application using `TwitterUtils` - will have to include `spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}}` and all its + in the JAR that is used to deploy the application. For example, an application using `KafkaUtils` + will have to include `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and all its transitive dependencies in the application JAR. - *Configuring sufficient memory for the executors* - Since the received data must be stored in @@ -2011,7 +1970,7 @@ for graceful shutdown options) which ensure data that has been received is compl processed before shutdown. Then the upgraded application can be started, which will start processing from the same point where the earlier application left off. Note that this can be done only with input sources that support source-side buffering -(like Kafka, and Flume) as data needs to be buffered while the previous application was down and +(like Kafka) as data needs to be buffered while the previous application was down and the upgraded application is not yet up. And restarting from earlier checkpoint information of pre-upgrade code cannot be done. The checkpoint information essentially contains serialized Scala/Java/Python objects and trying to deserialize objects with new, @@ -2066,7 +2025,7 @@ highlights some of the most important ones. ### Level of Parallelism in Data Receiving {:.no_toc} -Receiving data over the network (like Kafka, Flume, socket, etc.) requires the data to be deserialized +Receiving data over the network (like Kafka, socket, etc.) requires the data to be deserialized and stored in Spark. If the data receiving becomes a bottleneck in the system, then consider parallelizing the data receiving. Note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data. @@ -2398,8 +2357,7 @@ additional effort may be necessary to achieve exactly-once semantics. There are Between Spark 0.9.1 and Spark 1.0, there were a few API changes made to ensure future API stability. This section elaborates the steps required to migrate your existing code to 1.0. -**Input DStreams**: All operations that create an input stream (e.g., `StreamingContext.socketStream`, -`FlumeUtils.createStream`, etc.) now returns +**Input DStreams**: All operations that create an input stream (e.g., `StreamingContext.socketStream`, etc.) now returns [InputDStream](api/scala/index.html#org.apache.spark.streaming.dstream.InputDStream) / [ReceiverInputDStream](api/scala/index.html#org.apache.spark.streaming.dstream.ReceiverInputDStream) (instead of DStream) for Scala, and [JavaInputDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaInputDStream.html) / @@ -2443,29 +2401,26 @@ Please refer to the project for more details. 
# Where to Go from Here * Additional guides - [Kafka Integration Guide](streaming-kafka-integration.html) - - [Flume Integration Guide](streaming-flume-integration.html) - [Kinesis Integration Guide](streaming-kinesis-integration.html) - [Custom Receiver Guide](streaming-custom-receivers.html) +* External DStream data sources: + - [DStream Flume](https://github.com/spark-packages/dstream-flume) + - [DStream MQTT](https://github.com/spark-packages/dstream-mqtt) + - [DStream Twitter](https://github.com/spark-packages/dstream-twitter) + - [DStream Akka](https://github.com/spark-packages/dstream-akka) + - [DStream ZeroMQ](https://github.com/spark-packages/dstream-zeromq) * API documentation - Scala docs * [StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) and [DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream) * [KafkaUtils](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$), - [FlumeUtils](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$), [KinesisUtils](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$), - [TwitterUtils](api/scala/index.html#org.apache.spark.streaming.twitter.TwitterUtils$), - [ZeroMQUtils](api/scala/index.html#org.apache.spark.streaming.zeromq.ZeroMQUtils$), and - [MQTTUtils](api/scala/index.html#org.apache.spark.streaming.mqtt.MQTTUtils$) - Java docs * [JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html), [JavaDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html) and [JavaPairDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaPairDStream.html) * [KafkaUtils](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html), - [FlumeUtils](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html), [KinesisUtils](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html) - [TwitterUtils](api/java/index.html?org/apache/spark/streaming/twitter/TwitterUtils.html), - [ZeroMQUtils](api/java/index.html?org/apache/spark/streaming/zeromq/ZeroMQUtils.html), and - [MQTTUtils](api/java/index.html?org/apache/spark/streaming/mqtt/MQTTUtils.html) - Python docs * [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) and [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream) * [KafkaUtils](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils) diff --git a/examples/src/main/python/streaming/flume_wordcount.py b/examples/src/main/python/streaming/flume_wordcount.py deleted file mode 100644 index d75bc6daac138..0000000000000 --- a/examples/src/main/python/streaming/flume_wordcount.py +++ /dev/null @@ -1,56 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -""" - Counts words in UTF8 encoded, '\n' delimited text received from the network every second. - Usage: flume_wordcount.py - - To run this on your local machine, you need to setup Flume first, see - https://flume.apache.org/documentation.html - - and then run the example - `$ bin/spark-submit --jars \ - external/flume-assembly/target/scala-*/spark-streaming-flume-assembly-*.jar \ - examples/src/main/python/streaming/flume_wordcount.py \ - localhost 12345 -""" -from __future__ import print_function - -import sys - -from pyspark import SparkContext -from pyspark.streaming import StreamingContext -from pyspark.streaming.flume import FlumeUtils - -if __name__ == "__main__": - if len(sys.argv) != 3: - print("Usage: flume_wordcount.py ", file=sys.stderr) - exit(-1) - - sc = SparkContext(appName="PythonStreamingFlumeWordCount") - ssc = StreamingContext(sc, 1) - - hostname, port = sys.argv[1:] - kvs = FlumeUtils.createStream(ssc, hostname, int(port)) - lines = kvs.map(lambda x: x[1]) - counts = lines.flatMap(lambda line: line.split(" ")) \ - .map(lambda word: (word, 1)) \ - .reduceByKey(lambda a, b: a+b) - counts.pprint() - - ssc.start() - ssc.awaitTermination() diff --git a/examples/src/main/python/streaming/mqtt_wordcount.py b/examples/src/main/python/streaming/mqtt_wordcount.py deleted file mode 100644 index abf9c0e21d307..0000000000000 --- a/examples/src/main/python/streaming/mqtt_wordcount.py +++ /dev/null @@ -1,59 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -""" - A sample wordcount with MqttStream stream - Usage: mqtt_wordcount.py - - To run this in your local machine, you need to setup a MQTT broker and publisher first, - Mosquitto is one of the open source MQTT Brokers, see - http://mosquitto.org/ - Eclipse paho project provides number of clients and utilities for working with MQTT, see - http://www.eclipse.org/paho/#getting-started - - and then run the example - `$ bin/spark-submit --jars \ - external/mqtt-assembly/target/scala-*/spark-streaming-mqtt-assembly-*.jar \ - examples/src/main/python/streaming/mqtt_wordcount.py \ - tcp://localhost:1883 foo` -""" - -import sys - -from pyspark import SparkContext -from pyspark.streaming import StreamingContext -from pyspark.streaming.mqtt import MQTTUtils - -if __name__ == "__main__": - if len(sys.argv) != 3: - print >> sys.stderr, "Usage: mqtt_wordcount.py " - exit(-1) - - sc = SparkContext(appName="PythonStreamingMQTTWordCount") - ssc = StreamingContext(sc, 1) - - brokerUrl = sys.argv[1] - topic = sys.argv[2] - - lines = MQTTUtils.createStream(ssc, brokerUrl, topic) - counts = lines.flatMap(lambda line: line.split(" ")) \ - .map(lambda word: (word, 1)) \ - .reduceByKey(lambda a, b: a+b) - counts.pprint() - - ssc.start() - ssc.awaitTermination() From 0655af67c58aa3033811e73d4ac4e48234e10055 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 23 Mar 2016 15:36:16 -0700 Subject: [PATCH 2/3] Add Flume back --- docs/streaming-custom-receivers.md | 2 +- docs/streaming-flume-integration.md | 172 ++++++++++++++++++ docs/streaming-programming-guide.md | 32 ++-- .../main/python/streaming/flume_wordcount.py | 56 ++++++ 4 files changed, 247 insertions(+), 15 deletions(-) create mode 100644 docs/streaming-flume-integration.md create mode 100644 examples/src/main/python/streaming/flume_wordcount.py diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index 71c184c28b381..a4e17fd24eac2 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -4,7 +4,7 @@ title: Spark Streaming Custom Receivers --- Spark Streaming can receive streaming data from any arbitrary data source beyond -the ones for which it has built-in support (that is, beyond Kafka, Kinesis, files, sockets, etc.). +the ones for which it has built-in support (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.). This requires the developer to implement a *receiver* that is customized for receiving data from the concerned data source. This guide walks through the process of implementing a custom receiver and using it in a Spark Streaming application. Note that custom receivers can be implemented diff --git a/docs/streaming-flume-integration.md b/docs/streaming-flume-integration.md new file mode 100644 index 0000000000000..8eeeee75dbf40 --- /dev/null +++ b/docs/streaming-flume-integration.md @@ -0,0 +1,172 @@ +--- +layout: global +title: Spark Streaming + Flume Integration Guide +--- + +[Apache Flume](https://flume.apache.org/) is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming to receive data from Flume. There are two approaches to this. + +## Approach 1: Flume-style Push-based Approach +Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts an Avro agent for Flume, to which Flume can push the data. Here are the configuration steps. 
+
+#### General Requirements
+Choose a machine in your cluster such that
+
+- When your Flume + Spark Streaming application is launched, one of the Spark workers must run on that machine.
+
+- Flume can be configured to push data to a port on that machine.
+
+Due to the push model, the streaming application needs to be up, with the receiver scheduled and listening on the chosen port, for Flume to be able push data.
+
+#### Configuring Flume
+Configure Flume agent to send data to an Avro sink by having the following in the configuration file.
+
+	agent.sinks = avroSink
+	agent.sinks.avroSink.type = avro
+	agent.sinks.avroSink.channel = memoryChannel
+	agent.sinks.avroSink.hostname = <chosen machine's hostname>
+	agent.sinks.avroSink.port = <chosen port>
+
+See the [Flume's documentation](https://flume.apache.org/documentation.html) for more information about
+configuring Flume agents.
+
+#### Configuring Spark Streaming Application
+1. **Linking:** In your SBT/Maven project definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
+
+		groupId = org.apache.spark
+		artifactId = spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}
+		version = {{site.SPARK_VERSION_SHORT}}
+
+2. **Programming:** In the streaming application code, import `FlumeUtils` and create input DStream as follows.
+
+
+ import org.apache.spark.streaming.flume._ + + val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]) + + See the [API docs](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$) + and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala). +
+
+	import org.apache.spark.streaming.flume.*;
+
+	JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
+		FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]);
+
+	See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html)
+	and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java).
+
+ from pyspark.streaming.flume import FlumeUtils + + flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]) + + By default, the Python API will decode Flume event body as UTF8 encoded strings. You can specify your custom decoding function to decode the body byte arrays in Flume events to any arbitrary data type. + See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils) + and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/flume_wordcount.py). +
+
+ + Note that the hostname should be the same as the one used by the resource manager in the + cluster (Mesos, YARN or Spark Standalone), so that resource allocation can match the names and launch + the receiver in the right machine. + +3. **Deploying:** As with any Spark applications, `spark-submit` is used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications. + + For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). + + For Python applications which lack SBT/Maven project management, `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies can be directly added to `spark-submit` using `--packages` (see [Application Submission Guide](submitting-applications.html)). That is, + + ./bin/spark-submit --packages org.apache.spark:spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... + + Alternatively, you can also download the JAR of the Maven artifact `spark-streaming-flume-assembly` from the + [Maven repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-flume-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`. + +## Approach 2: Pull-based Approach using a Custom Sink +Instead of Flume pushing data directly to Spark Streaming, this approach runs a custom Flume sink that allows the following. + +- Flume pushes data into the sink, and the data stays buffered. +- Spark Streaming uses a [reliable Flume receiver](streaming-programming-guide.html#receiver-reliability) + and transactions to pull data from the sink. Transactions succeed only after data is received and + replicated by Spark Streaming. + +This ensures stronger reliability and +[fault-tolerance guarantees](streaming-programming-guide.html#fault-tolerance-semantics) +than the previous approach. However, this requires configuring Flume to run a custom sink. +Here are the configuration steps. + +#### General Requirements +Choose a machine that will run the custom sink in a Flume agent. The rest of the Flume pipeline is configured to send data to that agent. Machines in the Spark cluster should have access to the chosen machine running the custom sink. + +#### Configuring Flume +Configuring Flume on the chosen machine requires the following two steps. + +1. **Sink JARs**: Add the following JARs to Flume's classpath (see [Flume's documentation](https://flume.apache.org/documentation.html) to see how) in the machine designated to run the custom sink . + + (i) *Custom sink JAR*: Download the JAR corresponding to the following artifact (or [direct link](http://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-flume-sink_{{site.SCALA_BINARY_VERSION}}/{{site.SPARK_VERSION_SHORT}}/spark-streaming-flume-sink_{{site.SCALA_BINARY_VERSION}}-{{site.SPARK_VERSION_SHORT}}.jar)). 
+ + groupId = org.apache.spark + artifactId = spark-streaming-flume-sink_{{site.SCALA_BINARY_VERSION}} + version = {{site.SPARK_VERSION_SHORT}} + + (ii) *Scala library JAR*: Download the Scala library JAR for Scala {{site.SCALA_VERSION}}. It can be found with the following artifact detail (or, [direct link](http://search.maven.org/remotecontent?filepath=org/scala-lang/scala-library/{{site.SCALA_VERSION}}/scala-library-{{site.SCALA_VERSION}}.jar)). + + groupId = org.scala-lang + artifactId = scala-library + version = {{site.SCALA_VERSION}} + + (iii) *Commons Lang 3 JAR*: Download the Commons Lang 3 JAR. It can be found with the following artifact detail (or, [direct link](http://search.maven.org/remotecontent?filepath=org/apache/commons/commons-lang3/3.3.2/commons-lang3-3.3.2.jar)). + + groupId = org.apache.commons + artifactId = commons-lang3 + version = 3.3.2 + +2. **Configuration file**: On that machine, configure Flume agent to send data to an Avro sink by having the following in the configuration file. + + agent.sinks = spark + agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink + agent.sinks.spark.hostname = + agent.sinks.spark.port = + agent.sinks.spark.channel = memoryChannel + + Also make sure that the upstream Flume pipeline is configured to send the data to the Flume agent running this sink. + +See the [Flume's documentation](https://flume.apache.org/documentation.html) for more information about +configuring Flume agents. + +#### Configuring Spark Streaming Application +1. **Linking:** In your SBT/Maven project definition, link your streaming application against the `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide). + +2. **Programming:** In the streaming application code, import `FlumeUtils` and create input DStream as follows. + +
+
+ import org.apache.spark.streaming.flume._ + + val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]) +
+
+	import org.apache.spark.streaming.flume.*;
+
+	JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
+		FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);
+
+ from pyspark.streaming.flume import FlumeUtils + + addresses = [([sink machine hostname 1], [sink port 1]), ([sink machine hostname 2], [sink port 2])] + flumeStream = FlumeUtils.createPollingStream(streamingContext, addresses) + + By default, the Python API will decode Flume event body as UTF8 encoded strings. You can specify your custom decoding function to decode the body byte arrays in Flume events to any arbitrary data type. + See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils). +
+
+ + See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala). + + Note that each input DStream can be configured to receive data from multiple sinks. + +3. **Deploying:** This is same as the first approach. + + + diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 2af05832d563f..8d21917a7da24 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -11,7 +11,7 @@ description: Spark Streaming programming guide and tutorial for Spark SPARK_VERS # Overview Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources -like Kafka, Kinesis, or TCP sockets, and can be processed using complex +like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like `map`, `reduce`, `join` and `window`. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark's @@ -40,7 +40,7 @@ stream of results in batches. Spark Streaming provides a high-level abstraction called *discretized stream* or *DStream*, which represents a continuous stream of data. DStreams can be created either from input data -streams from sources such as Kafka, and Kinesis, or by applying high-level +streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of [RDDs](api/scala/index.html#org.apache.spark.rdd.RDD). @@ -408,7 +408,7 @@ Similar to Spark, Spark Streaming is available through Maven Central. To write y -For ingesting data from sources like Kafka, and Kinesis that are not present in the Spark +For ingesting data from sources like Kafka, Flume, and Kinesis that are not present in the Spark Streaming core API, you will have to add the corresponding artifact `spark-streaming-xyz_{{site.SCALA_BINARY_VERSION}}` to the dependencies. For example, @@ -417,6 +417,7 @@ some of the common ones are as follows. +
<table class="table">
<tr><th>Source</th><th>Artifact</th></tr>
<tr><td> Kafka </td><td> spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}} </td></tr>
+<tr><td> Flume </td><td> spark-streaming-flume_{{site.SCALA_BINARY_VERSION}} </td></tr>
<tr><td> Kinesis<br/></td><td> spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} [Amazon Software License] </td></tr>
<tr><td></td><td></td></tr>
</table>
@@ -591,7 +592,7 @@ Spark Streaming provides two categories of built-in streaming sources. - *Basic sources*: Sources directly available in the StreamingContext API. Examples: file systems, and socket connections. -- *Advanced sources*: Sources like Kafka, Kinesis, etc. are available through +- *Advanced sources*: Sources like Kafka, Flume, Kinesis, etc. are available through extra utility classes. These require linking against extra dependencies as discussed in the [linking](#linking) section. @@ -611,7 +612,7 @@ as well as to run the receiver(s). - When running a Spark Streaming program locally, do not use "local" or "local[1]" as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using - a input DStream based on a receiver (e.g. sockets, Kafka, etc.), then the single thread will + a input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use "local[*n*]" as the master URL, where *n* > number of receivers to run (see [Spark Properties](configuration.html#spark-properties) for information on how to set @@ -668,10 +669,10 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea {:.no_toc} Python API As of Spark {{site.SPARK_VERSION_SHORT}}, -out of these sources, Kafka and Kinesis are available in the Python API. +out of these sources, Kafka, Kinesis and Flume are available in the Python API. This category of sources require interfacing with external non-Spark libraries, some of them with -complex dependencies (e.g., Kafka). Hence, to minimize issues related to version conflicts +complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts of dependencies, the functionality to create DStreams from these sources has been moved to separate libraries that can be [linked](#linking) to explicitly when necessary. @@ -684,6 +685,8 @@ Some of these advanced sources are as follows. - **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kafka 0.8.2.1. See the [Kafka Integration Guide](streaming-kafka-integration.html) for more details. +- **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Flume 1.6.0. See the [Flume Integration Guide](streaming-flume-integration.html) for more details. + - **Kinesis:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kinesis Client Library 1.2.1. See the [Kinesis Integration Guide](streaming-kinesis-integration.html) for more details. ### Custom Sources @@ -700,7 +703,7 @@ Guide](streaming-custom-receivers.html) for details. {:.no_toc} There can be two kinds of data sources based on their *reliability*. Sources -(like Kafka) allow the transferred data to be acknowledged. If the system receiving +(like Kafka and Flume) allow the transferred data to be acknowledged. If the system receiving data from these *reliable* sources acknowledges the received data correctly, it can be ensured that no data will be lost due to any kind of failure. This leads to two kinds of receivers: @@ -1680,7 +1683,7 @@ operations on the same data). For window-based operations like `reduceByWindow` Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling `persist()`. 
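
When the default is not appropriate, the storage level can still be set explicitly on any DStream; a minimal sketch (assuming the `pairs` DStream from the quick word count example earlier in this guide and the usual streaming imports):

    // Sketch: override the storage level of a windowed DStream explicitly.
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.Seconds

    // Reduce the last 30 seconds of (word, 1) pairs every 10 seconds, then keep
    // the result serialized in memory, spilling to disk if it does not fit.
    val windowedCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
    windowedCounts.persist(StorageLevel.MEMORY_AND_DISK_SER)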
-For input streams that receive data over the network (such as, Kafka, sockets, etc.), the +For input streams that receive data over the network (such as, Kafka, Flume, sockets, etc.), the default persistence level is set to replicate the data to two nodes for fault-tolerance. Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in @@ -1886,7 +1889,7 @@ To run a Spark Streaming applications, you need to have the following. - *Package the application JAR* - You have to compile your streaming application into a JAR. If you are using [`spark-submit`](submitting-applications.html) to start the application, then you will not need to provide Spark and Spark Streaming in the JAR. However, - if your application uses [advanced sources](#advanced-sources) (e.g. Kafka), + if your application uses [advanced sources](#advanced-sources) (e.g. Kafka, Flume), then you will have to package the extra artifact they link to, along with their dependencies, in the JAR that is used to deploy the application. For example, an application using `KafkaUtils` will have to include `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and all its @@ -1970,7 +1973,7 @@ for graceful shutdown options) which ensure data that has been received is compl processed before shutdown. Then the upgraded application can be started, which will start processing from the same point where the earlier application left off. Note that this can be done only with input sources that support source-side buffering -(like Kafka) as data needs to be buffered while the previous application was down and +(like Kafka, and Flume) as data needs to be buffered while the previous application was down and the upgraded application is not yet up. And restarting from earlier checkpoint information of pre-upgrade code cannot be done. The checkpoint information essentially contains serialized Scala/Java/Python objects and trying to deserialize objects with new, @@ -2025,7 +2028,7 @@ highlights some of the most important ones. ### Level of Parallelism in Data Receiving {:.no_toc} -Receiving data over the network (like Kafka, socket, etc.) requires the data to be deserialized +Receiving data over the network (like Kafka, Flume, socket, etc.) requires the data to be deserialized and stored in Spark. If the data receiving becomes a bottleneck in the system, then consider parallelizing the data receiving. Note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data. @@ -2357,7 +2360,7 @@ additional effort may be necessary to achieve exactly-once semantics. There are Between Spark 0.9.1 and Spark 1.0, there were a few API changes made to ensure future API stability. This section elaborates the steps required to migrate your existing code to 1.0. -**Input DStreams**: All operations that create an input stream (e.g., `StreamingContext.socketStream`, etc.) now returns +**Input DStreams**: All operations that create an input stream (e.g., `StreamingContext.socketStream`, `FlumeUtils.createStream`, etc.) now returns [InputDStream](api/scala/index.html#org.apache.spark.streaming.dstream.InputDStream) / [ReceiverInputDStream](api/scala/index.html#org.apache.spark.streaming.dstream.ReceiverInputDStream) (instead of DStream) for Scala, and [JavaInputDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaInputDStream.html) / @@ -2404,7 +2407,6 @@ Please refer to the project for more details. 
- [Kinesis Integration Guide](streaming-kinesis-integration.html) - [Custom Receiver Guide](streaming-custom-receivers.html) * External DStream data sources: - - [DStream Flume](https://github.com/spark-packages/dstream-flume) - [DStream MQTT](https://github.com/spark-packages/dstream-mqtt) - [DStream Twitter](https://github.com/spark-packages/dstream-twitter) - [DStream Akka](https://github.com/spark-packages/dstream-akka) @@ -2414,12 +2416,14 @@ Please refer to the project for more details. * [StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) and [DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream) * [KafkaUtils](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$), + [FlumeUtils](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$), [KinesisUtils](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$), - Java docs * [JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html), [JavaDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html) and [JavaPairDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaPairDStream.html) * [KafkaUtils](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html), + [FlumeUtils](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html), [KinesisUtils](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html) - Python docs * [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) and [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream) diff --git a/examples/src/main/python/streaming/flume_wordcount.py b/examples/src/main/python/streaming/flume_wordcount.py new file mode 100644 index 0000000000000..d75bc6daac138 --- /dev/null +++ b/examples/src/main/python/streaming/flume_wordcount.py @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" + Counts words in UTF8 encoded, '\n' delimited text received from the network every second. 
+ Usage: flume_wordcount.py + + To run this on your local machine, you need to setup Flume first, see + https://flume.apache.org/documentation.html + + and then run the example + `$ bin/spark-submit --jars \ + external/flume-assembly/target/scala-*/spark-streaming-flume-assembly-*.jar \ + examples/src/main/python/streaming/flume_wordcount.py \ + localhost 12345 +""" +from __future__ import print_function + +import sys + +from pyspark import SparkContext +from pyspark.streaming import StreamingContext +from pyspark.streaming.flume import FlumeUtils + +if __name__ == "__main__": + if len(sys.argv) != 3: + print("Usage: flume_wordcount.py ", file=sys.stderr) + exit(-1) + + sc = SparkContext(appName="PythonStreamingFlumeWordCount") + ssc = StreamingContext(sc, 1) + + hostname, port = sys.argv[1:] + kvs = FlumeUtils.createStream(ssc, hostname, int(port)) + lines = kvs.map(lambda x: x[1]) + counts = lines.flatMap(lambda line: line.split(" ")) \ + .map(lambda word: (word, 1)) \ + .reduceByKey(lambda a, b: a+b) + counts.pprint() + + ssc.start() + ssc.awaitTermination() From dcdd3cb6e7aaeeeefa73e8ea8302c29ccf8a376f Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 23 Mar 2016 15:40:26 -0700 Subject: [PATCH 3/3] Address comments --- NOTICE | 1 - licenses/LICENSE-jblas.txt | 31 ------------------------------- project/MimaBuild.scala | 7 +------ python/docs/pyspark.streaming.rst | 7 ------- 4 files changed, 1 insertion(+), 45 deletions(-) delete mode 100644 licenses/LICENSE-jblas.txt diff --git a/NOTICE b/NOTICE index 6a26155fb4952..2a6fe237dcbea 100644 --- a/NOTICE +++ b/NOTICE @@ -48,7 +48,6 @@ Eclipse Public License 1.0 The following components are provided under the Eclipse Public License 1.0. See project link for details. - (Eclipse Public License - Version 1.0) mqtt-client (org.eclipse.paho:mqtt-client:0.4.0 - http://www.eclipse.org/paho/mqtt-client) (Eclipse Public License v1.0) Eclipse JDT Core (org.eclipse.jdt:core:3.1.1 - http://www.eclipse.org/jdt/) ======================================================================== diff --git a/licenses/LICENSE-jblas.txt b/licenses/LICENSE-jblas.txt deleted file mode 100644 index 5629dafb65b39..0000000000000 --- a/licenses/LICENSE-jblas.txt +++ /dev/null @@ -1,31 +0,0 @@ -Copyright (c) 2009, Mikio L. Braun and contributors -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - - * Redistributions in binary form must reproduce the above - copyright notice, this list of conditions and the following - disclaimer in the documentation and/or other materials provided - with the distribution. - - * Neither the name of the Technische Universität Berlin nor the - names of its contributors may be used to endorse or promote - products derived from this software without specific prior - written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT -HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala index acf7b8961eb74..3dc1ceacde19a 100644 --- a/project/MimaBuild.scala +++ b/project/MimaBuild.scala @@ -88,16 +88,11 @@ object MimaBuild { def mimaSettings(sparkHome: File, projectRef: ProjectRef) = { val organization = "org.apache.spark" - // The resolvers setting for MQTT Repository is needed for mqttv3(1.0.1) - // because spark-streaming-mqtt(1.6.0) depends on it. - // Remove the setting on updating previousSparkVersion. val previousSparkVersion = "1.6.0" val fullId = "spark-" + projectRef.project + "_2.11" mimaDefaultSettings ++ Seq(previousArtifact := Some(organization % fullId % previousSparkVersion), - binaryIssueFilters ++= ignoredABIProblems(sparkHome, version.value), - sbt.Keys.resolvers += - "MQTT Repository" at "https://repo.eclipse.org/content/repositories/paho-releases") + binaryIssueFilters ++= ignoredABIProblems(sparkHome, version.value)) } } diff --git a/python/docs/pyspark.streaming.rst b/python/docs/pyspark.streaming.rst index fc52a647543e7..25ceabac0a541 100644 --- a/python/docs/pyspark.streaming.rst +++ b/python/docs/pyspark.streaming.rst @@ -29,10 +29,3 @@ pyspark.streaming.flume.module :members: :undoc-members: :show-inheritance: - -pyspark.streaming.mqtt module ------------------------------ -.. automodule:: pyspark.streaming.mqtt - :members: - :undoc-members: - :show-inheritance: