
[FLINK-8790][State] Improve performance for recovery from incremental checkpoint #5582

Conversation

sihuazhou
Contributor

What is the purpose of the change

This PR fixes FLINK-8790. When there are multiple state handles to be restored, we can improve the performance as follows:

    1. Choose the best state handle to initialize the target db.
    2. Use the other state handles to create tmp dbs, and clip each tmp db according to the target key group range (via rocksdb.deleteRange()). This lets us get rid of the key group check in the
      data insertion loop and also avoids traversing useless records.
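
The clipping in step 2 can be sketched with plain Java collections (a toy model, not the RocksDB API: int keys stand in for serialized key-group prefixes and a TreeMap stands in for the tmp db; all names are illustrative):

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class ClipSketch {

    // Toy model of the clip: drop everything below the target start and above
    // the target end, mirroring the two rocksdb.deleteRange() calls on a tmp db.
    public static void clip(NavigableMap<Integer, String> db, int targetStart, int targetEnd) {
        db.headMap(targetStart, false).clear(); // removes keys < targetStart
        db.tailMap(targetEnd, false).clear();   // removes keys > targetEnd
    }

    public static void main(String[] args) {
        NavigableMap<Integer, String> db = new TreeMap<>();
        for (int k = 0; k < 10; k++) {
            db.put(k, "value-" + k);
        }
        clip(db, 3, 6);
        System.out.println(db.keySet()); // [3, 4, 5, 6]
    }
}
```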

Brief change log

  • Improve the performance when restoring from multiple state handles

Verifying this change

The changes can be verified by the existing tests, and the unit test below also helps to verify them.

  • RocksDBIncrementalCheckpointUtilsTest.java

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)

Documentation

  • Does this pull request introduce a new feature? (no)

@sihuazhou
Contributor Author

@StefanRRichter Could you please have a look at this?

@StefanRRichter
Contributor

Thanks for the contribution! We are currently busy with the 1.5 release. I will have a closer look at this PR and your other pending JIRAs after the release is out.

@sihuazhou
Contributor Author

Thanks, looking forward.

@sihuazhou sihuazhou force-pushed the improve_recovery_from_increment_checkpoint branch 8 times, most recently from f18eb80 to 112bd74 Compare March 20, 2018 03:04
@sihuazhou
Contributor Author

Unfortunately, after confirming with RocksDB, deleteRange() is still an experimental feature and may currently impact read performance (even though we could use the ReadOptions to reduce the impact).

In practice, I tested the read-performance impact of deleteRange() in our case (we delete 2 ranges at most) and did not find any impact. TiKV already uses it to delete entire shards. But to be on the safe side, I think the current PR should be frozen. Still, I think the implementation based on deleteRange() in this PR would be the better one once deleteRange() is no longer experimental (especially when the user scales up the job, in which case we only need to clip the RocksDB instance without iterating any records, which is super fast).

Anyway, even though we can't use deleteRange() currently, we can still improve the performance of recovery from incremental checkpoints somewhat, as follows: if one state handle's key-group range is a sub-range of the target key-group range, we can open it directly and avoid the overhead of iterating it. @StefanRRichter What do you think? If you don't object, I will update the PR following this approach.

@sihuazhou
Contributor Author

Hi @StefanRRichter could you please have a look at this?

@@ -138,4 +138,12 @@ private static void writeVariableIntBytes(
value >>>= 8;
} while (value != 0);
}

public static byte[] serializeKeyGroup(int keyGroup, int keyGroupPrefixBytes) {
byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
Contributor

Maybe we can rather pass the startKeyGroupPrefixBytes array directly instead of creating it in every invocation from keyGroupPrefixBytes. Like that, the caller can reuse the same array.
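
A minimal sketch of that suggestion (hypothetical signature; big-endian byte order is assumed so that the serialized prefixes compare in key-group order):

```java
public class KeyGroupPrefix {

    // The caller provides the prefix array and can reuse it across invocations;
    // the key group is written big-endian into the prefix bytes.
    public static void serializeKeyGroup(int keyGroup, byte[] keyGroupPrefixBytes) {
        for (int i = keyGroupPrefixBytes.length - 1; i >= 0; --i) {
            keyGroupPrefixBytes[i] = (byte) keyGroup;
            keyGroup >>>= 8;
        }
    }

    public static void main(String[] args) {
        byte[] prefix = new byte[2];
        serializeKeyGroup(258, prefix); // 258 == 0x0102
        System.out.println(prefix[0] + ", " + prefix[1]); // 1, 2
    }
}
```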

Contributor Author

Good point! There is one problem with this PR: after confirming with RocksDB, deleteRange() is still an experimental feature of RocksDB... Even though I did some experiments with deleteRange() in our case and didn't find any downside, I'm not sure whether we should still use it. But deleteRange() should definitely be used for recovery from incremental checkpoints once it's stable. What do you think?

Contributor Author

Some info about the experiment I did:

  • I set ReadOptions::ignore_range_deletions = true to speed up reads, because we won't read any records that belong to the key-groups we have deleted.
  • I only called deleteRange() twice, because we will call it at most twice during recovery from an incremental checkpoint.

Contributor

This is a very good question. But I think, for as long as we consider rescaling incremental checkpoints itself experimental, we can try to use deleteRange and change it in case we experience any problems. Would that be ok?

Contributor Author

@StefanRRichter I think that makes a lot of sense! I will rebase the PR and ping you again, it's a bit outdated now...

Contributor

Cool, no problem, I am also completing this review soon. One general thing I was wondering about: did you ever look at the sstable ingestion feature? It would be super nice for rescaling incremental checkpoints if we could simply ingest the sstables from multiple checkpoints into one database and then just clip the range boundaries. Unfortunately, from what I have seen, this only works for external sstables written by the sstable writer (see here: https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files). I wonder if there is any way to modify the sstables of incremental checkpoints to make them usable for ingestion, but maybe it is just completely impossible. I also found this interesting discussion that outlines another potential approach: facebook/rocksdb#499. Any thoughts?

Contributor Author

Yes, I did notice the sstable ingestion feature and also ran some experiments on it. You are right that currently the ingestion feature only works for sstables written by the sstable writer. I tried to use the sstable writer to generate external sstables in parallel and ingest them into the target db, but unfortunately the performance of the sstable writer is quite poor in RocksJava... I left the conclusions of the experiment in FLINK-8845 (which is why I took a step back and used WriteBatch to speed up recovery for full checkpoints); I pasted the comments below:

Unfortunately, even though according to the RocksDB wiki the best way to load data into RocksDB is to "Generate SST files (using SstFileWriter) with non-overlapping ranges in parallel and bulk load the SST files", after implementing this and testing it with a simple benchmark I found that the performance is not as good as expected; it's almost the same as, or worse than, using RocksDB.put(). After a bit of analysis I found that building the SST spends a lot of time creating DirectSlice instances, and currently we can't reuse a DirectSlice in the Java API. Even though in C++ this approach yields outstanding results, I don't think we can use it to improve performance in Java currently (maybe someday RocksDB will improve this so the Java performance approaches the C++ performance)...

And regarding facebook/rocksdb#499: if I'm not mistaken, I think we also can't use repairDB() because we have many column families, and the other suggestions in that thread are quite similar to the approach I already tried of building the sstables in parallel, which turned out not to work well with the Java API.

Contributor

Ok, that was also how I understood the discussions and docs. In that case, let's proceed with this approach and I will finalize the review now.

", but found " + rawStateHandle.getClass());
}
if (!hasExtraKeys) {
restoreFromSingleHandle(restoreStateHandles.iterator().next());
Contributor

This new code (and also the old code before it) looks like it could have a potential bug: if restoreStateHandles.size() > 1 is false, how can we be sure that restoreStateHandles.iterator().next() exists? Even if it works due to some hidden assumption, it does not look clean.

Contributor Author

You are right, I think this should be improved.

}
}

public static int evaluateGroupRange(KeyGroupRange range1, KeyGroupRange range2) {
Contributor

This method name is not so helpful. It does not tell us anything about what is evaluated. Maybe a javadoc on the public methods would be helpful.

Contributor

I think this code could also be based on something like KeyGroupRange.getIntersection(KeyGroupRange).getNumberOfKeyGroups().
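
The suggested computation can be modeled with plain ints (a sketch only; KeyGroupRange is Flink's real class, the helper below is illustrative):

```java
public class KeyGroupRangeSketch {

    // Models KeyGroupRange.getIntersection(other).getNumberOfKeyGroups():
    // ranges are inclusive [start, end]; an empty intersection yields 0.
    public static int intersectionSize(int start1, int end1, int start2, int end2) {
        int start = Math.max(start1, start2);
        int end = Math.min(end1, end2);
        return Math.max(end - start + 1, 0);
    }

    public static void main(String[] args) {
        System.out.println(intersectionSize(0, 9, 5, 14)); // 5 (key groups 5..9 overlap)
        System.out.println(intersectionSize(0, 4, 5, 9));  // 0 (disjoint ranges)
    }
}
```

The handle whose range scores the largest intersection with the target range is then the natural candidate for initializing the target db.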

Contributor Author

Yes, will change it. 👍

Contributor

One more thought: do you think it could make sense to also include the state size of the handle in the evaluation score? The only problem here: is a higher or a lower size better? A higher size could also just mean that the initial database was not in a well-compacted state.

Contributor Author

I think it makes sense to also take the state size into account. If we do, the score may look like: "handle's state size" * "numberOfKeyGroups" / "handle's total key groups". But you are right, I don't know whether a higher or a lower size is better either, which makes me unsure whether we should take it into account now...

Contributor Author

I think I'm a bit torn here.

Contributor

Then let's just keep it simple for now, and we can still improve it if we later find that the size can also be an indicator of the better initial db state.

Contributor Author

Ok

}

private class RestoredDBInfo implements AutoCloseable {
private RocksDB db;
Contributor

All fields could be final and NonNull annotated.

Contributor Author

👍

}
}

private class RestoredDBInfo implements AutoCloseable {
Contributor

This name is not optimal because the class is more than pure info. It holds the temporary DB and is used to manage its lifecycle. I would suggest RestoreDBInstance or RestoreDBHandle.

Contributor Author

👍


try (RocksIterator iterator = tmpRestoreDBInfo.db.newIterator(tmpColumnFamilyHandle)) {

iterator.seek(targetStartKeyGroupPrefixBytes);
Contributor

If the DB is clipped, do we even need to seek, or will the iterator already begin at a useful key-group anyway?

Contributor Author

I think this is a not-so-nice API of RocksIterator: a newly created iterator doesn't point to the first element by default; users need to perform a seek() to make it valid.

if (currentGroupRange.getStartKeyGroup() < targetGroupRange.getStartKeyGroup()) {
byte[] beginKey = RocksDBKeySerializationUtils.serializeKeyGroup(
currentGroupRange.getStartKeyGroup(), keyGroupPrefixBytes);
byte[] endKye = RocksDBKeySerializationUtils.serializeKeyGroup(
Contributor

typo: endKye

Contributor Author

👍

*/
public class RocksDBIncrementalCheckpointUtils {

public static void clipDBWithKeyGroupRange(
Contributor

I wonder whether clipping the database to avoid the prefix check is really an optimization. If we don't clip, we must seek the iterator and apply a single if to every key. This if is very predictable for the CPU because it always passes except when we terminate the loop. That sounds rather cheap. What are your thoughts on why deleting ranges is the better approach?

Contributor Author

Ah, I don't see a clear benefit either, but I think it makes the loop code look cleaner. If you think we don't need to clip the database to avoid the prefix check, that's also fine with me and I will change it.

Contributor

I think the code will still look ok; it is just one more if (and we even only need the if in the cases where we would clip something). If this allows us to eliminate some code and tests, move away from experimental features, and may even be faster, then I think it is a good idea.
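
The alternative discussed here can be sketched with a sorted set standing in for the RocksDB iterator (illustrative types and names; int keys stand in for serialized keys):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;

public class PrefixCheckSketch {

    // Seek to the target start, then apply one highly predictable if per record;
    // the branch only fails once, when the loop leaves the target range, so no
    // deleteRange-based clipping is needed.
    public static List<Integer> restoreInRange(SortedSet<Integer> handleKeys, int targetStart, int targetEnd) {
        List<Integer> restored = new ArrayList<>();
        for (int key : handleKeys.tailSet(targetStart)) { // the iterator "seek"
            if (key > targetEnd) {
                break; // terminate instead of clipping the db up front
            }
            restored.add(key);
        }
        return restored;
    }
}
```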

Contributor Author

That makes sense, will change it.

/**
* Tests to guard {@link RocksDBIncrementalCheckpointUtils}.
*/
public class RocksDBIncrementalCheckpointUtilsTest {
Contributor

Please add extends TestLogger

Contributor Author

👍

@sihuazhou sihuazhou force-pushed the improve_recovery_from_increment_checkpoint branch 6 times, most recently from 1f750b8 to 60f5b5f Compare June 1, 2018 01:04
@sihuazhou
Contributor Author

@StefanRRichter Thanks for your nice review; I addressed your comments, could you please have a look again?

chooseTheBestStateHandleToInit(restoreStateHandles, targetKeyGroupRange);

int targetStartKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
byte[] targetStartKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
Contributor

I think this array is not used anymore after we write to it.

Contributor Author

Oh yes, I wanted to pull this array creation out of the for (KeyedStateHandle rawStateHandle : restoreStateHandles) { loop but forgot to remove the array creation inside the loop and to replace startKeyGroupPrefixBytes with targetStartKeyGroupPrefixBytes. Nice catch! 👍

return registeredStateMetaInfoEntry.f0;
}

private void chooseTheBestStateHandleToInit(
Contributor

I think the name of this method is no longer accurate: it does not only choose the best handle, it already restores a db instance. Maybe we can still break this up into two methods, so that each method only does one thing. It is not so nice that creating the db is a side effect of a method whose name claims it only finds something.

Contributor Author

Yes, I will split it into two methods.

stateMetaInfoSnapshots);
columnFamilyHandles);

if (needClip) {
Contributor

Instead of using this clipping flag, would it not be better to have a method that clips a RestoredDBInstance, which is simply called after restoreDBFromStateHandle in the one case that needs it? This would avoid mixing up those two things in one method.

Contributor

In that case, maybe clip can also become a method of the RestoredDBInstance?

Contributor Author

Yes, you are right, will follow your suggestion.

AbstractKeyedStateBackend<K> keyedStateBackend = super.createKeyedStateBackend(
env, jobID, operatorIdentifier, keySerializer, numberOfKeyGroups, keyGroupRange, kvStateRegistry);

// We ignore the range deletions on production, but when we are running the tests we shouldn't ignore it.
Contributor

Can you briefly explain why this whole ReadOptions change is required and what the comment about ignoring range deletions relates to? This seems to introduce some implicit complexity, so I just want to double-check that it is really required.

Contributor Author

I introduced this because we may call deleteRange() when rescaling from an incremental checkpoint. According to RocksDB's comments in its C++ sources, to avoid the read-performance downside we should set readOptions.setIgnoreRangeDeletions(true);.

  • In production, that is fine because we won't query any record that belongs to the key-groups we have deleted.

  • But when running tests, we may need to verify that after restoring from a checkpoint we didn't take into the backend any external key-group that doesn't belong to the target key-group range (e.g. StateBackendTestBase#testKeyGroupSnapshotRestore()). The reason we need readOptions.setIgnoreRangeDeletions(false); in this case can be explained as below:

db.deleteRange(range1);
readOptions.setIgnoreRangeDeletions(true);

db.get(readOptions, key in range1); // this may not be null, because we have ignored the range deletions

readOptions.setIgnoreRangeDeletions(false);

db.get(readOptions, key in range1); // this will be null
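
The snippet above can be modeled in plain Java to make the semantics concrete (a toy model with a single tombstone over int keys; this is not the RocksDB API):

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class RangeDeletionModel {

    // deleteRange only records a tombstone over [begin, end); a read sees the
    // old value again if it opts out via ignoreRangeDeletions.
    public static String get(NavigableMap<Integer, String> data, int tombstoneBegin, int tombstoneEnd,
                             int key, boolean ignoreRangeDeletions) {
        if (!ignoreRangeDeletions && key >= tombstoneBegin && key < tombstoneEnd) {
            return null; // hidden by the range tombstone
        }
        return data.get(key); // still physically present until compaction
    }

    public static void main(String[] args) {
        NavigableMap<Integer, String> db = new TreeMap<>();
        db.put(5, "v");
        // tombstone covers [0, 10), as if db.deleteRange(0, 10) had been called
        System.out.println(get(db, 0, 10, 5, true));  // v (deletion ignored)
        System.out.println(get(db, 0, 10, 5, false)); // null
    }
}
```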

Contributor

As I see, this is only happening in the case where there is only one handle and we are only interested in a subset of the key-groups. Unfortunately, that should be the common case of scaling out. I am wondering if we should not prefer to apply normal deletes over range delete, because what will happen if we take again a snapshot from a database that was using range deletes? Will the keys all be gone in cases of full and incremental snapshots? If the performance of normal deletes is not terrible, that might be cleaner for as long as range deletes are not working properly or have potential negative side-effects. What is your opinion about this?

Contributor Author

Sorry... I think I may not have understood "I am wondering if we should not prefer to apply normal deletes over range delete" properly; do you mean "I am wondering if we should prefer to apply normal deletes over range delete"? As far as I know the keys are all gone only when compaction occurs; deleteRange() only writes a special record to the db, something like Deleted Range(beginKey, endKey], and doesn't actually remove any records from the db.

And yes, concerning the negative side effects of deleteRange(), I still have the same concerns, even though rescaling from a checkpoint is itself still an experimental feature. I think a safer way to improve the performance of recovery from incremental checkpoints is to not clip at all, and to only choose an instance as the initial db when its key-group range is a subset of the target key-group range. What do you think?

Contributor Author

Does it make sense to let chooseTheBestStateHandleToInit return a non-null instance only when one instance's key-group range is fully covered by the target key-group range? This way we won't clip anything, and the clip-related code can be removed (or should we retain it in case we use it in the future?).

Contributor Author

Sorry for my poor English... I think basing the decision on a fraction is a better choice, e.g. if "number of invalid key-groups of the handle" / "number of all key-groups of the handle" <= 1/4, we prefer "restore + single deletes".
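
That fraction-based rule can be sketched in a few lines (the method name is illustrative, and the 1/4 threshold is the hypothetical value proposed above, not a settled constant):

```java
public class InitDbHeuristic {

    // Prefer "restore + single deletes" only if at most 1/4 of the handle's
    // key-groups fall outside the target range.
    public static boolean useAsInitialDb(int invalidKeyGroups, int totalKeyGroups) {
        return invalidKeyGroups * 4 <= totalKeyGroups;
    }

    public static void main(String[] args) {
        System.out.println(useAsInitialDb(1, 4)); // true: exactly 1/4 invalid
        System.out.println(useAsInitialDb(3, 8)); // false: 3/8 > 1/4
    }
}
```

Using integer arithmetic avoids floating-point comparison for the threshold check.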

Contributor

That sounds good 👍 This also means we do not need any tricks with the read options :)

Contributor Author

Yes, I will change it according to the approach we discussed above.

Contributor

On a different note, this has become more complex now; I wonder if we should also add a test for incremental rescaling. I think that could be done at the level of using the KeyedOneInputStreamOperatorTestHarness for different ranges, choosing incremental RocksDB, triggering two checkpoints, and scaling in and out into a new harness with a different key-group range. Ideally, this test could cover all the different corner cases (about the fraction).

Contributor Author

I also think it needs a test now, I will add it.

}

@Override
public void close() throws Exception {
Contributor

can remove throws Exception

Contributor Author

👍

@sihuazhou
Contributor Author

@StefanRRichter Thanks for your nice review and for keeping this PR from going down the wrong path. I will change the code according to your comments and ping you again when I'm done.

@sihuazhou sihuazhou force-pushed the improve_recovery_from_increment_checkpoint branch from 60f5b5f to 4cb3d92 Compare June 5, 2018 06:04
@sihuazhou
Contributor Author

Hi @StefanRRichter I updated the PR according to the previous discussions; could you please have a look when you have time? The Travis failure is unrelated; it's a checkstyle error introduced by previous PRs.

@StefanRRichter
Contributor

LGTM 👍 Very nice work. I will merge it with some very minor touchups.
