From daca4cd3223cf71606fe91c1a914b09b960f3137 Mon Sep 17 00:00:00 2001
From: David Moravek
Date: Tue, 20 Nov 2018 18:04:43 +0100
Subject: [PATCH 1/5] [BEAM-5987] Spark: Share cached side inputs between tasks.

---
 .../spark/translation/MultiDoFnFunction.java | 26 ++++++++++++++++++-
 .../spark/util/CachedSideInputReader.java    | 14 +++++++++-
 2 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 661e23e7b6da..4db6858c3229 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -22,10 +22,13 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.WeakHashMap;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.InMemoryStateInternals;
 import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
@@ -49,6 +52,8 @@
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /**
@@ -61,6 +66,17 @@
 public class MultiDoFnFunction<InputT, OutputT>
     implements PairFlatMapFunction<
         Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MultiDoFnFunction.class);
+
+  /** JVM wide side input cache. */
+  private static final Map<String, SideInputReader> sideInputReaders =
+      Collections.synchronizedMap(new WeakHashMap<>());
+
+  /**
+   * Id that is consistent among executors. We cannot use stepName because of possible collisions.
+   */
+  private final String uniqueId = UUID.randomUUID().toString();
+
   private final Accumulator<MetricsContainerStepMap> metricsAccum;
   private final String stepName;
   private final DoFn<InputT, OutputT> doFn;
@@ -150,11 +166,19 @@ public TimerInternals timerInternals() {
       context = new SparkProcessContext.NoOpStepContext();
     }
 
+    final SideInputReader sideInputReader =
+        sideInputReaders.computeIfAbsent(
+            uniqueId,
+            key -> {
+              LOG.info("Creating a new side input reader for [{}] with id [{}].", stepName, key);
+              return CachedSideInputReader.of(new SparkSideInputReader(sideInputs));
+            });
+
     final DoFnRunner<InputT, OutputT> doFnRunner =
         DoFnRunners.simpleRunner(
             options.get(),
             doFn,
-            CachedSideInputReader.of(new SparkSideInputReader(sideInputs)),
+            sideInputReader,
             outputManager,
             mainOutputTag,
             additionalOutputTags,
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
index 49200c6c4b42..bfa873269562 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
@@ -24,10 +24,15 @@
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.spark.util.SizeEstimator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** {@link SideInputReader} that caches materialized views. */
 public class CachedSideInputReader implements SideInputReader {
 
+  private static final Logger LOG = LoggerFactory.getLogger(CachedSideInputReader.class);
+
   /**
    * Create a new cached {@link SideInputReader}.
    *
@@ -88,7 +93,14 @@ public <T> T get(PCollectionView<T> view, BoundedWindow window) {
     @SuppressWarnings("unchecked")
     final Map<Key<T>, T> materializedCasted = (Map) materialized;
     return materializedCasted.computeIfAbsent(
-        new Key<>(view, window), key -> delegate.get(view, window));
+        new Key<>(view, window),
+        key -> {
+          final T result = delegate.get(view, window);
+          LOG.info(
+              "Caching de-serialized side input of size [{}B] in memory.",
+              SizeEstimator.estimate(result));
+          return result;
+        });
   }
 
   @Override
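Taken together, the mechanism of patch 1 boils down to the following self-contained sketch. The class and member names here (SharedPerStepCache, getOrCreate, expensiveFactory) are illustrative placeholders, not names from the patch: every serialized copy of a function carries the same uniqueId, so all tasks of one step that land in the same executor JVM share a single lazily built reader.

import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.WeakHashMap;
import java.util.function.Supplier;

final class SharedPerStepCache<R> {

  // JVM-wide map shared by all tasks in the executor; synchronizedMap guards concurrent tasks.
  private static final Map<String, Object> CACHE =
      Collections.synchronizedMap(new WeakHashMap<>());

  // Generated once on the driver and serialized with the function; unlike a step name,
  // a random UUID cannot collide between two instances of the same transform.
  private final String uniqueId = UUID.randomUUID().toString();

  @SuppressWarnings("unchecked")
  R getOrCreate(Supplier<R> expensiveFactory) {
    // Only the first task to arrive pays the construction cost.
    return (R) CACHE.computeIfAbsent(uniqueId, id -> expensiveFactory.get());
  }
}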
From 55df5681000c8821aacbc30f3a9299c7ecac99fd Mon Sep 17 00:00:00 2001
From: "marek.simunek"
Date: Fri, 23 Nov 2018 16:47:38 +0100
Subject: [PATCH 2/5] [BEAM-5987] deserialized sideInputs are cached in executor

---
 .../spark/translation/MultiDoFnFunction.java | 26 +---
 .../spark/util/CachedSideInputReader.java    | 87 ++++++-------
 .../runners/spark/util/SideInputStorage.java | 115 ++++++++++++++++++
 3 files changed, 153 insertions(+), 75 deletions(-)
 create mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 4db6858c3229..661e23e7b6da 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -22,13 +22,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.UUID;
-import java.util.WeakHashMap;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.InMemoryStateInternals;
 import org.apache.beam.runners.core.InMemoryTimerInternals;
-import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
@@ -52,8 +49,6 @@
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /**
@@ -66,17 +61,6 @@
 public class MultiDoFnFunction<InputT, OutputT>
     implements PairFlatMapFunction<
         Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(MultiDoFnFunction.class);
-
-  /** JVM wide side input cache. */
-  private static final Map<String, SideInputReader> sideInputReaders =
-      Collections.synchronizedMap(new WeakHashMap<>());
-
-  /**
-   * Id that is consistent among executors. We cannot use stepName because of possible collisions.
-   */
-  private final String uniqueId = UUID.randomUUID().toString();
-
   private final Accumulator<MetricsContainerStepMap> metricsAccum;
   private final String stepName;
   private final DoFn<InputT, OutputT> doFn;
@@ -166,19 +150,11 @@ public TimerInternals timerInternals() {
       context = new SparkProcessContext.NoOpStepContext();
     }
 
-    final SideInputReader sideInputReader =
-        sideInputReaders.computeIfAbsent(
-            uniqueId,
-            key -> {
-              LOG.info("Creating a new side input reader for [{}] with id [{}].", stepName, key);
-              return CachedSideInputReader.of(new SparkSideInputReader(sideInputs));
-            });
-
     final DoFnRunner<InputT, OutputT> doFnRunner =
         DoFnRunners.simpleRunner(
             options.get(),
             doFn,
-            sideInputReader,
+            CachedSideInputReader.of(new SparkSideInputReader(sideInputs)),
             outputManager,
             mainOutputTag,
             additionalOutputTags,
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
index bfa873269562..699e30bbc4f3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
@@ -17,11 +17,14 @@
  */
 package org.apache.beam.runners.spark.util;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
+import com.google.common.cache.Cache;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.spark.util.SideInputStorage.Key;
+import org.apache.beam.runners.spark.util.SideInputStorage.Value;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.spark.util.SizeEstimator;
@@ -33,6 +36,12 @@ public class CachedSideInputReader implements SideInputReader {
 
   private static final Logger LOG = LoggerFactory.getLogger(CachedSideInputReader.class);
 
+  /**
+   * Keep references for the whole lifecycle of the CachedSideInputReader; otherwise the side
+   * input needs to be de-serialized again.
+   */
+  private Set sideInputReferences = new HashSet<>();
+
   /**
    * Create a new cached {@link SideInputReader}.
    *
@@ -43,46 +52,9 @@ public static CachedSideInputReader of(SideInputReader delegate) {
     return new CachedSideInputReader(delegate);
   }
 
-  /**
-   * Composite key of {@link PCollectionView} and {@link BoundedWindow} used to identify
-   * materialized results.
-   *
-   * @param <T> type of result
-   */
-  private static class Key<T> {
-
-    private final PCollectionView<T> view;
-    private final BoundedWindow window;
-
-    Key(PCollectionView<T> view, BoundedWindow window) {
-      this.view = view;
-      this.window = window;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      final Key<?> key = (Key<?>) o;
-      return Objects.equals(view, key.view) && Objects.equals(window, key.window);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(view, window);
-    }
-  }
-
   /** Wrapped {@link SideInputReader} which results will be cached. */
   private final SideInputReader delegate;
 
-  /** Materialized results. */
-  private final Map<Key<?>, ?> materialized = new HashMap<>();
-
   private CachedSideInputReader(SideInputReader delegate) {
     this.delegate = delegate;
   }
@@ -91,16 +63,31 @@ private CachedSideInputReader(SideInputReader delegate) {
   @Nullable
   @Override
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
     @SuppressWarnings("unchecked")
-    final Map<Key<T>, T> materializedCasted = (Map) materialized;
-    return materializedCasted.computeIfAbsent(
-        new Key<>(view, window),
-        key -> {
-          final T result = delegate.get(view, window);
-          LOG.info(
-              "Caching de-serialized side input of size [{}B] in memory.",
-              SizeEstimator.estimate(result));
-          return result;
-        });
+    final Cache<Key<T>, Value<T>> materializedCasted =
+        (Cache) SideInputStorage.getMaterializedSideInputs();
+
+    Key<T> sideInputKey = new Key<>(view, window);
+
+    @SuppressWarnings("unchecked")
+    final Set<Value<T>> sideInputReferencesCasted = (Set<Value<T>>) sideInputReferences;
+
+    Value<T> value;
+    try {
+      value =
+          materializedCasted.get(
+              sideInputKey,
+              () -> {
+                final T result = delegate.get(view, window);
+                LOG.info(
+                    "Caching de-serialized side input for {} of size [{}B] in memory.",
+                    sideInputKey,
+                    SizeEstimator.estimate(result));
+                return new Value<>(sideInputKey, result);
+              });
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e.getCause());
+    }
+    sideInputReferencesCasted.add(value);
+    return value.getData();
   }
 
   @Override
+ */ +package org.apache.beam.runners.spark.util; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import java.util.Objects; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Cache deserialized side inputs for executor so every task doesnt need to deserialize them again. + * Side inputs are stored in {@link Cache} with weakValues so if there is no reference to a value, + * sideInput is garbage collected. + */ +public class SideInputStorage { + + /** JVM deserialized side input cache. */ + private static final Cache, Value> materializedSideInputs = + CacheBuilder.newBuilder().weakValues().build(); + + public static Cache, Value> getMaterializedSideInputs() { + return materializedSideInputs; + } + + /** + * Composite key of {@link PCollectionView} and {@link BoundedWindow} used to identify + * materialized results. + * + * @param type of result + */ + public static class Key { + + private final PCollectionView view; + private final BoundedWindow window; + + Key(PCollectionView view, BoundedWindow window) { + this.view = view; + this.window = window; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Key key = (Key) o; + return Objects.equals(view, key.view) && Objects.equals(window, key.window); + } + + @Override + public int hashCode() { + return Objects.hash(view, window); + } + + @Override + public String toString() { + return "Key{" + "view=" + view + ", window=" + window + '}'; + } + } + + /** + * Each {@link CachedSideInputReader} keeps references to value so it won't be garbage collected. + * References are stored in Set and adding lasts very long, because calculating of hash on + * serialized data. That's why we keep referenceKey and calculate hash only from referenceKey. 
+ */ + public static class Value { + private final Key referenceKey; + private final T data; + + public Value(Key referenceKey, T data) { + this.referenceKey = referenceKey; + this.data = data; + } + + public T getData() { + return data; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Value value = (Value) o; + return Objects.equals(referenceKey, value.referenceKey); + } + + @Override + public int hashCode() { + return referenceKey.hashCode(); + } + } +} From c72eee904cd4593cf939d5576f3df9e97bce801d Mon Sep 17 00:00:00 2001 From: "marek.simunek" Date: Wed, 2 Jan 2019 13:58:27 +0100 Subject: [PATCH 3/5] [BEAM-5987] cached spark side inputs are evicted based on time access --- .../spark/util/CachedSideInputReader.java | 37 ++++---------- .../runners/spark/util/SideInputStorage.java | 50 +++---------------- 2 files changed, 18 insertions(+), 69 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java index 699e30bbc4f3..5738f0cbb4e9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java @@ -18,13 +18,10 @@ package org.apache.beam.runners.spark.util; import com.google.common.cache.Cache; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.ExecutionException; import javax.annotation.Nullable; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.spark.util.SideInputStorage.Key; -import org.apache.beam.runners.spark.util.SideInputStorage.Value; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollectionView; import org.apache.spark.util.SizeEstimator; @@ -36,12 +33,6 @@ public class CachedSideInputReader implements SideInputReader { private static final Logger LOG = LoggerFactory.getLogger(CachedSideInputReader.class); - /** - * Keep references for the whole lifecycle of CachedSideInputReader otherwise sideInput needs to - * be de-serialized again. - */ - private Set sideInputReferences = new HashSet<>(); - /** * Create a new cached {@link SideInputReader}. 
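The storage scheme patch 2 introduces follows the Guava weak-value pattern sketched below (illustrative names, assuming plain Guava on the classpath): the cache keeps only weak references to values, each reader pins the values it has used so they stay reachable, and the value type hashes by a small key rather than by the large deserialized payload — the point made in the Value javadoc above.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;

final class WeakValueCacheSketch {

  // With weakValues(), an entry survives only while something holds its value strongly.
  private static final Cache<String, Payload> CACHE =
      CacheBuilder.newBuilder().weakValues().build();

  // Pinning a value here keeps its cache entry alive for this reader's lifetime.
  private final Set<Payload> pinned = new HashSet<>();

  Payload lookup(String key) throws ExecutionException {
    Payload value = CACHE.get(key, () -> Payload.deserialize(key));
    pinned.add(value); // cheap: Payload hashes by its small key, not the large data
    return value;
  }

  static final class Payload {
    private final String key;
    private final byte[] data;

    private Payload(String key, byte[] data) {
      this.key = key;
      this.data = data;
    }

    static Payload deserialize(String key) {
      return new Payload(key, new byte[1 << 20]); // stand-in for expensive deserialization
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof Payload && key.equals(((Payload) o).key);
    }

    @Override
    public int hashCode() {
      return key.hashCode(); // hash the identity, never the payload
    }
  }
}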
From c72eee904cd4593cf939d5576f3df9e97bce801d Mon Sep 17 00:00:00 2001
From: "marek.simunek"
Date: Wed, 2 Jan 2019 13:58:27 +0100
Subject: [PATCH 3/5] [BEAM-5987] cached spark side inputs are evicted based on access time

---
 .../spark/util/CachedSideInputReader.java    | 37 ++++----------
 .../runners/spark/util/SideInputStorage.java | 50 +++-----------
 2 files changed, 18 insertions(+), 69 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
index 699e30bbc4f3..5738f0cbb4e9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
@@ -18,13 +18,10 @@
 package org.apache.beam.runners.spark.util;
 
 import com.google.common.cache.Cache;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.spark.util.SideInputStorage.Key;
-import org.apache.beam.runners.spark.util.SideInputStorage.Value;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.spark.util.SizeEstimator;
@@ -36,12 +33,6 @@ public class CachedSideInputReader implements SideInputReader {
 
   private static final Logger LOG = LoggerFactory.getLogger(CachedSideInputReader.class);
 
-  /**
-   * Keep references for the whole lifecycle of the CachedSideInputReader; otherwise the side
-   * input needs to be de-serialized again.
-   */
-  private Set sideInputReferences = new HashSet<>();
-
   /**
    * Create a new cached {@link SideInputReader}.
    *
@@ -63,31 +54,25 @@ private CachedSideInputReader(SideInputReader delegate) {
   @Nullable
   @Override
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
     @SuppressWarnings("unchecked")
-    final Cache<Key<T>, Value<T>> materializedCasted =
+    final Cache<Key<T>, T> materializedCasted =
         (Cache) SideInputStorage.getMaterializedSideInputs();
 
     Key<T> sideInputKey = new Key<>(view, window);
 
-    @SuppressWarnings("unchecked")
-    final Set<Value<T>> sideInputReferencesCasted = (Set<Value<T>>) sideInputReferences;
-
-    Value<T> value;
     try {
-      value =
-          materializedCasted.get(
-              sideInputKey,
-              () -> {
-                final T result = delegate.get(view, window);
-                LOG.info(
-                    "Caching de-serialized side input for {} of size [{}B] in memory.",
-                    sideInputKey,
-                    SizeEstimator.estimate(result));
-                return new Value<>(sideInputKey, result);
-              });
+      return materializedCasted.get(
+          sideInputKey,
+          () -> {
+            final T result = delegate.get(view, window);
+            LOG.info(
+                "Caching de-serialized side input for {} of size [{}B] in memory.",
+                sideInputKey,
+                SizeEstimator.estimate(result));
+            return result;
+          });
     } catch (ExecutionException e) {
       throw new RuntimeException(e.getCause());
     }
-    sideInputReferencesCasted.add(value);
-    return value.getData();
   }
 
   @Override
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
index 506dae44fec4..1af328013a77 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
@@ -20,21 +20,21 @@
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**
- * Cache of deserialized side inputs per executor, so that every task doesn't need to deserialize
- * them again. Side inputs are stored in a {@link Cache} with weak values, so a side input is
- * garbage collected once no reference to its value remains.
+ * Cache of deserialized side inputs per executor, so that every task doesn't need to deserialize
+ * them again. Side inputs are stored in a {@link Cache} and expire 5 minutes after the last
+ * access.
  */
-public class SideInputStorage {
+class SideInputStorage {
 
   /** JVM deserialized side input cache. */
-  private static final Cache<Key<?>, Value<?>> materializedSideInputs =
-      CacheBuilder.newBuilder().weakValues().build();
+  private static final Cache<Key<?>, ?> materializedSideInputs =
+      CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build();
 
-  public static Cache<Key<?>, Value<?>> getMaterializedSideInputs() {
+  static Cache<Key<?>, ?> getMaterializedSideInputs() {
     return materializedSideInputs;
   }
@@ -76,40 +76,4 @@ public String toString() {
       return "Key{" + "view=" + view + ", window=" + window + '}';
     }
   }
-
-  /**
-   * Each {@link CachedSideInputReader} keeps a reference to its values so they won't be garbage
-   * collected. The references are stored in a Set, and adding to it would be slow if the hash had
-   * to be computed over the serialized data, so we keep a referenceKey and compute the hash from
-   * the referenceKey only.
-   */
-  public static class Value<T> {
-    private final Key<T> referenceKey;
-    private final T data;
-
-    public Value(Key<T> referenceKey, T data) {
-      this.referenceKey = referenceKey;
-      this.data = data;
-    }
-
-    public T getData() {
-      return data;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      Value<?> value = (Value<?>) o;
-      return Objects.equals(referenceKey, value.referenceKey);
-    }
-
-    @Override
-    public int hashCode() {
-      return referenceKey.hashCode();
-    }
-  }
 }
From 6a71b503039de25d1bcd589242dcd8c59d7289ca Mon Sep 17 00:00:00 2001
From: "marek.simunek"
Date: Tue, 8 Jan 2019 13:25:50 +0100
Subject: [PATCH 4/5] [BEAM-5987] added more logging about broadcast

---
 .../spark/translation/SparkPCollectionView.java | 10 ++++++++++
 .../beam/runners/spark/util/SideInputBroadcast.java |  5 +++++
 .../beam/runners/spark/util/SideInputStorage.java   | 10 +++++++++-
 3 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
index 163ba55e6d1b..764397adfb50 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
@@ -26,11 +26,14 @@
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /** SparkPCollectionView is used to pass serialized views to lambdas. */
 public class SparkPCollectionView implements Serializable {
 
+  private static final Logger LOG = LoggerFactory.getLogger(SparkPCollectionView.class);
   // Holds the view --> broadcast mapping. Transient so it will be null from resume
   private transient volatile Map<PCollectionView<?>, SideInputBroadcast<?>> broadcastHelperMap =
       null;
@@ -85,6 +88,13 @@
   private SideInputBroadcast createBroadcastHelper(
       PCollectionView<?> view, JavaSparkContext context) {
     Tuple2<byte[], Coder<Iterable<WindowedValue<?>>>> tuple2 = pviews.get(view);
     SideInputBroadcast helper = SideInputBroadcast.create(tuple2._1, tuple2._2);
+    String pCollectionName =
+        view.getPCollection() != null ? view.getPCollection().getName() : "UNKNOWN";
+    LOG.info(
+        "Broadcasting [size={}B] view {} from pCollection {}",
+        helper.getBroadcastSizeEstimate(),
+        view,
+        pCollectionName);
     helper.broadcast(context);
     broadcastHelperMap.put(view, helper);
     return helper;
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java
index e0cb3de05771..f42159d3a7d4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java
@@ -23,6 +23,7 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.util.SizeEstimator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,4 +74,8 @@
     }
     return val;
   }
+
+  public long getBroadcastSizeEstimate() {
+    return SizeEstimator.estimate(bytes);
+  }
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
index 1af328013a77..bd86bdde4505 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
@@ -73,7 +73,15 @@ public int hashCode() {
 
     @Override
     public String toString() {
-      return "Key{" + "view=" + view + ", window=" + window + '}';
+      String pName = view.getPCollection() != null ? view.getPCollection().getName() : "Unknown";
+      return "Key{"
+          + "view="
+          + view.getTagInternal()
+          + " of Pcollection["
+          + pName
+          + "], window="
+          + window
+          + '}';
     }
   }
 }
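Patch 4's logging relies on Spark's SizeEstimator, which walks an object graph and returns an approximate footprint in bytes. A minimal sketch of the same idea, with placeholder names, follows; note that for a byte[] the estimate is essentially the array length plus object overhead, so calling it on the still-serialized view is cheap.

import org.apache.spark.util.SizeEstimator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class BroadcastSizeLoggingSketch {

  private static final Logger LOG = LoggerFactory.getLogger(BroadcastSizeLoggingSketch.class);

  // Log the estimated size of the serialized view before shipping it to executors.
  static void logBeforeBroadcast(String viewName, byte[] serializedView) {
    LOG.info(
        "Broadcasting [size={}B] view {}", SizeEstimator.estimate(serializedView), viewName);
  }
}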
From ddfe7dddc6ecb42572822ba5ddf588afb380b945 Mon Sep 17 00:00:00 2001
From: "marek.simunek"
Date: Fri, 11 Jan 2019 16:13:49 +0100
Subject: [PATCH 5/5] [BEAM-5987] fix for sideInputWithNull

---
 .../translation/SparkPCollectionView.java    |  2 +-
 .../spark/util/CachedSideInputReader.java    | 28 +++++++++++--------
 .../runners/spark/util/SideInputStorage.java | 25 ++++++++++++++---
 3 files changed, 38 insertions(+), 17 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
index 764397adfb50..2c9a77f6f0be 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
@@ -90,7 +90,7 @@
     SideInputBroadcast helper = SideInputBroadcast.create(tuple2._1, tuple2._2);
     String pCollectionName =
         view.getPCollection() != null ? view.getPCollection().getName() : "UNKNOWN";
-    LOG.info(
+    LOG.debug(
         "Broadcasting [size={}B] view {} from pCollection {}",
         helper.getBroadcastSizeEstimate(),
         view,
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
index 5738f0cbb4e9..5d2e52141dec 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
@@ -17,13 +17,14 @@
  */
 package org.apache.beam.runners.spark.util;
 
-import com.google.common.cache.Cache;
 import java.util.concurrent.ExecutionException;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.spark.util.SideInputStorage.Key;
+import org.apache.beam.runners.spark.util.SideInputStorage.Value;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.Cache;
 import org.apache.spark.util.SizeEstimator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,22 +55,25 @@ private CachedSideInputReader(SideInputReader delegate) {
   @Nullable
   @Override
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
     @SuppressWarnings("unchecked")
-    final Cache<Key<T>, T> materializedCasted =
+    final Cache<Key<T>, Value<T>> materializedCasted =
         (Cache) SideInputStorage.getMaterializedSideInputs();
 
     Key<T> sideInputKey = new Key<>(view, window);
 
     try {
-      return materializedCasted.get(
-          sideInputKey,
-          () -> {
-            final T result = delegate.get(view, window);
-            LOG.info(
-                "Caching de-serialized side input for {} of size [{}B] in memory.",
-                sideInputKey,
-                SizeEstimator.estimate(result));
-            return result;
-          });
+      Value<T> cachedResult =
+          materializedCasted.get(
+              sideInputKey,
+              () -> {
+                final T result = delegate.get(view, window);
+                LOG.debug(
+                    "Caching de-serialized side input for {} of size [{}B] in memory.",
+                    sideInputKey,
+                    SizeEstimator.estimate(result));
+
+                return new Value<>(result);
+              });
+      return cachedResult.getValue();
     } catch (ExecutionException e) {
       throw new RuntimeException(e.getCause());
     }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
index bd86bdde4505..a0c59dd86f5a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
@@ -17,12 +17,12 @@
  */
 package org.apache.beam.runners.spark.util;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.Cache;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.CacheBuilder;
 
 /**
  * Cache of deserialized side inputs per executor, so that every task doesn't need to deserialize
@@ -31,10 +31,10 @@
 class SideInputStorage {
 
   /** JVM deserialized side input cache. */
-  private static final Cache<Key<?>, ?> materializedSideInputs =
+  private static final Cache<Key<?>, Value<?>> materializedSideInputs =
       CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build();
 
-  static Cache<Key<?>, ?> getMaterializedSideInputs() {
+  static Cache<Key<?>, Value<?>> getMaterializedSideInputs() {
     return materializedSideInputs;
   }
@@ -84,4 +84,21 @@ public String toString() {
           + '}';
     }
   }
+
+  /**
+   * A null value is not allowed in Guava's Cache but is valid for a side input, so we use a
+   * wrapper for the cache value.
+   */
+  public static class Value<T> {
+
+    T value;
+
+    Value(T value) {
+      this.value = value;
+    }
+
+    public T getValue() {
+      return value;
+    }
+  }
 }
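The null fix in patch 5 is an instance of a common wrapper idiom, sketched here with placeholder names: Guava's Cache rejects null values, while an empty side input may legitimately materialize as null, so the loader always stores a non-null holder whose payload may be null.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

final class NullSafeCacheSketch {

  /** Guava's Cache rejects null values, so a legitimately-null result is stored wrapped. */
  static final class Holder<T> {
    final T value; // may be null

    Holder(T value) {
      this.value = value;
    }
  }

  private static final Cache<String, Holder<String>> CACHE =
      CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build();

  static String fetch(String key) throws ExecutionException {
    // The loader always returns a non-null Holder, even when the looked-up value
    // itself is null, so the cache never sees a null value.
    return CACHE.get(key, () -> new Holder<>(lookupPossiblyNull(key))).value;
  }

  private static String lookupPossiblyNull(String key) {
    return key.isEmpty() ? null : key; // stand-in for a side input that may materialize as null
  }
}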