3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -129,6 +129,9 @@ private TableProperties() {
public static final String WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes";
public static final long WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = Long.MAX_VALUE;

public static final String WRITE_PARTITIONED_FANOUT_ENABLED = "write.partitioned.fanout.enabled";
public static final boolean WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT = false;

public static final String SNAPSHOT_ID_INHERITANCE_ENABLED = "compatibility.snapshot-id-inheritance.enabled";
public static final boolean SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT = false;

@@ -17,23 +17,19 @@
* under the License.
*/

package org.apache.iceberg.flink.sink;
package org.apache.iceberg.io;

import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
private final Map<PartitionKey, RollingFileWriter> writers = Maps.newHashMap();

PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
}
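
Editorial note on this change: moving the class out of the Flink sink into org.apache.iceberg.io and making it public exposes the fanout pattern to other engines. A fanout task writer keeps one open writer per partition key, so incoming rows do not need to be clustered or sorted by partition (the ordinary partitioned writer closes the current file whenever the partition key changes). Below is a minimal standalone sketch of that pattern; the class name, file layout, and method bodies are illustrative assumptions, not the Iceberg implementation.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Standalone sketch of the fanout pattern (not the Iceberg class): one writer per
// partition key stays open until the task closes, so input order does not matter.
class FanoutWriterSketch<T> {
  private final Map<String, BufferedWriter> writers = new HashMap<>();
  private final Function<T, String> partitioner;
  private final Path outputDir;

  FanoutWriterSketch(Path outputDir, Function<T, String> partitioner) {
    this.outputDir = outputDir;
    this.partitioner = partitioner;
  }

  void write(T row) throws IOException {
    String partition = partitioner.apply(row);
    // Create on first use; the writer then stays open for the lifetime of the task.
    BufferedWriter writer = writers.computeIfAbsent(partition, p -> {
      try {
        return Files.newBufferedWriter(outputDir.resolve(p + ".txt"));
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    });
    writer.write(row.toString());
    writer.newLine();
  }

  void close() throws IOException {
    // Flush and close every per-partition writer. Iceberg's RollingFileWriter also
    // rolls to a new data file once the target file size is reached.
    for (BufferedWriter writer : writers.values()) {
      writer.close();
    }
  }
}
```

The trade-off of this design is memory: every partition seen by the task holds an open file and write buffer, which is why the feature is gated behind a property rather than enabled by default.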
@@ -42,6 +42,7 @@
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitionedFanoutWriter;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.orc.ORC;
2 changes: 2 additions & 0 deletions site/docs/configuration.md
@@ -68,6 +68,7 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.summary.partition-limit | 0 | Includes partition-level summary stats in snapshot summaries if the changed partition count is less than this limit |
| write.metadata.delete-after-commit.enabled | false | Controls whether to delete the oldest version metadata files after commit |
| write.metadata.previous-versions-max | 100 | The max number of previous version metadata files to keep before deleting after commit |
| write.partitioned.fanout.enabled | false | Enables the partitioned-fanout writer for writes to partitioned tables |

### Table behavior properties

@@ -155,4 +156,5 @@ df.write
| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
| check-nullability | true | Sets the nullable check on fields |
| snapshot-property._custom-key_ | null | Adds an entry with custom-key and corresponding value in the snapshot summary |
| partitioned.fanout.enabled | As per table property | Overrides this table's write.partitioned.fanout.enabled |
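
Editorial note, not part of the PR: roughly how these two knobs are intended to be used together, based on the Writer code later in this diff. The Spark session setup, input data, and table location below are placeholder assumptions; only the property and option names come from the PR.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FanoutWriteExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("fanout-example").getOrCreate();

    // Placeholder input; any DataFrame whose schema matches the target table works.
    Dataset<Row> df = spark.read().parquet("/tmp/source");

    // Per-job override: "partitioned.fanout.enabled" takes precedence over the
    // table's "write.partitioned.fanout.enabled" property for this write only.
    df.write()
        .format("iceberg")
        .mode("append")
        .option("partitioned.fanout.enabled", "true")
        .save("/tmp/warehouse/db/table");  // placeholder table location

    spark.stop();
  }
}
```

To enable fanout table-wide instead, the test added in this PR sets the write.partitioned.fanout.enabled table property via Table.updateProperties() before writing.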

@@ -40,6 +40,7 @@
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.broadcast.Broadcast;
@@ -104,10 +105,18 @@ private List<DataFile> rewriteDataForTask(CombinedScanTask task) throws Exception

TaskWriter<InternalRow> writer;
if (spec.fields().isEmpty()) {
writer = new UnpartitionedWriter<>(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE);
writer = new UnpartitionedWriter<>(spec, format, appenderFactory, fileFactory, io.value(),
Long.MAX_VALUE);
} else if (PropertyUtil.propertyAsBoolean(properties,
TableProperties.WRITE_PARTITIONED_FANOUT_ENABLED,
TableProperties.WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT)) {
writer = new SparkPartitionedFanoutWriter(
spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE, schema,
structType);
} else {
writer = new SparkPartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE,
schema, structType);
writer = new SparkPartitionedWriter(
spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE, schema,
structType);
}

try {
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.source;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitionedFanoutWriter;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

public class SparkPartitionedFanoutWriter extends PartitionedFanoutWriter<InternalRow> {
private final PartitionKey partitionKey;
private final InternalRowWrapper internalRowWrapper;

public SparkPartitionedFanoutWriter(PartitionSpec spec, FileFormat format,
FileAppenderFactory<InternalRow> appenderFactory,
OutputFileFactory fileFactory, FileIO io, long targetFileSize,
Schema schema, StructType sparkSchema) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.partitionKey = new PartitionKey(spec, schema);
this.internalRowWrapper = new InternalRowWrapper(sparkSchema);
}

@Override
protected PartitionKey partition(InternalRow row) {
partitionKey.partition(internalRowWrapper.wrap(row));
return partitionKey;
}
}
@@ -49,6 +49,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.iceberg.TableProperties.WRITE_PARTITIONED_FANOUT_ENABLED;
import static org.apache.iceberg.types.Types.NestedField.optional;

@RunWith(Parameterized.class)
@@ -327,51 +328,17 @@ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws

@Test
public void testPartitionedCreateWithTargetFileSizeViaOption() throws IOException {
File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");

HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
Table table = tables.create(SCHEMA, spec, location.toString());

List<SimpleRecord> expected = Lists.newArrayListWithCapacity(8000);
for (int i = 0; i < 2000; i++) {
expected.add(new SimpleRecord(i, "a"));
expected.add(new SimpleRecord(i, "b"));
expected.add(new SimpleRecord(i, "c"));
expected.add(new SimpleRecord(i, "d"));
}

Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);

df.select("id", "data").sort("data").write()
.format("iceberg")
.option("write-format", format.toString())
.mode("append")
.option("target-file-size-bytes", 4) // ~4 bytes; low enough to trigger
.save(location.toString());

table.refresh();

Dataset<Row> result = spark.read()
.format("iceberg")
.load(location.toString());
partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType.NONE);
}

List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
Assert.assertEquals("Result rows should match", expected, actual);
@Test
public void testPartitionedFanoutCreateWithTargetFileSizeViaOption() throws IOException {
partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType.TABLE);
}

List<DataFile> files = Lists.newArrayList();
for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
for (DataFile file : ManifestFiles.read(manifest, table.io())) {
files.add(file);
}
}
// TODO: ORC does not yet support target file size
if (!format.equals(FileFormat.ORC)) {
Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
}
@Test
public void testPartitionedFanoutCreateWithTargetFileSizeViaOption2() throws IOException {
partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType.JOB);
}

@Test
@@ -505,4 +472,83 @@ public void testViewsReturnRecentResults() throws IOException {
Assert.assertEquals("Number of rows should match", expected2.size(), actual2.size());
Assert.assertEquals("Result rows should match", expected2, actual2);
}

public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType option)
throws IOException {
File parent = temp.newFolder(format.toString());
File location = new File(parent, "test");

HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
Table table = tables.create(SCHEMA, spec, location.toString());

List<SimpleRecord> expected = Lists.newArrayListWithCapacity(8000);
for (int i = 0; i < 2000; i++) {
expected.add(new SimpleRecord(i, "a"));
expected.add(new SimpleRecord(i, "b"));
expected.add(new SimpleRecord(i, "c"));
expected.add(new SimpleRecord(i, "d"));
}

Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);

switch (option) {
case NONE:
df.select("id", "data").sort("data").write()
.format("iceberg")
.option("write-format", format.toString())
.mode("append")
.option("target-file-size-bytes", 4) // ~4 bytes; low enough to trigger
.save(location.toString());
break;
case TABLE:
table.updateProperties().set(WRITE_PARTITIONED_FANOUT_ENABLED, "true").commit();
df.select("id", "data").write()
.format("iceberg")
.option("write-format", format.toString())
.mode("append")
.option("target-file-size-bytes", 4) // ~4 bytes; low enough to trigger
.save(location.toString());
break;
case JOB:
df.select("id", "data").write()
.format("iceberg")
.option("write-format", format.toString())
.mode("append")
.option("target-file-size-bytes", 4) // ~4 bytes; low enough to trigger
.option("partitioned.fanout.enabled", true)
.save(location.toString());
break;
default:
break;
}

table.refresh();

Dataset<Row> result = spark.read()
.format("iceberg")
.load(location.toString());

List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
Assert.assertEquals("Result rows should match", expected, actual);

List<DataFile> files = Lists.newArrayList();
for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
for (DataFile file : ManifestFiles.read(manifest, table.io())) {
files.add(file);
}
}
// TODO: ORC does not yet support target file size
if (!format.equals(FileFormat.ORC)) {
Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
}
}

public enum IcebergOptionsType {
NONE,
TABLE,
JOB
}
}
39 changes: 35 additions & 4 deletions spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -65,6 +65,8 @@
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_PARTITIONED_FANOUT_ENABLED;
import static org.apache.iceberg.TableProperties.WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;

@@ -83,6 +85,7 @@ class Writer implements DataSourceWriter {
private final Schema writeSchema;
private final StructType dsSchema;
private final Map<String, String> extraSnapshotMetadata;
private final boolean partitionedFanoutEnabled;

Writer(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
DataSourceOptions options, boolean replacePartitions, String applicationId, Schema writeSchema,
@@ -113,6 +116,10 @@ class Writer implements DataSourceWriter {
long tableTargetFileSize = PropertyUtil.propertyAsLong(
table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
this.targetFileSize = options.getLong("target-file-size-bytes", tableTargetFileSize);

boolean tablePartitionedFanoutEnabled = PropertyUtil.propertyAsBoolean(
table.properties(), WRITE_PARTITIONED_FANOUT_ENABLED, WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT);
this.partitionedFanoutEnabled = options.getBoolean("partitioned.fanout.enabled", tablePartitionedFanoutEnabled);

Review comment (Member): Q: will we set this for a given table? In my opinion, it's per job?

Reply (Author): We need to set this option for a given table, because some tables require fanout.
}
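
To restate the resolution order discussed in the review thread above: the table property supplies the default and the write option, when present, overrides it for a single job. A self-contained illustration of that precedence with made-up values (the helper below is hypothetical, not Iceberg's PropertyUtil):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demonstration of how the fanout flag is resolved: write option first,
// then table property, then the built-in default (false).
public class FanoutFlagResolution {
  static boolean resolve(Map<String, String> tableProperties, Map<String, String> writeOptions) {
    boolean tableValue = Boolean.parseBoolean(
        tableProperties.getOrDefault("write.partitioned.fanout.enabled", "false"));
    String jobOverride = writeOptions.get("partitioned.fanout.enabled");
    return jobOverride != null ? Boolean.parseBoolean(jobOverride) : tableValue;
  }

  public static void main(String[] args) {
    Map<String, String> tableProps = new HashMap<>();
    tableProps.put("write.partitioned.fanout.enabled", "true");

    Map<String, String> noOptions = new HashMap<>();
    Map<String, String> jobDisables = new HashMap<>();
    jobDisables.put("partitioned.fanout.enabled", "false");

    System.out.println(resolve(tableProps, noOptions));   // true  (table-wide setting applies)
    System.out.println(resolve(tableProps, jobDisables)); // false (job option overrides the table)
  }
}
```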

private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
@@ -131,7 +138,7 @@ private boolean isWapTable() {
public DataWriterFactory<InternalRow> createWriterFactory() {
return new WriterFactory(
table.spec(), format, table.locationProvider(), table.properties(), io, encryptionManager, targetFileSize,
writeSchema, dsSchema);
writeSchema, dsSchema, partitionedFanoutEnabled);
}

@Override
@@ -246,11 +253,12 @@ static class WriterFactory implements DataWriterFactory<InternalRow> {
private final long targetFileSize;
private final Schema writeSchema;
private final StructType dsSchema;
private final boolean partitionedFanoutEnabled;

WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
Map<String, String> properties, Broadcast<FileIO> io,
Broadcast<EncryptionManager> encryptionManager, long targetFileSize,
Schema writeSchema, StructType dsSchema) {
Schema writeSchema, StructType dsSchema, boolean partitionedFanoutEnabled) {
this.spec = spec;
this.format = format;
this.locations = locations;
@@ -260,6 +268,7 @@ static class WriterFactory implements DataWriterFactory<InternalRow> {
this.targetFileSize = targetFileSize;
this.writeSchema = writeSchema;
this.dsSchema = dsSchema;
this.partitionedFanoutEnabled = partitionedFanoutEnabled;
}

@Override
@@ -270,9 +279,12 @@ public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, lo

if (spec.fields().isEmpty()) {
return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
} else if (partitionedFanoutEnabled) {
return new PartitionedFanout24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize,
writeSchema, dsSchema);
} else {
return new Partitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(),
targetFileSize, writeSchema, dsSchema);
return new Partitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize,
writeSchema, dsSchema);
}
}
}
@@ -307,4 +319,23 @@ public WriterCommitMessage commit() throws IOException {
return new TaskCommit(complete());
}
}

private static class PartitionedFanout24Writer extends SparkPartitionedFanoutWriter
implements DataWriter<InternalRow> {

PartitionedFanout24Writer(PartitionSpec spec, FileFormat format,
SparkAppenderFactory appenderFactory,
OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize,
Schema schema, StructType sparkSchema) {
super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize, schema,
sparkSchema);
}

@Override
public WriterCommitMessage commit() throws IOException {
close();

return new TaskCommit(complete());
}
}
}