HDFS-16967. RBF: File based state stores should allow concurrent access to the records #5523
Conversation
Based on one testing data point (HDFS as the state store driver), for the same number of mount table records to be loaded into the cache, the average time taken by the default serial mode is ~1500 ms, whereas with concurrent mode it goes down to ~130 ms.
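The ~10x speedup comes from reading the per-record files in parallel instead of one at a time. As a rough illustration (not the actual Hadoop implementation; `readRecordFile` and the class name are hypothetical stand-ins), the pattern is to submit one read task per file to a fixed-size pool and collect the results with `invokeAll`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentRecordLoad {

  // Hypothetical stand-in for deserializing one record file from the store.
  static String readRecordFile(int fileId) {
    return "record-" + fileId;
  }

  // Read all record files with a fixed-size pool instead of a serial loop.
  public static List<String> loadAll(int numFiles, int numThreads) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    try {
      List<Callable<String>> tasks = new ArrayList<>();
      for (int i = 0; i < numFiles; i++) {
        final int id = i;
        tasks.add(() -> readRecordFile(id));
      }
      // invokeAll blocks until every file has been read.
      List<String> records = new ArrayList<>();
      for (Future<String> f : pool.invokeAll(tasks)) {
        records.add(f.get());
      }
      return records;
    } finally {
      pool.shutdown();
    }
  }
}
```

With a remote store such as HDFS, each read is dominated by RPC latency, so overlapping the reads is where the wall-clock win comes from.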
🎊 +1 overall
This message was automatically generated.
@@ -168,9 +182,30 @@ public boolean initDriver() {
      return false;
    }
    setInitialized(true);
    int threads = getConcurrentFilesAccessNumThreads();
    if (threads > 0) {
Should it be >1?
Technically 1 thread would be serial.
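The reviewer's point is that a pool of one thread still processes the files serially, so `> 1` would be the tighter guard. A minimal sketch of that decision (helper names are hypothetical, not from the patch):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncThreadsGuard {

  // A pool only pays off with at least two threads: a single-thread
  // pool still executes the file reads one after another.
  public static boolean useConcurrentMode(int configuredThreads) {
    return configuredThreads > 1;
  }

  public static ExecutorService createPoolOrNull(int configuredThreads) {
    return useConcurrentMode(configuredThreads)
        ? Executors.newFixedThreadPool(configuredThreads)
        : null; // null signals the caller to fall back to the serial path
  }
}
```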
.../java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
    Configuration conf =
        FederationStateStoreTestUtils.getStateStoreConfiguration(StateStoreFileSystemImpl.class);
    conf.set(StateStoreFileSystemImpl.FEDERATION_STORE_FS_PATH, "/hdfs-federation/");
    conf.set(FEDERATION_STORE_FS_ASYNC_THREADS, numFsAsyncThreads);
Could we make it setInt() and pass the number as an int? It would be cleaner.
I'm not sure how the Parameterized runner handles that, though.
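For context, Hadoop's Configuration stores values as strings but exposes typed accessors such as setInt/getInt, which is why the typed setter reads more cleanly than passing a pre-stringified number. A minimal stand-in class (hypothetical, JDK-only, just to show the two styles side by side):

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for a string-backed configuration object with
// typed accessors, in the spirit of Hadoop's Configuration.
public class MiniConf {
  private final Map<String, String> props = new HashMap<>();

  public void set(String key, String value) {
    props.put(key, value);
  }

  // Typed setter: callers pass an int directly, no manual
  // String.valueOf at every call site.
  public void setInt(String key, int value) {
    props.put(key, Integer.toString(value));
  }

  public int getInt(String key, int defaultValue) {
    String v = props.get(key);
    return v == null ? defaultValue : Integer.parseInt(v);
  }
}
```

The wrinkle the reviewer mentions is that a JUnit Parameterized test often receives its parameters as strings, in which case `set(key, param)` avoids a parse-then-reformat round trip.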
Changes generally look good to me. Just a few small comments to make the code more readable.
    return new QueryResult<>(result, getTime());
  }

  private <T extends BaseRecord> Void getRecordsFromFileAndRemoveOldTmpRecords(Class<T> clazz,
Can you add documentation to this function indicating that the results list is being modified to collect the results?
Changing the function name would make that clearer too.
Done, thanks.
Since the method name already seems long enough, I directly added it to the Javadoc. Sounds good?
Looks good.
    return success.get();
  }

  private <T extends BaseRecord> Void writeRecordToFile(AtomicBoolean success,
Similar comment as above: can we add documentation indicating that success is being modified?
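The pattern under discussion is an out-parameter: the method returns `Void` and instead reports failure by clearing a shared `AtomicBoolean`, so that many concurrent writer tasks can aggregate into one flag. A hedged sketch of how such a side effect can be documented (the stubbed write and class name are illustrative, not the patch's code):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class OutParamDoc {

  /**
   * Writes one record to its file (stubbed here).
   *
   * Note: this method does not return a status. Instead it clears
   * {@code success} when the write fails, so a caller sharing one
   * flag across many concurrent writer tasks can check a single
   * aggregated outcome after all tasks complete.
   *
   * @param success shared flag, set to false on failure (side effect)
   * @param record the record to persist
   */
  static Void writeRecordToFile(AtomicBoolean success, String record) {
    boolean ok = record != null && !record.isEmpty(); // stubbed write
    if (!ok) {
      success.set(false);
    }
    return null;
  }
}
```

One design note: the flag only ever transitions from true to false, so no compare-and-set loop is needed; any single failure permanently marks the batch as failed.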
Done
@@ -137,6 +149,8 @@ public abstract <T extends BaseRecord> BufferedWriter getWriter(
   */
  protected abstract String getRootDir();

  protected abstract int getConcurrentFilesAccessNumThreads();
Can we provide an implementation here and then have just one config instead of the following two?
FEDERATION_STORE_PREFIX + "driver.file.async.threads";
FEDERATION_STORE_PREFIX + "driver.fs.async.threads";
I'm okay with keeping them separate, though, if you prefer that.
I believe separate configs would be better since we anyway have different implementations for many of the other methods too. Sounds good to keep it as is?
Yeah, good to keep it as it is.
Thanks @simbadzina, addressed your comments.
+1.
Thanks for your contribution, @virajjasani.
…ss to the records (apache#5523) Reviewed-by: Inigo Goiri <inigoiri@apache.org> Reviewed-by: Simbarashe Dzinamarira <sdzinamarira@linkedin.com> Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
…ss to the records (apache#5523) Reviewed-by: Inigo Goiri <inigoiri@apache.org> Reviewed-by: Simbarashe Dzinamarira <sdzinamarira@linkedin.com> Signed-off-by: Takanobu Asanuma <tasanuma@apache.org> (cherry-picked from 937caf7)
File based state store implementations (StateStoreFileImpl and StateStoreFileSystemImpl) should allow updating as well as reading of the state store records concurrently rather than serially. Concurrent access to the record files on the HDFS based store improves state store cache loading performance by more than 10x in testing.
For instance, to maintain data integrity, the cache is reloaded whenever any mount table record is updated. This reload operation gains a significant performance improvement from concurrent access to the mount table records.