Skip to content

Commit

Permalink
Accelerate InMemoryStore with a new index
Browse files Browse the repository at this point in the history
  • Loading branch information
gengliangwang committed Feb 26, 2020
1 parent 8f247e5 commit 4f93ffc
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -205,23 +204,45 @@ public void accept(Comparable<Object> key, T value) {
private final KVTypeInfo ti;
private final KVTypeInfo.Accessor naturalKey;
private final ConcurrentMap<Comparable<Object>, T> data;
private final String naturalParentIndexName;
// A mapping from parent to the natural keys of its children.
private final ConcurrentMap<Comparable<Object>, ConcurrentMap<Comparable<Object>, Boolean>>
parentToChildrenMap;

private InstanceList(Class<?> klass) {
this.ti = new KVTypeInfo(klass);
this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
this.data = new ConcurrentHashMap<>();
this.parentToChildrenMap = new ConcurrentHashMap<>();
this.naturalParentIndexName = ti.getParentIndexName(KVIndex.NATURAL_INDEX_NAME);
}

KVTypeInfo.Accessor getIndexAccessor(String indexName) {
return ti.getAccessor(indexName);
}

int countingRemoveAllByIndexValues(String index, Collection<?> indexValues) {
Predicate<? super T> filter = getPredicate(ti.getAccessor(index), indexValues);
CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(data, filter);
if (!naturalParentIndexName.isEmpty() && naturalParentIndexName.equals(index)) {
for (Object indexValue : indexValues) {
Comparable<Object> parentKey = asKey(indexValue);
ConcurrentMap<Comparable<Object>, Boolean> children =
parentToChildrenMap.computeIfAbsent(parentKey, k -> new ConcurrentHashMap<>());
int count = 0;
for (Object key : children.keySet()) {
data.remove(asKey(key));
count ++;
}
parentToChildrenMap.remove(parentKey);
return count;
}
} else {
Predicate<? super T> filter = getPredicate(ti.getAccessor(index), indexValues);
CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(data, filter);

data.forEach(callback);
return callback.count();
data.forEach(callback);
return callback.count();
}
return 0;
}

public T get(Object key) {
Expand All @@ -230,18 +251,31 @@ public T get(Object key) {

public void put(T value) throws Exception {
data.put(asKey(naturalKey.get(value)), value);
if (!naturalParentIndexName.isEmpty()) {
Comparable<Object> parentKey = asKey(getIndexAccessor(naturalParentIndexName).get(value));
ConcurrentMap<Comparable<Object>, Boolean> children =
parentToChildrenMap.computeIfAbsent(parentKey, k -> new ConcurrentHashMap<>());
children.put(asKey(naturalKey.get(value)), true);
}
}

public void delete(Object key) {
data.remove(asKey(key));
if (!naturalParentIndexName.isEmpty()) {
for (ConcurrentMap<Comparable<Object>, Boolean> v : parentToChildrenMap.values()) {
if (v.remove(asKey(key))) {
break;
}
}
}
}

public int size() {
return data.size();
}

public InMemoryView<T> view() {
return new InMemoryView<>(data.values(), ti);
return new InMemoryView<>(data, ti, naturalParentIndexName, parentToChildrenMap);
}

private static <T> Predicate<? super T> getPredicate(
Expand Down Expand Up @@ -271,22 +305,32 @@ private static Object indexValueForEntity(KVTypeInfo.Accessor getter, Object ent

private static class InMemoryView<T> extends KVStoreView<T> {
private static final InMemoryView<?> EMPTY_VIEW =
new InMemoryView<>(Collections.emptyList(), null);
new InMemoryView<>(new ConcurrentHashMap<>(), null, "", new ConcurrentHashMap<>());

private final Collection<T> elements;
private final ConcurrentMap<Comparable<Object>, T> data;
private final KVTypeInfo ti;
private final KVTypeInfo.Accessor natural;

InMemoryView(Collection<T> elements, KVTypeInfo ti) {
this.elements = elements;
private final ConcurrentMap<Comparable<Object>, ConcurrentMap<Comparable<Object>, Boolean>>
parentToChildrenMap;
private final String naturalParentIndexName;

InMemoryView(
ConcurrentMap<Comparable<Object>, T> data,
KVTypeInfo ti,
String naturalParentIndexName,
ConcurrentMap<Comparable<Object>, ConcurrentMap<Comparable<Object>, Boolean>>
parentToChildrenMap) {
this.data = data;
this.ti = ti;
this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null;
this.naturalParentIndexName = naturalParentIndexName;
this.parentToChildrenMap = parentToChildrenMap;
}

@Override
public Iterator<T> iterator() {
if (elements.isEmpty()) {
return new InMemoryIterator<>(elements.iterator());
if (data.isEmpty()) {
return new InMemoryIterator<>(data.values().iterator());
}

KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;
Expand Down Expand Up @@ -322,15 +366,25 @@ public Iterator<T> iterator() {
*/
private List<T> copyElements() {
if (parent != null) {
KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index.");
Comparable<?> parentKey = asKey(parent);

return elements.stream()
.filter(e -> compare(e, parentGetter, parentKey) == 0)
.collect(Collectors.toList());
Comparable<Object> parentKey = asKey(parent);
if (!naturalParentIndexName.isEmpty() &&
naturalParentIndexName.equals(ti.getParentIndexName(index))) {
ConcurrentMap<Comparable<Object>, Boolean> children =
parentToChildrenMap.computeIfAbsent(parentKey, k -> new ConcurrentHashMap<>());
ArrayList<T> elements = new ArrayList<>();
for (Comparable<Object> naturalKey : children.keySet()) {
data.computeIfPresent(naturalKey, (k, v) -> {elements.add(v); return v;});
}
return elements;
} else {
KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index.");
return data.values().stream()
.filter(e -> compare(e, parentGetter, parentKey) == 0)
.collect(Collectors.toList());
}
} else {
return new ArrayList<>(elements);
return new ArrayList<>(data.values());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ public KVTypeInfo(Class<?> type) {

Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME),
"No natural index defined for type %s.", type.getName());
Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME).parent().isEmpty(),
"Natural index of %s cannot have a parent.", type.getName());

for (KVIndex idx : indices.values()) {
if (!idx.parent().isEmpty()) {
Expand Down Expand Up @@ -117,6 +115,11 @@ Accessor getParentAccessor(String indexName) {
return index.parent().isEmpty() ? null : getAccessor(index.parent());
}

String getParentIndexName(String indexName) {
KVIndex index = indices.get(indexName);
return index.parent();
}

/**
* Abstracts the difference between invoking a Field and a Method.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,13 @@ class LevelDBTypeInfo {

// First create the parent indices, then the child indices.
ti.indices().forEach(idx -> {
if (idx.parent().isEmpty()) {
// In LevelDB, there is no parent index for the NUTURAL INDEX.
if (idx.parent().isEmpty() || idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) {
indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), null));
}
});
ti.indices().forEach(idx -> {
if (!idx.parent().isEmpty()) {
if (!idx.parent().isEmpty() && !idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) {
indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()),
indices.get(idx.parent())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private[spark] object TaskIndexNames {
private[spark] class TaskDataWrapper(
// Storing this as an object actually saves memory; it's also used as the key in the in-memory
// store, so in that case you'd save the extra copy of the value here.
@KVIndexParam
@KVIndexParam(parent = TaskIndexNames.STAGE)
val taskId: JLong,
@KVIndexParam(value = TaskIndexNames.TASK_INDEX, parent = TaskIndexNames.STAGE)
val index: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@

package org.apache.spark.status

import org.scalatest.concurrent.TimeLimits
import org.scalatest.time.{Millis, Span}
import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.{TaskInfo, TaskLocality}
import org.apache.spark.util.{Distribution, Utils}
import org.apache.spark.util.kvstore._

class AppStatusStoreSuite extends SparkFunSuite {
class AppStatusStoreSuite extends SparkFunSuite with TimeLimits {

private val uiQuantiles = Array(0.0, 0.25, 0.5, 0.75, 1.0)
private val stageId = 1
Expand Down Expand Up @@ -78,6 +82,26 @@ class AppStatusStoreSuite extends SparkFunSuite {
assert(store.count(classOf[CachedQuantile]) === 2)
}

test("InMemoryStore should build index from Stage ID to Task data") {
val store = new InMemoryStore()
(0 until 1000).map { sId =>
(0 until 1000).map { taskId =>
val task = newTaskData(sId * 1000 + taskId, "SUCCESS", sId)
store.write(task)
}
}
val appStatusStore = new AppStatusStore(store)
failAfter(Span(200, Millis)) {
appStatusStore.taskSummary(1, attemptId, Array(0, 0.25, 0.5, 0.75, 1))
}
val stageIds = Seq(1, 11, 66, 88)
val stageKeys = stageIds.map(Array(_, attemptId))
failAfter(Span(10, Millis)) {
store.removeAllByIndexValues(classOf[TaskDataWrapper], TaskIndexNames.STAGE,
stageKeys.asJavaCollection)
}
}

private def createAppStore(disk: Boolean, live: Boolean): AppStatusStore = {
val conf = new SparkConf()
if (live) {
Expand Down Expand Up @@ -152,12 +176,13 @@ class AppStatusStoreSuite extends SparkFunSuite {
}
}

private def newTaskData(i: Int, status: String = "SUCCESS"): TaskDataWrapper = {
private def newTaskData(i: Int, status: String = "SUCCESS",
sId: Int = stageId): TaskDataWrapper = {
new TaskDataWrapper(
i.toLong, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None, true,
i, i, i, i, i, i, i, i, i, i,
i, i, i, i, i, i, i, i, i, i,
i, i, i, i, stageId, attemptId)
i, i, i, i, sId, attemptId)
}

private def writeTaskDataToStore(i: Int, store: KVStore, status: String): Unit = {
Expand Down

0 comments on commit 4f93ffc

Please sign in to comment.