KAFKA-1911: Async delete topic #1664
Conversation
Jenkins build failed due to the kafka.server.OffsetCommitTest.testUpdateOffsets test failure. That test is unrelated to this PR and runs fine on my dev box.
removePartitionMetrics()
info("Deleted log for [%s,%d] in %d ms".format(topic, partitionId, (time.milliseconds - start)))
- We should make it clear in this message that the file isn't actually deleted at this point. It is basically inaccessible/deleted from the user's POV and scheduled for removal.
- Here and elsewhere: rather than expand out the `[%s,%d]` format for the topic-partition we should just rely on the `toString` rendering of `TopicAndPartition`.
- Here and elsewhere: can also make this a little more concise with Scala's string interpolation (see the sketch below).
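A minimal, self-contained sketch of the style being suggested; the `TopicAndPartition` below is a simplified stand-in (not Kafka's actual class), used only to contrast the `format` call with interpolation over `toString`:

```scala
// Simplified stand-in for Kafka's TopicAndPartition, defined here only so the
// example is self-contained.
case class TopicAndPartition(topic: String, partition: Int) {
  override def toString: String = s"[$topic,$partition]"
}

object LoggingStyleSketch {
  def main(args: Array[String]): Unit = {
    val tp = TopicAndPartition("my-topic", 3)
    val elapsedMs = 42L

    // format-string style the review suggests moving away from
    println("Deleted log for [%s,%d] in %d ms".format(tp.topic, tp.partition, elapsedMs))

    // interpolated style that relies on TopicAndPartition's own toString
    println(s"Deleted log for $tp in $elapsedMs ms")
  }
}
```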
@@ -777,7 +778,12 @@ class Log(val dir: File,
  /**
   * The active segment that is currently taking appends
   */
  def activeSegment = segments.lastEntry.getValue
  def activeSegment: LogSegment = {
I don't think the check in `activeSegment` is necessary because `segments` is never empty: `loadSegments` is called right after `val segments` in the constructor. Perhaps `loadSegments` should return the segments to make this clear.
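A toy model of the ordering argument above, assuming `loadSegments` always leaves at least one entry in the map; this is not the real `Log` class, only an illustration of why `lastEntry` cannot be null by the time `activeSegment` is callable:

```scala
import java.util.concurrent.ConcurrentSkipListMap

// Toy model (not the real Log class): the map is populated in the constructor
// immediately after it is created, so lastEntry is never null when activeSegment runs.
class ToyLog {
  private val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
  loadSegments() // runs right after `segments` is created, mirroring Log's constructor order

  // Assumed behaviour: always leaves at least one entry in the map.
  private def loadSegments(): Unit =
    if (segments.isEmpty) segments.put(0L, "segment-0")

  def activeSegment: String = segments.lastEntry.getValue
}

object ToyLogDemo {
  def main(args: Array[String]): Unit =
    println(new ToyLog().activeSegment) // prints "segment-0"
}
```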
@jjkoshy Please take a look at the updated PR for async-delete. This time the code is a better Scala citizen and has improved logging messages.
removePartitionMetrics()
info(s"Log for $topicPartition renamed to $renamedDir and scheduled for deletion. Spent ${time.milliseconds - start} ms.")
The log line is slightly weird. Not sure how useful it is to have a "spent" part given that it can be derived from logging timestamps. It is also probably redundant with the log that is already inside `asyncDelete`. That said, if you want this I would go with `... renamed to $renamedDir in $time milliseconds and scheduled for deletion.`
I zapped "time spent" reporting and calculation.
// reset the index size of the currently active log segment to allow more entries
activeSegment.index.resize(config.maxIndexSize)
activeSegment.timeIndex.resize(config.maxIndexSize)
if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
We could synchronously delete these during load, right? Similar comment in LogManager.
I'm leaning towards deleting them in a scheduled task to avoid wasting start-up time for data we don't need. What do you think?
Yes that is fine.
throw new IllegalArgumentException(
  "Duplicate log directories found: %s, %s!".format(
    current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
We could actually synchronously delete logs during load, right, instead of waiting for the scheduled task to kick in? OTOH maybe it is better to have a consistent/single place where we delete the logs.
If a particular broker holds a large amount of data for a particular topic or if we have a high throughput topic that was marked for delete, then deleting them synchronously when the broker starts up might increase the startup time significantly, right?
I'm leaning towards deleting them in a scheduled task to avoid wasting start-up time for data we don't need.
Yes either way is fine.
@@ -203,6 +208,11 @@ class LogManager(val logDirs: Array[File],
  delay = InitialTaskDelayMs,
  period = flushCheckpointMs,
  TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-delete-logs",
I think it is worth observing that an alternate approach to this recurring task that pulls off a queue is to just invoke async delete on demand when we delete a log. I.e., instead of having the `asyncDelete` method and the log directory renames, call `scheduler.schedule("delete", log.delete)`, using a period of -1 to get a single invocation. The reason this isn't very robust is that we could get several partitions to delete in a short period of time, which could lead to rejected executions if the threadpool is fully utilized.
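A rough sketch of the single-shot alternative described above, written against the plain JDK executor rather than Kafka's scheduler; `deleteRenamedDir` and the directory name are illustrative assumptions:

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

object SingleShotDeleteSketch {
  private val executor = new ScheduledThreadPoolExecutor(1)

  // Hypothetical stand-in for recursively deleting the renamed log directory.
  private def deleteRenamedDir(dir: String): Unit = println(s"deleting $dir")

  // Single-shot style: every delete submits its own one-off task, so many
  // near-simultaneous topic deletions become many independent tasks on the executor.
  def asyncDelete(renamedDir: String): Unit = {
    executor.schedule(new Runnable {
      override def run(): Unit = deleteRenamedDir(renamedDir)
    }, 0L, TimeUnit.MILLISECONDS)
    ()
  }

  def main(args: Array[String]): Unit = {
    asyncDelete("my-topic-0.delete")
    executor.shutdown()
  }
}
```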
The `deleteLogs` function swallows all exceptions and reports them. On IO exceptions on a particular log, it will retry later. The current implementation has a queue of logs already; it's semantically equivalent to `scheduler.schedule` afaik. I did not feel compelled to change it.
It is similar, but as noted above I think `scheduler.schedule` could lead to a rejected execution if the executor is saturated; and then you would need a rejected execution handler.
The scheduler uses `ScheduledThreadPoolExecutor`, which has a fixed number of threads (say N) and an unbounded queue. Using this scheduler for IO risks blocking all the threads in the thread-pool if/when at least N disk IO operations take too long or never return. As this thread-pool executor is used for other periodic tasks such as log retention, log flushing, and recovery checkpointing, we definitely don't want to lock up all the threads doing nothing. Direct use of `scheduler.schedule` opens up this unlikely but dangerous possibility. This is perhaps a strong enough reason to stay away from direct scheduling and use an explicit queue of logs such that only one thread from the pool is used at a time.
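For contrast, a sketch of the queue-draining approach argued for here, with assumed names rather than the actual `LogManager` code; the point is that only one recurring task ever touches the disk for deletes:

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object QueueDrainingDeleteSketch {
  // Explicit queue of directories to delete; names here are assumptions, not LogManager code.
  private val logsToBeDeleted = new LinkedBlockingQueue[String]()
  private val scheduler = Executors.newScheduledThreadPool(2)

  // Producer side: the stop-replica path only enqueues the renamed directory name.
  def asyncDelete(renamedDir: String): Unit = logsToBeDeleted.put(renamedDir)

  // Consumer side: the recurring "kafka-delete-logs"-style task drains the queue,
  // so at most one thread from the shared pool is busy with deletes at any time.
  private def deleteLogs(): Unit =
    while (!logsToBeDeleted.isEmpty) {
      val dir = logsToBeDeleted.take()
      println(s"deleting $dir") // stand-in for the real recursive directory delete
    }

  def main(args: Array[String]): Unit = {
    scheduler.scheduleAtFixedRate(
      new Runnable { override def run(): Unit = deleteLogs() },
      0L, 30L, TimeUnit.SECONDS)
    asyncDelete("my-topic-0.delete")
    Thread.sleep(200)
    scheduler.shutdown()
  }
}
```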
That can be avoided by using a separate scheduler.
I might be missing something here, but would like to understand the advantage of scheduling delete on the fly over having it as a periodic task.
It's simpler - that is pretty much it.
A separate scheduler may need its own `defaultNumThreads` config?
removedLog = logs.remove(topicAndPartition)
private def deleteLogs(): Unit = {
  while (!logsToBeDeleted.isEmpty) {
    val removedLog = logsToBeDeleted.take()
If there are any storage exceptions in delete, we probably want to retry - i.e., we should `peek` and `take` only after the delete succeeds.
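A sketch of the peek-then-take pattern being suggested, with hypothetical helper names; the head of the queue is removed only after the delete succeeds, so a storage exception leaves the entry queued for the next run:

```scala
import java.util.concurrent.LinkedBlockingQueue

object PeekThenTakeSketch {
  private val logsToBeDeleted = new LinkedBlockingQueue[String]()

  // Hypothetical delete that may throw on storage errors.
  private def deleteRenamedDir(dir: String): Unit = println(s"deleting $dir")

  def deleteLogs(): Unit =
    while (!logsToBeDeleted.isEmpty) {
      val dir = logsToBeDeleted.peek() // look at the head without removing it
      deleteRenamedDir(dir)            // may throw; the entry stays queued if it does
      logsToBeDeleted.take()           // dequeue only once the delete succeeded
    }

  def main(args: Array[String]): Unit = {
    logsToBeDeleted.put("my-topic-0.delete")
    deleteLogs()
  }
}
```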
@jjkoshy Updated PR to be robust against IO exceptions. Not doing startup-time deletion yet. Ping @MayureshGharat
I think it would be worth spending some time looking into just using the scheduler in a single-shot manner, i.e., rename the directory and then delete it asynchronously in a single-shot call to the executor. The only thing I'm not sure about is the likelihood of saturating the executor service.
Also, there are unit test failures that are likely related. For example:
kafka.log.LogCleanerIntegrationTest > testCleansCombinedCompactAndDeleteTopic[0] FAILED
java.lang.NullPointerException
at kafka.log.Log.logSegments(Log.scala:917)
at kafka.log.Log.recoverLog(Log.scala:286)
at kafka.log.Log.loadSegments(Log.scala:265)
at kafka.log.Log.<init>(Log.scala:107)
at kafka.log.LogCleanerIntegrationTest$$anonfun$makeCleaner$1.apply(LogCleanerIntegrationTest.scala:329)
at kafka.log.LogCleanerIntegrationTest$$anonfun$makeCleaner$1.apply(LogCleanerIntegrationTest.scala:325)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at kafka.log.LogCleanerIntegrationTest.makeCleaner(LogCleanerIntegrationTest.scala:325)
at kafka.log.LogCleanerIntegrationTest.runCleanerAndCheckCompacted$1(LogCleanerIntegrationTest.scala:104)
at kafka.log.LogCleanerIntegrationTest.testCleansCombinedCompactAndDeleteTopic(LogCleanerIntegrationTest.scala:130)
  }
} catch {
  case e: Throwable =>
    error(s"Exception in kafka-delete-logs thread. Ignoring.", e)
Do we need this?
Yes, it's necessary for a couple of reasons. First, the `ScheduledExecutorService.scheduleAtFixedRate` documentation says that "...If any execution of the task encounters an exception, subsequent executions are suppressed...". Second, `LinkedBlockingQueue.put` may throw an `InterruptedException` from the catch block. In both cases, we want the task to keep getting scheduled in the future.
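A standalone demonstration of the `scheduleAtFixedRate` behaviour cited above (not Kafka code): once one run throws, no further runs happen, which is why the periodic delete task has to catch `Throwable` itself to stay alive.

```scala
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executors, TimeUnit}

object SuppressionDemo {
  def main(args: Array[String]): Unit = {
    val executor = Executors.newScheduledThreadPool(1)
    val runs = new AtomicInteger(0)
    executor.scheduleAtFixedRate(new Runnable {
      override def run(): Unit =
        if (runs.incrementAndGet() == 1)
          throw new RuntimeException("boom") // first run throws: the periodic task is cancelled
    }, 0L, 50L, TimeUnit.MILLISECONDS)
    Thread.sleep(300)
    println(s"total runs: ${runs.get}") // stays at 1: subsequent executions were suppressed
    executor.shutdown()
  }
}
```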
Yes, I was wondering if it makes more sense to do something more radical like shutdown. We don't have to - the existing behavior on storage exceptions on stop-replica is that the controller would retry the stop-replica request.
Also, the blocking queue is effectively unbounded (well, capped at Integer.MAX_VALUE) so that is probably not going to block. In fact, it should never be allowed to block, as that would hold up the stop-replica request.
While this LGTM, I'm not convinced that just doing single-shot calls to the executor would not be better. I think that is much simpler and is backed by an unbounded task queue.
@jjkoshy Summarizing our discussion and the decision to use the existing background thread-pool for deleting logs. The dilemma was between two choices with the existing background scheduler: 1) single-shot scheduling of each delete via `scheduler.schedule`, or 2) a periodically scheduled deletion task. An alternative thread-pool of background threads could be used, but it requires a separate user-facing config and was therefore eliminated. A periodically scheduled deletion task (option 2 above) has two sub-options: a) scan the directory looking for any directories marked for deletion ("*.delete"); b) use an internal queue of logs that contain directory names. Option 2.a is not attractive because most of the time the directory scan will not find anything to delete. Besides, it's IO heavy. Therefore, the only reasonable option is 2.b, wherein an explicit queue of logs scheduled for deletion is maintained. The current patch keeps track of IO failures during one run of deletion and retries the failed directories at the next scheduled run.
@@ -106,7 +106,7 @@ class Log(val dir: File,
  val t = time.milliseconds
  /* the actual segments of the log */
  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
  loadSegments()
  loadSegments()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whitespace
@@ -297,7 +307,8 @@ class LogManager(val logDirs: Array[File],

  /**
   * Delete all data in a partition and start the log at the new offset
   * @param newOffset The new offset to start the log with
   *
whitespace
  }
} catch {
  case e: Throwable =>
    error(s"Exception in kafka-delete-logs thread. Ignoring to ensure continued scheduling.", e)
Drop the `Ignoring...` bit.
removedLog = logs.remove(topicAndPartition)
private def deleteLogs(): Unit = {
  try {
    var failed = 0
I thought we were going with the simpler approach of just peeking and letting it go (and having the next run re-attempt the delete). Exceptions here are completely abnormal, and it is likely that similar exceptions will affect regular fetch/produce requests. This is fine too, but overkill IMO.
  }
}

def asyncDelete(topicAndPartition: TopicAndPartition): String = {
Can you add scaladoc to this? Specifically, document what it returns.
The method need not return anything. The return string (name of the renamed dir) was used just in a log statement. It was redundant too. So I zapped the return value. Added documentation.
logsToBeDeleted.add(removedLog)
removedLog.removeLogMetrics()
info(s"Log for partition ${removedLog.topicAndPartition.topic} is renamed to ${removedLog.topicAndPartition.partition} and is scheduled for deletion")
The log message doesn't seem right - it seems it would log "topic" is renamed to "partition". Can you verify?
oops! Fixed now.
Signed-off-by: Sumant Tambe <sutambe@linkedin.com>
…rectory Signed-off-by: Sumant Tambe <sutambe@linkedin.com>
…his is to speed up the startup process. Also added a check that Log directories ending with .delete be added to a separate set of logs that are to be deleted asynchronously. Signed-off-by: Sumant Tambe <sutambe@linkedin.com>
Signed-off-by: Sumant Tambe <sutambe@linkedin.com>
Signed-off-by: Sumant Tambe <sutambe@linkedin.com>
Signed-off-by: Sumant Tambe <sutambe@linkedin.com>
…o loading segments on a crash Signed-off-by: Sumant Tambe <sutambe@linkedin.com>
Cleanup, async log deletion rebase, and tested successfully
+1
…atmayuresh15@gmail.com> and Sumant Tambe <sutambe@yahoo.com>

The last patch submitted by MayureshGharat (back in Dec 15) has been rebased to the latest trunk. I took care of a couple of test failures (MetricsTest) along the way. jjkoshy, granders, avianey, you may be interested in this PR.

Author: Sumant Tambe <sutambe@yahoo.com>
Author: Mayuresh Gharat <mgharat@mgharat-ld1.linkedin.biz>
Author: MayureshGharat <gharatmayuresh15@gmail.com>

Reviewers: Joel Koshy <jjkoshy.w@gmail.com>

Closes apache#1664 from sutambe/async-delete-topic
The last patch submitted by @MayureshGharat (back in Dec 15) has been rebased to the latest trunk. I took care of a couple of test failures (MetricsTest) along the way. @jjkoshy, @granders, @avianey, you may be interested in this PR.