KAFKA-4586; Add purgeDataBefore() API (KIP-107) #2476
Conversation
@ijuma @ewencp @jjkoshy @radai-rosenblatt I have manually tested it successfully using the …
@lindong28, thanks for the PR. I probably won't have time to review before next week. cc @junrao as well since he reviewed the KIP.
Branch force-pushed from 1146e74 to ed11672
Branch force-pushed from ed11672 to 478300f
@jjkoshy @junrao @becketqin @ijuma @radai-rosenblatt I have added tests and the patch is fully ready for review. Would you have time to review this patch?
@lindong28 Thanks for the patch. It seems the patch has conflicts. Could you rebase?
@becketqin I thought it would take a week or more for the patch to be reviewed, and there would be conflicts again anyway, so I was going to rebase it after the first round of review. What is our general guideline for rebasing big patches? I can certainly rebase it now if you think it is useful.
Branch force-pushed from 478300f to 74b1229
@becketqin All conflicts have been resolved and all tests have passed. Thanks!
@lindong28 Thanks for updating the patch. I'll take a look. Usually, if there are multiple big patches in flight in parallel, the committers who are reviewing the code will hold back some of the patches to avoid unnecessary rebases.
@lindong28 Thanks for the patch. Left some comments.
It looks like the patch again has conflicts with trunk. To avoid frequent rebasing, let's finish a few iterations of review before we rebase.
if (!errors.isEmpty)
  error(s"Metadata request contained errors: $errors")

val (authorizedPartitions, unauthorizedPartitions) = purgeOffsets.partition{partitionAndOffset =>
The variable names seem a little misleading. Are all partitions without leader information unauthorized?
Agree. I have updated the names.
  response.cluster().leaderFor(partitionAndOffset._1) != null}

val unauthorizedPartitionResults = unauthorizedPartitions.mapValues( _ =>
  PurgeDataResult(PurgeResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()))
Should we use the exception returned by the broker in this case?
Good point. I have updated the code to use the error from the MetadataRequest.
val observedResults = futures.flatMap{ future =>
  val elapsed = time.milliseconds() - start
  remaining = timeoutMs - elapsed
  if (remaining > 0 && client.poll(future, remaining)) future.value()
When client.poll(future, remaining) returns true, the future may contain either a value (succeeded) or an error (failed). If the future has an error, calling future.value() will throw an exception. It seems better to return the full results to the users even if some of the requests failed, so the users can tell which partitions failed to purge.
This should not be a problem for purgeDataBefore(), because the futures provided to the CompositeFuture have been constructed in such a way that they never raise exceptions. Each of those futures calls future.complete(result) in onFailure, where the result carries the error information.
I agree it would make CompositeFuture more useful if this class handled the conversion of errors to results and returned the full results to the user as you suggested. But I don't have a good way to do that now because CompositeFuture doesn't know the type of the return value -- it currently uses the type parameter T.
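To illustrate the pattern described in this reply, here is a minimal Scala sketch. It uses scala.concurrent.Future in place of Kafka's internal request futures and a simplified PurgeDataResult stand-in; both names are assumptions for the example, not the actual classes in the patch. Failures are folded into the result value, so reading the completed future never throws.

import scala.concurrent.{ExecutionContext, Future}

object NeverThrowingFutures {
  // Simplified stand-in for the PR's result type: the error travels inside
  // the value, so the outer future always completes successfully.
  case class PurgeDataResult(lowWatermark: Long, error: Option[Throwable])

  // Wrap a future that may fail into one that always succeeds with a result
  // carrying the error information, mirroring the onFailure -> complete(result)
  // construction described above.
  def neverThrowing(inner: Future[Long])(implicit ec: ExecutionContext): Future[PurgeDataResult] =
    inner
      .map(lowWatermark => PurgeDataResult(lowWatermark, None))
      .recover { case e => PurgeDataResult(-1L, Some(e)) } // -1L stands in for INVALID_LOW_WATERMARK
}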
}

// send requests over network to brokers
client.poll(0)
It seems a single consumerNetworkClient.poll(0) cannot guarantee that all the requests are sent out. Also, the interface might be a little awkward: after purgeDataBefore() returns, the users have to keep calling client.poll(), otherwise the futures will never be completed. I am wondering how a user would use the asynchronous purge in this case. At the very least we should document this clearly.
Great point. I have updated the AdminClient to create its own thread to do client.poll(retryBackoffMs). I find it necessary for the AdminClient to have its own thread in order to support both sync and async operations.
I have also added testLogStartOffsetAfterAsyncPurge() to validate the async purge operation.
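As a rough sketch of the "AdminClient owns its own poll thread" idea discussed here, under the assumption of a hypothetical NetworkClientLike trait (the real patch uses Kafka's network client, and the thread name below is made up):

import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical minimal interface standing in for the network client used
// by the AdminClient; only poll() matters for this sketch.
trait NetworkClientLike {
  def poll(timeoutMs: Long): Unit
}

// A dedicated thread keeps polling so that futures returned by async calls
// complete without the caller having to poll, and sync calls can simply
// block on their future.
class BackgroundPoller(client: NetworkClientLike, retryBackoffMs: Long) {
  private val running = new AtomicBoolean(true)

  private val thread = new Thread("admin-client-network-thread") {
    override def run(): Unit = {
      while (running.get()) client.poll(retryBackoffMs)
    }
  }
  thread.setDaemon(true)
  thread.start()

  def shutdown(): Unit = {
    running.set(false)
    thread.join()
  }
}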
@Override
public String toString() {
    StringBuilder builder = new StringBuilder();
This could just be a string concatenation.
Is there any negative impact to using StringBuilder as compared to string concatenation? Using StringBuilder here lets us keep the same code style as the toString() of other requests such as ProduceRequest and LeaderAndIsrRequest.
Not much difference; just for readability (see below) we can keep them the same as other requests.
http://stackoverflow.com/questions/1532461/stringbuilder-vs-string-concatenation-in-tostring-in-java
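For illustration only, the two styles being compared produce the same string; the class and field names below are hypothetical and the choice is purely about matching the existing toString() style of other request classes.

// Hypothetical request-info holder used only to contrast the two styles.
case class PurgeRequestInfo(topic: String, partition: Int, offset: Long) {

  // Builder style, matching other request classes' toString().
  def toStringWithBuilder: String = {
    val builder = new StringBuilder
    builder.append("PurgeRequestInfo(topic=").append(topic)
      .append(", partition=").append(partition)
      .append(", offset=").append(offset)
      .append(")")
    builder.toString
  }

  // Equivalent plain concatenation; readability is the only difference.
  def toStringWithConcat: String =
    "PurgeRequestInfo(topic=" + topic + ", partition=" + partition + ", offset=" + offset + ")"
}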
 * Return low watermark of the partition.
 */
def purgeRecordsOnLeader(offset: Long): Long = {
  inReadLock(leaderIsrUpdateLock) {
Should this be in a write lock?
I think we should use the read lock since this method doesn't update the leader or ISR of the partition, right?
Yes, you are right. Read lock here is fine.
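A simplified illustration of the locking choice settled on here; LogLike and PartitionLike are hypothetical stand-ins for the real Partition and Log classes, and inReadLock is reimplemented inline so the sketch is self-contained.

import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical stand-in for the log: purging only advances the start offset.
trait LogLike {
  def maybeIncrementLogStartOffset(offset: Long): Unit
  def logStartOffset: Long
}

class PartitionLike(log: LogLike) {
  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()

  private def inReadLock[T](body: => T): T = {
    val lock = leaderIsrUpdateLock.readLock()
    lock.lock()
    try body finally lock.unlock()
  }

  // Purging does not modify the leader or ISR, so the read lock is enough;
  // it only prevents a concurrent leader/ISR change during the purge.
  def purgeRecordsOnLeader(offset: Long): Long = inReadLock {
    log.maybeIncrementLogStartOffset(offset)
    log.logStartOffset // return the new low watermark
  }
}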
 */
def maybeIncrementLogStartOffsetAndPurge(offset: Long) {
  // We don't have to write the log start offset to log-start-offset-checkpoint immediately.
  // The purgeOffset may be lost only if all replicas of this broker are shutdown
Is this comment accurate?
It was not accurate. I have updated the comment to replace "all replicas of this broker" with "in-sync replicas of this broker".
val localPurgeResults = purgeOnLocalLog(offsetPerPartition, metadataCache)
debug("Purge on local log in %d ms".format(time.milliseconds - sTime))

val purgeStatus = localPurgeResults.map { case (topicPartition, result) =>
Nit: could be mapValues.
I validated that we cannot use mapValues() here. This is because mapValues() returns a map view which maps every key of this map to f(this(key)). The resulting map wraps the original map without copying any elements. As a result, status.acksPending = true in the constructor of DelayedPurge becomes a no-op.
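A small self-contained Scala example of the mapValues pitfall explained above; the Status class and map contents are made up. mapValues returns a lazy view that re-applies the function on every access, so side effects inside it do not stick.

object MapValuesViewDemo extends App {
  // Made-up mutable status, playing the role of the per-partition purge status.
  class Status { var acksPending = false }

  val results = Map("tp-0" -> 1L, "tp-1" -> 2L)

  // mapValues builds a view: the function runs again on every lookup,
  // so each access returns a brand-new Status instance.
  val viaMapValues = results.mapValues { _ =>
    val s = new Status
    s.acksPending = true
    s
  }
  assert(!(viaMapValues("tp-0") eq viaMapValues("tp-0")))

  // map materializes the values once, so the same instance (and any
  // mutation made to it) is preserved across lookups.
  val viaMap = results.map { case (tp, _) =>
    val s = new Status
    s.acksPending = true
    tp -> s
  }
  assert(viaMap("tp-0") eq viaMap("tp-0"))
}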
  tryCompleteDelayedRequests()
}

def checkLowWatermarkReachOffset(requiredOffset: Long): (Boolean, Errors, Long) = {
Methods that return a three-element tuple may be a little hard to follow; maybe we can create a case class. At the very least we should document each field of the return value.
Good point. I have updated the code to make it more readable.
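One possible shape for that refactor, purely illustrative: the type and field names below are assumptions, not necessarily the ones used in the final patch.

// Named result type replacing the (Boolean, Errors, Long) tuple, so call
// sites read result.lowWatermarkReached instead of result._1.
case class LowWatermarkCheckResult(
  lowWatermarkReached: Boolean,  // has the low watermark reached requiredOffset?
  error: Option[Throwable],      // failure encountered while checking, if any
  lowWatermark: Long             // current low watermark of the partition
)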
// Default throttle time
private static final int DEFAULT_THROTTLE_TIME = 0;
// Default low watermark
private static final long DEFAULT_LOG_START_OFFSET = 0L;
Do we want to distinguish between NO_LOG_START_OFFSET and LOG_START_OFFSET = 0? Would it be clearer to define NO_LOG_START_OFFSET as -1?
I don't think it is necessary to distinguish between NO_LOG_START_OFFSET and LOG_START_OFFSET = 0. Is there any use case for NO_LOG_START_OFFSET?
In general, we want to identify the state of the system as clearly as possible. The follower should not take any action if the LOG_START_OFFSET on the broker is NO_LOG_START_OFFSET. But if the follower sees the leader returning a starting offset of 0 while the actual starting offset on the leader is not 0, this introduces confusion.
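A tiny sketch of the sentinel convention being argued for; the constant name and helper below are illustrative, not the actual field definitions. Using -1 for "no log start offset" keeps a real offset of 0 unambiguous.

object DeleteRecordsSentinels {
  // -1 explicitly means "no log start offset reported"; 0 always means a
  // real offset, so the follower never confuses the two states.
  val InvalidLowWatermark: Long = -1L

  def describe(logStartOffset: Long): String =
    if (logStartOffset == InvalidLowWatermark) "no log start offset reported"
    else s"log start offset = $logStartOffset"
}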
@becketqin Thanks so much for taking the time to review the patch! Can you check whether the updated patch has addressed your comments?
It seems that right now, for a compacted topic, the base offset of the first segment is always 0. So, the patch is fine.
@lindong28 : Thanks for the patch. LGTM. @becketqin : Do you want to make another pass and then merge?
@lindong28 Thanks for the patch. LGTM except a very rare corner case.
new Field("timeout", INT32, "The maximum time to await a response in ms.")); | ||
|
||
public static final Schema DELETE_RECORDS_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."), | ||
new Field("low_watermark", INT64, "Smallest available offset"), |
Can we be clearer about this field? In the FetchResponse we have log_start_offset, which has almost the same description. Maybe here we can say "The smallest available offset across all live replicas."
Sure. Updated now.
public static final Schema DELETE_RECORDS_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."),
        new Field("low_watermark", INT64, "Smallest available offset"),
        new Field("error_code", INT16, "The error code for the given topic."));
for the given topic => for the given partition.
Good catch. Fixed now.
    val recoveryPoints = this.logsByDir.get(dir.toString)
    if (recoveryPoints.isDefined) {
      this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
    }
  }

  /**
   * Checkpoint log start offset for all logs in provided directory.
   */
  private def checkpointLogStartOffsetsInDir(dir: File): Unit = {
There seems to be a very rare case that may result in message loss. Assuming there is only one replica, consider the following sequence:
- The user deletes a topic; we do not delete the log start offset from the checkpoint file.
- The topic is created again with the same name, and the partitions happen to land on the same broker.
- The user produces some messages, and before the log start offset is checkpointed, the broker goes down.
- Now when the broker restarts, the old checkpointed log start offset may be applied to the newly created topic, which may cause messages that have been produced into the log to become unavailable to users.
This is a very rare corner case, though.
Good point. I fixed the problem by always doing checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile) when a partition is deleted. The overhead will probably be smaller than checkpointing the cleaner offset, which we already do every time we delete a partition.
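A hedged sketch of that fix, with simplified method names standing in for LogManager's: re-checkpoint the log-start-offset file for the affected log directory as soon as a partition's log is removed, so a stale entry cannot outlive the topic.

import java.io.File

trait CheckpointingLogManager {
  // Writes the log-start-offset checkpoint file for all logs in `dir`.
  def checkpointLogStartOffsetsInDir(dir: File): Unit

  // Called when a partition is deleted; `removedLogDir` is the deleted log's
  // directory. Checkpointing immediately (instead of waiting for the periodic
  // task) prevents a stale start offset from being applied to a recreated
  // topic with the same name after a broker restart.
  def onLogDeleted(removedLogDir: File): Unit =
    checkpointLogStartOffsetsInDir(removedLogDir.getParentFile)
}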
@junrao @becketqin All integration tests have passed except …
@lindong28 Thanks for updating the patch. Merged to trunk.