[SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads #24616

Closed
wants to merge 8 commits into from

4 participants
@davidnavas
Contributor

commented May 15, 2019

What changes were proposed in this pull request?

The details of the PR are explored in-depth in the sub-tasks of the umbrella jira SPARK-27726.
Briefly:

  1. Stop issuing asynchronous requests to cleanup elements in the tracking store when a request is already pending
  2. Fix a couple of thread-safety issues (mutable state and mis-ordered updates)
  3. Move Summary deletion outside of Stage deletion loop like Tasks already are
  4. Reimplement multi-delete in a removeAllKeys call which allows InMemoryStore to implement it in a performant manner.
  5. Some generic typing and exception handling cleanup

We see about five orders of magnitude improvement in the deletion code, which for us is the difference between a server that needs restarting daily, and one that is stable over weeks.

How was this patch tested?

Unit tests for the fire-once asynchronous code and the removeAll calls in both LevelDB and InMemoryStore are supplied. It was noted that the testing code for the LevelDB and InMemoryStore is highly repetitive, and should probably be merged, but we did not attempt that in this PR.

A version of this code was run in our production 2.3.3 and we were able to sustain higher throughput without going into GC overload (which was happening on a daily basis some weeks ago).

A version of this code was also put under a purpose-built Performance Suite of tests to verify performance under both types of Store implementations for both before and after code streams and for both total and partial delete cases (this code is not included in this PR).

@vanzin

Contributor

commented May 15, 2019

ok to test

@davidnavas davidnavas force-pushed the davidnavas:PentaBugFix branch from c330fd9 to 389a98e May 15, 2019

@SparkQA


commented May 15, 2019

Test build #105427 has finished for PR 24616 at commit c330fd9.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA


commented May 15, 2019

Test build #105428 has finished for PR 24616 at commit 389a98e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@vanzin
Contributor

left a comment

I definitely need another read.

At some point I thought about allowing "unordered views", which I think would also fix the main performance problem with the deletes (which is the copy + sort operation). But I kinda like your new interface (bar some naming nits).


@SuppressWarnings("unchecked")
public <T> InstanceList<T> get(Class<T> type) {
return (InstanceList<T>)data.get(type);

@vanzin

vanzin May 15, 2019

Contributor

nit: space after )

private static class InstanceList<T> {

private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
ConcurrentMap<Comparable<Object>, T> data;

@vanzin

vanzin May 15, 2019

Contributor

nit: private, final where it makes sense

}

public void accept(Comparable<Object> key, T value) {
// To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on

@vanzin

vanzin May 15, 2019

Contributor

This comment sounds like it belongs at this class's declaration, not inside this method?
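A minimal sketch of the pattern the snippet under review appears to follow: walk a ConcurrentMap with forEach and remove matching entries through remove(key, value) rather than through iterator removal, counting removals along the way. The class name CountingRemover and the removeIf helper are illustrative assumptions, not the code in this PR.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

// Hypothetical stand-in for the class under review; names are illustrative.
final class CountingRemover<K, V> implements BiConsumer<K, V> {
  private final ConcurrentMap<K, V> data;
  private final Predicate<? super V> filter;
  private int count = 0;

  CountingRemover(ConcurrentMap<K, V> data, Predicate<? super V> filter) {
    this.data = data;
    this.filter = filter;
  }

  @Override
  public void accept(K key, V value) {
    // remove(key, value) succeeds only if the key still maps to this value,
    // so a concurrent overwrite of the same key is left alone.
    if (filter.test(value) && data.remove(key, value)) {
      count++;
    }
  }

  int count() {
    return count;
  }

  // Convenience wrapper: removes every entry whose value matches the filter
  // and returns how many entries were removed.
  static <K, V> int removeIf(ConcurrentMap<K, V> data, Predicate<? super V> filter) {
    CountingRemover<K, V> remover = new CountingRemover<>(data, filter);
    data.forEach(remover);
    return remover.count();
  }

  public static void main(String[] args) {
    ConcurrentMap<Integer, String> map = new ConcurrentHashMap<>();
    map.put(1, "a");
    map.put(2, "bb");
    System.out.println(removeIf(map, v -> v.length() > 1) + " removed, left: " + map);
  }
}
```

The count is kept in a plain int, so this sketch assumes a single thread drives the forEach call.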

// removed, and a count is no more complicated than a boolean so I've
// retained that behavior here, although there is no current requirement.
@SuppressWarnings("unchecked")
int countingRemoveAllByKeys(String index, Collection keys) {

@vanzin

vanzin May 15, 2019

Contributor

IIUC you're removing all entries whose value corresponding to the given index match the given keys list.

So it sounds to me like the name here (and in related code) should use ByValues instead of ByKeys?

Also, you have a raw type in the argument list, which tells me that your @SuppressWarnings is either incorrect or perhaps not needed (if you make the argument Collection<?>).

@davidnavas

davidnavas May 16, 2019

Author Contributor

I'm removing values whose key indicated by "index" matches (one of) the passed keys.
So, I'm matching keys, hence the ByKeys and the keyFromValue which retrieves the key indicated by 'getter' of the passed value.
BTW, I think I could add generic typing to Accessor<V, K> and ensure that the getter and value objects match, but I don't think the K type would turn out to be terribly useful, as ultimately there's no match between the index (a String) and the key type. Let me know if you think that's useful.

Yes, I'll try to use the <?> in a number of the SuppressWarnings and see what happens. It might be possible to get rid of a number of them, which would be fabulous.

@davidnavas

davidnavas May 16, 2019

Author Contributor

Hmm, I suspect we have a terminology overload issue here.

  1. The read|write|view methods on KVStore refer to the things you read, write, and look at as "instance", "object", and "entities" respectively (and also "value" for write() as the name of the parameter).
  2. read() provides a "naturalKey" to access a specific instance, which is the value of the unique/primary/NATURAL_INDEX_NAME index
  3. KVStoreView refers to the values of an index as passed in first() and last() as values, not keys -- naturalKey or no
  4. InMemoryStore refers to the Comparable wrappers placed around the values of a particular index as a key.
  5. KVStore seems to say that keys are actually created per type written and are based on the type name (which itself is referred to as either klass or type)

So, yes, I'm probably using terminology wrong, and I hereby declare myself confused :(

With respect to klass vs type I had gone with klass in the removeAll___() as it was consistent with having 'klass' everywhere in the ElementTrackingStore, but please let me know if that should be changed!

With respect to key, value, field-value-for-index, comparable-wrapper-around-field-value-for-index, I admit to not knowing what to call which things when. One way to deal with this is to make the removeAll on the KVStoreView, where "value" is everywhere (I think) considered in the context of the type and index. The upside is that we could naturally call such a method removeAll() as the class and index are owned by the View. From an impl standpoint, it would make the definition of LevelDB's view quite a bit more complicated, and it would require work in InMemoryView as well -- I'd need to pass down the containing hash of indexed-values->entity, rather than just the entities (locally referred to as elements). The other downside is that you're using a "View" to mutate....

Another approach would be to use 'values' as you suggested with some commentary to clear up what we mean by values in the parameter and call the methods something like removeAllIndexedValues() and (internally) indexedValueFromEntity() or some such.

Thoughts?

@vanzin

vanzin May 16, 2019

Contributor

I very much believe that there might be very confusing terminology in this code. I went back and forth on implementation and interfaces a ton of times before reaching something I was happy with, and by that time I didn't really bother with the internal naming of things so much.

But here we're talking about a new method in a "public" interface (not this particular line, but the new method in KVStore), so better be a little bit more careful. And IMO index values are not keys, so "removeByKeys" is a little weird. Maybe "removeByIndexValues" is clearer.

@davidnavas

davidnavas May 16, 2019

Author Contributor

"so better be a little bit more careful" - strongly agree. I'll try removeAllByIndexValues (slight merging of our proposals) -- it's a little bit of a word salad, but nothing better really strikes me. Thanks!
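To pin down the terminology settled on in this thread, here is a small sketch of a store where entities live under a natural key and are removed by the values of a secondary index. TinyStore, addIndex, and the Function-based accessors are hypothetical; only the method name removeAllByIndexValues comes from the discussion, and the real KVStore signature also takes a class argument.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical miniature store, used only to illustrate the naming.
final class TinyStore<T> {
  // Entities are stored under their natural key (the primary index).
  private final Map<Object, T> entities = new HashMap<>();
  private final Function<T, Object> naturalKey;
  // Secondary indices: index name -> accessor that reads the indexed field.
  private final Map<String, Function<T, Object>> indices = new HashMap<>();

  TinyStore(Function<T, Object> naturalKey) {
    this.naturalKey = naturalKey;
  }

  void addIndex(String name, Function<T, Object> accessor) {
    indices.put(name, accessor);
  }

  void write(T entity) {
    entities.put(naturalKey.apply(entity), entity);
  }

  int count() {
    return entities.size();
  }

  // Removes every entity whose value for `index` is one of `indexValues`.
  // Returns true if at least one entity was removed.
  boolean removeAllByIndexValues(String index, Collection<?> indexValues) {
    Function<T, Object> accessor = indices.get(index);
    if (accessor == null) {
      throw new IllegalArgumentException("Unknown index: " + index);
    }
    Set<Object> wanted = new HashSet<>(indexValues);
    return entities.values().removeIf(e -> wanted.contains(accessor.apply(e)));
  }
}
```

In this sketch the collection passed in holds index values (for example stage ids), not the natural keys of the entities being removed, which is the distinction the rename is meant to convey.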

}

private static Object keyFromValue(KVTypeInfo.Accessor getter, Object value) {

@vanzin

vanzin May 15, 2019

Contributor

Similar thing, key sounds like the wrong word here.

@@ -46,7 +50,26 @@ import org.apache.spark.util.kvstore._
*/
private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {

private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
private class LatchedTriggers(val triggers: Seq[Trigger[_]]) {
val countDeferred = new AtomicInteger(0)

@vanzin

vanzin May 15, 2019

Contributor

You're kinda using this as a boolean, so AtomicBoolean? Also, private, and the variable name is a little cryptic (maybe just call it busy or something?).
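A minimal Java sketch of the AtomicBoolean variant suggested here, assuming the goal is to skip scheduling cleanup while an earlier request is still pending. FireOnceLatch and the submit parameter are hypothetical names; the code under review is Scala and is organised differently.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical Java rendering of a "fire once" latch; not the PR's Scala code.
final class FireOnceLatch {
  private final AtomicBoolean pending = new AtomicBoolean(false);

  // Hands `task` to `submit` only if no earlier request is still pending.
  // Returns true when the task was queued, false when it was skipped.
  boolean fireOnce(Consumer<Runnable> submit, Runnable task) {
    if (!pending.compareAndSet(false, true)) {
      return false;
    }
    try {
      submit.accept(() -> {
        // Clear the flag before running, so a write that arrives while the
        // task runs can still schedule a follow-up pass.
        pending.set(false);
        task.run();
      });
    } catch (RuntimeException e) {
      // If queuing itself fails, do not leave the latch stuck in "pending".
      pending.set(false);
      throw e;
    }
    return true;
  }
}
```

Because compareAndSet both tests and sets the flag, no separate counter or lock is needed to decide which caller gets to enqueue.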

f(triggers)
}
}
shouldExecute

@vanzin

vanzin May 15, 2019

Contributor

You could avoid the WriteQueueResult object with an inlined if...else here.

@davidnavas

davidnavas May 16, 2019

Author Contributor

I think there's a case where I needed the tri-value, but I'll take another peek.

@davidnavas

davidnavas May 16, 2019

Author Contributor

Hmm, my memory is faulty, this isn't a tri-value. I'll inline here, sure.

}

// Delete summaries in one pass, as deleting them for each stage is slow
val totalSummariesDeleted = kvstore.removeAllByKeys(

@vanzin

vanzin May 15, 2019

Contributor

Unused variable.

@davidnavas

davidnavas May 16, 2019

Author Contributor

excellent - yes, this used to be where I logged the count, thanks for catching!

@davidnavas

Contributor Author

commented May 16, 2019

Updated PR
'''
At some point I thought about allowing "unordered views", which I think would also fix the main performance problem with the deletes (which is the copy + sort operation). But I kinda like your new interface (bar some naming nits).
'''

Yes, that's where I started too -- let me share with you why I ended up with removeAll anyway...

Setup:
Define N as the number of entities for a particular type
Define MAX as the maximum number of entities that the ElementStore is tasked to retain (iirc, for stages, this is 1000)
Define K as the number of elements being deleted in one go.

So, first thing I did was to remove the sort if first==last on the view, which isn't correct in general, but certainly surfaces an unordered view. The problem is that for N >> MAX, K ~= N, and the basic algorithm is still O(N^2) because we're filtering the whole list on each pass.

Having got my head on straight, the second thing I did was to move the entire delete out of the stage loop. Then I noticed that you had already done that for Tasks. But I was annoyed by the implied sort that happens anyway, so I implemented a removeIf construct (attempt unordered view part 2). This held up really well, until I performance tested with LevelDB.

LevelDB works GREAT with the original implementation. The removeIf implementation was not so wonderful when N << MAX, because then the delete is O(N) instead of O(K). I hate having to explain a performance regression on a performance PR, and that's why I moved to removeAll and separate implementations for each Store. It's actually about half the speed of the removeIf construct for InMemory, probably because of all the wrappers being built around the indexValues (stage naturalKeys). And of course, InMemoryStore still is O(N) and not O(K) on removal for N << MAX. But it's 100x faster than LevelDB, and isn't anyway a regression in behavior.
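The cost argument above can be illustrated with a small sketch that counts element visits for the two strategies: one scan of the remaining elements per deleted stage, versus a single pass with a set of stage ids. DeleteCost and its methods are hypothetical, and the sketch ignores the copy and sort cost of building a view, so it understates the gap described in the thread.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the cost argument; counts element visits only.
final class DeleteCost {
  // One full scan of the remaining elements per deleted stage.
  // With K close to N this grows roughly quadratically in N.
  static long perStageScan(List<Integer> summaryStageIds, List<Integer> stagesToDelete) {
    List<Integer> remaining = new ArrayList<>(summaryStageIds);
    long visits = 0;
    for (Integer stage : stagesToDelete) {
      visits += remaining.size();
      remaining.removeIf(stage::equals);
    }
    return visits;
  }

  // A single scan that checks each element against a hash set of doomed stages.
  static long singlePass(List<Integer> summaryStageIds, List<Integer> stagesToDelete) {
    Set<Integer> doomed = new HashSet<>(stagesToDelete);
    List<Integer> remaining = new ArrayList<>(summaryStageIds);
    long visits = remaining.size();
    remaining.removeIf(doomed::contains);
    return visits;
  }

  public static void main(String[] args) {
    List<Integer> ids = new ArrayList<>();
    List<Integer> doomed = new ArrayList<>();
    for (int i = 0; i < 10_000; i++) {
      ids.add(i);
      if (i < 9_000) {
        doomed.add(i);
      }
    }
    System.out.println("per-stage scan visits: " + perStageScan(ids, doomed));
    System.out.println("single pass visits:    " + singlePass(ids, doomed));
  }
}
```

The single pass is still O(N) rather than O(K), which matches the caveat above about InMemoryStore when N << MAX.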

removeAll was definitely not my first go-to. :>

@SparkQA


commented May 16, 2019

Test build #105465 has finished for PR 24616 at commit dfa52c5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@markhamstra

Contributor

commented May 16, 2019

test this please

@SparkQA


commented May 16, 2019

Test build #105471 has finished for PR 24616 at commit dfa52c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@vanzin
Contributor

left a comment

Will give another pass on Monday but overall looks ok, just left some nits.


private static <T> Predicate<? super T> getPredicate(
KVTypeInfo.Accessor getter,
Collection<?> keys) {

@vanzin

vanzin May 17, 2019

Contributor

s/keys/values

}

private static class InMemoryView<T> extends KVStoreView<T> {
private static final InMemoryView EMPTY_VIEW =

@vanzin

vanzin May 17, 2019

Contributor

InMemoryView<?>?

* A cheaper way to remove multiple items from the KVStore
*/
<T> boolean removeAllByIndexValues(Class<T> klass, String index, Collection<?> indexValues)
throws Exception;

@vanzin

vanzin May 17, 2019

Contributor

nit: indented too much (2 or 4 spaces is fine here)

@@ -1142,20 +1144,10 @@ private[spark] class AppStatusListener(
s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
}

stages.foreach { s =>
val stageIndexValues = stages.map { s =>

@vanzin

vanzin May 17, 2019

Contributor

s/stageIndexValues/stageIds (or stageKeys, since here they're actually the primary keys of the stages being removed...)

private val pending = new AtomicBoolean(false)

def fireOnce(f: Seq[Trigger[_]] => Unit): WriteQueueResult = {
val shouldEnqueue = pending.compareAndSet(false, true)

@vanzin

vanzin May 17, 2019

Contributor

Actually don't need this variable now.

@SparkQA


commented May 20, 2019

Test build #105575 has finished for PR 24616 at commit c66a569.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@vanzin
Contributor

left a comment

Looks good, just a few more minor things.

} catch (Exception e) {
throw Throwables.propagate(e);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}

private int compare(T e1, KVTypeInfo.Accessor getter, Object v2) {
try {
return asKey(getter.get(e1)).compareTo(asKey(v2));

@vanzin

vanzin May 20, 2019

Contributor

Did you try to use the getPredicate stuff in this class too? Seems like it could save some computation.

@davidnavas

davidnavas May 20, 2019

Author Contributor

I have not attempted to apply the basic trick in getPredicate (of calling or not calling asKey() depending on whether the index requires it). I agree that there might be a win there, but the application seems difficult and unlikely to yield clear code. getPredicate() is doing two things -- it's converting a Collection into a Set as well as converting all the entries through asKey as necessary, AND it's removing the necessity of calling getClass.isArray when it isn't needed.

There appear to be three basic times where this strategy would be useful -- during copyElements when parent is defined, and during iteration when first and/or last are defined. Well, I don't see a contract about when first or last can be changed wrt when an iteration is started or running, so I'm a little worried about modifying that code, and I'm not sure what the usecases are for parents. But let's assume that first and last are actually fixed during iteration (it seems bad if they are not), and I'm willing to make changes to the parent code blind .... Presumably I'd want a set of compare() routines that took a Comparable as the second argument (the second argument being fixed locally to asKey(first|last|parent)) with two potential names -- one that would call asKey and one that wouldn't. I'm blanking on ideas for names. Ideas?

What I'll do is create the "dumb" version of this compare call, and either we'll revert because we want to allow first and last to be modified during iteration, or we'll come up with some good names for dealing with the other half of the asKey calls. Or we'll decide that the getClass.isArray calls for only one side of the compare isn't so bad.
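A rough sketch of the getPredicate idea as described in this comment: convert the requested index values into a Set once, wrapping arrays so they compare by content, and choose up front whether the per-element array check is needed. IndexValuePredicates, normalize, and matching are hypothetical names, the Function parameter stands in for KVTypeInfo.Accessor, and the decision here is made from the requested values rather than from the index's declared type, which is an assumption of this sketch.

```java
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch; `accessor` stands in for KVTypeInfo.Accessor.
final class IndexValuePredicates {
  // Arrays lack content-based equals/hashCode, so wrap them in a List
  // before using them for set membership.
  static Object normalize(Object value) {
    if (value != null && value.getClass().isArray()) {
      int n = Array.getLength(value);
      List<Object> wrapped = new ArrayList<>(n);
      for (int i = 0; i < n; i++) {
        wrapped.add(Array.get(value, i));
      }
      return wrapped;
    }
    return value;
  }

  // Builds a predicate that is true for entities whose index value
  // (read through `accessor`) is one of `indexValues`.
  static <T> Predicate<T> matching(Function<T, Object> accessor, Collection<?> indexValues) {
    Set<Object> wanted = new HashSet<>();
    boolean anyArray = false;
    for (Object v : indexValues) {
      anyArray |= v != null && v.getClass().isArray();
      wanted.add(normalize(v));
    }
    if (anyArray) {
      // Array-valued index: normalize each entity's value before the lookup.
      return e -> wanted.contains(normalize(accessor.apply(e)));
    }
    // No array values requested: skip the per-element isArray check entirely.
    return e -> wanted.contains(accessor.apply(e));
  }
}
```

The conversion cost is paid once per call rather than once per comparison, which is the saving the comment refers to.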


assertEquals(9, store.count(ArrayKeyIndexType.class));

HashSet set = new HashSet();

@vanzin

vanzin May 20, 2019

Contributor

Add generic type.

Or you could use ImmutableSet and not need this local variable (and avoid re-using it for different types and avoid the warnings from javac at the same time).


assertEquals(9, db.count(ArrayKeyIndexType.class));

HashSet set = new HashSet();

@vanzin

vanzin May 20, 2019

Contributor

Same thing.

var queued1: WriteQueueResult = null
var queued2: WriteQueueResult = null
var queued3: WriteQueueResult = null

@vanzin

vanzin May 20, 2019

Contributor

remove one empty line

queued0 = tracking.write(new Type1, checkTriggers = true)
waiter.synchronized {
if (!done) {
waiter.wait()

@vanzin

vanzin May 20, 2019

Contributor

Hmmm... maybe add a timeout here (or use eventually)? The issue is that if there's a bug in your code, this may not actually return. (It's passing, but if a bug is added later then this would be an annoying thing to debug.)
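One way to bound the wait, sketched in Java with a CountDownLatch instead of wait/notify. BoundedWait and runAndAwait are hypothetical helpers, and the test in this PR is Scala, so this only illustrates the shape of the suggestion.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical test helper; the PR's test is Scala and uses wait/notify.
final class BoundedWait {
  // Starts asynchronous work via `startAsync`, passing it a callback to invoke
  // on completion, then waits at most `timeoutMillis` for that callback.
  static void runAndAwait(Consumer<Runnable> startAsync, long timeoutMillis) {
    CountDownLatch done = new CountDownLatch(1);
    startAsync.accept(done::countDown);
    boolean finished;
    try {
      finished = done.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new AssertionError("Interrupted while waiting for async work", e);
    }
    if (!finished) {
      // Fail loudly instead of hanging the test run forever.
      throw new AssertionError("Async work did not finish within " + timeoutMillis + " ms");
    }
  }
}
```

With a bounded wait, a regression that prevents the callback from firing shows up as a test failure with a message rather than as a hung build.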

@SparkQA


commented May 21, 2019

Test build #105581 has finished for PR 24616 at commit e062cf9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@vanzin
Contributor

left a comment

Looks good. Merging to master, I'll fix the small nits during merge.

I'll also try 2.4 but I have a feeling it will not merge cleanly.

* class of type T to an InstanceList of type T.
*/
private static class InMemoryLists {
private ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();

@vanzin

vanzin May 21, 2019

Contributor

final

this.filter = filter;
}

public void accept(Comparable<Object> key, T value) {

@vanzin

vanzin May 21, 2019

Contributor

@Override

store.removeAllByIndexValues(
ArrayKeyIndexType.class,
"id",
ImmutableSet.of(new String [] { "more things" }));

@vanzin

vanzin May 21, 2019

Contributor

indentation

@vanzin vanzin closed this in 9e73be3 May 21, 2019

@davidnavas

Contributor Author

commented May 21, 2019

Thanks for the assist, appreciate your time. The original commits were actually written against 2.3, and I don't think I had any problems just cherry-picking to master, so it might not be so bad on 2.4!

vanzin added a commit that referenced this pull request May 21, 2019

[SPARK-27726][CORE] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

The details of the PR are explored in-depth in the sub-tasks of the umbrella jira SPARK-27726.
Briefly:
  1. Stop issuing asynchronous requests to cleanup elements in the tracking store when a request is already pending
  2. Fix a couple of thread-safety issues (mutable state and mis-ordered updates)
  3. Move Summary deletion outside of Stage deletion loop like Tasks already are
  4. Reimplement multi-delete in a removeAllKeys call which allows InMemoryStore to implement it in a performant manner.
  5. Some generic typing and exception handling cleanup

We see about five orders of magnitude improvement in the deletion code, which for us is the difference between a server that needs restarting daily, and one that is stable over weeks.

Unit tests for the fire-once asynchronous code and the removeAll calls in both LevelDB and InMemoryStore are supplied.  It was noted that the testing code for the LevelDB and InMemoryStore is highly repetitive, and should probably be merged, but we did not attempt that in this PR.

A version of this code was run in our production 2.3.3 and we were able to sustain higher throughput without going into GC overload (which was happening on a daily basis some weeks ago).

A version of this code was also put under a purpose-built Performance Suite of tests to verify performance under both types of Store implementations for both before and after code streams and for both total and partial delete cases (this code is not included in this PR).

Closes #24616 from davidnavas/PentaBugFix.

Authored-by: David Navas <davidn@clearstorydata.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit 9e73be3)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
@vanzin

Contributor

commented May 21, 2019

There were some import conflicts in 2.4; I fixed those and pushed after testing locally.
