File system offload #4403
Conversation
I think the approach in this pull request follows what we have done for cloud storage, and it is an okay approach.
However, I would suggest using what the Hadoop ecosystem already provides. There are many file formats in the Hadoop ecosystem; I would suggest SequenceFile, which is the most common one. SequenceFile already supports multiple types of compression, so we don't need to reinvent that.
https://hadoop.apache.org/docs/r2.7.5/api/org/apache/hadoop/io/SequenceFile.html
So you could have two SequenceFiles per offloaded ledger: one for the data, the other for the index.
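For illustration only, a minimal sketch of the suggested layout (not this PR's code): one SequenceFile for entry data and one for the index per offloaded ledger, using the built-in compression. The paths and key/value types here are assumptions.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

public class SequenceFileOffloadSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        long ledgerId = 123L; // hypothetical ledger id

        // Data file: entryId -> serialized entry payload, block-compressed.
        try (SequenceFile.Writer dataWriter = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(new Path("/offload/ledger-" + ledgerId + "/data")),
                SequenceFile.Writer.keyClass(LongWritable.class),
                SequenceFile.Writer.valueClass(BytesWritable.class),
                SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK))) {
            dataWriter.append(new LongWritable(0L), new BytesWritable("entry-0-payload".getBytes()));
        }

        // Index file: entryId -> whatever locator bytes the offloader needs.
        try (SequenceFile.Writer indexWriter = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(new Path("/offload/ledger-" + ledgerId + "/index")),
                SequenceFile.Writer.keyClass(LongWritable.class),
                SequenceFile.Writer.valueClass(BytesWritable.class))) {
            indexWriter.append(new LongWritable(0L), new BytesWritable(new byte[]{0}));
        }
    }
}
```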
OK, I'm going to look at the API and change my PR.
Great job, @congbobo184! It is much simpler after using MapFile. Well done!
Overall this looks pretty good. I left a few comments. PTAL!
tiered-storage/file-system/pom.xml (outdated)
<groupId>org.apache.pulsar</groupId>
<artifactId>tiered-storage-parent</artifactId>
<version>2.4.0-SNAPSHOT</version>
<relativePath>../../../../congbo184/pulsar/tiered-storage</relativePath>
:) the path seems to be wrong? It goes through a personal working directory (congbo184/pulsar) rather than a path relative to the repository.
tiered-storage/file-system/pom.xml (outdated)
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>3.2.0</version>
@congbobo184 can you define the dependencies in the root pom file?
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
Same here: please define the dependencies in the root pom file, so the versions are managed in one place.
 * Configuration for tiered storage.
 */
@Data
public class TieredStorageConfigurationData implements Serializable, Cloneable {
How about calling it FileSystemConfigurationData? This class is only about the FileSystem offloader, right?
reader.get(key, value);
this.ledgerMetadata = parseLedgerMetadata(value.copyBytes());
} catch (IOException e) {
log.error("Fail to read LedgerMetadata for key {}",
Since this code logic is all about reading metadata, I don't think you need to add "for key -1" in the log statement and the exception error message. Adding "for key -1" is actually a bit confusing.
Maybe we could define a constant for -1? e.g. public static final long METADATA_KEY_INDEX = -1?
> Maybe we could define a constant for -1? e.g. public static final long METADATA_KEY_INDEX = -1?

Yes, I will fix it.
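For reference, a minimal sketch of the suggested constant, assuming the MapFile.Reader and LongWritable/BytesWritable types shown in the snippet above (the metadata parsing itself is omitted):

```java
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;

class LedgerMetadataKeySketch {
    // Reserved key under which the ledger metadata entry is stored,
    // instead of a bare -1 literal scattered through the code.
    public static final long METADATA_KEY_INDEX = -1;

    static byte[] readLedgerMetadataBytes(MapFile.Reader reader) throws IOException {
        LongWritable key = new LongWritable(METADATA_KEY_INDEX);
        BytesWritable value = new BytesWritable();
        reader.get(key, value);
        return value.copyBytes();
    }
}
```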
private static final Logger log = LoggerFactory.getLogger(FileSystemManagedLedgerOffloader.class);
private static final String STORAGE_BASE_PATH = "storageBasePath";
private static final String[] DRIVER_NAMES = {"filesystem"};
do you think we will have multiple drivers?
}
this.configuration.setClassLoader(FileSystemLedgerOffloaderFactory.class.getClassLoader());
this.driverName = conf.getManagedLedgerOffloadDriver();
this.storageBasePath = configuration.get("hadoop.tmp.dir");
should we use a different configuration key "pulsar.offloaded.segments.dir"?
> should we use a different configuration key "pulsar.offloaded.segments.dir"?
If we use a different key, we would have to read it once and then set it back, e.g.
configuration.set("hadoop.tmp.dir", configuration.get("pulsar.offloaded.segments.dir"));
It feels a little strange.
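For clarity, a small sketch of the indirection being discussed, with pulsar.offloaded.segments.dir as the hypothetical Pulsar-specific key:

```java
import org.apache.hadoop.conf.Configuration;

class OffloadDirKeySketch {
    static String resolveStorageBasePath(Configuration configuration) {
        // Read the Pulsar-specific key, then copy it into hadoop.tmp.dir so the
        // Hadoop classes still resolve the same base path.
        String segmentsDir = configuration.get("pulsar.offloaded.segments.dir");
        if (segmentsDir != null) {
            configuration.set("hadoop.tmp.dir", segmentsDir);
        }
        return configuration.get("hadoop.tmp.dir");
    }
}
```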
return;
}
long ledgerId = readHandle.getId();
String storagePath = getStoragePath(storageBasePath, extraMetadata.get("ManagedLedgerName"));
Can you define a constant for "ManagedLedgerName"?
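A minimal sketch of the suggested constant; the extraMetadata map and the path-building are assumptions based on the quoted line, standing in for the actual getStoragePath(...) helper:

```java
import java.util.Map;

class OffloadMetadataKeys {
    public static final String MANAGED_LEDGER_NAME = "ManagedLedgerName";

    // Stand-in for the getStoragePath(...) call in the quoted line.
    static String storagePath(String storageBasePath, Map<String, String> extraMetadata) {
        return storageBasePath + "/" + extraMetadata.get(MANAGED_LEDGER_NAME);
    }
}
```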
IOUtils.closeStream(dataWriter);
promise.complete(null);
} catch (InterruptedException | ExecutionException | IOException e) {
log.error("Exception when get CompletableFuture<LedgerEntries>. ", e);
Can you improve the logging statement to provide more meaningful messages?
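For example, a sketch of a more descriptive message, assuming ledgerId and the storage path are in scope as in the snippets above:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class OffloadLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(OffloadLoggingSketch.class);

    // Say what failed (offloading which ledger, to where) instead of
    // mentioning only the CompletableFuture that threw.
    static void logOffloadFailure(long ledgerId, String storagePath, Throwable e) {
        log.error("Failed to read ledger entries while offloading ledger {} to {}",
                ledgerId, storagePath, e);
    }
}
```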
try {
MapFile.Reader reader = new MapFile.Reader(new Path(dataFilePath),
configuration);
promise.complete(FileStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId), reader, ledgerId));
It seems that we are sharing a scheduler for both reading and writing. If a ledger is being offloaded, it will block the reading operations. Can we use different executors for reading and writing?
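A sketch of what that could look like, assuming BookKeeper's OrderedScheduler (which the snippet above already uses via scheduler.chooseThread(ledgerId)); the pool names and thread counts are placeholders:

```java
import org.apache.bookkeeper.common.util.OrderedScheduler;

class OffloaderSchedulersSketch {
    // Separate pools so a long-running offload (write) never queues ahead of
    // reads of already-offloaded ledgers.
    private final OrderedScheduler writeScheduler = OrderedScheduler.newSchedulerBuilder()
            .numThreads(2).name("offload-filesystem-write").build();
    private final OrderedScheduler readScheduler = OrderedScheduler.newSchedulerBuilder()
            .numThreads(2).name("offload-filesystem-read").build();

    void offloadLedger(long ledgerId, Runnable offloadTask) {
        writeScheduler.chooseThread(ledgerId).submit(offloadTask);
    }

    void readOffloadedLedger(long ledgerId, Runnable readTask) {
        readScheduler.chooseThread(ledgerId).submit(readTask);
    }
}
```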
I'll think it over :)
run cpp tests
run cpp tests
run cpp tests
run java8 tests
run java8 tests
@congbobo184 well done!
Fixes #3216
Implementation of offload to HDFS
Motivation
Implementation of offload to HDFS
Verifying this change
Added tests for this change.
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (yes)
Documentation
Does this pull request introduce a new feature? (yes / no)
If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
If a feature is not applicable for documentation, explain why?
If a feature is not documented yet in this PR, please create a followup issue for adding the documentation