Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-26890][Connector/Kinesis] Handle invalid shards in DynamoDB #21237

Merged
merged 1 commit into from Nov 4, 2022

Conversation

elphastori
Copy link
Contributor

What is the purpose of the change

This pull request makes the Kinesis DynamoDB Streams Consumer close shards that are no longer valid rather than throwing an irrecoverable exceptions. This is because the DescribeStream API can return shards that are no longer valid and expects the client to handle them.

Brief change log

  • ResourceNotFoundException thrown in getShardIterator for DynamoDB streams consumer is now swallowed
  • A null shardIterator is returned which is an existing signal to close a shard
  • Added an INFO log to explain that the shard has been closed

Verifying this change

This change added tests and can be verified as follows:

  • Added a unit test that validates that a ResourceNotFoundException thrown in getShardIterator is caught and a null is returned
  • Manually verified the change by running the ConsumeFromDyanamoDB example, updating records in a DDB table every 10 ms and pausing for updates to re-produce a shard that is no longer valid. Shards that threw a NoResourceFoundException were closed.
dyn_resource = boto3.resource('dynamodb')
users = Users(dyn_resource)
users.table = dyn_resource.Table("UsersExample")
while True:
	users.update_user(random.randint(0, 999), names.get_first_name())
	time.sleep(1)
  • Manually introduced random failures by changing the last digit of a shardId to a 0 and this also resulted in a NoResourceFoundException that was handled correctly
public String getShardIterator(
		StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker)
		throws InterruptedException {
	...
	if (random.nextInt(10) == 0) {
		char[] shardIdChars = shard.getShard().getShardId().toCharArray();
		shardIdChars[shardIdChars.length - 1] = '0';
		shard =
				new StreamShardHandle(
						shard.getStreamName(),
						new Shard().withShardId(new String(shardIdChars)));
	}
	...
}

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Copy link
Collaborator

flinkbot commented Nov 3, 2022

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@dannycranmer dannycranmer merged commit f410434 into apache:release-1.16 Nov 4, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants