From 4881c716b4f3968d086b9e6b78bd4429cef801cd Mon Sep 17 00:00:00 2001 From: Pei He Date: Mon, 18 Sep 2017 10:52:14 +0800 Subject: [PATCH] [BEAM-1920] changes for preparing Spark 2 update. --- .../spark/aggregators/AggAccumParam.java | 4 ++- .../beam/runners/spark/io/SourceRDD.java | 32 +++++++++++++++++++ .../metrics/MetricsAccumulatorParam.java | 5 ++- 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java index 9ce8b33294ef..00553bec9946 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java @@ -25,6 +25,8 @@ */ public class AggAccumParam implements AccumulatorParam { + private static final NamedAggregators ZERO = new NamedAggregators(); + @Override public NamedAggregators addAccumulator(NamedAggregators current, NamedAggregators added) { return current.merge(added); @@ -37,6 +39,6 @@ public NamedAggregators addInPlace(NamedAggregators current, NamedAggregators ad @Override public NamedAggregators zero(NamedAggregators initialValue) { - return new NamedAggregators(); + return ZERO; } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java index a225e0f3c510..dea2b4792ce2 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.Objects; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; @@ -267,6 +268,20 @@ private static class SourcePartition implements Partition { this.source = source; } + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (o instanceof SourcePartition) { + SourcePartition that = (SourcePartition) o; + return this.rddId == that.rddId + && this.index == that.index + && this.source.equals(that.source); + } + return false; + } + @Override public int index() { return index; @@ -361,5 +376,22 @@ private static class CheckpointableSourcePartition { + + private static final MetricsContainerStepMap ZERO = new MetricsContainerStepMap(); + @Override public MetricsContainerStepMap addAccumulator( MetricsContainerStepMap c1, @@ -43,6 +46,6 @@ public MetricsContainerStepMap addInPlace( @Override public MetricsContainerStepMap zero(MetricsContainerStepMap initialValue) { - return new MetricsContainerStepMap(); + return ZERO; } }