[SPARK-8378] [STREAMING] Add the Python API for Flume
Author: zsxwing <zsxwing@gmail.com>

Closes #6830 from zsxwing/flume-python and squashes the following commits:

78dfdac [zsxwing] Fix the compile error in the test code
f1bf3c0 [zsxwing] Address TD's comments
0449723 [zsxwing] Add sbt goal streaming-flume-assembly/assembly
e93736b [zsxwing] Fix the test case for determine_modules_to_test
9d5821e [zsxwing] Fix pyspark_core dependencies
f9ee681 [zsxwing] Merge branch 'master' into flume-python
7a55837 [zsxwing] Add streaming_flume_assembly to run-tests.py
b96b0de [zsxwing] Merge branch 'master' into flume-python
ce85e83 [zsxwing] Fix incompatible issues for Python 3
01cbb3d [zsxwing] Add import sys
152364c [zsxwing] Fix the issue that StringIO doesn't work in Python 3
14ba0ff [zsxwing] Add flume-assembly for sbt building
b8d5551 [zsxwing] Merge branch 'master' into flume-python
4762c34 [zsxwing] Fix the doc
0336579 [zsxwing] Refactor Flume unit tests and also add tests for Python API
9f33873 [zsxwing] Add the Python API for Flume
zsxwing authored and tdas committed Jul 1, 2015
1 parent b8faa32 commit 75b9fe4
Showing 15 changed files with 1,009 additions and 236 deletions.
7 changes: 4 additions & 3 deletions dev/run-tests.py
@@ -96,8 +96,8 @@ def determine_modules_to_test(changed_modules):
['examples', 'graphx']
>>> x = sorted(x.name for x in determine_modules_to_test([modules.sql]))
>>> x # doctest: +NORMALIZE_WHITESPACE
['examples', 'hive-thriftserver', 'mllib', 'pyspark-core', 'pyspark-ml', \
'pyspark-mllib', 'pyspark-sql', 'pyspark-streaming', 'sparkr', 'sql']
['examples', 'hive-thriftserver', 'mllib', 'pyspark-ml', \
'pyspark-mllib', 'pyspark-sql', 'sparkr', 'sql']
"""
# If we're going to have to run all of the tests, then we can just short-circuit
# and return 'root'. No module depends on root, so if it appears then it will be
@@ -293,7 +293,8 @@ def build_spark_sbt(hadoop_version):
build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
sbt_goals = ["package",
"assembly/assembly",
"streaming-kafka-assembly/assembly"]
"streaming-kafka-assembly/assembly",
"streaming-flume-assembly/assembly"]
profiles_and_goals = build_profiles + sbt_goals

print("[info] Building Spark (w/Hive 0.13.1) using SBT with these arguments: ",
15 changes: 12 additions & 3 deletions dev/sparktestsupport/modules.py
@@ -203,7 +203,7 @@ def contains_file(self, filename):


streaming_flume = Module(
name="streaming_flume",
name="streaming-flume",
dependencies=[streaming],
source_file_regexes=[
"external/flume",
@@ -214,6 +214,15 @@ def contains_file(self, filename):
)


streaming_flume_assembly = Module(
name="streaming-flume-assembly",
dependencies=[streaming_flume, streaming_flume_sink],
source_file_regexes=[
"external/flume-assembly",
]
)


mllib = Module(
name="mllib",
dependencies=[streaming, sql],
@@ -241,7 +250,7 @@ def contains_file(self, filename):

pyspark_core = Module(
name="pyspark-core",
dependencies=[mllib, streaming, streaming_kafka],
dependencies=[],
source_file_regexes=[
"python/(?!pyspark/(ml|mllib|sql|streaming))"
],
@@ -281,7 +290,7 @@ def contains_file(self, filename):

pyspark_streaming = Module(
name="pyspark-streaming",
dependencies=[pyspark_core, streaming, streaming_kafka],
dependencies=[pyspark_core, streaming, streaming_kafka, streaming_flume_assembly],
source_file_regexes=[
"python/pyspark/streaming"
],
18 changes: 18 additions & 0 deletions docs/streaming-flume-integration.md
@@ -58,6 +58,15 @@ configuring Flume agents.
See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java).
</div>
<div data-lang="python" markdown="1">
from pyspark.streaming.flume import FlumeUtils

flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])

By default, the Python API decodes the Flume event body as a UTF-8 encoded string. You can specify a custom decoding function to decode the body byte arrays in Flume events to an arbitrary data type.
See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/flume_wordcount.py).
</div>
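For illustration only (not part of this patch): a minimal sketch of plugging in a custom decoder, assuming `FlumeUtils.createStream` exposes a `bodyDecoder` keyword argument for the custom decoding function mentioned above, and using placeholder host and port values. Each stream element is a (headers, body) pair, where the body is whatever the decoder returns.

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.flume import FlumeUtils

    def raw_bytes_decoder(body):
        # Keep the raw bytes instead of decoding them to a UTF-8 string.
        return bytes(body)

    sc = SparkContext(appName="FlumeCustomDecoder")
    ssc = StreamingContext(sc, 1)

    # "localhost" and 12345 are placeholders for the host/port configured in the Flume agent.
    events = FlumeUtils.createStream(ssc, "localhost", 12345, bodyDecoder=raw_bytes_decoder)

    # Report the size of each event body in bytes.
    events.map(lambda event: len(event[1])).pprint()

    ssc.start()
    ssc.awaitTermination()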
</div>

Note that the hostname should be the same as the one used by the resource manager in the
@@ -135,6 +144,15 @@ configuring Flume agents.
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);
</div>
<div data-lang="python" markdown="1">
from pyspark.streaming.flume import FlumeUtils

addresses = [([sink machine hostname 1], [sink port 1]), ([sink machine hostname 2], [sink port 2])]
flumeStream = FlumeUtils.createPollingStream(streamingContext, addresses)

By default, the Python API decodes the Flume event body as a UTF-8 encoded string. You can specify a custom decoding function to decode the body byte arrays in Flume events to an arbitrary data type.
See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils).
</div>
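For illustration only (not part of this patch): a minimal pull-based word count sketch built on `FlumeUtils.createPollingStream`, assuming it accepts a list of (hostname, port) tuples as shown above; the sink addresses are placeholders and the default UTF-8 body decoding is used.

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.flume import FlumeUtils

    sc = SparkContext(appName="FlumePollingWordCount")
    ssc = StreamingContext(sc, 1)

    # Placeholder Spark Sink addresses; replace with the hosts/ports configured in your Flume agents.
    addresses = [("sink-host-1", 9999), ("sink-host-2", 9999)]
    events = FlumeUtils.createPollingStream(ssc, addresses)

    # The event body is decoded as a UTF-8 string by default.
    counts = events.map(lambda event: event[1]) \
        .flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()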
</div>

See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala).
2 changes: 1 addition & 1 deletion docs/streaming-programming-guide.md
@@ -683,7 +683,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
{:.no_toc}

<span class="badge" style="background-color: grey">Python API</span> As of Spark {{site.SPARK_VERSION_SHORT}},
out of these sources, *only* Kafka is available in the Python API. We will add more advanced sources in the Python API in future.
out of these sources, *only* Kafka and Flume are available in the Python API. We will add more advanced sources to the Python API in the future.

This category of sources requires interfacing with external non-Spark libraries, some of them with
complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts
55 changes: 55 additions & 0 deletions examples/src/main/python/streaming/flume_wordcount.py
@@ -0,0 +1,55 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Counts words in UTF-8 encoded, '\n' delimited text received from Flume every second.
Usage: flume_wordcount.py <hostname> <port>
To run this on your local machine, you need to setup Flume first, see
https://flume.apache.org/documentation.html
and then run the example
`$ bin/spark-submit --jars external/flume-assembly/target/scala-*/\
spark-streaming-flume-assembly-*.jar examples/src/main/python/streaming/flume_wordcount.py \
localhost 12345`
"""
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: flume_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)

sc = SparkContext(appName="PythonStreamingFlumeWordCount")
ssc = StreamingContext(sc, 1)

hostname, port = sys.argv[1:]
kvs = FlumeUtils.createStream(ssc, hostname, int(port))
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
135 changes: 135 additions & 0 deletions external/flume-assembly/pom.xml
@@ -0,0 +1,135 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-assembly_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project External Flume Assembly</name>
<url>http://spark.apache.org/</url>

<properties>
<sbt.project.name>streaming-flume-assembly</sbt.project.name>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
<version>${avro.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-flume-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
<resource>log4j.properties</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
