Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRILL-6516: EMIT support in streaming agg #1358

Closed
wants to merge 3 commits into from

Conversation

Projects
None yet
4 participants
@parthchandra
Copy link
Contributor

commented Jul 2, 2018

Support for EMIT in the streaming aggregator.
Also includes a fix from @sohami in the external sort memory management (since streaming agg requires sort to hold on to memory until atreaming agg is done).

@Ben-Zvi, @sohami please review

@Ben-Zvi
Copy link
Contributor

left a comment

Looks good; comments are mainly suggestions for code cleanup.

@@ -308,8 +433,7 @@ public void addComplexWriter(final BaseWriter.ComplexWriter writer) {
private StreamingAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{
ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions());
cg.getCodeGenerator().plainJavaCapable(true);
// Uncomment out this line to debug the generated code.
//cg.getCodeGenerator().saveCodeForDebugging(true);
// cg.getCodeGenerator().saveCodeForDebugging(true);

This comment has been minimized.

Copy link
@Ben-Zvi

Ben-Zvi Jul 7, 2018

Contributor

Please restore the // Uncomment out this line .......
comment line

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Done

if (outcomeToReturn == EMIT) {
firstBatchForDataSet = true;
}
} else if (lastKnownOutcome == EMIT) {

This comment has been minimized.

Copy link
@Ben-Zvi

Ben-Zvi Jul 7, 2018

Contributor

(A minor comment) This "else if" can be moved up (to become the first "else if"), then all is needed is one "else" with outcomeToReturn = recordCount == 0 ? NONE : OK;

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Done

constructSpecialBatch();
// set state to indicate the fact that we have sent a special batch and input is empty
specialBatchSent = true;
firstBatchForDataSet = true; // reset on the next iteration

This comment has been minimized.

Copy link
@Ben-Zvi

Ben-Zvi Jul 7, 2018

Contributor

firstBatchForDataSet is already true (a condition to entering this if() )

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Done

return IterOutcome.STOP;
}
break;
case EMIT:

This comment has been minimized.

Copy link
@Ben-Zvi

Ben-Zvi Jul 7, 2018

Contributor

