Initial Iceberg Sink #30797

Merged 3 commits on Apr 9, 2024

118 changes: 118 additions & 0 deletions .github/workflows/IO_Iceberg.yml
@@ -0,0 +1,118 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: IcebergIO Unit Tests

on:
  push:
    tags: ['v*']
    branches: ['master', 'release-*']
    paths:
      - "sdks/java/io/iceberg/**"
      - ".github/workflows/IO_Iceberg.yml"
  pull_request_target:
    branches: ['master', 'release-*']
    paths:
      - "sdks/java/io/iceberg/**"
      - 'release/trigger_all_tests.json'
      - '.github/trigger_files/IO_Iceberg.json'
  issue_comment:
    types: [created]
  schedule:
    - cron: '15 1/6 * * *'
  workflow_dispatch:

# Set explicit permissions for the action to avoid the default `write-all`
# permissions in the case of a pull_request_target event.
permissions:
  actions: write
  pull-requests: write
  checks: write
  contents: read
  deployments: read
  id-token: none
  issues: write
  discussions: read
  packages: read
  pages: read
  repository-projects: read
  security-events: read
  statuses: read

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
  group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
  cancel-in-progress: true

env:
  GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
  GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
  GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}

jobs:
  IO_Iceberg:
    name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
    strategy:
      matrix:
        job_name: ["IO_Iceberg"]
        job_phrase: ["Run IcebergIO Unit Tests"]
    timeout-minutes: 60
    if: |
      github.event_name == 'push' ||
      github.event_name == 'pull_request_target' ||
      (github.event_name == 'schedule' && github.repository == 'apache/beam') ||
      github.event_name == 'workflow_dispatch' ||
      github.event.comment.body == 'Run IcebergIO Unit Tests'
    runs-on: [self-hosted, ubuntu-20.04, main]
    steps:
      - uses: actions/checkout@v4
      - name: Setup repository
        uses: ./.github/actions/setup-action
        with:
          comment_phrase: ${{ matrix.job_phrase }}
          github_token: ${{ secrets.GITHUB_TOKEN }}
          github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
      - name: Setup environment
        uses: ./.github/actions/setup-environment-action
      - name: Run IcebergIO build script
        uses: ./.github/actions/gradle-command-self-hosted-action
        with:
          gradle-command: :sdks:java:io:iceberg:build
          arguments: |
            -PdisableSpotlessCheck=true \
            -PdisableCheckStyle=true \
      - name: Archive JUnit Test Results
        uses: actions/upload-artifact@v4
        if: ${{ !success() }}
        with:
          name: JUnit Test Results
          path: "**/build/reports/tests/"
      - name: Publish JUnit Test Results
        uses: EnricoMi/publish-unit-test-result-action@v2
        if: always()
        with:
          commit: '${{ env.prsha || env.GITHUB_SHA }}'
          comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }}
          files: '**/build/test-results/**/*.xml'
      - name: Archive SpotBugs Results
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: SpotBugs Results
          path: '**/build/reports/spotbugs/*.html'
      - name: Publish SpotBugs Results
        uses: jwgmeligmeyling/spotbugs-github-action@v1.2
        if: always()
        with:
          name: Publish SpotBugs
          path: '**/build/reports/spotbugs/*.html'
98 changes: 98 additions & 0 deletions sdks/java/io/iceberg/build.gradle
@@ -0,0 +1,98 @@
import java.util.stream.Collectors

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins { id 'org.apache.beam.module' }
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.iceberg',
)

description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg"
ext.summary = "Integration with Iceberg data warehouses."

def hadoopVersions = [
"285": "2.8.5",
"292": "2.9.2",
"2102": "2.10.2",
"324": "3.2.4",
]

hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")}

def iceberg_version = "1.4.2"
def parquet_version = "1.12.0"
def orc_version = "1.9.2"
def hive_version = "3.1.3"

dependencies {
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:io:hadoop-common")
implementation library.java.slf4j_api
implementation "org.apache.parquet:parquet-column:$parquet_version"
implementation "org.apache.parquet:parquet-common:$parquet_version"
implementation "org.apache.orc:orc-core:$orc_version"
implementation "org.apache.iceberg:iceberg-core:$iceberg_version"
implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
implementation "org.apache.iceberg:iceberg-arrow:$iceberg_version"
implementation "org.apache.iceberg:iceberg-data:$iceberg_version"



provided library.java.avro
provided library.java.hadoop_client
permitUnusedDeclared library.java.hadoop_client
provided library.java.hadoop_common
testImplementation library.java.hadoop_client

testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version"
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.junit
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
hadoopVersions.each {kv ->
"hadoopVersion$kv.key" "org.apache.hadoop:hadoop-client:$kv.value"
}
}

hadoopVersions.each {kv ->
configurations."hadoopVersion$kv.key" {
resolutionStrategy {
force "org.apache.hadoop:hadoop-client:$kv.value"
}
}
}

task hadoopVersionsTest(group: "Verification") {
description = "Runs Iceberg tests with different Hadoop versions"
def taskNames = hadoopVersions.keySet().stream()
.map{num -> "hadoopVersion${num}Test"}
.collect(Collectors.toList())
dependsOn taskNames
}

hadoopVersions.each { kv ->
task "hadoopVersion${kv.key}Test"(type: Test, group: "Verification") {
description = "Runs Iceberg tests with Hadoop version $kv.value"
classpath = configurations."hadoopVersion$kv.key" + sourceSets.test.runtimeClasspath
include '**/*Test.class'
}
}
102 changes: 102 additions & 0 deletions sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AppendFilesToTables.java
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.iceberg;

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

class AppendFilesToTables
extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, Snapshot>>> {

private final IcebergCatalogConfig catalogConfig;

AppendFilesToTables(IcebergCatalogConfig catalogConfig) {
this.catalogConfig = catalogConfig;
}

@Override
public PCollection<KV<String, Snapshot>> expand(PCollection<FileWriteResult> writtenFiles) {

// Apply any sharded writes and flatten everything for catalog updates
return writtenFiles
.apply(
"Key metadata updates by table",
WithKeys.of(
new SerializableFunction<FileWriteResult, String>() {
@Override
public String apply(FileWriteResult input) {
return input.getTableIdentifier().toString();
}
}))
.apply("Group metadata updates by table", GroupByKey.create())
.apply(
"Append metadata updates to tables",
ParDo.of(new AppendFilesToTablesDoFn(catalogConfig)))
.setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Snapshot.class)));
}

private static class AppendFilesToTablesDoFn
extends DoFn<KV<String, Iterable<FileWriteResult>>, KV<String, Snapshot>> {

private final IcebergCatalogConfig catalogConfig;

private transient @MonotonicNonNull Catalog catalog;

private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig) {
this.catalogConfig = catalogConfig;
}

private Catalog getCatalog() {
if (catalog == null) {
catalog = catalogConfig.catalog();
}
return catalog;
}

@ProcessElement
public void processElement(
@Element KV<String, Iterable<FileWriteResult>> element,
OutputReceiver<KV<String, Snapshot>> out,
BoundedWindow window) {
Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
AppendFiles update = table.newAppend();
for (FileWriteResult writtenFile : element.getValue()) {
update.appendFile(writtenFile.getDataFile());
}
update.commit();
Contributor:

Shouldn't this update be atomic for all files?

If so, we might have to push this to a separate step behind a shuffle.

The key question is what will happen if the step fails after writing some of the elements and gets retried.

Member Author:

All the files per destination are grouped into a single atomic commit. There are two things that could go wrong:

  1. Failure after the commit but before downstream processing, so a new transaction will try to append the same files. I verified that this is idempotent (and I included it as a unit test just to clarify).
  2. Some tables successfully commit but then there are enough failures that the pipeline itself fails. We probably can do a multi-table transaction. We would write the various files all to a manifest and then merge to a single thread and commit all the manifests at once. We don't do this for other sinks, do we?
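
For illustration, here is a rough sketch of the kind of idempotency unit test described above. This is hypothetical code, not taken from the PR: it assumes a local HadoopCatalog warehouse and a metadata-only DataFile built with DataFiles.builder, and its assertion simply encodes the property the author says they verified (re-appending the same file leaves the table unchanged).

```java
import static org.junit.Assert.assertEquals;

import java.nio.file.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import org.junit.Test;

public class AppendIdempotencyTest {

  @Test
  public void reAppendingTheSameFileLeavesTheTableUnchanged() throws Exception {
    // Test-only setup: a local filesystem warehouse behind a HadoopCatalog.
    String warehouse = Files.createTempDirectory("iceberg-warehouse").toUri().toString();
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), warehouse);
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    Table table = catalog.createTable(TableIdentifier.parse("default.test_table"), schema);

    // A metadata-only DataFile entry; the path and stats are illustrative.
    DataFile dataFile =
        DataFiles.builder(PartitionSpec.unpartitioned())
            .withPath(warehouse + "/data/file-00000.parquet")
            .withFormat(FileFormat.PARQUET)
            .withFileSizeInBytes(1024L)
            .withRecordCount(10L)
            .build();

    // First commit appends the file, as AppendFilesToTablesDoFn does.
    table.newAppend().appendFile(dataFile).commit();
    String recordsAfterFirstCommit = table.currentSnapshot().summary().get("total-records");

    // A retried work item would append the same file again in a fresh commit.
    table.newAppend().appendFile(dataFile).commit();
    table.refresh();

    // The property the author says they verified: the retried append is a no-op.
    assertEquals(recordsAfterFirstCommit, table.currentSnapshot().summary().get("total-records"));
  }
}
```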

Contributor:

Yeah, (2) is fine. It's more about making sure that we don't double write if a work item fails. But if writing is idempotent it's simpler.

Member:

Sorry to be late on this; I'm just wondering whether we would need a kind of "commit coordinator" to make sure we have one commit at a time: if we have concurrent commits, it could be problematic in Iceberg.

Member Author:

I am not that familiar with the Iceberg libraries. I was under the impression that the optimistic concurrency protocol is handled by them (https://iceberg.apache.org/docs/1.5.2/reliability/#concurrent-write-operations; for filesystem tables it is described by https://iceberg.apache.org/spec/#file-system-tables).
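
For context on the concurrency question, here is a minimal hypothetical sketch (not part of this PR) of what caller-side handling of Iceberg's optimistic-concurrency failures could look like. In practice the Iceberg library already retries commits internally according to the table's commit retry settings, which is what the linked docs describe; this helper only illustrates the lose-the-race / refresh / retry pattern.

```java
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitFailedException;

/** Hypothetical helper, not part of this PR: retry an append when a concurrent commit wins. */
class RetryingAppend {

  static void appendWithRetries(Table table, Iterable<DataFile> files, int maxAttempts) {
    for (int attempt = 1; ; attempt++) {
      try {
        // Each attempt re-reads the current table metadata and builds a fresh append.
        table.refresh();
        AppendFiles append = table.newAppend();
        for (DataFile file : files) {
          append.appendFile(file);
        }
        append.commit(); // Throws CommitFailedException if a concurrent writer won the race.
        return;
      } catch (CommitFailedException e) {
        if (attempt >= maxAttempts) {
          throw e;
        }
        // Optimistic concurrency: we lost the race; loop to retry on the new base metadata.
      }
    }
  }
}
```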

out.outputWithTimestamp(
KV.of(element.getKey(), table.currentSnapshot()), window.maxTimestamp());
}
}
}
65 changes: 65 additions & 0 deletions sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AssignDestinations.java
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.iceberg;

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

/**
* Assigns the destination metadata for each input record.
*
* <p>The output record will have the format { dest: ..., data: ...} where the dest field has the
* assigned metadata and the data field has the original row.
*/
class AssignDestinations extends PTransform<PCollection<Row>, PCollection<Row>> {

private DynamicDestinations dynamicDestinations;

public AssignDestinations(DynamicDestinations dynamicDestinations) {
this.dynamicDestinations = dynamicDestinations;
}

@Override
public PCollection<Row> expand(PCollection<Row> input) {

final Schema inputSchema = input.getSchema();
final Schema outputSchema =
Schema.builder()
.addRowField("data", inputSchema)
.addRowField("dest", dynamicDestinations.getMetadataSchema())
.build();

return input
.apply(
ParDo.of(
new DoFn<Row, Row>() {
@ProcessElement
public void processElement(@Element Row data, OutputReceiver<Row> out) {
out.output(
Row.withSchema(outputSchema)
.addValues(data, dynamicDestinations.assignDestinationMetadata(data))
.build());
}
}))
.setRowSchema(outputSchema);
}
}
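
To make the { dest: ..., data: ... } wrapper format concrete, here is a small illustrative sketch (not part of the PR) of how a downstream DoFn could unwrap the rows emitted by AssignDestinations; the field names mirror the schema built in expand() above.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;

/** Illustrative only: splits the {data, dest} wrapper rows back into (destination, record) pairs. */
class UnwrapDestinationRow extends DoFn<Row, KV<Row, Row>> {

  @ProcessElement
  public void processElement(@Element Row wrapped, OutputReceiver<KV<Row, Row>> out) {
    // "data" holds the original input row; "dest" holds the destination metadata
    // assigned by DynamicDestinations.assignDestinationMetadata(...).
    Row data = wrapped.getRow("data");
    Row dest = wrapped.getRow("dest");
    out.output(KV.of(dest, data));
  }
}
```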