Fixing compilation (after rebase)
Alexey Kudinkin committed Feb 17, 2023
1 parent f4109f2 commit 9fd17ce
Showing 3 changed files with 7 additions and 14 deletions.
@@ -19,17 +19,10 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.client.utils.ClosableMergingIterator;
-import org.apache.hudi.common.util.ClosableIterator;
-import org.apache.hudi.common.util.queue.HoodieConsumer;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 
 /**
@@ -18,7 +18,7 @@

 package org.apache.hudi.table.action.commit;
 
-import org.apache.hudi.client.utils.MergingIterator;
+import org.apache.hudi.client.utils.ClosableMergingIterator;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -27,7 +27,6 @@
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ClosableMappingIterator;
 import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -122,9 +121,9 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
       bootstrapFileReader =
           HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(bootstrapFileConfig, bootstrapFilePath);
 
-      recordIterator = new MergingIterator<>(
+      recordIterator = new ClosableMergingIterator<>(
           baseFileRecordIterator,
-          bootstrapFileReader.getRecordIterator(),
+          (ClosableIterator<HoodieRecord>) bootstrapFileReader.getRecordIterator(),
           (left, right) ->
               left.joinWith(right, mergeHandle.getWriterSchemaWithMetaFields()));
       recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
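Context for this hunk: the merging iterator zips the base-file record iterator with the bootstrap-file record iterator and joins each pair into one record under the writer schema; the switch to ClosableMergingIterator (together with the ClosableIterator cast) presumably lets the merged iterator close both underlying file readers as well. A minimal sketch of such an iterator, assuming only java.util.Iterator and AutoCloseable — the real Hudi ClosableMergingIterator and ClosableIterator signatures may differ:

import java.util.Iterator;
import java.util.function.BiFunction;

// Hypothetical sketch, not the Hudi implementation: zips two iterators
// pairwise and closes both sources when the merged iterator is closed.
class SimpleClosableMergingIterator<L, R, O> implements Iterator<O>, AutoCloseable {
  private final Iterator<L> left;
  private final Iterator<R> right;
  private final BiFunction<L, R, O> combiner;

  SimpleClosableMergingIterator(Iterator<L> left, Iterator<R> right, BiFunction<L, R, O> combiner) {
    this.left = left;
    this.right = right;
    this.combiner = combiner;
  }

  @Override
  public boolean hasNext() {
    // Both sides are expected to advance in lockstep (same record count).
    return left.hasNext() && right.hasNext();
  }

  @Override
  public O next() {
    return combiner.apply(left.next(), right.next());
  }

  @Override
  public void close() throws Exception {
    // Release both underlying readers, if they are closable.
    if (left instanceof AutoCloseable) {
      ((AutoCloseable) left).close();
    }
    if (right instanceof AutoCloseable) {
      ((AutoCloseable) right).close();
    }
  }
}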
@@ -135,7 +134,7 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,

     boolean isBufferingRecords = ExecutorFactory.isBufferingRecords(writeConfig);
 
-    wrapper = ExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> {
+    executor = ExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> {
       HoodieRecord newRecord;
       if (schemaEvolutionTransformerOpt.isPresent()) {
         newRecord = schemaEvolutionTransformerOpt.get().apply(record);
@@ -161,7 +160,7 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
       executor.shutdownNow();
       executor.awaitTermination();
     } else {
-      reader.close();
+      baseFileReader.close();
       mergeHandle.close();
     }
   }
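Note on the last two hunks: wrapper and reader were stale names left over from the rebase; the code now consistently refers to the executor created by ExecutorFactory and to baseFileReader. The enclosing cleanup logic, reconstructed as a sketch (the guarding condition sits outside the visible hunks and is an assumption):

// Hypothetical reconstruction of the enclosing try/finally in runMerge.
// If the executor was created, it owns shutdown of producer and consumer;
// otherwise the base file reader and the merge handle are closed directly.
try {
  // ... executor drives the merge, consuming recordIterator ...
} finally {
  if (executor != null) { // assumed condition, not visible in the diff
    executor.shutdownNow();
    executor.awaitTermination();
  } else {
    baseFileReader.close();
    mergeHandle.close();
  }
}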
@@ -71,8 +71,9 @@ void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle, Path so
   }
   Reader orcReader = OrcFile.createReader(sourceFilePath, OrcFile.readerOptions(table.getHadoopConf()));
   TypeDescription orcSchema = orcReader.getSchema();
-  RecordReader reader = orcReader.rows(new Reader.Options(table.getHadoopConf()).schema(orcSchema));
   BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void> executor = null;
+  try (RecordReader reader = orcReader.rows(new Reader.Options(table.getHadoopConf()).schema(orcSchema))) {
   try {
     executor = new BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config.getWriteBufferLimitBytes(),
         new OrcReaderIterator(reader, avroSchema, orcSchema), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
       String recKey = keyGenerator.getKey(inp).getRecordKey();
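The hunk above fixes a resource-handling gap: the ORC RecordReader is now opened in a try-with-resources statement, so it is closed even when building the executor or iterating records throws, rather than relying on a manual close that an exception could skip. The idiom in isolation, with a generic java.io reader standing in for the Hudi/ORC types:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class TryWithResourcesDemo {
  public static void main(String[] args) {
    // The reader is closed automatically when the try block exits,
    // whether normally or via an exception.
    try (BufferedReader reader = new BufferedReader(new FileReader("data.txt"))) {
      String line;
      while ((line = reader.readLine()) != null) {
        System.out.println(line);
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}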
