Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator;
import org.apache.hudi.storage.StorageConfiguration;

import java.util.Arrays;
Expand Down Expand Up @@ -72,17 +73,9 @@ protected HoodieBackedTableMetadataWriterTableVersionSix(StorageConfiguration<?>
HoodieWriteConfig writeConfig,
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
HoodieEngineContext engineContext,
ExpressionIndexRecordGenerator expressionIndexRecordGenerator,
Option<String> inflightInstantTimestamp) {
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp);
}

@Override
List<MetadataPartitionType> getEnabledPartitions(HoodieMetadataConfig metadataConfig, HoodieTableMetaClient metaClient) {
return MetadataPartitionType.getEnabledPartitions(metadataConfig, metaClient).stream()
.filter(partition -> !partition.equals(MetadataPartitionType.SECONDARY_INDEX))
.filter(partition -> !partition.equals(MetadataPartitionType.EXPRESSION_INDEX))
.filter(partition -> !partition.equals(MetadataPartitionType.PARTITION_STATS))
.collect(Collectors.toList());
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, expressionIndexRecordGenerator, inflightInstantTimestamp);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.metadata;

import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.client.FailOnFirstErrorWriteStatus;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
Expand Down Expand Up @@ -73,6 +74,7 @@
import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metadata.index.Indexer;
import org.apache.hudi.stats.HoodieColumnRangeMetadata;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
Expand Down Expand Up @@ -106,6 +108,7 @@
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToBloomFilterRecords;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToColumnStatsRecords;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToExpressionIndexRecords;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToFilesPartitionRecords;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToPartitionStatsRecords;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToRecordIndexRecords;
Expand Down Expand Up @@ -362,6 +365,41 @@ public static HoodieWriteConfig createMetadataWriteConfig(
return metadataWriteConfig;
}

/**
* Convert the clean action to metadata records.
*/
public static Map<String, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext engineContext,
HoodieCleanMetadata cleanMetadata,
String instantTime,
HoodieTableMetaClient dataMetaClient,
HoodieMetadataConfig metadataConfig,
Map<MetadataPartitionType, Indexer> enabledIndexBuilderMap,
int bloomIndexParallelism,
Option<HoodieRecord.HoodieRecordType> recordTypeOpt) {
final Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
final HoodieData<HoodieRecord> filesPartitionRecordsRDD = engineContext.parallelize(
convertMetadataToFilesPartitionRecords(cleanMetadata, instantTime), 1);
partitionToRecordsMap.put(MetadataPartitionType.FILES.getPartitionPath(), filesPartitionRecordsRDD);
if (enabledIndexBuilderMap.containsKey(MetadataPartitionType.BLOOM_FILTERS)) {
final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD =
convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime, bloomIndexParallelism);
partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath(), metadataBloomFilterRecordsRDD);
}

if (enabledIndexBuilderMap.containsKey(MetadataPartitionType.COLUMN_STATS)) {
final HoodieData<HoodieRecord> metadataColumnStatsRDD =
convertMetadataToColumnStatsRecords(cleanMetadata, engineContext,
dataMetaClient, metadataConfig, recordTypeOpt);
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), metadataColumnStatsRDD);
}
if (enabledIndexBuilderMap.containsKey(MetadataPartitionType.EXPRESSION_INDEX)) {
convertMetadataToExpressionIndexRecords(engineContext, cleanMetadata, instantTime, dataMetaClient, metadataConfig, bloomIndexParallelism, partitionToRecordsMap,
recordTypeOpt);
}

return partitionToRecordsMap;
}

