From b68cbcbbb80a75b4b9768df0b3658b38479e25d6 Mon Sep 17 00:00:00 2001 From: Veronica Wasson <3992422+VeronicaWasson@users.noreply.github.com> Date: Fri, 1 Nov 2024 21:53:39 +0000 Subject: [PATCH 1/6] docs(sample): Dataflow to Apache Iceberg with dynamic destinations --- dataflow/snippets/pom.xml | 2 +- .../ApacheIcebergDynamicDestinations.java | 99 +++++++++++++++++++ .../com/example/dataflow/ApacheIcebergIT.java | 89 ++++++++++------- 3 files changed, 153 insertions(+), 37 deletions(-) create mode 100644 dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java diff --git a/dataflow/snippets/pom.xml b/dataflow/snippets/pom.xml index 76a6d12a149..f2403b449a1 100755 --- a/dataflow/snippets/pom.xml +++ b/dataflow/snippets/pom.xml @@ -37,7 +37,7 @@ 11 11 UTF-8 - 2.58.0 + 2.60.0 2.0.12 1.14.0 1.4.2 diff --git a/dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java b/dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java new file mode 100644 index 00000000000..1dfda9c0402 --- /dev/null +++ b/dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java @@ -0,0 +1,99 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.dataflow; + +// [START dataflow_apache_iceberg_dynamic_destinations] +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.JsonToRow; + +public class ApacheIcebergDynamicDestinations { + + // The schema for the table rows. + public static final Schema SCHEMA = new Schema.Builder() + .addInt64Field("id") + .addStringField("name") + .addStringField("airport") + .build(); + + // The data to write to table, formatted as JSON strings. + static final List TABLE_ROWS = Arrays.asList( + "{\"id\":0, \"name\":\"Alice\", \"airport\": \"ORD\" }", + "{\"id\":1, \"name\":\"Bob\", \"airport\": \"SYD\" }", + "{\"id\":2, \"name\":\"Charles\", \"airport\": \"ORD\" }" + ); + + // [END dataflow_apache_iceberg_dynamic_destinations] + public interface Options extends PipelineOptions { + @Description("The URI of the Apache Iceberg warehouse location") + String getWarehouseLocation(); + + void setWarehouseLocation(String value); + + @Description("The name of the Apache Iceberg catalog") + String getCatalogName(); + + void setCatalogName(String value); + } + + public static PipelineResult.State main(String[] args) { + // Parse the pipeline options passed into the application. Example: + // --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \ + // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options + var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline pipeline = createPipeline(options); + return pipeline.run().waitUntilFinish(); + } + + // [START dataflow_apache_iceberg_dynamic_destinations] + public static Pipeline createPipeline(Options options) { + Pipeline pipeline = Pipeline.create(options); + + // Configure the Iceberg source I/O + Map catalogConfig = ImmutableMap.builder() + .put("warehouse", options.getWarehouseLocation()) + .put("type", "hadoop") + .build(); + + ImmutableMap config = ImmutableMap.builder() + .put("catalog_name", options.getCatalogName()) + .put("catalog_properties", catalogConfig) + // Route the incoming records based on the value of the "airport" field. + .put("table", "flights-{airport}") + // Specify which fields to keep from the input data. + .put("keep", Arrays.asList("name", "id")) + .build(); + + // Build the pipeline. + pipeline.apply(Create.of(TABLE_ROWS)) + .apply(JsonToRow.withSchema(SCHEMA)) + .apply(Managed.write(Managed.ICEBERG).withConfig(config)); + + return pipeline; + } +} +// [END dataflow_apache_iceberg_dynamic_destinations] diff --git a/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java b/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java index 1c7cfcbe213..0e1a55ef4b4 100644 --- a/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java +++ b/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java @@ -16,15 +16,15 @@ package com.example.dataflow; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import com.google.common.collect.ImmutableMap; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.PrintStream; import java.nio.file.Files; import java.nio.file.Paths; import java.util.UUID; +import org.apache.beam.sdk.PipelineResult; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.iceberg.CatalogProperties; @@ -52,25 +52,19 @@ import org.junit.Test; public class ApacheIcebergIT { - private ByteArrayOutputStream bout; - private final PrintStream originalOut = System.out; - - private static final String CATALOG_NAME = "local"; - private static final String TABLE_NAME = "table1"; - private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(TABLE_NAME); - - // The output file that the Dataflow pipeline writes. - private static final String OUTPUT_FILE_NAME_PREFIX = UUID.randomUUID().toString(); - private static final String OUTPUT_FILE_NAME = OUTPUT_FILE_NAME_PREFIX + "-00000-of-00001.txt"; private Configuration hadoopConf = new Configuration(); private java.nio.file.Path warehouseDirectory; private String warehouseLocation; private Catalog catalog; - private Table table; + private static final String CATALOG_NAME = "local"; + String OUTPUT_FILE_NAME_PREFIX = UUID.randomUUID().toString(); + String OUTPUT_FILE_NAME = OUTPUT_FILE_NAME_PREFIX + "-00000-of-00001.txt"; - private void createIcebergTable(Catalog catalog, TableIdentifier tableId) { + private Table createIcebergTable(String name) { + + TableIdentifier tableId = TableIdentifier.of(name); // This schema represents an Iceberg table schema. It needs to match the // org.apache.beam.sdk.schemas.Schema that is defined in ApacheIcebergWrite. However, these @@ -79,10 +73,10 @@ private void createIcebergTable(Catalog catalog, TableIdentifier tableId) { NestedField.required(1, "id", Types.LongType.get()), NestedField.optional(2, "name", Types.StringType.get())); - table = catalog.createTable(tableId, schema); + return catalog.createTable(tableId, schema); } - private void writeTableRecord() + private void writeTableRecord(Table table) throws IOException { GenericRecord record = GenericRecord.create(table.schema()); record.setField("id", 0L); @@ -109,59 +103,82 @@ private void writeTableRecord() .commit(); } + private boolean tableContainsRecord(Table table, String data) { + CloseableIterable records = IcebergGenerics.read(table).build(); + for (Record r : records) { + if (r.toString().contains(data)) { + return true; + } + } + return false; + } + @Before public void setUp() throws IOException { - bout = new ByteArrayOutputStream(); - System.setOut(new PrintStream(bout)); - // Create an Apache Iceberg catalog with a table. warehouseDirectory = Files.createTempDirectory("test-warehouse"); warehouseLocation = "file:" + warehouseDirectory.toString(); - System.out.println(warehouseLocation); catalog = CatalogUtil.loadCatalog( CatalogUtil.ICEBERG_CATALOG_HADOOP, CATALOG_NAME, ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation), hadoopConf); - createIcebergTable(catalog, TABLE_IDENTIFIER); + } @After public void tearDown() throws IOException { Files.deleteIfExists(Paths.get(OUTPUT_FILE_NAME)); - System.setOut(originalOut); } @Test public void testApacheIcebergWrite() { + String tableName = "write_table"; + Table table = createIcebergTable("write_table"); + // Run the Dataflow pipeline. ApacheIcebergWrite.main( new String[] { "--runner=DirectRunner", "--warehouseLocation=" + warehouseLocation, "--catalogName=" + CATALOG_NAME, - "--tableName=" + TABLE_NAME + "--tableName=" + tableName }); // Verify that the pipeline wrote records to the table. - Table table = catalog.loadTable(TABLE_IDENTIFIER); - CloseableIterable records = IcebergGenerics.read(table) - .build(); - for (Record r : records) { - System.out.println(r); - } + assertTrue(tableContainsRecord(table,"0, Alice")); + assertTrue(tableContainsRecord(table,"1, Bob")); + assertTrue(tableContainsRecord(table,"2, Charles")); + } + + @Test + public void testApacheIcebergDynamicDestinations() { + Table tableORD = createIcebergTable("flights-ORD"); + Table tableSYD = createIcebergTable("flights-SYD"); + + // Run the Dataflow pipeline. + PipelineResult.State state = ApacheIcebergDynamicDestinations.main( + new String[] { + "--runner=DirectRunner", + "--warehouseLocation=" + warehouseLocation, + "--catalogName=" + CATALOG_NAME + }); + assertEquals(PipelineResult.State.DONE, state); - String got = bout.toString(); - assertTrue(got.contains("0, Alice")); - assertTrue(got.contains("1, Bob")); - assertTrue(got.contains("2, Charles")); + // Verify that the pipeline wrote records to the correct tables. + assertTrue(tableContainsRecord(tableORD, "0, Alice")); + assertTrue(tableContainsRecord(tableORD, "2, Charles")); + assertTrue(tableContainsRecord(tableSYD, "1, Bob")); } @Test public void testApacheIcebergRead() throws IOException { + String tableName = "read_table"; + Table table = createIcebergTable(tableName); + // Seed the Apache Iceberg table with data. - writeTableRecord(); + writeTableRecord(table); // Run the Dataflow pipeline. ApacheIcebergRead.main( @@ -169,11 +186,11 @@ public void testApacheIcebergRead() throws IOException { "--runner=DirectRunner", "--warehouseLocation=" + warehouseLocation, "--catalogName=" + CATALOG_NAME, - "--tableName=" + TABLE_NAME, + "--tableName=" + tableName, "--outputPath=" + OUTPUT_FILE_NAME_PREFIX }); - // Verify the pipeline wrote the table data to a local file. + // Verify the pipeline wrote the table data to a text file. String output = Files.readString(Paths.get(OUTPUT_FILE_NAME)); assertTrue(output.contains("0:Person-0")); } From 19fea7664fdeb68b4f40c4430ab0b000613cb39f Mon Sep 17 00:00:00 2001 From: Veronica Wasson <3992422+VeronicaWasson@users.noreply.github.com> Date: Mon, 4 Nov 2024 23:05:33 +0000 Subject: [PATCH 2/6] Fix linter errors --- .../com/example/dataflow/ApacheIcebergIT.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java b/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java index 0e1a55ef4b4..f482ba8cdb9 100644 --- a/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java +++ b/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java @@ -59,8 +59,8 @@ public class ApacheIcebergIT { private Catalog catalog; private static final String CATALOG_NAME = "local"; - String OUTPUT_FILE_NAME_PREFIX = UUID.randomUUID().toString(); - String OUTPUT_FILE_NAME = OUTPUT_FILE_NAME_PREFIX + "-00000-of-00001.txt"; + String outputFileNamePrefix = UUID.randomUUID().toString(); + String outputFileName = outputFileNamePrefix + "-00000-of-00001.txt"; private Table createIcebergTable(String name) { @@ -129,13 +129,13 @@ public void setUp() throws IOException { @After public void tearDown() throws IOException { - Files.deleteIfExists(Paths.get(OUTPUT_FILE_NAME)); + Files.deleteIfExists(Paths.get(outputFileName)); } @Test public void testApacheIcebergWrite() { String tableName = "write_table"; - Table table = createIcebergTable("write_table"); + final Table table = createIcebergTable("write_table"); // Run the Dataflow pipeline. ApacheIcebergWrite.main( @@ -147,15 +147,15 @@ public void testApacheIcebergWrite() { }); // Verify that the pipeline wrote records to the table. - assertTrue(tableContainsRecord(table,"0, Alice")); - assertTrue(tableContainsRecord(table,"1, Bob")); - assertTrue(tableContainsRecord(table,"2, Charles")); + assertTrue(tableContainsRecord(table, "0, Alice")); + assertTrue(tableContainsRecord(table, "1, Bob")); + assertTrue(tableContainsRecord(table, "2, Charles")); } @Test public void testApacheIcebergDynamicDestinations() { - Table tableORD = createIcebergTable("flights-ORD"); - Table tableSYD = createIcebergTable("flights-SYD"); + final Table tableORD = createIcebergTable("flights-ORD"); + final Table tableSYD = createIcebergTable("flights-SYD"); // Run the Dataflow pipeline. PipelineResult.State state = ApacheIcebergDynamicDestinations.main( @@ -175,7 +175,7 @@ public void testApacheIcebergDynamicDestinations() { @Test public void testApacheIcebergRead() throws IOException { String tableName = "read_table"; - Table table = createIcebergTable(tableName); + final Table table = createIcebergTable(tableName); // Seed the Apache Iceberg table with data. writeTableRecord(table); @@ -187,11 +187,11 @@ public void testApacheIcebergRead() throws IOException { "--warehouseLocation=" + warehouseLocation, "--catalogName=" + CATALOG_NAME, "--tableName=" + tableName, - "--outputPath=" + OUTPUT_FILE_NAME_PREFIX + "--outputPath=" + outputFileNamePrefix }); // Verify the pipeline wrote the table data to a text file. - String output = Files.readString(Paths.get(OUTPUT_FILE_NAME)); + String output = Files.readString(Paths.get(outputFileName)); assertTrue(output.contains("0:Person-0")); } } From 4bda967c3ca341298c941cbe2226079ee71b8819 Mon Sep 17 00:00:00 2001 From: Veronica Wasson <3992422+VeronicaWasson@users.noreply.github.com> Date: Wed, 5 Feb 2025 23:01:54 +0000 Subject: [PATCH 3/6] Bump Apache Beam version --- dataflow/snippets/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dataflow/snippets/pom.xml b/dataflow/snippets/pom.xml index f2403b449a1..9e14cfe6ada 100755 --- a/dataflow/snippets/pom.xml +++ b/dataflow/snippets/pom.xml @@ -37,7 +37,7 @@ 11 11 UTF-8 - 2.60.0 + 2.62.0 2.0.12 1.14.0 1.4.2 From be09e4393e3e6790b0b403ed1426bd9a3c5e405d Mon Sep 17 00:00:00 2001 From: Veronica Wasson <3992422+VeronicaWasson@users.noreply.github.com> Date: Wed, 5 Feb 2025 23:03:52 +0000 Subject: [PATCH 4/6] Update copyright year --- .../com/example/dataflow/ApacheIcebergDynamicDestinations.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java b/dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java index 1dfda9c0402..5368f8a2340 100644 --- a/dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java +++ b/dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java @@ -1,5 +1,5 @@ /* - * Copyright 2024 Google LLC + * Copyright 2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 03238b7a12f081d5bffd68d6f397bd51371e8bbd Mon Sep 17 00:00:00 2001 From: Veronica Wasson <3992422+VeronicaWasson@users.noreply.github.com> Date: Tue, 18 Feb 2025 22:58:58 +0000 Subject: [PATCH 5/6] Address PR review feedback --- .../ApacheIcebergDynamicDestinations.java | 16 +++++----------- .../com/example/dataflow/ApacheIcebergIT.java | 5 ++--- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java b/dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java index 5368f8a2340..83b33b01419 100644 --- a/dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java +++ b/dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java @@ -41,13 +41,12 @@ public class ApacheIcebergDynamicDestinations { .build(); // The data to write to table, formatted as JSON strings. - static final List TABLE_ROWS = Arrays.asList( + static final List TABLE_ROWS = List.of( "{\"id\":0, \"name\":\"Alice\", \"airport\": \"ORD\" }", "{\"id\":1, \"name\":\"Bob\", \"airport\": \"SYD\" }", "{\"id\":2, \"name\":\"Charles\", \"airport\": \"ORD\" }" ); - // [END dataflow_apache_iceberg_dynamic_destinations] public interface Options extends PipelineOptions { @Description("The URI of the Apache Iceberg warehouse location") String getWarehouseLocation(); @@ -60,17 +59,11 @@ public interface Options extends PipelineOptions { void setCatalogName(String value); } - public static PipelineResult.State main(String[] args) { + public static void main(String[] args) { // Parse the pipeline options passed into the application. Example: // --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \ // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options - var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - Pipeline pipeline = createPipeline(options); - return pipeline.run().waitUntilFinish(); - } - - // [START dataflow_apache_iceberg_dynamic_destinations] - public static Pipeline createPipeline(Options options) { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); Pipeline pipeline = Pipeline.create(options); // Configure the Iceberg source I/O @@ -93,7 +86,8 @@ public static Pipeline createPipeline(Options options) { .apply(JsonToRow.withSchema(SCHEMA)) .apply(Managed.write(Managed.ICEBERG).withConfig(config)); - return pipeline; + // Run the pipeline. + pipeline.run().waitUntilFinish(); } } // [END dataflow_apache_iceberg_dynamic_destinations] diff --git a/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java b/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java index f482ba8cdb9..e2acef1ef1f 100644 --- a/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java +++ b/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java @@ -158,13 +158,12 @@ public void testApacheIcebergDynamicDestinations() { final Table tableSYD = createIcebergTable("flights-SYD"); // Run the Dataflow pipeline. - PipelineResult.State state = ApacheIcebergDynamicDestinations.main( + ApacheIcebergDynamicDestinations.main( new String[] { "--runner=DirectRunner", "--warehouseLocation=" + warehouseLocation, "--catalogName=" + CATALOG_NAME }); - assertEquals(PipelineResult.State.DONE, state); // Verify that the pipeline wrote records to the correct tables. assertTrue(tableContainsRecord(tableORD, "0, Alice")); @@ -194,4 +193,4 @@ public void testApacheIcebergRead() throws IOException { String output = Files.readString(Paths.get(outputFileName)); assertTrue(output.contains("0:Person-0")); } -} +} \ No newline at end of file From 8738b5d2a2124700e7ccd51bbdc6d8e547e21046 Mon Sep 17 00:00:00 2001 From: Veronica Wasson <3992422+VeronicaWasson@users.noreply.github.com> Date: Tue, 18 Feb 2025 23:16:03 +0000 Subject: [PATCH 6/6] Add code comments --- .../dataflow/ApacheIcebergDynamicDestinations.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java b/dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java index 83b33b01419..0aa14040bb5 100644 --- a/dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java +++ b/dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java @@ -59,6 +59,9 @@ public interface Options extends PipelineOptions { void setCatalogName(String value); } + // Write JSON data to Apache Iceberg, using dynamic destinations to determine the Iceberg table + // where Dataflow writes each record. The JSON data contains a field named "airport". The + // Dataflow pipeline writes to Iceberg tables with the naming pattern "flights-{airport}". public static void main(String[] args) { // Parse the pipeline options passed into the application. Example: // --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \ @@ -82,8 +85,12 @@ public static void main(String[] args) { .build(); // Build the pipeline. - pipeline.apply(Create.of(TABLE_ROWS)) + pipeline + // Read in-memory JSON data. + .apply(Create.of(TABLE_ROWS)) + // Convert the JSON records to Row objects. .apply(JsonToRow.withSchema(SCHEMA)) + // Write each Row to Apache Iceberg. .apply(Managed.write(Managed.ICEBERG).withConfig(config)); // Run the pipeline.