diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java index b33c53871c32f..db08740975176 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java @@ -163,6 +163,12 @@ public void clear() { } } + /** + * An alias class for the type "ConcurrentHashMap, Boolean>", which is used + * as a concurrent hashset for storing natural keys and the boolean value doesn't matter. + */ + private static class NaturalKeys extends ConcurrentHashMap, Boolean> {} + private static class InstanceList { /** @@ -205,11 +211,19 @@ public void accept(Comparable key, T value) { private final KVTypeInfo ti; private final KVTypeInfo.Accessor naturalKey; private final ConcurrentMap, T> data; + private final String naturalParentIndexName; + private final Boolean hasNaturalParentIndex; + // A mapping from parent to the natural keys of its children. + // For example, a mapping from a stage ID to all the task IDs in the stage. + private final ConcurrentMap, NaturalKeys> parentToChildrenMap; private InstanceList(Class klass) { this.ti = new KVTypeInfo(klass); this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME); this.data = new ConcurrentHashMap<>(); + this.naturalParentIndexName = ti.getParentIndexName(KVIndex.NATURAL_INDEX_NAME); + this.parentToChildrenMap = new ConcurrentHashMap<>(); + this.hasNaturalParentIndex = !naturalParentIndexName.isEmpty(); } KVTypeInfo.Accessor getIndexAccessor(String indexName) { @@ -217,11 +231,30 @@ KVTypeInfo.Accessor getIndexAccessor(String indexName) { } int countingRemoveAllByIndexValues(String index, Collection indexValues) { - Predicate filter = getPredicate(ti.getAccessor(index), indexValues); - CountingRemoveIfForEach callback = new CountingRemoveIfForEach<>(data, filter); + if (hasNaturalParentIndex && naturalParentIndexName.equals(index)) { + // If there is a parent index for the natural index and `index` happens to be it, + // Spark can use the `parentToChildrenMap` to get the related natural keys, and then + // delete them from `data`. + int count = 0; + for (Object indexValue : indexValues) { + Comparable parentKey = asKey(indexValue); + NaturalKeys children = parentToChildrenMap.getOrDefault(parentKey, new NaturalKeys()); + for (Comparable naturalKey : children.keySet()) { + data.remove(naturalKey); + count ++; + } + parentToChildrenMap.remove(parentKey); + } + return count; + } else { + Predicate filter = getPredicate(ti.getAccessor(index), indexValues); + CountingRemoveIfForEach callback = new CountingRemoveIfForEach<>(data, filter); - data.forEach(callback); - return callback.count(); + // Go through all the values in `data` and delete objects that meets the predicate `filter`. + // This can be slow when there is a large number of entries in `data`. + data.forEach(callback); + return callback.count(); + } } public T get(Object key) { @@ -230,10 +263,27 @@ public T get(Object key) { public void put(T value) throws Exception { data.put(asKey(naturalKey.get(value)), value); + if (hasNaturalParentIndex) { + Comparable parentKey = asKey(getIndexAccessor(naturalParentIndexName).get(value)); + NaturalKeys children = + parentToChildrenMap.computeIfAbsent(parentKey, k -> new NaturalKeys()); + children.put(asKey(naturalKey.get(value)), true); + } } public void delete(Object key) { data.remove(asKey(key)); + if (hasNaturalParentIndex) { + for (NaturalKeys v : parentToChildrenMap.values()) { + if (v.remove(asKey(key))) { + // `v` can be empty after removing the natural key and we can remove it from + // `parentToChildrenMap`. However, `parentToChildrenMap` is a ConcurrentMap and such + // checking and deleting can be slow. + // This method is to delete one object with certain key, let's make it simple here. + break; + } + } + } } public int size() { @@ -241,7 +291,7 @@ public int size() { } public InMemoryView view() { - return new InMemoryView<>(data.values(), ti); + return new InMemoryView<>(data, ti, naturalParentIndexName, parentToChildrenMap); } private static Predicate getPredicate( @@ -271,22 +321,32 @@ private static Object indexValueForEntity(KVTypeInfo.Accessor getter, Object ent private static class InMemoryView extends KVStoreView { private static final InMemoryView EMPTY_VIEW = - new InMemoryView<>(Collections.emptyList(), null); + new InMemoryView<>(new ConcurrentHashMap<>(), null, "", new ConcurrentHashMap<>()); - private final Collection elements; + private final ConcurrentMap, T> data; private final KVTypeInfo ti; private final KVTypeInfo.Accessor natural; - - InMemoryView(Collection elements, KVTypeInfo ti) { - this.elements = elements; + private final ConcurrentMap, NaturalKeys> parentToChildrenMap; + private final String naturalParentIndexName; + private final Boolean hasNaturalParentIndex; + + InMemoryView( + ConcurrentMap, T> data, + KVTypeInfo ti, + String naturalParentIndexName, + ConcurrentMap, NaturalKeys> parentToChildrenMap) { + this.data = data; this.ti = ti; this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null; + this.naturalParentIndexName = naturalParentIndexName; + this.parentToChildrenMap = parentToChildrenMap; + this.hasNaturalParentIndex = !naturalParentIndexName.isEmpty(); } @Override public Iterator iterator() { - if (elements.isEmpty()) { - return new InMemoryIterator<>(elements.iterator()); + if (data.isEmpty()) { + return new InMemoryIterator<>(Collections.emptyIterator()); } KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null; @@ -322,15 +382,31 @@ public Iterator iterator() { */ private List copyElements() { if (parent != null) { - KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index); - Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index."); - Comparable parentKey = asKey(parent); - - return elements.stream() - .filter(e -> compare(e, parentGetter, parentKey) == 0) - .collect(Collectors.toList()); + Comparable parentKey = asKey(parent); + if (hasNaturalParentIndex && naturalParentIndexName.equals(ti.getParentIndexName(index))) { + // If there is a parent index for the natural index and the parent of `index` happens to + // be it, Spark can use the `parentToChildrenMap` to get the related natural keys, and + // then copy them from `data`. + NaturalKeys children = parentToChildrenMap.getOrDefault(parentKey, new NaturalKeys()); + ArrayList elements = new ArrayList<>(); + for (Comparable naturalKey : children.keySet()) { + data.computeIfPresent(naturalKey, (k, v) -> { + elements.add(v); + return v; + }); + } + return elements; + } else { + // Go through all the values in `data` and collect all the objects has certain parent + // value. This can be slow when there is a large number of entries in `data`. + KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index); + Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index."); + return data.values().stream() + .filter(e -> compare(e, parentGetter, parentKey) == 0) + .collect(Collectors.toList()); + } } else { - return new ArrayList<>(elements); + return new ArrayList<>(data.values()); } } diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java index d2a26982d8703..5404d33dba5fb 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java @@ -68,8 +68,6 @@ public KVTypeInfo(Class type) { Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME), "No natural index defined for type %s.", type.getName()); - Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME).parent().isEmpty(), - "Natural index of %s cannot have a parent.", type.getName()); for (KVIndex idx : indices.values()) { if (!idx.parent().isEmpty()) { @@ -117,6 +115,11 @@ Accessor getParentAccessor(String indexName) { return index.parent().isEmpty() ? null : getAccessor(index.parent()); } + String getParentIndexName(String indexName) { + KVIndex index = indices.get(indexName); + return index.parent(); + } + /** * Abstracts the difference between invoking a Field and a Method. */ diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java index f4d359234cb9e..d7423537ddfcf 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java @@ -133,12 +133,13 @@ class LevelDBTypeInfo { // First create the parent indices, then the child indices. ti.indices().forEach(idx -> { - if (idx.parent().isEmpty()) { + // In LevelDB, there is no parent index for the NUTURAL INDEX. + if (idx.parent().isEmpty() || idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) { indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), null)); } }); ti.indices().forEach(idx -> { - if (!idx.parent().isEmpty()) { + if (!idx.parent().isEmpty() && !idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) { indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), indices.get(idx.parent()))); } diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index f0a94d84d8a04..c957ff75a501f 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -154,7 +154,7 @@ private[spark] object TaskIndexNames { private[spark] class TaskDataWrapper( // Storing this as an object actually saves memory; it's also used as the key in the in-memory // store, so in that case you'd save the extra copy of the value here. - @KVIndexParam + @KVIndexParam(parent = TaskIndexNames.STAGE) val taskId: JLong, @KVIndexParam(value = TaskIndexNames.TASK_INDEX, parent = TaskIndexNames.STAGE) val index: Int,