Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-1434] fix incorrect log file path in HoodieWriteStat #2300

Merged
merged 3 commits into from
Dec 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void init() throws IOException, InterruptedException, URISyntaxException
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
dataBlock = new HoodieAvroDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
writer.appendBlock(dataBlock);
} finally {
if (writer != null) {
writer.close();
Expand Down Expand Up @@ -183,7 +183,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
writer = writer.appendBlock(dataBlock);
writer.appendBlock(dataBlock);
} finally {
if (writer != null) {
writer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public CopyOnWriteInsertHandler(HoodieWriteConfig config, String instantTime,
public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
final HoodieRecord insertPayload = payload.record;
String partitionPath = insertPayload.getPartitionPath();
HoodieWriteHandle handle = handles.get(partitionPath);
HoodieWriteHandle<?,?,?,?> handle = handles.get(partitionPath);
if (handle == null) {
// If the records are sorted, this means that we encounter a new partition path
// and the records for the previous partition path are all written,
Expand All @@ -87,7 +87,7 @@ public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {

if (!handle.canWrite(payload.record)) {
// Handle is full. Close the handle and add the WriteStatus
statuses.add(handle.close());
statuses.addAll(handle.close());
// Open new handle
handle = writeHandleFactory.create(config, instantTime, hoodieTable,
insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
Expand All @@ -108,8 +108,8 @@ public List<WriteStatus> getResult() {
}

private void closeOpenHandles() {
for (HoodieWriteHandle handle : handles.values()) {
statuses.add(handle.close());
for (HoodieWriteHandle<?,?,?,?> handle : handles.values()) {
statuses.addAll(handle.close());
}
handles.clear();
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
Expand Down Expand Up @@ -162,11 +164,6 @@ public void write() {
}
}

@Override
public WriteStatus getWriteStatus() {
return writeStatus;
}

@Override
public IOType getIOType() {
return IOType.CREATE;
Expand All @@ -176,9 +173,8 @@ public IOType getIOType() {
* Performs actions to durably persist the current changes and returns the resulting WriteStatus objects.
*/
@Override
public WriteStatus close() {
LOG
.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten);
public List<WriteStatus> close() {
LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten);
try {

fileWriter.close();
Expand All @@ -203,7 +199,7 @@ public WriteStatus close() {
LOG.info(String.format("CreateHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(),
stat.getFileId(), runtimeStats.getTotalCreateTime()));

return writeStatus;
return Collections.singletonList(writeStatus);
} catch (IOException e) {
throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -258,7 +260,7 @@ public void write(GenericRecord oldRecord) {
}

@Override
public WriteStatus close() {
public List<WriteStatus> close() {
try {
// write out any pending records (this can happen when inserts are turned into updates)
Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
Expand Down Expand Up @@ -301,7 +303,7 @@ public WriteStatus close() {
LOG.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(),
stat.getFileId(), runtimeStats.getTotalUpsertTime()));

return writeStatus;
return Collections.singletonList(writeStatus);
} catch (IOException e) {
throw new HoodieUpsertException("Failed to close UpdateHandle", e);
}
Expand Down Expand Up @@ -333,11 +335,6 @@ public Path getOldFilePath() {
return oldFilePath;
}

@Override
public WriteStatus getWriteStatus() {
return writeStatus;
}

@Override
public IOType getIOType() {
return IOType.MERGE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
Expand Down Expand Up @@ -101,7 +102,7 @@ public void write(GenericRecord oldRecord) {
}

@Override
public WriteStatus close() {
public List<WriteStatus> close() {
// write out any pending records (this can happen when inserts are turned into updates)
newRecordKeysSorted.stream().forEach(key -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
* Base class for all write operations logically performed at the file group level.
Expand All @@ -56,7 +58,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
protected final Schema writerSchema;
protected final Schema writerSchemaWithMetafields;
protected HoodieTimer timer;
protected final WriteStatus writeStatus;
protected WriteStatus writeStatus;
protected final String partitionPath;
protected final String fileId;
protected final String writeToken;
Expand Down Expand Up @@ -167,9 +169,15 @@ protected GenericRecord rewriteRecord(GenericRecord record) {
return HoodieAvroUtils.rewriteRecord(record, writerSchemaWithMetafields);
}

public abstract WriteStatus close();
public abstract List<WriteStatus> close();

public abstract WriteStatus getWriteStatus();
/**
 * Returns the write status tracked by this handle as a list.
 *
 * <p>This base implementation wraps the single {@code writeStatus} field in an immutable
 * one-element list via {@link Collections#singletonList(Object)}.
 *
 * @return an immutable list containing exactly this handle's {@code writeStatus}
 */
public List<WriteStatus> writeStatuses() {
return Collections.singletonList(writeStatus);
}

/**
 * Returns the value of this handle's {@code partitionPath} field.
 *
 * @return the partition path held by this handle
 */
public String getPartitionPath() {
return partitionPath;
}

public abstract IOType getIOType();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private Writer openWriter() {
} else {
return this.writer;
}
} catch (InterruptedException | IOException e) {
} catch (IOException e) {
throw new HoodieException("Unable to initialize HoodieLogFormat writer", e);
}
}
Expand Down Expand Up @@ -335,7 +335,7 @@ private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) thro
Map<HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header);
this.writer = writer.appendBlock(block);
writer.appendBlock(block);
records.clear();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ protected HoodieRollbackStat undoAppend(String appendBaseFilePath, HoodieInstant
// generate metadata
Map<HoodieLogBlock.HeaderMetadataType, String> header = RollbackUtils.generateHeader(instantToRollback.getTimestamp(), instantTime);
// if update belongs to an existing log file
writer = writer.appendBlock(new HoodieCommandBlock(header));
writer.appendBlock(new HoodieCommandBlock(header));
} finally {
try {
if (writer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fil
return handleUpdateInternal(upsertHandle, fileId);
}

protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId)
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId)
throws IOException {
if (upsertHandle.getOldFilePath() == null) {
throw new HoodieUpsertException(
Expand All @@ -277,11 +277,12 @@ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle ups
}

// TODO(vc): This needs to be revisited
if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
if (upsertHandle.getPartitionPath() == null) {
LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
+ upsertHandle.getWriteStatus());
+ upsertHandle.writeStatuses());
}
return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();

return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
}

protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ Map<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineContext c
if (doDelete) {
Map<HoodieLogBlock.HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
// if update belongs to an existing log file
writer = writer.appendBlock(new HoodieCommandBlock(header));
writer.appendBlock(new HoodieCommandBlock(header));
}
} catch (IOException | InterruptedException io) {
throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fil
return handleUpdateInternal(upsertHandle, fileId);
}

protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId)
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,?,?,?> upsertHandle, String fileId)
throws IOException {
if (upsertHandle.getOldFilePath() == null) {
throw new HoodieUpsertException(
Expand All @@ -274,12 +274,11 @@ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle ups
JavaMergeHelper.newInstance().runMerge(table, upsertHandle);
}

// TODO(vc): This needs to be revisited
if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
+ upsertHandle.getWriteStatus());
List<WriteStatus> statuses = upsertHandle.writeStatuses();
if (upsertHandle.getPartitionPath() == null) {
LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " + statuses);
}
return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();
return Collections.singletonList(statuses).iterator();
}

protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String parti
return handleUpdateInternal(upsertHandle, instantTime, fileId);
}

protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String instantTime,
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,?,?,?> upsertHandle, String instantTime,
String fileId) throws IOException {
if (upsertHandle.getOldFilePath() == null) {
throw new HoodieUpsertException(
Expand All @@ -187,11 +187,12 @@ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle ups
}

// TODO(vc): This needs to be revisited
if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
if (upsertHandle.getPartitionPath() == null) {
LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
+ upsertHandle.getWriteStatus());
+ upsertHandle.writeStatuses());
}
return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();

return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
}

protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
Expand All @@ -207,10 +208,10 @@ protected HoodieMergeHandle getUpdateHandle(String instantTime, String partition

public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
HoodieCreateHandle createHandle =
HoodieCreateHandle<?,?,?,?> createHandle =
new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier);
createHandle.write();
return Collections.singletonList(Collections.singletonList(createHandle.close())).iterator();
return Collections.singletonList(createHandle.close()).iterator();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ private BootstrapWriteStatus handleMetadataBootstrap(String srcPartitionPath, St
HoodieFileStatus srcFileStatus, KeyGeneratorInterface keyGenerator) {

Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath());
HoodieBootstrapHandle bootstrapHandle = new HoodieBootstrapHandle(config, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
HoodieBootstrapHandle<?,?,?,?> bootstrapHandle = new HoodieBootstrapHandle(config, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
table, partitionPath, FSUtils.createNewFileIdPfx(), table.getTaskContextSupplier());
Schema avroSchema = null;
try {
Expand Down Expand Up @@ -329,7 +329,8 @@ private BootstrapWriteStatus handleMetadataBootstrap(String srcPartitionPath, St
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
BootstrapWriteStatus writeStatus = (BootstrapWriteStatus)bootstrapHandle.getWriteStatus();

BootstrapWriteStatus writeStatus = (BootstrapWriteStatus) bootstrapHandle.writeStatuses().get(0);
BootstrapFileMapping bootstrapFileMapping = new BootstrapFileMapping(
config.getBootstrapSourceBasePath(), srcPartitionPath, partitionPath,
srcFileStatus, writeStatus.getFileId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fil
return handleUpdateInternal(upsertHandle, fileId);
}

protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId)
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,?,?,?> upsertHandle, String fileId)
throws IOException {
if (upsertHandle.getOldFilePath() == null) {
throw new HoodieUpsertException(
Expand All @@ -289,11 +289,12 @@ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle ups
}

// TODO(vc): This needs to be revisited
if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
if (upsertHandle.getPartitionPath() == null) {
LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
+ upsertHandle.getWriteStatus());
+ upsertHandle.writeStatuses());
}
return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();

return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
}

protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,10 @@ public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fil
LOG.info("Small file corrections for updates for commit " + instantTime + " for file " + fileId);
return super.handleUpdate(partitionPath, fileId, recordItr);
} else {
HoodieAppendHandle appendHandle = new HoodieAppendHandle<>(config, instantTime, table,
HoodieAppendHandle<?,?,?,?> appendHandle = new HoodieAppendHandle<>(config, instantTime, table,
partitionPath, fileId, recordItr, taskContextSupplier);
appendHandle.doAppend();
appendHandle.close();
return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
return Collections.singletonList(appendHandle.close()).iterator();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ JavaPairRDD<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineC
if (doDelete) {
Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
// if update belongs to an existing log file
writer = writer.appendBlock(new HoodieCommandBlock(header));
writer.appendBlock(new HoodieCommandBlock(header));
}
} catch (IOException | InterruptedException io) {
throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ private WriteStatus prepareFirstRecordCommit(List<String> recordsStrs) throws IO
}
Map<String, HoodieRecord> insertRecordMap = insertRecords.stream()
.collect(Collectors.toMap(r -> r.getRecordKey(), Function.identity()));
HoodieCreateHandle createHandle =
HoodieCreateHandle<?,?,?,?> createHandle =
new HoodieCreateHandle(config, "100", table, insertRecords.get(0).getPartitionPath(), "f1-0", insertRecordMap, supplier);
createHandle.write();
return createHandle.close();
return createHandle.close().get(0);
}).collect();

final Path commitFile = new Path(config.getBasePath() + "/.hoodie/" + HoodieTimeline.makeCommitFileName("100"));
Expand Down
Loading