
Commit

FALCON-1938 Add support to execute Spark SQL process
Author: peeyush b <pbishnoi@hortonworks.com>

Reviewers: Venkat Ranganathan <venkat@hortonworks.com>

Closes #188 from peeyushb/FALCON-1938
peeyushb authored and bvellanki committed Jul 1, 2016
1 parent bb23ccf commit c12c999
Showing 7 changed files with 274 additions and 0 deletions.
23 changes: 23 additions & 0 deletions docs/src/site/twiki/EntitySpecification.twiki
@@ -927,6 +927,29 @@ Input and Output data to the Spark application will be set as argument when Spar
In the set of arguments, the first argument always corresponds to the input feed, the second argument always corresponds to the output feed, and the user-provided arguments follow.


For running a Spark SQL process entity that reads and writes data stored in Hive, the datanucleus JARs under the $HIVE_HOME/lib directory and the hive-site.xml
under the $SPARK_HOME/conf/ directory need to be available on the driver and all executors launched by the YARN cluster.
A convenient way to do this is to add them through the --jars and --files options of the spark-opts attribute.
Example:
<verbatim>
<process name="spark-process">
...
<workflow engine="spark" path="/resources/action">
<spark-attributes>
<master>local</master>
<name>Spark SQL</name>
<class>org.examples.SparkSQLProcessTable</class>
<jar>/resources/action/lib/spark-application.jar</jar>
<spark-opts>--num-executors 1 --driver-memory 512m --jars /usr/local/hive/lib/datanucleus-rdbms.jar,/usr/local/hive/lib/datanucleus-core.jar,/usr/local/hive/lib/datanucleus-api-jdo.jar --files /usr/local/spark/conf/hive-site.xml</spark-opts>
</spark-attributes>
...
</process>
</verbatim>

Input and output arguments for the Spark SQL application will be set when the Spark workflow is generated, provided input and output feed entities are defined in the process entity.
If the input feed is of table type, the input table partition, table name and database name will be set as input arguments. If the output feed is of table type, the output table partition, table name and database name will be set as output arguments.
Once the input and output arguments are set, the user-provided arguments follow.
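
As an illustration only, the sketch below shows how a Spark SQL application might read these generated arguments when both feeds are table type; the class name TableArgsExample and the printed message are hypothetical and not part of the Falcon specification.
<verbatim>
// Illustrative sketch: argument order seen by the application for table-type feeds
// (compare org.apache.falcon.example.spark.SparkSQLProcessTable in the examples module).
public final class TableArgsExample {
    private TableArgsExample() {
    }
    public static void main(String[] args) {
        String inputFilter     = args[0];                  // input partition filter
        String inputTable      = args[2] + "." + args[1];  // input database.table
        String outputPartition = args[3];                  // output partition spec
        String outputTable     = args[5] + "." + args[4];  // output database.table
        System.out.println("Reading " + inputTable + " where " + inputFilter
                + " into " + outputTable + " PARTITION(" + outputPartition + ")");
        // Any user-provided arguments follow after these six.
    }
}
</verbatim>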

---+++ Retry
Retry policy defines how workflow failures should be handled. Three retry policies are defined: periodic, exp-backoff (exponential backoff) and final. Depending on the delay and number of attempts, the workflow is retried after specific intervals. If the user sets the onTimeout attribute to "true", retries will happen for TIMED_OUT instances.
Syntax:
55 changes: 55 additions & 0 deletions examples/entity/spark/spark-sql-process.xml
@@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<process name="spark-sql-process" xmlns="uri:falcon:process:0.1">
<!-- where -->
<clusters>
<cluster name="hcat-local">
<validity start="2013-11-15T00:05Z" end="2013-11-15T01:05Z"/>
</cluster>
</clusters>

<!-- when -->
<parallel>1</parallel>
<order>LIFO</order>
<frequency>minutes(5)</frequency>
<timezone>UTC</timezone>

<!-- what -->
<inputs>
<!-- In the workflow, the input table properties will be available in variables prefixed with 'falcon_inparts_' -->
<input name="inparts" feed="hcat-in" start="now(0,-5)" end="now(0,-1)"/>
</inputs>

<outputs>
<!-- In the workflow, the output table properties will be available in variables prefixed with 'falcon_outpart_' -->
<output name="outpart" feed="hcat-out" instance="now(0,0)"/>
</outputs>

<workflow engine="spark" path="/app/spark"/>
<spark-attributes>
<master>local</master>
<name>Spark SQL</name>
<class>org.apache.falcon.example.spark.SparkSQLProcessTable</class>
<jar>/app/spark/lib/falcon-examples.jar</jar>
<spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts>
</spark-attributes>

<retry policy="periodic" delay="minutes(3)" attempts="3"/>

</process>
10 changes: 10 additions & 0 deletions examples/pom.xml
@@ -63,6 +63,16 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.falcon.example.spark;

import org.apache.spark.SparkContext;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

/**
* Spark SQL Example.
*/

public final class SparkSQLProcessTable {

private SparkSQLProcessTable() {
}
public static void main(String[] args) {
// Expects six positional arguments generated by the Falcon Spark workflow:
// input partition filter, input table, input database,
// output partition(s), output table, output database.
if (args.length < 6) {
System.out.println("Arguments must contain details for input and output tables");
System.exit(1);
}

SparkConf conf = new SparkConf().setAppName("SparkSQL example");
SparkContext sc = new SparkContext(conf);
HiveContext sqlContext = new HiveContext(sc);

// Aggregate word counts from the input table into the output table partition.
String sqlQuery = "FROM " + args[2] + "." + args[1] + " INSERT OVERWRITE TABLE " + args[5] + "." + args[4]
+ " PARTITION(" + args[3] + ") SELECT word, SUM(cnt) AS cnt WHERE " + args[0] + " GROUP BY word";

DataFrame df = sqlContext.sql(sqlQuery);
df.show();
}
}
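
For reference, here is a hypothetical local driver (not part of this commit) showing the six positional arguments in the order the generated Spark workflow passes them; the table, database and partition values are placeholders, and an actual run still requires Spark and Hive on the classpath.

package org.apache.falcon.example.spark;

/**
 * Hypothetical driver for SparkSQLProcessTable; all argument values are placeholders.
 */
public final class SparkSQLProcessTableLocalDriver {

    private SparkSQLProcessTableLocalDriver() {
    }

    public static void main(String[] args) {
        SparkSQLProcessTable.main(new String[] {
            "(ds='2016-07-01')",     // input partition filter (hive)
            "clicks_raw_table",      // input table name
            "falcon_db",             // input database name
            "ds='2016-07-01'",       // output partition(s) (hive)
            "clicks_summary_table",  // output table name
            "falcon_db"              // output database name
        });
    }
}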

@@ -46,6 +46,7 @@
*/
public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
private static final String ACTION_TEMPLATE = "/action/process/spark-action.xml";
private static final String FALCON_PREFIX = "falcon_";

public SparkProcessWorkflowBuilder(Process entity) {
super(entity);
@@ -155,6 +156,10 @@ private void addInputFeedsAsArgument(List<String> argList, Cluster cluster) thro
final String inputName = input.getName();
if (storage.getType() == Storage.TYPE.FILESYSTEM) {
argList.add(0, "${" + inputName + "}");
} else if (storage.getType() == Storage.TYPE.TABLE) {
argList.add(0, "${" + FALCON_PREFIX+inputName+"_database" + "}");
argList.add(0, "${" + FALCON_PREFIX+inputName+"_table" + "}");
argList.add(0, "${" + FALCON_PREFIX+inputName+"_partition_filter_hive" + "}");
}
numInputFeed--;
}
@@ -174,6 +179,10 @@ private void addOutputFeedsAsArgument(List<String> argList, Cluster cluster) thr
final String outputName = output.getName();
if (storage.getType() == Storage.TYPE.FILESYSTEM) {
argList.add(0, "${" + outputName + "}");
} else if (storage.getType() == Storage.TYPE.TABLE) {
argList.add(0, "${" + FALCON_PREFIX+outputName+"_database" + "}");
argList.add(0, "${" + FALCON_PREFIX+outputName+"_table" + "}");
argList.add(0, "${" + FALCON_PREFIX+outputName+"_partitions_hive" + "}");
}
numOutputFeed--;
}
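
A small standalone sketch (illustrative only; the feed name "input" is hypothetical) of the prepend ordering used above: because each value is inserted at index 0, the last add() ends up first, which is why, for each feed, the partition filter precedes the table and database names in the test below.

import java.util.ArrayList;
import java.util.List;

public final class PrependOrderDemo {
    private PrependOrderDemo() {
    }
    public static void main(String[] args) {
        List<String> argList = new ArrayList<>();
        argList.add(0, "${falcon_input_database}");
        argList.add(0, "${falcon_input_table}");
        argList.add(0, "${falcon_input_partition_filter_hive}");
        // Prints: [${falcon_input_partition_filter_hive}, ${falcon_input_table}, ${falcon_input_database}]
        System.out.println(argList);
    }
}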
@@ -325,6 +325,79 @@ public void testHiveProcessMapper(String secureOption) throws Exception {
ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
}

@Test
public void testSparkSQLProcess() throws Exception {
URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
ConfigurationStore.get().publish(EntityType.FEED, inFeed);

resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
ConfigurationStore.get().publish(EntityType.FEED, outFeed);

resource = this.getClass().getResource("/config/process/spark-sql-process.xml");
Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
ConfigurationStore.get().publish(EntityType.PROCESS, process);

prepare(process);
OozieEntityBuilder builder = OozieEntityBuilder.get(process);
Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
builder.build(cluster, bundlePath);
assertTrue(fs.exists(bundlePath));

BUNDLEAPP bundle = getBundle(fs, bundlePath);
assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
assertEquals(1, bundle.getCoordinator().size());
assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
bundle.getCoordinator().get(0).getName());
String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");

COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
HashMap<String, String> props = getCoordProperties(coord);
HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);

verifyEntityProperties(process, cluster,
WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
verifyBrokerProperties(cluster, wfProps);

// verify table and hive props
Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
expected.putAll(ClusterHelper.getHiveProperties(cluster));
for (Map.Entry<String, String> entry : props.entrySet()) {
if (expected.containsKey(entry.getKey())) {
Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
}
}

String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
testParentWorkflow(process, parentWorkflow);

ACTION sparkNode = getAction(parentWorkflow, "user-action");

JAXBElement<org.apache.falcon.oozie.spark.ACTION> actionJaxbElement =
OozieUtils.unMarshalSparkAction(sparkNode);
org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();

assertEquals(sparkAction.getMaster(), "local");
assertEquals(sparkAction.getJar(), "jail://testCluster:00/resources/action/lib/falcon-examples.jar");

Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
List<String> argsList = sparkAction.getArg();

Input input = process.getInputs().getInputs().get(0);
Output output = process.getOutputs().getOutputs().get(0);

assertEquals(argsList.get(0), "${falcon_"+input.getName()+"_partition_filter_hive}");
assertEquals(argsList.get(1), "${falcon_"+input.getName()+"_table}");
assertEquals(argsList.get(2), "${falcon_"+input.getName()+"_database}");
assertEquals(argsList.get(3), "${falcon_"+output.getName()+"_partitions_hive}");
assertEquals(argsList.get(4), "${falcon_"+output.getName()+"_table}");
assertEquals(argsList.get(5), "${falcon_"+output.getName()+"_database}");

ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
}

@Test
public void testSparkProcess() throws Exception {

53 changes: 53 additions & 0 deletions oozie/src/test/resources/config/process/spark-sql-process.xml
@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<process name="spark-sql-process" xmlns="uri:falcon:process:0.1">
<!-- where -->
<clusters>
<cluster name="corp">
<validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
</cluster>
</clusters>

<!-- when -->
<parallel>1</parallel>
<order>LIFO</order>
<frequency>hours(1)</frequency>
<timezone>UTC</timezone>

<!-- what -->
<inputs>
<input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
</inputs>

<outputs>
<output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
</outputs>

<workflow engine="spark" path="/resources/action"/>
<spark-attributes>
<master>local</master>
<name>Spark SQL</name>
<class>org.apache.falcon.example.spark.SparkSQLProcessTable</class>
<jar>/resources/action/lib/falcon-examples.jar</jar>
<spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts>
</spark-attributes>

<retry policy="periodic" delay="minutes(3)" attempts="3"/>

</process>
