Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
FALCON-1853 : Add spark process workflow builder
Following is the pull request that will perform the integration to execute the Spark application through Falcon. Please review.

XSD changes for integration are already committed through pull request #89 .

Author: peeyush b <pbishnoi@hortonworks.com>

Reviewers: "Pavan Kumar Kolamuri <pavan.kolamuri@gmail.com> , Venkat Ranganathan <vranganathan@hortonworks.com>"

Closes #118 from peeyushb/FALCON-1853
  • Loading branch information
peeyushb authored and pavankumar526 committed May 19, 2016
1 parent 6d73fad commit 6e034074f65a6498af532fd0af143c21c410374a
Showing 18 changed files with 563 additions and 1 deletion.
@@ -207,6 +207,32 @@
<artifactId>validation-api</artifactId>
<version>${javax-validation.version}</version>
</dependency>

<!-- Spark core, pulled in for the Spark workflow builder and spark-master
     validation. The exclusions below drop Spark's own Jackson and Jetty
     servlet artifacts — presumably to avoid classpath conflicts with the
     versions Falcon already ships; TODO confirm against dependency:tree. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<exclusions>
<exclusion>
<artifactId>jackson-databind</artifactId>
<groupId>com.fasterxml.jackson.core</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-module-scala_2.10</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
<artifactId>javax.servlet</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- S3 client used only by tests (test scope). -->
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<version>0.9.3</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -113,6 +113,11 @@ public static String getMessageBrokerUrl(Cluster cluster) {
return messageInterface == null ? NO_USER_BROKER_URL : messageInterface.getEndpoint();
}

/**
 * Returns the endpoint of the cluster's optional {@code spark} interface.
 *
 * @param cluster cluster entity to inspect
 * @return the spark master endpoint, or {@code null} when the cluster does
 *         not declare a spark interface
 */
public static String getSparkMasterEndPoint(Cluster cluster) {
    Interface sparkInterface = getInterface(cluster, Interfacetype.SPARK);
    if (sparkInterface == null) {
        return null;
    }
    return sparkInterface.getEndpoint();
}

public static String getMessageBrokerImplClass(Cluster cluster) {
if (cluster.getProperties() != null) {
for (Property prop : cluster.getProperties().getProperties()) {
@@ -45,6 +45,8 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -94,6 +96,7 @@ public void validate(Cluster cluster) throws ValidationException {
validateRegistryInterface(cluster);
validateLocations(cluster);
validateProperties(cluster);
validateSparkMasterInterface(cluster);
}

private void validateScheme(Cluster cluster, Interfacetype interfacetype)
@@ -232,6 +235,19 @@ protected void validateRegistryInterface(Cluster cluster) throws ValidationExcep
}
}

/**
 * Validates that the cluster's spark master endpoint (if declared) is reachable
 * by spinning up a short-lived Spark context against it.
 *
 * @param cluster cluster entity whose spark interface is being validated
 * @throws ValidationException when a spark master URL is declared but a
 *         context cannot be established against it
 */
protected void validateSparkMasterInterface(Cluster cluster) throws ValidationException {
    final String sparkMasterUrl = ClusterHelper.getSparkMasterEndPoint(cluster);
    if (StringUtils.isNotEmpty(sparkMasterUrl)) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster(sparkMasterUrl).setAppName("Falcon Spark");

        JavaSparkContext sc = null;
        try {
            sc = new JavaSparkContext(sparkConf);
            if (sc.startTime() == null) {
                throw new ValidationException("Unable to reach Spark master URL:" + sparkMasterUrl);
            }
        } finally {
            // The context is only needed as a reachability probe; stop it so
            // validation does not leak a live SparkContext (the original code
            // never stopped it).
            if (sc != null) {
                sc.stop();
            }
        }
    }
}

/**
* Validate ACL if authorization is enabled.
*
@@ -32,12 +32,15 @@
import org.apache.falcon.entity.v0.process.Property;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.ACL;
import org.apache.falcon.entity.v0.process.EngineType;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Inputs;
import org.apache.falcon.entity.v0.process.LateInput;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Outputs;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.SparkAttributes;
import org.apache.falcon.entity.v0.process.Workflow;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.util.DateUtil;
@@ -128,6 +131,7 @@ public void validate(Process process) throws FalconException {
validateLateInputs(process);
validateProcessSLA(process);
validateHadoopQueue(process);
validateProcessEntity(process);
}


@@ -374,4 +378,32 @@ private void validateHadoopQueue(Process process) throws FalconException {
}
}

/**
 * Engine-specific validation hook for a process entity; currently delegates
 * to the Spark-attribute checks only.
 *
 * @param process process entity under validation
 * @throws FalconException when an engine-specific validation fails
 */
