Add Kinesis Indexing Service to core Druid #6431
Conversation
```diff
-public interface KafkaIndexTaskRunner extends ChatHandler
+public interface SeekableStreamIndexTaskRunner<T1, T2> extends ChatHandler
```
Neither the methods nor the interface have anything related to the word "seekable", so I am not sure why it is called this way; probably StreamingIndexTaskRunner is enough.
This interface is meant to be used for seekable stream index tasks like the Kafka and Kinesis index tasks, both of which require the ability to seek.
```java
{
  Appenderator getAppenderator();

  /**
   * Run the task
```
Please avoid repeating the code itself as a comment. In general, if a comment does not add any information for the reader, then there is no need for it. Reading the method signature, I can tell that it is running something since it is called run... same for stop.
```java
@JsonCreator
public SeekableStreamPartitions(
    @JsonProperty("stream") final String stream,
```
I am not sure I am getting this. When am I supposed to provide a stream or a topic? What does it mean to provide both? Can you explain what this class is doing?
I considered using separate `KafkaPartitions` and `KinesisPartitions` classes initially, but later decided to remove them and just use this generic class instead. The problem is that when deserializing datasource metadata, Kafka uses the field `topic` and Kinesis uses `stream`, so I needed to map both `topic` and `stream` to the name of the stream. That's why both `stream` and `topic` are in the constructor; it's meant so that either `stream` or `topic` is provided.
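To make the described mapping concrete, here is a hedged sketch (the class and field names are illustrative, not the actual Druid implementation) of a constructor that accepts either JSON field and normalizes both to one internal name:

```java
// Illustrative sketch, not the actual Druid class: a constructor that accepts
// either "stream" (Kinesis metadata) or "topic" (Kafka metadata) and stores
// a single canonical stream name.
public class StreamPartitionsSketch {
    private final String stream;

    public StreamPartitionsSketch(String stream, String topic) {
        // Exactly one of the two is expected to be non-null after deserialization.
        this.stream = stream != null ? stream : topic;
        if (this.stream == null) {
            throw new IllegalArgumentException("either stream or topic must be provided");
        }
    }

    public String getStream() {
        return stream;
    }
}
```

Under this scheme, two instances deserialized from Kafka-style and Kinesis-style metadata with the same underlying name end up holding the same canonical value.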
```java
@JsonProperty
public String getTopic()
{
  return stream;
```
This shows that the definition of the class itself is somehow unclear. The method is getTopic but it returns stream?
```java
Preconditions.checkArgument(this.stream != null);
Preconditions.checkArgument(map != null);
// Validate map
for (Map.Entry<T1, T2> entry : map.entrySet()) {
```
I am not sure what this is doing. If you are checking for null values, then I think the ImmutableMap copy already does that. Also, if the goal is a null check, the error message should be clear, e.g. "value is null, this is invalid".
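For illustration, a hedged sketch of the kind of validation being suggested, with a descriptive error message that names the offending entry (this is a hypothetical helper, not the code under review):

```java
import java.util.Map;

// Hypothetical helper illustrating the review suggestion: fail fast with a
// message that identifies the offending partition instead of a bare null check.
class PartitionMapValidator {
    static <K, V> void validate(Map<K, V> map) {
        if (map == null) {
            throw new IllegalArgumentException("partition-to-sequence-number map is null");
        }
        for (Map.Entry<K, V> entry : map.entrySet()) {
            if (entry.getValue() == null) {
                throw new IllegalArgumentException(
                    "null sequence number for partition [" + entry.getKey() + "] is invalid"
                );
            }
        }
    }
}
```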
```java
@JsonProperty
public Map<T1, T2> getPartitionSequenceNumberMap()
{
  return map;
```
same thing, not sure why we need both methods
see above
```java
if (o == null || getClass() != o.getClass()) {
  return false;
}
SeekableStreamPartitions that = (SeekableStreamPartitions) o;
```
This implies that if I create 2 instances with the same stream string and different topic strings, they are both equal?
Only one of stream/topic should be a non-null value. They both map to the name of the stream. There are both topic and stream in this class because KinesisDataSourceMetadata uses `stream` while KafkaDataSourceMetadata uses `topic`.
```java
public Record(String streamName, T1 partitionId, T2 sequenceNumber, List<byte[]> data)
{
  this.streamName = streamName;
```
Earlier in the code I see `topic` and `stream`, and now this class is using yet another name, `streamName`. Can we use one term?
```java
*/
public class Record<T1, T2>
{
  public static final String END_OF_SHARD_MARKER = "EOS";
```
Not sure how `END_OF_SHARD_MARKER` maps to `EOS`.
```java
private final T2 sequenceNumber;
private final List<byte[]> data;

public Record(String streamName, T1 partitionId, T2 sequenceNumber, List<byte[]> data)
```
I am not very familiar with Kinesis, but I'm wondering why the data has to be ordered. Can't we just use a Collection, or an Iterable?
see my comment below
```java
public interface RecordSupplier<T1, T2> extends Closeable
{
  /**
   * assigns a set of partitions to this RecordSupplier
```
This repeats the method signature word for word. Such comments are not really needed; in fact, they introduce more issues. Imagine that tomorrow someone refactors the code and changes the name of the method or of the RecordSupplier class: this comment will be inaccurate.
I have looked at the abstractions/API added by this PR and am not convinced that this is the best abstraction for a streaming ingest task. My biggest concern is that this abstraction will only work with Kafka/Kinesis. For instance, the proposed abstraction of Record as `Record(String streamName, T1 partitionId, T2 sequenceNumber, List<byte[]> data)` will not fit both Pulsar and Pravega out of the box; as far as I can tell, there is no notion of a partition in either system. Another question is why the data is an ordered list of bytes; does order really matter? Also, this PR models both the record id and a position in the stream with the pair of (partitionId, sequenceNumber). Now, I understand that this currently works nicely with Kinesis and Kafka, but I am afraid that adding support for another system will lead to an entire code refactor, i.e. a PR with yet another 16K-line refactor. Thus, I really recommend having a proposal as a design review and then going from there.
I haven't read the patch yet, but I think it is useful to have an abstraction for ordered-partitionable streams (like Kafka and Kinesis have). It's because we have a lot of logic specifically around dealing with those kinds of systems when it comes to partition discovery, task creation, work assignment, and metadata handling. A lot of that shared code won't apply if the streams do not have an ordered-partition concept. It seems like Pulsar topics are ordered-partitionable (they can have partitions, and those partitions have messages with ordered sequence IDs) so it should be able to fall under the same abstraction. I am not familiar with Pravega so I don't know about that one.
@gianm I am not an expert on Pulsar, but if I am not wrong, the Pulsar record id is per topic and not per TopicPartition, and I think the consumer doesn't have a choice over the partition... https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#subscription-modes
@b-slim thanks for the comments. The motivation for creating the SeekableStream abstraction is for seekable, order-partitionable streams like Kafka and Kinesis; I should have been more specific in the PR description and in naming the classes.

It seems like Pulsar supports partitions and offers strict ordering within each partition if the failover subscription mode is used, so it should fit into the current abstraction. As for the data, in the Kafka implementation:

```java
final byte[] valueBytes = record.value();
final List<InputRow> rows = valueBytes == null
    ? Utils.nullableListOf((InputRow) null)
    : parser.parseBatch(ByteBuffer.wrap(valueBytes));
```

and in the Kinesis implementation:

```java
if (deaggregate) {
  data = new ArrayList<>();
  final List<UserRecord> userRecords = UserRecord.deaggregate(Collections.singletonList(kinesisRecord));
  for (UserRecord userRecord : userRecords) {
    data.add(toByteArray(userRecord.getData()));
  }
} else {
  data = Collections.singletonList(toByteArray(kinesisRecord.getData()));
}
```

I agree; I will update the PR description to include more details, and address the code reviews individually.
Thanks @jsun98 for being willing to add more detail to the PR description. I'd suggest including a few specific points; I think it will help people understand the approach you took and make it easier to review it.
@b-slim Also as an FYI - this is a module that our customers have been running in production for some time now.
@fjy please read my review; the questions I raised are mostly about the abstraction. I am not sure what this has to do with it running on your customers' clusters... thanks.
```xml
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>amazon-kinesis-client</artifactId>
```
The KCL is licensed under the Amazon Software License.
I know this has had release implications for other Apache projects, I wonder if this should be considered by Druid as well?
That's a good point. The Amazon Software License is forbidden for Apache projects based on https://www.apache.org/legal/resolved.html. In particular it has a field of use restriction, which is incompatible with Apache philosophy and license:
3.3 Use Limitation. The Work and any derivative works thereof only may be used or intended for use with the web services, computing platforms or applications provided by Amazon.com, Inc. or its affiliates, including Amazon Web Services, Inc.
The aws-java-sdk-kinesis library does not have any such limitation (it is Apache licensed). Is it possible to use this instead of amazon-kinesis-client? If so, we should do that. If not, I think we can still ship the kinesis extension, but we cannot include amazon-kinesis-client. It should be something users download separately and drop in the jar for on their own: https://www.apache.org/legal/resolved.html#optional
FWIW this is the same basic situation as the mysql extension is in (the mysql driver is under a category X license) so we should probably package it the same way.
@gianm it seems that aws-java-sdk-kinesis would work, except that it doesn't have `UserRecord.deaggregate`, which is needed in the KinesisRecordSupplier implementation. Do you know of any workarounds?
Assuming that the KCL was used here in the "normal" manner, i.e. handling leases (including balancing and checkpointing), resharding, retries, backoff, etc., there will be quite a large amount of work to remove the KCL and still have a stable Kinesis reader.
@dyanarose ah, we don't actually use the KCL in the "normal" manner here; the library is included mainly for `IKinesisProxy`, which is the Java wrapper around the HTTP API. We handle the balancing, checkpointing, and resharding in the indexing service logic (out of necessity, to be able to support exactly-once read semantics).
@gianm included the points you mentioned in the PR description |
Thanks!! |
```java
  toolbox
);

stillReading = !assignment.isEmpty();
```
Would you add a comment about how `assignment` can be changed in `getRecords`?
```java
  continue;
}

if (log.isTraceEnabled()) {
```
Please remove this.
```java
}

@Override
public void onFailure(@ParametersAreNonnullByDefault Throwable t)
```
This annotation is for a class or package, not for an individual parameter. We're already using it that way elsewhere, and you can safely remove it from this class.
```java
*/
public class SeekableStreamPartitions<PartitionIdType, SequenceOffsetType>
{
  public static final String NO_END_SEQUENCE_NUMBER = "NO_END_SEQUENCE_NUMBER";
```
Would you please add a comment about what this is?
```java
private boolean checkpointed;

/**
 * Lock for
```
These two variables can be package-private.
```java
private boolean checkpointed;

/**
 * Lock for
```
This can be private.
```java
  @Json ObjectMapper mapper
)
{
  super(
```
Put these on one line
```java
  return false;
}
KinesisIndexTaskTuningConfig that = (KinesisIndexTaskTuningConfig) o;
return getMaxRowsInMemory() == that.getMaxRowsInMemory() &&
```
Missing `maxTotalRows`, `maxRecordsPerPoll`, and `intermediateHandoffPeriod` in equals, hashCode, and toString.
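As a quick illustration of the pattern being requested (with hypothetical fields, not the actual KinesisIndexTaskTuningConfig), every configurable field should participate in equals and hashCode so that two configs differing in any field compare as unequal:

```java
import java.util.Objects;

// Hypothetical config sketch: all fields participate in equals/hashCode.
class TuningConfigSketch {
    private final Integer maxTotalRows;
    private final int maxRecordsPerPoll;

    TuningConfigSketch(Integer maxTotalRows, int maxRecordsPerPoll) {
        this.maxTotalRows = maxTotalRows;
        this.maxRecordsPerPoll = maxRecordsPerPoll;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        TuningConfigSketch that = (TuningConfigSketch) o;
        return maxRecordsPerPoll == that.maxRecordsPerPoll
               && Objects.equals(maxTotalRows, that.maxTotalRows);
    }

    @Override
    public int hashCode() {
        return Objects.hash(maxTotalRows, maxRecordsPerPoll);
    }
}
```

Omitting a field from equals/hashCode is exactly what makes two differently-configured instances look identical, which is the bug the reviewer is pointing at.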
```java
@Override
public String getPosition(StreamPartition<String> partition)
{
  throw new UnsupportedOperationException("getPosition is not supported in Kinesiss");
```
typo: Kinesis
```java
try {
  recordsResult = kinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1000));
}
catch (ProvisionedThroughputExceededException e) {
```
We should let the user know when we get this exception so that they understand what's happening (and maybe can take corrective action by scaling up).
```java
List<Record> records = recordsResult.getRecords();

if (!records.isEmpty()) {
  return records.get(0).getSequenceNumber();
```
If you're only ever expecting one record here, can you check and throw an exception if you get more?
It is fine if we get more than 1 record; `kinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1000))` fetches up to 1000 records to make sure we can find the first record in this shard (in the case where the shard is constantly removing records due to records being past the retention period).
Got it, I remember this conversation now.
Last set of review comments from me. I am good with this after these (+ existing comments) are addressed. Thank you for the hard work on this PR!
```java
@JsonValue
public String toString()
{
  return StringUtils.toLowerCase(name()).replace('_', '-');
```
Can you add tests to KinesisRegionTest to test `getEndpoint()`, and in particular make sure that the China regions return `kinesis.cn-north-1.amazonaws.com.cn` (with the .cn at the end)?
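For context on what the snippet above serializes, here is a hedged sketch of the enum toString pattern (a hypothetical enum, not the actual KinesisRegion; the `getEndpoint()` logic that would append the `.amazonaws.com.cn` suffix is not shown):

```java
import java.util.Locale;

// Hypothetical enum illustrating the toString pattern in the snippet above:
// constant names are lower-cased and underscores become hyphens.
enum RegionSketch {
    US_EAST_1,
    CN_NORTH_1;

    @Override
    public String toString() {
        return name().toLowerCase(Locale.ROOT).replace('_', '-');
    }
}
```

A test along the lines the reviewer suggests would then assert on both the region string and the full endpoint for the China regions.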
```java
import org.apache.druid.metadata.DefaultPasswordProvider;
import org.apache.druid.metadata.PasswordProvider;

public class ConstructibleAWSCredentialsConfig extends AWSCredentialsConfig
```
This class isn't needed anymore; you can just pass the `AWSCredentialsConfig` that you get from configuration to `KinesisRecordSupplier.getAmazonKinesisClient()`, instead of calling the getters on the access key and secret key and then constructing a new instance of ConstructibleAWSCredentialsConfig.
```java
new SeekableStreamPartitions<>(ioConfig.getStream(), startPartitions),
new SeekableStreamPartitions<>(ioConfig.getStream(), endPartitions),
true,
true, // should pause after reading otherwise the task may complete early which will confuse the supervisor
```
`pauseAfterRead` is never used (and not needed; it was removed from the Kafka implementation).
```java
@Override
public String toString()
{
  return "KinesisSupervisorSpec{" +
```
Can you add `context` here?
```java
@Override
public String toString()
{
  return "KinesisSupervisorTuningConfig{" +
```
Add `maxBytesInMemory`, `maxTotalRows`, `logParseExceptions`, `maxParseExceptions`, `maxSavedParseExceptions`, `maxRecordsPerPoll`, and `intermediateHandoffPeriod`. I would also just remove `buildV9Directly` from the toString if the parent class is doing nothing with it.
```java
  return dataSource;
}

public String getStream()
```
Missing @JsonProperty ?
```java
}

@Override
public abstract String toString();
```
I don't think this is strictly necessary to enforce
```java
}

@Override
public abstract String toString();
```
I don't think this is strictly necessary to enforce
```java
Duration getShutdownTimeout();

@Override
String toString();
```
I don't think this is strictly necessary to enforce
```java
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

public class RandomId
```
I think by convention we suffix a helper class like this with Utils
```java
    ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
    currRecord.getSequenceNumber()
).getShardIterator();
rescheduleRunnable(EXCEPTION_RETRY_DELAY_MS);
```
Thanks for updating this. This looks good except that we should return immediately on InterruptedException.
```java
}
catch (SdkClientException e) {
  log.warn(e, "encounted unknown AWS exception, retrying in [%,dms]", EXCEPTION_RETRY_DELAY_MS);
  rescheduleRunnable(EXCEPTION_RETRY_DELAY_MS);
```
Probably we can also check whether this exception is transient using `S3Utils.isServiceExceptionRecoverable`.
```java
private class PartitionResource
{
  private final StreamPartition<String> streamPartition;
  private final Object startLock = new Object();
```
Hmm, I'm not sure what this lock is for. What threads can call `startBackgroundFetch()` at the same time?
```java
}

@Override
public void onFailure(@ParametersAreNonnullByDefault Throwable t)
```
Please remove this annotation too.
```
-Ddruid.kinesis.accessKey=123 -Ddruid.kinesis.secretKey=456
```
The AWS access key ID ad secret access key are used for Kinesis API requests. If this is not provided, the service will look for credentials set in environment variables, in the default profile configuration file, and from the EC2 instance profile provider (in this order).
typo: The AWS access key ID ad secret access key.. -> 'and'
The latest change looks good to me. Thank you for your hard work @jsun98!!
```java
catch (ProvisionedThroughputExceededException e) {
  log.warn(
      e,
      "encounted ProvisionedThroughputExceededException while fetching records, this means "
```
typo: encounted -> encountered. Also, you can recommend that they increase the number of shards to get more throughput.
```java
}

private static String makeTaskId(String dataSource, int randomBits, String type)
```
`randomBits` is no longer used.
LGTM, thank you @jsun98! |
```diff
@@ -312,7 +312,7 @@ compatible because they have a different ingestion spec or partition allocation,
 supervisor will create a new set of tasks. In this way, the supervisors are persistent across overlord restarts and
 fail-overs.
 
-A supervisor is stopped via the `POST /druid/indexer/v1/supervisor/<supervisorId>/shutdown` endpoint. This places a
+A supervisor is stopped via the `POST /druid/indexer/v1/supervisor/<supervisorId>/terminate` endpoint. This places a
```
Why do we need this backward-incompatible change, or is this just fixing the doc?
`/shutdown` and `/terminate` are equivalent since #6272, although `/shutdown` is deprecated (on the grounds that the difference between 'suspend' and 'terminate' is clearer than between 'suspend' and 'shutdown'). So this was just fixing the doc.
```diff
-@Override
-public String toString()
+protected SeekableStreamDataSourceMetadata<Integer, Long> createConcreteDataSourceMetaData(
```
What does "concrete" add to the meaning here? Is there a non-concrete DataSourceMetadata?
Yes, the non-concrete one is called `SeekableStreamDataSourceMetadata`.
```java
}

@Override
public void seek(StreamPartition<Integer> partition, Long sequenceNumber)
```
I really do not see the need to call a partition a StreamPartition...
As opposed to `Partition`, you mean? I guess it's because StreamPartition is clearer?
```java
@Override
public Long getLatestSequenceNumber(StreamPartition<Integer> partition)
{
  Long currPos = consumer.position(new TopicPartition(partition.getStream(), partition.getPartitionId()));
```
why not bump the kafka version to 0.11 and use https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#beginningOffsets(java.util.Collection) and https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection)
There's another PR in the works -- #6496 -- to upgrade to Kafka 2.x, from @surekhasaharan, that we were planning to do this in after this current PR has been merged (to avoid conflicts). There were other changes needed too in order to make the upgrade work smoothly (check out that PR). So IMO it makes sense to leave this as is, & we can make the new Kafka API changes over in #6496.
Consider this a third design review, at the level of the abstractions. They look good to me, judging by the fact that the Kafka and Kinesis implementations are relatively minimal in what they need to override (except LegacyKafkaIndexTaskRunner, which is huge, but we should be removing that soonish anyway).
Thanks @jsun98 for your work on this patch!!
I don't think I can get to re-reviewing the entire thing before next year, so if there is enough consensus that this is good to go, then good it is.
Major changes to review

- `SeekableStreamSupervisor` and Kafka/Kinesis implementation classes
- `SeekableStreamIndexTask` and Kafka/Kinesis implementation classes
- `SeekableStreamIndexTaskRunner` and Kafka/Kinesis implementation classes
- `RecordSupplier` and Kafka/Kinesis implementation classes

Motivation

The motivation for this PR is to add `kinesis-indexing-service` to core Druid extensions and provide an abstraction, `seekablestream`, for streaming systems with the following characteristics:

Major changes

- ~~The logic in `KinesisIndexTask` and `KafkaIndexTask` has not been merged yet, due to Kinesis not yet supporting incremental handoff, but this will be implemented in a follow-up PR. The code in the two index tasks is left as is for now.~~ Incremental handoff is now implemented for the Kinesis index task.
- ~~The unit tests for Kinesis depend on LocalStack, a local AWS testing stack, which requires a running Docker instance.~~ The Docker dependency is no longer required, as mocks are used instead.

Definition of Abstractions

- `SeekableStreamSupervisor`: merged the logic from KinesisSupervisor and KafkaSupervisor, most notably the logic in `runInternal()` and the helper methods.
- `RecordSupplier`: abstraction for fetching records from a Kafka server/Kinesis stream. The implementation for Kafka is trivial, as Kafka provides a `KafkaConsumer` which basically encapsulates all of the logic. KinesisRecordSupplier, on the other hand, is a full-fledged buffer implementation.
- `SeekableStreamIndexTaskRunner`: merged logic from KinesisIndexTask (which didn't have the TaskRunner abstraction) and IncrementalPublishingKafkaTaskRunner. There is a slight discrepancy in fetching the next sequence number/offset: the Kafka task will read from the starting offset (inclusive) to the end offset (exclusive), while the Kinesis task reads from the start sequence number (inclusive for the 1st task, exclusive for all subsequent tasks) to the end sequence number (inclusive). This is because Kinesis sequence numbers cannot be calculated simply like Kafka's, so instead of making an expensive API call to get the next sequence number, we store the current already-read-to sequence number instead.
- `OrderedPartitionableRecord`, `SeekableStreamPartition`, `SeekableStreamPartitions`: abstractions over the Kafka/Kinesis partition/shard and offset/sequence number concepts. These classes help to avoid working with Kafka/Kinesis-specific types directly.

Compatibility with other streaming systems

This abstract design is specifically aimed at partition-able, partition-ordered stream systems such as Kafka and Kinesis. Systems that do not have partitions, or do not offer ordering within partitions, are not good fits under this abstraction.
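To make the RecordSupplier abstraction described above more tangible, here is a hedged sketch with a toy in-memory implementation. The interface shape (assign/seek/poll) follows the discussion in this thread, but the exact names, signatures, and the in-memory backing are illustrative assumptions, not the actual Druid interface:

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hedged sketch of the RecordSupplier shape discussed above; names and
// signatures are illustrative assumptions, not the exact Druid API.
interface RecordSupplierSketch<PartitionIdType, SequenceOffsetType> extends Closeable {
    void assign(Set<PartitionIdType> partitions);
    void seek(PartitionIdType partition, SequenceOffsetType sequenceNumber);
    List<byte[]> poll(long timeoutMillis);
}

// Toy in-memory implementation: each partition is an ordered list of records,
// and seek() positions the read cursor by offset within that partition.
class InMemoryRecordSupplier implements RecordSupplierSketch<Integer, Integer> {
    private final Map<Integer, List<byte[]>> partitions = new HashMap<>();
    private final Map<Integer, Integer> cursors = new HashMap<>();

    void addRecord(int partition, byte[] data) {
        partitions.computeIfAbsent(partition, p -> new ArrayList<>()).add(data);
    }

    @Override
    public void assign(Set<Integer> assigned) {
        for (Integer p : assigned) {
            cursors.putIfAbsent(p, 0);
        }
    }

    @Override
    public void seek(Integer partition, Integer sequenceNumber) {
        cursors.put(partition, sequenceNumber);
    }

    @Override
    public List<byte[]> poll(long timeoutMillis) {
        // Drain everything from each assigned partition's cursor onward,
        // preserving per-partition order (the property this abstraction relies on).
        List<byte[]> out = new ArrayList<>();
        for (Map.Entry<Integer, Integer> cursor : cursors.entrySet()) {
            List<byte[]> records = partitions.getOrDefault(cursor.getKey(), List.of());
            for (int i = cursor.getValue(); i < records.size(); i++) {
                out.add(records.get(i));
            }
            cursor.setValue(records.size());
        }
        return out;
    }

    @Override
    public void close() {
    }
}
```

The point of the sketch is the contract, not the storage: a Kafka implementation can delegate all three operations to `KafkaConsumer`, while a Kinesis implementation has to maintain its own buffer and cursors, which matches the description of KinesisRecordSupplier above.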