[FLINK-4582] [kinesis] Consuming data from DynamoDB streams to Flink #6968

Merged (15 commits) on Dec 22, 2018
15 changes: 14 additions & 1 deletion flink-connectors/flink-connector-kinesis/pom.xml
@@ -36,6 +36,7 @@ under the License.
<aws.sdk.version>1.11.319</aws.sdk.version>
<aws.kinesis-kcl.version>1.9.0</aws.kinesis-kcl.version>
<aws.kinesis-kpl.version>0.12.9</aws.kinesis-kpl.version>
+<aws.dynamodbstreams-kinesis-adapter.version>1.4.0</aws.dynamodbstreams-kinesis-adapter.version>
</properties>

<packaging>jar</packaging>
@@ -119,12 +120,24 @@ under the License.
<exclusions>
<exclusion>
<groupId>com.amazonaws</groupId>
-<artifactId>aws-java-sdk-dynamodb</artifactId>
+<artifactId>aws-java-sdk-cloudwatch</artifactId>
</exclusion>
</exclusions>
</dependency>

+<dependency>
+<groupId>com.amazonaws</groupId>
+<artifactId>dynamodb-streams-kinesis-adapter</artifactId>
+<version>${aws.dynamodbstreams-kinesis-adapter.version}</version>
+<exclusions>
+<exclusion>
+<groupId>com.amazonaws</groupId>
+<artifactId>aws-java-sdk-cloudwatch</artifactId>
+</exclusion>
+<exclusion>
+<groupId>com.fasterxml.jackson.core</groupId>
+<artifactId>jackson-databind</artifactId>
+</exclusion>
+</exclusions>
+</dependency>

84 changes: 84 additions & 0 deletions flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkDynamoDBStreamsConsumer.java
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kinesis;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.internals.DynamoDBStreamsDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;

/**
* Consume events from DynamoDB streams.
*
* @param <T> the type of data emitted
*/
public class FlinkDynamoDBStreamsConsumer<T> extends FlinkKinesisConsumer<T> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkDynamoDBStreamsConsumer.class);

/**
* Constructor of FlinkDynamoDBStreamsConsumer.
*
* @param stream stream to consume
* @param deserializer deserialization schema
* @param config config properties
*/
public FlinkDynamoDBStreamsConsumer(
String stream,
DeserializationSchema<T> deserializer,
Properties config) {
super(stream, deserializer, config);
}

/**
* Constructor of FlinkDynamoDBStreamsConsumer.
*
* @param streams list of streams to consume
* @param deserializer deserialization schema
* @param config config properties
*/
public FlinkDynamoDBStreamsConsumer(
List<String> streams,
KinesisDeserializationSchema<T> deserializer,
Properties config) {
super(streams, deserializer, config);
}

@Override
protected KinesisDataFetcher<T> createFetcher(
List<String> streams,
SourceFunction.SourceContext<T> sourceContext,
RuntimeContext runtimeContext,
Properties configProps,
KinesisDeserializationSchema<T> deserializationSchema) {
return new DynamoDBStreamsDataFetcher<T>(
streams,
sourceContext,
runtimeContext,
configProps,
deserializationSchema,
getShardAssigner());
}
}
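
For context, here is a minimal usage sketch of the new consumer. It is not part of this diff, and the region, stream identifier, and job name are illustrative assumptions (for DynamoDB streams, the "stream" passed to the consumer is typically the table's stream ARN):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class DynamoDBStreamsExample {
	public static void main(String[] args) throws Exception {
		Properties config = new Properties();
		config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); // illustrative region
		config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Illustrative stream ARN; consume table change records as strings.
		DataStream<String> records = env.addSource(new FlinkDynamoDBStreamsConsumer<>(
				"arn:aws:dynamodb:us-east-1:123456789012:table/Example/stream/2018-12-22T00:00:00.000",
				new SimpleStringSchema(),
				config));

		records.print();
		env.execute("dynamodb-streams-consumer-example");
	}
}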
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -65,31 +65,13 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";

-/**
- * Deprecated key.
- *
- * @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_BASE} instead
- **/
-@Deprecated
-/** The base backoff time between each describeStream attempt. */
+/** The base backoff time between each describeStream attempt (for consuming from DynamoDB streams). */
public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";

-/**
- * Deprecated key.
- *
- * @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_MAX} instead
- **/
-@Deprecated
-/** The maximum backoff time between each describeStream attempt. */
+/** The maximum backoff time between each describeStream attempt (for consuming from DynamoDB streams). */
public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max";

-/**
- * Deprecated key.
- *
- * @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT} instead
- **/
-@Deprecated
-/** The power constant for exponential backoff between each describeStream attempt. */
+/** The power constant for exponential backoff between each describeStream attempt (for consuming from DynamoDB streams). */
public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst";

/** The maximum number of listShards attempts if we get a recoverable exception. */
@@ -151,13 +133,10 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {

public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

-@Deprecated
public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;

-@Deprecated
public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;

-@Deprecated
public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;

public static final long DEFAULT_LIST_SHARDS_BACKOFF_BASE = 1000L;
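
For reference, a minimal sketch (not part of this diff) of how these describeStream backoff keys can be set on the consumer's Properties; the values shown are the defaults above:

Properties config = new Properties();
// Base and maximum backoff time (milliseconds) between describeStream attempts.
config.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "1000");
config.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "5000");
// Power constant for growing the backoff exponentially between attempts.
config.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "1.5");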
109 changes: 109 additions & 0 deletions flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kinesis.internals;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.DynamoDBStreamsShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.DynamoDBStreamsProxy;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

/**
 * DynamoDB streams data fetcher.
 *
 * @param <T> type of fetched data.
 */
public class DynamoDBStreamsDataFetcher<T> extends KinesisDataFetcher<T> {
private boolean shardIdFormatCheck = false;

/**
* Constructor.
*
* @param streams list of streams to fetch data
* @param sourceContext source context
* @param runtimeContext runtime context
* @param configProps config properties
* @param deserializationSchema deserialization schema
* @param shardAssigner shard assigner
*/
public DynamoDBStreamsDataFetcher(List<String> streams,
SourceFunction.SourceContext<T> sourceContext,
RuntimeContext runtimeContext,
Properties configProps,
KinesisDeserializationSchema<T> deserializationSchema,
KinesisShardAssigner shardAssigner) {

super(streams,
sourceContext,
sourceContext.getCheckpointLock(),
runtimeContext,
configProps,
deserializationSchema,
shardAssigner,
null, // periodic watermark assigner (none)
new AtomicReference<>(),
new ArrayList<>(),
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
// use DynamoDBStreamsProxy
DynamoDBStreamsProxy::create);
}

@Override
protected boolean shouldAdvanceLastDiscoveredShardId(String shardId, String lastSeenShardIdOfStream) {
if (DynamoDBStreamsShardHandle.compareShardIds(shardId, lastSeenShardIdOfStream) <= 0) {
// shardID update is valid only if the given shard id is greater
// than the previous last seen shard id of the stream.
return false;
}

return true;
}

/**
* Create a new DynamoDB streams shard consumer.
*
* @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
* @param handle stream handle
* @param lastSeqNum last sequence number
* @param shardMetricsReporter the reporter to report metrics to
* @return a shard consumer for the given DynamoDB streams shard
*/
@Override
protected ShardConsumer<T> createShardConsumer(
Integer subscribedShardStateIndex,
StreamShardHandle handle,
SequenceNumber lastSeqNum,
ShardMetricsReporter shardMetricsReporter) {

return new ShardConsumer<>(
this,
subscribedShardStateIndex,
handle,
lastSeqNum,
DynamoDBStreamsProxy.create(getConsumerConfiguration()),
shardMetricsReporter);
}
}
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -518,11 +518,16 @@ public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
if (lastSeenShardIdOfStream == null) {
// if not previously set, simply put as the last seen shard id
this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
-} else if (StreamShardHandle.compareShardIds(shardId, lastSeenShardIdOfStream) > 0) {
+} else if (shouldAdvanceLastDiscoveredShardId(shardId, lastSeenShardIdOfStream)) {
this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
}
}

+/** Given lastSeenShardId, check if last discovered shardId should be advanced. */
+protected boolean shouldAdvanceLastDiscoveredShardId(String shardId, String lastSeenShardIdOfStream) {
+	return (StreamShardHandle.compareShardIds(shardId, lastSeenShardIdOfStream) > 0);
+}

/**
* A utility function that does the following:
*
62 changes: 62 additions & 0 deletions flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/DynamoDBStreamsShardHandle.java
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kinesis.model;

import com.amazonaws.services.kinesis.model.Shard;

/**
* DynamoDB streams shard handle format and utilities.
*/
public class DynamoDBStreamsShardHandle extends StreamShardHandle {
public static final String SHARDID_PREFIX = "shardId-";
public static final int SHARDID_PREFIX_LEN = SHARDID_PREFIX.length();

public DynamoDBStreamsShardHandle(String streamName, Shard shard) {
super(streamName, shard);
}

public static int compareShardIds(String firstShardId, String secondShardId) {
if (!isValidShardId(firstShardId)) {
throw new IllegalArgumentException(
String.format("The first shard id %s has invalid format.", firstShardId));
} else if (!isValidShardId(secondShardId)) {
throw new IllegalArgumentException(
String.format("The second shard id %s has invalid format.", secondShardId));
}

return firstShardId.substring(SHARDID_PREFIX_LEN).compareTo(
secondShardId.substring(SHARDID_PREFIX_LEN));
}

/**
 * <p>
 * A DynamoDB streams shard id is a character string ranging from 28 to 65 characters.
 * (See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_Shard.html)
 *
 * The shard ids observed usually take the format "shardId-00000001536805703746-69688cb1",
 * where "shardId-" is a fixed prefix, followed by a 20-digit timestamp string and up to 36
 * further characters, separated by '-'. Under this format, child shards created during a
 * re-sharding event are expected to have shard ids greater than those of their parents.
 * </p>
 * @param shardId shard id to validate
 * @return whether the given shard id is valid
 */
public static boolean isValidShardId(String shardId) {
	return shardId != null && shardId.matches("^shardId-\\d{20}-{0,1}\\w{0,36}");
}
}
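
As a small illustration of the format and ordering rules above (the shard ids are made-up examples that follow the documented format, not values from this PR):

// Both ids follow "shardId-" + 20-digit timestamp + '-' + suffix.
String parentShardId = "shardId-00000001536805703746-69688cb1";
String childShardId = "shardId-00000001536805703999-2a9e1f40";

System.out.println(DynamoDBStreamsShardHandle.isValidShardId(parentShardId)); // true
// The child shard carries a later timestamp, so it compares greater than its parent.
System.out.println(DynamoDBStreamsShardHandle.compareShardIds(childShardId, parentShardId) > 0); // true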