-
Notifications
You must be signed in to change notification settings - Fork 396
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
flush the last partial file when incoming stream is paused, and not g… #100
Conversation
…enerating first small files. confluentinc#79
Can one of the admins verify this patch? |
It looks like @skyahead hasn't signed our Contributor License Agreement, yet. Appreciation of efforts, clabot |
[clabot:check] |
It looks like @skyahead hasn't signed our Contributor License Agreement, yet. Appreciation of efforts, clabot |
[clabot:check] |
@confluentinc It looks like @skyahead just signed our Contributor License Agreement. 👍 Always at your service, clabot |
this.isFirst = isFirst; | ||
} | ||
|
||
public void setFlushPartial(boolean flushPartial) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that flushPartial
is only set to true
in practice and that the setFlushPartial
call only exists to support the tests. Why have the flushPartial
flag at all if it only takes on one value? It looks like the same is also true of isFirst
.
Normally we'd just test the code as a blackbox, feeding it some input and validating that the expected output was generated. Is there a reason that wouldn't work here? It seems like you just want partial data to be written upon flush, which seems like it should be testable without manually exposing some internal flags.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ewencp Sorry for late replying and thanks for the review.
I removed the flushPartial flag and the non-blackbox test case.
Regarding isFirst, it is kept with the following reason.
In the shouldRotate method, there is a periodicRotation check on the time difference between 'now' and 'lastRotate' like so: now - lastRotate >= rotateIntervalMs. The 'lastRotate' is always zero because it is first used in shouldRorate() and will only be updated later on in updateRotationTimers().
Without the isFirst flag, the very first batch always contains only one record.
ok to test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple of nits, but the mechanics all look good to me. Also, will we want to adjust the target on this to the 3.1.x
branch so it'll make it into that release?
@@ -398,10 +417,20 @@ private void setState(State state) { | |||
this.state = state; | |||
} | |||
|
|||
private void setIsFirst(boolean isFirst) { | |||
this.isFirst = isFirst; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really critical, but this method doesn't seem necessary since it's just used internally to set a single variable.
boolean scheduledRotation = rotateScheduleIntervalMs > 0 && now >= nextScheduledRotate; | ||
boolean messageSizeRotation = recordCounter >= flushSize; | ||
|
||
if (isFirst) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like we could also avoid the whole isFirst
business if we just initialize lastRotate
to be the current time in the constructor.
@ewencp Thanks very much for your help and terrible sorry for my slowness. Got quite snowed under by work (on other hdfs connector issues :-)). A reason that I was using the 'isFirst' flag is that sometime shouldRotate() can be true even if no records appeared. For example, if the recover() method (that talks to HDFS) takes longer than rotateIntervalMs time, then shouldRotate() will be true and am empty file will be generated. The unit test can not verify this situation but I saw this happening in our test cluster. Will try to confirm as soon as I can. |
@skyahead Haha, no problem, I understand the feeling :) Re: empty files, I think we should probably just handle empty files an additional case in |
@ewencp Tracking the amount of actual data written is a great idea! |
…enerating first small files.
#79