Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads #24616

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;

import org.apache.spark.annotation.Private;

Expand All @@ -43,7 +45,7 @@
public class InMemoryStore implements KVStore {

private Object metadata;
private ConcurrentMap<Class<?>, InstanceList> data = new ConcurrentHashMap<>();
private InMemoryLists inMemoryLists = new InMemoryLists();

@Override
public <T> T getMetadata(Class<T> klass) {
Expand All @@ -57,13 +59,13 @@ public void setMetadata(Object value) {

@Override
public long count(Class<?> type) {
InstanceList list = data.get(type);
InstanceList<?> list = inMemoryLists.get(type);
return list != null ? list.size() : 0;
}

@Override
public long count(Class<?> type, String index, Object indexedValue) throws Exception {
InstanceList list = data.get(type);
InstanceList<?> list = inMemoryLists.get(type);
int count = 0;
Object comparable = asKey(indexedValue);
KVTypeInfo.Accessor accessor = list.getIndexAccessor(index);
Expand All @@ -77,45 +79,49 @@ public long count(Class<?> type, String index, Object indexedValue) throws Excep

@Override
public <T> T read(Class<T> klass, Object naturalKey) {
InstanceList list = data.get(klass);
Object value = list != null ? list.get(naturalKey) : null;
InstanceList<T> list = inMemoryLists.get(klass);
T value = list != null ? list.get(naturalKey) : null;
if (value == null) {
throw new NoSuchElementException();
}
return klass.cast(value);
return value;
}

@Override
public void write(Object value) throws Exception {
InstanceList list = data.computeIfAbsent(value.getClass(), key -> {
try {
return new InstanceList(key);
} catch (Exception e) {
throw Throwables.propagate(e);
}
});
list.put(value);
inMemoryLists.write(value);
}

@Override
public void delete(Class<?> type, Object naturalKey) {
InstanceList list = data.get(type);
InstanceList<?> list = inMemoryLists.get(type);
if (list != null) {
list.delete(naturalKey);
}
}

@Override
public <T> KVStoreView<T> view(Class<T> type){
InstanceList list = data.get(type);
return list != null ? list.view(type)
: new InMemoryView<>(type, Collections.<T>emptyList(), null);
InstanceList<T> list = inMemoryLists.get(type);
return list != null ? list.view() : emptyView();
}

@Override
public void close() {
metadata = null;
data.clear();
inMemoryLists.clear();
}

@SuppressWarnings("unchecked")
@Override
public <T> boolean removeAllByKeys(Class<T> klass, String index, Collection keys) {
InstanceList<T> list = inMemoryLists.get(klass);

if (list != null) {
return list.countingRemoveAllByKeys(index, keys) > 0;
} else {
return false;
}
}

@SuppressWarnings("unchecked")
Expand All @@ -126,64 +132,144 @@ private static Comparable<Object> asKey(Object in) {
return (Comparable<Object>) in;
}

private static class InstanceList {
@SuppressWarnings("unchecked")
private static <T> KVStoreView<T> emptyView() {
return (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
}

/**
* Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
* class of type T to an InstanceList of type T.
*/
private static class InMemoryLists {
private ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final


@SuppressWarnings("unchecked")
public <T> InstanceList<T> get(Class<T> type) {
return (InstanceList<T>)data.get(type);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: space after )

}

@SuppressWarnings("unchecked")
public <T> void write(T value) throws Exception {
InstanceList<T> list =
(InstanceList<T>) data.computeIfAbsent(value.getClass(), InstanceList::new);
list.put(value);
}

public void clear() {
data.clear();
}
}

private static class InstanceList<T> {

private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
ConcurrentMap<Comparable<Object>, T> data;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: private, final where it makes sense

Predicate<? super T> filter;
int count = 0;

CountingRemoveIfForEach(
ConcurrentMap<Comparable<Object>, T> data,
Predicate<? super T> filter) {
this.data = data;
this.filter = filter;
}

public void accept(Comparable<Object> key, T value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Override

// To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment sounds like it belongs at this class's declaration, not inside this method?

// all iterators of concurrent maps, and specifically makes countingRemoveIf difficult to
// implement correctly against the values() iterator, we use forEach instead....
if (filter.test(value)) {
if (data.remove(key, value)) {
count++;
}
}
}
}

private final KVTypeInfo ti;
private final KVTypeInfo.Accessor naturalKey;
private final ConcurrentMap<Comparable<Object>, Object> data;

private int size;
private final ConcurrentMap<Comparable<Object>, T> data;

private InstanceList(Class<?> type) throws Exception {
this.ti = new KVTypeInfo(type);
private InstanceList(Class<?> klass) {
this.ti = new KVTypeInfo(klass);
this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
this.data = new ConcurrentHashMap<>();
this.size = 0;
}

KVTypeInfo.Accessor getIndexAccessor(String indexName) {
return ti.getAccessor(indexName);
}

public Object get(Object key) {
// Note: removeIf returns a boolean if any element has been removed.
// While debugging this code, it was handy to have the count of elements
// removed, rather than an indicator of whether something has been
// removed, and a count is no more complicated than a boolean so I've
// retained that behavior here, although there is no current requirement.
@SuppressWarnings("unchecked")
int countingRemoveAllByKeys(String index, Collection keys) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC you're removing all entries whose value corresponding to the given index match the given keys list.

So it sounds to me like the name here (and in related code) should use ByValues instead of ByKeys?

Also, you have a raw type in the argument list, which tells me that your @SuppressWarnings is either incorrect or perhaps not needed (if you make the argument Collection<?>).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm removing values whose key indicated by "index" matches (one of) the passed keys.
So, I'm matching keys, hence the ByKeys and the keyFromValue which retrieves the key indicated by 'getter' of the passed value.
BTW, I think I could add generic typing to Accessor<V, K> and ensure that the getter and value objects match, but I don't think the K type would turn out to be terribly useful, as ultimately there's no match between the index (a String) and the key type. Let me know if you think that's useful.

Yes, I'll try to use the <?> in a number of the SuppressWarnings and see what happens. It might be possible to get rid of a number of them, which would be fabulous.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I suspect we have a terminology overload issue here.

  1. The read|write|view methods on KVStore refer to the things you read, write, and look at as "instance", "object", and "entities" respectively (and also "value" for write() as the name of the parameter).
  2. read() provides a "naturalKey" to access a specific instance, which is the value of the unique/primary/NATURAL_INDEX_NAME index
  3. KVStoreView refers to the values of an index as passed in first() and last() as values, not keys -- naturalKey or no
  4. InMemoryStore refers to the Comparable wrappers placed around the values of a particular index as a key.
  5. KVStore seems to say that the key is actually created per type written and are based on the type name (which itself is referred to as either klass or type)

So, yes, I'm probably using terminology wrong, and I hereby declare myself confused :(

With respect to klass vs type I had gone with klass in the removeAll___() as it was consistent with having 'klass' everywhere in the ElementTrackingStore, but please let me know if that should be changed!

With respect to key, value, field-value-for-index, comparable-wrapper-around-field-value-for-index, I admit to not knowing what to call which things when. One way to deal with this is to make the removeAll on the KVStoreView, where "value" is everywhere (I think) considered in the context of the type and index. The upside is that we could naturally call such a method removeAll() as the class and index are owned by the View. From an impl standpoint, it would make the definition of LevelDB's view quite a bit more complicated, and it would require work in InMemoryView as well -- I'd need to pass down the containing hash of indexed-values->entity, rather than just the entities (locally referred to as elements). The other downside is that you're using a "View" to mutate....

Another approach would be to use 'values' as you suggested with some commentary to clear up what we mean by values in the parameter and call the methods something like removeAllIndexedValues() and (internally) indexedValueFromEntity() or some such.

Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I very much believe that there might be very confusing terminology in this code. I went back and forth on implementation and interfaces a ton of times before reaching something I was happy with, and by that time I didn't really bother with the internal naming of things so much.

But here we're talking about a new method in a "public" interface (not this particular line, but the new method in KVStore), so better be a little bit more careful. And IMO index values are not keys, so "removeByKeys" is a little weird. Maybe "removeByIndexValues" is clearer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"so better be a little bit more careful" - strongly agree. I'll try removeAllByIndexValues (slight merging of our proposals) -- it's a little bit of a word salad, but nothing better really strikes me. Thanks!

Predicate<? super T> filter = getPredicate(ti.getAccessor(index), keys);
CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(data, filter);

data.forEach(callback);
return callback.count;
}

public T get(Object key) {
return data.get(asKey(key));
}

public void put(Object value) throws Exception {
Preconditions.checkArgument(ti.type().equals(value.getClass()),
"Unexpected type: %s", value.getClass());
if (data.put(asKey(naturalKey.get(value)), value) == null) {
size++;
}
public void put(T value) throws Exception {
data.put(asKey(naturalKey.get(value)), value);
}

public void delete(Object key) {
if (data.remove(asKey(key)) != null) {
size--;
}
data.remove(asKey(key));
}

public int size() {
return size;
return data.size();
}

@SuppressWarnings("unchecked")
public InMemoryView<T> view() {
return new InMemoryView<>(data.values(), ti);
}

@SuppressWarnings("unchecked")
public <T> InMemoryView<T> view(Class<T> type) {
Preconditions.checkArgument(ti.type().equals(type), "Unexpected type: %s", type);
Collection<T> all = (Collection<T>) data.values();
return new InMemoryView<>(type, all, ti);
private static <T> Predicate<? super T> getPredicate(
KVTypeInfo.Accessor getter,
Collection keys) {
if (Comparable.class.isAssignableFrom(getter.getType())) {
HashSet set = new HashSet(keys);

return (value) -> set.contains(keyFromValue(getter, value));
} else {
HashSet<Comparable> set = new HashSet<>(keys.size());
for (Object key : keys) {
set.add(asKey(key));
}
return (value) -> set.contains(asKey(keyFromValue(getter, value)));
}
}

private static Object keyFromValue(KVTypeInfo.Accessor getter, Object value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar thing, key sounds like the wrong word here.

try {
return getter.get(value);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}
}

private static class InMemoryView<T> extends KVStoreView<T> {
private static InMemoryView EMPTY_VIEW = new InMemoryView<>(Collections.emptyList(), null);

private final Collection<T> elements;
private final KVTypeInfo ti;
private final KVTypeInfo.Accessor natural;

InMemoryView(Class<T> type, Collection<T> elements, KVTypeInfo ti) {
super(type);
InMemoryView(Collection<T> elements, KVTypeInfo ti) {
this.elements = elements;
this.ti = ti;
this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null;
Expand All @@ -195,34 +281,30 @@ public Iterator<T> iterator() {
return new InMemoryIterator<>(elements.iterator());
}

try {
KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;
int modifier = ascending ? 1 : -1;

final List<T> sorted = copyElements();
Collections.sort(sorted, (e1, e2) -> modifier * compare(e1, e2, getter));
Stream<T> stream = sorted.stream();
KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;
int modifier = ascending ? 1 : -1;

if (first != null) {
stream = stream.filter(e -> modifier * compare(e, getter, first) >= 0);
}
final List<T> sorted = copyElements();
sorted.sort((e1, e2) -> modifier * compare(e1, e2, getter));
Stream<T> stream = sorted.stream();

if (last != null) {
stream = stream.filter(e -> modifier * compare(e, getter, last) <= 0);
}
if (first != null) {
stream = stream.filter(e -> modifier * compare(e, getter, first) >= 0);
}

if (skip > 0) {
stream = stream.skip(skip);
}
if (last != null) {
stream = stream.filter(e -> modifier * compare(e, getter, last) <= 0);
}

if (max < sorted.size()) {
stream = stream.limit((int) max);
}
if (skip > 0) {
stream = stream.skip(skip);
}

return new InMemoryIterator<>(stream.iterator());
} catch (Exception e) {
throw Throwables.propagate(e);
if (max < sorted.size()) {
stream = stream.limit((int) max);
}

return new InMemoryIterator<>(stream.iterator());
}

/**
Expand All @@ -248,16 +330,16 @@ private int compare(T e1, T e2, KVTypeInfo.Accessor getter) {
diff = compare(e1, natural, natural.get(e2));
}
return diff;
} catch (Exception e) {
throw Throwables.propagate(e);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}

private int compare(T e1, KVTypeInfo.Accessor getter, Object v2) {
try {
return asKey(getter.get(e1)).compareTo(asKey(v2));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you try to use the getPredicate stuff in this class too? Seems like it could save some computation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have not attempted to apply the basic trick in getPredicate (of calling or not calling asKey() depending on whether the index requires it). I agree that there might be a win there, but the application seems difficult and unlikely to yield clear code. getPredicate() is doing two things -- it's converting a Collection into a Set as well as converting all the entries through asKey as necessary, AND it's removing the necessity of calling getClass.isArray when it isn't.

There appear three basic times where this strategy would be useful -- during copyElements when parent is defined, and during iteration when first and/or last are defined. Well, I don't see a contract about when first or last can be changed wrt when an iteration is started or running, so I'm a little worried about modifying that code, and I'm not sure what the usecases are for parents. But let's assume that first and last are actually fixed during iteration (it seems bad that it is not), and I'm willing to make changes to the parent code blind .... Presumably I'd want a set of compare() routines that took a Comparable as the second argument (the second argument being fixed locally to asKey(first|last|parent)) with two potential names -- one that would call asKey and one that wouldn't. I'm blanking on ideas for names. Ideas?

What I'll do is create the "dumb" version of this compare call, and either we'll revert because we want to allow first and last to be modified during iteration, or we'll come up with some good names for dealing with the other half of the asKey calls. Or we'll decide that the getClass.isArray calls for only one side of the compare isn't so bad.

} catch (Exception e) {
throw Throwables.propagate(e);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.util.kvstore;

import java.io.Closeable;
import java.util.Collection;

import org.apache.spark.annotation.Private;

Expand Down Expand Up @@ -126,4 +127,8 @@ public interface KVStore extends Closeable {
*/
long count(Class<?> type, String index, Object indexedValue) throws Exception;

/**
* A cheaper way to remove multiple items from the KVStore
*/
<T> boolean removeAllByKeys(Class<T> klass, String index, Collection keys) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
@Private
public abstract class KVStoreView<T> implements Iterable<T> {

final Class<T> type;

boolean ascending = true;
String index = KVIndex.NATURAL_INDEX_NAME;
Object first = null;
Expand All @@ -48,10 +46,6 @@ public abstract class KVStoreView<T> implements Iterable<T> {
long skip = 0L;
long max = Long.MAX_VALUE;

public KVStoreView(Class<T> type) {
this.type = type;
}

/**
* Reverses the order of iteration. By default, iterates in ascending order.
*/
Expand Down
Loading