From cd14247a8d55d4ee3330720a340790330eea7626 Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 20 Apr 2017 11:08:10 -0700 Subject: [PATCH] Removing aggregators from Dataflow runner --- .../DataflowAggregatorTransforms.java | 79 ------------- .../DataflowMetricUpdateExtractor.java | 109 ------------------ .../runners/dataflow/DataflowMetrics.java | 6 +- .../runners/dataflow/DataflowPipelineJob.java | 41 ++----- .../beam/runners/dataflow/DataflowRunner.java | 12 +- .../dataflow/DataflowPipelineJobTest.java | 42 +++---- 6 files changed, 32 insertions(+), 257 deletions(-) delete mode 100755 runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java delete mode 100755 runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java deleted file mode 100755 index 0198ccac70b0..000000000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow; - -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; - -/** - * A mapping relating {@link Aggregator}s and the {@link PTransform} in which they are used. - */ -class DataflowAggregatorTransforms { - private final Map, Collection>> aggregatorTransforms; - private final Multimap, AppliedPTransform> transformAppliedTransforms; - private final BiMap, String> appliedStepNames; - - public DataflowAggregatorTransforms( - Map, Collection>> aggregatorTransforms, - Map, String> transformStepNames) { - this.aggregatorTransforms = aggregatorTransforms; - appliedStepNames = HashBiMap.create(transformStepNames); - - transformAppliedTransforms = HashMultimap.create(); - for (AppliedPTransform appliedTransform : transformStepNames.keySet()) { - transformAppliedTransforms.put(appliedTransform.getTransform(), appliedTransform); - } - } - - /** - * Returns true if the provided {@link Aggregator} is used in the constructing {@link Pipeline}. - */ - public boolean contains(Aggregator aggregator) { - return aggregatorTransforms.containsKey(aggregator); - } - - /** - * Gets the step names in which the {@link Aggregator} is used. - */ - public Collection getAggregatorStepNames(Aggregator aggregator) { - Collection names = new HashSet<>(); - Collection> transforms = aggregatorTransforms.get(aggregator); - for (PTransform transform : transforms) { - for (AppliedPTransform applied : transformAppliedTransforms.get(transform)) { - names.add(appliedStepNames.get(applied)); - } - } - return names; - } - - /** - * Gets the {@link PTransform} that was assigned the provided step name. - */ - public AppliedPTransform getAppliedTransformForStepName(String stepName) { - return appliedStepNames.inverse().get(stepName); - } -} diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java deleted file mode 100755 index f725c46f25f8..000000000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow; - -import com.google.api.services.dataflow.model.MetricStructuredName; -import com.google.api.services.dataflow.model.MetricUpdate; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.PTransform; - -/** - * Methods for extracting the values of an {@link Aggregator} from a collection of {@link - * MetricUpdate MetricUpdates}. - */ -final class DataflowMetricUpdateExtractor { - private static final String STEP_NAME_CONTEXT_KEY = "step"; - private static final String IS_TENTATIVE_KEY = "tentative"; - - private DataflowMetricUpdateExtractor() { - // Do not instantiate. - } - - /** - * Extract the values of the provided {@link Aggregator} at each {@link PTransform} it was used in - * according to the provided {@link DataflowAggregatorTransforms} from the given list of {@link - * MetricUpdate MetricUpdates}. - */ - public static Map fromMetricUpdates(Aggregator aggregator, - DataflowAggregatorTransforms aggregatorTransforms, List metricUpdates) { - Map results = new HashMap<>(); - if (metricUpdates == null) { - return results; - } - - String aggregatorName = aggregator.getName(); - Collection aggregatorSteps = aggregatorTransforms.getAggregatorStepNames(aggregator); - - for (MetricUpdate metricUpdate : metricUpdates) { - MetricStructuredName metricStructuredName = metricUpdate.getName(); - Map context = metricStructuredName.getContext(); - if (metricStructuredName.getName().equals(aggregatorName) && context != null - && aggregatorSteps.contains(context.get(STEP_NAME_CONTEXT_KEY))) { - AppliedPTransform transform = - aggregatorTransforms.getAppliedTransformForStepName( - context.get(STEP_NAME_CONTEXT_KEY)); - String fullName = transform.getFullName(); - // Prefer the tentative (fresher) value if it exists. - if (Boolean.parseBoolean(context.get(IS_TENTATIVE_KEY)) || !results.containsKey(fullName)) { - results.put(fullName, toValue(aggregator, metricUpdate)); - } - } - } - - return results; - - } - - private static OutputT toValue( - Aggregator aggregator, MetricUpdate metricUpdate) { - CombineFn combineFn = aggregator.getCombineFn(); - Class outputType = combineFn.getOutputType().getRawType(); - - if (outputType.equals(Long.class)) { - @SuppressWarnings("unchecked") - OutputT asLong = (OutputT) Long.valueOf(toNumber(metricUpdate).longValue()); - return asLong; - } - if (outputType.equals(Integer.class)) { - @SuppressWarnings("unchecked") - OutputT asInt = (OutputT) Integer.valueOf(toNumber(metricUpdate).intValue()); - return asInt; - } - if (outputType.equals(Double.class)) { - @SuppressWarnings("unchecked") - OutputT asDouble = (OutputT) Double.valueOf(toNumber(metricUpdate).doubleValue()); - return asDouble; - } - throw new UnsupportedOperationException( - "Unsupported Output Type " + outputType + " in aggregator " + aggregator); - } - - private static Number toNumber(MetricUpdate update) { - if (update.getScalar() instanceof Number) { - return (Number) update.getScalar(); - } - throw new IllegalArgumentException( - "Metric Update " + update + " does not have a numeric scalar"); - } -} diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index d4d29dd7a9af..7633a5621620 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -79,9 +79,9 @@ public DataflowMetrics(DataflowPipelineJob dataflowPipelineJob, DataflowClient d private MetricKey metricHashKey( com.google.api.services.dataflow.model.MetricUpdate metricUpdate) { String fullStepName = metricUpdate.getName().getContext().get("step"); - fullStepName = (dataflowPipelineJob.aggregatorTransforms != null - ? dataflowPipelineJob.aggregatorTransforms - .getAppliedTransformForStepName(fullStepName).getFullName() : fullStepName); + fullStepName = (dataflowPipelineJob.transformStepNames != null + ? dataflowPipelineJob.transformStepNames + .inverse().get(fullStepName).getFullName() : fullStepName); return MetricKey.create( fullStepName, MetricName.named( diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 0399adaa3ae5..d464206e1979 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.dataflow; +import static com.google.common.base.MoreObjects.firstNonNull; import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime; import com.google.api.client.googleapis.json.GoogleJsonResponseException; @@ -26,9 +27,11 @@ import com.google.api.client.util.Sleeper; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.JobMessage; -import com.google.api.services.dataflow.model.JobMetrics; import com.google.api.services.dataflow.model.MetricUpdate; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.net.SocketTimeoutException; import java.util.List; @@ -42,7 +45,7 @@ import org.apache.beam.runners.dataflow.util.MonitoringUtil; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.util.FluentBackoff; import org.joda.time.Duration; import org.slf4j.Logger; @@ -89,7 +92,7 @@ public class DataflowPipelineJob implements PipelineResult { @Nullable private DataflowPipelineJob replacedByJob = null; - protected DataflowAggregatorTransforms aggregatorTransforms; + protected BiMap, String> transformStepNames; /** * The Metric Updates retrieved after the job was in a terminal state. @@ -126,16 +129,17 @@ public class DataflowPipelineJob implements PipelineResult { * * @param jobId the job id * @param dataflowOptions used to configure the client for the Dataflow Service - * @param aggregatorTransforms a mapping from aggregators to PTransforms + * @param transformStepNames a mapping from AppliedPTransforms to Step Names */ public DataflowPipelineJob( String jobId, DataflowPipelineOptions dataflowOptions, - DataflowAggregatorTransforms aggregatorTransforms) { + Map, String> transformStepNames) { this.jobId = jobId; this.dataflowOptions = dataflowOptions; this.dataflowClient = (dataflowOptions == null ? null : DataflowClient.create(dataflowOptions)); - this.aggregatorTransforms = aggregatorTransforms; + this.transformStepNames = HashBiMap.create( + firstNonNull(transformStepNames, ImmutableMap., String>of())); this.dataflowMetrics = new DataflowMetrics(this, this.dataflowClient); } @@ -456,7 +460,7 @@ private Job getJobWithRetries(BackOff backoff, Sleeper sleeper) throws IOExcepti if (currentState.isTerminal()) { terminalState = currentState; replacedByJob = new DataflowPipelineJob( - job.getReplacedByJobId(), dataflowOptions, aggregatorTransforms); + job.getReplacedByJobId(), dataflowOptions, transformStepNames); } return job; } catch (IOException exn) { @@ -488,27 +492,4 @@ private boolean nextBackOff(Sleeper sleeper, BackOff backoff) { public MetricResults metrics() { return dataflowMetrics; } - - private Map fromMetricUpdates(Aggregator aggregator) - throws IOException { - if (aggregatorTransforms.contains(aggregator)) { - List metricUpdates; - if (terminalMetricUpdates != null) { - metricUpdates = terminalMetricUpdates; - } else { - boolean terminal = getState().isTerminal(); - JobMetrics jobMetrics = dataflowClient.getJobMetrics(jobId); - metricUpdates = jobMetrics.getMetrics(); - if (terminal && jobMetrics.getMetrics() != null) { - terminalMetricUpdates = metricUpdates; - } - } - - return DataflowMetricUpdateExtractor.fromMetricUpdates( - aggregator, aggregatorTransforms, metricUpdates); - } else { - throw new IllegalArgumentException( - "Aggregator " + aggregator + " is not used in this pipeline"); - } - } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index b37d34af0fb0..63c2191da259 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -49,7 +49,6 @@ import java.nio.channels.Channels; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -97,7 +96,6 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.runners.TransformHierarchy.Node; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine.GroupedValues; @@ -620,18 +618,10 @@ public DataflowPipelineJob run(Pipeline pipeline) { throw new RuntimeException("Failed to create a workflow job", e); } - // Obtain all of the extractors from the PTransforms used in the pipeline so the - // DataflowPipelineJob has access to them. - Map, Collection>> aggregatorSteps = - pipeline.getAggregatorSteps(); - - DataflowAggregatorTransforms aggregatorTransforms = - new DataflowAggregatorTransforms(aggregatorSteps, jobSpecification.getStepNames()); - // Use a raw client for post-launch monitoring, as status calls may fail // regularly and need not be retried automatically. DataflowPipelineJob dataflowPipelineJob = - new DataflowPipelineJob(jobResult.getId(), options, aggregatorTransforms); + new DataflowPipelineJob(jobResult.getId(), options, jobSpecification.getStepNames()); // If the service returned client request id, the SDK needs to compare it // with the original id generated in the request, if they are not the same diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 59315a7e11a8..9dd2ab131952 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -35,6 +35,7 @@ import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Messages; import com.google.api.services.dataflow.model.Job; +import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.net.SocketTimeoutException; import java.util.Collections; @@ -154,11 +155,10 @@ public void testWaitToFinishMessagesFail() throws Exception { when(mockMessages.list(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(listRequest); when(listRequest.setPageToken(eq((String) null))).thenReturn(listRequest); when(listRequest.execute()).thenThrow(SocketTimeoutException.class); - DataflowAggregatorTransforms dataflowAggregatorTransforms = - mock(DataflowAggregatorTransforms.class); DataflowPipelineJob job = - new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms); + new DataflowPipelineJob(JOB_ID, options, + ImmutableMap., String>of()); State state = job.waitUntilFinish( Duration.standardMinutes(5), jobHandler, fastClock, fastClock); @@ -177,11 +177,10 @@ public State mockWaitToFinishInState(State state) throws Exception { when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest); when(statusRequest.execute()).thenReturn(statusResponse); - DataflowAggregatorTransforms dataflowAggregatorTransforms = - mock(DataflowAggregatorTransforms.class); DataflowPipelineJob job = - new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms); + new DataflowPipelineJob(JOB_ID, options, + ImmutableMap., String>of()); return job.waitUntilFinish(Duration.standardMinutes(1), null, fastClock, fastClock); } @@ -245,11 +244,10 @@ public void testWaitToFinishFail() throws Exception { when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest); when(statusRequest.execute()).thenThrow(IOException.class); - DataflowAggregatorTransforms dataflowAggregatorTransforms = - mock(DataflowAggregatorTransforms.class); DataflowPipelineJob job = - new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms); + new DataflowPipelineJob(JOB_ID, options, + ImmutableMap., String>of()); long startTime = fastClock.nanoTime(); State state = job.waitUntilFinish(Duration.standardMinutes(5), null, fastClock, fastClock); @@ -266,11 +264,10 @@ public void testWaitToFinishTimeFail() throws Exception { when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest); when(statusRequest.execute()).thenThrow(IOException.class); - DataflowAggregatorTransforms dataflowAggregatorTransforms = - mock(DataflowAggregatorTransforms.class); DataflowPipelineJob job = - new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms); + new DataflowPipelineJob(JOB_ID, options, + ImmutableMap., String>of()); long startTime = fastClock.nanoTime(); State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock); assertEquals(null, state); @@ -289,13 +286,11 @@ public void testCumulativeTimeOverflow() throws Exception { when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest); when(statusRequest.execute()).thenReturn(statusResponse); - DataflowAggregatorTransforms dataflowAggregatorTransforms = - mock(DataflowAggregatorTransforms.class); - FastNanoClockAndFuzzySleeper clock = new FastNanoClockAndFuzzySleeper(); - DataflowPipelineJob job = new DataflowPipelineJob( - JOB_ID, options, dataflowAggregatorTransforms); + DataflowPipelineJob job = + new DataflowPipelineJob(JOB_ID, options, + ImmutableMap., String>of()); long startTime = clock.nanoTime(); State state = job.waitUntilFinish(Duration.millis(4), null, clock, clock); assertEquals(null, state); @@ -315,11 +310,9 @@ public void testGetStateReturnsServiceState() throws Exception { when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest); when(statusRequest.execute()).thenReturn(statusResponse); - DataflowAggregatorTransforms dataflowAggregatorTransforms = - mock(DataflowAggregatorTransforms.class); - - DataflowPipelineJob job = new DataflowPipelineJob( - JOB_ID, options, dataflowAggregatorTransforms); + DataflowPipelineJob job = + new DataflowPipelineJob(JOB_ID, options, + ImmutableMap., String>of()); assertEquals( State.RUNNING, @@ -333,11 +326,10 @@ public void testGetStateWithExceptionReturnsUnknown() throws Exception { when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest); when(statusRequest.execute()).thenThrow(IOException.class); - DataflowAggregatorTransforms dataflowAggregatorTransforms = - mock(DataflowAggregatorTransforms.class); DataflowPipelineJob job = - new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms); + new DataflowPipelineJob(JOB_ID, options, + ImmutableMap., String>of()); long startTime = fastClock.nanoTime(); assertEquals(