protected void validateProcessEntity(Process process) throws FalconException {
    validateSparkProcessEntity(process, process.getSparkAttributes());
}

/**
 * Validates Spark-specific attributes for a process whose workflow engine is
 * SPARK: the attributes element must exist, and both an effective spark master
 * (process-level value, falling back to the cluster entity's spark interface)
 * and a jar/python file must be present.
 *
 * @param process         process entity being validated
 * @param sparkAttributes the process's spark-attributes element (may be null)
 * @throws FalconException when required Spark attributes are missing
 */
private void validateSparkProcessEntity(Process process, SparkAttributes sparkAttributes) throws
    FalconException {
    // Processes on other engines carry no spark-attributes obligation.
    if (process.getWorkflow().getEngine() != EngineType.SPARK) {
        return;
    }
    if (sparkAttributes == null) {
        throw new ValidationException(
                "For Spark Workflow engine Spark Attributes in Process Entity can't be null");
    }
    // Process-level master wins; otherwise fall back to the referenced
    // cluster entity's spark interface endpoint.
    String referencedClusterName = process.getClusters().getClusters().get(0).getName();
    org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
            ConfigurationStore.get().get(EntityType.CLUSTER, referencedClusterName);
    String masterFromCluster = ClusterHelper.getSparkMasterEndPoint(clusterEntity);
    String masterFromProcess = sparkAttributes.getMaster();
    String effectiveMaster = (masterFromProcess != null) ? masterFromProcess : masterFromCluster;
    if (StringUtils.isEmpty(effectiveMaster)
            || StringUtils.isEmpty(sparkAttributes.getJar())) {
        throw new ValidationException("Spark master and jar/python file can't be null");
    }
}

}
@@ -59,6 +59,7 @@ public class AbstractTestBase {
protected static final String FEED4_XML = "/config/feed/feed-0.4.xml";
protected static final String CLUSTER_XML = "/config/cluster/cluster-0.1.xml";
protected static final String DATASOURCE_XML = "/config/datasource/datasource-0.1.xml";
protected static final String SPARK_PROCESS_XML = "/config/process/spark-process-0.1.xml";
protected EmbeddedCluster dfsCluster;
protected Configuration conf = new Configuration();
private ConfigurationStore store;
@@ -171,6 +171,7 @@ public void testValidateClusterProperties() throws Exception {
Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster);
Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster);
Mockito.doNothing().when(clusterEntityParser).validateLocations(cluster);
Mockito.doNothing().when(clusterEntityParser).validateSparkMasterInterface(cluster);

// Good set of properties, should work
clusterEntityParser.validateProperties(cluster);
@@ -21,7 +21,9 @@
import org.apache.falcon.FalconException;
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.AbstractTestBase;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
@@ -641,4 +643,21 @@ public void testProcessEndTime() throws FalconException {
process.getClusters().getClusters().get(0).getValidity().setEnd(null);
parser.validate(process);
}

@Test
@Test
public void testSparkProcessEntity() throws FalconException {
    // Parse the spark process resource and check the workflow engine wiring.
    Process sparkProcess = parser.parseAndValidate(
            (ProcessEntityParserTest.class).getResourceAsStream(SPARK_PROCESS_XML));
    Assert.assertEquals(sparkProcess.getWorkflow().getEngine().value(), "spark");
    Assert.assertNotNull(sparkProcess.getWorkflow().getPath());

    // Resolve the effective spark master the same way the parser does:
    // the process-level attribute wins over the cluster entity's interface.
    Cluster firstProcessCluster = sparkProcess.getClusters().getClusters().get(0);
    org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
            ConfigurationStore.get().get(EntityType.CLUSTER, firstProcessCluster.getName());
    String masterFromCluster = ClusterHelper.getSparkMasterEndPoint(clusterEntity);
    String masterFromProcess = sparkProcess.getSparkAttributes().getMaster();
    String effectiveMaster = (masterFromProcess == null) ? masterFromCluster : masterFromProcess;

    Assert.assertEquals(effectiveMaster, "local");
    parser.validate(sparkProcess);
}
}
@@ -31,6 +31,7 @@
version="5.1.6"/>
<interface type="registry" endpoint="http://localhost:48080/templeton/v1"
version="0.11.0"/>
<interface type="spark" endpoint="http://localhost:7070" version="1.6.1"/>
</interfaces>
<locations>
<location name="staging" path="/projects/falcon/staging"/>
@@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!-- Test fixture: a process entity whose workflow engine is "spark",
     used to exercise spark-attributes parsing and validation. -->
