[SPARK-8378][Streaming] Add the Python API for Flume #6830

Closed · wants to merge 16 commits

Changes from 1 commit
15 changes: 15 additions & 0 deletions docs/streaming-flume-integration.md
@@ -58,6 +58,15 @@ configuring Flume agents.
See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java).
</div>
<div data-lang="python" markdown="1">
from pyspark.streaming.flume import FlumeUtils

flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])

By default, the Python API will decode Flume event body as UTF8 encoded strings. You can specify your custom decoding function to decode the body byte arrays in Flume events to any arbitrary data type.
See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/flume_wordcount.py).
</div>
</div>
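
For readers who want the custom-decoding path spelled out, here is a minimal sketch (the bodyDecoder keyword argument and the raw_bytes_decoder helper are illustrative assumptions based on the description above, not text from the diff):

    from pyspark.streaming.flume import FlumeUtils

    def raw_bytes_decoder(body):
        # Keep the raw bytes instead of decoding them to a UTF-8 string.
        return bytes(body) if body is not None else None

    flumeStream = FlumeUtils.createStream(
        streamingContext, "flume-agent-host", 12345,  # hostname/port are placeholders
        bodyDecoder=raw_bytes_decoder)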

Note that the hostname should be the same as the one used by the resource manager in the
@@ -129,6 +138,12 @@ configuring Flume agents.
JavaReceiverInputDStream<SparkFlumeEvent>flumeStream =
FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);
</div>
<div data-lang="python" markdown="1">
Contributor:

Is the decoding logic the same here too? UTF-8 encoded string, or custom decoding function? If yes, we should move that snippet explaining this outside of both approaches and specify that it is applicable to both.

Member Author:

I updated the doc. Because there is no Python example for FlumeUtils.createPollingStream (it requires the user to install the Spark sink jar into Flume, so we cannot write an out-of-the-box example), I just copied the description of the decoding function here.

from pyspark.streaming.flume import FlumeUtils

addresses = [([sink machine hostname 1], [sink port 1]), ([sink machine hostname 2], [sink port 2])]
flumeStream = FlumeUtils.createPollingStream(streamingContext, addresses)
</div>
</div>
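
As with createStream above, the elements of this DStream are (headers, body) pairs. A hedged sketch of consuming them (the sink host names and ports are placeholders, and the pair structure is assumed from the createStream description):

    addresses = [("sink-host-1", 9999), ("sink-host-2", 9999)]
    flumeStream = FlumeUtils.createPollingStream(streamingContext, addresses)

    # Each element is assumed to be a (headers, body) pair; keep only the bodies.
    bodies = flumeStream.map(lambda event: event[1])
    bodies.pprint()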

See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala).
2 changes: 1 addition & 1 deletion docs/streaming-programming-guide.md
@@ -683,7 +683,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
{:.no_toc}

<span class="badge" style="background-color: grey">Python API</span> As of Spark {{site.SPARK_VERSION_SHORT}},
out of these sources, *only* Kafka is available in the Python API. We will add more advanced sources in the Python API in future.
out of these sources, *only* Kafka and Flume are available in the Python API. We will add more advanced sources in the Python API in future.

This category of sources require interfacing with external non-Spark libraries, some of them with
complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts
55 changes: 55 additions & 0 deletions examples/src/main/python/streaming/flume_wordcount.py
@@ -0,0 +1,55 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
Usage: flume_wordcount.py <hostname> <port>

To run this on your local machine, you need to setup Flume first, see
https://flume.apache.org/documentation.html

and then run the example
`$ bin/spark-submit --jars external/flume-assembly/target/scala-*/\
spark-streaming-flume-assembly-*.jar examples/src/main/python/streaming/flume_wordcount.py \
localhost 12345
"""
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: flume_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)

sc = SparkContext(appName="PythonStreamingFlumeWordCount")
ssc = StreamingContext(sc, 1)

hostname, port = sys.argv[1:]
kvs = FlumeUtils.createStream(ssc, hostname, int(port))
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
134 changes: 134 additions & 0 deletions external/flume-assembly/pom.xml
@@ -0,0 +1,134 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-assembly_2.10</artifactId>
Contributor:

Hi guys,

Sorry I'm late to the party, but why is this new assembly necessary?

It creates an 80MB jar file that repackages a bunch of things already present in the Spark assembly (e.g. scala.*, org.hadoop.*, and a whole lot of other things). If python/pyspark/streaming/flume.py is meant to be used inside a Spark application, aren't those dependencies already provided by the Spark assembly? In which case all that is needed is the existing spark-streaming-flume artifact?

Contributor:

Nope, none of the Flume stuff is present in the spark-assembly. That is precisely why this assembly JAR with spark-streaming-flume and Flume plus its dependencies was generated.

Contributor:

Well, two things:

  • spark-streaming-flume, the existing artifact, has transitive dependencies on flume. So if you add it using the ivy support in spark-submit, you'd get those.
  • Even if you want to add this assembly, it currently packages way more than just flume. It includes all of Scala and Hadoop libraries and a bunch of other things, as I mentioned above.

So any way you look at it, there is still something to be fixed here.

Contributor:

  1. Good point, we can probably exclude Scala. Why are the Hadoop libraries included? Definitely not through Spark, since spark-streaming is marked as a provided dependency?
  2. The whole point of making the assembly JAR is to make it easy to run Spark Streaming + Flume applications, especially in Python, where users will not be creating mvn/sbt projects to include the dependencies in an uber jar. The most convenient way for Python users who want to use the Flume stream is to add --jar .jar. Hence Flume and all of its dependencies need to be included.


<packaging>jar</packaging>
<name>Spark Project External Flume Assembly</name>
<url>http://spark.apache.org/</url>

<properties>
<sbt.project.name>streaming-flume-assembly</sbt.project.name>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
Member Author:

The dependencies on avro and avro-ipc are necessary. If they are not added, the assembly plugin will use avro 1.7.3 and avro-ipc 1.7.4. They are incompatible and will throw

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.flume.source.avro.AvroFlumeEvent

Contributor:

This seems like an avro bug. Can you file a jira for Avro? Avro should be compatible within the same minor version - 1.7.x

Contributor:

Do you know where the other version of avro is coming from?

Member Author:

> This seems like an avro bug. Can you file a jira for Avro? Avro should be compatible within the same minor version - 1.7.x

I think avro and avro-ipc should have the same version?

> Do you know where the other version of avro is coming from?

Actually "mvn dependency:tree" shows both avro and avro-ipc are 1.7.7, but I don't know why the assembly plugin picks up a different version.

</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
<version>${avro.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.velocity</groupId>
Contributor:

Why do you need to pull all these in? Doesn't the spark-streaming-flume artifact pull these in anyway?

Contributor:

These are exclusions... not pulling in.

Contributor:

ah, guess I need more coffee

<artifactId>velocity</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
<resource>log4j.properties</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -18,10 +18,16 @@
package org.apache.spark.streaming.flume

import java.net.InetSocketAddress
import java.io.{DataOutputStream, ByteArrayOutputStream}
import java.util.{List => JList, Map => JMap}

import scala.collection.JavaConversions._

import org.apache.spark.api.java.function.PairFunction
import org.apache.spark.api.python.PythonRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream


@@ -236,3 +242,71 @@ object FlumeUtils {
createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
}
}

/**
* This is a helper class that wraps the methods in FlumeUtils into more Python-friendly class and
* function so that it can be easily instantiated and called from Python's FlumeUtils.
*/
private class FlumeUtilsPythonHelper {

def createStream(
jssc: JavaStreamingContext,
hostname: String,
port: Int,
storageLevel: StorageLevel,
enableDecompression: Boolean
): JavaPairDStream[Array[Byte], Array[Byte]] = {
val dstream = FlumeUtils.createStream(jssc, hostname, port, storageLevel, enableDecompression)
FlumeUtilsPythonHelper.toDStreamForPython(dstream)
}

def createPollingStream(
jssc: JavaStreamingContext,
hosts: JList[String],
ports: JList[Int],
Contributor:

This is not very intuitive. Why not take a list of (host, port) tuples? Is there an issue passing tuples from Python?

Contributor:

Java-friendliness. So that there are no issues that can arise when calling from Python through Py4J. This is not meant to be called by the public, so this is sort-of-okay.

Contributor:

hmm, ok. I generally don't like APIs that have two lists with index-based matching. Perhaps we can pass in a Map?

Since this is not really user-facing, I am sort-of-ok with it, but I'd prefer to have a cleaner API.

Member Author:

> hmm, ok. I generally don't like APIs that have two lists with index-based matching. Perhaps we can pass in a Map?

It cannot be a Map because sometimes one hostname may have multiple ports.

Contributor:

Map[String, List[Int]]? Again, I don't feel really strongly about it - this just feels a lot nicer

storageLevel: StorageLevel,
maxBatchSize: Int,
parallelism: Int
): JavaPairDStream[Array[Byte], Array[Byte]] = {
assert(hosts.length == ports.length)
val addresses = hosts.zip(ports).map {
case (host, port) => new InetSocketAddress(host, port)
}
val dstream = FlumeUtils.createPollingStream(
jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
FlumeUtilsPythonHelper.toDStreamForPython(dstream)
}

}
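
The review discussion above about passing two index-aligned lists corresponds to Python-side code along these lines (a hedged sketch; the variable names and the Py4J call are assumptions for illustration):

    # Hypothetical Python-side preparation for the helper above: split the
    # (host, port) tuples into the two index-aligned lists it expects.
    addresses = [("sink-host-1", 9999), ("sink-host-2", 9998)]
    hosts = [host for host, _ in addresses]
    ports = [port for _, port in addresses]
    # helper.createPollingStream(jssc, hosts, ports, storage_level,
    #                            max_batch_size, parallelism)  # invoked via Py4J (assumed)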

private object FlumeUtilsPythonHelper {

private def stringMapToByteArray(map: JMap[CharSequence, CharSequence]): Array[Byte] = {
val byteStream = new ByteArrayOutputStream()
val output = new DataOutputStream(byteStream)
try {
output.writeInt(map.size)
map.foreach { kv =>
PythonRDD.writeUTF(kv._1.toString, output)
PythonRDD.writeUTF(kv._2.toString, output)
}
byteStream.toByteArray
}
finally {
output.close()
}
}

private def toDStreamForPython(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
Contributor:

The name isn't very intuitive; it could be toByteArrayPairDStream.

JavaPairDStream[Array[Byte], Array[Byte]] = {
dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], Array[Byte]] {
override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], Array[Byte]) = {
val event = sparkEvent.event
val byteBuffer = event.getBody
val body = new Array[Byte](byteBuffer.remaining())
byteBuffer.get(body)
(stringMapToByteArray(event.getHeaders), body)
}
})
}
}
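
For reference, a hedged Python-side sketch of decoding the header byte array produced by stringMapToByteArray above. It assumes PythonRDD.writeUTF emits a 4-byte big-endian length followed by the UTF-8 bytes, mirroring DataOutputStream.writeInt for the map size; the function names are illustrative:

    import struct

    def _read_utf(data, offset):
        # Assumed layout: 4-byte big-endian length, then that many UTF-8 bytes.
        (length,) = struct.unpack(">i", data[offset:offset + 4])
        end = offset + 4 + length
        return data[offset + 4:end].decode("utf-8"), end

    def decode_headers(data):
        # Mirrors stringMapToByteArray: an int count, then count key/value pairs.
        (count,) = struct.unpack(">i", data[:4])
        offset, headers = 4, {}
        for _ in range(count):
            key, offset = _read_utf(data, offset)
            value, offset = _read_utf(data, offset)
            headers[key] = value
        return headers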
1 change: 1 addition & 0 deletions pom.xml
@@ -102,6 +102,7 @@
<module>external/twitter</module>
<module>external/flume</module>
<module>external/flume-sink</module>
<module>external/flume-assembly</module>
<module>external/mqtt</module>
<module>external/zeromq</module>
<module>examples</module>
6 changes: 3 additions & 3 deletions project/SparkBuild.scala
@@ -43,8 +43,8 @@ object BuildCommons {
sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
"kinesis-asl").map(ProjectRef(buildLocation, _))

val assemblyProjects@Seq(assembly, examples, networkYarn, streamingKafkaAssembly) =
Seq("assembly", "examples", "network-yarn", "streaming-kafka-assembly")
val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly) =
Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly")
.map(ProjectRef(buildLocation, _))

val tools = ProjectRef(buildLocation, "tools")
@@ -349,7 +349,7 @@ object Assembly {
.getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
},
jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
if (mName.contains("streaming-kafka-assembly")) {
if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly")) {
// This must match the same name used in maven (see external/kafka-assembly/pom.xml)
s"${mName}-${v}.jar"
} else {