Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package org.apache.beam.sdk.io.aws2.kinesis;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;

import java.net.URI;
import java.util.Objects;
import org.apache.beam.sdk.io.aws2.options.AwsSerializableUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClientBuilder;
Expand All @@ -32,24 +33,25 @@

/** Basic implementation of {@link AWSClientsProvider} used by default in {@link KinesisIO}. */
class BasicKinesisProvider implements AWSClientsProvider {
private final String accessKey;
private final String secretKey;
private final String awsCredentialsProviderSerialized;
private final String region;
private final @Nullable String serviceEndpoint;

BasicKinesisProvider(
String accessKey, String secretKey, Region region, @Nullable String serviceEndpoint) {
checkArgument(accessKey != null, "accessKey can not be null");
checkArgument(secretKey != null, "secretKey can not be null");
AwsCredentialsProvider awsCredentialsProvider,
Region region,
@Nullable String serviceEndpoint) {
checkArgument(awsCredentialsProvider != null, "awsCredentialsProvider can not be null");
checkArgument(region != null, "region can not be null");
this.accessKey = accessKey;
this.secretKey = secretKey;
this.awsCredentialsProviderSerialized =
AwsSerializableUtils.serializeAwsCredentialsProvider(awsCredentialsProvider);
checkNotNull(awsCredentialsProviderSerialized, "awsCredentialsProviderString can not be null");
this.region = region.toString();
this.serviceEndpoint = serviceEndpoint;
}

private AwsCredentialsProvider getCredentialsProvider() {
return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
return AwsSerializableUtils.deserializeAwsCredentialsProvider(awsCredentialsProviderSerialized);
}

@Override
Expand All @@ -75,4 +77,23 @@ public CloudWatchClient getCloudWatchClient() {
}
return clientBuilder.build();
}

@Override
public boolean equals(@Nullable Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BasicKinesisProvider that = (BasicKinesisProvider) o;
return Objects.equals(awsCredentialsProviderSerialized, that.awsCredentialsProviderSerialized)
&& Objects.equals(region, that.region)
&& Objects.equals(serviceEndpoint, that.serviceEndpoint);
}

