Fix some tests and optimize merging logic
codope committed Jun 28, 2023
1 parent de45923 commit 8662958
Showing 3 changed files with 65 additions and 104 deletions.
AbstractRealtimeRecordReader.java
@@ -53,6 +53,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;

/**
* Record Reader implementation to merge fresh avro data with base parquet data, to support real time queries.
*/
@@ -67,10 +69,10 @@ public abstract class AbstractRealtimeRecordReader {
private Schema readerSchema;
private Schema writerSchema;
private Schema hiveSchema;
private HoodieTableMetaClient metaClient;
private final HoodieTableMetaClient metaClient;
protected SchemaEvolutionContext schemaEvolutionContext;
// support merge operation
protected boolean supportPayload = true;
protected boolean supportPayload;
// handle hive type to avro record
protected HiveAvroSerializer serializer;
private boolean supportTimestamp;
@@ -149,11 +151,11 @@ private void init() throws Exception {
partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
: new ArrayList<>();
writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields);
List<String> projectionFields = HoodieRealtimeRecordReaderUtils.orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), partitioningFields);
List<String> projectionFields = HoodieRealtimeRecordReaderUtils.orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, EMPTY_STRING),
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, EMPTY_STRING), partitioningFields);

Map<String, Field> schemaFieldsMap = HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writerSchema);
hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap, jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS));
hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap, jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS, EMPTY_STRING));
// TODO(vc): In the future, the reader schema should be updated based on log files & be able
// to null out fields not present before

Expand All @@ -166,7 +168,7 @@ private void init() throws Exception {
}

public Schema constructHiveOrderedSchema(Schema writerSchema, Map<String, Field> schemaFieldsMap, String hiveColumnString) {
String[] hiveColumns = hiveColumnString.split(",");
String[] hiveColumns = hiveColumnString.isEmpty() ? new String[0] : hiveColumnString.split(",");
LOG.info("Hive Columns : " + hiveColumnString);
List<Field> hiveSchemaFields = new ArrayList<>();

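A note on the defensive defaults in this file: passing EMPTY_STRING to jobConf.get(...) avoids a NullPointerException when a property is unset, but an empty column string still needs the explicit isEmpty() guard in constructHiveOrderedSchema, because String.split on an empty input yields one empty element rather than an empty array. A standalone sketch (not part of this patch) demonstrating that behaviour:

    // Standalone sketch, not Hudi code: why an empty hiveColumnString must be
    // special-cased before splitting it into column names.
    public class SplitGuardSketch {
      public static void main(String[] args) {
        String hiveColumnString = "";  // what jobConf.get(key, EMPTY_STRING) returns for an unset key
        System.out.println("".split(",").length);  // prints 1 -> a single empty "column name"
        String[] hiveColumns =
            hiveColumnString.isEmpty() ? new String[0] : hiveColumnString.split(",");
        System.out.println(hiveColumns.length);    // prints 0 -> no spurious columns
      }
    }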
HoodieMergeOnReadSnapshotReader.java
@@ -24,6 +24,7 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
@@ -35,6 +36,8 @@
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
@@ -43,8 +46,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class HoodieMergeOnReadSnapshotReader extends AbstractRealtimeRecordReader implements Iterator<HoodieRecord> {
public class HoodieMergeOnReadSnapshotReader extends AbstractRealtimeRecordReader implements Iterator<HoodieRecord>, AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeOnReadSnapshotReader.class);

private final String tableBasePath;
private final String baseFilePath;
@@ -70,20 +76,24 @@ public HoodieMergeOnReadSnapshotReader(String tableBasePath, String baseFilePath
this.latestInstantTime = latestInstantTime;
this.readerSchema = readerSchema;
this.jobConf = jobConf;
HoodieTimer timer = new HoodieTimer().startTimer();
this.logRecordScanner = getMergedLogRecordScanner();
this.baseFileReader = HoodieRealtimeRecordReaderUtils.getBaseFileReader(new Path(baseFilePath), jobConf);
LOG.debug("Time taken to scan log records: {}", timer.endTimer());
this.baseFileReader = HoodieRealtimeRecordReaderUtils.getBaseFileReader(new Path(this.baseFilePath), jobConf);
this.logRecordMap = logRecordScanner.getRecords();
this.logRecordKeys = new HashSet<>(this.logRecordMap.keySet());
List<HoodieRecord> mergedRecords = new ArrayList<>();
ClosableIterator<HoodieRecord> baseFileIterator = baseFileReader.getRecordIterator(readerSchema);
ClosableIterator<String> baseFileIterator = baseFileReader.getRecordKeyIterator();
timer.startTimer();
while (baseFileIterator.hasNext()) {
HoodieRecord record = baseFileIterator.next();
if (logRecordKeys.contains(record.getRecordKey())) {
logRecordKeys.remove(record.getRecordKey());
Option<HoodieAvroIndexedRecord> mergedRecord = buildGenericRecordWithCustomPayload(logRecordMap.get(record.getRecordKey()));
String key = baseFileIterator.next();
if (logRecordKeys.contains(key)) {
logRecordKeys.remove(key);
Option<HoodieAvroIndexedRecord> mergedRecord = buildGenericRecordWithCustomPayload(logRecordMap.get(key));
mergedRecord.ifPresent(mergedRecords::add);
}
}
LOG.debug("Time taken to merge base file and log file records: {}", timer.endTimer());
this.recordsIterator = mergedRecords.iterator();
}

@@ -107,21 +117,18 @@ private static HoodieRealtimeFileSplit getRealtimeSplit(String tableBasePath, St
tableBasePath,
logFilePaths,
latestInstantTime,
false,
false, // TODO: Fix this to support incremental queries
Option.empty());
return HoodieInputFormatUtils.createRealtimeFileSplit(realtimePath, start, length, hosts);
}

private HoodieMergedLogRecordScanner getMergedLogRecordScanner() {
// NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
// but can return records for completed commits > the commit we are trying to read (if using
// readCommit() API)
return HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(FSUtils.getFs(split.getPath().toString(), jobConf))
.withBasePath(split.getBasePath())
.withLogFilePaths(split.getDeltaLogPaths())
.withReaderSchema(getLogScannerReaderSchema())
.withLatestInstantTime(split.getMaxCommitTime())
.withBasePath(tableBasePath)
.withLogFilePaths(logFilePaths.stream().map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()))
.withReaderSchema(readerSchema)
.withLatestInstantTime(latestInstantTime)
.withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf))
.withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
.withReverseReader(false)
@@ -139,7 +146,17 @@ private Option<HoodieAvroIndexedRecord> buildGenericRecordWithCustomPayload(Hood
if (usesCustomPayload) {
return record.toIndexedRecord(getWriterSchema(), payloadProps);
} else {
return record.toIndexedRecord(getReaderSchema(), payloadProps);
return record.toIndexedRecord(readerSchema, payloadProps);
}
}

@Override
public void close() throws Exception {
if (baseFileReader != null) {
baseFileReader.close();
}
if (logRecordScanner != null) {
logRecordScanner.close();
}
}
}
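Since the class now implements AutoCloseable, and close() releases both the base file reader and the merged log record scanner, callers can wrap the reader in try-with-resources. A hedged usage sketch follows; only the first two constructor parameters are visible in this diff, so the remaining arguments (log files, latest instant time, reader schema, job conf) are assumptions inferred from the fields used in the constructor body and may not match the real signature exactly:

    // Hypothetical usage sketch, not from this commit. Constructor arguments after
    // baseFilePath are assumptions inferred from fields referenced in the constructor.
    import org.apache.avro.Schema;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hudi.common.model.HoodieLogFile;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.hadoop.realtime.HoodieMergeOnReadSnapshotReader;

    import java.util.List;

    public class SnapshotReaderUsageSketch {
      static void readMergedRecords(String tableBasePath, String baseFilePath,
                                    List<HoodieLogFile> logFiles, String latestInstantTime,
                                    Schema readerSchema, JobConf jobConf) throws Exception {
        try (HoodieMergeOnReadSnapshotReader reader = new HoodieMergeOnReadSnapshotReader(
            tableBasePath, baseFilePath, logFiles, latestInstantTime, readerSchema, jobConf)) {
          while (reader.hasNext()) {
            HoodieRecord record = reader.next(); // merged record for a key present in both base file and log
            // process record ...
          }
        } // try-with-resources calls close(), releasing the base file reader and log scanner
      }
    }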
TestHoodieMergeOnReadSnapshotReader.java
@@ -19,11 +19,12 @@
package org.apache.hudi.hadoop.realtime;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -34,7 +35,6 @@
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
@@ -44,35 +44,28 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.apache.hudi.common.fs.FSUtils.getFs;
import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestHoodieMergeOnReadSnapshotReader extends TestHoodieRealtimeRecordReader {

private static final String PARTITION_COLUMN = "datestr";
private static final String FILE_ID = "fileid0";
private JobConf baseJobConf;
private FileSystem fs;
private Configuration hadoopConf;
@@ -84,45 +77,18 @@ public void setUp() {
hadoopConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
baseJobConf = new JobConf(hadoopConf);
baseJobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024));
fs = FSUtils.getFs(basePath.toUri().toString(), baseJobConf);
fs = getFs(basePath.toUri().toString(), baseJobConf);
}

@TempDir
public java.nio.file.Path basePath;

@Test
public void testSnapshotReader() throws Exception {
testReaderInternal(ExternalSpillableMap.DiskMapType.BITCASK, false, false,
HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK);
testReaderInternal(false, HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK);
}

private void setHiveColumnNameProps(List<Schema.Field> fields, JobConf jobConf, boolean isPartitioned) {
String names = fields.stream().map(Schema.Field::name).collect(Collectors.joining(","));
String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);

String hiveOrderedColumnNames = fields.stream()
.map(Schema.Field::name)
.filter(name -> !name.equalsIgnoreCase(PARTITION_COLUMN))
.collect(Collectors.joining(","));
if (isPartitioned) {
hiveOrderedColumnNames += "," + PARTITION_COLUMN;
jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, PARTITION_COLUMN);
}
jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveOrderedColumnNames);
}

private static File getLogTempFile(long startTime, long endTime, String diskType) {
return Arrays.stream(Objects.requireNonNull(new File("/tmp").listFiles()))
.filter(f -> f.isDirectory() && f.getName().startsWith("hudi-" + diskType) && f.lastModified() > startTime && f.lastModified() < endTime)
.findFirst()
.orElse(new File(""));
}

private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean partitioned, HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception {
private void testReaderInternal(boolean partitioned, HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception {
// initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
@@ -141,9 +107,13 @@ private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType,
List<Pair<String, Integer>> logVersionsWithAction = new ArrayList<>();
logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 1));
logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 2));
FileSlice fileSlice =
new FileSlice(partitioned ? FSUtils.getRelativePartitionPath(new Path(basePath.toString()),
new Path(partitionDir.getAbsolutePath())) : "default", baseInstant, "fileid0");
String baseFilePath = partitionDir + "/" + FILE_ID + "_1-0-1_" + baseInstant + ".parquet";
String partitionPath = partitioned ? getRelativePartitionPath(new Path(basePath.toString()), new Path(partitionDir.getAbsolutePath())) : "default";
FileSlice fileSlice = new FileSlice(
new HoodieFileGroupId(partitionPath, FILE_ID),
baseInstant,
new HoodieBaseFile(fs.getFileStatus(new Path(baseFilePath))),
new ArrayList<>());
logVersionsWithAction.forEach(logVersionWithAction -> {
try {
// update files or generate new log file
@@ -157,11 +127,11 @@

HoodieLogFormat.Writer writer;
if (action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
writer = InputFormatTestUtil.writeRollback(partitionDir, fs, "fileid0", baseInstant, instantTime,
writer = InputFormatTestUtil.writeRollback(partitionDir, fs, FILE_ID, baseInstant, instantTime,
String.valueOf(baseInstantTs + logVersion - 1), logVersion);
} else {
writer =
InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", baseInstant,
InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, FILE_ID, baseInstant,
instantTime, 120, 0, logVersion, logBlockType);
}
long size = writer.getCurrentSize();
@@ -172,7 +142,7 @@ private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType,
// create a split with baseFile (parquet file written earlier) and new log file(s)
fileSlice.addLogFile(writer.getLogFile());
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1, baseJobConf),
new FileSplit(new Path(baseFilePath), 0, 1, baseJobConf),
basePath.toUri().toString(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.collect(Collectors.toList()),
instantTime,
@@ -190,40 +160,12 @@
1,
new String[0]);

// create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), baseJobConf, null);
JobConf jobConf = new JobConf(baseJobConf);
List<Schema.Field> fields = schema.getFields();
setHiveColumnNameProps(fields, jobConf, partitioned);

jobConf.setEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType);
jobConf.setBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), isCompressionEnabled);

// validate record reader compaction
long logTmpFileStartTime = System.currentTimeMillis();
HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);

// use reader to read base Parquet File and log file, merge in flight and return latest commit
// here all 100 records should be updated, see above
// another 20 new insert records should also output with new commit time.
NullWritable key = recordReader.createKey();
ArrayWritable value = recordReader.createValue();
int recordCnt = 0;
while (recordReader.next(key, value)) {
Writable[] values = value.get();
// check if the record written is with latest commit, here "101"
assertEquals(latestInstant, values[0].toString());
key = recordReader.createKey();
value = recordReader.createValue();
recordCnt++;
List<HoodieRecord> records = new ArrayList<>();
while (snapshotReader.hasNext()) {
records.add(snapshotReader.next());
}
recordReader.getPos();
assertEquals(1.0, recordReader.getProgress(), 0.05);
assertEquals(120, recordCnt);
recordReader.close();
// the temp file produced by logScanner should be deleted
assertFalse(getLogTempFile(logTmpFileStartTime, System.currentTimeMillis(), diskMapType.toString()).exists());
assertEquals(100, records.size());
snapshotReader.close();
} catch (Exception ioe) {
throw new HoodieException(ioe.getMessage(), ioe);
}
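For context on the new assertEquals(100, records.size()): per the comments in the removed reader-validation code above, the log contains 120 records (100 updates of existing keys plus 20 new inserts), and the snapshot reader built in this commit only emits merged records for keys present in both the base file and the log. A standalone sketch of that counting logic (key values are made up for illustration):

    // Standalone sketch, not test code: the merged-record count is the size of the
    // intersection between base-file record keys and log-record keys.
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class MergeCountSketch {
      public static void main(String[] args) {
        List<String> baseFileKeys = IntStream.range(0, 100)          // keys in the base parquet file
            .mapToObj(i -> "key" + i).collect(Collectors.toList());
        Set<String> logRecordKeys = IntStream.range(0, 120)          // 100 updates + 20 inserts
            .mapToObj(i -> "key" + i).collect(Collectors.toSet());
        long merged = baseFileKeys.stream().filter(logRecordKeys::contains).count();
        System.out.println(merged);                                  // prints 100
      }
    }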
