Add the Python API for Flume
zsxwing committed Jun 15, 2015
1 parent 4c5889e commit 9f33873
Showing 8 changed files with 427 additions and 5 deletions.
15 changes: 15 additions & 0 deletions docs/streaming-flume-integration.md
@@ -58,6 +58,15 @@ configuring Flume agents.
See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java).
</div>
<div data-lang="python" markdown="1">
    from pyspark.streaming.flume import FlumeUtils

    flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])

By default, the Python API decodes the Flume event body as UTF-8 encoded strings. You can specify a custom decoding function to decode the body byte arrays in Flume events into an arbitrary data type, as sketched below.
See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/flume_wordcount.py).
</div>
</div>
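For example, a custom decoder can be supplied when the event body is not UTF-8 text. The snippet below is only a minimal sketch: it assumes `createStream` accepts a `bodyDecoder` keyword argument (see the API docs linked above), and `decode_binary_body`, `"flume-host"` and `9999` are hypothetical names and values.

    from pyspark.streaming.flume import FlumeUtils

    def decode_binary_body(body_bytes):
        # Hypothetical decoder: return the raw event body bytes unchanged
        # instead of decoding them as a UTF-8 string.
        return bytes(body_bytes)

    # Assumes a bodyDecoder keyword argument, as suggested by the API docs above.
    flumeStream = FlumeUtils.createStream(streamingContext, "flume-host", 9999,
                                          bodyDecoder=decode_binary_body)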

Note that the hostname should be the same as the one used by the resource manager in the
@@ -129,6 +138,12 @@ configuring Flume agents.
    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
        FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);
</div>
<div data-lang="python" markdown="1">
    from pyspark.streaming.flume import FlumeUtils

    addresses = [([sink machine hostname 1], [sink port 1]), ([sink machine hostname 2], [sink port 2])]
    flumeStream = FlumeUtils.createPollingStream(streamingContext, addresses)
</div>
</div>
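Whichever approach is used, each element of the resulting DStream is a pair of (event headers, event body), as the `flume_wordcount.py` example added below illustrates, so the body can be extracted with a simple `map`. A minimal sketch, continuing from the snippet above:

    # Each stream element is a (headers, body) pair; keep only the body text.
    bodies = flumeStream.map(lambda event: event[1])
    bodies.pprint()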

See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala).
2 changes: 1 addition & 1 deletion docs/streaming-programming-guide.md
@@ -683,7 +683,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
{:.no_toc}

<span class="badge" style="background-color: grey">Python API</span> As of Spark {{site.SPARK_VERSION_SHORT}},
out of these sources, *only* Kafka is available in the Python API. We will add more advanced sources in the Python API in future.
out of these sources, *only* Kafka and Flume are available in the Python API. We will add more advanced sources to the Python API in the future.

This category of sources requires interfacing with external non-Spark libraries, some of them with
complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts
55 changes: 55 additions & 0 deletions examples/src/main/python/streaming/flume_wordcount.py
@@ -0,0 +1,55 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Counts words in UTF8 encoded, '\n' delimited text received from Flume every second.
Usage: flume_wordcount.py <hostname> <port>
To run this on your local machine, you need to setup Flume first, see
https://flume.apache.org/documentation.html
and then run the example
`$ bin/spark-submit --jars external/flume-assembly/target/scala-*/\
  spark-streaming-flume-assembly-*.jar examples/src/main/python/streaming/flume_wordcount.py \
  localhost 12345`
"""
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: flume_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingFlumeWordCount")
    ssc = StreamingContext(sc, 1)

    hostname, port = sys.argv[1:]
    kvs = FlumeUtils.createStream(ssc, hostname, int(port))
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
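The same word count could also be pointed at the pull-based receiver; a hypothetical variant of the `createStream` call above, assuming a Spark sink is listening at the given address as described in the integration guide:

    # Hypothetical substitution for the createStream call above, using the
    # pull-based receiver; the second argument is a list of (hostname, port) tuples.
    kvs = FlumeUtils.createPollingStream(ssc, [(hostname, int(port))])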
134 changes: 134 additions & 0 deletions external/flume-assembly/pom.xml
@@ -0,0 +1,134 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.10</artifactId>
    <version>1.5.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume-assembly_2.10</artifactId>
  <packaging>jar</packaging>
  <name>Spark Project External Flume Assembly</name>
  <url>http://spark.apache.org/</url>

  <properties>
    <sbt.project.name>streaming-flume-assembly</sbt.project.name>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>${avro.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-ipc</artifactId>
      <version>${avro.version}</version>
      <exclusions>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.mortbay.jetty</groupId>
          <artifactId>jetty</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.mortbay.jetty</groupId>
          <artifactId>jetty-util</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.mortbay.jetty</groupId>
          <artifactId>servlet-api</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.velocity</groupId>
          <artifactId>velocity</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>

  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <configuration>
          <shadedArtifactAttached>false</shadedArtifactAttached>
          <artifactSet>
            <includes>
              <include>*:*</include>
            </includes>
          </artifactSet>
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <excludes>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>reference.conf</resource>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
                  <resource>log4j.properties</resource>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
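The shade plugin above bundles spark-streaming-flume and its Avro dependencies into a single assembly jar, so a Python application only needs to pass one artifact via `--jars`, as the usage string in `flume_wordcount.py` above shows.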

@@ -18,10 +18,16 @@
package org.apache.spark.streaming.flume

import java.net.InetSocketAddress
import java.io.{DataOutputStream, ByteArrayOutputStream}
import java.util.{List => JList, Map => JMap}

import scala.collection.JavaConversions._

import org.apache.spark.api.java.function.PairFunction
import org.apache.spark.api.python.PythonRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream


@@ -236,3 +242,71 @@ object FlumeUtils {
    createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
  }
}

/**
 * A helper class that wraps the FlumeUtils methods in a more Python-friendly form, so that
 * they can be easily instantiated and called from Python's FlumeUtils.
 */
private class FlumeUtilsPythonHelper {

  def createStream(
      jssc: JavaStreamingContext,
      hostname: String,
      port: Int,
      storageLevel: StorageLevel,
      enableDecompression: Boolean
    ): JavaPairDStream[Array[Byte], Array[Byte]] = {
    val dstream = FlumeUtils.createStream(jssc, hostname, port, storageLevel, enableDecompression)
    FlumeUtilsPythonHelper.toDStreamForPython(dstream)
  }

  def createPollingStream(
      jssc: JavaStreamingContext,
      hosts: JList[String],
      ports: JList[Int],
      storageLevel: StorageLevel,
      maxBatchSize: Int,
      parallelism: Int
    ): JavaPairDStream[Array[Byte], Array[Byte]] = {
    assert(hosts.length == ports.length)
    val addresses = hosts.zip(ports).map {
      case (host, port) => new InetSocketAddress(host, port)
    }
    val dstream = FlumeUtils.createPollingStream(
      jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
    FlumeUtilsPythonHelper.toDStreamForPython(dstream)
  }

}

private object FlumeUtilsPythonHelper {

  private def stringMapToByteArray(map: JMap[CharSequence, CharSequence]): Array[Byte] = {
    val byteStream = new ByteArrayOutputStream()
    val output = new DataOutputStream(byteStream)
    try {
      output.writeInt(map.size)
      map.foreach { kv =>
        PythonRDD.writeUTF(kv._1.toString, output)
        PythonRDD.writeUTF(kv._2.toString, output)
      }
      byteStream.toByteArray
    }
    finally {
      output.close()
    }
  }

  private def toDStreamForPython(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
    JavaPairDStream[Array[Byte], Array[Byte]] = {
    dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], Array[Byte]] {
      override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], Array[Byte]) = {
        val event = sparkEvent.event
        val byteBuffer = event.getBody
        val body = new Array[Byte](byteBuffer.remaining())
        byteBuffer.get(body)
        (stringMapToByteArray(event.getHeaders), body)
      }
    })
  }
}
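On the Python side, the header bytes produced by `stringMapToByteArray` have to be unpacked again. The following is not the actual pyspark decoder, just an illustrative sketch of the wire format used above, assuming `DataOutputStream.writeInt` writes a 4-byte big-endian integer and `PythonRDD.writeUTF` writes such a length prefix followed by the UTF-8 bytes:

    import struct

    def read_headers(data):
        # Decode the header byte array written by stringMapToByteArray above.
        (count,) = struct.unpack_from(">i", data, 0)  # number of key/value pairs
        offset = 4
        headers = {}
        for _ in range(count):
            pair = []
            for _ in range(2):  # key, then value
                (length,) = struct.unpack_from(">i", data, offset)
                offset += 4
                pair.append(data[offset:offset + length].decode("utf-8"))
                offset += length
            headers[pair[0]] = pair[1]
        return headers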
1 change: 1 addition & 0 deletions pom.xml
@@ -102,6 +102,7 @@
    <module>external/twitter</module>
    <module>external/flume</module>
    <module>external/flume-sink</module>
    <module>external/flume-assembly</module>
    <module>external/mqtt</module>
    <module>external/zeromq</module>
    <module>examples</module>
6 changes: 3 additions & 3 deletions project/SparkBuild.scala
@@ -43,8 +43,8 @@ object BuildCommons {
sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
"kinesis-asl").map(ProjectRef(buildLocation, _))

val assemblyProjects@Seq(assembly, examples, networkYarn, streamingKafkaAssembly) =
Seq("assembly", "examples", "network-yarn", "streaming-kafka-assembly")
val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly) =
Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly")
.map(ProjectRef(buildLocation, _))

val tools = ProjectRef(buildLocation, "tools")
@@ -349,7 +349,7 @@ object Assembly {
.getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
},
jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
if (mName.contains("streaming-kafka-assembly")) {
if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly")) {
// This must match the same name used in maven (see external/kafka-assembly/pom.xml)
s"${mName}-${v}.jar"
} else {