Skip to content

Commit

Permalink
Merge pull request #7464: [BEAM-6382] Cherry pick pr #7443 into 2.10.…
Browse files Browse the repository at this point in the history
…0 release branch
  • Loading branch information
xinyuiscool committed Jan 10, 2019
2 parents 19bb897 + 57af122 commit 629eaf9
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,25 @@
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.samza.config.ConfigFactory;
import org.apache.samza.config.factories.PropertiesConfigFactory;

/** Options which can be used to configure a Samza PipelineRunner. */
public interface SamzaPipelineOptions extends PipelineOptions {

@Description(
"The config for Samza using a properties file. It is *optional*. "
"The config file for Samza. It is *optional*. By default Samza supports properties config."
+ "Without a config file, Samza uses a default config for local execution.")
String getConfigFilePath();

void setConfigFilePath(String filePath);

@Description("The factory to read config file from config file path.")
@Default.Class(PropertiesConfigFactory.class)
Class<? extends ConfigFactory> getConfigFactory();

void setConfigFactory(Class<? extends ConfigFactory> configFactory);

@Description(
"The config override to set programmatically. It will be applied on "
+ "top of config file if it exits, otherwise used directly as the config.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@

import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;

import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -52,40 +51,34 @@ public State getState() {
}

@Override
public State cancel() throws IOException {
public State cancel() {
runner.kill(app);

//TODO: runner.waitForFinish() after SAMZA-1653 done
return getState();
return waitUntilFinish();
}

@Override
public State waitUntilFinish(Duration duration) {
//TODO: SAMZA-1653
throw new UnsupportedOperationException(
"waitUntilFinish(duration) is not supported by the SamzaRunner");
public State waitUntilFinish(@Nullable Duration duration) {
try {
if (duration == null) {
runner.waitForFinish();
} else {
runner.waitForFinish(java.time.Duration.ofMillis(duration.getMillis()));
}
} catch (Exception e) {
throw new Pipeline.PipelineExecutionException(e);
}

final StateInfo stateInfo = getStateInfo();
if (stateInfo.state == State.FAILED) {
throw stateInfo.error;
}

return stateInfo.state;
}

@Override
public State waitUntilFinish() {
if (runner instanceof LocalApplicationRunner) {
try {
((LocalApplicationRunner) runner).waitForFinish();
} catch (Exception e) {
throw new Pipeline.PipelineExecutionException(e);
}

final StateInfo stateInfo = getStateInfo();
if (stateInfo.state == State.FAILED) {
throw stateInfo.error;
}

return stateInfo.state;
} else {
// TODO: SAMZA-1653 support waitForFinish in remote runner too
throw new UnsupportedOperationException(
"waitUntilFinish is not supported by the SamzaRunner when running remotely");
}
return waitUntilFinish(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,36 @@
*/
package org.apache.beam.runners.samza.container;

import java.time.Duration;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.runtime.LocalContainerRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Runs the beam Yarn container, using the static global job model. */
public class BeamContainerRunner extends LocalContainerRunner {
private static final Logger LOG = LoggerFactory.getLogger(ContainerCfgFactory.class);

public BeamContainerRunner(Config config) {
super(ContainerCfgFactory.jobModel, System.getenv(ShellCommandConfig.ENV_CONTAINER_ID()));
}

@Override
public ApplicationStatus status(StreamApplication app) {
return ApplicationStatus.Running;
}

@Override
public void waitForFinish() {
LOG.info("Container has stopped");
}

@Override
public boolean waitForFinish(Duration timeout) {
LOG.info("Container has stopped");
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
package org.apache.beam.runners.samza.container;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigFactory;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.job.model.JobModel;
Expand Down Expand Up @@ -49,6 +52,8 @@ public Config getConfig(URI configUri) {
}
}

return jobModel.getConfig();
final Map<String, String> config = new HashMap<>(jobModel.getConfig());
config.put("app.runner.class", BeamContainerRunner.class.getName());
return new MapConfig(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigFactory;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
Expand Down Expand Up @@ -73,30 +74,32 @@ public Config build() {
config.put(
"beamPipelineOptions",
Base64Serializer.serializeUnchecked(new SerializablePipelineOptions(options)));
// TODO: remove after SAMZA-1531 is resolved
config.put(
ApplicationConfig.APP_RUN_ID,
String.valueOf(System.currentTimeMillis())
+ "-"
// use the most significant bits in UUID (8 digits) to avoid collision
+ UUID.randomUUID().toString().substring(0, 8));

return new MapConfig(config);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static Map<String, String> createUserConfig(SamzaPipelineOptions options) {
private static Map<String, String> createUserConfig(SamzaPipelineOptions options)
throws Exception {
final String configFilePath = options.getConfigFilePath();
final Map<String, String> config = new HashMap<>();

// If user provides a config file, use it as base configs.
if (StringUtils.isNoneEmpty(configFilePath)) {
final File configFile = new File(configFilePath);
checkArgument(configFile.exists(), "Config file %s does not exist", configFilePath);
final PropertiesConfigFactory configFactory = new PropertiesConfigFactory();
final URI configUri = configFile.toURI();
final ConfigFactory configFactory =
options.getConfigFactory().getDeclaredConstructor().newInstance();

// Config file must exist for default properties config
// TODO: add check to all non-empty files once we don't need to
// pass the command-line args through the containers
if (configFactory instanceof PropertiesConfigFactory) {
checkArgument(configFile.exists(), "Config file %s does not exist", configFilePath);
}

config.putAll(configFactory.getConfig(configUri));
}

Expand Down Expand Up @@ -127,6 +130,13 @@ public static Map<String, String> localRunConfig() {
.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName())
.put(TaskConfig.COMMIT_MS(), "-1")
.put("processor.id", "1")
.put(
// TODO: remove after SAMZA-1531 is resolved
ApplicationConfig.APP_RUN_ID,
String.valueOf(System.currentTimeMillis())
+ "-"
// use the most significant bits in UUID (8 digits) to avoid collision
+ UUID.randomUUID().toString().substring(0, 8))
.build();
}

Expand Down

0 comments on commit 629eaf9

Please sign in to comment.