
Conversation

@VeronicaWasson
Contributor

Description

Add snippet for Iceberg dynamic destinations

Relevant doc bug: b/371047621

Checklist

  • I have followed Sample Format Guide
  • pom.xml parent set to latest shared-configuration
  • Appropriate changes to README are included in PR
  • These samples need a new API enabled in testing projects to pass (let us know which ones)
  • These samples need a new/updated env vars in testing projects set to pass (let us know which ones)
  • Tests pass: mvn clean verify required
  • Lint passes: mvn -P lint checkstyle:check required
  • Static Analysis: mvn -P lint clean compile pmd:cpd-check spotbugs:check advisory only
  • This sample adds a new sample directory, and I updated the CODEOWNERS file with the codeowners for this sample
  • This sample adds a new Product API, and I updated the Blunderbuss issue/PR auto-assigner with the codeowners for this sample
  • Please merge this PR for me once it is approved

@VeronicaWasson VeronicaWasson requested review from a team and yoshi-approver as code owners November 4, 2024 23:06
@product-auto-label product-auto-label bot added samples Issues that are directly related to samples. api: dataflow Issues related to the Dataflow API. labels Nov 4, 2024
@snippet-bot

snippet-bot bot commented Nov 4, 2024

Here is the summary of changes.

You are about to add 1 region tag.

This comment is generated by snippet-bot.
If you find problems with this result, please file an issue at:
https://github.com/googleapis/repo-automation-bots/issues.

@iennae
Contributor

iennae commented Jan 16, 2025

@code-review-assist review

@ghost ghost left a comment

This pull request adds a snippet demonstrating dynamic destinations with Apache Iceberg in Dataflow and refactors integration tests. The changes are well-structured and address a relevant documentation bug. Here are a few suggestions based on the Google Java Style Guide:

  • Use diamond operators for type inference.
  • Declare Map and ImmutableMap using their interfaces.
  • Consider using List.of() for creating immutable lists when possible.
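The three style suggestions above can be shown in one small fragment. This is an illustrative sketch, not code from the PR; the variable names are made up:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StyleExamples {
  public static void main(String[] args) {
    // Diamond operator: let the compiler infer the type arguments
    // instead of writing new HashMap<String, Integer>().
    Map<String, Integer> counts = new HashMap<>();
    counts.put("flights", 2);

    // Declare against the interface (Map, List), not the concrete class,
    // so the implementation can change without touching callers.
    List<String> airports = new ArrayList<>();
    airports.add("ORD");

    // List.of() for immutable lists when mutation is not needed.
    List<String> codes = List.of("ORD", "SYD");

    System.out.println(counts.get("flights") + " " + airports.get(0) + " " + codes.size());
  }
}
```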

@iennae
Contributor

iennae commented Jan 16, 2025

@code-review-assist walkthrough

@ghost

ghost commented Jan 16, 2025

Dataflow to Apache Iceberg Samples Walkthrough

This repository contains Java code samples demonstrating how to interact with Apache Iceberg tables using Apache Beam and Google Cloud Dataflow. The primary goal is to provide clear, concise examples for developers to integrate Dataflow pipelines with Iceberg for data ingestion and processing.

Repository Structure

The repository is structured as follows:

  • dataflow/snippets/: This directory contains the core Dataflow pipeline code samples. This is the main focus of this walkthrough and the pull request.
  • pom.xml: This file manages project dependencies. The pull request updates the Apache Beam version to 2.60.0, which enables dynamic destinations, the key feature of this PR.
    • src/main/java/: Contains the Java source code for the Dataflow pipelines.
      • com.example.dataflow.*: This package contains the pipeline implementations. The ApacheIcebergDynamicDestinations class is the primary addition in this PR, demonstrating the use of dynamic destinations in Iceberg.
    • src/test/java/: Contains the integration tests for the pipeline code. The pull request refactors these tests to handle multiple destination tables, making them more robust and adaptable to the dynamic destination feature.

Code Walkthrough

Let's trace the execution flow of the ApacheIcebergDynamicDestinations pipeline (the main addition in this PR):

  1. Pipeline Creation: The createPipeline method in ApacheIcebergDynamicDestinations.java creates a new Apache Beam pipeline. The pipeline options (warehouse location and catalog name) are passed in from the command line.
  2. Iceberg Configuration: The pipeline configures the Iceberg I/O connector using Managed.write(Managed.ICEBERG). Crucially, it sets the table property to "flights-{airport}". This uses the dynamic destinations feature (introduced in Beam 2.60), allowing the pipeline to write to a different Iceberg table depending on the airport field of each input record. This is the core functionality added by this PR.
  3. Data Ingestion: The pipeline ingests sample data using Create.of(TABLE_ROWS). This data is in JSON format and represents flight information (id, name, airport).
  4. Schema Conversion: JsonToRow.withSchema(SCHEMA) converts the JSON data into Beam Row objects, aligning with the Iceberg schema.
  5. Data Writing: Finally, Managed.write(Managed.ICEBERG).withConfig(config) writes the data to the specified Iceberg tables. The dynamic destination configuration ensures that records are routed to the appropriate table based on the airport value.
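The walkthrough's key detail is the configuration map handed to the Managed sink. The sketch below shows that map in plain Java so the template string is easy to see; the "flights-{airport}" value is quoted from the walkthrough, while the key names follow the Beam Managed Iceberg documentation and the catalog name and warehouse path are hypothetical:

```java
import java.util.Map;

public class IcebergConfigSketch {
  public static void main(String[] args) {
    // The "table" value is a template: {airport} is substituted per record
    // with that record's airport field, so rows fan out to flights-ORD,
    // flights-SYD, and so on. Key names are illustrative here.
    Map<String, Object> config = Map.of(
        "table", "flights-{airport}",
        "catalog_name", "my_catalog",                        // hypothetical catalog name
        "catalog_properties",
            Map.of("warehouse", "gs://my-bucket/warehouse")); // hypothetical location

    System.out.println(config.get("table"));
  }
}
```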

Integration Tests

The integration tests in ApacheIcebergIT.java verify the functionality of the pipelines. The pull request enhances these tests by:

  • Handling Multiple Tables: The tests now create and verify multiple Iceberg tables, reflecting the dynamic destination capability. The previous version assumed only one table, which is no longer the case with the new dynamic destination feature.
  • Improved Assertions: The tests use more precise assertions to check for the presence of specific records in the correct tables, ensuring comprehensive validation.

Pull Request Impact

This pull request introduces a significant enhancement by adding support for dynamic destinations in Apache Iceberg. The updated pom.xml ensures compatibility with the required Beam version. The new ApacheIcebergDynamicDestinations sample demonstrates this feature, and the improved integration tests validate its correctness and robustness. The changes are well-integrated and enhance the overall functionality and usability of the samples.

Potential Improvements

While the PR is well-structured, adding more comprehensive error handling and logging could further improve its robustness. Consider adding checks for null or empty values in the input data and handling potential exceptions during table creation or data writing. More detailed logging would aid in debugging and troubleshooting.
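One way to realize the suggested input checks is a small guard that rejects null or blank rows before they reach the pipeline. This is a hypothetical sketch, not part of the PR; the method name and messages are made up:

```java
import java.util.List;

public class InputValidation {
  // Hypothetical guard for the kind of checks suggested above:
  // reject null/empty row lists and blank JSON strings up front,
  // so failures surface before the pipeline runs.
  static List<String> validateRows(List<String> rows) {
    if (rows == null || rows.isEmpty()) {
      throw new IllegalArgumentException("TABLE_ROWS must not be null or empty");
    }
    for (String row : rows) {
      if (row == null || row.isBlank()) {
        throw new IllegalArgumentException("Each input row must be non-blank JSON");
      }
    }
    return rows;
  }

  public static void main(String[] args) {
    List<String> rows = List.of("{\"id\":1, \"name\":\"Amy\", \"airport\": \"ORD\"}");
    System.out.println(validateRows(rows).size()); // prints 1
  }
}
```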

@VeronicaWasson
Contributor Author

@iennae @kweinmeister Should I address anything from the code review bot?

@grayside grayside changed the title from "docs(samples): Dataflow to Apache Iceberg with dynamic destinations" to "feat(dataflow): Dataflow to Apache Iceberg with dynamic destinations" Jan 30, 2025
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-<apache_beam.version>2.58.0</apache_beam.version>
+<apache_beam.version>2.60.0</apache_beam.version>

This version is a little old. cc @ahmedabu98


+1 but noting that Dataflow will be upgrading Iceberg to the latest version anyways.

Contributor Author

I bumped the version

@chamikaramj chamikaramj left a comment

This is great! Thanks for adding this example.

LGTM


@telpirion telpirion self-assigned this Feb 13, 2025
Contributor

@telpirion telpirion left a comment

Hello @VeronicaWasson ! This sample looks really good. I have just a handful of comments for you. Please take a look at your earliest convenience.

Tip: I recommend formatting this sample similar to other Java samples. Here's a recently merged sample that follows all style guidance.

void setCatalogName(String value);
}

public static PipelineResult.State main(String[] args) {
Contributor

issue: return void instead of a result. Move the call to pipeline.run().waitUntilFinish() into the createPipeline() method.

See:
https://googlecloudplatform.github.io/samples-style-guide/#result

Contributor Author

Fixed

(fwiw, I've gotten some conflicting guidance on this in previous Dataflow code snippets)

.apply(JsonToRow.withSchema(SCHEMA))
.apply(Managed.write(Managed.ICEBERG).withConfig(config));

return pipeline;
Contributor

issue: process the response from the pipeline in some manner.

See:
https://googlecloudplatform.github.io/samples-style-guide/#pattern

Contributor Author

Since I'm no longer returning the result, there's nothing to check here. (If the pipeline fails, the IT will fail.)

"{\"id\":2, \"name\":\"Charles\", \"airport\": \"ORD\" }"
);

// [END dataflow_apache_iceberg_dynamic_destinations]
Contributor

issue: don't omit the interface definition here from the rest of the sample. If the code requires that the developer extend an interface, then we should show it in the sample.

Contributor Author

Fixed

// Parse the pipeline options passed into the application. Example:
// --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
// For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Contributor

issue: declare variables with types, rather than var. As illustrative/didactic code, we want to communicate to the reader what sort of type they need to work with (especially for strongly-typed languages).

See:
https://github.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md#java-11-features
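The point of this review comment can be shown without any Beam dependency. A minimal sketch, with a plain collection standing in for the PipelineOptionsFactory call in the snippet:

```java
import java.util.ArrayList;
import java.util.List;

public class ExplicitTypes {
  public static void main(String[] args) {
    // In ordinary application code, var is fine:
    var inferred = new ArrayList<String>();

    // In didactic samples, spell the type out so the reader can see
    // at a glance what they are working with:
    List<String> explicit = new ArrayList<>();

    inferred.add("a");
    explicit.add("b");
    System.out.println(inferred.get(0) + explicit.get(0)); // prints ab
  }
}
```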

Contributor Author

Fixed

.build();

// The data to write to table, formatted as JSON strings.
static final List<String> TABLE_ROWS = Arrays.asList(
Contributor

nit: I would use List.of() instead here.
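The practical difference behind this nit: Arrays.asList returns a fixed-size but mutable view, while List.of is fully immutable. A standalone sketch (the airport codes are just sample data):

```java
import java.util.Arrays;
import java.util.List;

public class ImmutableListExample {
  public static void main(String[] args) {
    // Arrays.asList: fixed-size, but set() is still allowed.
    List<String> asList = Arrays.asList("ORD", "SYD");
    asList.set(0, "LAX"); // succeeds

    // List.of: fully immutable; set() throws.
    List<String> of = List.of("ORD", "SYD");
    try {
      of.set(0, "LAX");
    } catch (UnsupportedOperationException e) {
      System.out.println("List.of is immutable");
    }
    System.out.println(asList.get(0)); // prints LAX
  }
}
```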

Contributor Author

Fixed

void setCatalogName(String value);
}

public static PipelineResult.State main(String[] args) {
Contributor

issue: remove the arg parsing from the sample.

See:
https://googlecloudplatform.github.io/samples-style-guide/#no-cli

Contributor Author

This pattern is idiomatic for Dataflow / Apache Beam. To run the pipeline, the user passes in command line arguments that get parsed via the PipelineOptions class.

e.g.:

https://beam.apache.org/documentation/programming-guide/#pipeline-options-cli

}

// [START dataflow_apache_iceberg_dynamic_destinations]
public static Pipeline createPipeline(Options options) {
Contributor

nit: I would provide a comment for this method that describes what the code sample does. It's hard for me to understand, just from a casual glance, what effect this code has.

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing that out, I added some more comments.

}

@Before
public void setUp() throws IOException {
Contributor

comment: consider keeping the pipe from stdout to bout. I think that the sample should attempt to process the result by printing messages to the console.

Contributor Author

The sample writes records to an Iceberg catalog, so the IT tests whether the records were added successfully.

Previously I was doing this in a roundabout way (printing the records to stdout first) but this version seems more direct.

ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation),
hadoopConf);
createIcebergTable(catalog, TABLE_IDENTIFIER);

Contributor

nit: remove extraneous line if unneeded.

Contributor Author

Fixed

final Table tableSYD = createIcebergTable("flights-SYD");

// Run the Dataflow pipeline.
PipelineResult.State state = ApacheIcebergDynamicDestinations.main(
Contributor

See previous comments about processing results in the sample (and not returning types).

Contributor Author

Done

@kweinmeister kweinmeister assigned telpirion and unassigned telpirion Feb 27, 2025
@telpirion telpirion merged commit a95fb5d into GoogleCloudPlatform:main Feb 27, 2025
7 checks passed