Skip to content

Commit

Permalink
[HUDI-5072] Extract ExecutionStrategy#transform duplicate code (#7030)
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed Oct 27, 2022
1 parent 5b9fcc4 commit ae541ba
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 47 deletions.
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.clustering.run.strategy;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;

import java.io.IOException;

public class ExecutionStrategyUtil {

/**
* Transform IndexedRecord into HoodieRecord.
*
* @param indexedRecord indexedRecord.
* @param writeConfig writeConfig.
* @return hoodieRecord.
* @param <T>
*/
public static <T> HoodieRecord<T> transform(IndexedRecord indexedRecord,
HoodieWriteConfig writeConfig) {

GenericRecord record = (GenericRecord) indexedRecord;
Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();

if (!writeConfig.populateMetaFields()) {
try {
TypedProperties typedProperties = new TypedProperties(writeConfig.getProps());
keyGeneratorOpt = Option.of((BaseKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties));
} catch (IOException e) {
throw new HoodieIOException(
"Only BaseKeyGenerators are supported when meta columns are disabled ", e);
}
}

String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, keyGeneratorOpt);
String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, keyGeneratorOpt);
HoodieKey hoodieKey = new HoodieKey(key, partition);

HoodieRecordPayload avroPayload = new RewriteAvroPayload(record);
HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, avroPayload);
return hoodieRecord;
}
}
Expand Up @@ -30,12 +30,10 @@
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.CollectionUtils;
Expand All @@ -47,7 +45,6 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
Expand All @@ -57,16 +54,12 @@
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
Expand Down Expand Up @@ -424,21 +417,6 @@ private JavaRDD<WriteStatus>[] convertStreamToArray(Stream<JavaRDD<WriteStatus>>
* Transform IndexedRecord into HoodieRecord.
*/
private static <T> HoodieRecord<T> transform(IndexedRecord indexedRecord, HoodieWriteConfig writeConfig) {
GenericRecord record = (GenericRecord) indexedRecord;
Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
if (!writeConfig.populateMetaFields()) {
try {
keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
} catch (IOException e) {
throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);
}
}
String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, keyGeneratorOpt);
String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, keyGeneratorOpt);
HoodieKey hoodieKey = new HoodieKey(key, partition);

HoodieRecordPayload avroPayload = new RewriteAvroPayload(record);
HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, avroPayload);
return hoodieRecord;
return ExecutionStrategyUtil.transform(indexedRecord, writeConfig);
}
}
Expand Up @@ -24,33 +24,25 @@
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -170,21 +162,6 @@ private Iterator<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOp
* Transform IndexedRecord into HoodieRecord.
*/
private HoodieRecord<T> transform(IndexedRecord indexedRecord) {
GenericRecord record = (GenericRecord) indexedRecord;
Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
if (!getWriteConfig().populateMetaFields()) {
try {
keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(getWriteConfig().getProps())));
} catch (IOException e) {
throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);
}
}
String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, keyGeneratorOpt);
String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, keyGeneratorOpt);
HoodieKey hoodieKey = new HoodieKey(key, partition);

HoodieRecordPayload avroPayload = new RewriteAvroPayload(record);
HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, avroPayload);
return hoodieRecord;
return ExecutionStrategyUtil.transform(indexedRecord, getWriteConfig());
}
}

0 comments on commit ae541ba

Please sign in to comment.