Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/** SparkPCollectionView is used to pass serialized views to lambdas. */
public class SparkPCollectionView implements Serializable {

private static final Logger LOG = LoggerFactory.getLogger(SparkPCollectionView.class);
// Holds the view --> broadcast mapping. Transient so it will be null from resume
private transient volatile Map<PCollectionView<?>, SideInputBroadcast> broadcastHelperMap = null;

Expand Down Expand Up @@ -85,6 +88,13 @@ private SideInputBroadcast createBroadcastHelper(
PCollectionView<?> view, JavaSparkContext context) {
Tuple2<byte[], Coder<Iterable<WindowedValue<?>>>> tuple2 = pviews.get(view);
SideInputBroadcast helper = SideInputBroadcast.create(tuple2._1, tuple2._2);
String pCollectionName =
view.getPCollection() != null ? view.getPCollection().getName() : "UNKNOWN";
LOG.debug(
"Broadcasting [size={}B] view {} from pCollection {}",
helper.getBroadcastSizeEstimate(),
view,
pCollectionName);
helper.broadcast(context);
broadcastHelperMap.put(view, helper);
return helper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@
*/
package org.apache.beam.runners.spark.util;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.spark.util.SideInputStorage.Key;
import org.apache.beam.runners.spark.util.SideInputStorage.Value;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.Cache;
import org.apache.spark.util.SizeEstimator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** {@link SideInputReader} that caches materialized views. */
public class CachedSideInputReader implements SideInputReader {

private static final Logger LOG = LoggerFactory.getLogger(CachedSideInputReader.class);

/**
* Create a new cached {@link SideInputReader}.
*
Expand All @@ -38,46 +44,9 @@ public static CachedSideInputReader of(SideInputReader delegate) {
return new CachedSideInputReader(delegate);
}

/**
* Composite key of {@link PCollectionView} and {@link BoundedWindow} used to identify
* materialized results.
*
* @param <T> type of result
*/
private static class Key<T> {

private final PCollectionView<T> view;
private final BoundedWindow window;

Key(PCollectionView<T> view, BoundedWindow window) {
this.view = view;
this.window = window;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Key<?> key = (Key<?>) o;
return Objects.equals(view, key.view) && Objects.equals(window, key.window);
}

@Override
public int hashCode() {
return Objects.hash(view, window);
}
}

/** Wrapped {@link SideInputReader} which results will be cached. */
private final SideInputReader delegate;

/** Materialized results. */
private final Map<Key<?>, ?> materialized = new HashMap<>();

private CachedSideInputReader(SideInputReader delegate) {
this.delegate = delegate;
}
Expand All @@ -86,9 +55,28 @@ private CachedSideInputReader(SideInputReader delegate) {
@Override
public <T> T get(PCollectionView<T> view, BoundedWindow window) {
Copy link
Member

@iemejia iemejia Jan 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am worried about the possible semantic consequences of CachedSideInputReader.get() returning a null value when it is not in the Cache. Wouldn't it imply that a window could get an empty side input assigned?
The documentation on this is not really clear (pinging @kennknowles to see if I am misreading it).
Wonder if there is a test to validate that this cannot happen or if we can create one somehow?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, the meaning of that comment is that null is a value. You can have a PCollection<@Nullable Foo> that contains just one copy of null and use View.asSingleton() and the side input returns the null.

In other words, get must return a value of type T. But the type T may itself be @Nullable Something. The annotation on SideInputReader should be removed. It is incorrect if we use a static analysis that understands this. Findbugs does not understand this, but we should aspire for our annotations to be correct so the documentation is clear.

@SuppressWarnings("unchecked")
final Map<Key<T>, T> materializedCasted = (Map) materialized;
return materializedCasted.computeIfAbsent(
new Key<>(view, window), key -> delegate.get(view, window));
final Cache<Key<T>, Value<T>> materializedCasted =
(Cache) SideInputStorage.getMaterializedSideInputs();

Key<T> sideInputKey = new Key<>(view, window);

try {
Value<T> cachedResult =
materializedCasted.get(
sideInputKey,
() -> {
final T result = delegate.get(view, window);
LOG.debug(
"Caching de-serialized side input for {} of size [{}B] in memory.",
sideInputKey,
SizeEstimator.estimate(result));

return new Value<>(result);
});
return cachedResult.getValue();
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.SizeEstimator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -73,4 +74,8 @@ private T deserialize() {
}
return val;
}

/**
 * Estimates the in-memory footprint, in bytes, of the serialized payload held by this
 * broadcast helper, as computed by Spark's {@code SizeEstimator}.
 */
public long getBroadcastSizeEstimate() {
  final long estimatedBytes = SizeEstimator.estimate(this.bytes);
  return estimatedBytes;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.spark.util;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.CacheBuilder;

/**
 * Executor-wide cache of deserialized side inputs, so that tasks running in the same JVM do not
 * need to deserialize them again. Entries are held in a Guava {@link Cache} and expire five
 * minutes after their last access.
 */
class SideInputStorage {

  /** JVM-wide deserialized side input cache, keyed by (view, window). */
  private static final Cache<Key<?>, Value<?>> materializedSideInputs =
      CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build();

  /** Returns the shared side-input cache for this JVM. */
  static Cache<Key<?>, Value<?>> getMaterializedSideInputs() {
    return materializedSideInputs;
  }

  /**
   * Composite key of {@link PCollectionView} and {@link BoundedWindow} used to identify
   * materialized results.
   *
   * @param <T> type of result
   */
  public static class Key<T> {

    private final PCollectionView<T> view;
    private final BoundedWindow window;

    Key(PCollectionView<T> view, BoundedWindow window) {
      this.view = view;
      this.window = window;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      Key<?> key = (Key<?>) o;
      return Objects.equals(view, key.view) && Objects.equals(window, key.window);
    }

    @Override
    public int hashCode() {
      return Objects.hash(view, window);
    }

    @Override
    public String toString() {
      // Fall back to "Unknown" when the view has no backing PCollection.
      String pName = view.getPCollection() != null ? view.getPCollection().getName() : "Unknown";
      return "Key{"
          + "view="
          + view.getTagInternal()
          + " of PCollection["
          + pName
          + "], window="
          + window
          + '}';
    }
  }

  /**
   * Wrapper for a cached side-input value. Guava's {@link Cache} does not permit {@code null}
   * values, but {@code null} is a valid side-input value, so the (possibly null) result is
   * wrapped rather than stored directly.
   *
   * @param <T> type of the cached value
   */
  public static class Value<T> {

    // Immutable holder: the wrapped value never changes after construction.
    private final T value;

    Value(T value) {
      this.value = value;
    }

    public T getValue() {
      return value;
    }
  }
}