Skip to content
Permalink
Browse files
Safely call methods for PipelineResult
  • Loading branch information
shlxue committed May 13, 2022
1 parent e555e2e commit cc7fa65c6cb6a8c86d95f2479502109a429b3e21
Showing 1 changed file with 26 additions and 6 deletions.
@@ -17,6 +17,7 @@

package org.apache.hop.beam.engines;

import org.apache.beam.runners.core.metrics.DefaultMetricResults;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.flink.FlinkRunner;
@@ -27,6 +28,7 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.util.ThrowingSupplier;
import org.apache.hop.beam.metadata.RunnerType;
import org.apache.hop.beam.pipeline.HopPipelineMetaToBeamPipelineConverter;
import org.apache.hop.core.IRowSet;
@@ -52,6 +54,9 @@
public abstract class BeamPipelineEngine extends Variables
implements IPipelineEngine<PipelineMeta> {

static MetricResults EMPTY_METRIC_RESULTS =
new DefaultMetricResults(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
/**
* Constant specifying a filename containing XML to inject into a ZIP file created during resource
* export.
@@ -375,7 +380,7 @@ protected synchronized void populateEngineMetrics() throws HopException {
if (beamPipelineResults != null) {
Set<String> transformNames = new HashSet<>(Arrays.asList(pipelineMeta.getTransformNames()));
Map<String, EngineComponent> componentsMap = new HashMap<>();
MetricResults metrics = beamPipelineResults.metrics();
MetricResults metrics = safelyCall(() -> beamPipelineResults.metrics(), EMPTY_METRIC_RESULTS);
MetricQueryResults allResults = metrics.queryMetrics(MetricsFilter.builder().build());

for (MetricResult<Long> result : allResults.getCounters()) {
@@ -418,7 +423,7 @@ protected synchronized void populateEngineMetrics() throws HopException {

// Set the transform status to reflect the pipeline status.
//
switch (beamPipelineResults.getState()) {
switch (safelyCall(() -> beamPipelineResults.getState(), PipelineResult.State.UNKNOWN)) {
case DONE:
engineComponent.setRunning(false);
engineComponent.setStatus(ComponentExecutionStatus.STATUS_FINISHED);
@@ -476,14 +481,14 @@ protected long calculateDuration(Date startTime, Date stopTime) {
}

protected synchronized void evaluatePipelineStatus() throws HopException {
if (beamPipelineResults == null || beamPipelineResults.getState() == null) {
if (beamPipelineResults == null || safelyCall(() -> beamPipelineResults.getState()) == null) {
statusDescription = "";
return;
}

// This seems to be the most reliable way of checking the state...
//
PipelineResult.State pipelineState = beamPipelineResults.waitUntilFinish(Duration.millis(1));
PipelineResult.State pipelineState = safelyCall(() -> beamPipelineResults.waitUntilFinish(Duration.millis(1)));
if (pipelineState != null) {
boolean cancelPipeline = false;
boolean cancelRefreshTimer = false;
@@ -530,7 +535,7 @@ protected synchronized void evaluatePipelineStatus() throws HopException {

if (cancelPipeline) {
try {
beamPipelineResults.cancel();
safelyCall(() -> beamPipelineResults.cancel());
logChannel.logBasic("Pipeline execution cancelled");
} catch (Exception e) {
logChannel.logError("Cancellation of pipeline failed", e);
@@ -598,7 +603,7 @@ public void waitUntilFinished() {
public void stopAll() {
try {
if (beamPipelineResults != null) {
beamPipelineResults.cancel();
safelyCall(() -> beamPipelineResults.cancel());
evaluatePipelineStatus();
}
} catch (Exception e) {
@@ -1532,4 +1537,19 @@ public void setBeamEngineRunConfiguration(
IBeamPipelineEngineRunConfiguration beamEngineRunConfiguration) {
this.beamEngineRunConfiguration = beamEngineRunConfiguration;
}

private <R> R safelyCall(ThrowingSupplier<R> supplier) {
return safelyCall(supplier, null);
}

private <R> R safelyCall(ThrowingSupplier<R> supplier, R defaultValue) {
try {
return supplier.get();
} catch (UnsupportedOperationException e) {
logChannel.logBasic(e.getMessage());
} catch (Exception e) {
throw new RuntimeException(e);
}
return defaultValue;
}
}

0 comments on commit cc7fa65

Please sign in to comment.