Skip to content

Commit

Permalink
Add length to FileAppender to avoid a call to S3 when writing. (Netfl…
Browse files Browse the repository at this point in the history
  • Loading branch information
rdblue committed Mar 19, 2019
1 parent 146094a commit 5e6bf4a
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 9 deletions.
11 changes: 9 additions & 2 deletions api/src/main/java/com/netflix/iceberg/Files.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public PositionOutputStream create() {
}

try {
return new PositionFileOutputStream(new RandomAccessFile(file, "rw"));
return new PositionFileOutputStream(file, new RandomAccessFile(file, "rw"));
} catch (FileNotFoundException e) {
throw new RuntimeIOException(e, "Failed to create file: %s", file);
}
Expand Down Expand Up @@ -185,14 +185,20 @@ public void close() throws IOException {
}

private static class PositionFileOutputStream extends PositionOutputStream {
private final File file;
private final RandomAccessFile stream;
private boolean isClosed = false;

private PositionFileOutputStream(RandomAccessFile stream) {
// Keeps a reference to the File in addition to the stream so that the final
// length can still be reported after the RandomAccessFile has been closed.
private PositionFileOutputStream(File file, RandomAccessFile stream) {
this.file = file;
this.stream = stream;
}

@Override
public long getPos() throws IOException {
  // After close() the file pointer is no longer available; the on-disk file
  // length equals the final write position, so fall back to it.
  return isClosed ? file.length() : stream.getFilePointer();
}

Expand All @@ -209,6 +215,7 @@ public void write(byte[] b, int off, int len) throws IOException {
@Override
public void close() throws IOException {
  // Mark the stream closed even when the underlying close fails, so that a
  // later getPos() falls back to File#length() instead of touching a dead
  // RandomAccessFile (which would throw "Stream Closed").
  try {
    stream.close();
  } finally {
    this.isClosed = true;
  }
}

@Override
Expand Down
5 changes: 5 additions & 0 deletions api/src/main/java/com/netflix/iceberg/io/FileAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ default void addAll(Iterable<D> values) {
* @return {@link Metrics} for this file. Only valid after the file is closed.
*/
Metrics metrics();

/**
 * Returns the length of the file this appender has written.
 *
 * <p>Only valid after the file is closed; implementations may throw
 * {@code IllegalStateException} when called while the file is still open.
 *
 * @return the length of this file. Only valid after the file is closed.
 */
long length();
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public void close() throws IOException {
writer.close();
}

@Override
public long length() {
// Delegates to the wrapped appender; only valid after the file is closed.
return writer.length();
}

private static FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
try {
return Avro.write(file)
Expand Down
9 changes: 6 additions & 3 deletions core/src/main/java/com/netflix/iceberg/ManifestWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
class ManifestWriter implements FileAppender<DataFile> {
private static final Logger LOG = LoggerFactory.getLogger(ManifestWriter.class);

private final String location;
private final OutputFile file;
private final int specId;
private final FileAppender<ManifestEntry> writer;
Expand All @@ -50,7 +49,6 @@ class ManifestWriter implements FileAppender<DataFile> {
private int deletedFiles = 0;

ManifestWriter(PartitionSpec spec, OutputFile file, long snapshotId) {
this.location = file.location();
this.file = file;
this.specId = spec.specId();
this.writer = newAppender(FileFormat.AVRO, spec, file);
Expand Down Expand Up @@ -119,9 +117,14 @@ public Metrics metrics() {
return writer.metrics();
}

@Override
public long length() {
// Delegates to the underlying Avro appender, which reads the final stream
// position instead of re-opening the file (avoids a call to S3).
return writer.length();
}

public ManifestFile toManifestFile() {
Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed");
return new GenericManifestFile(location, file.toInputFile().getLength(), specId, snapshotId,
return new GenericManifestFile(file.location(), writer.length(), specId, snapshotId,
addedFiles, existingFiles, deletedFiles, stats.summaries());
}

Expand Down
24 changes: 21 additions & 3 deletions core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

package com.netflix.iceberg.avro;

import com.google.common.base.Preconditions;
import com.netflix.iceberg.Metrics;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.io.FileAppender;
import com.netflix.iceberg.io.OutputFile;
import com.netflix.iceberg.io.PositionOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
Expand All @@ -32,13 +34,15 @@
import java.util.function.Function;

class AvroFileAppender<D> implements FileAppender<D> {
private PositionOutputStream stream = null;
private DataFileWriter<D> writer = null;
private long numRecords = 0L;

AvroFileAppender(Schema schema, OutputFile file,
                 Function<Schema, DatumWriter<?>> createWriterFunc,
                 CodecFactory codec, Map<String, String> metadata) throws IOException {
  // Create the output stream here and keep a reference so that length() can
  // read the final position after the Avro writer has been closed.
  this.stream = file.create();
  this.writer = newAvroWriter(schema, stream, createWriterFunc, codec, metadata);
}

@Override
Expand All @@ -56,6 +60,20 @@ public Metrics metrics() {
return new Metrics(numRecords, null, null, null);
}

@Override
public long length() {
// NOTE(review): assumes close() sets writer to null once the file is
// finished — confirm against close(); length is only defined when closed.
Preconditions.checkState(writer == null,
"Cannot return length while appending to an open file.");
if (stream != null) {
try {
// The stream's final position is the file length, avoiding a
// filesystem (e.g. S3) call to look it up.
return stream.getPos();
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to get stream length");
}
}
throw new RuntimeIOException("Failed to get stream length: no open stream");
}

@Override
public void close() throws IOException {
if (writer != null) {
Expand All @@ -66,7 +84,7 @@ public void close() throws IOException {

@SuppressWarnings("unchecked")
private static <D> DataFileWriter<D> newAvroWriter(
Schema schema, OutputFile file, Function<Schema, DatumWriter<?>> createWriterFunc,
Schema schema, PositionOutputStream stream, Function<Schema, DatumWriter<?>> createWriterFunc,
CodecFactory codec, Map<String, String> metadata) throws IOException {
DataFileWriter<D> writer = new DataFileWriter<>(
(DatumWriter<D>) createWriterFunc.apply(schema));
Expand All @@ -78,6 +96,6 @@ private static <D> DataFileWriter<D> newAvroWriter(
}

// TODO: support overwrite
return writer.create(schema, file.create());
return writer.create(schema, stream);
}
}
14 changes: 13 additions & 1 deletion orc/src/main/java/com/netflix/iceberg/orc/OrcFileAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.netflix.iceberg.orc;

import com.google.common.base.Preconditions;
import com.netflix.iceberg.Metrics;
import com.netflix.iceberg.Schema;
import com.netflix.iceberg.io.FileAppender;
Expand All @@ -39,6 +40,7 @@ public class OrcFileAppender implements FileAppender<VectorizedRowBatch> {
private final TypeDescription orcSchema;
private final ColumnIdMap columnIds = new ColumnIdMap();
private final Path path;
private boolean isClosed = false;

public static final String COLUMN_NUMBERS_ATTRIBUTE = "iceberg.column.ids";

Expand Down Expand Up @@ -97,9 +99,19 @@ public Metrics metrics() {
}
}

@Override
public long length() {
Preconditions.checkState(isClosed,
"Cannot return length while appending to an open file.");
// NOTE(review): getRawDataSize() reports the raw (uncompressed) data size,
// which is generally not the on-disk file length — confirm this matches the
// FileAppender#length() contract used by callers.
return writer.getRawDataSize();
}

@Override
public void close() throws IOException {
  // Idempotent close: only the first call reaches the ORC writer.
  if (isClosed) {
    return;
  }
  this.isClosed = true;
  writer.close();
}

public TypeDescription getSchema() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package com.netflix.iceberg.parquet;

import com.google.common.base.Preconditions;
import com.netflix.iceberg.Metrics;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.io.FileAppender;
Expand Down Expand Up @@ -48,6 +49,13 @@ public Metrics metrics() {
return new Metrics(numRecords, null, null, null);
}

@Override
public long length() {
// NOTE(review): this method can never succeed as written. When writer == null
// the checkState passes but writer.getDataSize() throws NPE; when writer is
// non-null the checkState throws. The length must be captured in close()
// (before the writer is discarded) and returned here instead.
Preconditions.checkState(writer == null,
"Cannot return length while appending to an open file.");
return writer.getDataSize();
}

@Override
public void close() throws IOException {
if (writer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ public Metrics metrics() {
return ParquetMetrics.fromMetadata(writer.getFooter());
}

@Override
public long length() {
  // The underlying writer tracks the current output position, which is the
  // file length; surface I/O failures as unchecked RuntimeIOException per
  // the FileAppender contract.
  try {
    return writer.getPos();
  } catch (IOException ioe) {
    throw new RuntimeIOException(ioe, "Failed to get file length");
  }
}

private void checkSize() {
if (recordCount >= nextCheckRecordCount) {
long bufferedSize = writeStore.getBufferedSize();
Expand Down

0 comments on commit 5e6bf4a

Please sign in to comment.