[SPARK-30964][Core][WebUI] Accelerate InMemoryStore with a new index #27716

Closed
wants to merge 13 commits into from
@@ -163,6 +163,12 @@ public void clear() {
}
}

/**
* An alias class for the type "ConcurrentHashMap<Comparable<Object>, Boolean>", which is used
* as a concurrent hashset for storing natural keys.
*/
private static class NaturalKeys extends ConcurrentHashMap<Comparable<Object>, Boolean> {}
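
The alias above uses a map purely for its key set. A minimal sketch of that pattern follows, placed next to the JDK's built-in `ConcurrentHashMap.newKeySet()` for comparison. The class names `ConcurrentSetSketch` and `KeySetAsMap` and the `Long` key type are illustrative assumptions, not part of the PR.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only: two ways to get a concurrent hash set from ConcurrentHashMap. */
final class ConcurrentSetSketch {
  /** Map-backed set in the style of NaturalKeys: a key being present means membership. */
  static final class KeySetAsMap extends ConcurrentHashMap<Long, Boolean> {}

  public static void main(String[] args) {
    KeySetAsMap asMap = new KeySetAsMap();
    asMap.put(1L, true);
    asMap.put(1L, true); // putting the same key twice keeps a single entry

    // JDK alternative: a Set view backed by a ConcurrentHashMap.
    Set<Long> asSet = ConcurrentHashMap.newKeySet();
    asSet.add(1L);
    asSet.add(1L);

    System.out.println(asMap.size() + " key(s) in the map, " + asSet.size() + " in the set");

    // Map.remove returns the previous value, or null when the key was absent,
    // so a membership-style check has to compare the result against null.
    System.out.println(asMap.remove(2L) == null);
  }
}
```

With the map form, `remove` yields a `Boolean` that may be `null`, whereas `Set.remove` yields a primitive `boolean`; that difference matters wherever the result is used as a condition.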

private static class InstanceList<T> {

/**
@@ -205,23 +211,44 @@ public void accept(Comparable<Object> key, T value) {
private final KVTypeInfo ti;
private final KVTypeInfo.Accessor naturalKey;
private final ConcurrentMap<Comparable<Object>, T> data;
private final String naturalParentIndexName;
// A mapping from parent to the natural keys of its children.
// For example, a mapping from a stage ID to all the task IDs in the stage.
private final ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap;

private InstanceList(Class<?> klass) {
this.ti = new KVTypeInfo(klass);
this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
this.data = new ConcurrentHashMap<>();
this.naturalParentIndexName = ti.getParentIndexName(KVIndex.NATURAL_INDEX_NAME);
this.parentToChildrenMap = new ConcurrentHashMap<>();
}

KVTypeInfo.Accessor getIndexAccessor(String indexName) {
return ti.getAccessor(indexName);
}

int countingRemoveAllByIndexValues(String index, Collection<?> indexValues) {
Contributor

Since there is no documentation, I still have trouble understanding it.

The basic idea is: if the index is the natural index, we can just look it up in O(1). For any other index, we have to do a linear scan, extract the keys, and find matches.

The extension here: if the index is the parent of the natural index, get the children's natural keys and do O(1) lookups.

However, it seems the basic idea is missing. Shall we avoid the linear scan if the index is the natural index?

Member Author

Yes and no.
I am aware that the natural index is not handled properly here, but none of the callers of this method pass the natural index in.
There is a simpler API for that case in KVStore:

void delete(Class<?> type, Object naturalKey)

So I think this is minor and I prefer to do it in another PR, since this one is complicated.

Contributor

we should probably unify the delete method, but it's not related to this PR.

Member Author

Yes, I will do it in another PR.
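
The lookup strategy discussed in this thread can be sketched in isolation. The following is a minimal illustration under simplifying assumptions, not the PR's code: `ParentIndexSketch`, its `Integer` stage IDs, `Long` task IDs, and `String` payloads are hypothetical stand-ins for `InstanceList`, the parent index, the natural index, and the stored objects.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for InstanceList: tasks keyed by task ID, grouped by stage ID. */
final class ParentIndexSketch {
  // Natural index: task ID -> task payload.
  private final ConcurrentMap<Long, String> data = new ConcurrentHashMap<>();
  // Parent index: stage ID -> natural keys (task IDs) of its children.
  private final ConcurrentMap<Integer, Set<Long>> parentToChildren = new ConcurrentHashMap<>();

  void put(int stageId, long taskId, String task) {
    data.put(taskId, task);
    parentToChildren.computeIfAbsent(stageId, k -> ConcurrentHashMap.newKeySet()).add(taskId);
  }

  /** Removes every task of the given stages without scanning unrelated entries. */
  int removeAllByStage(Collection<Integer> stageIds) {
    int count = 0;
    for (Integer stageId : stageIds) {
      Set<Long> children = parentToChildren.remove(stageId);
      if (children == null) {
        continue; // no task was ever stored for this stage
      }
      for (Long taskId : children) {
        // Count only entries that were actually present in the natural index.
        if (data.remove(taskId) != null) {
          count++;
        }
      }
    }
    return count;
  }

  int size() {
    return data.size();
  }

  public static void main(String[] args) {
    ParentIndexSketch store = new ParentIndexSketch();
    for (int stage = 0; stage < 3; stage++) {
      for (long task = 0; task < 4; task++) {
        store.put(stage, stage * 100L + task, "task-" + task);
      }
    }
    int removed = store.removeAllByStage(Arrays.asList(0, 2));
    System.out.println(removed + " removed, " + store.size() + " left");
  }
}
```

The cost of a removal is proportional to the number of children of the requested parents rather than to the size of the whole store, which is the point of the extra index.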

Predicate<? super T> filter = getPredicate(ti.getAccessor(index), indexValues);
CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(data, filter);
if (!naturalParentIndexName.isEmpty() && naturalParentIndexName.equals(index)) {
int count = 0;
for (Object indexValue : indexValues) {
Comparable<Object> parentKey = asKey(indexValue);
NaturalKeys children =
parentToChildrenMap.computeIfAbsent(parentKey, k -> new NaturalKeys());
for (Comparable<Object> naturalKey : children.keySet()) {
data.remove(naturalKey);
count ++;
}
parentToChildrenMap.remove(parentKey);
}
return count;
} else {
Predicate<? super T> filter = getPredicate(ti.getAccessor(index), indexValues);
CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(data, filter);

data.forEach(callback);
return callback.count();
data.forEach(callback);
return callback.count();
}
}

public T get(Object key) {
@@ -230,18 +257,31 @@ public T get(Object key) {

public void put(T value) throws Exception {
data.put(asKey(naturalKey.get(value)), value);
if (!naturalParentIndexName.isEmpty()) {
Comparable<Object> parentKey = asKey(getIndexAccessor(naturalParentIndexName).get(value));
NaturalKeys children =
parentToChildrenMap.computeIfAbsent(parentKey, k -> new NaturalKeys());
children.put(asKey(naturalKey.get(value)), true);
}
}

public void delete(Object key) {
data.remove(asKey(key));
if (!naturalParentIndexName.isEmpty()) {
for (NaturalKeys v : parentToChildrenMap.values()) {
// remove() returns the previous Boolean value, or null if the key was absent.
if (v.remove(asKey(key)) != null) {
break;
}
}
Member

When a parent key in parentToChildrenMap points to an empty NaturalKeys, can we also remove it?

Member Author

Yes, nothing will change if the NaturalKeys v doesn't contain key.

Member

Oh, I meant after v.remove(asKey(key)), if v is empty, can we remove the (parent key, empty NaturalKeys) from parentToChildrenMap?

Member Author

Well, parentToChildrenMap is a concurrent map and checking emptiness costs time.
The method here is to delete one entry. I think we can make it simple and keep it this way.
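
For reference, the pruning asked about in this thread could be done atomically per parent key. The sketch below is one possible approach under stated assumptions, not what the PR does: it assumes the parent key is known at removal time (the `delete` method here only receives the natural key), and `PruneEmptyChildrenSketch` with its `Integer` and `Long` keys is hypothetical. It relies on the documented `ConcurrentHashMap` behavior that `compute` and `computeIfPresent` run atomically per key and drop the mapping when the function returns `null`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical parent -> children index that drops a parent once its last child is removed. */
final class PruneEmptyChildrenSketch {
  private final ConcurrentMap<Integer, Set<Long>> parentToChildren = new ConcurrentHashMap<>();

  void add(int parentKey, long naturalKey) {
    // Mutate inside compute() so an add cannot interleave with pruning of the same parent.
    parentToChildren.compute(parentKey, (k, children) -> {
      Set<Long> set = children != null ? children : ConcurrentHashMap.newKeySet();
      set.add(naturalKey);
      return set;
    });
  }

  /** Returns true if the natural key was registered under the given parent. */
  boolean remove(int parentKey, long naturalKey) {
    boolean[] removed = {false};
    parentToChildren.computeIfPresent(parentKey, (k, children) -> {
      removed[0] = children.remove(naturalKey);
      // Returning null removes the (parent, empty set) mapping in the same atomic step.
      return children.isEmpty() ? null : children;
    });
    return removed[0];
  }

  boolean hasParent(int parentKey) {
    return parentToChildren.containsKey(parentKey);
  }

  public static void main(String[] args) {
    PruneEmptyChildrenSketch index = new PruneEmptyChildrenSketch();
    index.add(1, 10L);
    index.add(1, 11L);
    index.remove(1, 10L);
    System.out.println(index.hasParent(1)); // one child left, mapping kept
    index.remove(1, 11L);
    System.out.println(index.hasParent(1)); // last child gone, mapping pruned
  }
}
```

This trades a per-key lock on every add and remove for a map that never accumulates empty entries, so the cost concern raised above still applies and would need measuring.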

}
}

public int size() {
return data.size();
}

public InMemoryView<T> view() {
return new InMemoryView<>(data.values(), ti);
return new InMemoryView<>(data, ti, naturalParentIndexName, parentToChildrenMap);
}

private static <T> Predicate<? super T> getPredicate(
@@ -271,22 +311,30 @@ private static Object indexValueForEntity(KVTypeInfo.Accessor getter, Object ent

private static class InMemoryView<T> extends KVStoreView<T> {
private static final InMemoryView<?> EMPTY_VIEW =
new InMemoryView<>(Collections.emptyList(), null);
new InMemoryView<>(new ConcurrentHashMap<>(), null, "", new ConcurrentHashMap<>());

private final Collection<T> elements;
private final ConcurrentMap<Comparable<Object>, T> data;
private final KVTypeInfo ti;
private final KVTypeInfo.Accessor natural;

InMemoryView(Collection<T> elements, KVTypeInfo ti) {
this.elements = elements;
private final ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap;
private final String naturalParentIndexName;

InMemoryView(
ConcurrentMap<Comparable<Object>, T> data,
KVTypeInfo ti,
String naturalParentIndexName,
ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap) {
this.data = data;
this.ti = ti;
this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null;
this.naturalParentIndexName = naturalParentIndexName;
this.parentToChildrenMap = parentToChildrenMap;
}

@Override
public Iterator<T> iterator() {
if (elements.isEmpty()) {
return new InMemoryIterator<>(elements.iterator());
if (data.isEmpty()) {
return new InMemoryIterator<>(Collections.emptyIterator());
}

KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;
@@ -322,15 +370,28 @@ public Iterator<T> iterator() {
*/
private List<T> copyElements() {
if (parent != null) {
KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index.");
Comparable<?> parentKey = asKey(parent);

return elements.stream()
.filter(e -> compare(e, parentGetter, parentKey) == 0)
.collect(Collectors.toList());
Comparable<Object> parentKey = asKey(parent);
if (!naturalParentIndexName.isEmpty() &&
naturalParentIndexName.equals(ti.getParentIndexName(index))) {
Member

Is it possible that naturalParentIndexName doesn't equal ti.getParentIndexName(index)? Isn't String index = KVIndex.NATURAL_INDEX_NAME?

NaturalKeys children =
parentToChildrenMap.computeIfAbsent(parentKey, k -> new NaturalKeys());
ArrayList<T> elements = new ArrayList<>();
for (Comparable<Object> naturalKey : children.keySet()) {
data.computeIfPresent(naturalKey, (k, v) -> {
elements.add(v);
return v;
});
}
return elements;
} else {
KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index.");
return data.values().stream()
.filter(e -> compare(e, parentGetter, parentKey) == 0)
.collect(Collectors.toList());
}
} else {
return new ArrayList<>(elements);
return new ArrayList<>(data.values());
}
}

@@ -68,8 +68,6 @@ public KVTypeInfo(Class<?> type) {

Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME),
"No natural index defined for type %s.", type.getName());
Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME).parent().isEmpty(),
"Natural index of %s cannot have a parent.", type.getName());

for (KVIndex idx : indices.values()) {
if (!idx.parent().isEmpty()) {
@@ -117,6 +115,11 @@ Accessor getParentAccessor(String indexName) {
return index.parent().isEmpty() ? null : getAccessor(index.parent());
}

String getParentIndexName(String indexName) {
KVIndex index = indices.get(indexName);
return index.parent();
}

/**
* Abstracts the difference between invoking a Field and a Method.
*/
@@ -133,12 +133,13 @@ class LevelDBTypeInfo {

// First create the parent indices, then the child indices.
ti.indices().forEach(idx -> {
if (idx.parent().isEmpty()) {
// In LevelDB, there is no parent index for the NATURAL INDEX.
if (idx.parent().isEmpty() || idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) {
indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), null));
}
});
ti.indices().forEach(idx -> {
if (!idx.parent().isEmpty()) {
if (!idx.parent().isEmpty() && !idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) {
indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()),
indices.get(idx.parent())));
}
@@ -154,7 +154,7 @@ private[spark] object TaskIndexNames {
private[spark] class TaskDataWrapper(
// Storing this as an object actually saves memory; it's also used as the key in the in-memory
// store, so in that case you'd save the extra copy of the value here.
@KVIndexParam
@KVIndexParam(parent = TaskIndexNames.STAGE)
val taskId: JLong,
@KVIndexParam(value = TaskIndexNames.TASK_INDEX, parent = TaskIndexNames.STAGE)
val index: Int,
@@ -17,13 +17,17 @@

package org.apache.spark.status

import org.scalatest.concurrent.TimeLimits
import org.scalatest.time.{Millis, Span}
import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.{TaskInfo, TaskLocality}
import org.apache.spark.util.{Distribution, Utils}
import org.apache.spark.util.kvstore._

class AppStatusStoreSuite extends SparkFunSuite {
class AppStatusStoreSuite extends SparkFunSuite with TimeLimits {

private val uiQuantiles = Array(0.0, 0.25, 0.5, 0.75, 1.0)
private val stageId = 1
@@ -78,6 +82,26 @@ class AppStatusStoreSuite extends SparkFunSuite {
assert(store.count(classOf[CachedQuantile]) === 2)
}

test("InMemoryStore should build index from Stage ID to Task data") {
val store = new InMemoryStore()
(0 until 1000).map { sId =>
(0 until 1000).map { taskId =>
val task = newTaskData(sId * 1000 + taskId, "SUCCESS", sId)
store.write(task)
}
}
val appStatusStore = new AppStatusStore(store)
failAfter(Span(200, Millis)) {
appStatusStore.taskSummary(1, attemptId, Array(0, 0.25, 0.5, 0.75, 1))
}
val stageIds = Seq(1, 11, 66, 88)
val stageKeys = stageIds.map(Array(_, attemptId))
failAfter(Span(10, Millis)) {
store.removeAllByIndexValues(classOf[TaskDataWrapper], TaskIndexNames.STAGE,
stageKeys.asJavaCollection)
}
}

private def createAppStore(disk: Boolean, live: Boolean): AppStatusStore = {
val conf = new SparkConf()
if (live) {
@@ -152,12 +176,15 @@ class AppStatusStoreSuite extends SparkFunSuite {
}
}

private def newTaskData(i: Int, status: String = "SUCCESS"): TaskDataWrapper = {
private def newTaskData(
i: Int,
status: String = "SUCCESS",
sId: Int = stageId): TaskDataWrapper = {
new TaskDataWrapper(
i.toLong, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None, true,
i, i, i, i, i, i, i, i, i, i,
i, i, i, i, i, i, i, i, i, i,
i, i, i, i, stageId, attemptId)
i, i, i, i, sId, attemptId)
}

private def writeTaskDataToStore(i: Int, store: KVStore, status: String): Unit = {