
Kafka Index Task that supports Incremental handoffs #4815

Merged
merged 14 commits into from Nov 17, 2017

Conversation

pjain1
Member

@pjain1 pjain1 commented Sep 16, 2017

Hopefully fixes #4016, #4177, #4812, #4781, #4743 and #5046
Partially fixes #4498 and #4693
Alternate to #4178 and #4175

  • Incrementally handoff segments when they hit maxRowsPerSegment limit
  • Decouple segment partitioning from Kafka partitioning; all records from consumed partitions go to a single Druid segment
  • Support for restoring the task on middle manager restarts by checkpointing end offsets for segments
  • Incremental handoff is NOT supported with pauseAfterRead feature

Design -

  • Currently a KafkaIndexTask only creates and publishes segments corresponding to a single sequenceName.
  • This PR introduces the ability to incrementally hand off segments by starting a new sequence when the maxRowsPerSegment limit is reached and publishing all the segments for the previous sequence.
  • New sequences are created by concatenating the base sequence name with a monotonically increasing id called sequenceId, starting at 0.
  • A sequence corresponds to a range of Kafka offsets. A SequenceMetadata class is added to KafkaIndexTask to maintain metadata about a sequence. Metadata for all the sequences in the task is persisted to disk whenever a new sequence is created or deleted, or whenever the end offsets for a sequence are set. This also enables restore on restart.
  • End offsets for the latest sequence are set whenever AppenderatorDriverAddResult.getNumRowsInSegment(), returned by the add call of AppenderatorDriver, exceeds maxRowsPerSegment.
    At this point the task pauses and sends a CheckPointDataSourceMetadataAction to the Supervisor, which uses the same logic as is used for finishing all the tasks in the TaskGroup and calls setEndOffsets on the replica tasks with the finish flag set to false, indicating that the current sequence should be finished and published and that a new sequence should be created for messages with offsets greater than the set end offsets. The new checkpoint information is stored in the TaskGroup.
  • The Supervisor does not maintain a list of SequenceMetadata for the tasks in a TaskGroup; instead it maintains a sorted map of <SequenceId, Checkpoint>, where a Checkpoint always corresponds to the start offsets of a sequence.
  • Whenever the Supervisor starts a new task, it sends the map of checkpoints for the TaskGroup in the context field. Using this checkpoint information, the task creates its list of SequenceMetadata and sets its start offsets to the start offsets of the first sequence (see the sketch after this list).
  • Since a task always starts consuming from the start offsets of the first sequence, the Supervisor must remove checkpoints from the TaskGroup for which segments have already been published, so that new tasks are not sent stale checkpoints and do not repeat work.
  • The Supervisor does this cleanup lazily, only when it starts new tasks in replacement of a failed task, creates a new TaskGroup, or discovers new tasks for a TaskGroup (this includes the case when it restarts). It does so by getting checkpoint information from the running tasks and verifying it against the offsets recorded in the metadata store. It also kills inconsistent tasks; refer to the verifyAndMergeCheckpoints method.
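For illustration, a rough sketch of the two structures described above: the per-task sequence bookkeeping and the supervisor-side checkpoint map passed to new tasks in the "checkpoints" task context key. The class and field names here are simplified assumptions, not the PR's exact code.

import java.util.Map;
import java.util.TreeMap;

// Simplified sketch of the per-sequence bookkeeping; the real SequenceMetadata inside
// KafkaIndexTask carries more state (checkpointed flag, exclusive partitions, etc.).
class SequenceMetadataSketch
{
  final int sequenceId;                  // monotonically increasing, starts at 0
  final String sequenceName;             // baseSequenceName + "_" + sequenceId
  final Map<Integer, Long> startOffsets; // Kafka partition -> first offset covered by this sequence
  final Map<Integer, Long> endOffsets;   // Kafka partition -> end offset; unbounded until checkpointed

  SequenceMetadataSketch(
      String baseSequenceName,
      int sequenceId,
      Map<Integer, Long> startOffsets,
      Map<Integer, Long> endOffsets
  )
  {
    this.sequenceId = sequenceId;
    this.sequenceName = baseSequenceName + "_" + sequenceId;
    this.startOffsets = startOffsets;
    this.endOffsets = endOffsets;
  }
}

class SupervisorCheckpointsSketch
{
  // Supervisor-side view for one TaskGroup: sequenceId -> checkpoint, where a checkpoint is
  // the start offsets (Kafka partition -> offset) of that sequence. This is the shape that
  // gets serialized into the "checkpoints" task context key for newly created tasks.
  final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
}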

Some more information about specific issues -

@pjain1
Member Author

pjain1 commented Sep 16, 2017

Will add design details by Monday Done

@jihoonson
Contributor

@pjain1 thanks for your work on this issue. I'll review soon.

Contributor

@himanshug himanshug left a comment


partially reviewed, will continue.

)));
this.topic = ioConfig.getStartPartitions().getTopic();
this.sequences = new CopyOnWriteArrayList<>();
this.publishExecService = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-driver"));
Contributor

is it possible to create the executor service somewhere other than the constructor? Otherwise this executor is created/started every time a KafkaIndexTask instance is created, e.g. when the supervisor creates them.

Contributor

probably startExecutors() is the right place.

@@ -276,13 +318,124 @@ public KafkaIOConfig getIOConfig()
return ioConfig;
}

private void startExecutors()
Contributor

can we call this createPublishExecService() ?

@@ -276,13 +318,124 @@ public KafkaIOConfig getIOConfig()
return ioConfig;
}

private void startExecutors()
{
// start publish executor service
Contributor

unnecessary comment

{
// start publish executor service
publishExecService.submit(
(Runnable) () -> {
Contributor

is type cast to Runnable needed ?

if (getContext() != null && getContext().get("checkpoints") != null) {
log.info("Got checkpoints [%s]", (String) getContext().get("checkpoints"));
final TreeMap<Integer, Map<Integer, Long>> checkpoints = toolbox.getObjectMapper().readValue(
(String) getContext().get("checkpoints"), new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
Contributor

is it required for checkpoints map to be represented as string in context ? can't we have non-string structures stored in task context ?

Member Author

context can hold anything but as far as I remember there was serialization issue that's why I chose String. Will look at it again.

@pjain1
Member Author

pjain1 commented Oct 3, 2017

Added fix for #4743

@pjain1 pjain1 added this to the 0.11.1 milestone Oct 9, 2017
@dclim
Contributor

dclim commented Oct 10, 2017

thanks @pjain1, I've started reviewing this and will post comments in the next few days

Contributor

@dclim dclim left a comment


Partially reviewed, seems pretty sound so far.

Do you have any performance benchmarks before and after the changes, even anecdotally? I'm interested in whether any of the additions to the while (stillReading) loop had any measurable impact on throughput or maybe even if throughput is increased for Kafka topics with many partitions since you're allocating and persisting fewer segments.

ioConfig.getStartPartitions()
.getPartitionOffsetMap()
.get(partitionOffsetEntry.getKey())
) >= 0
Contributor

Why >= 0 ? Should this just be == 0?

) >= 0
), "Sequence offsets are not compatible with start offsets of task");
log.info("Setting next offsets to [%s]", sequences.size() == 0 ? endOffsets : sequences.get(0).startOffsets);
nextOffsets.putAll(sequences.size() == 0 ? endOffsets : sequences.get(0).startOffsets);
Contributor

I don't think sequences.size() can ever be 0 at this point so the conditional can probably be removed. But if it could be 0, you're using endOffsets when I think you should be using the starting offsets.

Member Author

You are right, the sequences list can never be empty if the restored metadata is null (which happens when the task runs for the first time). I was trying to handle the case where the task is restarted after all sequences are published but before the task shuts down; however, in that case the restored metadata will not be null. Fixed the conditions accordingly. Thanks

publishingSequences.add(sequenceMetadata.getSequenceName());
try {
Object result = driver.persist(committerSupplier.get());
log.info("Persist completed with results [%s]", result);
Contributor

Might as well combine these log messages

}
}

if (stopRequested) {
// if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true
if (stopRequested.get() || (sequences.get(sequences.size() - 1).isCheckpointed()
Contributor

Could sequences.size() ever be 0 here?

Member Author
@pjain1 pjain1 Oct 21, 2017

no, the sequences list should never be empty inside the while (stillReading) loop

checkAndMaybeThrowException();

if (!ioConfig.isPauseAfterRead()) {
persistAndPublishSequences(committerSupplier);
Contributor

Should probably be called maybePersistAndPublishSequences since it doesn't do this on every call.

if (tuningConfig.getHandoffConditionTimeout() == 0) {
handedOff = handoffFuture.get();
handedOffList = Futures.allAsList(handOffWaitList).get();
Contributor

Hm, you might have a visibility concurrency issue here using an ArrayList for handOffWaitList, even with the sentinel (http://gee.cs.oswego.edu/dl/cpj/jmm.html).

Member Author

will fix, thanks

Member Author

changed to CopyOnWriteArrayList
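A rough sketch of the pattern discussed here, assuming Guava listenable futures as used in the surrounding code (names are illustrative): handoff futures are added from the task's main loop while another thread later waits on all of them, so the list itself needs safe publication, which CopyOnWriteArrayList provides.

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

class HandoffWaitSketch
{
  // CopyOnWriteArrayList makes additions from one thread safely visible to the thread that
  // later calls Futures.allAsList(...).get(...), without extra synchronization.
  private final List<ListenableFuture<Boolean>> handOffWaitList = new CopyOnWriteArrayList<>();

  void registerHandoff(ListenableFuture<Boolean> handoffFuture)
  {
    handOffWaitList.add(handoffFuture); // called whenever a sequence finishes publishing
  }

  List<Boolean> awaitAll(long handoffConditionTimeoutMillis) throws Exception
  {
    // 0 mirrors the handoffConditionTimeout semantics above: wait without a timeout.
    return handoffConditionTimeoutMillis == 0
           ? Futures.allAsList(handOffWaitList).get()
           : Futures.allAsList(handOffWaitList).get(handoffConditionTimeoutMillis, TimeUnit.MILLISECONDS);
  }
}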

} else {
handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
handedOffList = Futures.allAsList(handOffWaitList)
.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
Contributor

When this throws a TimeoutException, it looks like it'll cause the task to report as FAILED. Is this the behavior we want, or should we just log/emit errors and then report as SUCCESS?

Member Author

The current behavior is that it would report FAILED; there is a unit test for that as well. But yes, reporting the task as SUCCESS and emitting an alert before that seems like the correct behavior. Fixed it.

);
for (SegmentsAndMetadata handedOff : handedOffList) {
if (handedOff == null) {
log.warn("Handoff failed for segments [%s]", handedOff.getSegments());
Contributor

Unnecessary brackets around %s

}
}
catch (InterruptedException | RejectedExecutionException e) {
publishExecService.shutdownNow();
Contributor

Is this better to put in the finally block?

Member Author

moved

}
}
catch (InterruptedException | RejectedExecutionException e) {
publishExecService.shutdownNow();
appenderator.closeNow();
Contributor

This seems redundant with the closeNow() call in the finally block

Member Author

We want to use closeNow in two cases. The first is when the middle manager asks the task to stop, which is this case, where an InterruptedException would be thrown. The second is when some exception is thrown during processing and we want to fail the task, which is what the closeNow in the finally block captures. I could not find an easy way to capture the first case in the finally block, hence the explicit call here. Does this make sense?

Contributor

Ah okay. One way you could do it is to just set throwableAtomicReference in the InterruptedException catch, but this is fine.

@@ -660,14 +920,20 @@ private Access authorizationCheck(final HttpServletRequest req, Action action)
return access;
}

// used for unit tests
Appenderator getAppenderator()
Contributor

Use the @VisibleForTesting annotation

Member Author

Wanted to do that but #4874 (comment) :)
Anyways I can add it

Contributor
@dclim dclim Oct 25, 2017

Hah. I think that obviously the best option is if you're able to test this without relaxing visibility, but if not, then I think using the annotation is better than using a comment because at least it can be static analyzed.

Member Author

added @VisibleForTesting

}
}

final SequenceMetadata sequenceMetadata = sequences.get(sequences.size() - 1);
Contributor

May as well define this earlier and use it instead of repeatedly doing sequences.get(sequences.size() - 1)

}
}

private synchronized void persistState(final TaskToolbox toolbox) throws IOException
Contributor

Any particular reason you're passing toolbox in here?

Member Author

removed

return sentinel;
}

public void setCheckpointed()
Contributor

This isn't used outside of this class. If checkpointed just means that endOffsets are not all Long.MAX_VALUE, there's probably some cleanup you can do around this too.

Member Author

I have removed this method. However, not sure what you mean by cleanup, do you mean remove the checkpointed and use endOffsets instead ?

Contributor

Yes that's what I was suggesting, but I was more thinking out loud. If you feel it's more clear to have an explicit checkpointed flag that basically means 'setEndOffsets() has been called' that's fine too. Maybe that's better and less error-prone.

Member Author

yes, seems better to have the flag

@@ -290,19 +306,19 @@ public DateTime getStartTime(final String id)

public boolean setEndOffsets(final String id, final Map<Integer, Long> endOffsets)
Contributor

Can remove this, it isn't being called anywhere

@@ -153,6 +156,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

//CHECKSTYLE.OFF: Regexp
Contributor

?

@@ -153,6 +156,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

//CHECKSTYLE.OFF: Regexp
//CHECKSTYLE.ON: Regexp

public class KafkaIndexTaskTest
Contributor

Can you add some tests around early publishing and loading sequence data from disk?

Member Author

will do

final List<SegmentIdentifier> theSegments = new ArrayList<>();
synchronized (segments) {
sequenceNames.stream()
.filter(sequenceName -> !publishingSequences.contains(sequenceName))
Contributor

publishingSequences never gets written to so this is always true

Member Author

The motivation behind using this in-memory Set is to prevent duplicate publishes of same sequence during the lifetime of this driver. Just the persisted segment state cannot be used for de-duplication because upon task restart some segments may be in PUBLISHING state because they were being published before restart but could not finish. So, the call to publish them again should actually start publishing them again as it is not a duplicate request in the current lifetime of driver.

The thing I am missing here though is a statement like publishingSequences.add(sequenceName) in forEach. I will add that.

Contributor

@pjain1 I'm working on fixing a bug in indexTask and this needs to refactor AppenderatorDriver. To avoid making another bug, I want to make sure what this code means.

upon task restart some segments may be in PUBLISHING state because they were being published before restart but could not finish. So, the call to publish them again should actually start publishing them again as it is not a duplicate request in the current lifetime of driver.

  • The segment state is changed to PUBLISHING, but is never checked. Why is it needed?
  • We can assume that all segments in the segments map are not published yet. If this is acceptable, is it possible to achieve the same thing by republishing segments without checking their state when publish() is called? Or, is it possible that publish() is called for the same sequence multiple times?

Member Author

At any point in time, a segment can be in one of 3 states -

  • ACTIVE - Data is being actively added to it.
  • INACTIVE - No data will be added to this segment; a call to moveSegmentOut causes the segment to enter this state. Note that the AppenderatorDriver has not yet started publishing this segment.
  • PUBLISHING - The segment is being actively published; the segment information will be removed from the segments map once the publish succeeds.

Currently, the driver does not distinguish between the INACTIVE and PUBLISHING states when making decisions like preventing duplicate publishes. The reason is that the driver's public publishing methods do not let the caller publish at the segment level; they only allow publishing at the sequence level. So the publishingSequences Set is used to prevent duplicate sequence publishes. This is my understanding of the code, and on that basis here are the answers to your questions -

The segment state is changed to PUBLISHING, but is never checked. Why is it needed?

It looks like the PUBLISHING state is not required and just changing the state to INACTIVE should work, unless some task actually wants to distinguish between moved-out segments and segments that are being published.

We can assume that all segments in the segments map are not published yet. If this is acceptable, is it possible to achieve the same thing by republishing segments without checking their state when publish() is called? Or, is it possible that publish() is called for the same sequence multiple times?

Currently, segments in the segments map might be in either active, inactive or being published state. The segment information is removed only when it is successfully published using a Future callback in publish method.

Feel free to ask for more information if it's still not clear.

Contributor

@pjain1 thanks for the detailed explanation. I have one more follow-up question.

publishingSequences Set is used to prevent duplicate sequence publishes.

When is it possible to publish the same sequence multiple times? Here are my understandings.

  • A new sequence is created whenever the supervisor sets a new end offset.
  • The sequence name is always increased by 1 when a new sequence is created, so it's unique for each sequence and cannot be duplicated.
  • Once a sequence is published, it is removed from the segments map.
  • Upon task restart, the restored segments in the segments map are not published yet. In other words, the segments which are not in the segments map are already published.

Member Author

Duplicate publishes can only happen when a caller calls publish for some sequence multiple times, calls publishAll multiple times, or some combination of both. I believe the Kafka index task does not do that, so it should be ok. However, I see that IndexTask calls publishAll twice, at lines 711 and 743, so if the publishingSequences Set is removed it needs to be checked that no problem arises there, and also that nothing problematic happens if these methods are used anywhere else in the future.

Contributor

Thanks! IndexTask actually calls publish() synchronously, so it should also be fine.

Member Author
@pjain1 pjain1 Jan 23, 2018

If you are planning to remove the publishingSequences Set, then I just noticed that calling publishAll twice in immediate succession can be problematic, as the code relies on a Future callback to remove the published sequences from the segments map. However, as far as I know, callbacks on a future are not guaranteed to execute before get on that future returns. So it might happen that the sequence keys are not removed from the map before the second call to publishAll happens; please check this edge case.

Also, I think that removing sequence keys from the map in the callback via this statement -
sequenceNames.forEach(segments::remove); should be done in a synchronized (segments) block. I should have done that, it's my mistake. Does this make sense?
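A minimal sketch of the pattern being discussed, assuming Guava's FutureCallback (which the driver already uses for publishes) and a segments map keyed by sequence name; the types and names are simplified, not the driver's actual fields. The removal happens inside synchronized (segments) so it cannot interleave with other readers or writers of the map.

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PublishCallbackSketch
{
  // sequenceName -> identifiers of segments still owned by the driver (simplified to String)
  private final Map<String, List<String>> segments = new HashMap<>();

  <T> void removeSequencesOnPublish(ListenableFuture<T> publishFuture, final List<String> sequenceNames)
  {
    Futures.addCallback(
        publishFuture,
        new FutureCallback<T>()
        {
          @Override
          public void onSuccess(T result)
          {
            // Guard the removal so it cannot race with other code synchronizing on the map.
            synchronized (segments) {
              sequenceNames.forEach(segments::remove);
            }
          }

          @Override
          public void onFailure(Throwable t)
          {
            // Caller handles the failure; nothing to remove here.
          }
        },
        MoreExecutors.directExecutor()
    );
  }
}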

Contributor

Yeah, it makes sense. I'll fix them too. Thanks for sharing!

@Override
public void onSuccess(SegmentsAndMetadata result)
{
if (result != null) {
Contributor

Maybe log an error if this comes back null?

Member Author

If result is null then KafkaIndexTask will throw ISE

public void onFailure(Throwable t)
{
// Do nothing, caller should handle the exception
log.error("Error publishing sequences [%s]", sequenceNames);
Contributor

Log the stack trace?

Member Author

KafkaIndexTask actually fails when there is any exception during publish and also logs that exception. So, just wanted to prevent duplicate traces, I guess I can add it here as well, doesn't hurt.

Member Author

For both onSuccess and onFailure I am not logging much, relying on the caller to log and take appropriate action. Do you think that is ok? It's the same as the current behavior.

Contributor

Got it, that seems fine to me.

@dclim
Contributor

dclim commented Oct 20, 2017

Right now it looks like tasks only get checkpointed when the number of rows in a segment exceeds maxRowsPerSegment, which means that if maxRowsPerSegment is never hit then the intermediate publishing never happens. It would be nice to have some mechanism, probably controlled by the supervisor, for defining period-based checkpointing, so that even if taskDuration is PT1H, you could have it do an intermediate publish every 15 minutes.

Also, it might make sense to add a 'perform checkpoint now' route to the supervisor API so that users can manually trigger an immediate checkpoint, since it doesn't look like it would be a lot of additional work. Aside from being useful for testing, I could also see it being useful for cases where users are working through a big backlog of data in Kafka and their normal steady-state configs would fail under the load; rather than submit an updated supervisor, they could keep the same configuration and just manually trigger checkpoints to hand off the data in smaller chunks. I don't see this being as useful as time-based checkpointing, but again maybe worth it since there's very little to do to enable it.

for (String taskId : taskGroup.taskIds()) {
TreeMap<Integer, Map<Integer, Long>> checkpoints = null;
try {
checkpoints = taskClient.getCheckpoints(
Contributor

Consider doing this asynchronously, otherwise if they have a lot of tasks and they're all misbehaving and not responding, this'll take a really long time to timeout and fail.

Member Author

thanks for suggestion, will do.

Member Author

changed to asynchronous gets, please review
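A rough sketch of the asynchronous pattern described here, under the assumption that the task client exposes (or is wrapped with) a listenable-future variant of getCheckpoints; the method name getCheckpointsAsync and the surrounding types are illustrative, not necessarily the PR's exact API.

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CheckpointFetchSketch
{
  interface TaskClientSketch
  {
    // Hypothetical async variant returning sequenceId -> (partition -> offset) per task
    ListenableFuture<TreeMap<Integer, Map<Integer, Long>>> getCheckpointsAsync(String taskId);
  }

  // Fire all requests first, then wait once, so one unresponsive task does not serialize
  // the timeouts of all the others.
  List<TreeMap<Integer, Map<Integer, Long>>> fetchAll(TaskClientSketch client, List<String> taskIds)
      throws Exception
  {
    final List<ListenableFuture<TreeMap<Integer, Map<Integer, Long>>>> futures = new ArrayList<>();
    for (String taskId : taskIds) {
      futures.add(client.getCheckpointsAsync(taskId));
    }
    // successfulAsList substitutes null for failed or unresponsive tasks instead of failing the batch.
    return Futures.successfulAsList(futures).get();
  }
}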

try {

if (endOffsets.equals(taskGroup.partitionOffsets)) {
Contributor

If this condition is true, it just logs a warning and then continues on with the same logic. Is this missing something?

Member Author

Good catch; actually this condition should never be true. I don't remember why I added it, so it can probably be removed. For now I have just changed it to return endOffsets.

{
Map<Integer, Long> startPartitions = taskGroups.get(groupId).partitionOffsets;
Map<Integer, Long> endPartitions = new HashMap<>();
Map<Integer, Long> endPartitions = new HashMap<>(); // TODO if endOffsets were already set
Contributor

Can we resolve this TODO?

Member Author

removed the TODO; this case need not be handled because if endOffsets are set, the TaskGroup has moved from the active to the pending-publish state, and the supervisor never creates tasks for a pending-publish TaskGroup.

@pjain1
Member Author

pjain1 commented Oct 21, 2017

@dclim Thanks for the review comments. I have resolved some and working on others.

Regarding indexing performance, I am also curious to compare that but didn't get a chance to do so. However, we have been running the new code with the same task count as with the previous code, so I'm guessing not much has changed (it probably might have decreased a bit because of the extra overhead). I am planning to deploy the previous indexing service and compare the ingest/events/processed metrics with the current one.

Regarding time-based checkpointing - are you suggesting that, when enabled, apart from the maxRowsInSegment threshold a time threshold will be considered as well, and whichever threshold is hit earlier, checkpointing will be done? In the case of time-based checkpointing there is a potential to create small segments, so I am not sure what advantage it gives us over just waiting for either the maxRowsInSegment limit to be hit or taskDuration to be over. The only advantage I can see is that if taskDuration is too high and the ingest event rate is too low, then there is potential to reindex events in case we lose all task replicas.

Regarding perform checkpoint now - I understand it might be a good feature to have. However, for the example that you specifically mentioned, I am not sure why it is useful when there is a big backlog; how does handing off data in smaller chunks help? Is it just winning the race against Kafka retention rules, so that we don't lose much if Kafka drops data before the task could publish segments for a sequence, or anything else? We can discuss this in the next dev sync as well if that is more convenient.

@gianm
Contributor

gianm commented Oct 21, 2017

@pjain1 I tried this on one of our internal clusters and found one issue: when upgrading from an older version, since the AppenderatorDriverMetadata changed, any restored tasks immediately fail with NPE. They do seem to get retried fine, and the system eventually recovers, but it would be nice to handle it better if possible. Ideal would be to be able to restore older metadata in some sane way. If that's not possible then at least a better error message would be good, so users understand the error is related to upgrading and should be recovered soon.

I'm wondering, are there any other potential issues related to upgrading? Has the protocol changed in such a way that old supervisors cannot talk to new tasks, or the new supervisor cannot talk to old tasks?

I'll keep this patch running on our internal clusters so we can see how it shakes out there.

The error I got was,

2017-10-21T20:16:31,529 INFO [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.AppenderatorDriver - Restored metadata[AppenderatorDriverMetadata{segments=null, lastSegmentIds={index_kafka_metrics-int_0ea3d787eb71af2_0=metrics-int_2017-10-21T20:00:00.000Z_2017-10-21T21:00:00.000Z_2017-10-21T20:00:04.838Z}, callerMetadata={nextPartitions={topic=imply-metrics, partitionOffsetMap={0=5657076237}}}}].
2017-10-21T20:16:31,913 ERROR [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[KafkaIndexTask{id=index_kafka_metrics-int_0ea3d787eb71af2_cbldckbp, type=index_kafka, dataSource=metrics-int}]
java.lang.NullPointerException
	at io.druid.segment.realtime.appenderator.AppenderatorDriver.startJob(AppenderatorDriver.java:189) ~[druid-server-0.11.1-SNAPSHOT.jar:0.11.1-SNAPSHOT]
	at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:488) ~[?:?]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.11.1-SNAPSHOT.jar:0.11.1-SNAPSHOT]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:408) [druid-indexing-service-0.11.1-SNAPSHOT.jar:0.11.1-SNAPSHOT]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_131]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
2017-10-21T20:16:31,915 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_metrics-int_0ea3d787eb71af2_cbldckbp] status changed to [FAILED].
2017-10-21T20:16:31,960 INFO [task-runner-0-priority-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
  "id" : "index_kafka_metrics-int_0ea3d787eb71af2_cbldckbp",
  "status" : "FAILED",
  "duration" : 3221
}

@pjain1
Member Author

pjain1 commented Oct 21, 2017

@gianm Yes you are correct, AppenderatorDriverMetadata has changed and would cause tasks to not recover.

There are protocol changes on both the supervisor and the task side. Upgrading the overlord first is disastrous: on supervisor restart it will ask the tasks for checkpoints, they won't return anything, and the overlord will keep killing tasks until the Middle Managers are running the new code and can return checkpoints. If the Middle Managers are upgraded first, there is a slight chance that some checkpoint limit is hit before the overlord is upgraded and the checkpoint action is submitted to a supervisor that does not know how to handle it.

As of now, the only clean way of upgrading is to stop all supervisors, upgrade the code on both the overlord and the middle managers, and resubmit the supervisors.

I would also suggest to have at least all the commits till "fix pendingCompletionTaskGroup check - 96fb88b" in the code that you are trying on internal cluster.

@gianm
Contributor

gianm commented Oct 21, 2017

@pjain1 thanks for the details.

Is there anything (hopefully somewhat simple) that we can do to make upgrading easier for people? "stop all supervisors and upgrade the code on both overlord and middle manager and resubmit" is pretty labor intensive and will incur some noticeable downtime. It's not absolutely critical (KIS is still experimental after all…) but it would be nice.

Maybe have the supervisor and tasks fall back to some legacy mode if they can't confirm their counterpart is on a new enough version?

I would also suggest to have at least all the commits till "fix pendingCompletionTaskGroup check - 96fb88b" in the code that you are trying on internal cluster.

Thanks for the tip. I was missing that commit before, but will push it out to our internal clusters today.

@pjain1
Member Author

pjain1 commented Oct 22, 2017

@gianm On the task restore part probably this can be tried - a version number can be stored with driver metadata. While restoring, the task can check the version and populate the driver data structures accordingly. If this is done middle managers can be upgraded first, however even after this, there is still a chance that the task might hit maxRowsInSegment before overlord is upgraded and try to submit CheckPointDataSourceMetadataAction in which case it will get a non 200 response and as far as I can understand RemoteTaskActionClient will keep on retrying, so hopefully overlord will be upgraded by then, if not then task will fail. Does this sound ok ?

@gianm
Contributor

gianm commented Oct 22, 2017

@pjain1

IMO, if we can think of a way to support it, ideally it would be best to support a single middleManager running the new version for a long time, while the overlord and other middleManagers run the old version. That's a common pattern that people use when upgrading clusters (try one MM/historical first and then do the rest). It's also common for rolling updates of large clusters, which can take a while to roll through all middleManagers.

Would it work to do something like: if CheckPointDataSourceMetadataAction is unrecognized by the server, the task could give up and just not do the incremental publish, and keep going?

@pjain1
Member Author

pjain1 commented Oct 22, 2017

@gianm probably the overlord can send a flag in the context indicating whether it supports checkpointing or not. If not, the task can revert to the previous logic of moving segments out of the active list rather than checkpointing.

However, running the new code on a single MM might not test much: since it's not doing incremental handoffs, it's not actually testing the new feature for which it is supposed to be upgraded. Running the new code on one or a subset of MMs also means that replication cannot be used, as replicas should not get mixed between new and old MMs, otherwise they will do different things.

@gianm
Contributor

gianm commented Oct 23, 2017

@pjain1

probably the overlord can send a flag in the context indicating whether it supports checkpointing or not. If not, the task can revert to the previous logic of moving segments out of the active list rather than checkpointing.

That sounds like a nice way of handling the compatibility.

However, running the new code on a single MM might not test much: since it's not doing incremental handoffs, it's not actually testing the new feature for which it is supposed to be upgraded.

That's fine, there's a lot of other stuff changing in 0.11.0 and it's still valuable to stage the rollout one machine at a time, if uptime of the system is critical.

Running the new code on one or a subset of MMs also means that replication cannot be used, as replicas should not get mixed between new and old MMs, otherwise they will do different things.

That's less fine, since I guess I was hoping for a more seamless upgrade.

If it's not feasible to do it seamlessly then I can live with it, but I think we should at least provide people a tool to help do the upgrade. Maybe a tool that shuts down all supervisors, monitors the tasks for when they end, and then can later restore them to their last running state.

@dclim
Contributor

dclim commented Oct 25, 2017

@pjain1

Regarding indexing performance, I am also curious to compare that but didn't get a chance to do so. However, we have been running the new code with the same task count as with the previous code, so I'm guessing not much has changed (it probably might have decreased a bit because of the extra overhead). I am planning to deploy the previous indexing service and compare the ingest/events/processed metrics with the current one.

Awesome, I'm looking forward to seeing the results of the side-by-side comparison with the previous logic.

Regarding time-based checkpointing - are you suggesting that, when enabled, apart from the maxRowsInSegment threshold a time threshold will be considered as well, and whichever threshold is hit earlier, checkpointing will be done? In the case of time-based checkpointing there is a potential to create small segments, so I am not sure what advantage it gives us over just waiting for either the maxRowsInSegment limit to be hit or taskDuration to be over. The only advantage I can see is that if taskDuration is too high and the ingest event rate is too low, then there is potential to reindex events in case we lose all task replicas.

Yes that's what I'm suggesting. I'm looking at this from the angle of being able to support long-running tasks, which is interesting because currently having short task durations can create unnecessary shards at the task transition boundaries, whereas currently long task durations means the peon is responsible for serving more queryable data, merging time is longer, and as you mentioned there's a lot of re-work in the case of a failure. Being able to run long task durations with periodic handoffs will eliminate all of these issues. It would also be interesting if we can get to the point where we don't need 2x worker capacity for a given indexing load (currently we generally need capacity so we can have one task reading while another task is publishing). Having long-running tasks should help here.

But for long-running tasks in general, depending solely on the maxRowsInSegment threshold is problematic in the cases where you are receiving less data in a time interval than maxRowsInSegment. For example, say you have taskDuration PT24H, segmentGranularity HOUR, maxRowsInSegment 5M, and you are receiving 4M post-rollup rows per hour. Since maxRowsInSegment never gets hit, the indexing task never does an intermediate handoff and holds onto all that data until taskDuration elapses which is the same behavior as the previous implementation and has all the same issues. Being able to set some kind of intermediateHandoffPeriod would be helpful here. I do agree it can lead to small segments potentially, but I think having this lever available for tuning (and having the option of not using it as well) is helpful.
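For illustration, a hypothetical sketch of the trigger being proposed here; intermediateHandoffPeriod is the suggested, not-yet-existing tuning parameter, and this method is not actual PR code. A checkpoint fires when either the row limit or the elapsed-time limit is reached, whichever comes first.

import org.joda.time.Duration;

class CheckpointTriggerSketch
{
  private final long maxRowsPerSegment;
  private final Duration intermediateHandoffPeriod; // hypothetical; null disables the time-based trigger

  CheckpointTriggerSketch(long maxRowsPerSegment, Duration intermediateHandoffPeriod)
  {
    this.maxRowsPerSegment = maxRowsPerSegment;
    this.intermediateHandoffPeriod = intermediateHandoffPeriod;
  }

  boolean shouldCheckpoint(long numRowsInSegment, long sequenceStartTimeMillis, long nowMillis)
  {
    final boolean rowLimitHit = numRowsInSegment >= maxRowsPerSegment;
    final boolean timeLimitHit = intermediateHandoffPeriod != null
                                 && nowMillis - sequenceStartTimeMillis >= intermediateHandoffPeriod.getMillis();
    return rowLimitHit || timeLimitHit;
  }
}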

Regarding perform checkpoint now - I understand it might be a good feature to have. However, for the example that you specifically mentioned, I am not sure why it is useful when there is a big backlog; how does handing off data in smaller chunks help? Is it just winning the race against Kafka retention rules, so that we don't lose much if Kafka drops data before the task could publish segments for a sequence, or anything else? We can discuss this in the next dev sync as well if that is more convenient.

Hm, typically the problems I see with handling backlogs of data is that since there's no way to limit the number of messages read or the rate of message reading from Kafka, tasks wind up ingesting far more data than they would under steady-state conditions which leads to various forms of out-of-memory conditions. One of the more annoying ones is where they run out of heap space for indexing but can't increase Xmx so instead decrease maxRowsInMemory. But then this results in more intermediate segments being spilled to disk, and when the task finishes, since each of these segments requires off-heap dictionary merge buffers, they then run out of direct memory as well. Typically they'll then change taskDuration to something really small as a way of limiting the amount of data being ingested by each task.

I think it might be interesting to consider supporting rate limiting as a solution for this scenario. But in the meantime, being able to manually trigger handoffs would provide a workaround that doesn't involve having to change the ingestion spec or make configuration changes.

To me, having time-based intermediate handoffs is more important than manually-triggered checkpointing.

@pjain1
Member Author

pjain1 commented Oct 25, 2017

@dclim Time base handoff sounds reasonable to me now. Probably it can be part of next PR, what do you think ?

tasks wind up ingesting far more data than they would under steady-state conditions which leads to various forms of out-of-memory conditions. One of the more annoying ones is where they run out of heap space for indexing but can't increase Xmx so instead decrease maxRowsInMemory

Even if task ingests more data, the memory consumption is capped by maxRowsInMemory in each case (steady state and back log situation). When this limit is hit in-memory rows are persisted and during this time ingestion is blocked so memory consumption should ideally not go up. Not sure why it is happening.

Anyways not opposed to the idea of manual trigger but just trying to understand it more.

@dclim
Contributor

dclim commented Oct 25, 2017

@pjain1 cool, doing a follow-on PR is fine.

It's not the heap memory that's a problem; as you mentioned that's bounded by maxRowsInMemory. The problem is that decreasing maxRowsInMemory tends to generate more intermediate segments and the number of off-heap merge buffers used for merging intermediate segments is not bounded and increases with more intermediate segments until you hit the direct memory limit or run out of physical memory.

Probably ideally those would also be bounded and merging would be done incrementally if it requires more buffers than are available, but anyway, that's an issue that we see once in a while.

@gianm
Contributor

gianm commented Oct 26, 2017

The problem is that decreasing maxRowsInMemory tends to generate more intermediate segments and the number of off-heap merge buffers used for merging intermediate segments is not bounded and increases with more intermediate segments until you hit the direct memory limit or run out of physical memory.

I think ultimately, it would be better to fix this by doing a multi-phase merge (not merging more than N segments per pass) so we don't need to worry about artificial limits.

@dclim
Contributor

dclim commented Oct 31, 2017

hey @pjain1, do you have more anticipated changes coming or is this ready for re-review? If the latter, could you fix the conflicts?

@pjain1
Member Author

pjain1 commented Oct 31, 2017

@dclim I am working on making the PR backwards compatible as discussed in the last dev sync; I am almost done. I also need to add a few unit tests around early publishing. Apart from these I don't anticipate any other changes.

@dclim
Contributor

dclim commented Oct 31, 2017

@pjain1 sounds great, let me know when things are ready for review. Thank you!

@pjain1
Member Author

pjain1 commented Nov 3, 2017

@gianm @dclim I have made changes to the code so that a rolling upgrade can be supported (provided the Overlord is upgraded last). Now the Overlord sends a flag in the task context indicating whether it supports incremental handoffs or not. If not, the code will continue using the old logic and will behave the same as the current code. AppenderatorDriverMetadata has been modified so that it can create new metadata from old persisted metadata, so that task restore works.
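A rough sketch of the compatibility check described above; the context key name used here is illustrative, not necessarily the exact key used in the PR.

import java.util.Map;

class CompatibilitySketch
{
  // Hypothetical context key set by an upgraded overlord/supervisor when it creates the task.
  private static final String INCREMENTAL_HANDOFF_SUPPORTED_KEY = "isIncrementalHandoffSupported";

  // Defaults to false (legacy behavior: no checkpointing, segments are just moved out of the
  // active set) unless an upgraded overlord explicitly set the flag to true.
  static boolean isIncrementalHandoffSupported(Map<String, Object> taskContext)
  {
    if (taskContext == null) {
      return false;
    }
    final Object flag = taskContext.get(INCREMENTAL_HANDOFF_SUPPORTED_KEY);
    return flag != null && Boolean.parseBoolean(flag.toString());
  }
}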

I am running this internally on our cluster, where only a few middle managers are running the code in this PR and the others (including the overlord) are running the latest master/current code. However, I plan to switch them all back to running the latest code in this PR. Can you please also test the backwards compatibility support on your cluster?

I still have to add the unit tests but for now if possible please run the code on your cluster.

@pjain1
Member Author

pjain1 commented Nov 18, 2017

@dclim probably not immediately until parse batch code is in. Are you thinking of having this feature in 0.11.1 ? I believe 0.11.1 will be released by end of December, will that be a RC or stable version ?

@gianm
Contributor

gianm commented Nov 20, 2017

@pjain1 0.11.1 (or 0.12.0 if that's what it is) should be release-branched off in mid-december, and then RC/stabled as soon as feasible.

@gianm
Contributor

gianm commented Nov 20, 2017

@pjain1 thanks for your work on this patch! Could you please write up something for the release notes with anything users should be aware of when upgrading? Like how to do a rolling upgrade, how to do a roll back to an older version, if any configs should be adjusted, etc. Marking this 'release notes' so we can include it.

@pjain1
Member Author

pjain1 commented Nov 21, 2017

@gianm I was asking about the time-based intermediate publish, do you intend to have it in 0.11.1 ? If yes then I would encourage someone else to implement it otherwise I can have a look at this feature later.

Let me see what I can add in the release notes, just to clarify rolling back to older version would cause currently running tasks to not restore as Appenderator metadata would not be compatible.

@gianm
Contributor

gianm commented Nov 21, 2017

@pjain1 oh, sorry I misunderstood you. I think it's not critical to have the time-based intermediate publish feature in 0.11.1. More like when you are able to get to it.

Let me see what I can add in the release notes, just to clarify rolling back to older version would cause currently running tasks to not restore as Appenderator metadata would not be compatible.

Thank you, the docs would be really helpful for users.

Hmm, if a user does roll back, what will happen? Will it be broken forever or will it just fail to restore and then continue? If it's just going to ignore the on-disk data and continue, that's fine. If it's broken forever I think we should do a follow up patch to fix that so rollback is possible (even if restoring is not possible).

@pjain1
Member Author

pjain1 commented Nov 21, 2017

@gianm In the case of a roll-back, in the worst case the current set of tasks will fail, and then a new set of tasks will be started by the Supervisor, which will continue to work.

@gianm
Contributor

gianm commented Nov 21, 2017

@pjain1 that sounds OK to me, probably just mention that in the release notes blurb so people know what to expect.

@pjain1
Member Author

pjain1 commented Nov 21, 2017

Changes to Kafka Indexing Service

  • Decoupling of Druid segments and Kafka partitions - Starting with this release, the number of segments created by a Kafka indexing task is decoupled from the number of Kafka partitions that the task is consuming from. For example, if there are 10 partitions in Kafka and 2 Kafka indexing tasks are running, then each task will consume messages from 5 partitions; earlier, each task would create 5 segments per segment granular interval (one segment for each partition), whereas now the task creates a single segment for each segment granular interval, with all the records from the consumed partitions going to a single Druid segment.
  • Incremental hand-off - The Kafka indexing task now also has the ability to do incremental hand-offs, which means that the segments created by a task are no longer held until the task duration is over. As soon as the maxRowsPerSegment limit is hit, all the segments held by the task at that point in time are handed off, and a new set of segments is created for further messages. This also means that the task can now run for longer durations without accumulating old segments locally on Middle Manager nodes. Incremental hand-off is not supported with the pauseAfterRead feature.
  • Miscellaneous bug fixes

Rolling upgrade

Since the Kafka Indexing Service was marked experimental, this release includes protocol changes between the Kafka Supervisor and the Kafka indexing tasks, as well as some changes to the metadata persisted on disk. Therefore, to support a rolling upgrade, all the Middle Managers need to be upgraded before the Overlord. Note that this ordering is different from the standard upgrade order, and that it applies only to the Kafka Indexing Service. If you are not using the Kafka Indexing Service or can handle downtime for the Kafka Supervisor, you can upgrade in any order.

Until the Overlord is upgraded, all Kafka indexing tasks will behave as before (even if they are upgraded), which means no decoupling and no incremental hand-offs. Once the Overlord is upgraded, new tasks started by the upgraded Overlord will support the new features.

Roll back

Depending on how the roll-back is done, in the worst case the current set of tasks will fail. Once both the Overlord and Middle Managers are rolled back, a new set of tasks will be started by the Supervisor, which will continue to work. It is better to roll back the Overlord first and then the Middle Managers. Note of caution: rolling back has not been tested.

@pjain1
Member Author

pjain1 commented Nov 21, 2017

@gianm I have written something, please have a look and let me know if I am missing anything.

activeSegments,
publishPendingSegments
);
if (segments == null) {
Contributor

@pjain1 would you let me know when segments can be null?
Also, activeSegments and publishPendingSegments look like they are always null. Maybe these should have been removed but accidentally remained?

Contributor

Oh, never mind. I just found your comment on these variables.

Member Author

This is done for backwards compatibility so that old AppenderatorDriverMetadata can be converted to the new format. segments will be null when reading older metadata written before the middle manager code was upgraded; after the upgrade, the old metadata needs to be converted to the new format.

Yes, for the new code activeSegments and publishPendingSegments will always be null, so these properties can be removed in versions greater than 0.12.x
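A minimal sketch of the kind of backwards-compatible deserialization described here, with deliberately simplified types (the real AppenderatorDriverMetadata fields differ): when the new segments field is absent in old metadata, it is rebuilt from the legacy activeSegments and publishPendingSegments fields so restore still works.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DriverMetadataCompatSketch
{
  // sequenceName -> segment identifiers (simplified to String here)
  final Map<String, List<String>> segments;

  DriverMetadataCompatSketch(
      Map<String, List<String>> segments,              // new field; null when reading old metadata
      Map<String, List<String>> activeSegments,        // legacy field; null when reading new metadata
      Map<String, List<String>> publishPendingSegments // legacy field; null when reading new metadata
  )
  {
    if (segments != null) {
      this.segments = segments;
    } else {
      // Old metadata: merge the two legacy maps into the single new map.
      final Map<String, List<String>> merged = new HashMap<>();
      if (activeSegments != null) {
        activeSegments.forEach((seq, ids) -> merged.computeIfAbsent(seq, k -> new ArrayList<>()).addAll(ids));
      }
      if (publishPendingSegments != null) {
        publishPendingSegments.forEach((seq, ids) -> merged.computeIfAbsent(seq, k -> new ArrayList<>()).addAll(ids));
      }
      this.segments = merged;
    }
  }
}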

Contributor

Right. Thanks.

while (taskIndex < taskSequences.size()) {
if (earliestConsistentSequenceId.get() == -1) {
// find the first replica task with earliest sequenceId consistent with datasource metadata in the metadata store
if (taskSequences.get(taskIndex).rhs.entrySet().stream().anyMatch(
Member

@pjain1 could you please refactor this if statement with side effects into a method that returns a boolean? It's very hard to understand this code.

seoeun25 added a commit to seoeun25/incubator-druid that referenced this pull request Jan 10, 2020
* Kafka Index Task that supports Incremental handoffs apache#4815

* prevent NPE from supressing actual exception (apache#5146)

* prevent npe on mismatch between number of kafka partitions and task count (apache#5139)

* Throw away rows with timestamps beyond long bounds in kafka indexing (apache#5215) (apache#5232)

* Fix state check bug in Kafka Index Task (apache#5204) (apache#5248)