<process name="spark-process" xmlns="uri:falcon:process:0.1">
<!-- where -->
<clusters>
<cluster name="testCluster">
<validity start="2011-11-02T00:00Z" end="2091-12-30T00:00Z"/>
</cluster>
</clusters>

<!-- when -->
<parallel>1</parallel>
<order>LIFO</order>
<frequency>hours(1)</frequency>

<!-- what -->
<inputs>
<!-- In the workflow, the input paths will be available in a variable 'inpaths' -->
<input name="clicks" feed="clicksFeed" start="yesterday(0,0)" end="yesterday(20,0)"/>
</inputs>

<outputs>
<!-- In the workflow, the output path will be available in a variable 'outpath' -->
<output name="clicksOutput" feed="imp-click-join2" instance="today(0,0)"/>
</outputs>

<!-- how -->
<properties>
<property name="oozie.launcher.mapreduce.map.memory.mb" value="2072"/>
<property name="oozie.launcher.mapreduce.map.java.opts" value="-Xmx2500m"/>
</properties>

<workflow engine="spark" path="/falcon/test"/>
<!-- Spark-specific attributes: <master> here takes precedence over the
     cluster entity's spark interface; <jar> is the application to submit. -->
<spark-attributes>
<master>local</master>
<name>Spark WordCount Application</name>
<class>org.apache.falcon.example.spark.SparkWordCount</class>
<jar>/falcon/test/workflow/lib/falcon-examples.jar</jar>
<spark-opts>--executor-core 1 --executor-memory 512m --driver-memory 512m</spark-opts>
</spark-attributes>

<retry policy="periodic" delay="hours(10)" attempts="3"/>

</process>
@@ -109,6 +109,11 @@
<version>${joda.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

</dependencies>

<build>
@@ -217,6 +222,25 @@
</schemas>
</configuration>
</execution>
<!-- Generates JAXB classes for the Oozie spark action (spark-action-0.1.xsd,
     taken from the oozie-client artifact) into org.apache.falcon.oozie.spark,
     consumed by the Spark process workflow builder. -->
<execution>
<id>spark-gen</id>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<forceRegenerate>true</forceRegenerate>
<generatePackage>org.apache.falcon.oozie.spark</generatePackage>
<schemas>
<schema>
<dependencyResource>
<groupId>org.apache.oozie</groupId>
<artifactId>oozie-client</artifactId>
<resource>spark-action-0.1.xsd</resource>
</dependencyResource>
</schema>
</schemas>
</configuration>
</execution>
<execution>
<id>bundle-gen</id>
<goals>
@@ -40,6 +40,7 @@
import org.apache.falcon.oozie.process.NativeOozieProcessWorkflowBuilder;
import org.apache.falcon.oozie.process.OozieProcessWorkflowBuilder;
import org.apache.falcon.oozie.process.PigProcessWorkflowBuilder;
import org.apache.falcon.oozie.process.SparkProcessWorkflowBuilder;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.CONFIGURATION;
import org.apache.falcon.oozie.workflow.CREDENTIAL;
@@ -190,6 +191,9 @@ public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster clust
case HIVE:
return new HiveProcessWorkflowBuilder(process);

case SPARK:
return new SparkProcessWorkflowBuilder(process);

default:
break;
}
@@ -213,7 +213,9 @@ protected void propagateEntityProperties(CONFIGURATION conf, List<String> paramL
configProperty.setValue((String) entry.getValue());
configuration.add(configProperty);

paramList.add(entry.getKey() + "=" + entry.getValue());
if (paramList != null) {
paramList.add(entry.getKey() + "=" + entry.getValue());
}
}
}

0 comments on commit 6e03407

Please sign in to comment.