/**
* Convert commit action to metadata records for the enabled partition types.
*
Expand All @@ -370,12 +408,14 @@ public static HoodieWriteConfig createMetadataWriteConfig(
* @param commitMetadata - Commit action metadata
* @param instantTime - Action instant time
* @param dataMetaClient - HoodieTableMetaClient for data
* @param tableMetadata
* @param tableMetadata - metadata table reader
* @param metadataConfig - HoodieMetadataConfig
* @param enabledPartitionTypes - Set of enabled MDT partitions to update
* @param bloomFilterType - Type of generated bloom filter records
* @param bloomIndexParallelism - Parallelism for bloom filter record generation
* @param enableOptimizeLogBlocksScan - flag used to enable scanInternalV2 for log blocks in data table
* @param writesFileIdEncoding - file id encoding used while generating record index records
* @param engineType - execution engine type
* @param recordTypeOpt - record type override for generated metadata records
* @return Map of partition to metadata records for the commit action
*/
public static Map<String, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext context, HoodieWriteConfig dataWriteConfig, HoodieCommitMetadata commitMetadata,
Expand Down Expand Up @@ -555,8 +595,10 @@ public static Set<String> getFilesToFetchColumnStats(List<HoodieWriteStat> parti
// Get the latest merged file slices based on the commited files part of the latest snapshot and the new files of the current commit metadata
List<StoragePathInfo> consolidatedPathInfos = new ArrayList<>();
partitionedWriteStat.forEach(
stat -> consolidatedPathInfos.add(
new StoragePathInfo(new StoragePath(dataMetaClient.getBasePath(), stat.getPath()), stat.getFileSizeInBytes(), false, (short) 0, 0, 0)));
stat -> {
StoragePathInfo pathInfo = new StoragePathInfo(new StoragePath(dataMetaClient.getBasePath(), stat.getPath()), stat.getFileSizeInBytes(), false, (short) 0, 0, 0);
consolidatedPathInfos.add(pathInfo);
});
SyncableFileSystemView fileSystemViewForCommitedFiles =
FileSystemViewManager.createViewManager(new HoodieLocalEngineContext(dataMetaClient.getStorageConf()),
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getViewStorageConfig(), dataWriteConfig.getCommonConfig(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.common.engine.ReaderContextFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.metadata.model.FileSliceAndPartition;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieIndexDefinition;
Expand Down Expand Up @@ -235,7 +236,7 @@ public static <T> Map<String, String> getRecordKeyToSecondaryKey(HoodieTableMeta
}

public static <T> HoodieData<HoodieRecord> readSecondaryKeysFromFileSlices(HoodieEngineContext engineContext,
List<Pair<String, FileSlice>> partitionFileSlicePairs,
List<FileSliceAndPartition> partitionFileSlicePairs,
int secondaryIndexMaxParallelism,
String activeModule, HoodieTableMetaClient metaClient,
HoodieIndexDefinition indexDefinition,
Expand All @@ -255,8 +256,8 @@ public static <T> HoodieData<HoodieRecord> readSecondaryKeysFromFileSlices(Hoodi
engineContext.setJobStatus(activeModule, "Secondary Index: reading secondary keys from " + partitionFileSlicePairs.size() + " file slices");
HoodieFileFormat baseFileFormat = metaClient.getTableConfig().getBaseFileFormat();
return engineContext.parallelize(partitionFileSlicePairs, parallelism).flatMap(partitionAndBaseFile -> {
final String partition = partitionAndBaseFile.getKey();
final FileSlice fileSlice = partitionAndBaseFile.getValue();
final String partition = partitionAndBaseFile.partitionPath();
final FileSlice fileSlice = partitionAndBaseFile.fileSlice();
Option<StoragePath> dataFilePath = Option.ofNullable(fileSlice.getBaseFile().map(baseFile -> FSUtils.getAbsoluteFilePath(basePath, partition, baseFile.getFileName())).orElseGet(null));
HoodieSchema readerSchema;
if (dataFilePath.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.metadata.index;

import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieWriteConfig;

import lombok.extern.slf4j.Slf4j;

/**
* Base implementation of {@link Indexer} that handles common metadata-partition bootstrap flow,
* including file-group initialization, commit, and partition state update.
*/
@Slf4j
public abstract class BaseIndexer implements Indexer {
protected final HoodieEngineContext engineContext;
protected final HoodieWriteConfig dataTableWriteConfig;
protected final HoodieTableMetaClient dataTableMetaClient;

protected BaseIndexer(
HoodieEngineContext engineContext,
HoodieWriteConfig dataTableWriteConfig,
HoodieTableMetaClient dataTableMetaClient) {
this.engineContext = engineContext;
this.dataTableWriteConfig = dataTableWriteConfig;
this.dataTableMetaClient = dataTableMetaClient;
}

/**
* Hook invoked after the bootstrap bulk commit for an index partition succeeds.
* <p>
* The default implementation marks the index partition as available in the data table config.
* Subclasses can override this to perform index-specific follow-up work (for example, index-definition
* registration or post-commit validation).
*
* @param metadataMetaClient metadata table meta client used during initialization
* @param records records committed during index partition initialization
* @param fileGroupCount number of file groups created for the index partition
* @param relativePartitionPath metadata table relative partition path being initialized
*/
@Override
public void postInitialization(HoodieTableMetaClient metadataMetaClient, HoodieData<HoodieRecord> records, int fileGroupCount, String relativePartitionPath) {
dataTableMetaClient.getTableConfig().setMetadataPartitionState(dataTableMetaClient, relativePartitionPath, true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.metadata.index;

import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.metadata.model.FileToIndex;
import org.apache.hudi.storage.StorageConfiguration;

import java.util.List;

/**
* Engine-specific generator for expression index records used during metadata index bootstrap.
*/
public interface ExpressionIndexRecordGenerator {
EngineType getEngineType();

/**
* Generates expression index records.
*
* @param filesToIndex triplet of partition, (file path, file size)
* @param indexDefinition definition of the expression index
* @param metaClient {@link HoodieTableMetaClient} instance
* @param parallelism parallelism to use for engine operations
* @param tableSchema table schema
* @param readerSchema reader schema
* @param storageConf storage config
* @param instantTime instant time
* @return expression index records
*/
HoodieData<HoodieRecord> generate(
List<FileToIndex> filesToIndex,
HoodieIndexDefinition indexDefinition,
HoodieTableMetaClient metaClient,
int parallelism,
HoodieSchema tableSchema,
HoodieSchema readerSchema,
StorageConfiguration<?> storageConf,
String instantTime);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.metadata.index;

import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
import org.apache.hudi.metadata.model.FileSliceAndPartition;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.util.Lazy;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* Interface for initializing and updating a type of metadata or index
* in the metadata table.
* <p>
* When a new type of index is added to MetadataPartitionType, an
* implementation of the {@link Indexer} interface is required, and it
* must be added to {@link IndexerFactory}.
*/
public interface Indexer {
/**
* Generates records for initializing the index.
*
* @param dataTableInstantTime instant time of the data table that the metadata table is initialized on
* @param instantTimeForPartition instant time used for initializing a specific metadata partition
* @param partitionIdToAllFilesMap map of partition to files
* @param lazyLatestMergedPartitionFileSliceList lazily-evaluated list of file slices for the indexer that needs it
* @return zero or more {@link IndexPartitionInitialization} entries to be initialized.
* Returning an empty list means no metadata partition needs initialization in this invocation.
* @throws IOException upon IO error
*/
List<IndexPartitionInitialization> buildInitializationData(
String dataTableInstantTime,
String instantTimeForPartition,
Map<String, Map<String, Long>> partitionIdToAllFilesMap,
Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList) throws IOException;

/**
* Hook invoked after the bootstrap bulk commit for an index partition succeeds.
* Implementations can use this to perform index-specific follow-up work.
*
* @param metadataMetaClient metadata table meta client used during initialization
* @param records records committed during index partition initialization
* @param fileGroupCount number of file groups created for the index partition
* @param relativePartitionPath metadata table relative partition path being initialized
*/
void postInitialization(
HoodieTableMetaClient metadataMetaClient,
HoodieData<HoodieRecord> records,
int fileGroupCount,
String relativePartitionPath);
}
Loading
Loading