Skip to content

Commit

Permalink
BEAM-830 Support launch on YARN cluster.
Browse files Browse the repository at this point in the history
  • Loading branch information
tweise committed Dec 6, 2016
1 parent 493c04f commit 96ffef1
Show file tree
Hide file tree
Showing 5 changed files with 571 additions and 35 deletions.
41 changes: 41 additions & 0 deletions runners/apex/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,51 @@
</ignoredUsedUndeclaredDependencies>
</configuration>
</execution>
<execution>
<id>dependency-tree</id>
<phase>generate-test-resources</phase>
<goals>
<goal>tree</goal>
</goals>
<configuration>
<outputFile>${project.build.directory}/classes/dependency-tree</outputFile>
</configuration>
</execution>
</executions>
</plugin>

</plugins>

<pluginManagement>
<plugins>
<!-- Eclipse has a problem with dependency:tree when it is not in package phase -->
<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<versionRange>[2.10,)</versionRange>
<goals>
<goal>tree</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore></ignore>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,21 @@
*/
package org.apache.beam.runners.apex;

import static com.google.common.base.Preconditions.checkArgument;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.google.common.base.Throwables;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.apex.api.EmbeddedAppLauncher;
import org.apache.apex.api.Launcher;
import org.apache.apex.api.Launcher.AppHandle;
import org.apache.beam.runners.apex.translation.ApexPipelineTranslator;
import org.apache.beam.runners.core.AssignWindows;
import org.apache.beam.sdk.Pipeline;
Expand Down Expand Up @@ -122,33 +124,46 @@ public <OutputT extends POutput, InputT extends PInput> OutputT apply(
public ApexRunnerResult run(final Pipeline pipeline) {

final ApexPipelineTranslator translator = new ApexPipelineTranslator(options);
final AtomicReference<DAG> apexDAG = new AtomicReference<>();

StreamingApplication apexApp = new StreamingApplication() {
@Override
public void populateDAG(DAG dag, Configuration conf) {
apexDAG.set(dag);
dag.setAttribute(DAGContext.APPLICATION_NAME, options.getApplicationName());
translator.translate(pipeline, dag);
}
};

checkArgument(options.isEmbeddedExecution(),
"only embedded execution is supported at this time");
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(false);
try {
lma.prepareDAG(apexApp, conf);
LocalMode.Controller lc = lma.getController();
Launcher.LaunchMode mode = (options.isEmbeddedExecution()) ? Launcher.LaunchMode.EMBEDDED
: Launcher.LaunchMode.YARN;
Launcher<AppHandle> launcher = Launcher.getLauncher(mode);
Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
if (options.isEmbeddedExecution()) {
launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true);
if (options.isEmbeddedExecutionDebugMode()) {
// turns off timeout checking for operator progress
lc.setHeartbeatMonitoringEnabled(false);
launchAttributes.put(EmbeddedAppLauncher.HEARTBEAT_MONITORING, false);
}
Configuration conf = new Configuration(false);
try {
ApexRunner.ASSERTION_ERROR.set(null);
AppHandle apexAppResult = launcher.launchApp(apexApp, conf, launchAttributes);
return new ApexRunnerResult(apexDAG.get(), apexAppResult);
} catch (Exception e) {
Throwables.propagateIfPossible(e);
throw new RuntimeException(e);
}
} else {
try {
ApexYarnLauncher yarnLauncher = new ApexYarnLauncher();
AppHandle apexAppResult = yarnLauncher.launchApp(apexApp);
return new ApexRunnerResult(apexDAG.get(), apexAppResult);
} catch (IOException e) {
throw new RuntimeException("Failed to launch the application on YARN.", e);
}
ApexRunner.ASSERTION_ERROR.set(null);
lc.runAsync();
return new ApexRunnerResult(lma.getDAG(), lc);
} catch (Exception e) {
Throwables.propagateIfPossible(e);
throw new RuntimeException(e);
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package org.apache.beam.runners.apex;

import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;

import java.io.IOException;
import java.lang.reflect.Field;

import org.apache.apex.api.Launcher.AppHandle;
import org.apache.apex.api.Launcher.ShutdownMode;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
Expand All @@ -36,12 +36,12 @@
*/
public class ApexRunnerResult implements PipelineResult {
private final DAG apexDAG;
private final LocalMode.Controller ctrl;
private final AppHandle apexApp;
private State state = State.UNKNOWN;

public ApexRunnerResult(DAG dag, LocalMode.Controller ctrl) {
public ApexRunnerResult(DAG dag, AppHandle apexApp) {
this.apexDAG = dag;
this.ctrl = ctrl;
this.apexApp = apexApp;
}

@Override
Expand All @@ -57,19 +57,19 @@ public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)

@Override
public State cancel() throws IOException {
ctrl.shutdown();
apexApp.shutdown(ShutdownMode.KILL);
state = State.CANCELLED;
return state;
}

@Override
public State waitUntilFinish(Duration duration) {
return ApexRunnerResult.waitUntilFinished(ctrl, duration);
return ApexRunnerResult.waitUntilFinished(apexApp, duration);
}

@Override
public State waitUntilFinish() {
return ApexRunnerResult.waitUntilFinished(ctrl, null);
return ApexRunnerResult.waitUntilFinished(apexApp, null);
}

@Override
Expand All @@ -85,24 +85,18 @@ public DAG getApexDAG() {
return apexDAG;
}

public static State waitUntilFinished(LocalMode.Controller ctrl, Duration duration) {
// we need to rely on internal field for now
// Apex should make it available through API in upcoming release.
public static State waitUntilFinished(AppHandle apexApp, Duration duration) {
long timeout = (duration == null || duration.getMillis() < 1) ? Long.MAX_VALUE
: System.currentTimeMillis() + duration.getMillis();
Field appDoneField;
try {
appDoneField = ctrl.getClass().getDeclaredField("appDone");
appDoneField.setAccessible(true);
while (!appDoneField.getBoolean(ctrl) && System.currentTimeMillis() < timeout) {
while (!apexApp.isFinished() && System.currentTimeMillis() < timeout) {
if (ApexRunner.ASSERTION_ERROR.get() != null) {
throw ApexRunner.ASSERTION_ERROR.get();
}
Thread.sleep(500);
}
return appDoneField.getBoolean(ctrl) ? State.DONE : null;
} catch (NoSuchFieldException | SecurityException | IllegalArgumentException
| IllegalAccessException | InterruptedException e) {
return apexApp.isFinished() ? State.DONE : null;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
Expand Down

0 comments on commit 96ffef1

Please sign in to comment.