Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
FALCON-1938 Add support to execute Spark SQL process
Author: peeyush b <pbishnoi@hortonworks.com>

Reviewers: "Venkat Ranganathan  <venkat@hortonworks.com>"

Closes #188 from peeyushb/FALCON-1938

(cherry picked from commit c12c999)
Signed-off-by: bvellanki <bvellanki@hortonworks.com>
  • Loading branch information
peeyushb authored and bvellanki committed Jul 1, 2016
1 parent fbe84bc commit 7871dce21273aa55eba49afee21fd14892113411
Showing 7 changed files with 274 additions and 0 deletions.
@@ -927,6 +927,29 @@ Input and Output data to the Spark application will be set as argument when Spar
In the set of arguments, first argument will always correspond to input feed, second argument will always correspond to output feed and then user's provided argument will be set.


For running the Spark SQL process entity, that read and write the data stored on Hive, the datanucleus jars under the $HIVE_HOME/lib directory and hive-site.xml
under $SPARK_HOME/conf/ directory need to be available on the driver and all executors launched by the YARN cluster.
The convenient way to do this is adding them through the --jars option and --file option of the spark-opts attribute.
Example:
<verbatim>
<process name="spark-process">
...
<workflow engine="spark" path="/resources/action">
<spark-attributes>
<master>local</master>
<name>Spark SQL</name>
<class>org.examples.SparkSQLProcessTable</class>
<jar>/resources/action/lib/spark-application.jar</jar>
<spark-opts>--num-executors 1 --driver-memory 512m --jars /usr/local/hive/lib/datanucleus-rdbms.jar,/usr/local/hive/lib/datanucleus-core.jar,/usr/local/hive/lib/datanucleus-api-jdo.jar --files /usr/local/spark/conf/hive-site.xml</spark-opts>
</spark-attributes>
...
</process>
</verbatim>

Input and Output to the Spark SQL application will be set as argument when Spark workflow will be generated, if input and output feed entity is defined in the process entity.
If input feed is of table type, then input table partition, table name and database name will be set as input arguments. If output feed is of table type, then output table partition, table name and database name will be set as output arguments.
Once input and output arguments is set, then user's provided argument will be set.

---+++ Retry
Retry policy defines how the workflow failures should be handled. Three retry policies are defined: periodic, exp-backoff(exponential backoff) and final. Depending on the delay and number of attempts, the workflow is re-tried after specific intervals. If user sets the onTimeout attribute to "true", retries will happen for TIMED_OUT instances.
Syntax:
@@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<process name="spark-sql-process" xmlns="uri:falcon:process:0.1">
<!-- where -->
<clusters>
<cluster name="hcat-local">
<validity start="2013-11-15T00:05Z" end="2013-11-15T01:05Z"/>
</cluster>
</clusters>

<!-- when -->
<parallel>1</parallel>
<order>LIFO</order>
<frequency>minutes(5)</frequency>
<timezone>UTC</timezone>

<!-- what -->
<inputs>
<!-- In the workflow, the input paths will be available in a variable 'inpaths' -->
<input name="inparts" feed="hcat-in" start="now(0,-5)" end="now(0,-1)"/>
</inputs>

<outputs>
<!-- In the workflow, the output path will be available in a variable 'outpath' -->
<output name="outpart" feed="hcat-out" instance="now(0,0)"/>
</outputs>

<workflow engine="spark" path="/app/spark"/>
<spark-attributes>
<master>local</master>
<name>Spark SQL</name>
<class>org.apache.falcon.example.spark.SparkSQLProcessTable</class>
<jar>/app/spark/lib/falcon-examples.jar</jar>
<spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts>
</spark-attributes>

<retry policy="periodic" delay="minutes(3)" attempts="3"/>

</process>
@@ -63,6 +63,16 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.falcon.example.spark;

import org.apache.spark.SparkContext;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

/**
* Spark SQL Example.
*/

public final class SparkSQLProcessTable {

private SparkSQLProcessTable() {
}
public static void main(String[] args) {
if (args.length < 1) {
System.out.println("Arguments must contain details for input or output table");
System.exit(0);
}

SparkConf conf = new SparkConf().setAppName("SparkSQL example");
SparkContext sc = new SparkContext(conf);
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);

String sqlQuery = "FROM " +args[2]+"."+args[1]+ " INSERT OVERWRITE TABLE " +args[5]+"."+args[4]
+" PARTITION("+args[3]+") SELECT word, SUM(cnt) AS cnt WHERE "+args[0]+" GROUP BY word";

DataFrame df = sqlContext.sql(sqlQuery);
df.show();
}
}

@@ -46,6 +46,7 @@
*/
public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
private static final String ACTION_TEMPLATE = "/action/process/spark-action.xml";
private static final String FALCON_PREFIX = "falcon_";

