Merged
dataflow/snippets/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -37,7 +37,7 @@
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <apache_beam.version>2.63.0</apache_beam.version>
+ <apache_beam.version>2.62.0</apache_beam.version>
<slf4j.version>2.0.12</slf4j.version>
<parquet.version>1.14.0</parquet.version>
<iceberg.version>1.4.2</iceberg.version>
.../src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java (new file)
@@ -0,0 +1,100 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.dataflow;

// [START dataflow_apache_iceberg_dynamic_destinations]
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;

public class ApacheIcebergDynamicDestinations {

// The schema for the table rows.
public static final Schema SCHEMA = new Schema.Builder()
.addInt64Field("id")
.addStringField("name")
.addStringField("airport")
.build();

// The data to write, formatted as JSON strings.
static final List<String> TABLE_ROWS = List.of(
"{\"id\":0, \"name\":\"Alice\", \"airport\": \"ORD\" }",
"{\"id\":1, \"name\":\"Bob\", \"airport\": \"SYD\" }",
"{\"id\":2, \"name\":\"Charles\", \"airport\": \"ORD\" }"
);

public interface Options extends PipelineOptions {
@Description("The URI of the Apache Iceberg warehouse location")
String getWarehouseLocation();

void setWarehouseLocation(String value);

@Description("The name of the Apache Iceberg catalog")
String getCatalogName();

void setCatalogName(String value);
}

// Write JSON data to Apache Iceberg, using dynamic destinations to determine the Iceberg table
// where Dataflow writes each record. The JSON data contains a field named "airport". The
// Dataflow pipeline writes to Iceberg tables with the naming pattern "flights-{airport}".
public static void main(String[] args) {
// Parse the pipeline options passed into the application. Example:
[Contributor] nit: insert a line break before the comment. Otherwise this reads like a "wall of text".
// --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG
// For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);

// Configure the Iceberg sink I/O.
Map<String, Object> catalogConfig = ImmutableMap.<String, Object>builder()
.put("warehouse", options.getWarehouseLocation())
.put("type", "hadoop")
.build();

ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put("catalog_name", options.getCatalogName())
.put("catalog_properties", catalogConfig)
// Route the incoming records based on the value of the "airport" field.
.put("table", "flights-{airport}")
// Specify which fields to keep from the input data.
.put("keep", Arrays.asList("name", "id"))
.build();

// Build the pipeline.
pipeline
// Read in-memory JSON data.
.apply(Create.of(TABLE_ROWS))
// Convert the JSON records to Row objects.
.apply(JsonToRow.withSchema(SCHEMA))
// Write each Row to Apache Iceberg.
.apply(Managed.write(Managed.ICEBERG).withConfig(config));

// Run the pipeline.
pipeline.run().waitUntilFinish();
}
}
// [END dataflow_apache_iceberg_dynamic_destinations]
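Note: the "table" configuration value above acts as a per-record template. The managed Iceberg sink substitutes each record's field values into the "{...}" placeholders to choose a destination table, so the rows in TABLE_ROWS land in flights-ORD and flights-SYD. The snippet below is a minimal standalone sketch of that substitution idea, illustrative only: the class and method names are hypothetical, and Beam's actual routing implementation differs.

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TableTemplateSketch {
  // Matches placeholders such as {airport} in a table-name template.
  private static final Pattern FIELD = Pattern.compile("\\{(\\w+)\\}");

  // Expand a template like "flights-{airport}" using a record's field values.
  static String resolveTable(String template, Map<String, Object> record) {
    Matcher m = FIELD.matcher(template);
    StringBuilder out = new StringBuilder();
    while (m.find()) {
      Object value = record.get(m.group(1));
      if (value == null) {
        throw new IllegalArgumentException("Record is missing field: " + m.group(1));
      }
      m.appendReplacement(out, Matcher.quoteReplacement(value.toString()));
    }
    m.appendTail(out);
    return out.toString();
  }

  public static void main(String[] args) {
    Map<String, Object> record = Map.of("id", 0L, "name", "Alice", "airport", "ORD");
    System.out.println(resolveTable("flights-{airport}", record)); // prints flights-ORD
  }
}

With the sample rows, this resolves to "flights-ORD" for Alice and Charles and "flights-SYD" for Bob, which is what the integration test below asserts.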
.../src/test/java/com/example/dataflow/ApacheIcebergIT.java
@@ -16,15 +16,15 @@

package com.example.dataflow;

- import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.common.collect.ImmutableMap;
- import java.io.ByteArrayOutputStream;
import java.io.IOException;
- import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.UUID;
import org.apache.beam.sdk.PipelineResult;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CatalogProperties;
@@ -52,25 +52,19 @@
import org.junit.Test;

public class ApacheIcebergIT {
- private ByteArrayOutputStream bout;
- private final PrintStream originalOut = System.out;
-
- private static final String CATALOG_NAME = "local";
- private static final String TABLE_NAME = "table1";
- private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(TABLE_NAME);
-
- // The output file that the Dataflow pipeline writes.
- private static final String OUTPUT_FILE_NAME_PREFIX = UUID.randomUUID().toString();
- private static final String OUTPUT_FILE_NAME = OUTPUT_FILE_NAME_PREFIX + "-00000-of-00001.txt";

private Configuration hadoopConf = new Configuration();
private java.nio.file.Path warehouseDirectory;
private String warehouseLocation;
private Catalog catalog;
- private Table table;
+ private static final String CATALOG_NAME = "local";

+ String outputFileNamePrefix = UUID.randomUUID().toString();
+ String outputFileName = outputFileNamePrefix + "-00000-of-00001.txt";

- private void createIcebergTable(Catalog catalog, TableIdentifier tableId) {
+ private Table createIcebergTable(String name) {

+ TableIdentifier tableId = TableIdentifier.of(name);

// This schema represents an Iceberg table schema. It needs to match the
// org.apache.beam.sdk.schemas.Schema that is defined in ApacheIcebergWrite. However, these
@@ -79,10 +73,10 @@ private void createIcebergTable(Catalog catalog, TableIdentifier tableId) {
NestedField.required(1, "id", Types.LongType.get()),
NestedField.optional(2, "name", Types.StringType.get()));

- table = catalog.createTable(tableId, schema);
+ return catalog.createTable(tableId, schema);
}

- private void writeTableRecord()
+ private void writeTableRecord(Table table)
throws IOException {
GenericRecord record = GenericRecord.create(table.schema());
record.setField("id", 0L);
@@ -109,72 +103,94 @@ private void writeTableRecord()
.commit();
}

+ private boolean tableContainsRecord(Table table, String data) {
+ CloseableIterable<Record> records = IcebergGenerics.read(table).build();
+ for (Record r : records) {
+ if (r.toString().contains(data)) {
+ return true;
+ }
+ }
+ return false;
+ }

@Before
public void setUp() throws IOException {
[Contributor] comment: consider keeping the pipe for stdout to bout. I think that the sample should attempt to process the result by printing messages to the console.

[Contributor Author] The sample writes records to an Iceberg catalog, so the IT tests whether the records were added successfully. Previously I was doing this in a roundabout way (printing the records to stdout first) but this version seems more direct.

- bout = new ByteArrayOutputStream();
- System.setOut(new PrintStream(bout));

// Create an Apache Iceberg catalog with a table.
warehouseDirectory = Files.createTempDirectory("test-warehouse");
warehouseLocation = "file:" + warehouseDirectory.toString();
- System.out.println(warehouseLocation);
catalog =
CatalogUtil.loadCatalog(
CatalogUtil.ICEBERG_CATALOG_HADOOP,
CATALOG_NAME,
ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation),
hadoopConf);
- createIcebergTable(catalog, TABLE_IDENTIFIER);

[Contributor] nit: remove extraneous line if unneeded.

[Contributor Author] Fixed
}

@After
public void tearDown() throws IOException {
- Files.deleteIfExists(Paths.get(OUTPUT_FILE_NAME));
- System.setOut(originalOut);
+ Files.deleteIfExists(Paths.get(outputFileName));
}

@Test
public void testApacheIcebergWrite() {
String tableName = "write_table";
final Table table = createIcebergTable("write_table");

// Run the Dataflow pipeline.
ApacheIcebergWrite.main(
new String[] {
"--runner=DirectRunner",
"--warehouseLocation=" + warehouseLocation,
"--catalogName=" + CATALOG_NAME,
"--tableName=" + TABLE_NAME
"--tableName=" + tableName
});

// Verify that the pipeline wrote records to the table.
- Table table = catalog.loadTable(TABLE_IDENTIFIER);
- CloseableIterable<Record> records = IcebergGenerics.read(table)
- .build();
- for (Record r : records) {
- System.out.println(r);
- }
+ assertTrue(tableContainsRecord(table, "0, Alice"));
+ assertTrue(tableContainsRecord(table, "1, Bob"));
+ assertTrue(tableContainsRecord(table, "2, Charles"));
}

@Test
public void testApacheIcebergDynamicDestinations() {
+ final Table tableORD = createIcebergTable("flights-ORD");
+ final Table tableSYD = createIcebergTable("flights-SYD");

// Run the Dataflow pipeline.
ApacheIcebergDynamicDestinations.main(
new String[] {
"--runner=DirectRunner",
"--warehouseLocation=" + warehouseLocation,
"--catalogName=" + CATALOG_NAME
});

- String got = bout.toString();
- assertTrue(got.contains("0, Alice"));
- assertTrue(got.contains("1, Bob"));
- assertTrue(got.contains("2, Charles"));
+ // Verify that the pipeline wrote records to the correct tables.
+ assertTrue(tableContainsRecord(tableORD, "0, Alice"));
+ assertTrue(tableContainsRecord(tableORD, "2, Charles"));
+ assertTrue(tableContainsRecord(tableSYD, "1, Bob"));
}

@Test
public void testApacheIcebergRead() throws IOException {
String tableName = "read_table";
final Table table = createIcebergTable(tableName);

// Seed the Apache Iceberg table with data.
- writeTableRecord();
+ writeTableRecord(table);

// Run the Dataflow pipeline.
ApacheIcebergRead.main(
new String[] {
"--runner=DirectRunner",
"--warehouseLocation=" + warehouseLocation,
"--catalogName=" + CATALOG_NAME,
"--tableName=" + TABLE_NAME,
"--outputPath=" + OUTPUT_FILE_NAME_PREFIX
"--tableName=" + tableName,
"--outputPath=" + outputFileNamePrefix
});

- // Verify the pipeline wrote the table data to a local file.
- String output = Files.readString(Paths.get(OUTPUT_FILE_NAME));
+ // Verify the pipeline wrote the table data to a text file.
+ String output = Files.readString(Paths.get(outputFileName));
assertTrue(output.contains("0:Person-0"));
}
}
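Note: as the review thread above explains, the tests verify writes by reading the Iceberg tables directly instead of capturing stdout. Below is a self-contained sketch of that verification pattern using the same Iceberg APIs the test imports; the warehouse path and table name here are placeholders, not values from this PR.

import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;

public class VerifyIcebergTableSketch {
  public static void main(String[] args) throws Exception {
    // Load a Hadoop catalog rooted at a local warehouse directory (placeholder path).
    Catalog catalog = CatalogUtil.loadCatalog(
        CatalogUtil.ICEBERG_CATALOG_HADOOP,
        "local",
        ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, "file:/tmp/test-warehouse"),
        new Configuration());

    // Scan every record in the table; tableContainsRecord in the test above
    // does the same scan and matches on each record's string form.
    Table table = catalog.loadTable(TableIdentifier.of("flights-ORD"));
    try (CloseableIterable<Record> records = IcebergGenerics.read(table).build()) {
      for (Record r : records) {
        System.out.println(r);
      }
    }
  }
}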