[HUDI-2814] Make Z-index more generic Column-Stats Index #4106

Merged · 28 commits · Dec 10, 2021
--- changed file ---
@@ -24,13 +24,15 @@
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.util.TypeUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;

import javax.annotation.Nonnull;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

/**
@@ -520,54 +522,55 @@ private String getDefaultExecutionStrategyClassName(EngineType engineType) {
}

/**
* strategy types for build z-ordering/space-filling curves.
* Type of a strategy for building Z-order/Hilbert space-filling curves.
*/
public enum BuildCurveStrategyType {
DIRECT("direct"),
SAMPLE("sample");

private static final Map<String, BuildCurveStrategyType> VALUE_TO_ENUM_MAP =
TypeUtils.getValueToEnumMap(BuildCurveStrategyType.class, e -> e.value);

private final String value;

BuildCurveStrategyType(String value) {
this.value = value;
}

public static BuildCurveStrategyType fromValue(String value) {
switch (value.toLowerCase(Locale.ROOT)) {
case "direct":
return DIRECT;
case "sample":
return SAMPLE;
default:
throw new HoodieException("Invalid value of Type.");
BuildCurveStrategyType enumValue = VALUE_TO_ENUM_MAP.get(value);
if (enumValue == null) {
throw new HoodieException(String.format("Invalid value (%s)", value));
}

return enumValue;
}
}

/**
* strategy types for optimize layout for hudi data.
* Layout optimization strategies such as Z-order/Hilbert space-curves, etc
*/
public enum BuildLayoutOptimizationStrategy {
public enum LayoutOptimizationStrategy {
ZORDER("z-order"),
HILBERT("hilbert");

private static final Map<String, LayoutOptimizationStrategy> VALUE_TO_ENUM_MAP =
TypeUtils.getValueToEnumMap(LayoutOptimizationStrategy.class, e -> e.value);

private final String value;

BuildLayoutOptimizationStrategy(String value) {
LayoutOptimizationStrategy(String value) {
this.value = value;
}

public String toCustomString() {
return value;
}

public static BuildLayoutOptimizationStrategy fromValue(String value) {
switch (value.toLowerCase(Locale.ROOT)) {
case "z-order":
return ZORDER;
case "hilbert":
return HILBERT;
default:
throw new HoodieException("Invalid value of Type.");
@Nonnull
public static LayoutOptimizationStrategy fromValue(String value) {
LayoutOptimizationStrategy enumValue = VALUE_TO_ENUM_MAP.get(value);
if (enumValue == null) {
throw new HoodieException(String.format("Invalid value (%s)", value));
}

return enumValue;
}
}
}
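
Note on the enum refactor above: fromValue() now resolves through a value-to-enum map built by TypeUtils.getValueToEnumMap instead of a hand-written switch. A minimal sketch of what such a helper could look like, assuming it simply keys each constant by its extracted string value (Hudi's actual utility may differ in details):

```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ValueToEnumMapSketch {

  // Keys every enum constant by the string extracted from it, so fromValue()
  // becomes a single map lookup instead of a switch over literal strings.
  static <E extends Enum<E>> Map<String, E> getValueToEnumMap(
      Class<E> enumClass, Function<E, String> valueExtractor) {
    return Stream.of(enumClass.getEnumConstants())
        .collect(Collectors.toMap(valueExtractor, Function.identity()));
  }
}
```

With that in place, LayoutOptimizationStrategy.fromValue("hilbert") is a single lookup, and an unknown value surfaces as a HoodieException carrying the offending string.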
--- changed file ---
Expand Up @@ -246,7 +246,7 @@ public abstract HoodieWriteMetadata<O> bulkInsertPrepped(HoodieEngineContext con
public abstract HoodieWriteMetadata<O> insertOverwriteTable(HoodieEngineContext context, String instantTime, I records);

/**
* Updates Metadata Indexes (like Z-Index)
* Updates Metadata Indexes (like Column Stats index)
* TODO rebase onto metadata table (post RFC-27)
*
* @param context instance of {@link HoodieEngineContext}
--- changed file ---
@@ -74,13 +74,17 @@ public Option<HoodieClusteringPlan> generateClusteringPlan() {
return Option.empty();
}

List<HoodieClusteringGroup> clusteringGroups = getEngineContext().flatMap(partitionPaths,
partitionPath -> {
List<FileSlice> fileSlicesEligible = getFileSlicesEligibleForClustering(partitionPath).collect(Collectors.toList());
return buildClusteringGroupsForPartition(partitionPath, fileSlicesEligible).limit(getWriteConfig().getClusteringMaxNumGroups());
},
partitionPaths.size())
.stream().limit(getWriteConfig().getClusteringMaxNumGroups()).collect(Collectors.toList());
List<HoodieClusteringGroup> clusteringGroups = getEngineContext()
.flatMap(
partitionPaths,
partitionPath -> {
List<FileSlice> fileSlicesEligible = getFileSlicesEligibleForClustering(partitionPath).collect(Collectors.toList());
return buildClusteringGroupsForPartition(partitionPath, fileSlicesEligible).limit(getWriteConfig().getClusteringMaxNumGroups());
},
partitionPaths.size())
.stream()
.limit(getWriteConfig().getClusteringMaxNumGroups())
.collect(Collectors.toList());

if (clusteringGroups.isEmpty()) {
LOG.info("No data available to cluster");
--- changed file ---
@@ -68,13 +68,13 @@ public SparkSizeBasedClusteringPlanStrategy(HoodieSparkMergeOnReadTable<T> table

@Override
protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
HoodieWriteConfig writeConfig = getWriteConfig();

List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
List<FileSlice> currentGroup = new ArrayList<>();
long totalSizeSoFar = 0;
HoodieWriteConfig writeConfig = getWriteConfig();

for (FileSlice currentSlice : fileSlices) {
// assume each filegroup size is ~= parquet.max.file.size
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
// check if max size is reached and create new group, if needed.
if (totalSizeSoFar >= writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
@@ -84,25 +84,26 @@ protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String
currentGroup = new ArrayList<>();
totalSizeSoFar = 0;
}

// Add to the current file-group
currentGroup.add(currentSlice);
// totalSizeSoFar could be 0 when new group was created in the previous conditional block.
// reset to the size of current slice, otherwise the number of output file group will become 0 even though current slice is present.
if (totalSizeSoFar == 0) {
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
}
// assume each filegroup size is ~= parquet.max.file.size
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
}

if (!currentGroup.isEmpty()) {
int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
+ writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
}

return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder()
.setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
.setNumOutputFileGroups(fileSliceGroup.getRight())
.setMetrics(buildMetrics(fileSliceGroup.getLeft()))
.build());
return fileSliceGroups.stream().map(fileSliceGroup ->
HoodieClusteringGroup.newBuilder()
.setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
.setNumOutputFileGroups(fileSliceGroup.getRight())
.setMetrics(buildMetrics(fileSliceGroup.getLeft()))
.build());
}

@Override
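
The loop above packs file slices into clustering groups by accumulated size. A simplified, self-contained sketch of that bin-packing shape, using plain byte counts in place of FileSlice objects (illustrative only, not the diff's exact accounting):

```java
import java.util.ArrayList;
import java.util.List;

final class SizeBasedGroupingSketch {

  // Packs file sizes into groups, closing the current group once the running
  // total would reach the per-group byte budget (cf. getClusteringMaxBytesInGroup()).
  static List<List<Long>> group(List<Long> fileSizes, long maxBytesPerGroup) {
    List<List<Long>> groups = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long totalSoFar = 0L;
    for (long size : fileSizes) {
      if (totalSoFar + size >= maxBytesPerGroup && !current.isEmpty()) {
        groups.add(current);
        current = new ArrayList<>();
        totalSoFar = 0L;
      }
      current.add(size);
      totalSoFar += size;
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;
  }
}
```

For each group, the number of output file groups is then derived from the group's total size and the clustering target file size (getClusteringTargetFileMaxBytes()).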
--- changed file ---
@@ -64,7 +64,7 @@ public JavaRDD<WriteStatus> performClusteringWithRecordsRDD(final JavaRDD<Hoodie
props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
return (JavaRDD<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
return (JavaRDD<WriteStatus>) SparkBulkInsertHelper.newInstance()
.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
}
}
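
Context for the override above: the clustering bulk insert runs with auto-commit disabled and the parquet max file size capped at the clustering target file size. A standalone sketch of that pattern; the raw key strings below stand in for HoodieWriteConfig.AUTO_COMMIT_ENABLE.key() and HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key() and are assumptions here:

```java
import java.util.Properties;

final class ClusteringWriteConfigSketch {

  public static void main(String[] args) {
    // Assumed key strings; the diff uses the config constants' key() accessors.
    long clusteringTargetFileMaxBytes = 128 * 1024 * 1024L;

    Properties props = new Properties();
    props.put("hoodie.auto.commit", Boolean.FALSE.toString());
    props.put("hoodie.parquet.max.file.size", String.valueOf(clusteringTargetFileMaxBytes));

    System.out.println(props);
  }
}
```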
--- changed file ---
@@ -18,6 +18,8 @@

package org.apache.hudi.execution.bulkinsert;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -27,17 +29,20 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sort.SpaceCurveSortingHelper;
import org.apache.hudi.table.BulkInsertPartitioner;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.OrderingIndexHelper;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;

/**
* A partitioner that does spatial curve optimization sorting based on specified column values for each RDD partition.
* support z-curve optimization, hilbert will come soon.
@@ -74,23 +79,45 @@ public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> reco
private JavaRDD<GenericRecord> prepareGenericRecord(JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups, final Schema schema) {
SerializableSchema serializableSchema = new SerializableSchema(schema);
JavaRDD<GenericRecord> genericRecordJavaRDD = inputRecords.map(f -> (GenericRecord) f.getData().getInsertValue(serializableSchema.get()).get());
Dataset<Row> originDF = AvroConversionUtils.createDataFrame(genericRecordJavaRDD.rdd(), schema.toString(), sparkEngineContext.getSqlContext().sparkSession());
Dataset<Row> zDataFrame;
Dataset<Row> originDF =
AvroConversionUtils.createDataFrame(
genericRecordJavaRDD.rdd(),
schema.toString(),
sparkEngineContext.getSqlContext().sparkSession()
);

Dataset<Row> sortedDF = reorder(originDF, numOutputGroups);

return HoodieSparkUtils.createRdd(sortedDF, schema.getName(),
schema.getNamespace(), false, org.apache.hudi.common.util.Option.empty()).toJavaRDD();
}

private Dataset<Row> reorder(Dataset<Row> originDF, int numOutputGroups) {
String orderedColumnsListConfig = config.getClusteringSortColumns();

if (isNullOrEmpty(orderedColumnsListConfig) || numOutputGroups <= 0) {
// No-op
return originDF;
}

switch (config.getLayoutOptimizationCurveBuildMethod()) {
List<String> orderedCols =
Arrays.stream(orderedColumnsListConfig.split(","))
.map(String::trim)
.collect(Collectors.toList());

HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy =
HoodieClusteringConfig.LayoutOptimizationStrategy.fromValue(config.getLayoutOptimizationStrategy());

HoodieClusteringConfig.BuildCurveStrategyType curveBuildStrategyType = config.getLayoutOptimizationCurveBuildMethod();

switch (curveBuildStrategyType) {
case DIRECT:
zDataFrame = OrderingIndexHelper
.createOptimizedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy());
break;
return SpaceCurveSortingHelper.orderDataFrameByMappingValues(originDF, layoutOptStrategy, orderedCols, numOutputGroups);
case SAMPLE:
zDataFrame = OrderingIndexHelper
.createOptimizeDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy());
break;
return SpaceCurveSortingHelper.orderDataFrameBySamplingValues(originDF, layoutOptStrategy, orderedCols, numOutputGroups);
default:
throw new HoodieException("Not a valid build curve method for doWriteOperation: ");
throw new UnsupportedOperationException(String.format("Unsupported space-curve curve building strategy (%s)", curveBuildStrategyType));
}
return HoodieSparkUtils.createRdd(zDataFrame, schema.getName(),
schema.getNamespace(), false, org.apache.hudi.common.util.Option.empty()).toJavaRDD();
}

@Override
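
The reworked reorder() splits the configured sort columns and resolves the LayoutOptimizationStrategy before dispatching to SpaceCurveSortingHelper. A small standalone illustration of just that input handling, with made-up config values:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class ReorderInputSketch {

  public static void main(String[] args) {
    // Hypothetical values; the real ones come from the Hudi write config.
    String sortColumnsConfig = " begin_lat, begin_lon ";
    String strategyConfig = "z-order";

    // Same split-and-trim the partitioner applies before handing the columns
    // to the space-curve sorting helper.
    List<String> orderedCols = Arrays.stream(sortColumnsConfig.split(","))
        .map(String::trim)
        .collect(Collectors.toList());

    System.out.println(orderedCols);    // [begin_lat, begin_lon]
    System.out.println(strategyConfig); // resolves to LayoutOptimizationStrategy.ZORDER via fromValue()
  }
}
```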