diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml index 135f1dddcc943..6c4567751d36a 100644 --- a/flink-connectors/flink-connector-kinesis/pom.xml +++ b/flink-connectors/flink-connector-kinesis/pom.xml @@ -36,6 +36,7 @@ under the License. 1.11.319 1.9.0 0.12.9 + 1.4.0 jar @@ -119,12 +120,24 @@ under the License. com.amazonaws - aws-java-sdk-dynamodb + aws-java-sdk-cloudwatch + + + + + com.amazonaws + dynamodb-streams-kinesis-adapter + ${aws.dynamodbstreams-kinesis-adapter.version} + com.amazonaws aws-java-sdk-cloudwatch + + com.fasterxml.jackson.core + jackson-databind + diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkDynamoDBStreamsConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkDynamoDBStreamsConsumer.java new file mode 100644 index 0000000000000..fd920c4b13a0a --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkDynamoDBStreamsConsumer.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.internals.DynamoDBStreamsDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Properties; + +/** + * Consume events from DynamoDB streams. + * + * @param the type of data emitted + */ +public class FlinkDynamoDBStreamsConsumer extends FlinkKinesisConsumer { + private static final Logger LOG = LoggerFactory.getLogger(FlinkDynamoDBStreamsConsumer.class); + + /** + * Constructor of FlinkDynamoDBStreamsConsumer. + * + * @param stream stream to consume + * @param deserializer deserialization schema + * @param config config properties + */ + public FlinkDynamoDBStreamsConsumer( + String stream, + DeserializationSchema deserializer, + Properties config) { + super(stream, deserializer, config); + } + + /** + * Constructor of FlinkDynamodbStreamConsumer. 
+ * + * @param streams list of streams to consume + * @param deserializer deserialization schema + * @param config config properties + */ + public FlinkDynamoDBStreamsConsumer( + List streams, + KinesisDeserializationSchema deserializer, + Properties config) { + super(streams, deserializer, config); + } + + @Override + protected KinesisDataFetcher createFetcher( + List streams, + SourceFunction.SourceContext sourceContext, + RuntimeContext runtimeContext, + Properties configProps, + KinesisDeserializationSchema deserializationSchema) { + return new DynamoDBStreamsDataFetcher( + streams, + sourceContext, + runtimeContext, + configProps, + deserializationSchema, + getShardAssigner()); + } +} diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 42e2173474b4c..41ac6b877a954 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -65,31 +65,13 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */ public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format"; - /** - * Deprecated key. - * - * @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_BASE} instead - **/ - @Deprecated - /** The base backoff time between each describeStream attempt. */ + /** The base backoff time between each describeStream attempt (for consuming from DynamoDB streams). */ public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; - /** - * Deprecated key. - * - * @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_MAX} instead - **/ - @Deprecated - /** The maximum backoff time between each describeStream attempt. */ + /** The maximum backoff time between each describeStream attempt (for consuming from DynamoDB streams). */ public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max"; - /** - * Deprecated key. - * - * @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT} instead - **/ - @Deprecated - /** The power constant for exponential backoff between each describeStream attempt. */ + /** The power constant for exponential backoff between each describeStream attempt (for consuming from DynamoDB streams). */ public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst"; /** The maximum number of listShards attempts if we get a recoverable exception. 
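To make the retained describeStream settings concrete, here is a minimal sketch of a consumer configuration that sets them explicitly. The region value is a placeholder, and the numbers simply restate the defaults this change keeps (1000 ms base, 5000 ms cap, power constant 1.5):

```java
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.util.Properties;

public class DescribeStreamBackoffConfig {
    public static void main(String[] args) {
        Properties config = new Properties();
        // Region is required by the consumer; the value here is a placeholder.
        config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
        // Backoff settings for describeStream, used when consuming from DynamoDB streams.
        config.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "1000");
        config.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "5000");
        config.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "1.5");
        System.out.println(config);
    }
}
```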
*/ @@ -151,13 +133,10 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"; - @Deprecated public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L; - @Deprecated public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L; - @Deprecated public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5; public static final long DEFAULT_LIST_SHARDS_BACKOFF_BASE = 1000L; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java new file mode 100644 index 0000000000000..c2b7be352b1cd --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.internals; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner; +import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter; +import org.apache.flink.streaming.connectors.kinesis.model.DynamoDBStreamsShardHandle; +import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; +import org.apache.flink.streaming.connectors.kinesis.proxy.DynamoDBStreamsProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Dynamodb streams data fetcher. + * @param type of fetched data. + */ +public class DynamoDBStreamsDataFetcher extends KinesisDataFetcher { + private boolean shardIdFormatCheck = false; + + /** + * Constructor. 
+ * + * @param streams list of streams to fetch data + * @param sourceContext source context + * @param runtimeContext runtime context + * @param configProps config properties + * @param deserializationSchema deserialization schema + * @param shardAssigner shard assigner + */ + public DynamoDBStreamsDataFetcher(List streams, + SourceFunction.SourceContext sourceContext, + RuntimeContext runtimeContext, + Properties configProps, + KinesisDeserializationSchema deserializationSchema, + KinesisShardAssigner shardAssigner) { + + super(streams, + sourceContext, + sourceContext.getCheckpointLock(), + runtimeContext, + configProps, + deserializationSchema, + shardAssigner, + null, + new AtomicReference<>(), + new ArrayList<>(), + createInitialSubscribedStreamsToLastDiscoveredShardsState(streams), + // use DynamoDBStreamsProxy + DynamoDBStreamsProxy::create); + } + + @Override + protected boolean shouldAdvanceLastDiscoveredShardId(String shardId, String lastSeenShardIdOfStream) { + if (DynamoDBStreamsShardHandle.compareShardIds(shardId, lastSeenShardIdOfStream) <= 0) { + // shardID update is valid only if the given shard id is greater + // than the previous last seen shard id of the stream. + return false; + } + + return true; + } + + /** + * Create a new DynamoDB streams shard consumer. + * + * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to + * @param handle stream handle + * @param lastSeqNum last sequence number + * @param shardMetricsReporter the reporter to report metrics to + * @return + */ + @Override + protected ShardConsumer createShardConsumer( + Integer subscribedShardStateIndex, + StreamShardHandle handle, + SequenceNumber lastSeqNum, + ShardMetricsReporter shardMetricsReporter) { + + return new ShardConsumer( + this, + subscribedShardStateIndex, + handle, + lastSeqNum, + DynamoDBStreamsProxy.create(getConsumerConfiguration()), + shardMetricsReporter); + } +} diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index 77ca23c9d378e..9cebc41ad2606 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -518,11 +518,16 @@ public void advanceLastDiscoveredShardOfStream(String stream, String shardId) { if (lastSeenShardIdOfStream == null) { // if not previously set, simply put as the last seen shard id this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId); - } else if (StreamShardHandle.compareShardIds(shardId, lastSeenShardIdOfStream) > 0) { + } else if (shouldAdvanceLastDiscoveredShardId(shardId, lastSeenShardIdOfStream)) { this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId); } } + /** Given lastSeenShardId, check if last discovered shardId should be advanced. 
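As a sketch of the advancement rule this override implements (the last discovered shard id moves forward only when the newly seen id compares strictly greater), with made-up shard ids in the documented DynamoDB streams format:

```java
import org.apache.flink.streaming.connectors.kinesis.model.DynamoDBStreamsShardHandle;

public class ShardAdvanceSketch {
    public static void main(String[] args) {
        String lastSeen = "shardId-00000001536805703746-69688cb1";
        String discovered = "shardId-00000001536805703999-0abc1234";
        // Mirrors shouldAdvanceLastDiscoveredShardId: advance only for strictly newer ids.
        if (DynamoDBStreamsShardHandle.compareShardIds(discovered, lastSeen) > 0) {
            lastSeen = discovered;
        }
        System.out.println("last discovered shard: " + lastSeen);
    }
}
```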
*/ + protected boolean shouldAdvanceLastDiscoveredShardId(String shardId, String lastSeenShardIdOfStream) { + return (StreamShardHandle.compareShardIds(shardId, lastSeenShardIdOfStream) > 0); + } + /** * A utility function that does the following: * diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/DynamoDBStreamsShardHandle.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/DynamoDBStreamsShardHandle.java new file mode 100644 index 0000000000000..534184f4ad812 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/DynamoDBStreamsShardHandle.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.model; + +import com.amazonaws.services.kinesis.model.Shard; + +/** + * DynamoDB streams shard handle format and utilities. + */ +public class DynamoDBStreamsShardHandle extends StreamShardHandle{ + public static final String SHARDID_PREFIX = "shardId-"; + public static final int SHARDID_PREFIX_LEN = SHARDID_PREFIX.length(); + + public DynamoDBStreamsShardHandle(String streamName, Shard shard) { + super(streamName, shard); + } + + public static int compareShardIds(String firstShardId, String secondShardId) { + if (!isValidShardId(firstShardId)) { + throw new IllegalArgumentException( + String.format("The first shard id %s has invalid format.", firstShardId)); + } else if (!isValidShardId(secondShardId)) { + throw new IllegalArgumentException( + String.format("The second shard id %s has invalid format.", secondShardId)); + } + + return firstShardId.substring(SHARDID_PREFIX_LEN).compareTo( + secondShardId.substring(SHARDID_PREFIX_LEN)); + } + + /** + *
<p>
+ * Dynamodb streams shard ID is a char string ranging from 28 characters to 65 characters. + * (See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_Shard.html) + * + * The shardId observed usually takes the format of: "shardId-00000001536805703746-69688cb1", + * where "shardId-" is a prefix, followed by a 20-digit timestamp string and 0-36 or more + * characters, separated by '-'. Following this format, it is expected the child shards created + * during a re-sharding event have shardIds bigger than their parents. + *
</p>
+ * @param shardId shard Id + * @return boolean indicate if the given shard Id is valid + */ + public static boolean isValidShardId(String shardId) { + return shardId == null ? false : shardId.matches("^shardId-\\d{20}-{0,1}\\w{0,36}"); + } +} diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxy.java new file mode 100644 index 0000000000000..eb5620faf172a --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxy.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.ClientConfigurationFactory; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.Shard; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ENDPOINT; +import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION; +import static org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.getCredentialsProvider; +import static org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.setAwsClientConfigProperties; + +/** + * DynamoDB streams proxy: interface interacting with the DynamoDB streams. + */ +public class DynamoDBStreamsProxy extends KinesisProxy { + private static final Logger LOG = LoggerFactory.getLogger(DynamoDBStreamsProxy.class); + + /** Used for formatting Flink-specific user agent string when creating Kinesis client. */ + private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) DynamoDB Streams Connector"; + + protected DynamoDBStreamsProxy(Properties configProps) { + super(configProps); + } + + /** + * Creates a DynamoDB streams proxy. 
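The validation rule above can be exercised directly; a short sketch with made-up shard ids covering the accepted long and short forms plus a rejected prefix (mirroring the unit tests added later in this change):

```java
import org.apache.flink.streaming.connectors.kinesis.model.DynamoDBStreamsShardHandle;

public class ShardIdValidationSketch {
    public static void main(String[] args) {
        // prefix + 20-digit timestamp, optionally followed by '-' and up to 36 word characters
        System.out.println(DynamoDBStreamsShardHandle.isValidShardId(
                "shardId-00000001536805703746-69688cb1")); // true (normal form)
        System.out.println(DynamoDBStreamsShardHandle.isValidShardId(
                "shardId-00000001536805703746"));          // true (short form)
        System.out.println(DynamoDBStreamsShardHandle.isValidShardId(
                "sId-00000001536805703746-69688cb1"));     // false (wrong prefix)
    }
}
```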
+ * + * @param configProps configuration properties + * @return the created DynamoDB streams proxy + */ + public static KinesisProxyInterface create(Properties configProps) { + return new DynamoDBStreamsProxy(configProps); + } + + /** + * Creates an AmazonDynamoDBStreamsAdapterClient. + * Uses it as the internal client interacting with the DynamoDB streams. + * + * @param configProps configuration properties + * @return an AWS DynamoDB streams adapter client + */ + @Override + protected AmazonKinesis createKinesisClient(Properties configProps) { + ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig(); + setAwsClientConfigProperties(awsClientConfig, configProps); + + AWSCredentialsProvider credentials = getCredentialsProvider(configProps); + awsClientConfig.setUserAgentPrefix( + String.format( + USER_AGENT_FORMAT, + EnvironmentInformation.getVersion(), + EnvironmentInformation.getRevisionInformation().commitId)); + + AmazonDynamoDBStreamsAdapterClient adapterClient = + new AmazonDynamoDBStreamsAdapterClient(credentials, awsClientConfig); + + if (configProps.containsKey(AWS_ENDPOINT)) { + adapterClient.setEndpoint(configProps.getProperty(AWS_ENDPOINT)); + } else { + adapterClient.setRegion(Region.getRegion( + Regions.fromName(configProps.getProperty(AWS_REGION)))); + } + + return adapterClient; + } + + @Override + public GetShardListResult getShardList( + Map streamNamesWithLastSeenShardIds) throws InterruptedException { + GetShardListResult result = new GetShardListResult(); + + for (Map.Entry streamNameWithLastSeenShardId : + streamNamesWithLastSeenShardIds.entrySet()) { + String stream = streamNameWithLastSeenShardId.getKey(); + String lastSeenShardId = streamNameWithLastSeenShardId.getValue(); + result.addRetrievedShardsToStream(stream, getShardsOfStream(stream, lastSeenShardId)); + } + return result; + } + + private List getShardsOfStream( + String streamName, + @Nullable String lastSeenShardId) + throws InterruptedException { + List shardsOfStream = new ArrayList<>(); + + DescribeStreamResult describeStreamResult; + do { + describeStreamResult = describeStream(streamName, lastSeenShardId); + List shards = describeStreamResult.getStreamDescription().getShards(); + for (Shard shard : shards) { + shardsOfStream.add(new StreamShardHandle(streamName, shard)); + } + + if (shards.size() != 0) { + lastSeenShardId = shards.get(shards.size() - 1).getShardId(); + } + } while (describeStreamResult.getStreamDescription().isHasMoreShards()); + + return shardsOfStream; + } +} diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java index 262181ae3bcb2..4e44aebd3771e 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java @@ -28,6 +28,8 @@ import com.amazonaws.ClientConfigurationFactory; import com.amazonaws.SdkClientException; import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.model.DescribeStreamRequest; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.ExpiredNextTokenException; import com.amazonaws.services.kinesis.model.GetRecordsRequest; import 
com.amazonaws.services.kinesis.model.GetRecordsResult; @@ -42,6 +44,7 @@ import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.ShardIteratorType; +import com.amazonaws.services.kinesis.model.StreamStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,6 +129,15 @@ public class KinesisProxy implements KinesisProxyInterface { /** Maximum retry attempts for the get shard iterator operation. */ private final int getShardIteratorMaxRetries; + /* Backoff millis for the describe stream operation. */ + private final long describeStreamBaseBackoffMillis; + + /* Maximum backoff millis for the describe stream operation. */ + private final long describeStreamMaxBackoffMillis; + + /* Exponential backoff power constant for the describe stream operation. */ + private final double describeStreamExpConstant; + /** * Create a new KinesisProxy based on the supplied configuration properties. * @@ -133,7 +145,7 @@ public class KinesisProxy implements KinesisProxyInterface { */ protected KinesisProxy(Properties configProps) { checkNotNull(configProps); - KinesisConfigUtil.replaceDeprecatedConsumerKeys(configProps); + KinesisConfigUtil.backfillConsumerKeys(configProps); this.kinesisClient = createKinesisClient(configProps); @@ -153,7 +165,15 @@ protected KinesisProxy(Properties configProps) { configProps.getProperty( ConsumerConfigConstants.LIST_SHARDS_RETRIES, Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_RETRIES))); - + this.describeStreamBaseBackoffMillis = Long.valueOf( + configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, + Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE))); + this.describeStreamMaxBackoffMillis = Long.valueOf( + configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, + Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX))); + this.describeStreamExpConstant = Double.valueOf( + configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, + Double.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT))); this.getRecordsBaseBackoffMillis = Long.valueOf( configProps.getProperty( ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, @@ -469,6 +489,61 @@ private ListShardsResult listShards(String streamName, @Nullable String startSha return listShardsResults; } + /** + * Get metainfo for a Kinesis stream, which contains information about which shards this + * Kinesis stream possess. + * + *
<p>
This method is using a "full jitter" approach described in AWS's article, + * + * "Exponential Backoff and Jitter". + * This is necessary because concurrent calls will be made by all parallel subtask's fetcher. + * This jitter backoff approach will help distribute calls across the fetchers over time. + * + * @param streamName the stream to describe + * @param startShardId which shard to start with for this describe operation + * + * @return the result of the describe stream operation + */ + protected DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) + throws InterruptedException { + final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); + describeStreamRequest.setStreamName(streamName); + describeStreamRequest.setExclusiveStartShardId(startShardId); + + DescribeStreamResult describeStreamResult = null; + + // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException). + int attemptCount = 0; + while (describeStreamResult == null) { // retry until we get a result + try { + describeStreamResult = kinesisClient.describeStream(describeStreamRequest); + } catch (LimitExceededException le) { + long backoffMillis = fullJitterBackoff( + describeStreamBaseBackoffMillis, + describeStreamMaxBackoffMillis, + describeStreamExpConstant, + attemptCount++); + LOG.warn(String.format("Got LimitExceededException when describing stream %s. " + + "Backing off for %d millis.", streamName, backoffMillis)); + Thread.sleep(backoffMillis); + } catch (ResourceNotFoundException re) { + throw new RuntimeException("Error while getting stream details", re); + } + } + + String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); + if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) + || streamStatus.equals(StreamStatus.UPDATING.toString()))) { + if (LOG.isWarnEnabled()) { + LOG.warn(String.format("The status of stream %s is %s ; result of the current " + + "describeStream operation will not contain any shard information.", + streamName, streamStatus)); + } + } + + return describeStreamResult; + } + protected static long fullJitterBackoff(long base, long max, double power, int attempt) { long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt)); return (long) (seed.nextDouble() * exponentialBackoff); // random jitter between 0 and the exponential backoff diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/DynamoDBStreamsSchema.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/DynamoDBStreamsSchema.java new file mode 100644 index 0000000000000..ea6eeee917c47 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/DynamoDBStreamsSchema.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
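A standalone sketch of the full-jitter computation used by the describeStream retry loop above: the sleep time is drawn uniformly from [0, min(max, base * power^attempt)], which spreads the retries of concurrent fetcher subtasks apart. The constants restate the defaults from this change:

```java
import java.util.Random;

public class FullJitterSketch {
    private static final Random RANDOM = new Random();

    // Mirrors KinesisProxy.fullJitterBackoff: cap the exponential term, then jitter it.
    static long fullJitterBackoff(long base, long max, double power, int attempt) {
        long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
        return (long) (RANDOM.nextDouble() * exponentialBackoff);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println("attempt " + attempt + " -> "
                    + fullJitterBackoff(1000L, 5000L, 1.5, attempt) + " ms");
        }
    }
}
```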
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.serialization; + +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import com.amazonaws.services.dynamodbv2.model.Record; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; + +/** + * Schema used for deserializing DynamoDB streams records. + */ +public class DynamoDBStreamsSchema implements KinesisDeserializationSchema { + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Override + public Record deserialize(byte[] message, String partitionKey, String seqNum, + long approxArrivalTimestamp, String stream, String shardId) throws IOException { + return MAPPER.readValue(message, Record.class); + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(Record.class); + } + +} diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java index 75c84cdca5a27..b277696bf1a51 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -193,18 +193,31 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) { return configProps; } - public static Properties replaceDeprecatedConsumerKeys(Properties configProps) { - HashMap deprecatedOldKeyToNewKeys = new HashMap<>(); - deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE); - deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX); - deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT); - for (Map.Entry entry : deprecatedOldKeyToNewKeys.entrySet()) { - String deprecatedOldKey = entry.getKey(); + /** + *
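A usage sketch for the DynamoDBStreamsSchema defined above: paired with the list-based consumer constructor, it emits typed com.amazonaws.services.dynamodbv2.model.Record values instead of raw strings. The stream name and region are placeholders:

```java
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.serialization.DynamoDBStreamsSchema;

import com.amazonaws.services.dynamodbv2.model.Record;

import java.util.Collections;
import java.util.Properties;

public class TypedRecordConsumerSketch {
    public static FlinkDynamoDBStreamsConsumer<Record> buildConsumer() {
        Properties config = new Properties();
        config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); // placeholder
        return new FlinkDynamoDBStreamsConsumer<>(
                Collections.singletonList("my-ddb-stream"), // placeholder stream name
                new DynamoDBStreamsSchema(),
                config);
    }
}
```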
<p>
+ * A set of configuration parameters associated with the describeStream API may be used if: + * 1) a legacy client wants to consume from Kinesis + * 2) a current client wants to consume from DynamoDB streams + * + * In the context of 1), the set of configurations needs to be translated to the corresponding + * configurations in the Kinesis listShards API. In the meantime, keep these configs since + * they are applicable in the context of 2), i.e., polling data from a DynamoDB stream. + *
</p>
+ * + * @param configProps original config properties. + * @return backfilled config properties. + */ + public static Properties backfillConsumerKeys(Properties configProps) { + HashMap oldKeyToNewKeys = new HashMap<>(); + oldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE); + oldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX); + oldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT); + for (Map.Entry entry : oldKeyToNewKeys.entrySet()) { + String oldKey = entry.getKey(); String newKey = entry.getValue(); - if (configProps.containsKey(deprecatedOldKey)) { - LOG.warn("Please note {} property has been deprecated. Please use the {} new property key", deprecatedOldKey, newKey); - configProps.setProperty(newKey, configProps.getProperty(deprecatedOldKey)); - configProps.remove(deprecatedOldKey); + if (configProps.containsKey(oldKey)) { + configProps.setProperty(newKey, configProps.getProperty(oldKey)); + // Do not remove the oldKey since they may be used in the context of talking to DynamoDB streams } } return configProps; diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromDynamoDBStreams.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromDynamoDBStreams.java new file mode 100644 index 0000000000000..44b1ddd1f441e --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromDynamoDBStreams.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.examples; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; + +import java.util.Properties; + +/** + * Sample command-line program of consuming data from a single DynamoDB stream. 
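The backfill above copies each legacy describeStream key to its listShards counterpart without removing the original; a quick sketch of the observable effect (the property value is arbitrary):

```java
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;

import java.util.Properties;

public class BackfillSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "2000");
        KinesisConfigUtil.backfillConsumerKeys(props);
        // The listShards key is backfilled for Kinesis consumption...
        System.out.println(props.getProperty(ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE));     // 2000
        // ...and the describeStream key is retained for DynamoDB streams consumption.
        System.out.println(props.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)); // 2000
    }
}
```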
+ */ +public class ConsumeFromDynamoDBStreams { + private static final String DYNAMODB_STREAM_NAME = "stream"; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + + StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); + see.setParallelism(1); + + Properties dynamodbStreamsConsumerConfig = new Properties(); + final String streamName = pt.getRequired(DYNAMODB_STREAM_NAME); + dynamodbStreamsConsumerConfig.setProperty( + ConsumerConfigConstants.AWS_REGION, pt.getRequired("region")); + dynamodbStreamsConsumerConfig.setProperty( + ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accesskey")); + dynamodbStreamsConsumerConfig.setProperty( + ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretkey")); + + DataStream dynamodbStreams = see.addSource(new FlinkDynamoDBStreamsConsumer<>( + streamName, + new SimpleStringSchema(), + dynamodbStreamsConsumerConfig)); + + dynamodbStreams.print(); + + see.execute(); + } + +} diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/model/DynamoDBStreamsShardHandleTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/model/DynamoDBStreamsShardHandleTest.java new file mode 100644 index 0000000000000..c8e3415315661 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/model/DynamoDBStreamsShardHandleTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.model; + +import org.junit.Test; + +import static org.apache.flink.streaming.connectors.kinesis.model.DynamoDBStreamsShardHandle.SHARDID_PREFIX; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Shard handle unit tests. 
+ */ +public class DynamoDBStreamsShardHandleTest { + @Test + public void testIsValidShardId() { + // normal form + String shardId = "shardId-00000001536805703746-69688cb1"; + assertEquals(true, DynamoDBStreamsShardHandle.isValidShardId(shardId)); + + // short form + shardId = "shardId-00000001536805703746"; + assertEquals(true, DynamoDBStreamsShardHandle.isValidShardId(shardId)); + + // long form + shardId = "shardId-00000001536805703746-69688cb1aljkwerijfl8228sl12a123akfla"; + assertEquals(true, DynamoDBStreamsShardHandle.isValidShardId(shardId)); + + // invalid with wrong prefix + shardId = "sId-00000001536805703746-69688cb1"; + assertEquals(false, DynamoDBStreamsShardHandle.isValidShardId(shardId)); + + // invalid with non-digits + shardId = "shardId-0000000153680570aabb-69688cb1"; + assertEquals(false, DynamoDBStreamsShardHandle.isValidShardId(shardId)); + + // invalid with shardId too long + shardId = "shardId-00000001536805703746-69688cb1aljkwerijfl8228sl12a123akfla0000"; + assertEquals(false, DynamoDBStreamsShardHandle.isValidShardId(shardId)); + } + + @Test + public void testCompareShardId() { + final int numShardIds = 10; + final int shardIdDigitLen = 20; + final String zeros = "00000000000000000000"; // twenty '0' chars + String shardIdValid = "shardId-00000001536805703746-69688cb1"; + String shardIdInvalid = "shardId-0000000153680570aabb-69688cb1"; + + assertEquals(0, DynamoDBStreamsShardHandle.compareShardIds(shardIdValid, shardIdValid)); + + // comparison of invalid shardIds should yield exception + try { + DynamoDBStreamsShardHandle.compareShardIds(shardIdValid, shardIdInvalid); + fail("invalid shard Id" + shardIdInvalid + " should trigger exception"); + } catch (IllegalArgumentException e) { + // expected + } + try { + DynamoDBStreamsShardHandle.compareShardIds(shardIdInvalid, shardIdValid); + fail("invalid shard Id" + shardIdInvalid + " should trigger exception"); + } catch (IllegalArgumentException e) { + // expected + } + + // compare randomly generated shardIds based on timestamp + String[] shardIds = new String[numShardIds]; + for (int i = 0; i < numShardIds; i++) { + String nowStr = String.valueOf(System.currentTimeMillis()); + if (nowStr.length() < shardIdDigitLen) { + shardIds[i] = SHARDID_PREFIX + zeros.substring(0, shardIdDigitLen - nowStr.length()) + + nowStr; + } else { + shardIds[i] = SHARDID_PREFIX + nowStr.substring(0, shardIdDigitLen); + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // ignore + } + } + for (int i = 1; i < numShardIds - 1; i++) { + assertTrue(DynamoDBStreamsShardHandle.compareShardIds(shardIds[i - 1], shardIds[i]) < 0); + assertTrue(DynamoDBStreamsShardHandle.compareShardIds(shardIds[i], shardIds[i]) == 0); + assertTrue(DynamoDBStreamsShardHandle.compareShardIds(shardIds[i], shardIds[i + 1]) < 0); + } + } + +}
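For reference, a hypothetical invocation of the ConsumeFromDynamoDBStreams sample above. The argument names (--stream, --region, --accesskey, --secretkey) come from its ParameterTool lookups; the jar path and all values are placeholders:

```
flink run flink-connector-kinesis-examples.jar \
    --stream my-ddb-stream \
    --region us-east-1 \
    --accesskey <access-key-id> \
    --secretkey <secret-access-key>
```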