Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supervisor for KafkaIndexTask #2656

Merged
merged 2 commits into from
May 5, 2016
Merged

Supervisor for KafkaIndexTask #2656

merged 2 commits into from
May 5, 2016

Conversation

dclim
Copy link
Contributor

@dclim dclim commented Mar 15, 2016

Manages the creation and monitoring of KafkaIndexTasks based on a SupervisorSpec submitted to /druid/indexer/v1/supervisor. See #1642 for realtime ingestion improvements overview. Closes #2635.

@dclim dclim added this to the 0.9.1 milestone Mar 15, 2016
@fjy
Copy link
Contributor

fjy commented Mar 15, 2016

@dclim this fails UT

@dclim
Copy link
Contributor Author

dclim commented Mar 15, 2016

Should be fixed now, thanks!

@dclim
Copy link
Contributor Author

dclim commented Mar 15, 2016

Here are more details on what is included and not included in this PR:

Kafka supervisor manages the creating and monitoring of Kafka indexing tasks, which are tasks that ingest events from Kafka using offset ranges instead of time intervals (see #2220). A supervisor is created by providing a supervisor spec which includes fields for (among other things):

  • the Kafka topic to ingest
  • the number of indexing tasks to spawn
  • the number of events each task should handle before persisting a segment and completing
  • the number of indexing task replicas
  • an ingestion spec and configuration that will be passed down to the created Kafka indexing tasks

The supervisor will then periodically run the following sequence of steps:

  • get the current number of partitions for the topic from Kafka
  • get the current running tasks for the data source we are supervising
  • using this information, make sure that we have the expected number of tasks processing the Kafka partitions

As an example, say we have a Kafka topic with 6 partitions, we want to split these partitions across 2 tasks, with each task handling 3000 events, and we want a replication factor of 2. This will create 4 tasks:

  • Task 1: (partitions 0, 2, 4), offsets (P0: 0-1000, P2: 0-1000, P4: 0-1000) - Replica 1
  • Task 2: (partitions 1, 3, 5), offsets (P1: 0-1000, P3: 0-1000, P5: 0-1000) - Replica 1
  • Task 3: (partitions 0, 2, 4), offsets (P0: 0-1000, P2: 0-1000, P4: 0-1000) - Replica 2
  • Task 4: (partitions 1, 3, 5), offsets (P1: 0-1000, P3: 0-1000, P5: 0-1000) - Replica 2

(Replica tasks are created with the same availability group so that they will be executed on different nodes.)

If:

  • Task 1 dies: the supervisor will create another task with the same parameters to maintain the desired replication factor:
    • [Task 5: (partitions 0, 2, 4), offsets (P0: 0-1000, P2: 0-1000, P4: 0-1000)]
  • Task 1 succeeds: the supervisor will terminate redundant replicas after the segment has been published and will spawn tasks to handle the next range of offsets:
    • kill Task 3
    • create [Task 5: (partitions 0, 2, 4), offsets (P0: 1000-2000, P2: 1000-2000, P4: 1000-2000) - Replica 1]
    • create [Task 6: (partitions 0, 2, 4), offsets (P0: 1000-2000, P2: 1000-2000, P4: 1000-2000) - Replica 1]

As of now, this implementation is fully functional but has a number of areas that can be refined to be more efficient / a better user experience (numbered to facilitate discussion):

  1. Currently, the supervisor waits for the previous set of tasks to complete before submitting the next set of tasks to handle the next offset range. This results in a pause in processing due to the time it takes to build/push/load the segment. No data is lost (assuming Kafka buffers all the messages during that time) but ingestion is bursty. We should be preemptively adding tasks to handle the next set of offsets as the previous set near the end of their range so that the new tasks begin handling data while the previous set are persisting. This requires being able to talk to the peons the tasks are running on and is enabled by Plumb task peon host/ports back out to the overlord. #2419 .

  2. Currently, when a task completes (the segment is published and has been loaded by a historical), the supervisor will kill any replica tasks that are doing the same thing since it is now redundant. This generates a failed status, where it would be better to have it either show a successful or stopped status.

  3. Currently, the supervisor itself does not persist in any way and will die if the overlord crashes, restarts, or loses leadership in any other way. The supervisor spec should be written to metadata storage and read on overlord startup so that it can survive failures and cluster upgrades without the user needing to resubmit it every time. There already is an endpoint that can be used to stop running supervisors.

  4. Currently, all replicas are given an identical set of partitions/offset ranges to process and then independently do their thing. This produces correct final output, but could potentially lead to realtime queries returning different results, depending on which worker is queried by the broker and if that worker is unable to keep up with the data being pushed into Kafka and is lagging behind. A solution would be to implement a leader/follower mechanism where the followers would check with the leader to determine what offsets they should be reading which would improve synchronization between tasks.

  5. Currently there is no graceful shutdown mechanism for Kafka indexing tasks, which means that the supervisor is unable to provide any way of early stopping of spawned tasks. When the supervisor is stopped, it leaves the tasks running and they will continue to run until they have ingested up to the ending offset for all the partitions they are reading from. Early stopping (where the supervisor can tell the running tasks to stop ingesting, persist what they currently have, and then end) would make things like rolling updates somewhat nicer.

  6. Currently, when a new supervisor starts up, it gets a list of running tasks and kills any Kafka indexing task that is building segments for its data source that it did not create (it actually does this every time the main loop runs). This is to prevent situations where tasks created by a previous supervisor are running but have a different ingestion spec that will lead to offsets from Kafka being ingested twice (as the segment's sequenceName is based on the partitions/offsets being read, so a task reading different partitions will generate a different sequenceName which will generate a new segment containing duplicated data). It would be better if the supervisor inspected the Kafka indexing task to see if it was processing the same partitions/offsets and allow it to finish if it is safe, even if it wasn't the one who created the task.

@dclim
Copy link
Contributor Author

dclim commented Mar 15, 2016

@himanshug @gianm as discussed during the dev sync, I've added some details into what is included in this PR and what enhancements could be added. Let me know your thoughts.

@nishantmonu51 nishantmonu51 self-assigned this Mar 16, 2016

private final Map<String, Pair<Supervisor, SupervisorSpec>> supervisors = new HashMap<>();

public Set<String> getSupervisors()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to getSupervisorIDs ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good, thanks!

@dclim
Copy link
Contributor Author

dclim commented Mar 17, 2016

Added improvement 6: supervisor no longer kills all tasks on startup, but checks to see if the task matches the supervisor's spec and expected offsets and starts tracking it if it conforms. If it doesn't match, it will kill the task to prevent ingesting duplicate events.

@himanshug
Copy link
Contributor

@dclim nice write up. here are the things, i think, we need to have in the order of priority.

I believe number (3) is a MUST have . Supervisor must persist its state in metadata store because overlord going down or changing leadership is a very common case, even in case of rolling deployment.

(5) about early stopping of tasks and related "user supplies number of events a task will handle", it is very difficult for users to figure out that number and many times it could change depending on hour of the day, day of the week. This might impact rolling deployment also where some tasks are never finishing.

(2) about remaining replica's showing FAILED status is confusing to users. It will require users to very carefully look at the overlord console task statuses to say whether ingestion is working fine or if there are genuine failures.

log.debug("Task group [%d] pre-pruning: %s", groupId, taskGroup.taskStatuses);
String successfulTaskId = null;
Iterator<Map.Entry<String, Optional<TaskStatus>>> it = taskGroup.taskStatuses.entrySet().iterator();
while (it.hasNext()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not remove the TaskGroup for which all tasks were killed here from the taskGroups map otherwise new tasks will be again spawned for this TaskGroup at Line 397 ? Is that correct ? Am I missing something ?
Consider this scenario, if the taskCount is set to 3 and number of kafka partitions is 3, now i decreased the number of partitions form 3 to 1, this would cause all tasks to be killed in two TaskGroups.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pjain1 removing the TaskGroup when all tasks are killed would be okay; the only (minor) downside is that with the current logic the new retry tasks wouldn't be enqueued until the next cycle. However, I don't believe this is necessary since Kafka does not support decreasing the number of partitions, and I can't think of any other reason that the number of task groups would decrease.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, does this PR handles the scenarios where one increases or decreases the taskCount and restarts the kafka supervisor ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is only one situation where partitions can decrease- a kafka user deletes a topic and re-creates it with fewer partitions. But in that case it's not really the "same" topic (offsets are gonna get re-used too) so the supervisor is gonna get really confused and need to be re-set.

we could help people out in this situation with a "hard reset" option that stops the supervisor, and wipes the datasource metadata. that would let them resume from the start or end of the new topic, as they wish.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pjain1 yes, right now what would happen is the supervisor would come back up, discover the existing running tasks, and notice that they are processing a different allocation of partitions than what is specified in its spec (since the number of tasks have changed so the number of partitions processed per task also changed) - it would then forcibly kill these tasks and create new ones which would start reading from the last persisted offsets which are stored in the datasource metadata table. Assuming that Kafka's buffer is large enough that these events haven't been dropped, no data would be lost.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dclim great...I didn't knew that kafka does not allow decreasing the number of partitions in which case it should be fine

@fjy
Copy link
Contributor

fjy commented Mar 21, 2016

@pjain1 any more comments?

@gianm gianm added the Discuss label Mar 21, 2016
@gianm
Copy link
Contributor

gianm commented Mar 21, 2016

IMO the importance of the issues @dclim raised are, from most important to least,

  • 3 (supervisor persisting) is critical
  • 6 (avoiding killing "safe" tasks on startup) is important to make people feel like it's safe to restart overlords whenever they want (which is currently the case, and we want to keep it that way)
  • 2 (unnecessary replicas being killed and FAILED) is important, as the current behavior will be confusing for users like @himanshug pointed out
  • 1 (eliminating ingestion pauses) is going to be critical for users that have a real real-time requirement, and nice to have for other users
  • 5 (early stopping) would be nice for getting schema updates out faster. Kafka tasks support restoreTasksOnRestart so I think this is not really needed for rolling updates (the tasks can restore after a MM restart)
  • 4 (keeping replicas in sync) is nice but least important

Of those I think 3, 6, 2 are important to do before releasing the feature at all.

I think we can live without 1 for the first release but should do it in a follow-up.

I think 5, 4 are nice improvements but less critical than the others.

@himanshug - does this sound reasonable to you?

@himanshug
Copy link
Contributor

@gianm from #2656 (comment) , i believe that (6) is already done.
I agree with 3,6,2 being important.

(4) would be nice to have but very hard to do without adding complexity , not having it is fine. also, (1) is not too critical at least for now in the first cut.

currently user is required to set "the number of events each task should handle before persisting a segment and completing" . it will be very convenient if this is dynamically figured out by the system automatically. I thought (5) could probably help support this too in addition to speeding up schema updates as you pointed.

@pjain1
Copy link
Member

pjain1 commented Mar 22, 2016

@fjy I 👍 on @gianm suggestion apart from that I do not have any more comments

@dclim
Copy link
Contributor Author

dclim commented Mar 22, 2016

@himanshug thanks for your feedback - what were you thinking of as an alternative to having the user specify the number of events a task will handle? It might be interesting if we allowed the user to specify a target for how long they want the task to run and then run an initial task to ingest a small number of events to get a feel of the events/sec and then use this to tune our offset ranges. Also like you said, if we had graceful shutdown we could guarantee tasks live as long as we want them to. Is that along the lines of what you were thinking? (have the user set the lifetime of the task in time vs number of events)

@himanshug
Copy link
Contributor

@dclim as discussed on dev sync it will be nice if supervisor starts the tasks with some very large arbitrary end-offset and then forces the push if/when segments reach some size threshold or some time (e.g. 30 mins) has elasped.

@dclim
Copy link
Contributor Author

dclim commented Apr 22, 2016

Update:
The Kafka supervisor has been reworked and includes the following refinements:

  • Task lifetime is now based on time rather than a number of ingested events - as @himanshug pointed out, this is easier for users to reason about and better handles fluctuating ingestion rates (or even situations where no events are ingested at all in a time period)
  • 1 from above (subsequent tasks are started immediately instead of waiting until the previous set has finished publishing)
  • 2 from above (tasks stopped by the supervisor report a SUCCESS status with log messages in the task showing that it was asked to stop early)
  • 3 from above (supervisor specs are persisted in metadata storage and will automatically load and begin running when the overlord starts (or takes leadership)
  • 5 from above (KafkaIndexTasks can be stopped early and instructed to publish - telling the supervisor to shutdown via the HTTP API will cause a controlled (synchronized across replicas) early publish and shutdown of the managed tasks)
  • 6 from above (on startup, supervisor now inspects and adopts already-running tasks that match the supervisor's spec and expected start offset)

The supervisor logic has changed somewhat from the above description to accommodate time-based task lifetimes and no pauses between tasks. I'll post an updated overview of the mechanism, but as is this PR should be ready for review.

@dclim
Copy link
Contributor Author

dclim commented Apr 25, 2016

Some additional notes regarding the time-based task lifetime design:

Previously, the supervisor created KafkaIndexTasks, provided a starting and ending offset, and when the task completed, the supervisor created another task (to handle the next set of offsets if the task succeeded or to reprocess the same range of offsets if the task failed). With the updated design, the user provides a taskDuration value in the supervisor spec which is used to manage the lifetime of tasks based on time rather than starting and ending offsets. The high-level mechanism works as follows:

  1. When the supervisor starts up, assuming no tasks are already running, it will create up to taskCount * replicas number of tasks. Each of these tasks will be provided with a start offset per partition corresponding to the last offset that was read and published in a segment (from the dataSource metadata table) or if this doesn't exist, either the earliest or latest offset from Kafka depending on configuration. The end offset is now always set to Long.MAX_VALUE. This is because we can't know how many events we will be reading during taskDuration so we initially set the tasks to read indefinitely.

  2. Once the task is accepted by a worker and is started, the supervisor queries the task for its starting time and stores this information. It schedules an event to fire at startTime + taskDuration.

  3. When the event is fired, the supervisor then executes the logic to instruct the tasks to end reading and begin publishing their segments in a coordinated way. Coordination is important, because if we just told the tasks to stop and publish, replica tasks which are expected to have read the same data and publish exactly the same segments are not guaranteed to produce identical segments since the Kafka consumers might be at different offsets for different partitions when they receive the stop command. To coordinate the tasks, we do the following:

    a. Tell all tasks handling a particular set of partitions (in the code referred to as a task group) to pause and return the offsets it had read up to before it was paused.

    b. Build a list of the highest offset for each partition read by any of the tasks.

    c. Tell all of the tasks to change their ending offset from Long.MAX_VALUE to the list generated in step b. Using the highest offset (and not for example the highest offset + 1) guarantees that all the data is available in Kafka (since at least one of the tasks has already read it) and that the tasks will always be able to complete.

    d. Tell all of the tasks to resume reading.

    The tasks will then all read to the same ending offset for every partition and begin publishing their segment.

  4. The supervisor will then immediately create tasks to continue reading from Kafka starting from the ending offsets determined in step 3. This will allow for continuous ingestion from Kafka (with a few seconds of delay as the new task is accepted by a worker and starts up). At the same time, the previous tasks which are now publishing are moved to a list of 'pending completion' tasks and assigned a timeout specified by the completionTimeout config.

  5. If a task which is supposed to be publishing does not finish before the timeout, the task is deemed to have failed and is terminated. Furthermore, segments generated by KafkaIndexTasks must contain contiguous offsets, so a failure to publish by a previous task means that the next tasks that were reading from the same partitions are now invalid. The supervisor handles this by killing the tasks reading those partitions and then re-creating new tasks that start from the last successfully published offset (tracked in the dataSource table).

@dclim dclim removed the Discuss label Apr 25, 2016
@pjain1
Copy link
Member

pjain1 commented Apr 25, 2016

Does the completionTimeout should always be less than taskDuration ? otherwise in a situation where are two sets of pending tasks the older set may fail but the newer set may publish. I guess it may not happen because of metadata equal checks in updateDataSourceMetadataWithHandle method ?

@dclim
Copy link
Contributor Author

dclim commented Apr 25, 2016

@pjain1 good question - there shouldn't need to be any condition on completionTimeout < taskDuration. If there are two sets of pending tasks (plus the currently reading task set) and the older set fails, it will kill itself, the newer pending tasks, as well as the currently reading set since all of them will now be producing invalid (non-contiguous) segments. The supervisor will then create new tasks that'll start from the offsets in the last successful segment (which would be the starting offsets of that older task set that failed).

In general, you wouldn't want time-to-publish >> taskDuration, since that would mean that you'd be spawning new tasks faster than they're completing and your workers will eventually run out of capacity, but in terms of correctness, there shouldn't be any issues.

@himanshug
Copy link
Contributor

@dclim can you add some user documentation ?

@dclim
Copy link
Contributor Author

dclim commented Apr 26, 2016

Yes, will write some up.

@dclim
Copy link
Contributor Author

dclim commented Apr 28, 2016

@himanshug docs added: https://github.com/dclim/druid/blob/kafka-supervisor/docs/content/development/extensions-core/kafka-ingestion.md

@schmee
Copy link

schmee commented Apr 28, 2016

@dclim I read through the docs and noticed that the ioConfig takes a topic, unlike Tranquility that takes a topicPattern. Is this a fundamental limitation of the new KafkaIndexTask? If so, what is the best way to emulate how Tranquility handles topics?

@dclim
Copy link
Contributor Author

dclim commented Apr 28, 2016

@schmee yeah, the KafkaIndexTask was written to read a single topic only; I don't believe that it's a fundamental limitation, but some work would be required to support topic patterns. We use topic patterns in our Druid cluster as well so I definitely think there's value in supporting them.

What is your use case for topicPatterns? Are you reading from multiple topics into one dataSource or multiple topics into multiple dataSources (using the same ingestion spec)?

@@ -83,7 +86,8 @@ public MetadataStorageTablesConfig(
@JsonProperty("tasks") String tasksTable,
@JsonProperty("taskLog") String taskLogTable,
@JsonProperty("taskLock") String taskLockTable,
@JsonProperty("audit") String auditTable
@JsonProperty("audit") String auditTable,
@JsonProperty("supervisors") String supervisorTable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs doc update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good

@himanshug
Copy link
Contributor

@gianm @dclim @schmee i agree about keeping scope of this PR to single topic support, it would be nice to have that in future.. i would let this impl get settled a little bit by releasing and running on some production clusters and iron out [if any] stability issues first.

+ "FROM %1$s r "
+ "INNER JOIN(SELECT spec_id, max(version) as version FROM %1$s GROUP BY spec_id) latest "
+ "ON r.spec_id = latest.spec_id and r.version = latest.version",
getSupervisorsTable()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

limiting items in history may potentially help the performance of this query too.

@himanshug
Copy link
Contributor

@dclim
👍 besides #2656 (comment) , i know it does not matter if history stays small in size but will leave it up to you.


# Kafka Ingestion

The recommended way of ingesting data from Kafka is to use the `kafka-indexing-service` core extension (see
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the optimism but this language is a bit strong for first release :)

The language here should be telling people that this is an experimental feature, API subject to change, etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haha, sounds good

@dclim
Copy link
Contributor Author

dclim commented May 5, 2016

Fixed an issue where if the supervisor crashed after signalling a task to begin publishing but before creating the next task, the succeeding supervisor would create the new task with the same starting offsets as the publishing task. It will now create the new task starting from where the publishing task ended. Also added a test for this.

@gianm
Copy link
Contributor

gianm commented May 5, 2016

Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 4.086 sec <<< FAILURE! - in io.druid.server.coordinator.DruidCoordinatorTest
testCoordinatorRun(io.druid.server.coordinator.DruidCoordinatorTest)  Time elapsed: 1.918 sec  <<< FAILURE!
java.lang.AssertionError: expected:<1> but was:<0>
    at org.junit.Assert.fail(Assert.java:88)
    at org.junit.Assert.failNotEquals(Assert.java:743)
    at org.junit.Assert.assertEquals(Assert.java:118)
    at org.junit.Assert.assertEquals(Assert.java:555)
    at org.junit.Assert.assertEquals(Assert.java:542)
    at io.druid.server.coordinator.DruidCoordinatorTest.testCoordinatorRun(DruidCoordinatorTest.java:374)

@gianm gianm closed this May 5, 2016
@gianm gianm reopened this May 5, 2016
@gianm
Copy link
Contributor

gianm commented May 5, 2016

@dclim looking good!

👍

@gianm gianm merged commit b489f63 into apache:master May 5, 2016
seoeun25 pushed a commit to seoeun25/incubator-druid that referenced this pull request Jan 10, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants