Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-8511] [WIP] KinesisIO.Read enhanced fanout #9899

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ class BeamModulePlugin implements Plugin<Project> {
aws_java_sdk2_auth : "software.amazon.awssdk:auth:$aws_java_sdk2_version",
aws_java_sdk2_cloudwatch : "software.amazon.awssdk:cloudwatch:$aws_java_sdk2_version",
aws_java_sdk2_dynamodb : "software.amazon.awssdk:dynamodb:$aws_java_sdk2_version",
aws_java_sdk2_kinesis : "software.amazon.awssdk:kinesis:$aws_java_sdk2_version",
aws_java_sdk2_sdk_core : "software.amazon.awssdk:sdk-core:$aws_java_sdk2_version",
aws_java_sdk2_sns : "software.amazon.awssdk:sns:$aws_java_sdk2_version",
bigdataoss_gcsio : "com.google.cloud.bigdataoss:gcsio:$google_cloud_bigdataoss_version",
Expand Down
17 changes: 17 additions & 0 deletions sdks/java/io/amazon-web-services2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import groovy.json.JsonOutput

plugins { id 'org.apache.beam.module' }
applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.io.aws2')
provideIntegrationTestingDependencies()
enableJavaPerformanceTesting()

description = "Apache Beam :: SDKs :: Java :: IO :: Amazon Web Services 2"
ext.summary = "IO library to read and write Amazon Web Services services from Beam."
Expand All @@ -31,15 +33,28 @@ dependencies {
compile library.java.aws_java_sdk2_auth
compile library.java.aws_java_sdk2_cloudwatch
compile library.java.aws_java_sdk2_dynamodb
compile library.java.aws_java_sdk2_kinesis
compile library.java.aws_java_sdk2_sdk_core
compile library.java.aws_java_sdk2_sns
compile library.java.jackson_core
compile library.java.jackson_annotations
compile library.java.jackson_databind
compile library.java.jackson_dataformat_cbor
compile library.java.joda_time
compile library.java.slf4j_api
compile "software.amazon.kinesis:amazon-kinesis-client:2.2.5"
compile "commons-lang:commons-lang:2.6"
testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
testCompile project(path: ":sdks:java:io:common", configuration: "testRuntime")
testCompile project(path: ":sdks:java:io:kinesis", configuration: "testRuntime")
testCompile library.java.mockito_core
testCompile library.java.guava_testlib
testCompile library.java.hamcrest_core
testCompile library.java.junit
testCompile library.java.hamcrest_library
testCompile library.java.powermock
testCompile library.java.powermock_mockito
testCompile "org.assertj:assertj-core:3.11.1"
testCompile 'org.testcontainers:testcontainers:1.11.3'
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(":runners:direct-java")
Expand All @@ -50,4 +65,6 @@ test {
'--region=us-west-2',
'--awsCredentialsProvider={"@type": "StaticCredentialsProvider", "accessKeyId": "key_id_value", "secretAccessKey": "secret_value"}'
])
// Forking every test resolves an issue where KinesisMockReadTest gets stuck forever.
forkEvery 1
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.Serializable;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;

/**
* Provides instances of AWS clients.
*
* <p>Please note, that any instance of {@link AWSClientsProvider} must be {@link Serializable} to
* ensure it can be sent to worker machines.
*/
public interface AWSClientsProvider extends Serializable {
KinesisClient getKinesisClient();

KinesisAsyncClient getKinesisAsyncClient();

CloudWatchClient getCloudWatchClient();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.kinesis;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import java.net.URI;
import javax.annotation.Nullable;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;

/** Basic implementation of {@link AWSClientsProvider} used by default in {@link KinesisIO}. */
class BasicKinesisProvider implements AWSClientsProvider {
private final String accessKey;
private final String secretKey;
private final String region;
@Nullable private final String serviceEndpoint;

BasicKinesisProvider(
String accessKey, String secretKey, Region region, @Nullable String serviceEndpoint) {
checkArgument(accessKey != null, "accessKey can not be null");
checkArgument(secretKey != null, "secretKey can not be null");
checkArgument(region != null, "region can not be null");
this.accessKey = accessKey;
this.secretKey = secretKey;
this.region = region.toString();
this.serviceEndpoint = serviceEndpoint;
}

private AwsCredentialsProvider getCredentialsProvider() {
return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
}

@Override
public KinesisClient getKinesisClient() {
KinesisClientBuilder clientBuilder =
KinesisClient.builder()
.credentialsProvider(getCredentialsProvider())
.region(Region.of(region));
if (serviceEndpoint != null) {
clientBuilder.endpointOverride(URI.create(serviceEndpoint));
}
return clientBuilder.build();
}

@Override
public KinesisAsyncClient getKinesisAsyncClient() {
KinesisAsyncClientBuilder clientBuilder =
KinesisAsyncClient.builder()
.credentialsProvider(getCredentialsProvider())
.region(Region.of(region));
if (serviceEndpoint != null) {
clientBuilder.endpointOverride(URI.create(serviceEndpoint));
}
return clientBuilder.build();
}

@Override
public CloudWatchClient getCloudWatchClient() {
CloudWatchClientBuilder clientBuilder =
CloudWatchClient.builder()
.credentialsProvider(getCredentialsProvider())
.region(Region.of(region));
if (serviceEndpoint != null) {
clientBuilder.endpointOverride(URI.create(serviceEndpoint));
}
return clientBuilder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.Serializable;

/**
* Used to generate checkpoint object on demand. How exactly the checkpoint is generated is up to
* implementing class.
*/
interface CheckpointGenerator extends Serializable {

KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) throws TransientKinesisException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.kinesis;

import java.util.NoSuchElementException;
import java.util.Objects;

/**
* Similar to Guava {@code Optional}, but throws {@link NoSuchElementException} for missing element.
*/
abstract class CustomOptional<T> {

@SuppressWarnings("unchecked")
public static <T> CustomOptional<T> absent() {
return (Absent<T>) Absent.INSTANCE;
}

public static <T> CustomOptional<T> of(T v) {
return new Present<>(v);
}

public abstract boolean isPresent();

public abstract T get();

private static class Present<T> extends CustomOptional<T> {

private final T value;

private Present(T value) {
this.value = value;
}

@Override
public boolean isPresent() {
return true;
}

@Override
public T get() {
return value;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof Present)) {
return false;
}

Present<?> present = (Present<?>) o;
return Objects.equals(value, present.value);
}

@Override
public int hashCode() {
return Objects.hash(value);
}
}

private static class Absent<T> extends CustomOptional<T> {

private static final Absent<Object> INSTANCE = new Absent<>();

private Absent() {}

@Override
public boolean isPresent() {
return false;
}

@Override
public T get() {
throw new NoSuchElementException();
}

@Override
public boolean equals(Object o) {
return o instanceof Absent;
}

@Override
public int hashCode() {
return 0;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.kinesis;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;

import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.Shard;

/**
* Creates {@link KinesisReaderCheckpoint}, which spans over all shards in given stream. List of
* shards is obtained dynamically on call to {@link #generate(SimplifiedKinesisClient)}.
*/
class DynamicCheckpointGenerator implements CheckpointGenerator {

private static final Logger LOG = LoggerFactory.getLogger(DynamicCheckpointGenerator.class);
private final String streamName;
private final String consumerArn;
private final StartingPoint startingPoint;
private final StartingPointShardsFinder startingPointShardsFinder;

public DynamicCheckpointGenerator(
String streamName, String consumerArn, StartingPoint startingPoint) {
this.streamName = streamName;
this.consumerArn = consumerArn;
this.startingPoint = startingPoint;
this.startingPointShardsFinder = new StartingPointShardsFinder();
}

public DynamicCheckpointGenerator(
String streamName,
String consumerArn,
StartingPoint startingPoint,
StartingPointShardsFinder startingPointShardsFinder) {
this.streamName = checkNotNull(streamName, "streamName");
this.consumerArn = consumerArn;
this.startingPoint = checkNotNull(startingPoint, "startingPoint");
this.startingPointShardsFinder =
checkNotNull(startingPointShardsFinder, "startingPointShardsFinder");
}

@Override
public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis)
throws TransientKinesisException {
Set<Shard> shardsAtStartingPoint =
startingPointShardsFinder.findShardsAtStartingPoint(kinesis, streamName, startingPoint);
LOG.info(
"Creating a checkpoint with following shards {} at {}",
shardsAtStartingPoint,
startingPoint.getTimestamp());
return new KinesisReaderCheckpoint(
shardsAtStartingPoint.stream()
.map(
shard ->
new ShardCheckpoint(streamName, shard.shardId(), consumerArn, startingPoint))
.collect(Collectors.toList()));
}

@Override
public String toString() {
return String.format("Checkpoint generator for %s: %s", streamName, startingPoint);
}
}