KafkaIndexTask can delete published segments on restart #6124

Closed
jihoonson opened this Issue Aug 8, 2018 · 6 comments

jihoonson commented Aug 8, 2018

This can happen in the following scenario.

  1. A Kafka index task starts publishing segments.
  2. The task succeeds in publishing the segments but is stopped immediately afterward (e.g., by restarting the machine).
  3. When the task is restored, it restores all sequences it kept in memory before the restart.
  4. After reading some more events from Kafka, the task tries to publish segments again. These include the segments that were already published before the restart, because the restored sequences still contain them.
  5. Since those segments are already stored in the metastore, the publish fails.
  6. The set of published segments in the metastore differs from the set of segments the task is trying to publish, because the task has read more data since the restart.
  7. The task concludes that the publish actually failed and removes the published segments from deep storage.
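The mismatch in steps 5-7 can be sketched with a minimal simulation. This is a hypothetical illustration only: plain string sets stand in for Druid's segment identifiers and the metadata store, and the class and method names are made up.

```java
import java.util.HashSet;
import java.util.Set;

// Minimal simulation of the restart scenario described above. All names
// here are illustrative stand-ins, not Druid's actual classes.
public class RestartScenario {
  // The check the task effectively performs after a failed publish: do the
  // used segments in the metastore exactly equal the set it is publishing?
  static boolean usedSegmentsMatch(Set<String> usedSegments, Set<String> publishing) {
    return usedSegments.equals(publishing);
  }

  public static void main(String[] args) {
    // Steps 1-2: the task publishes seg1 and seg2, then the machine restarts.
    Set<String> metastore = new HashSet<>(Set.of("seg1", "seg2"));

    // Step 3: the restored task rebuilds the sequences it held in memory,
    // so it still considers seg1 and seg2 unpublished.
    Set<String> publishing = new HashSet<>(metastore);

    // Step 4: it reads more events from Kafka and generates seg3.
    publishing.add("seg3");

    // Steps 5-6: the publish fails (seg1/seg2 are already stored), and the
    // used set no longer equals the set the task is trying to publish.
    System.out.println(usedSegmentsMatch(metastore, publishing)); // prints "false"
  }
}
```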
Contributor

gianm commented Aug 8, 2018

  Since the segments which are published twice are already stored in metastore, the publish fails.

This doesn't seem right: there is code specifically to handle the case where a task tries to publish a segment that some task already published. It happens all the time with replicas, and they just ignore that segment and move on to the next one.

I wonder if the real reason for publish failure is that the startMetadata doesn't match up. I bet it wouldn't match up: it sounds like the task is trying to publish from the point it originally started from rather than from the point it last published.

It sounds like there are two separate problems here:

  • In (7) the task should not have removed the published segments (this is the biggest bug).
  • In (3) the task should have done something smarter instead of restoring a setup that couldn't possibly work out.

Contributor

jihoonson commented Aug 8, 2018

This doesn't seem right: there is code specifically to handle the case where a task tries to publish a segment that some task already published. It happens all the time with replicas, and they just ignore that segment and move on to the next one.

Yes, correct. The task doesn't fail at this point, but it just fails to update the metastore.

I wonder if the real reason for publish failure is that the startMetadata doesn't match up. I bet it wouldn't match up: it sounds like the task is trying to publish from the point it originally started from rather than from the point it last published.

Here is the code where this error happens:

              final boolean published = publisher.publishSegments(
                  ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
                  metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
              );

              if (published) {
                log.info("Published segments.");
              } else {
                log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
                final Set<SegmentIdentifier> segmentsIdentifiers = segmentsAndMetadata
                    .getSegments()
                    .stream()
                    .map(SegmentIdentifier::fromDataSegment)
                    .collect(Collectors.toSet());
                if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers)
                                      .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
                  log.info(
                      "Removing our segments from deep storage because someone else already published them: %s",
                      segmentsAndMetadata.getSegments()
                  );
                  segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);

                  log.info("Our segments really do exist, awaiting handoff.");
                } else {
                  throw new ISE("Failed to publish segments[%s]", segmentsAndMetadata.getSegments());
                }
              }

published is false, so the task checks whether the segments it failed to publish are already used, via usedSegmentChecker (ActionBasedUsedSegmentChecker in this case). usedSegmentChecker.findUsedSegments(segmentsIdentifiers).equals(Sets.newHashSet(segmentsAndMetadata.getSegments())) returns false, so the task throws an exception. This happens because the task generated more segments after restarting. I've verified this by comparing the sets of segments being published before and after the restart.

I didn't see any logs related to metadata mismatch.

In (7) the task should not have removed the published segments (this is the biggest bug).

Not sure about this. Skipping the removal could make segment publishing non-atomic and leave potential garbage segments behind. Probably solving (3) would be enough, since this situation should never happen then?

In (3) the task should have done something smarter instead of restoring a setup that couldn't possibly work out.

Yes, I think we should keep the task state on local disk, representing what the task was doing. Also, instead of usedSegmentChecker.findUsedSegments(segmentsIdentifiers).equals(Sets.newHashSet(segmentsAndMetadata.getSegments())), we could relax this to a containment check and continue publishing the segments not in usedSegments.
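A sketch of that containment-based variant, using plain java.util sets as stand-ins for Guava's Sets and Druid's segment types (the class and method names are illustrative, not the actual Druid API):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the proposed relaxation: rather than requiring the used set to
// equal the publishing set, treat segments already in the metastore as
// published and compute the remainder that still needs publishing.
public class ContainmentCheck {
  // Returns the segments the task should continue publishing. By
  // construction, findUsedSegments only returns identifiers the task asked
  // about, so usedSegments should always be a subset of publishing.
  static Set<String> remainingToPublish(Set<String> usedSegments, Set<String> publishing) {
    if (!publishing.containsAll(usedSegments)) {
      throw new IllegalStateException("used segments not among publishing segments");
    }
    Set<String> remainder = new HashSet<>(publishing);
    remainder.removeAll(usedSegments); // already published; safe to skip
    return remainder;
  }

  public static void main(String[] args) {
    Set<String> used = Set.of("seg1", "seg2");
    Set<String> publishing = Set.of("seg1", "seg2", "seg3");
    System.out.println(remainingToPublish(used, publishing)); // prints "[seg3]"
  }
}
```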

@gianm gianm added this to the 0.12.3 milestone Aug 9, 2018

gianm added a commit to gianm/druid that referenced this issue Aug 11, 2018

Fix three bugs with segment publishing.
1. In AppenderatorImpl: always use a unique path if requested, even if the segment
   was already pushed. This is important because if we don't do this, it causes
   the issue mentioned in apache#6124.
2. In IndexerSQLMetadataStorageCoordinator: Fix a bug that could cause it to return
   a "not published" result instead of throwing an exception, when there was one
   metadata update failure, followed by some random exception. This is done by
   resetting the AtomicBoolean that tracks what case we're in, each time the
   callback runs.
3. In BaseAppenderatorDriver: Only kill segments if we get an affirmative false
   publish result. Skip killing if we just got some exception. The reason for this
   is that we want to avoid killing segments if they are in an unknown state.

Two other changes to clarify the contracts a bit and hopefully prevent future bugs:

1. Return SegmentPublishResult from TransactionalSegmentPublisher, to make it
more similar to announceHistoricalSegments.
2. Make it explicit, at multiple levels of javadocs, that a "false" publish result
must indicate that the publish _definitely_ did not happen. Unknown states must be
exceptions. This helps BaseAppenderatorDriver do the right thing.
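Fix (3) together with contract point (2) can be sketched as follows. This is a simplified illustration with hypothetical types; the real code uses TransactionalSegmentPublisher, SegmentPublishResult, and BaseAppenderatorDriver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of the "kill only on an affirmative false" rule: segments are
// deleted only when the publisher definitively reports that the publish
// did not happen. An exception means the state is unknown, so it
// propagates without deleting anything. All names here are illustrative.
public class KillOnAffirmativeFalse {
  interface Publisher {
    // Contract: returning false means the publish DEFINITELY did not
    // happen; unknown states must be thrown as exceptions instead.
    boolean publish(List<String> segments);
  }

  static boolean publishOrKill(Publisher publisher, List<String> segments,
                               Consumer<String> killQuietly) {
    // If publish() throws, we never reach the kill step (unknown state).
    final boolean published = publisher.publish(segments);
    if (!published) {
      // Affirmative false: the metadata transaction did not commit, so the
      // pushed segment files can be safely removed.
      segments.forEach(killQuietly);
    }
    return published;
  }

  public static void main(String[] args) {
    List<String> killed = new ArrayList<>();
    publishOrKill(segs -> false, List.of("seg1"), killed::add);
    System.out.println(killed); // prints "[seg1]"
  }
}
```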

fjy added a commit that referenced this issue Aug 15, 2018

Fix three bugs with segment publishing. (#6155)
* Fix three bugs with segment publishing.

* Remove javadoc-only import.

* Updates.

* Fix test.

* Fix tests.

gianm added a commit to implydata/druid that referenced this issue Aug 16, 2018

Fix three bugs with segment publishing. (apache#6155)

Contributor

gianm commented Aug 17, 2018

I think this may be fixed by #6155, but would like to double-check that there are no lingering issues due to the fact that the sequences persist file and the appenderator persist file are separate files.


Contributor

jihoonson commented Aug 27, 2018

To be clear, #6155 fixes Kafka tasks so they no longer delete published segments. In the same scenario, the restored Kafka tasks would still fail, and the supervisor would spawn new tasks that finish the job.


Contributor

gianm commented Aug 28, 2018

Got it, I think we can close this then.

@gianm gianm closed this Aug 28, 2018


Contributor

jihoonson commented Aug 28, 2018

@gianm I think it makes sense to close this issue, since the title is about Kafka tasks deleting pushed segments. However, tasks would still fail in the same scenario, which can confuse users. Do you think we should file this as a separate issue?