@Override
public int hashCode() {
return Objects.hash(awsCredentialsProviderSerialized, region, serviceEndpoint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
Expand All @@ -47,13 +50,23 @@
*
* <h3>Reading from Kinesis</h3>
*
* <p>Example usage:
* <p>Example usages:
*
* <pre>{@code
* p.apply(KinesisIO.read()
* .withStreamName("streamName")
* .withInitialPositionInStream(InitialPositionInStream.LATEST)
* .withAWSClientsProvider("AWS_KEY", _"AWS_SECRET", STREAM_REGION)
* // using AWS default credentials provider chain (recommended)
* .withAWSClientsProvider(DefaultCredentialsProvider.create(), STREAM_REGION)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest to keep an example with a key/secret as it was and just to add another one with a custom AWS credentials provider.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

* .apply( ... ) // other transformations
* }</pre>
*
* <pre>{@code
* p.apply(KinesisIO.read()
* .withStreamName("streamName")
* .withInitialPositionInStream(InitialPositionInStream.LATEST)
* // using plain AWS key and secret
* .withAWSClientsProvider("AWS_KEY", "AWS_SECRET", STREAM_REGION)
* .apply( ... ) // other transformations
* }</pre>
*
Expand All @@ -69,7 +82,7 @@
* </ul>
* <li>data used to initialize {@link KinesisClient} and {@link CloudWatchClient} clients:
* <ul>
* <li>credentials (aws key, aws secret)
* <li>AWS credentials
* <li>region where the stream is located
* </ul>
* </ul>
Expand Down Expand Up @@ -313,7 +326,7 @@ public Read withInitialTimestampInStream(Instant initialTimestamp) {
* Allows to specify custom {@link AWSClientsProvider}. {@link AWSClientsProvider} provides
* {@link KinesisClient} and {@link CloudWatchClient} instances which are later used for
* communication with Kinesis. You should use this method if {@link
* Read#withAWSClientsProvider(String, String, Region)} does not suit your needs.
* Read#withAWSClientsProvider(AwsCredentialsProvider, Region)} does not suit your needs.
*/
public Read withAWSClientsProvider(AWSClientsProvider awsClientsProvider) {
return toBuilder().setAWSClientsProvider(awsClientsProvider).build();
Expand All @@ -338,8 +351,33 @@ public Read withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Reg
*/
public Read withAWSClientsProvider(
String awsAccessKey, String awsSecretKey, Region region, String serviceEndpoint) {
AwsCredentialsProvider awsCredentialsProvider =
StaticCredentialsProvider.create(AwsBasicCredentials.create(awsAccessKey, awsSecretKey));
return withAWSClientsProvider(awsCredentialsProvider, region, serviceEndpoint);
}

/**
* Specify {@link AwsCredentialsProvider} and region to be used to read from Kinesis. If you
* need more sophisticated credential protocol, then you should look at {@link
* Read#withAWSClientsProvider(AWSClientsProvider)}.
*/
public Read withAWSClientsProvider(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, add the unit test for all new public methods.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I added a new test class for the Read builder api here. Let me know in case similar tests exist and i've just missed it
https://github.com/apache/beam/pull/15788/files#diff-16ffba2e35f48eeb476638eeab62a74b320c9ec9f30974336f183a0a36c5d3f2R36-R91

AwsCredentialsProvider awsCredentialsProvider, Region region) {
return withAWSClientsProvider(awsCredentialsProvider, region, null);
}

/**
* Specify {@link AwsCredentialsProvider} and region to be used to read from Kinesis. If you
* need more sophisticated credential protocol, then you should look at {@link
* Read#withAWSClientsProvider(AWSClientsProvider)}.
*
* <p>The {@code serviceEndpoint} sets an alternative service host. This is useful to execute
* the tests with a kinesis service emulator.
*/
public Read withAWSClientsProvider(
AwsCredentialsProvider awsCredentialsProvider, Region region, String serviceEndpoint) {
return withAWSClientsProvider(
new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint));
new BasicKinesisProvider(awsCredentialsProvider, region, serviceEndpoint));
}

/** Specifies to read at most a given number of records. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.kinesis;

import static org.junit.Assert.assertEquals;

import org.apache.beam.sdk.util.SerializableUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;

/** Tests on {@link org.apache.beam.sdk.io.aws2.kinesis.BasicKinesisProvider}. */
@RunWith(JUnit4.class)
public class BasicKinesisClientProviderTest {

@Test
public void testSerialization() {
AwsCredentialsProvider awsCredentialsProvider =
StaticCredentialsProvider.create(
AwsBasicCredentials.create("ACCESS_KEY_ID", "SECRET_ACCESS_KEY"));

BasicKinesisProvider kinesisProvider =
new BasicKinesisProvider(awsCredentialsProvider, Region.AP_EAST_1, null);

byte[] serializedBytes = SerializableUtils.serializeToByteArray(kinesisProvider);

BasicKinesisProvider kinesisProviderDeserialized =
(BasicKinesisProvider)
SerializableUtils.deserializeFromByteArray(serializedBytes, "Basic Kinesis Provider");

assertEquals(kinesisProvider, kinesisProviderDeserialized);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.kinesis;

import static org.junit.Assert.assertEquals;

import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Read;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;

/** Tests for non trivial builder variants of {@link KinesisIO#read}. */
@RunWith(JUnit4.class)
public class KinesisIOReadTest {

@Test
public void testBuildWithBasicCredentials() {
Region region = Region.US_EAST_1;
AwsBasicCredentials credentials = AwsBasicCredentials.create("key", "secret");

Read read =
KinesisIO.read()
.withAWSClientsProvider(
credentials.accessKeyId(), credentials.secretAccessKey(), region);

assertEquals(
read.getAWSClientsProvider(),
new BasicKinesisProvider(StaticCredentialsProvider.create(credentials), region, null));
}

@Test
public void testBuildWithCredentialsProvider() {
Region region = Region.US_EAST_1;
AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

Read read = KinesisIO.read().withAWSClientsProvider(credentialsProvider, region);

assertEquals(
read.getAWSClientsProvider(), new BasicKinesisProvider(credentialsProvider, region, null));
}

@Test
public void testBuildWithBasicCredentialsAndCustomEndpoint() {
String customEndpoint = "localhost:9999";
Region region = Region.US_WEST_1;
AwsBasicCredentials credentials = AwsBasicCredentials.create("key", "secret");

Read read =
KinesisIO.read()
.withAWSClientsProvider(
credentials.accessKeyId(), credentials.secretAccessKey(), region, customEndpoint);

assertEquals(
read.getAWSClientsProvider(),
new BasicKinesisProvider(
StaticCredentialsProvider.create(credentials), region, customEndpoint));
}

@Test
public void testBuildWithCredentialsProviderAndCustomEndpoint() {
String customEndpoint = "localhost:9999";
Region region = Region.US_WEST_1;
AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

Read read =
KinesisIO.read().withAWSClientsProvider(credentialsProvider, region, customEndpoint);

assertEquals(
read.getAWSClientsProvider(),
new BasicKinesisProvider(credentialsProvider, region, customEndpoint));
}
}