-
Notifications
You must be signed in to change notification settings - Fork 28k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-30964][Core][WebUI] Accelerate InMemoryStore with a new index #27716
Changes from 10 commits
4f93ffc
c6c2c82
b0bb448
8969a4d
d50c801
9cb44c3
da463e9
753a14a
2ec82df
091fb7e
e02acc0
491e9eb
c0d3755
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -163,6 +163,12 @@ public void clear() { | |
} | ||
} | ||
|
||
/** | ||
* An alias class for the type "ConcurrentHashMap<Comparable<Object>, Boolean>", which is used | ||
* as a concurrent hashset for storing natural keys and the boolean value doesn't matter. | ||
*/ | ||
private static class NaturalKeys extends ConcurrentHashMap<Comparable<Object>, Boolean> {} | ||
|
||
private static class InstanceList<T> { | ||
|
||
/** | ||
|
@@ -205,23 +211,48 @@ public void accept(Comparable<Object> key, T value) { | |
private final KVTypeInfo ti; | ||
private final KVTypeInfo.Accessor naturalKey; | ||
private final ConcurrentMap<Comparable<Object>, T> data; | ||
private final String naturalParentIndexName; | ||
// A mapping from parent to the natural keys of its children. | ||
// For example, a mapping from a stage ID to all the task IDs in the stage. | ||
private final ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap; | ||
|
||
private InstanceList(Class<?> klass) { | ||
this.ti = new KVTypeInfo(klass); | ||
this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME); | ||
this.data = new ConcurrentHashMap<>(); | ||
this.naturalParentIndexName = ti.getParentIndexName(KVIndex.NATURAL_INDEX_NAME); | ||
this.parentToChildrenMap = new ConcurrentHashMap<>(); | ||
} | ||
|
||
KVTypeInfo.Accessor getIndexAccessor(String indexName) { | ||
return ti.getAccessor(indexName); | ||
} | ||
|
||
int countingRemoveAllByIndexValues(String index, Collection<?> indexValues) { | ||
Predicate<? super T> filter = getPredicate(ti.getAccessor(index), indexValues); | ||
CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(data, filter); | ||
if (!naturalParentIndexName.isEmpty() && naturalParentIndexName.equals(index)) { | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// If there is a parent index for the natural index and `index` happens to be it, | ||
// Spark can use the `parentToChildrenMap` to get the related natural keys, and then | ||
// delete them from `data`. | ||
int count = 0; | ||
for (Object indexValue : indexValues) { | ||
Comparable<Object> parentKey = asKey(indexValue); | ||
NaturalKeys children = parentToChildrenMap.getOrDefault(parentKey, new NaturalKeys()); | ||
for (Comparable<Object> naturalKey : children.keySet()) { | ||
data.remove(naturalKey); | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
count ++; | ||
} | ||
parentToChildrenMap.remove(parentKey); | ||
} | ||
return count; | ||
} else { | ||
Predicate<? super T> filter = getPredicate(ti.getAccessor(index), indexValues); | ||
CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(data, filter); | ||
|
||
data.forEach(callback); | ||
return callback.count(); | ||
// Go through all the values in `data` and delete objects that meet the predicate `filter`. | ||
// This can be slow when there is a large number of entries in `data`. | ||
data.forEach(callback); | ||
return callback.count(); | ||
} | ||
} | ||
|
||
public T get(Object key) { | ||
|
@@ -230,18 +261,31 @@ public T get(Object key) { | |
|
||
public void put(T value) throws Exception { | ||
data.put(asKey(naturalKey.get(value)), value); | ||
if (!naturalParentIndexName.isEmpty()) { | ||
Comparable<Object> parentKey = asKey(getIndexAccessor(naturalParentIndexName).get(value)); | ||
NaturalKeys children = | ||
parentToChildrenMap.computeIfAbsent(parentKey, k -> new NaturalKeys()); | ||
children.put(asKey(naturalKey.get(value)), true); | ||
} | ||
} | ||
|
||
public void delete(Object key) { | ||
data.remove(asKey(key)); | ||
if (!naturalParentIndexName.isEmpty()) { | ||
for (NaturalKeys v : parentToChildrenMap.values()) { | ||
if (v.remove(asKey(key))) { | ||
break; | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When a parent key in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, nothing will change if the NaturalKeys There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I meant after There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, |
||
} | ||
} | ||
|
||
public int size() { | ||
return data.size(); | ||
} | ||
|
||
public InMemoryView<T> view() { | ||
return new InMemoryView<>(data.values(), ti); | ||
return new InMemoryView<>(data, ti, naturalParentIndexName, parentToChildrenMap); | ||
} | ||
|
||
private static <T> Predicate<? super T> getPredicate( | ||
|
@@ -271,22 +315,30 @@ private static Object indexValueForEntity(KVTypeInfo.Accessor getter, Object ent | |
|
||
private static class InMemoryView<T> extends KVStoreView<T> { | ||
private static final InMemoryView<?> EMPTY_VIEW = | ||
new InMemoryView<>(Collections.emptyList(), null); | ||
new InMemoryView<>(new ConcurrentHashMap<>(), null, "", new ConcurrentHashMap<>()); | ||
|
||
private final Collection<T> elements; | ||
private final ConcurrentMap<Comparable<Object>, T> data; | ||
private final KVTypeInfo ti; | ||
private final KVTypeInfo.Accessor natural; | ||
|
||
InMemoryView(Collection<T> elements, KVTypeInfo ti) { | ||
this.elements = elements; | ||
private final ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap; | ||
private final String naturalParentIndexName; | ||
|
||
InMemoryView( | ||
ConcurrentMap<Comparable<Object>, T> data, | ||
KVTypeInfo ti, | ||
String naturalParentIndexName, | ||
ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap) { | ||
this.data = data; | ||
this.ti = ti; | ||
this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null; | ||
this.naturalParentIndexName = naturalParentIndexName; | ||
this.parentToChildrenMap = parentToChildrenMap; | ||
} | ||
|
||
@Override | ||
public Iterator<T> iterator() { | ||
if (elements.isEmpty()) { | ||
return new InMemoryIterator<>(elements.iterator()); | ||
if (data.isEmpty()) { | ||
return new InMemoryIterator<>(Collections.emptyIterator()); | ||
} | ||
|
||
KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null; | ||
|
@@ -322,15 +374,32 @@ public Iterator<T> iterator() { | |
*/ | ||
private List<T> copyElements() { | ||
if (parent != null) { | ||
KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index); | ||
Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index."); | ||
Comparable<?> parentKey = asKey(parent); | ||
|
||
return elements.stream() | ||
.filter(e -> compare(e, parentGetter, parentKey) == 0) | ||
.collect(Collectors.toList()); | ||
Comparable<Object> parentKey = asKey(parent); | ||
if (!naturalParentIndexName.isEmpty() && | ||
naturalParentIndexName.equals(ti.getParentIndexName(index))) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is possible. I have explained in https://github.com/apache/spark/pull/27716/files#r385846069. |
||
// If there is a parent index for the natural index and the parent of `index` happens to be | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
// it, Spark can use the `parentToChildrenMap` to get the related natural keys, and then | ||
// copy them from `data`. | ||
NaturalKeys children = parentToChildrenMap.getOrDefault(parentKey, new NaturalKeys()); | ||
ArrayList<T> elements = new ArrayList<>(); | ||
for (Comparable<Object> naturalKey : children.keySet()) { | ||
data.computeIfPresent(naturalKey, (k, v) -> { | ||
gengliangwang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
elements.add(v); | ||
return v; | ||
}); | ||
} | ||
return elements; | ||
} else { | ||
// Go through all the values in `data` and collect all the objects that have a certain parent | ||
// value. This can be slow when there is a large number of entries in `data`. | ||
KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index); | ||
Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index."); | ||
return data.values().stream() | ||
.filter(e -> compare(e, parentGetter, parentKey) == 0) | ||
.collect(Collectors.toList()); | ||
} | ||
} else { | ||
return new ArrayList<>(elements); | ||
return new ArrayList<>(data.values()); | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since there is no documentation, I still have problems understanding it.
The basic idea is: if the
index
is natural index, we can just look it up in O(1). For other index, we have to do linear scan, extract keys and find matches. The extension here: if the
index
is parent of natural index, get the children natural indexes and do O(1) lookup.However, seems the basic idea is missing? Shall we avoid linear scan if
index
is natural index?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes and no.
I am aware that the natural index is not handled properly here, but none of the callers of this method pass the natural index in.
There is a simpler API to use in
KVStore
So I think this is minor and I prefer to do it in another PR, since this one is complicated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should probably unify the
delete
method, but it's not related to this PR.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I will do it in another PR.