13 changes: 9 additions & 4 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -43,12 +43,14 @@
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;

import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION_DEFAULT;

/**
* Metadata for a table.
*/
public class TableMetadata implements Serializable {
static final long INITIAL_SEQUENCE_NUMBER = 0;
static final int DEFAULT_TABLE_FORMAT_VERSION = 1;
static final int SUPPORTED_TABLE_FORMAT_VERSION = 2;
static final int INITIAL_SPEC_ID = 0;
static final int INITIAL_SORT_ORDER_ID = 1;
@@ -64,22 +66,25 @@ public static TableMetadata newTableMetadata(TableOperations ops,
PartitionSpec spec,
String location,
Map<String, String> properties) {
return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, DEFAULT_TABLE_FORMAT_VERSION);
int formatVersion = PropertyUtil.propertyAsInt(properties, FORMAT_VERSION, FORMAT_VERSION_DEFAULT);
return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, formatVersion);
}

public static TableMetadata newTableMetadata(Schema schema,
PartitionSpec spec,
SortOrder sortOrder,
String location,
Map<String, String> properties) {
return newTableMetadata(schema, spec, sortOrder, location, properties, DEFAULT_TABLE_FORMAT_VERSION);
int formatVersion = PropertyUtil.propertyAsInt(properties, FORMAT_VERSION, FORMAT_VERSION_DEFAULT);
return newTableMetadata(schema, spec, sortOrder, location, properties, formatVersion);
}

public static TableMetadata newTableMetadata(Schema schema,
PartitionSpec spec,
String location,
Map<String, String> properties) {
return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, DEFAULT_TABLE_FORMAT_VERSION);
int formatVersion = PropertyUtil.propertyAsInt(properties, FORMAT_VERSION, FORMAT_VERSION_DEFAULT);
return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, formatVersion);
}

static TableMetadata newTableMetadata(Schema schema,
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -137,4 +137,7 @@ private TableProperties() {

public static final String GC_ENABLED = "gc.enabled";
public static final boolean GC_ENABLED_DEFAULT = true;

public static final String FORMAT_VERSION = "format.version";
public static final int FORMAT_VERSION_DEFAULT = 1;
}
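
For reference, a minimal sketch of how the new property flows through newTableMetadata. The schema, spec, location, and class name here are illustrative assumptions, not part of this PR; only TableProperties.FORMAT_VERSION and the newTableMetadata overload come from the change itself.

```java
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

public class FormatVersionExample {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()));
    PartitionSpec spec = PartitionSpec.unpartitioned();

    // "format.version" is read by newTableMetadata via PropertyUtil.propertyAsInt;
    // without it, the table falls back to FORMAT_VERSION_DEFAULT (v1).
    Map<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");

    TableMetadata metadata = TableMetadata.newTableMetadata(
        schema, spec, "s3://bucket/warehouse/db/table", properties);
  }
}
```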
@@ -61,6 +61,10 @@ public void delete(T row) {
appender.add(row);
}

public long length() {
return appender.length();
}

@Override
public void close() throws IOException {
if (deleteFile == null) {
234 changes: 201 additions & 33 deletions core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -23,17 +23,26 @@
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.iceberg.util.Tasks;

public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
private final List<DataFile> completedFiles = Lists.newArrayList();
private final List<DeleteFile> completedDeletes = Lists.newArrayList();
private final PartitionSpec spec;
private final FileFormat format;
private final FileAppenderFactory<T> appenderFactory;
@@ -51,39 +60,149 @@ protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFact
this.targetFileSize = targetFileSize;
}

protected PartitionSpec spec() {
return spec;
}

protected FileAppenderFactory<T> appenderFactory() {
return appenderFactory;
}

@Override
public void abort() throws IOException {
close();

// clean up files created by this writer
Tasks.foreach(completedFiles)
Tasks.foreach(Iterables.concat(completedFiles, completedDeletes))
.throwFailureWhenFinished()
.noRetry()
.run(file -> io.deleteFile(file.path().toString()));
}

@Override
public DataFile[] complete() throws IOException {
public WriterResult complete() throws IOException {
close();

return completedFiles.toArray(new DataFile[0]);
return WriterResult.builder()
.addDataFiles(completedFiles)
.addDeleteFiles(completedDeletes)
.build();
}

protected abstract class BaseDeltaWriter implements Closeable {
private final RollingFileWriter dataWriter;

private final boolean enableEqDelete;
private RollingEqDeleteWriter eqDeleteWriter = null;
private SortedPosDeleteWriter<T> posDeleteWriter = null;
private StructLikeMap<FilePos> insertedRowMap = null;

public BaseDeltaWriter(PartitionKey partition, List<Integer> equalityFieldIds, Schema schema) {
Contributor comment: The list equalityFieldIds is only used in this constructor, and only to create a projection of the schema that is passed in. It would be better to pass the delete schema (or null) in, so that each writer doesn't need to create a new projection of the row schema. (A sketch of this alternative appears after the constructor below.)

this.dataWriter = new RollingFileWriter(partition);

this.enableEqDelete = equalityFieldIds != null && !equalityFieldIds.isEmpty();
if (enableEqDelete) {
Contributor comment: Why use a delta writer if eq deletes are disabled? I typically like to use classes that don't need to check configuration in a tight loop, and this setting introduces at least one check per row. I'd prefer using either a normal task writer or a delta writer, depending on whether deletes are expected in the stream.

Member Author reply: Because I only want to expose BaseDeltaWriter to compute engines, I planned to make BaseRollingWriter, RollingFileWriter, and RollingEqDeleteWriter private. To implement a compute-engine-specific TaskWriter, the only things we need to do are implement the asKey and asCopiedKey methods and customize the policy for dispatching records to the delta writer. (A sketch of such an engine-specific writer appears after the end of this file's diff.)

this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);

Schema deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
}
}
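
A sketch of the reviewer's suggested alternative, for illustration only and not part of this change: the caller would project the delete schema once and pass it in (or pass null), instead of the equality field ids.

```java
// Hypothetical variant of the constructor above, following the reviewer's suggestion.
// A null deleteSchema means equality deletes are disabled.
public BaseDeltaWriter(PartitionKey partition, Schema deleteSchema) {
  this.dataWriter = new RollingFileWriter(partition);

  this.enableEqDelete = deleteSchema != null;
  if (enableEqDelete) {
    this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
    this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
    this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
  }
}
```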

protected abstract StructLike asKey(T row);

protected abstract StructLike asCopiedKey(T row);

public void write(T row) throws IOException {
if (enableEqDelete) {
FilePos filePos = FilePos.create(dataWriter.currentPath(), dataWriter.currentRows());

StructLike copiedKey = asCopiedKey(row);
// Adding a pos-delete to replace the old filePos.
FilePos previous = insertedRowMap.put(copiedKey, filePos);
if (previous != null) {
posDeleteWriter.delete(previous.path, previous.rowOffset, null /* TODO set non-nullable row*/);
Contributor comment: How would this set the row? Would we need to keep track of it somehow?

Member Author reply: The straightforward way is to add a row field to FilePos that references the inserted old row, but that would hold references to all of the inserted rows within a checkpoint. If rows are large while the equality fields are small, the ideal approach is to keep only the equality fields and file position in insertedRowMap; however, if we then want to attach the row when writing the pos-delete file, memory consumption becomes an issue. We may eventually need an embedded KV library that can spill to disk. (A sketch of the straightforward variant appears after the FilePos class below.)

}
}

dataWriter.write(row);
}

public void delete(T row) throws IOException {
Preconditions.checkState(enableEqDelete, "Could not accept equality deletion.");

StructLike key = asKey(row);
FilePos previous = insertedRowMap.remove(key);

if (previous != null) {
posDeleteWriter.delete(previous.path, previous.rowOffset, null /* TODO set non-nullable row */);
}

eqDeleteWriter.write(row);
}

@Override
public void close() throws IOException {
// Moving the completed data files into task writer's completedFiles automatically.
dataWriter.close();
Contributor comment: Minor: dataWriter should be set to null so that it can be garbage collected and so that any further calls to write will fail.

if (enableEqDelete) {
// Moving the completed eq-delete files into task writer's completedDeletes automatically.
eqDeleteWriter.close();
insertedRowMap.clear();

// Moving the completed pos-delete files into completedDeletes.
completedDeletes.addAll(posDeleteWriter.complete());
}
}
}

private static class FilePos {
private final CharSequence path;
private final long rowOffset;

private FilePos(CharSequence path, long rowOffset) {
this.path = path;
this.rowOffset = rowOffset;
}

private static FilePos create(CharSequence path, long rowOffset) {
return new FilePos(path, rowOffset);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("path", path)
.add("row_offset", rowOffset)
.toString();
}
}
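
For illustration, the "straightforward way" mentioned in the discussion above might look like the following hypothetical variant, which is not part of this PR; it keeps each inserted row reachable until the map entry is replaced or cleared.

```java
// Hypothetical: FilePos extended with a reference to the inserted row, so that
// posDeleteWriter.delete(path, rowOffset, row) could attach it. The trade-off is that
// every inserted row stays referenced from insertedRowMap until the next flush/checkpoint.
private static class FilePosWithRow<R> {
  private final CharSequence path;
  private final long rowOffset;
  private final R row;

  private FilePosWithRow(CharSequence path, long rowOffset, R row) {
    this.path = path;
    this.rowOffset = rowOffset;
    this.row = row;
  }
}
```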

protected class RollingFileWriter implements Closeable {
private abstract class BaseRollingWriter<W extends Closeable> implements Closeable {
private static final int ROWS_DIVISOR = 1000;
private final PartitionKey partitionKey;

private EncryptedOutputFile currentFile = null;
private FileAppender<T> currentAppender = null;
private W currentWriter = null;
private long currentRows = 0;

public RollingFileWriter(PartitionKey partitionKey) {
private BaseRollingWriter(PartitionKey partitionKey) {
this.partitionKey = partitionKey;
openCurrent();
}

public void add(T record) throws IOException {
this.currentAppender.add(record);
abstract W newWriter(EncryptedOutputFile file, PartitionKey key);

abstract long length(W writer);

abstract void write(W writer, T record);

abstract void complete(W closedWriter);

public void write(T record) throws IOException {
write(currentWriter, record);
this.currentRows++;

if (shouldRollToNewFile()) {
@@ -92,48 +211,45 @@ public void add(T record) throws IOException {
}
}

public CharSequence currentPath() {
Preconditions.checkNotNull(currentFile, "The currentFile shouldn't be null");
return currentFile.encryptingOutputFile().location();
}

public long currentRows() {
return currentRows;
}

private void openCurrent() {
if (partitionKey == null) {
// unpartitioned
currentFile = fileFactory.newOutputFile();
this.currentFile = fileFactory.newOutputFile();
} else {
// partitioned
currentFile = fileFactory.newOutputFile(partitionKey);
this.currentFile = fileFactory.newOutputFile(partitionKey);
}
currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format);
currentRows = 0;
this.currentWriter = newWriter(currentFile, partitionKey);
this.currentRows = 0;
}

private boolean shouldRollToNewFile() {
// TODO: ORC files do not yet support checking the target file size before being closed
return !format.equals(FileFormat.ORC) &&
currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize;
currentRows % ROWS_DIVISOR == 0 && length(currentWriter) >= targetFileSize;
}

private void closeCurrent() throws IOException {
if (currentAppender != null) {
currentAppender.close();
// metrics are only valid after the appender is closed
Metrics metrics = currentAppender.metrics();
long fileSizeInBytes = currentAppender.length();
List<Long> splitOffsets = currentAppender.splitOffsets();
this.currentAppender = null;

if (metrics.recordCount() == 0L) {
if (currentWriter != null) {
currentWriter.close();

if (currentRows == 0L) {
io.deleteFile(currentFile.encryptingOutputFile());
} else {
DataFile dataFile = DataFiles.builder(spec)
.withEncryptionKeyMetadata(currentFile.keyMetadata())
.withPath(currentFile.encryptingOutputFile().location())
.withFileSizeInBytes(fileSizeInBytes)
.withPartition(spec.fields().size() == 0 ? null : partitionKey) // set null if unpartitioned
.withMetrics(metrics)
.withSplitOffsets(splitOffsets)
.build();
completedFiles.add(dataFile);
complete(currentWriter);
}

this.currentFile = null;
this.currentWriter = null;
this.currentRows = 0;
}
}
@@ -143,4 +259,56 @@ public void close() throws IOException {
closeCurrent();
}
}

protected class RollingFileWriter extends BaseRollingWriter<DataWriter<T>> {
public RollingFileWriter(PartitionKey partitionKey) {
super(partitionKey);
}

@Override
DataWriter<T> newWriter(EncryptedOutputFile file, PartitionKey key) {
return appenderFactory.newDataWriter(file, format, key);
}

@Override
long length(DataWriter<T> writer) {
return writer.length();
}

@Override
void write(DataWriter<T> writer, T record) {
writer.add(record);
}

@Override
void complete(DataWriter<T> closedWriter) {
completedFiles.add(closedWriter.toDataFile());
}
}

private class RollingEqDeleteWriter extends BaseRollingWriter<EqualityDeleteWriter<T>> {
private RollingEqDeleteWriter(PartitionKey partitionKey) {
super(partitionKey);
}

@Override
EqualityDeleteWriter<T> newWriter(EncryptedOutputFile file, PartitionKey key) {
return appenderFactory.newEqDeleteWriter(file, format, key);
}

@Override
long length(EqualityDeleteWriter<T> writer) {
return writer.length();
}

@Override
void write(EqualityDeleteWriter<T> writer, T record) {
writer.delete(record);
}

@Override
void complete(EqualityDeleteWriter<T> closedWriter) {
completedDeletes.add(closedWriter.toDeleteFile());
}
}
}
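
To illustrate the author's point about engine-specific writers, here is a hypothetical TaskWriter built on BaseDeltaWriter. All names and choices here (ExampleDeltaTaskWriter, the use of the generic Record class, the exact super-constructor arguments, the key projection) are assumptions for the sketch, not part of this PR; only asKey, asCopiedKey, and the BaseDeltaWriter constructor come from the change itself.

```java
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;

public class ExampleDeltaTaskWriter extends BaseTaskWriter<Record> {
  private final Schema deleteSchema;
  private final RecordDeltaWriter deltaWriter;

  ExampleDeltaTaskWriter(PartitionSpec spec, FileFormat format,
                         FileAppenderFactory<Record> appenderFactory, OutputFileFactory fileFactory,
                         FileIO io, long targetFileSize,
                         Schema schema, List<Integer> equalityFieldIds) {
    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
    this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
    // one unpartitioned delta writer; a partitioned table would keep one per partition key
    this.deltaWriter = new RecordDeltaWriter(null, equalityFieldIds, schema);
  }

  @Override
  public void write(Record row) throws IOException {
    // a real engine would inspect the record's change type here and route it to
    // deltaWriter.write(...) for inserts/updates or deltaWriter.delete(...) for deletes
    deltaWriter.write(row);
  }

  @Override
  public void close() throws IOException {
    deltaWriter.close();
  }

  private class RecordDeltaWriter extends BaseDeltaWriter {
    RecordDeltaWriter(PartitionKey partition, List<Integer> equalityFieldIds, Schema schema) {
      super(partition, equalityFieldIds, schema);
    }

    @Override
    protected StructLike asKey(Record row) {
      return projectKey(row);
    }

    @Override
    protected StructLike asCopiedKey(Record row) {
      // projectKey materializes a new record, so the key kept in insertedRowMap
      // does not share mutable state with the incoming row
      return projectKey(row);
    }

    private StructLike projectKey(Record row) {
      // copy only the equality fields into a key record of the delete schema
      GenericRecord key = GenericRecord.create(deleteSchema);
      for (int i = 0; i < deleteSchema.columns().size(); i++) {
        key.set(i, row.getField(deleteSchema.columns().get(i).name()));
      }
      return key;
    }
  }
}
```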