-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[HUDI-9012] Implement and utilize native HFile writer #12866
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4399145
c09362e
cef75d5
bdc3132
22cf3ee
24992df
0991053
fc16d80
e94388f
cee4d3e
2e29038
af13aa8
86942ab
e3b126b
796a4e9
4d7f164
4ff67f2
6d21d65
2e36d66
b0eaed4
6f5973f
33853da
b8e52bb
29fcdcd
ac319c8
21a4ef0
63e48db
d498d7b
90b0213
4b3d694
2ca40f2
e042eae
1e4de72
0d65ffc
400908b
75b9e85
94a2105
4b6ba7b
d0af9c5
37de2d6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,51 +30,45 @@ | |
| import org.apache.hudi.common.util.collection.Pair; | ||
| import org.apache.hudi.exception.HoodieException; | ||
| import org.apache.hudi.exception.HoodieIOException; | ||
| import org.apache.hudi.io.hfile.HFileContext; | ||
| import org.apache.hudi.io.hfile.HFileWriter; | ||
| import org.apache.hudi.io.hfile.HFileWriterImpl; | ||
| import org.apache.hudi.storage.StoragePath; | ||
|
|
||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.fs.FileSystem; | ||
| import org.apache.hadoop.fs.Path; | ||
| import org.apache.hadoop.hbase.HConstants; | ||
| import org.apache.hadoop.hbase.KeyValue; | ||
| import org.apache.hadoop.hbase.io.hfile.CacheConfig; | ||
| import org.apache.hadoop.hbase.io.hfile.HFile; | ||
| import org.apache.hadoop.hbase.io.hfile.HFileContext; | ||
| import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.io.IOException; | ||
| import java.io.OutputStream; | ||
| import java.util.Collection; | ||
| import java.util.Date; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.INDEX_INFO_KEY; | ||
| import static org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.INDEX_INFO_KEY_STRING; | ||
| import static org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.fileIdIndexPath; | ||
| import static org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.getFileGroupKey; | ||
| import static org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.getPartitionKey; | ||
| import static org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.partitionIndexPath; | ||
| import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; | ||
|
|
||
| public class HBaseHFileBootstrapIndexWriter extends BootstrapIndex.IndexWriter { | ||
| private static final Logger LOG = LoggerFactory.getLogger(HBaseHFileBootstrapIndexWriter.class); | ||
| public class HFileBootstrapIndexWriter extends BootstrapIndex.IndexWriter { | ||
| private static final Logger LOG = LoggerFactory.getLogger(HFileBootstrapIndexWriter.class); | ||
|
|
||
| private final String bootstrapBasePath; | ||
| private final StoragePath indexByPartitionPath; | ||
| private final StoragePath indexByFileIdPath; | ||
| private HFile.Writer indexByPartitionWriter; | ||
| private HFile.Writer indexByFileIdWriter; | ||
| private HFileWriter indexByPartitionWriter; | ||
| private HFileWriter indexByFileIdWriter; | ||
|
|
||
| private boolean closed = false; | ||
| private int numPartitionKeysAdded = 0; | ||
| private int numFileIdKeysAdded = 0; | ||
|
|
||
| private final Map<String, List<BootstrapFileMapping>> sourceFileMappings = new HashMap<>(); | ||
|
|
||
| public HBaseHFileBootstrapIndexWriter(String bootstrapBasePath, HoodieTableMetaClient metaClient) { | ||
| public HFileBootstrapIndexWriter(String bootstrapBasePath, HoodieTableMetaClient metaClient) { | ||
| super(metaClient); | ||
| try { | ||
| metaClient.initializeBootstrapDirsIfNotExists(); | ||
|
|
@@ -114,9 +108,7 @@ private void writeNextPartition(String partitionPath, String bootstrapPartitionP | |
| m.getBootstrapFileStatus())).collect(Collectors.toMap(Pair::getKey, Pair::getValue))); | ||
| Option<byte[]> bytes = TimelineMetadataUtils.serializeAvroMetadata(bootstrapPartitionMetadata, HoodieBootstrapPartitionMetadata.class); | ||
| if (bytes.isPresent()) { | ||
| indexByPartitionWriter | ||
| .append(new KeyValue(getUTF8Bytes(getPartitionKey(partitionPath)), new byte[0], new byte[0], | ||
| HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put, bytes.get())); | ||
| indexByPartitionWriter.append(getPartitionKey(partitionPath), bytes.get()); | ||
| numPartitionKeysAdded++; | ||
| } | ||
| } catch (IOException e) { | ||
|
|
@@ -135,11 +127,10 @@ private void writeNextSourceFileMapping(BootstrapFileMapping mapping) { | |
| srcFilePartitionInfo.setPartitionPath(mapping.getPartitionPath()); | ||
| srcFilePartitionInfo.setBootstrapPartitionPath(mapping.getBootstrapPartitionPath()); | ||
| srcFilePartitionInfo.setBootstrapFileStatus(mapping.getBootstrapFileStatus()); | ||
| KeyValue kv = new KeyValue(getUTF8Bytes(getFileGroupKey(mapping.getFileGroupId())), new byte[0], new byte[0], | ||
| HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put, | ||
| TimelineMetadataUtils.serializeAvroMetadata(srcFilePartitionInfo, | ||
| HoodieBootstrapFilePartitionInfo.class).get()); | ||
| indexByFileIdWriter.append(kv); | ||
| indexByFileIdWriter.append( | ||
| getFileGroupKey(mapping.getFileGroupId()), | ||
| TimelineMetadataUtils.serializeAvroMetadata( | ||
| srcFilePartitionInfo, HoodieBootstrapFilePartitionInfo.class).get()); | ||
| numFileIdKeysAdded++; | ||
| } catch (IOException e) { | ||
| throw new HoodieIOException(e.getMessage(), e); | ||
|
|
@@ -166,9 +157,11 @@ private void commit() { | |
| .build(); | ||
| LOG.info("Appending FileId FileInfo :" + fileIdIndexInfo); | ||
|
|
||
| indexByPartitionWriter.appendFileInfo(INDEX_INFO_KEY, | ||
| indexByPartitionWriter.appendFileInfo( | ||
| INDEX_INFO_KEY_STRING, | ||
| TimelineMetadataUtils.serializeAvroMetadata(partitionIndexInfo, HoodieBootstrapIndexInfo.class).get()); | ||
| indexByFileIdWriter.appendFileInfo(INDEX_INFO_KEY, | ||
| indexByFileIdWriter.appendFileInfo( | ||
| INDEX_INFO_KEY_STRING, | ||
| TimelineMetadataUtils.serializeAvroMetadata(fileIdIndexInfo, HoodieBootstrapIndexInfo.class).get()); | ||
|
|
||
| close(); | ||
|
|
@@ -196,15 +189,11 @@ public void close() { | |
| @Override | ||
| public void begin() { | ||
| try { | ||
| HFileContext meta = new HFileContextBuilder().withCellComparator(new org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex.HoodieKVComparator()).build(); | ||
| this.indexByPartitionWriter = HFile.getWriterFactory(metaClient.getStorageConf().unwrapAs(Configuration.class), | ||
| new CacheConfig(metaClient.getStorageConf().unwrapAs(Configuration.class))) | ||
| .withPath((FileSystem) metaClient.getStorage().getFileSystem(), new Path(indexByPartitionPath.toUri())) | ||
| .withFileContext(meta).create(); | ||
| this.indexByFileIdWriter = HFile.getWriterFactory(metaClient.getStorageConf().unwrapAs(Configuration.class), | ||
| new CacheConfig(metaClient.getStorageConf().unwrapAs(Configuration.class))) | ||
| .withPath((FileSystem) metaClient.getStorage().getFileSystem(), new Path(indexByFileIdPath.toUri())) | ||
| .withFileContext(meta).create(); | ||
| HFileContext context = HFileContext.builder().build(); | ||
linliu-code marked this conversation as resolved.
Show resolved
Hide resolved
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could you verify the KV comparator class name written to the HFile by the new HFile writer? We need this to be backwards compatible, i.e., Hudi 1.1 release writing table version 6 and 8, both of which should be read without any problem using Hudi 0.14/15 and Hudi 1.0.2 release.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will verify.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. After removing the hbase dependency, all comparators will be gone. What class name should we give?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sounds like
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Changed the class name to
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is the |
||
| OutputStream outputStreamForPartitionWriter = metaClient.getStorage().create(indexByPartitionPath); | ||
| this.indexByPartitionWriter = new HFileWriterImpl(context, outputStreamForPartitionWriter); | ||
| OutputStream outputStreamForFileIdWriter = metaClient.getStorage().create(indexByFileIdPath); | ||
| this.indexByFileIdWriter = new HFileWriterImpl(context, outputStreamForFileIdWriter); | ||
| } catch (IOException ioe) { | ||
| throw new HoodieIOException(ioe.getMessage(), ioe); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When writing the key-value pairs, do you write the bytes conforming to
HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put, etc.? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since these fields are not used in Hudi, I did not check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LATEST_TIMESTAMP: 0x7fffffffffffffffL, KeyValue.Type.Put: 4. These fields are not contained in the keys. For backward compatibility, we could support them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Want to confirm: Is it ok for us to add such support in a followup pr?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As long as the native HFile reader and writer work seamlessly together without any correctness issue, we can land this PR. However, we need to address the compatibility with HBase as a blocker for Hudi 1.1 release so that the HBase-based HFile reader in Hudi 0.x can read the HFile written by the Hudi 1.1 release (for backwards compatibility).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To address that we need to run some tests to use hbase reader to read files generated by native writer. I will do that after this pr.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually let me file a ticket to track this: https://issues.apache.org/jira/browse/HUDI-9449