@@ -146,7 +146,7 @@ public HoodieData<WriteStatus> compact(

boolean useFileGroupReaderBasedCompaction = context.supportsFileGroupReader() // the engine needs to support fg reader first
&& !metaClient.isMetadataTable()
&& config.getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
&& HoodieReaderConfig.isFileGroupReaderEnabled(metaClient.getTableConfig().getTableVersion(), config)
&& operationType == WriteOperationType.COMPACT
&& !hasBootstrapFile(operations) // bootstrap file read for fg reader is not ready
&& config.populateMetaFields(); // Virtual key support by fg reader is not ready
@@ -47,12 +47,9 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.util.CollectionUtils.nonEmpty;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
@@ -104,14 +101,6 @@ public Option<HoodieCompactionPlan> execute() {
return Option.empty();
}
}
// Committed and pending compaction instants should have strictly lower timestamps
List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
    .getWriteTimeline().filterCompletedAndCompactionInstants().getInstantsAsStream()
    .filter(instant -> compareTimestamps(instant.requestedTime(), GREATER_THAN_OR_EQUALS, instantTime))
    .collect(Collectors.toList());
ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
    "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
        + conflictingInstants);
}

Contributor:
Why are we removing it? And even if that is required, is it possible to enable it only for table version 8 or above?

Collaborator Author:
This was not part of the table version 8 code. It was added specifically for table version 6 in https://github.com/apache/hudi/pull/12805/files.

Contributor:
Do we need this for backwards-compatible compaction for table version 6?

Contributor:
Nope, this piece of code was added back by me just for table version 6:
https://github.com/apache/hudi/pull/12805/files

I added two validations as part of that patch, and @lokeshj1703 is removing the second one. @lokeshj1703: can you help us understand why we need to remove this validation?

If a test written specifically for table version 8 failed when run against table version 6, and we are removing this validation just to make that test pass, that is not something we want to do.

Let's reason about table version 6 and take a call.

Collaborator Author:
This was failing due to MDT compaction scheduling with table version 6; it was not related to table version 8. If we delete a completed compaction instant in the DT while the corresponding deltacommit still exists in the MDT, the MDT writer tries to create a compaction instant in the MDT at a timestamp of one minus the inflight instant, and fails the check removed in this PR.

DT:  dc10, dc20.inflight
MDT: dc10, dc20 -> scheduling compaction at dc19 fails since dc20 already exists

This check wouldn't have failed with the older MDT compaction scheduling logic, where the compaction instant is created using a suffix.
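
To make the collision concrete, here is a minimal, self-contained sketch of the instant-time arithmetic described in this comment (class name and integer timestamps are illustrative, not code from this PR):

// Illustrative reproduction of the timestamp collision described above.
public class MdtCompactionTimestampExample {
  public static void main(String[] args) {
    long inflightDeltaCommitInDt = 20L;                      // dc20.inflight in the data table
    long mdtCompactionInstant = inflightDeltaCommitInDt - 1; // MDT writer schedules compaction at dc19
    long latestMdtDeltaCommit = 20L;                         // deltacommit dc20 already completed in the MDT
    // The removed validation required all completed/pending instants to be strictly lower
    // than the new compaction instant, so dc20 >= dc19 tripped it.
    System.out.println(latestMdtDeltaCommit >= mdtCompactionInstant); // true -> validation failure
  }
}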

Collaborator Author:
We can add this check for table version 6 only after the MDT backward-compatible changes are in.

HoodieCompactionPlan plan = scheduleCompaction();
@@ -21,23 +21,29 @@
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathFilter;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.fs.Path;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,14 +52,18 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.table.timeline.MetadataConversionUtils.getHoodieCommitMetadata;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;

/**
@@ -164,7 +174,9 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
// We do not know fileIds for inserts (first inserts are either log files or base files),
// delete all files for the corresponding failed commit, if present (same as COW)
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath, filesToDelete.get()));

if (metaClient.getTableConfig().getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) {
hoodieRollbackRequests.addAll(getRollbackRequestToAppendForVersionSix(partitionPath, instantToRollback, commitMetadataOptional.get(), table));
}
Contributor (on lines +177 to +179):
Can we have the diverging logic within getHoodieRollbackRequests instead of polluting the core logic here? Do other places calling getHoodieRollbackRequests need revisiting too?

break;
default:
throw new HoodieRollbackException("Unknown listing type, during rollback of " + instantToRollback);
@@ -181,6 +193,62 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
}
}

public static List<HoodieRollbackRequest> getRollbackRequestToAppendForVersionSix(String partitionPath, HoodieInstant rollbackInstant,
HoodieCommitMetadata commitMetadata, HoodieTable<?, ?, ?, ?> table) {
List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>();
checkArgument(table.version().lesserThan(HoodieTableVersion.EIGHT));
checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));

// wStat.getPrevCommit() might not give the right commit time in the following
// scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be
// used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
// But the index (global) might store the baseCommit of the base and not the requested, hence get the
// baseCommit always by listing the file slice
// With multi-writers, rollbacks could be lazy, and so we need to use getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices()
Map<String, FileSlice> latestFileSlices = table.getSliceView()
.getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.requestedTime(), true)
.collect(Collectors.toMap(FileSlice::getFileId, Function.identity()));

List<HoodieWriteStat> hoodieWriteStats = Option.ofNullable(commitMetadata.getPartitionToWriteStats().get(partitionPath)).orElse(Collections.emptyList());
hoodieWriteStats = hoodieWriteStats.stream()
.filter(writeStat -> {
// Filter out stats without prevCommit since they are all inserts
boolean validForRollback = (writeStat != null) && (!writeStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
&& (writeStat.getPrevCommit() != null) && latestFileSlices.containsKey(writeStat.getFileId());

if (!validForRollback) {
return false;
}

FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());

// For sanity, log-file base-instant time can never be less than base-commit on which we are rolling back
checkArgument(
compareTimestamps(latestFileSlice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, rollbackInstant.requestedTime()),
"Log-file base-instant could not be less than the instant being rolled back");

// Command block "rolling back" the preceding block {@link HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK}
// w/in the latest file-slice is appended iff base-instant of the log-file is _strictly_ less
// than the instant of the Delta Commit being rolled back. Otherwise, log-file will be cleaned up
// in a different branch of the flow.
return compareTimestamps(latestFileSlice.getBaseInstantTime(), LESSER_THAN, rollbackInstant.requestedTime());
})
.collect(Collectors.toList());

for (HoodieWriteStat writeStat : hoodieWriteStats.stream().filter(
hoodieWriteStat -> !StringUtils.isNullOrEmpty(hoodieWriteStat.getFileId())).collect(Collectors.toList())) {
FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());
String fileId = writeStat.getFileId();
String latestBaseInstant = latestFileSlice.getBaseInstantTime();
Path fullLogFilePath = HadoopFSUtils.constructAbsolutePathInHadoopPath(table.getConfig().getBasePath(), writeStat.getPath());
Map<String, Long> logFilesWithBlocksToRollback = Collections.singletonMap(
fullLogFilePath.toString(), writeStat.getTotalWriteBytes() > 0 ? writeStat.getTotalWriteBytes() : 1L);
hoodieRollbackRequests.add(new HoodieRollbackRequest(partitionPath, fileId, latestBaseInstant,
Collections.emptyList(), logFilesWithBlocksToRollback));
}
return hoodieRollbackRequests;
}

private List<StoragePathInfo> listAllFilesSinceCommit(String commit,
String baseFileExtension,
String partitionPath,
@@ -90,7 +90,7 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
switch (type) {
case MERGE:
case CREATE:
return createRollbackRequestForCreateAndMerge(fileId, partitionPath, filePath, instantToRollback);
return createRollbackRequestForCreateAndMerge(fileId, partitionPath, filePath, instantToRollback, filePathStr);
case APPEND:
return createRollbackRequestForAppend(fileId, partitionPath, filePath, instantToRollback, filePathStr);
default:
@@ -106,10 +106,8 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
}
}

protected HoodieRollbackRequest createRollbackRequestForCreateAndMerge(String fileId,
String partitionPath,
StoragePath filePath,
HoodieInstant instantToRollback) {
protected HoodieRollbackRequest createRollbackRequestForCreateAndMerge(String fileId, String partitionPath, StoragePath filePath,
HoodieInstant instantToRollback, String filePathToRollback) {
if (table.version().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
return new HoodieRollbackRequest(partitionPath, fileId, instantToRollback.requestedTime(),
Collections.singletonList(filePath.toString()), Collections.emptyMap());
@@ -120,7 +118,7 @@ protected HoodieRollbackRequest createRollbackRequestForCreateAndMerge(String fi
fileId = baseFileToDelete.getFileId();
baseInstantTime = baseFileToDelete.getCommitTime();
} else if (FSUtils.isLogFile(filePath)) {
throw new HoodieRollbackException("Log files should have only APPEND as IOTypes " + filePath);
return createRollbackRequestForAppend(fileId, partitionPath, filePath, instantToRollback, filePathToRollback);
}
Objects.requireNonNull(fileId, "Cannot find valid fileId from path: " + filePath);
Objects.requireNonNull(baseInstantTime, "Cannot find valid base instant from path: " + filePath);
@@ -371,7 +371,7 @@ private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc,
Schema tableSchemaWithMetaFields) {
List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream()
.map(ClusteringOperation::create).collect(Collectors.toList());
boolean canUseFileGroupReaderBasedClustering = getWriteConfig().getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
boolean canUseFileGroupReaderBasedClustering = HoodieReaderConfig.isFileGroupReaderEnabled(getHoodieTable().getMetaClient().getTableConfig().getTableVersion(), getWriteConfig())
&& getWriteConfig().getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
&& clusteringOps.stream().allMatch(slice -> StringUtils.isNullOrEmpty(slice.getBootstrapFilePath()));

@@ -19,8 +19,14 @@

package org.apache.hudi.common.config;

import org.apache.hudi.common.table.HoodieTableVersion;

import org.apache.hadoop.conf.Configuration;

import javax.annotation.concurrent.Immutable;

import java.util.Map;

/**
* Configurations for reading a file group
*/
@@ -89,4 +95,18 @@ public class HoodieReaderConfig extends HoodieConfig {
"hoodie.write.record.merge.custom.implementation.classes";
public static final String RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY =
"hoodie.datasource.write.record.merger.impls";

public static boolean isFileGroupReaderEnabled(HoodieTableVersion tableVersion, HoodieConfig config) {
return tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) && config.getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED);
}

public static boolean isFileGroupReaderEnabled(HoodieTableVersion tableVersion, Map<String, String> parameters) {
return tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
&& Boolean.parseBoolean(parameters.getOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue().toString()));
}

public static boolean isFileGroupReaderEnabled(HoodieTableVersion tableVersion, Configuration conf) {
return tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
&& conf.getBoolean(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue());
}
Contributor (on lines +103 to +111):
Let's remove all this new config logic and fix forward.

In fact, basic functionality of reading file groups in table version 6 is validated in TestMergeModeCommitTimeOrdering and TestMergeModeEventTimeOrdering. So we should dig deeper and understand the additional issue discovered, if there is any.

Collaborator:
Configuration conf introduces Hadoop dependencies into hudi-common. Should we use StorageConfiguration instead?

}
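
For orientation, these helpers are consulted at call sites like the compactor and clustering changes above; a minimal sketch of such a call site, assuming metaClient and writeConfig are in scope:

// Sketch mirroring the compactor change earlier in this diff (variables assumed in scope).
HoodieTableVersion tableVersion = metaClient.getTableConfig().getTableVersion();
boolean useFileGroupReader = HoodieReaderConfig.isFileGroupReaderEnabled(tableVersion, writeConfig);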
@@ -318,6 +318,11 @@ public class HoodieTableConfig extends HoodieConfig {
DATE_TIME_PARSER
);

private static final Set<String> CONFIGS_REQUIRED_FOR_OLDER_VERSIONED_TABLES = new HashSet<>(Arrays.asList(
KEY_GENERATOR_CLASS_NAME.key(),
KEY_GENERATOR_TYPE.key()
));

public static final ConfigProperty<String> TABLE_CHECKSUM = ConfigProperty
.key("hoodie.table.checksum")
.noDefaultValue()
@@ -632,6 +637,7 @@ static boolean validateConfigVersion(ConfigProperty<?> configProperty, HoodieTab
// validate that the table version is greater than or equal to the config version
HoodieTableVersion firstVersion = HoodieTableVersion.fromReleaseVersion(configProperty.getSinceVersion().get());
boolean valid = tableVersion.greaterThan(firstVersion) || tableVersion.equals(firstVersion);
valid = valid || CONFIGS_REQUIRED_FOR_OLDER_VERSIONED_TABLES.contains(configProperty.key());
Contributor:
What is the rationale here? It looks like we need to validate this in another place instead of here.

Contributor:
@lokeshj1703: can you respond to this?

Collaborator Author:
These configs were added with since-version 1.0.0 but are still required for table version 6. Therefore I had to add a check here so that these configs are not dropped for tables on version 6.

if (!valid) {
LOG.warn("Table version {} is lower than or equal to config's first version {}. Config {} will be ignored.",
tableVersion, firstVersion, configProperty.key());
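
Concretely, the exemption short-circuits the version check for the key-generator configs; a minimal sketch of the evaluation for a version-6 table (the mapping of release 1.0.0 to table version 8 is assumed from the author's comment above):

// KEY_GENERATOR_TYPE carries sinceVersion "1.0.0", so firstVersion resolves to HoodieTableVersion.EIGHT.
boolean valid = tableVersion.greaterThan(firstVersion) || tableVersion.equals(firstVersion); // SIX vs EIGHT -> false
valid = valid || CONFIGS_REQUIRED_FOR_OLDER_VERSIONED_TABLES.contains(configProperty.key()); // exempted -> true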
@@ -149,6 +149,8 @@ public abstract class AbstractHoodieLogRecordScanner {
private final boolean enableOptimizedLogBlocksScan;
// table version for compatibility
private final HoodieTableVersion tableVersion;
// Allows considering inflight instants while merging log records
protected boolean allowInflightInstants = false;
// for pending log block check with table version before 8
private HoodieTimeline commitsTimeline = null;
private HoodieTimeline completedInstantsTimeline = null;
@@ -281,7 +283,7 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
totalLogBlocks.incrementAndGet();
if (logBlock.isDataOrDeleteBlock()) {
if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT) && !allowInflightInstants) {
if (!getOrCreateCompletedInstantsTimeline().containsOrBeforeTimelineStarts(instantTime)
|| getOrCreateInflightInstantsTimeline().containsInstant(instantTime)) {
// hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
@@ -479,7 +481,7 @@ && compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), GREATER_THA
continue;
}
if (logBlock.getBlockType() != COMMAND_BLOCK) {
if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT) && !allowInflightInstants) {
Contributor:
Could you fix HoodieFileGroupReader, BaseHoodieLogRecordReader, and HoodieBaseFileGroupRecordBuffer with the same logic? The *LogRecordScanner classes are going to be deprecated.

Contributor:
@lokeshj1703: can you follow up here?

Contributor:
This may not be an immediate necessity, since we don't yet support reading the MDT using the new file group reader. But when @linliu-code gets to replacing it, we might need the support. So we can take a call to delay this support for a couple of weeks.

Collaborator Author:
@yihua This was required for RLI. RLI directly calls AbstractHoodieLogRecordScanner via org.apache.hudi.metadata.HoodieTableMetadataUtil#getRevivedAndDeletedKeysFromMergedLogs.

if (!getOrCreateCompletedInstantsTimeline().containsOrBeforeTimelineStarts(instantTime)
|| getOrCreateInflightInstantsTimeline().containsInstant(instantTime)) {
// hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
@@ -103,7 +103,8 @@ protected HoodieMergedLogRecordScanner(HoodieStorage storage, String basePath, L
InternalSchema internalSchema,
Option<String> keyFieldOverride,
boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
Option<HoodieTableMetaClient> hoodieTableMetaClientOption,
boolean allowInflightInstants) {
Contributor:
Same comment as for the changes in the log record scanner class.

super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize,
instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger,
hoodieTableMetaClientOption);
@@ -113,6 +114,7 @@ protected HoodieMergedLogRecordScanner(HoodieStorage storage, String basePath, L
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(readerSchema), diskMapType, new DefaultSerializer<>(), isBitCaskDiskMapCompressionEnabled);
this.scannedPrefixes = new HashSet<>();
this.allowInflightInstants = allowInflightInstants;
} catch (IOException e) {
throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
}
@@ -342,6 +344,7 @@ public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
// By default, we're doing a full-scan
private boolean forceFullScan = true;
private boolean enableOptimizedLogBlocksScan = false;
protected boolean allowInflightInstants = false;
private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE;
protected HoodieTableMetaClient hoodieTableMetaClient;

@@ -466,6 +469,11 @@ public Builder withTableMetaClient(HoodieTableMetaClient hoodieTableMetaClient)
return this;
}

public Builder withAllowInflightInstants(boolean allowInflightInstants) {
this.allowInflightInstants = allowInflightInstants;
return this;
}

@Override
public HoodieMergedLogRecordScanner build() {
if (this.partitionName == null && CollectionUtils.nonEmpty(this.logFilePaths)) {
@@ -479,7 +487,7 @@ public HoodieMergedLogRecordScanner build() {
bufferSize, spillableMapBasePath, instantRange,
diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan,
Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger,
Option.ofNullable(hoodieTableMetaClient));
Option.ofNullable(hoodieTableMetaClient), allowInflightInstants);
}
}
}
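
As a usage sketch, a caller that must merge records from inflight deltacommits (for example, the RLI path discussed earlier) can opt in through the new flag; the other builder setters shown are assumed from the scanner's existing API:

// Sketch: enabling inflight-instant merging on the merged log record scanner.
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
    .withStorage(storage)
    .withBasePath(basePath)
    .withLogFilePaths(logFilePaths)
    .withReaderSchema(readerSchema)
    .withLatestInstantTime(latestInstantTime)
    .withAllowInflightInstants(true) // consider data/delete blocks from inflight instants while merging
    .build();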