Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 4
"modification": 5
}
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* IcebergIO: support declaring a table's sort order on dynamic table creation via the new `sort_fields` config ([#38269](https://github.com/apache/beam/issues/38269)).

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,11 @@
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
Expand Down Expand Up @@ -145,19 +147,22 @@ public class AddFiles extends PTransform<PCollection<String>, PCollectionRowTupl
private final int manifestFileSize;
private final @Nullable String locationPrefix;
private final @Nullable List<String> partitionFields;
private final @Nullable List<String> sortFields;
private final @Nullable Map<String, String> tableProps;

public AddFiles(
IcebergCatalogConfig catalogConfig,
String tableIdentifier,
@Nullable String locationPrefix,
@Nullable List<String> partitionFields,
@Nullable List<String> sortFields,
@Nullable Map<String, String> tableProps,
@Nullable Integer manifestFileSize,
@Nullable Duration intervalTrigger) {
this.catalogConfig = catalogConfig;
this.tableIdentifier = tableIdentifier;
this.partitionFields = partitionFields;
this.sortFields = sortFields;
this.tableProps = tableProps;
this.intervalTrigger = intervalTrigger;
this.manifestFileSize =
Expand Down Expand Up @@ -193,6 +198,7 @@ public PCollectionRowTuple expand(PCollection<String> input) {
tableIdentifier,
locationPrefix,
partitionFields,
sortFields,
tableProps))
.withOutputTags(DATA_FILES, TupleTagList.of(ERRORS)));
SchemaCoder<SerializableDataFile> sdfCoder;
Expand Down Expand Up @@ -267,6 +273,7 @@ static class ConvertToDataFile extends DoFn<String, SerializableDataFile> {
public static final TupleTag<SerializableDataFile> DATA_FILES = new TupleTag<>();
private final @Nullable String prefix;
private final @Nullable List<String> partitionFields;
private final @Nullable List<String> sortFields;
private final @Nullable Map<String, String> tableProps;
private transient @MonotonicNonNull ExecutorService executor;
private transient @MonotonicNonNull LinkedList<Future<ProcessResult>> activeTasks;
Expand All @@ -281,11 +288,13 @@ public ConvertToDataFile(
String identifier,
@Nullable String prefix,
@Nullable List<String> partitionFields,
@Nullable List<String> sortFields,
@Nullable Map<String, String> tableProps) {
this.catalogConfig = catalogConfig;
this.identifier = identifier;
this.prefix = prefix;
this.partitionFields = partitionFields;
this.sortFields = sortFields;
this.tableProps = tableProps;
}

Expand Down Expand Up @@ -522,12 +531,18 @@ private Table getOrCreateTable(String filePath, FileFormat format) throws IOExce
try {
org.apache.iceberg.Schema schema = getSchema(filePath, format);
PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema);
SortOrder sortOrder = SortOrderUtils.toSortOrder(sortFields, schema);

return tableProps == null
? catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, spec)
: catalogConfig
Catalog.TableBuilder builder =
catalogConfig
.catalog()
.createTable(TableIdentifier.parse(identifier), schema, spec, tableProps);
.buildTable(tableId, schema)
.withPartitionSpec(spec)
.withSortOrder(sortOrder);
Comment thread
dejii marked this conversation as resolved.
if (tableProps != null) {
builder.withProperties(tableProps);
}
return builder.create();
} catch (AlreadyExistsException e2) { // if table already exists, just load it
return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ public static Builder builder() {
+ " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.")
public abstract @Nullable Map<String, String> getTableProperties();

@SchemaFieldDescription(
"Fields used to set the table's sort order, applied when the table is created. "
+ "Each entry has the form `<term> [asc|desc] [nulls first|nulls last]`, where `<term>` "
+ "is a field name or one of the partition transforms (e.g. `bucket(col, 4)`, `day(ts)`). "
+ "Direction defaults to ascending; null order defaults to nulls-first for ascending and "
+ "nulls-last for descending.\n"
+ "For more information on sort orders, please visit https://iceberg.apache.org/spec/#sort-orders.")
public abstract @Nullable List<String> getSortFields();

@SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
public abstract @Nullable ErrorHandling getErrorHandling();

Expand All @@ -127,6 +136,8 @@ public abstract static class Builder {

public abstract Builder setTableProperties(Map<String, String> props);

public abstract Builder setSortFields(List<String> sortFields);

public abstract Builder setErrorHandling(ErrorHandling errorHandling);

public abstract Configuration build();
Expand Down Expand Up @@ -172,6 +183,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
configuration.getTable(),
configuration.getLocationPrefix(),
configuration.getPartitionFields(),
configuration.getSortFields(),
configuration.getTableProperties(),
configuration.getManifestFileSize(),
frequency != null ? Duration.standardSeconds(frequency) : null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SortOrder;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;

Expand All @@ -41,6 +43,16 @@ public PartitionSpec getPartitionSpec() {
@Pure
public abstract @Nullable List<String> getPartitionFields();

/** Sort order to apply when the table is dynamically created. */
@Pure
@SchemaIgnore
public SortOrder getSortOrder() {
Comment thread
dejii marked this conversation as resolved.
return SortOrderUtils.toSortOrder(getSortFields(), getSchema());
}

@Pure
public abstract @Nullable List<String> getSortFields();

@Pure
public abstract @Nullable Map<String, String> getTableProperties();

Expand All @@ -55,6 +67,8 @@ public abstract static class Builder {

public abstract Builder setPartitionFields(@Nullable List<String> partitionFields);

public abstract Builder setSortFields(@Nullable List<String> sortFields);

public abstract Builder setTableProperties(@Nullable Map<String, String> tableProperties);

@Pure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ public static Builder builder() {
+ " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.")
public abstract @Nullable Map<String, String> getTableProperties();

@SchemaFieldDescription(
"Fields used to set the table's sort order, applied when the table is created. "
+ "Each entry has the form `<term> [asc|desc] [nulls first|nulls last]`, where `<term>` "
+ "is a field name or one of the partition transforms (e.g. `bucket(col, 4)`, `day(ts)`). "
+ "Direction defaults to ascending; null order defaults to nulls-first for ascending and "
+ "nulls-last for descending. Note: this sets the table's declared sort order as metadata; "
+ "it does not cause Beam to physically sort records before writing.\n"
+ "For more information on sort orders, please visit https://iceberg.apache.org/spec/#sort-orders.")
public abstract @Nullable List<String> getSortFields();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTable(String table);
Expand All @@ -158,6 +168,8 @@ public abstract static class Builder {

public abstract Builder setTableProperties(Map<String, String> tableProperties);

public abstract Builder setSortFields(List<String> sortFields);

public abstract Configuration build();
}

Expand Down Expand Up @@ -223,6 +235,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
FileFormat.PARQUET.toString(),
rows.getSchema(),
configuration.getPartitionFields(),
configuration.getSortFields(),
configuration.getTableProperties(),
configuration.getDrop(),
configuration.getKeep(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,22 @@ class PortableIcebergDestinations implements DynamicDestinations {
private final String fileFormat;

private final @Nullable List<String> partitionFields;
private final @Nullable List<String> sortFields;
private final @Nullable Map<String, String> tableProperties;

public PortableIcebergDestinations(
String destinationTemplate,
String fileFormat,
Schema inputSchema,
@Nullable List<String> partitionFields,
@Nullable List<String> sortFields,
@Nullable Map<String, String> tableProperties,
@Nullable List<String> fieldsToDrop,
@Nullable List<String> fieldsToKeep,
@Nullable String onlyField) {
this.interpolator = new RowStringInterpolator(destinationTemplate, inputSchema);
this.partitionFields = partitionFields;
this.sortFields = sortFields;
this.tableProperties = tableProperties;
RowFilter rf = new RowFilter(inputSchema);

Expand Down Expand Up @@ -86,6 +89,7 @@ public IcebergDestination instantiateDestination(String dest) {
IcebergTableCreateConfig.builder()
.setSchema(getDataSchema())
.setPartitionFields(partitionFields)
.setSortFields(sortFields)
.setTableProperties(tableProperties)
.build())
.setFileFormat(FileFormat.fromString(fileFormat))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
Expand Down Expand Up @@ -329,6 +330,7 @@ Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) {
@Nullable IcebergTableCreateConfig createConfig = destination.getTableCreateConfig();
PartitionSpec partitionSpec =
createConfig != null ? createConfig.getPartitionSpec() : PartitionSpec.unpartitioned();
SortOrder sortOrder = createConfig != null ? createConfig.getSortOrder() : SortOrder.unsorted();
Map<String, String> tableProperties =
createConfig != null && createConfig.getTableProperties() != null
? createConfig.getTableProperties()
Expand Down Expand Up @@ -357,13 +359,20 @@ Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) {
} catch (NoSuchTableException e) { // Otherwise, create the table
org.apache.iceberg.Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
try {
table = catalog.createTable(identifier, tableSchema, partitionSpec, tableProperties);
table =
catalog
.buildTable(identifier, tableSchema)
.withPartitionSpec(partitionSpec)
.withSortOrder(sortOrder)
.withProperties(tableProperties)
.create();
LOG.info(
"Created Iceberg table '{}' with schema: {}\n"
+ ", partition spec: {}, table properties: {}",
+ ", partition spec: {}, sort order: {}, table properties: {}",
identifier,
tableSchema,
partitionSpec,
sortOrder,
tableProperties);
} catch (AlreadyExistsException ignored) {
// race condition: another worker already created this table
Expand Down
Loading
Loading