Skip to content

Commit

Permalink
Address PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
David Navas committed May 16, 2019
1 parent 389a98e commit dfa52c5
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,15 @@ public void close() {
inMemoryLists.clear();
}

@SuppressWarnings("unchecked")
@Override
public <T> boolean removeAllByKeys(Class<T> klass, String index, Collection keys) {
public <T> boolean removeAllByIndexValues(
Class<T> klass,
String index,
Collection<?> indexValues) {
InstanceList<T> list = inMemoryLists.get(klass);

if (list != null) {
return list.countingRemoveAllByKeys(index, keys) > 0;
return list.countingRemoveAllByIndexValues(index, indexValues) > 0;
} else {
return false;
}
Expand Down Expand Up @@ -146,7 +148,7 @@ private static class InMemoryLists {

@SuppressWarnings("unchecked")
public <T> InstanceList<T> get(Class<T> type) {
return (InstanceList<T>)data.get(type);
return (InstanceList<T>) data.get(type);
}

@SuppressWarnings("unchecked")
Expand All @@ -163,10 +165,23 @@ public void clear() {

private static class InstanceList<T> {

/**
* A BiConsumer to control multi-entity removal. We use this in a forEach rather than an
* iterator because there is a bug in jdk8 which affects remove() on all concurrent map
* iterators. https://bugs.openjdk.java.net/browse/JDK-8078645
*/
private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
ConcurrentMap<Comparable<Object>, T> data;
Predicate<? super T> filter;
int count = 0;
private final ConcurrentMap<Comparable<Object>, T> data;
private final Predicate<? super T> filter;

/**
* Keeps a count of the number of elements removed. This count is not currently surfaced
* to clients of KVStore as Java's generic removeAll() construct returns only a boolean,
* but I found it handy to have the count of elements removed while debugging; a count being
* no more complicated than a boolean, I've retained that behavior here, even though there
* is no current requirement.
*/
private int count = 0;

CountingRemoveIfForEach(
ConcurrentMap<Comparable<Object>, T> data,
Expand All @@ -176,15 +191,14 @@ private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable
}

public void accept(Comparable<Object> key, T value) {
// To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on
// all iterators of concurrent maps, and specifically makes countingRemoveIf difficult to
// implement correctly against the values() iterator, we use forEach instead....
if (filter.test(value)) {
if (data.remove(key, value)) {
count++;
}
}
}

public int count() { return count; }
}

private final KVTypeInfo ti;
Expand All @@ -201,18 +215,12 @@ KVTypeInfo.Accessor getIndexAccessor(String indexName) {
return ti.getAccessor(indexName);
}

// Note: removeIf returns a boolean if any element has been removed.
// While debugging this code, it was handy to have the count of elements
// removed, rather than an indicator of whether something has been
// removed, and a count is no more complicated than a boolean so I've
// retained that behavior here, although there is no current requirement.
@SuppressWarnings("unchecked")
int countingRemoveAllByKeys(String index, Collection keys) {
Predicate<? super T> filter = getPredicate(ti.getAccessor(index), keys);
int countingRemoveAllByIndexValues(String index, Collection<?> indexValues) {
Predicate<? super T> filter = getPredicate(ti.getAccessor(index), indexValues);
CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(data, filter);

data.forEach(callback);
return callback.count;
return callback.count();
}

public T get(Object key) {
Expand All @@ -231,39 +239,38 @@ public int size() {
return data.size();
}

@SuppressWarnings("unchecked")
public InMemoryView<T> view() {
return new InMemoryView<>(data.values(), ti);
}

@SuppressWarnings("unchecked")
private static <T> Predicate<? super T> getPredicate(
KVTypeInfo.Accessor getter,
Collection keys) {
Collection<?> keys) {
if (Comparable.class.isAssignableFrom(getter.getType())) {
HashSet set = new HashSet(keys);
HashSet<?> set = new HashSet<>(keys);

return (value) -> set.contains(keyFromValue(getter, value));
return (value) -> set.contains(indexValueForEntity(getter, value));
} else {
HashSet<Comparable> set = new HashSet<>(keys.size());
for (Object key : keys) {
set.add(asKey(key));
}
return (value) -> set.contains(asKey(keyFromValue(getter, value)));
return (value) -> set.contains(asKey(indexValueForEntity(getter, value)));
}
}

private static Object keyFromValue(KVTypeInfo.Accessor getter, Object value) {
private static Object indexValueForEntity(KVTypeInfo.Accessor getter, Object entity) {
try {
return getter.get(value);
return getter.get(entity);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}
}

private static class InMemoryView<T> extends KVStoreView<T> {
private static InMemoryView EMPTY_VIEW = new InMemoryView<>(Collections.emptyList(), null);
private static final InMemoryView EMPTY_VIEW =
new InMemoryView<>(Collections.emptyList(), null);

private final Collection<T> elements;
private final KVTypeInfo ti;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,6 @@ public interface KVStore extends Closeable {
/**
* A cheaper way to remove multiple items from the KVStore
*/
<T> boolean removeAllByKeys(Class<T> klass, String index, Collection keys) throws Exception;
<T> boolean removeAllByIndexValues(Class<T> klass, String index, Collection<?> indexValues)
throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,16 @@ public Iterator<T> iterator() {
}

@Override
public <T> boolean removeAllByKeys(
public <T> boolean removeAllByIndexValues(
Class<T> klass,
String index,
Collection keys) throws Exception {
Collection<?> indexValues) throws Exception {
LevelDBTypeInfo.Index naturalIndex = getTypeInfo(klass).naturalIndex();
boolean removed = false;
KVStoreView<T> view = view(klass).index(index);

for (Object key : keys) {
for (T value: view.first(key).last(key)) {
for (Object indexValue : indexValues) {
for (T value: view.first(indexValue).last(indexValue)) {
Object itemKey = naturalIndex.getValue(value);
delete(klass, itemKey);
removed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,17 @@ public void testRemoveAll() throws Exception {
HashSet set = new HashSet();
set.add(new int[] {0, 0, 0});
set.add(new int[] { 2, 2, 2 });
store.removeAllByKeys(ArrayKeyIndexType.class, KVIndex.NATURAL_INDEX_NAME, set);
store.removeAllByIndexValues(ArrayKeyIndexType.class, KVIndex.NATURAL_INDEX_NAME, set);
assertEquals(7, store.count(ArrayKeyIndexType.class));

set.clear();
set.add(new String[] { "things" });
store.removeAllByKeys(ArrayKeyIndexType.class, "id", set);
store.removeAllByIndexValues(ArrayKeyIndexType.class, "id", set);
assertEquals(4, store.count(ArrayKeyIndexType.class));

set.clear();
set.add(new String[] { "more things" });
store.removeAllByKeys(ArrayKeyIndexType.class, "id", set);
store.removeAllByIndexValues(ArrayKeyIndexType.class, "id", set);
assertEquals(0, store.count(ArrayKeyIndexType.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,17 +225,17 @@ public void testRemoveAll() throws Exception {
HashSet set = new HashSet();
set.add(new int[] {0, 0, 0});
set.add(new int[] { 2, 2, 2 });
db.removeAllByKeys(ArrayKeyIndexType.class, KVIndex.NATURAL_INDEX_NAME, set);
db.removeAllByIndexValues(ArrayKeyIndexType.class, KVIndex.NATURAL_INDEX_NAME, set);
assertEquals(7, db.count(ArrayKeyIndexType.class));

set.clear();
set.add(new String[] { "things" });
db.removeAllByKeys(ArrayKeyIndexType.class, "id", set);
db.removeAllByIndexValues(ArrayKeyIndexType.class, "id", set);
assertEquals(4, db.count(ArrayKeyIndexType.class));

set.clear();
set.add(new String[] { "more things" });
db.removeAllByKeys(ArrayKeyIndexType.class, "id", set);
db.removeAllByIndexValues(ArrayKeyIndexType.class, "id", set);
assertEquals(0, db.count(ArrayKeyIndexType.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,7 @@ private[spark] class AppStatusListener(
s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
}

val stageKeys = stages.map { s =>
val stageIndexValues = stages.map { s =>
val key = Array(s.info.stageId, s.info.attemptId)
kvstore.delete(s.getClass(), key)

Expand Down Expand Up @@ -1173,16 +1173,16 @@ private[spark] class AppStatusListener(
}

// Delete summaries in one pass, as deleting them for each stage is slow
val totalSummariesDeleted = kvstore.removeAllByKeys(
kvstore.removeAllByIndexValues(
classOf[ExecutorStageSummaryWrapper],
"stage",
stageKeys)
stageIndexValues)

// Delete tasks for all stages in one pass, as deleting them for each stage individually is slow
kvstore.removeAllByKeys(
kvstore.removeAllByIndexValues(
classOf[TaskDataWrapper],
TaskIndexNames.STAGE,
stageKeys)
stageIndexValues)
}

private def cleanupTasks(stage: LiveStage): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.status

import java.util.Collection
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer}
Expand All @@ -28,7 +28,7 @@ import com.google.common.util.concurrent.MoreExecutors

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Status._
import org.apache.spark.status.ElementTrackingStore.{WriteQueueResult, WriteSkippedQueue}
import org.apache.spark.status.ElementTrackingStore._
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.kvstore._

Expand All @@ -51,17 +51,19 @@ import org.apache.spark.util.kvstore._
private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {

private class LatchedTriggers(val triggers: Seq[Trigger[_]]) {
val countDeferred = new AtomicInteger(0)
private val pending = new AtomicBoolean(false)

def fireOnce(f: Seq[Trigger[_]] => Unit): Boolean = {
val shouldExecute = countDeferred.compareAndSet(0, 1)
if (shouldExecute) {
def fireOnce(f: Seq[Trigger[_]] => Unit): WriteQueueResult = {
val shouldEnqueue = pending.compareAndSet(false, true)
if (shouldEnqueue) {
doAsync {
countDeferred.set(0)
pending.set(false)
f(triggers)
}
WriteQueued
} else {
WriteSkippedQueue
}
shouldExecute
}

def :+(addlTrigger: Trigger[_]): LatchedTriggers = {
Expand Down Expand Up @@ -129,25 +131,29 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten

if (checkTriggers && !stopped) {
triggers.get(value.getClass).map { latchedList =>
WriteQueueResult(latchedList.fireOnce { list =>
latchedList.fireOnce { list =>
val count = store.count(value.getClass)
list.foreach { t =>
if (count > t.threshold) {
t.action(count)
}
}
})
}
}.getOrElse(WriteSkippedQueue)
} else {
WriteSkippedQueue
}
}

def removeAllByKeys[T](klass: Class[T], index: String, keys: Iterable[_]): Boolean =
removeAllByKeys(klass, index, keys.asJavaCollection)
def removeAllByIndexValues[T](klass: Class[T], index: String, indexValues: Iterable[_]): Boolean =
removeAllByIndexValues(klass, index, indexValues.asJavaCollection)

override def removeAllByKeys[T](klass: Class[T], index: String, keys: Collection[_]): Boolean =
store.removeAllByKeys(klass, index, keys)
override def removeAllByIndexValues[T](
klass: Class[T],
index: String,
indexValues: Collection[_]): Boolean = {
store.removeAllByIndexValues(klass, index, indexValues)
}

override def delete(klass: Class[_], naturalKey: Any): Unit = store.delete(klass, naturalKey)

Expand Down Expand Up @@ -200,15 +206,6 @@ private[spark] object ElementTrackingStore {
* The result of write() is otherwise unused.
*/
sealed trait WriteQueueResult
object WriteQueueResult {
def apply(b: Boolean): WriteQueueResult = {
if (b) {
WriteQueued
} else {
WriteSkippedQueue
}
}
}

object WriteQueued extends WriteQueueResult
object WriteSkippedQueue extends WriteQueueResult
Expand Down

0 comments on commit dfa52c5

Please sign in to comment.