FALCON-1893 : Add documentation and examples for spark workflow engine
Please review the pull request, which contains documentation and examples showing how to use a Spark application with Falcon.

Author: peeyush b <pbishnoi@hortonworks.com>

Reviewers: Pavan Kumar Kolamuri <pavan.kolamuri@gmail.com>, Venkat Ranganathan <vranganathan@hortonworks.com>

Closes #150 from peeyushb/FALCON-1893

(cherry picked from commit ff2f9e2)
Signed-off-by: pavankumar526 <pavan.kolamuri@gmail.com>
peeyushb authored and pavankumar526 committed May 30, 2016
1 parent 33a37a6 commit da3e37b25d8d49fca2d54c4fe61425824101ef0e
Showing 11 changed files with 420 additions and 4 deletions.
@@ -178,6 +178,7 @@ Submit and schedule the process:
<verbatim>
$bin/falcon entity -submitAndSchedule -type process -file examples/entity/filesystem/oozie-mr-process.xml
$bin/falcon entity -submitAndSchedule -type process -file examples/entity/filesystem/pig-process.xml
$bin/falcon entity -submitAndSchedule -type process -file examples/entity/spark/spark-process.xml
</verbatim>
Generate input data:
<verbatim>
@@ -189,7 +190,7 @@ $bin/falcon instance -status -type process -name oozie-mr-process -start 2013-11
</verbatim>

HCat based example entities are in examples/entity/hcat.

Spark based example entities are in examples/entity/spark.

---+++ Stopping Falcon Server
<verbatim>
@@ -828,13 +828,13 @@ be in lib folder inside the workflow path.
The properties defined in the cluster entity and the cluster properties (nameNode and jobTracker) will also
be available to the workflow.

There are 4 engines supported today.

---++++ Oozie

As part of oozie workflow engine support, users can embed an oozie workflow.
Refer to the oozie [[http://oozie.apache.org/docs/4.2.0/DG_Overview.html][workflow overview]] and
[[http://oozie.apache.org/docs/4.2.0/WorkflowFunctionalSpec.html][workflow specification]] for details.

Syntax:
<verbatim>
@@ -897,6 +897,31 @@ This defines the workflow engine to be hive and the hive script is defined at
Feeds with Hive table storage will send one more parameter apart from the general ones:
<verbatim>$input_filter</verbatim>

---++++ Spark
Falcon also supports Spark as a workflow engine as part of its Spark integration, which enables users to run a Java or Python Spark application as a process.
When the "spark" workflow engine is used, Spark-related parameters must be provided through <spark-attributes>.
Examples:
<verbatim>
<process name="spark-process">
...
<workflow engine="spark" path="/resources/action">
<spark-attributes>
<master>local</master>
<name>Spark WordCount</name>
<class>org.examples.WordCount</class>
<jar>/resources/action/lib/spark-application.jar</jar>
<spark-opts>--num-executors 1 --driver-memory 512m</spark-opts>
</spark-attributes>
...
</process>
</verbatim>

This defines the workflow engine to be spark; the Java/Python Spark application to be executed must be specified with the "jar" option.
If a spark interface is already defined in the cluster entity, the Spark master can be overridden through the process entity to either "yarn-client" or "yarn-cluster".
If input and output feed entities are defined in the process entity, the input and output data paths will be passed as arguments to the Spark application when the Spark workflow is generated.
In the argument list, the first argument always corresponds to the input feed and the second argument to the output feed, followed by any user-provided arguments.
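For illustration, a minimal sketch (with a hypothetical class name) of how a Java Spark application would consume the generated argument list under these assumptions:
<verbatim>
public final class FeedAwareWordCount {
    public static void main(String[] args) {
        // First argument: input feed path, set by Falcon.
        String inputPath = args[0];
        // Second argument: output feed path, set by Falcon.
        String outputPath = args[1];
        // Any user-provided <arg> values follow.
        for (int i = 2; i < args.length; i++) {
            System.out.println("User argument: " + args[i]);
        }
    }
}
</verbatim>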


---+++ Retry
Retry policy defines how workflow failures should be handled. Three retry policies are defined: periodic, exp-backoff (exponential backoff), and final. Depending on the delay and number of attempts, the workflow is retried at specific intervals. If the user sets the onTimeout attribute to "true", retries will also happen for TIMED_OUT instances.
Syntax:
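A minimal sketch, mirroring the retry declarations used in the Spark example entities below:
<verbatim>
<retry policy="periodic" delay="minutes(10)" attempts="3" onTimeout="false"/>
</verbatim>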
@@ -0,0 +1,52 @@
#/**
# * Licensed to the Apache Software Foundation (ASF) under one
# * or more contributor license agreements. See the NOTICE file
# * distributed with this work for additional information
# * regarding copyright ownership. The ASF licenses this file
# * to you under the Apache License, Version 2.0 (the
# * "License"); you may not use this file except in compliance
# * with the License. You may obtain a copy of the License at
# *
# * http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# */

from __future__ import print_function

import sys
from operator import add

from pyspark import SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: wordcount <input> <output>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="Python WordCount")

    # Read input and output paths
    inputPath = sys.argv[1]
    print('Path of input file -> ' + inputPath)
    outputPath = sys.argv[2]
    print('Path of output file -> ' + outputPath)

    distFile = sc.textFile(inputPath)

    # Split each line on commas, pair each word with a count of 1,
    # then sum the counts per word.
    counts = distFile.flatMap(lambda line: line.split(",")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(add)

    counts.saveAsTextFile(outputPath)
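Outside Falcon, the script above can be smoke-tested directly with spark-submit (the input and output paths here are placeholders):
<verbatim>
spark-submit --master local wordcount.py /tmp/input.txt /tmp/output
</verbatim>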
@@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<process name="pyspark-process" xmlns="uri:falcon:process:0.1">
<clusters>
<cluster name="local">
<validity start="2013-11-15T00:05Z" end="2013-11-15T01:05Z"/>
</cluster>
</clusters>

<parallel>1</parallel>
<order>LIFO</order>
<frequency>minutes(5)</frequency>
<timezone>UTC</timezone>

<inputs>
<!-- In the workflow, the input paths will be available in a variable 'inpaths' -->
<input name="inpaths" feed="in" start="now(0,-5)" end="now(0,-1)"/>
</inputs>

<outputs>
<!-- In the workflow, the output path will be available in a variable 'outpath' -->
<output name="outpath" feed="out" instance="now(0,0)"/>
</outputs>

<workflow engine="spark" path="/app/spark"/>
<spark-attributes>
<master>local</master>
<name>Python Spark Wordcount</name>
<jar>/app/spark/wordcount.py</jar>
<spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts>
</spark-attributes>

<retry policy="periodic" delay="minutes(3)" attempts="3"/>

</process>
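Assuming the entity above is saved as examples/entity/spark/pyspark-process.xml, it can be submitted and scheduled like the other examples:
<verbatim>
$bin/falcon entity -submitAndSchedule -type process -file examples/entity/spark/pyspark-process.xml
</verbatim>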
@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<process name="spark-pi" xmlns="uri:falcon:process:0.1">
<clusters>
<cluster name="local">
<validity start="2013-11-15T00:05Z" end="2013-11-15T01:05Z"/>
</cluster>
</clusters>

<parallel>1</parallel>
<order>LIFO</order>
<frequency>minutes(5)</frequency>
<timezone>UTC</timezone>

<workflow engine="spark" path="/app/spark/"/>
<spark-attributes>
<master>local</master>
<name>Spark PI</name>
<class>org.apache.falcon.example.spark.SparkPI</class>
<jar>/app/spark/lib/falcon-examples.jar</jar>
<spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts>
<arg>2</arg>
</spark-attributes>

<retry policy="periodic" delay="minutes(3)" attempts="3"/>

</process>
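Note that the <arg>2</arg> element above is passed through to the application unchanged: since this process defines no input or output feeds, it arrives as the first argument (args[0]) of the SparkPI class shown below and sets the number of slices to 2.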
@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<process name="spark-process" xmlns="uri:falcon:process:0.1">
<clusters>
<cluster name="local">
<validity start="2013-11-15T00:05Z" end="2013-11-15T01:05Z"/>
</cluster>
</clusters>

<parallel>1</parallel>
<order>LIFO</order>
<frequency>minutes(5)</frequency>
<timezone>UTC</timezone>

<inputs>
<!-- In the workflow, the input paths will be available in a variable 'inpaths' -->
<input name="inpaths" feed="in" start="now(0,-5)" end="now(0,-1)"/>
</inputs>

<outputs>
<!-- In the workflow, the output path will be available in a variable 'outpath' -->
<output name="outpath" feed="out" instance="now(0,0)"/>
</outputs>

<workflow engine="spark" path="/app/spark"/>
<spark-attributes>
<master>local</master>
<name>Java Spark Wordcount</name>
<class>org.apache.falcon.example.spark.SparkWordCount</class>
<jar>/app/spark/lib/falcon-examples.jar</jar>
<spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts>
</spark-attributes>

<retry policy="periodic" delay="minutes(3)" attempts="3"/>

</process>
@@ -28,12 +28,41 @@
    <artifactId>falcon-examples</artifactId>
    <description>Apache Falcon Examples</description>
    <name>Apache Falcon Examples</name>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.mesos</groupId>
                    <artifactId>mesos</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>jul-to-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>jcl-over-slf4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
@@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.falcon.example.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

import java.util.ArrayList;
import java.util.List;

/**
 * Computes an approximation to pi using the Monte Carlo method.
 * Usage: SparkPI [slices]
 */
public final class SparkPI {

    private SparkPI() {
    }

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
        // One sample per slice keeps this example fast; increase the
        // multiplier for a meaningful approximation.
        int n = 1 * slices;
        System.out.println("n:" + n + "\tslices:" + slices);
        List<Integer> l = new ArrayList<Integer>(n);
        for (int i = 0; i < n; i++) {
            l.add(i);
        }

        JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);

        // Sample random points in the unit square and count those
        // that fall inside the unit circle.
        int count = dataSet.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) {
                double x = Math.random() * 2 - 1;
                double y = Math.random() * 2 - 1;
                return (x * x + y * y < 1) ? 1 : 0;
            }
        }).reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) {
                return integer + integer2;
            }
        });

        System.out.println("Pi is roughly " + 4.0 * count / n);

        jsc.stop();
    }
}