public SparkProcessWorkflowBuilder(Process entity) {
super(entity);
@@ -155,6 +156,10 @@ private void addInputFeedsAsArgument(List<String> argList, Cluster cluster) thro
final String inputName = input.getName();
if (storage.getType() == Storage.TYPE.FILESYSTEM) {
argList.add(0, "${" + inputName + "}");
} else if (storage.getType() == Storage.TYPE.TABLE) {
argList.add(0, "${" + FALCON_PREFIX+inputName+"_database" + "}");
argList.add(0, "${" + FALCON_PREFIX+inputName+"_table" + "}");
argList.add(0, "${" + FALCON_PREFIX+inputName+"_partition_filter_hive" + "}");
}
numInputFeed--;
}
@@ -174,6 +179,10 @@ private void addOutputFeedsAsArgument(List<String> argList, Cluster cluster) thr
final String outputName = output.getName();
if (storage.getType() == Storage.TYPE.FILESYSTEM) {
argList.add(0, "${" + outputName + "}");
} else if (storage.getType() == Storage.TYPE.TABLE) {
argList.add(0, "${" + FALCON_PREFIX+outputName+"_database" + "}");
argList.add(0, "${" + FALCON_PREFIX+outputName+"_table" + "}");
argList.add(0, "${" + FALCON_PREFIX+outputName+"_partitions_hive" + "}");
}
numOutputFeed--;
}
@@ -325,6 +325,79 @@ public void testHiveProcessMapper(String secureOption) throws Exception {
ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
}

@Test
public void testSparkSQLProcess() throws Exception {
URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
ConfigurationStore.get().publish(EntityType.FEED, inFeed);

resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
ConfigurationStore.get().publish(EntityType.FEED, outFeed);

resource = this.getClass().getResource("/config/process/spark-sql-process.xml");
Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
ConfigurationStore.get().publish(EntityType.PROCESS, process);

prepare(process);
OozieEntityBuilder builder = OozieEntityBuilder.get(process);
Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
builder.build(cluster, bundlePath);
assertTrue(fs.exists(bundlePath));

BUNDLEAPP bundle = getBundle(fs, bundlePath);
assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
assertEquals(1, bundle.getCoordinator().size());
assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
bundle.getCoordinator().get(0).getName());
String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");

COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
HashMap<String, String> props = getCoordProperties(coord);
HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);

verifyEntityProperties(process, cluster,
WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
verifyBrokerProperties(cluster, wfProps);

// verify table and hive props
Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
expected.putAll(ClusterHelper.getHiveProperties(cluster));
for (Map.Entry<String, String> entry : props.entrySet()) {
if (expected.containsKey(entry.getKey())) {
Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
}
}

String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
testParentWorkflow(process, parentWorkflow);

ACTION sparkNode = getAction(parentWorkflow, "user-action");

JAXBElement<org.apache.falcon.oozie.spark.ACTION> actionJaxbElement =
OozieUtils.unMarshalSparkAction(sparkNode);
org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();

assertEquals(sparkAction.getMaster(), "local");
assertEquals(sparkAction.getJar(), "jail://testCluster:00/resources/action/lib/falcon-examples.jar");

Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
List<String> argsList = sparkAction.getArg();

Input input = process.getInputs().getInputs().get(0);
Output output = process.getOutputs().getOutputs().get(0);

assertEquals(argsList.get(0), "${falcon_"+input.getName()+"_partition_filter_hive}");
assertEquals(argsList.get(1), "${falcon_"+input.getName()+"_table}");
assertEquals(argsList.get(2), "${falcon_"+input.getName()+"_database}");
assertEquals(argsList.get(3), "${falcon_"+output.getName()+"_partitions_hive}");
assertEquals(argsList.get(4), "${falcon_"+output.getName()+"_table}");
assertEquals(argsList.get(5), "${falcon_"+output.getName()+"_database}");

ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
}

@Test
public void testSparkProcess() throws Exception {

@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<process name="spark-sql-process" xmlns="uri:falcon:process:0.1">
<!-- where -->
<clusters>
<cluster name="corp">
<validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
</cluster>
</clusters>

<!-- when -->
<parallel>1</parallel>
<order>LIFO</order>
<frequency>hours(1)</frequency>
<timezone>UTC</timezone>

<!-- what -->
<inputs>
<input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
</inputs>

<outputs>
<output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
</outputs>

<workflow engine="spark" path="/resources/action"/>
<spark-attributes>
<master>local</master>
<name>Spark SQL</name>
<class>org.apache.falcon.example.spark.SparkSQLProcessTable</class>
<jar>/resources/action/lib/falcon-examples.jar</jar>
<spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts>
</spark-attributes>

<retry policy="periodic" delay="minutes(3)" attempts="3"/>

</process>

0 comments on commit 7871dce

Please sign in to comment.