Fix idempotence of segment allocation and task report apis in native batch ingestion #11189
Conversation
I think we need a cluster-wide configuration corresponding to the new taskContext. I will add it soon.

I manually tested the behavior with
overall approach lgtm
@@ -398,7 +398,7 @@ private StringFullResponseHolder submitRequest(
       } else {
         try {
           final long sleepTime = delay.getMillis();
-          log.debug(
+          log.warn(
👍
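The hunk above raises the retry log from debug to warn. A standalone sketch of that pattern, assuming hypothetical helper names (submitWithRetry, a Callable-based request), where each retry and its backoff delay are logged at warn so transient failures are visible without enabling debug logging:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

// Illustrative sketch, not Druid's HttpClient code: retry a request with a
// randomized backoff and log every retry at WARN.
final class RetrySketch
{
  private static final Logger LOG = Logger.getLogger(RetrySketch.class.getName());

  static <T> T submitWithRetry(Callable<T> request, int maxTries) throws Exception
  {
    for (int attempt = 1; ; attempt++) {
      try {
        return request.call();
      }
      catch (Exception e) {
        if (attempt >= maxTries) {
          throw e;
        }
        final long sleepMillis = ThreadLocalRandom.current().nextLong(1_000, 5_000);
        // WARN rather than DEBUG so transient failures show up in normal logs.
        LOG.warning(String.format("Request failed (attempt %d/%d); retrying in [%d] ms", attempt, maxTries, sleepMillis));
        TimeUnit.MILLISECONDS.sleep(sleepMillis);
      }
    }
  }
}
```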
throw new ISE(
    "Can't compact segments of non-consecutive rootPartition range. Missing partitionIds between [%s] and [%s]",
    curSegment.getEndRootPartitionId(),
    nextSegment.getStartRootPartitionId()
);
nice 👍
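For illustration, a simplified version of the check behind this error (a standalone sketch; the real code operates on DataSegment shard specs, while here each root partition range is just an int pair):

```java
import java.util.List;

final class RootRangeChecks
{
  /**
   * Each entry is {startRootPartitionId, endRootPartitionId} of a segment,
   * already sorted by startRootPartitionId. Returns true only when each
   * segment's end matches the next segment's start, i.e. no partitionIds
   * are missing between adjacent segments.
   */
  static boolean isConsecutive(List<int[]> sortedRootRanges)
  {
    for (int i = 0; i < sortedRootRanges.size() - 1; i++) {
      final int curEnd = sortedRootRanges.get(i)[1];
      final int nextStart = sortedRootRanges.get(i + 1)[0];
      if (curEnd != nextStart) {
        return false;  // gap found: compaction over this range would be incorrect
      }
    }
    return true;
  }
}
```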
final int maxTries,
@Nullable final CleanupAfterFailure cleanupAfterFailure,
@Nullable final String messageOnRetry,
boolean skipSleep
nit: is skip sleep the test parameter i guess? maybe worth javadocs
Added javadocs.
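The added documentation presumably reads along these lines (a hedged sketch of the intent discussed in this thread, with a simplified signature rather than the committed text):

```java
import java.util.concurrent.Callable;

// Illustrative only; the real method also takes cleanupAfterFailure,
// messageOnRetry, and other parameters.
interface RetryRunner
{
  /**
   * @param maxTries  maximum number of attempts before giving up
   * @param skipSleep if true, skip the sleep between retries. This exists for
   *                  unit tests that trigger many retries and would otherwise
   *                  be slow; production callers should pass false so transient
   *                  failures are retried with a backoff delay.
   */
  <T> T runWithRetries(Callable<T> action, int maxTries, boolean skipSleep) throws Exception;
}
```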
docs/ingestion/tasks.md
|`taskLockTimeout`|300000|task lock timeout in millisecond. For more details, see [Locking](#locking).|
|`forceTimeChunkLock`|true|_Setting this to false is still experimental_<br/> Force to always use time chunk lock. If not set, each task automatically chooses a lock type to use. If this set, it will overwrite the `druid.indexer.tasklock.forceTimeChunkLock` [configuration for the overlord](../configuration/index.md#overlord-operations). See [Locking](#locking) for more details.|
|`priority`|Different based on task types. See [Priority](#priority).|Task priority|
|`useLineageBasedSegmentAllocation`|false|Enable the new lineage-based segment allocation protocol for the native Parallel task with dynamic partitioning. This option should be off during the replacing rolling upgrade to Druid 0.22 or higher. Once the upgrade is done, it must be set to true.|
maybe worth elaborating on why, e.g. "...must be set to true to ensure data correctness"
Suggest also adding a note that this applies if upgrading from a pre-0.22.0 version
Updated the doc per suggestion.
@@ -351,6 +355,7 @@ public boolean add(final Task task) throws EntryExistsException

    // Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec.
    task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockConfig.isForceTimeChunkLock());
+   defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent);
I think we should also set the use lineage config to true here if it is absent, so that custom taskContext configs that are missing that setting do not run with false. The documentation for the default config would then no longer need to indicate that config is the default, since it would be implicit.
Good idea. I changed the default of the default context to an empty map, and added useLineageBasedSegmentAllocation here.
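A rough sketch of the behavior agreed on here, with simplified types (the real code uses task.addToContextIfAbsent and Druid's default task config, not the names below): cluster-wide defaults are merged only for keys the task context does not already define, and useLineageBasedSegmentAllocation falls back to true when absent.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of "set if absent" task context semantics.
final class TaskContextDefaults
{
  static final String USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY = "useLineageBasedSegmentAllocation";

  static Map<String, Object> withDefaults(Map<String, Object> taskContext, Map<String, Object> clusterDefaults)
  {
    final Map<String, Object> merged = new HashMap<>(taskContext);
    // Cluster-wide defaults (druid.indexer.task.default.context) never override
    // what the submitter put in the task spec.
    clusterDefaults.forEach(merged::putIfAbsent);
    // If neither the task nor the cluster default sets the key, fall back to true
    // so that custom contexts missing it do not silently use the old protocol.
    merged.putIfAbsent(USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY, true);
    return merged;
  }
}
```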
/**
 * Transient task failure rate emulated by the taskKiller in {@link SimpleThreadingTaskRunner}.
 * Per {@link SubTaskSpec}, there could be at most one task failure.
 */
private final double transientTaskFailureRate;

/**
 * Transient API call failure rate emulated by {@link LocalParallelIndexSupervisorTaskClient}.
 * This will be applied to every API call in the future.
 */
private final double transientApiCallFailureRate;
cool 👍
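As an illustration of how such a rate might drive failure injection in the test (a sketch with assumed names, not the actual SimpleThreadingTaskRunner code): a sub-task fails with the configured probability, and at most once per sub-task spec.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative failure injector: fails a sub-task at most once per spec,
// with probability transientTaskFailureRate.
final class TransientFailureInjector
{
  private final double transientTaskFailureRate;
  private final Set<String> alreadyFailedSpecIds = ConcurrentHashMap.newKeySet();

  TransientFailureInjector(double transientTaskFailureRate)
  {
    this.transientTaskFailureRate = transientTaskFailureRate;
  }

  boolean shouldFail(String subTaskSpecId)
  {
    if (alreadyFailedSpecIds.contains(subTaskSpecId)) {
      return false;  // per spec, there can be at most one injected failure
    }
    if (ThreadLocalRandom.current().nextDouble() < transientTaskFailureRate) {
      alreadyFailedSpecIds.add(subTaskSpecId);
      return true;
    }
    return false;
  }
}
```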
"Cannot publish segments due to incomplete time chunk. Segments are [%s]", | ||
segmentsPerInterval.stream().map(DataSegment::getId).collect(Collectors.toList()) |
👍 on sanity check... is there any chance the list of segments is huge here? (maybe we should use log.errorSegments to log segments and just include count/interval or something?)
Thanks for reminding me of that. Changed to use log.errorSegments and to avoid creating an overly large string for the exception.
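The pattern being discussed, sketched with a plain JDK logger and hypothetical names rather than Druid's Logger.errorSegments helper: log the full segment list separately, and keep the exception message bounded to a count and interval.

```java
import java.util.List;
import java.util.logging.Logger;

// Hypothetical sketch: the real code uses Druid's logger and DataSegment;
// here segment ids and the interval are plain strings.
final class PublishSanityCheck
{
  private static final Logger LOG = Logger.getLogger(PublishSanityCheck.class.getName());

  static void failOnIncompleteTimeChunk(String interval, List<String> segmentIds)
  {
    // Log every segment id at error level for debugging...
    LOG.severe("Cannot publish segments due to incomplete time chunk: " + segmentIds);
    // ...but keep the exception message small even when the list is huge.
    throw new IllegalStateException(
        String.format(
            "Cannot publish segments due to incomplete time chunk. [%d] segments in interval [%s]",
            segmentIds.size(),
            interval
        )
    );
  }
}
```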
lgtm 🤘
@jon-wei @clintropolis thanks for the review!
Hi, I am wondering if this fix may be related to the issue observed in #11348 as well?
Revert "Fix idempotence of segment allocation and task report apis in native batch ingestion (apache#11189)". This reverts commit 2df4214.
Description
Most internal APIs used in Druid's ingestion should be idempotent to handle transient errors. This PR fixes the idempotence of two APIs used in native batch ingestion.
The first API is the segment allocation API used in dynamic partitioning. Currently, transient network errors or task failures can lead to non-contiguous segment partitionIds allocated by this API. This is a problem because the `PartitionHolder` of segments with non-contiguous partitionIds will never become complete in the broker timeline. As a result, everything will look fine, that is, the task will succeed, segments will be published into the metadata store, and historicals will load and announce them, but you will never be able to query them.

To fix the segment allocation API, I had to add a new API that accepts extra parameters such as `sequenceName` to guarantee idempotence. This breaks the rolling upgrade that replaces idle nodes with a newer version one at a time. To resolve this issue, I added a task context, `useLineageBasedSegmentAllocation`, to control which protocol to use for segment allocation in dynamic partitioning. This option is true by default and must be set to false during the rolling upgrade. The in-place rolling upgrade is not a consideration because batch ingestion doesn't support it.

The second API is the task report API used in all native batch ingestion types. This API can handle retries triggered by transient network errors, but cannot handle duplicate reports caused by task retries. As a result, if a task fails after sending its report, the supervisor task will count both the report of the failed task and that of the retried task. Because of this bug, the parallel task can incorrectly estimate the cardinality of the partition column in hash partitioning and its distribution in range partitioning.
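To illustrate the report-idempotence part of the fix (a sketch under assumed names, not the actual TaskMonitor or supervisor code): keying reports by sub-task spec ID makes a retried task's report replace the failed attempt's report instead of being counted twice.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: reports keyed by sub-task spec ID. A retry of the same
// spec overwrites the earlier attempt's report, so cardinality/distribution
// estimates are not inflated by duplicate reports.
final class SubTaskReportCollector<R>
{
  private final ConcurrentHashMap<String, R> reportsBySpecId = new ConcurrentHashMap<>();

  void collectReport(String subTaskSpecId, R report)
  {
    reportsBySpecId.put(subTaskSpecId, report);  // last report per spec wins
  }

  Map<String, R> getReports()
  {
    // Snapshot consumed by the supervisor when estimating partitions.
    return Collections.unmodifiableMap(new HashMap<>(reportsBySpecId));
  }
}
```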
Finally, to test the fix, I added random task failures and API call retries (emulating transient network failures) in `AbstractParallelIndexSupervisorTaskTest`. All unit tests extending this class, such as `CompactionTaskParallelRunTest`, `HashPartitionMultiPhaseParallelIndexingTest`, `SinglePhaseParallelIndexingTest`, and `RangePartitionMultiPhaseParallelIndexingTest`, now run with potential transient task failures and API call retries.

Upgrade path to 0.22:
Set `druid.indexer.task.default.context = { "useLineageBasedSegmentAllocation": false }` during the upgrade, and restore it to `{ "useLineageBasedSegmentAllocation": true }` after the upgrade is finished.

Key changed/added classes in this PR
SinglePhaseParallelIndexTaskRunner
TaskMonitor
AbstractParallelIndexSupervisorTaskTest