This EMIT handling code is about identical to the NONE handling. How about merging the two cases (only in the case of NONE, return NONE at the end, and for EMIT just fall thru).

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Done and then undone. The if condition for EMIT is different from the case for NONE and so this was a bug. The unit test you suggested caught it! Wonderful!

} else {
if ( lastKnownOutcome != NONE && firstBatchForDataSet && !aggregator.isDone()) {
lastKnownOutcome = incoming.next();
if (!first && firstBatchForDataSet) {

This comment has been minimized.

Copy link
@Ben-Zvi

Ben-Zvi Jul 7, 2018

Contributor

Isn't firstBatchForDataSet guaranteed true here (due to the enclosing if() ) ?

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Done

if (outputToBatchPrev(previous, previousIndex, outputCount)) {
if (EXTRA_DEBUG) {
logger.debug("Output container is full. flushing it.");
}

This comment has been minimized.

Copy link
@Ben-Zvi

Ben-Zvi Jul 7, 2018

Contributor

Should previousIndex be set to -1 here ? (instead of inside setOkAndReturn(), where it seems "out of place" )

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Yes it does appear out of place but it isn't really. It actually is tied to the fact that we must reset several state variables (this being one of them) every time EMIT is processed.
Also, while debugging I found places where I had missed resetting the previousIndex, so this is a safe place to do so.


testBuilder()
.optionSettingQueriesForTestQuery("alter session set `%s` = true",
PlannerSettings.STREAMAGG.getOptionName())

This comment has been minimized.

Copy link
@Ben-Zvi

Ben-Zvi Jul 7, 2018

Contributor

Should "planner.enable_hashagg" be set to false ? "planner.enable_streamagg" defaults to true anyway.

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Had to do this because the HashAgg tests were disabling the streaming agg. Removed this and reset the option in the HashAgg tests.


testBuilder()
.optionSettingQueriesForTestQuery("alter session set `%s` = true",
PlannerSettings.STREAMAGG.getOptionName())

This comment has been minimized.

Copy link
@Ben-Zvi

Ben-Zvi Jul 7, 2018

Contributor

Same comment - disable hash-agg.

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Ditto

.baselineValues("dd",111L)
.baselineValues("dd",222L)
.build().run();
}
}

This comment has been minimized.

Copy link
@Ben-Zvi

Ben-Zvi Jul 7, 2018

Contributor

Suggestion: add a test with COUNT(...) ; this is a classic streaming-agg usage (without GROUP BY ....)

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Excellent suggestion. Done

This comment has been minimized.

Copy link
@Ben-Zvi

Ben-Zvi Jul 10, 2018

Contributor

That's MAX, not COUNT; though the difference should only be in the generated code for the aggregation function.

* @param outcome
* @return outcome
*/
private final AggOutcome setOkAndReturn( IterOutcome outcome) {

This comment has been minimized.

Copy link
@Ben-Zvi

Ben-Zvi Jul 7, 2018

Contributor

A suggestion: Split off another method - setOkAndReturnForEmit() - to be used when (outcome == EMIT). This would yield two simpler methods (fewer condition checks). As in all places the current method is used, the value of "outcome" is known (EMIT or anything-else), then just call the appropriate one.

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Done.

@parthchandra
Copy link
Contributor Author

left a comment

Thanks Boaz. I've added a new commit with the changes. Will squash and merge myself once I get the approval.

return IterOutcome.STOP;
}
break;
case EMIT:

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Done and then undone. The if condition for EMIT is different from the case for NONE and so this was a bug. The unit test you suggested caught it! Wonderful!

constructSpecialBatch();
// set state to indicate the fact that we have sent a special batch and input is empty
specialBatchSent = true;
firstBatchForDataSet = true; // reset on the next iteration

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Done

} else {
if ( lastKnownOutcome != NONE && firstBatchForDataSet && !aggregator.isDone()) {
lastKnownOutcome = incoming.next();
if (!first && firstBatchForDataSet) {

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Done

// set state to indicate the fact that we have sent a special batch and input is empty
specialBatchSent = true;
return IterOutcome.OK;
firstBatchForDataSet = true; // reset on the next iteration

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Done

if (!createAggregator()) {
done = true;
firstBatchForDataSet = true;
if(first) {

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Leaving this as this was the style used in the original implementation

case OK:
resetIndex();
if (incoming.getRecordCount() == 0) {
continue;

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Nope. This is the same as the original code (line 234 in the original)

* @param outcome
* @return outcome
*/
private final AggOutcome setOkAndReturn( IterOutcome outcome) {

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Done.


testBuilder()
.optionSettingQueriesForTestQuery("alter session set `%s` = true",
PlannerSettings.STREAMAGG.getOptionName())

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Had to do this because the HashAgg tests were disabling the streaming agg. Removed this and reset the option in the HashAgg tests.


testBuilder()
.optionSettingQueriesForTestQuery("alter session set `%s` = true",
PlannerSettings.STREAMAGG.getOptionName())

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Ditto

.baselineValues("dd",111L)
.baselineValues("dd",222L)
.build().run();
}
}

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 10, 2018

Author Contributor

Excellent suggestion. Done

@priteshm

This comment has been minimized.

Copy link

commented Jul 10, 2018

@Ben-Zvi any more comments on this one?

@Ben-Zvi
Copy link
Contributor

left a comment

+1, nice work !!

.baselineValues("dd",111L)
.baselineValues("dd",222L)
.build().run();
}
}

This comment has been minimized.

Copy link
@Ben-Zvi

Ben-Zvi Jul 10, 2018

Contributor

That's MAX, not COUNT; though the difference should only be in the generated code for the aggregation function.

parthchandra added a commit to parthchandra/drill that referenced this pull request Jul 11, 2018

@parthchandra parthchandra deleted the parthchandra:DRILL-6516 branch Aug 3, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.