[SPARK-8378][Streaming]Add the Python API for Flume #6830

Closed
wants to merge 16 commits
7 changes: 4 additions & 3 deletions dev/run-tests.py
@@ -95,8 +95,8 @@ def determine_modules_to_test(changed_modules):
['examples', 'graphx']
>>> x = sorted(x.name for x in determine_modules_to_test([modules.sql]))
>>> x # doctest: +NORMALIZE_WHITESPACE
['examples', 'hive-thriftserver', 'mllib', 'pyspark-core', 'pyspark-ml', \
'pyspark-mllib', 'pyspark-sql', 'pyspark-streaming', 'sparkr', 'sql']
['examples', 'hive-thriftserver', 'mllib', 'pyspark-ml', \
'pyspark-mllib', 'pyspark-sql', 'sparkr', 'sql']
"""
# If we're going to have to run all of the tests, then we can just short-circuit
# and return 'root'. No module depends on root, so if it appears then it will be
@@ -292,7 +292,8 @@ def build_spark_sbt(hadoop_version):
build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
sbt_goals = ["package",
"assembly/assembly",
"streaming-kafka-assembly/assembly"]
"streaming-kafka-assembly/assembly",
"streaming-flume-assembly/assembly"]
profiles_and_goals = build_profiles + sbt_goals

print("[info] Building Spark (w/Hive 0.13.1) using SBT with these arguments: "
15 changes: 12 additions & 3 deletions dev/sparktestsupport/modules.py
@@ -203,7 +203,7 @@ def contains_file(self, filename):


streaming_flume = Module(
name="streaming_flume",
name="streaming-flume",
dependencies=[streaming],
source_file_regexes=[
"external/flume",
@@ -214,6 +214,15 @@ def contains_file(self, filename):
)


streaming_flume_assembly = Module(
name="streaming-flume-assembly",
dependencies=[streaming_flume, streaming_flume_sink],
source_file_regexes=[
"external/flume-assembly",
]
)


mllib = Module(
name="mllib",
dependencies=[streaming, sql],
@@ -241,7 +250,7 @@ def contains_file(self, filename):

pyspark_core = Module(
name="pyspark-core",
dependencies=[mllib, streaming, streaming_kafka],
dependencies=[],
source_file_regexes=[
"python/(?!pyspark/(ml|mllib|sql|streaming))"
],
@@ -281,7 +290,7 @@ def contains_file(self, filename):

pyspark_streaming = Module(
name="pyspark-streaming",
dependencies=[pyspark_core, streaming, streaming_kafka],
dependencies=[pyspark_core, streaming, streaming_kafka, streaming_flume_assembly],
source_file_regexes=[
"python/pyspark/streaming"
],
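To see why the new `streaming-flume-assembly` module and the extra dependency edge on `pyspark-streaming` matter, here is a minimal sketch of the test-selection idea these `Module` entries feed into. It is not the actual `dev/run-tests.py` code; `modules_for_changed_file` and the simple fixed-point loop are illustrative assumptions, but the `source_file_regexes` and `dependencies` attributes are the ones defined in `dev/sparktestsupport/modules.py` above.

```python
import re

# Simplified sketch (not the real dev/run-tests.py implementation):
# map a changed file to modules via source_file_regexes, then add every
# module that transitively depends on a selected module.
def modules_for_changed_file(path, all_modules):
    selected = {m for m in all_modules
                if any(re.match(regex, path) for regex in m.source_file_regexes)}
    grew = True
    while grew:
        grew = False
        for m in all_modules:
            if m not in selected and any(dep in selected for dep in m.dependencies):
                selected.add(m)
                grew = True
    return selected

# e.g. a file under "external/flume-assembly" selects streaming-flume-assembly,
# and pyspark-streaming is then added via the new dependency edge above.
```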
18 changes: 18 additions & 0 deletions docs/streaming-flume-integration.md
@@ -58,6 +58,15 @@ configuring Flume agents.
See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java).
</div>
<div data-lang="python" markdown="1">
from pyspark.streaming.flume import FlumeUtils

flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])

By default, the Python API will decode Flume event body as UTF8 encoded strings. You can specify your custom decoding function to decode the body byte arrays in Flume events to any arbitrary data type.
See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/flume_wordcount.py).
</div>
</div>
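As a minimal sketch of the custom decoding hook described in the new docs above: the hostname, port, and `raw_body_decoder` below are placeholders, and `bodyDecoder` is assumed to be the optional keyword argument this PR's Python `FlumeUtils.createStream` exposes for replacing the default UTF-8 decoding.

```python
from pyspark.streaming.flume import FlumeUtils

# Placeholder decoder: keep the event body as raw bytes instead of the
# default UTF-8 string decoding described above.
def raw_body_decoder(body):
    return None if body is None else bytes(body)

# "flume-host.example.com" and 12345 are placeholders; bodyDecoder is
# assumed to be the decoding hook added by this PR.
flumeStream = FlumeUtils.createStream(
    streamingContext, "flume-host.example.com", 12345,
    bodyDecoder=raw_body_decoder)
```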

Note that the hostname should be the same as the one used by the resource manager in the
@@ -135,6 +144,15 @@ configuring Flume agents.
JavaReceiverInputDStream<SparkFlumeEvent>flumeStream =
FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);
</div>
<div data-lang="python" markdown="1">
Contributor:
Is the decoding logic the same here too? UTF-8 encoded string, or custom decoding function? If yes, we should move that snippet explaining this outside of both approaches and specify that it is applicable to both.

Member Author:

I updated the doc. Because there is no Python example for FlumeUtils.createPollingStream (it requires the user to install the Spark sink jar into Flume, so we cannot write an out-of-the-box example), I just copied the description of the decoding function here.

from pyspark.streaming.flume import FlumeUtils

addresses = [([sink machine hostname 1], [sink port 1]), ([sink machine hostname 2], [sink port 2])]
flumeStream = FlumeUtils.createPollingStream(streamingContext, addresses)

By default, the Python API will decode Flume event body as UTF8 encoded strings. You can specify your custom decoding function to decode the body byte arrays in Flume events to any arbitrary data type.
See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils).
</div>
</div>
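Likewise, a hedged sketch of consuming the polling stream: the sink hostnames and ports are placeholders, and each DStream element is assumed to be a `(headers, body)` pair, matching the `flume_wordcount.py` example added by this PR.

```python
from pyspark.streaming.flume import FlumeUtils

# Placeholder sink addresses; each element of the resulting DStream is a
# (headers, body) pair, as in flume_wordcount.py.
addresses = [("sink-host-1.example.com", 9999),
             ("sink-host-2.example.com", 9999)]
pollingStream = FlumeUtils.createPollingStream(streamingContext, addresses)

bodies = pollingStream.map(lambda event: event[1])
bodies.pprint()
```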

See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala).
2 changes: 1 addition & 1 deletion docs/streaming-programming-guide.md
@@ -683,7 +683,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
{:.no_toc}

<span class="badge" style="background-color: grey">Python API</span> As of Spark {{site.SPARK_VERSION_SHORT}},
out of these sources, *only* Kafka is available in the Python API. We will add more advanced sources in the Python API in future.
out of these sources, *only* Kafka and Flume are available in the Python API. We will add more advanced sources in the Python API in future.

This category of sources require interfacing with external non-Spark libraries, some of them with
complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts
55 changes: 55 additions & 0 deletions examples/src/main/python/streaming/flume_wordcount.py
@@ -0,0 +1,55 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
Usage: flume_wordcount.py <hostname> <port>

To run this on your local machine, you need to setup Flume first, see
https://flume.apache.org/documentation.html

and then run the example
`$ bin/spark-submit --jars external/flume-assembly/target/scala-*/\
spark-streaming-flume-assembly-*.jar examples/src/main/python/streaming/flume_wordcount.py \
localhost 12345
"""
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: flume_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)

sc = SparkContext(appName="PythonStreamingFlumeWordCount")
ssc = StreamingContext(sc, 1)

hostname, port = sys.argv[1:]
kvs = FlumeUtils.createStream(ssc, hostname, int(port))
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
135 changes: 135 additions & 0 deletions external/flume-assembly/pom.xml
@@ -0,0 +1,135 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-assembly_2.10</artifactId>
Contributor:

Hi guys,

Sorry I'm late to the party, but why is this new assembly necessary?

It creates an 80MB jar file that repackages a bunch of things already present in the Spark assembly (e.g. scala.*, org.hadoop.*, and a whole lot of other things). If python/pyspark/streaming/flume.py is meant to be used inside a Spark application, aren't those dependencies already provided by the Spark assembly? In which case all that is needed is the existing spark-streaming-flume artifact?

Contributor:

Nope, none of the flume stuff is present in the spark-assembly. That is precisely why this assembly JAR with spark-streaming-flume and flume plus its dependencies was generated.


Contributor:

Well, two things:

  • spark-streaming-flume, the existing artifact, has transitive dependencies on flume. So if you add it using the ivy support in spark-submit, you'd get those.
  • Even if you want to add this assembly, it currently packages way more than just flume. It includes all of Scala and Hadoop libraries and a bunch of other things, as I mentioned above.

So any way you look at it, there is still something to be fixed here.

Contributor:

  1. Good point, we can probably exclude Scala. Why are the Hadoop libraries included? Definitely not through Spark, as spark-streaming is marked as a provided dependency?
  2. The whole point of making the assembly JAR is to make it easy to run Spark Streaming + Flume applications, especially in Python, where users will not be creating mvn/sbt projects to include the dependencies in an uber jar. The most convenient thing for Python users who want to use the Flume stream is to add --jar .jar. Hence Flume and all of its dependencies need to be included.


<packaging>jar</packaging>
<name>Spark Project External Flume Assembly</name>
<url>http://spark.apache.org/</url>

<properties>
<sbt.project.name>streaming-flume-assembly</sbt.project.name>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
Member Author:

The explicit dependencies on avro and avro-ipc are necessary. Without them, the assembly plugin picks up avro 1.7.3 and avro-ipc 1.7.4, which are incompatible and will throw

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.flume.source.avro.AvroFlumeEvent

Contributor:

This seems like an avro bug. Can you file a jira for Avro? Avro should be compatible within the same minor version - 1.7.x

Contributor:

Do you know where the other version of avro is coming from?

Member Author:

> This seems like an avro bug. Can you file a jira for Avro? Avro should be compatible within the same minor version - 1.7.x

I think avro and avro-ipc should have the same version?

> Do you know where the other version of avro is coming from?

Actually "mvn dependency:tree" shows both avro and avro-ipc are 1.7.7. But I don't know why the assembly plugin picks up a different version.

</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
<version>${avro.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.velocity</groupId>
Contributor:

Why do you need to pull all these in? Doesn't the spark-streaming-flume artifact pull these in anyway?

Contributor:

These are exclusions... not pulling in.

Contributor:

ah, guess I need more coffee

<artifactId>velocity</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-flume-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
<resource>log4j.properties</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
