Kinesis ingestion with empty shards #12792

Merged

AmatyaAvadhanula
Contributor

Enables Kinesis ingestion with empty shards

Description

Kinesis ingestion requires every shard to have at least one record at the required position in Druid.
Even if this is satisfied initially, resharding the stream can lead to empty intermediate shards. A significant delay in writing to newly created shards was also problematic.

Kinesis shard sequence numbers are big integers. Introduce two more custom sequence tokens UNREAD_TRIM_HORIZON and UNREAD_LATEST to indicate that a shard has not been read from and that it needs to be read from the start or the end respectively.
These values can be used to avoid the need to read at least one record to obtain a sequence number for ingesting a newly discovered shard.

If a record cannot be obtained immediately, use a marker to obtain the relevant shardIterator and use this shardIterator to obtain a valid sequence number. As long as a valid sequence number is not obtained, continue storing the token as the offset.
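
The flow described above might look roughly like the following. This is only a sketch under stated assumptions: `StartOffsetResolver` and `resolve` are hypothetical names, and the list of fetched sequence numbers stands in for whatever a single Kinesis fetch through the shard iterator returns; it is not the code in this PR.

```java
import java.util.List;

// Hypothetical helper, not part of Druid: decides which offset to store for a shard.
final class StartOffsetResolver
{
  static final String UNREAD_TRIM_HORIZON = "UNREAD_TRIM_HORIZON";
  static final String UNREAD_LATEST = "UNREAD_LATEST";

  static boolean isUnread(String offset)
  {
    return UNREAD_TRIM_HORIZON.equals(offset) || UNREAD_LATEST.equals(offset);
  }

  // fetchedSequenceNumbers: sequence numbers of the records returned by one fetch
  // through the shard iterator obtained for the marker (assumed input for this sketch).
  static String resolve(String storedOffset, List<String> fetchedSequenceNumbers)
  {
    if (!isUnread(storedOffset)) {
      // A valid sequence number is already known; nothing to resolve.
      return storedOffset;
    }
    if (fetchedSequenceNumbers.isEmpty()) {
      // The shard is still empty: keep storing the token as the offset.
      return storedOffset;
    }
    // The first record read yields a valid sequence number.
    return fetchedSequenceNumbers.get(0);
  }
}
```

With this shape, an empty shard never blocks assignment: the token simply stays in place until a record appears.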

These tokens (UNREAD_TRIM_HORIZON and UNREAD_LATEST) are logically ordered to be earlier than any valid sequence number.
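
As an illustration of this ordering, here is a minimal sketch. `SeqToken` is a hypothetical, simplified stand-in rather than the PR's `KinesisSequenceNumber`; it assumes valid sequence numbers parse as big integers and, as a further assumption, treats two UNREAD tokens as equal.

```java
import java.math.BigInteger;

// Hypothetical, simplified stand-in for a Kinesis sequence token.
final class SeqToken implements Comparable<SeqToken>
{
  static final String UNREAD_TRIM_HORIZON = "UNREAD_TRIM_HORIZON";
  static final String UNREAD_LATEST = "UNREAD_LATEST";

  private final String value;

  SeqToken(String value)
  {
    this.value = value;
  }

  boolean isUnread()
  {
    return UNREAD_TRIM_HORIZON.equals(value) || UNREAD_LATEST.equals(value);
  }

  @Override
  public int compareTo(SeqToken other)
  {
    if (isUnread() && other.isUnread()) {
      return 0; // assumption: two UNREAD tokens compare as equal in this sketch
    }
    if (isUnread()) {
      return -1; // an UNREAD token sorts before any valid sequence number
    }
    if (other.isUnread()) {
      return 1;
    }
    // Both are valid sequence numbers: compare them as big integers.
    return new BigInteger(value).compareTo(new BigInteger(other.value));
  }
}
```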

However, the ordering requires a few subtle changes to the existing mechanism for record sequence validation:

  1. The sequence availability check ensures that the current offset is not before the earliest available sequence number in the shard. However, when the current token is an UNREAD token, any sequence number in the shard is valid (despite the ordering).

  2. Kinesis sequence numbers are inclusive, i.e., if current sequence == end sequence, there are more records left to read.
    However, the equality check is exclusive when dealing with UNREAD tokens.
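
One possible reading of these two adjustments is sketched below. `ShardOffset` is a hypothetical class written for illustration; it borrows the method names that appear in this PR, but the bodies are assumptions and not the merged implementation. A `null` sequence stands for an UNREAD token purely to keep the example short.

```java
import java.math.BigInteger;

// Hypothetical sketch of the two adjusted checks; not the PR's actual code.
final class ShardOffset
{
  // A null sequence stands for an UNREAD token (simplification for this sketch).
  private final BigInteger sequence;

  private ShardOffset(BigInteger sequence)
  {
    this.sequence = sequence;
  }

  static ShardOffset unread()
  {
    return new ShardOffset(null);
  }

  static ShardOffset of(String sequenceNumber)
  {
    return new ShardOffset(new BigInteger(sequenceNumber));
  }

  boolean isUnread()
  {
    return sequence == null;
  }

  // UNREAD tokens sort before every valid sequence number.
  int compareTo(ShardOffset other)
  {
    if (isUnread() && other.isUnread()) {
      return 0;
    }
    if (isUnread()) {
      return -1;
    }
    if (other.isUnread()) {
      return 1;
    }
    return sequence.compareTo(other.sequence);
  }

  // Check 1: an UNREAD offset is always available, despite sorting before `earliest`.
  boolean isAvailableWithEarliest(ShardOffset earliest)
  {
    if (isUnread()) {
      return true;
    }
    return earliest.compareTo(this) <= 0;
  }

  // Check 2: inclusive for valid sequence numbers, exclusive when UNREAD tokens are involved.
  boolean isMoreToReadBeforeReadingRecord(ShardOffset end)
  {
    int cmp = compareTo(end);
    if (isUnread() || end.isUnread()) {
      return cmp < 0;
    }
    return cmp <= 0;
  }
}
```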


Key changed/added classes in this PR
  • KinesisSequenceNumber
  • KinesisRecordSupplier

This PR has:

  • been self-reviewed.
  • [] added documentation for new or modified features or behaviors.
  • [] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

}

@Override
public boolean isAvailableWithEarliest(OrderedSequenceNumber<String> earliest)
Contributor

The method name is a bit hard to understand. Maybe javadoc will help

Contributor Author

Thanks for the feedback. Will add one

Contributor Author

done

}

@Override
public boolean isMoreToReadBeforeReadingRecord(OrderedSequenceNumber<String> end)
Contributor

The method name is a bit hard to understand. Maybe javadoc will help

Contributor Author

This method is based on an existing method of the same name. I'll add a javadoc though

Contributor Author

done

Contributor

@maytasm maytasm left a comment

Can you also add some integration tests (see the integration test group "kinesis-index")?

@AmatyaAvadhanula
Contributor Author

AmatyaAvadhanula commented Jul 27, 2022

@maytasm, thanks for the review.

> Can you also add some Integration test (See integration test group "kinesis-index")?

Sure, I can add a simple test where the records are queried after ingesting from a stream with at least one empty shard.
Please let me know if you had any other scenarios in mind

@maytasm
Contributor

maytasm commented Jul 27, 2022

Sounds good to me! Thanks!

@maytasm
Contributor

maytasm commented Jul 29, 2022

@AmatyaAvadhanula
I have some questions regarding KinesisRecordSupplier#getSequenceNumber.

  1. Are we still handling the case where the shardIterator returned by kinesis.getShardIterator is null?
  2. Is timeoutMillis still needed? Is timeoutMillis used to handle shards that are temporarily closed due to a SplitShard or MergeShards operation?

@@ -157,6 +158,14 @@ public Long getEarliestSequenceNumber(StreamPartition<Integer> partition)
return nextPos;
}

@Override
public boolean isOffsetAvailable(StreamPartition<Integer> partition, OrderedSequenceNumber<Long> offset)
Contributor

Does this need a lock on recordSupplierLock?

Contributor Author

Yes, thanks!

@@ -666,6 +670,30 @@ public String getEarliestSequenceNumber(StreamPartition<String> partition)
return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON);
}

@Override
public boolean isOffsetAvailable(StreamPartition<String> partition, OrderedSequenceNumber<String> offset)
Contributor

This differs considerably from the current logic in KinesisRecordSupplier#getSequenceNumber, for example in how it handles the closed case and retries kinesis.getRecords.

Contributor Author

The previous exception handling has been added

@AmatyaAvadhanula
Contributor Author

> Can you also add some Integration test (See integration test group "kinesis-index")?

Added a few tests where empty shards are present.

> Are we still handling the case when shardIterator returned by kinesis.getShardIterator is null?

Yes, an end-of-shard marker is still returned. If this doesn't happen immediately, the task will handle it appropriately after assignment.

> Is the timeoutMillis still needed? Is the timeoutMillis use for handling when shards are temporary closed due to SplitShard or MergeShards operation?

Could you please clarify what timeoutMillis is?
If you mean fetchSequenceNumberTimeout, it is the time for which we poll Kinesis for records before determining that the shard is empty. This was needed because we had to get at least one record before beginning ingestion, and Kinesis may not return records at TRIM_HORIZON or LATEST in the first few iterations, depending on when data was added.
It isn't needed now, as the new marker is returned and the task will handle it appropriately.
This avoids multiple retries to fetch records and makes it faster as well.

Contributor

@maytasm maytasm left a comment

Since this includes a Design Review, we should get one more approval from a committer.

@@ -300,7 +300,6 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param
|`recordBufferSize`|Integer|Size of the buffer (number of events) used between the Kinesis fetch threads and the main ingestion thread.|no (default == 10000)|
|`recordBufferOfferTimeout`|Integer|Length of time in milliseconds to wait for space to become available in the buffer before timing out.| no (default == 5000)|
|`recordBufferFullWait`|Integer|Length of time in milliseconds to wait for the buffer to drain before attempting to fetch records from Kinesis again.|no (default == 5000)|
|`fetchSequenceNumberTimeout`|Integer|Length of time in milliseconds to wait for Kinesis to return the earliest or latest sequence number for a shard. Kinesis will not return the latest sequence number if no data is actively being written to that shard. In this case, this fetch call will repeatedly timeout and retry until fresh data is written to the stream.|no (default == 60000)|
Contributor

what happens if someone has this parameter in their ingestion spec?

Contributor Author

It is ignored, just like any other parameter in the payload that isn't relevant.

@@ -128,8 +128,7 @@ protected void possiblyResetDataSourceMetadata(
for (final StreamPartition<String> streamPartition : assignment) {
String sequence = currOffsets.get(streamPartition.getPartitionId());
String earliestSequenceNumber = recordSupplier.getEarliestSequenceNumber(streamPartition);
Contributor

can we pass this to the isOffsetAvailable method?

Contributor

since it's an expensive call.

Contributor Author

It's no longer being used here. Thanks for catching this!

);
}

@Override
public int compareTo(OrderedSequenceNumber<String> o)
{
KinesisSequenceNumber num = (KinesisSequenceNumber) o;
if (isUnread() && num.isUnread()) {
Contributor

Shouldn't we fall back to comparing maxSequenceNumber in this case as well?

Contributor Author

It's a boolean (isMaxSequenceNumber) which is used later. There is only one value to be compared per sequence number

@AmatyaAvadhanula
Contributor Author

@abhishekagarwal87 thanks for the review!

Could you please verify the changes in the latest commit as well? It tries to fall back to comparing against the earliest sequence number in case the primary check for Kinesis offset availability fails.

@abhishekagarwal87 abhishekagarwal87 merged commit d294404 into apache:master Aug 5, 2022
@abhishekagarwal87
Contributor

Thank you @AmatyaAvadhanula. I have merged your change.

@abhishekagarwal87
Contributor

@AmatyaAvadhanula
Now that Kinesis ingestion works with empty shards, we need to remove this limitation from the docs.
