[SPARK-38909][CORE][YARN] Encapsulate LevelDB used to store remote/external shuffle state as DB #36200
Conversation
LevelDB used by ExternalShuffleBlockResolver and YarnShuffleService as LocalDB
I am sure that in the future we might have another SPIP to remove RocksDB in favor of something else :-)
Took a pass through it; looks mostly fine to me - will go over it again later this week.
But I will let @dongjoon-hyun drive this review, since he is closer to the overall change.
common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDBIterator.java
common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java
common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
nit: Btw, from a completeness point of view, DB.get makes sense to include :-)
common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDB.java
OK ~
Also cc @tgravescs - do you have time to help review this pr? Thanks ~
Looks good to me, but would be good to have more eyes on this.
common/network-common/src/main/java/org/apache/spark/network/shuffledb/DB.java
common/network-common/src/main/java/org/apache/spark/network/shuffledb/DBBackend.java
common/network-common/src/main/java/org/apache/spark/network/shuffledb/StoreVersion.java
common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
changes look fine to me
Merged to master.
Thanks @mridulm @tgravescs @zhouyejoe @dongjoon-hyun !!!
```java
 * The local KV storage used to persist the shuffle state,
 * the implementations may include LevelDB, RocksDB, etc.
 */
public interface DB extends Closeable {
```
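For readers, here is a toy sketch of this kind of KV abstraction - hypothetical names (`SimpleDB`, `InMemoryDB`), not the real `org.apache.spark.network.shuffledb.DB`, and an in-memory map standing in for the LevelDB/RocksDB backend:

```java
import java.io.Closeable;
import java.util.TreeMap;

// Hypothetical sketch only: a minimal KV-store abstraction of the kind the
// PR introduces. The real interface lives in org.apache.spark.network.shuffledb.
interface SimpleDB extends Closeable {
    void put(byte[] key, byte[] value);
    byte[] get(byte[] key);
    void delete(byte[] key);
}

// Toy in-memory backend standing in for a LevelDB/RocksDB implementation.
class InMemoryDB implements SimpleDB {
    private final TreeMap<String, byte[]> store = new TreeMap<>();

    @Override public void put(byte[] key, byte[] value) {
        store.put(new String(key), value);
    }
    @Override public byte[] get(byte[] key) {
        return store.get(new String(key)); // null when absent
    }
    @Override public void delete(byte[] key) {
        store.remove(new String(key));
    }
    @Override public void close() {
        store.clear();
    }
}
```

Any disk-backed store that can honor these few operations can then be swapped in behind the interface.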
Is this a developer API or user API? I didn't see the custom DB could be plugged in this PR.
developer API. The implementation of RocksDB is under review: #37610,
So could you add the annotation `@DeveloperApi` for it?
OK
We do not support users plugging in a different DB instance - it comes from within Spark code.
Supported DBs come from `DBBackend`, and `DBProvider.initDB` has an explicit switch for the supported types.
Won't be `@DeveloperApi`.
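The "explicit switch" pattern described here could be sketched like this - hypothetical `Backend`/`Provider` names, not Spark's actual code:

```java
// Hypothetical sketch of an explicit switch over an enum of supported
// backends; names mirror, but do not reproduce, Spark's DBBackend/DBProvider.
enum Backend { LEVELDB, ROCKSDB }

class Provider {
    // Only enumerated backends are accepted, so a user cannot plug in an
    // arbitrary DB implementation from outside the codebase.
    static String initDB(Backend backend) {
        switch (backend) {
            case LEVELDB: return "leveldb-store";
            case ROCKSDB: return "rocksdb-store";
            default: throw new IllegalArgumentException("Unsupported backend: " + backend);
        }
    }
}
```

Because the set of backends is closed, the interface is an internal extension point rather than a user-facing plugin API.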
@mridulm It's a public interface. I think it should either be a user API or developer API, no?
hmm... should we use `@Private` as `kvstore.KVStore` does?

spark/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java, lines 65 to 66 in 52cd037:

```java
@Private
public interface KVStore extends Closeable {
```
The configuration is public @Ngone51, not the implementation itself.
To put it differently, there is no way for users to leverage any of this unless they modify Spark code.
I am fine with marking it as `@Private` if we need to make the intent clearer.
It is the same for most submodules in common/* - except probably for common/kvstore, where Marcelo marked the interfaces as `@Private` :-)
@mridulm @LuciferYang Thanks.
```java
      logger.info("Configured {} as {} and actually used value {}",
        Constants.SHUFFLE_SERVICE_DB_BACKEND, dbBackendName, dbBackend);
    }
    db = DBProvider.initDB(dbBackend, this.registeredExecutorFile, CURRENT_VERSION, mapper);
```
Ideally, this should be put inside the if block?
Yes, you're right. I'll open a follow-up to fix this.
It would need to change to

```java
if (registeredExecutorFile != null) {
  ...
  db = DBProvider.initDB(dbBackend, this.registeredExecutorFile, CURRENT_VERSION, mapper);
} else {
  db = null;
}
```

because `db` is defined as final, and when `registeredExecutorFile` is null, `DBProvider.initDB` will return null directly. So does `DBProvider.initDB` still need to be put inside the if block?
If so, I think `if (registeredExecutorFile != null)` is actually useless here.
fixed
`DBProvider.initDB` handles a null `registeredExecutorFile` as input.
Given that `db` is a `final` variable, it is better to update it in a single place, and not across if/else.
I'm trying to delete the `if (registeredExecutorFile != null)` condition.
The current code in #37648 looks ok to me.
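The single-assignment pattern this thread settles on can be sketched as follows - illustrative `ResolverSketch`/`initDB` names, not the real classes:

```java
import java.io.File;

// Sketch of the pattern agreed on above: the provider tolerates a null file
// and returns null, so the final field has exactly one assignment site.
// Names here are illustrative, not Spark's actual ones.
class ResolverSketch {
    final String db;  // stands in for the real DB handle

    ResolverSketch(File registeredExecutorFile) {
        // single assignment; no if/else needed around the final field
        db = initDB(registeredExecutorFile);
    }

    static String initDB(File dbFile) {
        if (dbFile == null) {
            return null;  // null input -> no db, mirroring DBProvider.initDB
        }
        return "db:" + dbFile.getName();
    }
}
```

Pushing the null check into the provider keeps the caller's `final` field initialization to one expression.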
```scala
val shuffleDBName = sparkConf.get(config.SHUFFLE_SERVICE_DB_BACKEND)
val dBBackend = DBBackend.byName(shuffleDBName)
logInfo(s"Configured ${config.SHUFFLE_SERVICE_DB_BACKEND.key} as $shuffleDBName " +
  s"and actually used value ${dBBackend.name()} ")
```
It sounds like we may use a different db backend regardless of the user configured value in some cases?
Sorry - because the `DBBackend.byName` method changed during code review, this log is outdated. I will fix it in the follow-up as well.
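A hedged sketch of what a `byName`-style lookup typically looks like - hypothetical `ShuffleDBBackend`, not Spark's actual implementation - showing why the configured string and the value actually used are both logged:

```java
import java.util.Locale;

// Hypothetical byName-style lookup: the configured string is normalized, so
// the value actually used can differ (in case) from what the user wrote,
// which is why both are logged. Not Spark's actual implementation.
enum ShuffleDBBackend {
    LEVELDB, ROCKSDB;

    static ShuffleDBBackend byName(String name) {
        // normalize user input before matching the enum constant
        return valueOf(name.toUpperCase(Locale.ROOT));
    }
}
```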
…ated to shuffle state db

### What changes were proposed in this pull request?
This is a followup of #36200; the main changes of this pr are as follows:
- Make `DB` and `DBProvider` `@Private` to make the API intent clearer
- Delete the `remove` method from `DBProvider`, which is an unnecessary `Override`
- Remove the useless null check condition from `ExternalShuffleBlockResolver` and `RemoteBlockPushResolver`, fix related suites
- Correct the log print content

### Why are the changes needed?
Fix new comments after #36200 merged.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass All GitHub Actions

Closes #37648 from LuciferYang/SPARK-38909-followup.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
`ExternalShuffleBlockResolver`, `YarnShuffleService` and `RemoteBlockPushResolver` use `LevelDB` directly, which is not conducive to extending the use of `RocksDB` in this scenario. This pr encapsulates it for extensibility. It will be the pre-work of SPARK-38888; the main changes are as follows:

- Add a `DB` interface with a corresponding `LevelDB` implementation now; a `RocksDB` implementation will be introduced in the future
- Add a `DBBackend` enum and a new config `SHUFFLE_SERVICE_DB_BACKEND` to specify the disk-based store used for the shuffle service local db; only `LEVELDB` is supported now, a `RocksDB` implementation will be introduced in the future
- Add a `DBIterator` to traverse the items in the `DB`; only `LevelDBIterator` is supported now, a `RocksDBIterator` implementation will be introduced in the future
- Add a `DBProvider` to initialize the `DB`; it only creates `LevelDB` now, a `RocksDB` implementation will be introduced in the future
- Promote `StoreVersion` to a top-level class; it will be reused by the `RocksDB` implementation
- Replace `LevelDB` in `ExternalShuffleBlockResolver`, `YarnShuffleService` and `RemoteBlockPushResolver` with `DB`

### Why are the changes needed?
This is pre-work for SPARK-38888.

### Does this PR introduce any user-facing change?
Adds `SHUFFLE_SERVICE_DB_BACKEND` (`spark.shuffle.service.db.backend`), a new config used to specify the disk-based store used for the shuffle service local db. Only `LEVELDB` is supported after this pr.

### How was this patch tested?
Pass GA
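For reference, selecting the backend via the new config could look like this in `spark-defaults.conf` - the key is the one named above, and `LEVELDB` is the only accepted value as of this pr:

```properties
# choose the disk-based store for the shuffle service local db
spark.shuffle.service.db.backend  LEVELDB
```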