average aggregator in both ingestion phase and query phase #3859
@kaijianding, a couple questions:
benefits are:
In our scenario, we want to average at rollup time (averaging only rows whose timeAndDims are the same), and then sum/max/min the averaged column at query time as if it were a float-type GenericColumn. In this case, sum1/count1 + sum2/count2 + sum3/count3 != (sum1+sum2+sum3)/(count1+count2+count3). We are using this in pre-production for now and will move it to production when the whole product is finished.
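To make the inequality above concrete, here is a minimal sketch with made-up numbers. The class and method names are hypothetical and are not part of the PR; the sketch only contrasts summing pre-averaged rows with the conventional sum / count average.

```java
// Hypothetical illustration; the names and numbers are assumptions, not code from the PR.
final class AvgRollupExample {

    /** Sum of per-row averages: what a sum over a pre-averaged column would yield. */
    static double sumOfAverages(double[] sums, long[] counts) {
        double total = 0;
        for (int i = 0; i < sums.length; i++) {
            total += sums[i] / counts[i];
        }
        return total;
    }

    /** Conventional average: total sum divided by total count. */
    static double overallAverage(double[] sums, long[] counts) {
        double sum = 0;
        long count = 0;
        for (int i = 0; i < sums.length; i++) {
            sum += sums[i];
            count += counts[i];
        }
        return sum / count;
    }

    public static void main(String[] args) {
        double[] sums = {10.0, 30.0, 50.0};
        long[] counts = {2, 3, 5};
        System.out.println(sumOfAverages(sums, counts));  // 5.0 + 10.0 + 10.0 = 25.0
        System.out.println(overallAverage(sums, counts)); // 90.0 / 10 = 9.0
    }
}
```

The two results answer different questions, which is why the averaged column cannot be rebuilt from separate sum and count metrics at query time.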
Thanks for the explanation, @kaijianding.
@kaijianding I think this might confuse Druid users, for the reason you highlighted.
Maybe it would be better to have it as a module, plus more documentation about how it is different from the conventional
Reviewed this partially, had a few comments on specific parts of the code, will continue review later
I also agree with @b-slim, I think it would be good to see a bit of documentation on what use cases this enables beyond the existing (sum / count) method for calculating averages
  return Aggregators.noopBufferAggregator();
}

if ("float".equalsIgnoreCase(inputType)) {
Suggest making inputType an enum with a fromString() method for deserialization (see ValueType for an example); this block can then be a switch.
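A rough sketch of what that suggestion could look like follows. Only "float" appears in the diff, so the other constants, the enum name, and the describe() helper are assumptions made for illustration. In real code the factory method would likely also carry Jackson's @JsonCreator annotation, which is left out here to keep the sketch dependency-free.

```java
import java.util.Locale;

// Hypothetical enum; only "float" is visible in the PR, the other constants are assumed.
enum InputType {
    FLOAT, LONG, DOUBLE;

    /** Case-insensitive lookup, intended for use during deserialization. */
    static InputType fromString(String name) {
        if (name == null) {
            throw new IllegalArgumentException("inputType must not be null");
        }
        return valueOf(name.toUpperCase(Locale.ROOT));
    }
}

final class InputTypeExample {
    /** Stand-in for the if/else chain on the raw string: dispatch with a switch instead. */
    static String describe(String rawInputType) {
        switch (InputType.fromString(rawInputType)) {
            case FLOAT:
                return "float selector";
            case LONG:
                return "long selector";
            case DOUBLE:
                return "double selector";
            default:
                throw new IllegalStateException("unhandled input type");
        }
    }
}
```

With an enum, an unknown value fails once at parse time instead of silently falling through a chain of string comparisons.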
  return GroupByQueryRunnerTest.constructorFeeder();
}

public AvgGroupByQueryTest(
Suggest moving these and other "avg agg with query" tests to the main query test suites instead of in separate files, I think it'd be more convenient/easier to validate code changes if the tests for a query type are all together (otherwise a dev would have to remember to find/run all the separate 'does feature X work with query type Y' test suites)
  return holder1;
}
if (holder1.count == 0) {
  holder1.count = holder2.count;
Could this just return holder2 instead of copying its values into holder1?
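One possible shape for that change, as a sketch only: the AvgHolder class below is a hypothetical stand-in for the PR's holder type, and the null checks are an added assumption.

```java
// Hypothetical stand-in for the holder in the PR; field names follow the diff excerpt.
final class AvgHolder {
    long count;  // number of elements
    double sum;  // running sum of the elements

    AvgHolder(long count, double sum) {
        this.count = count;
        this.sum = sum;
    }

    /** Merges two partial results, returning an existing holder when the other is empty. */
    static AvgHolder combine(AvgHolder holder1, AvgHolder holder2) {
        if (holder2 == null || holder2.count == 0) {
            return holder1;
        }
        if (holder1 == null || holder1.count == 0) {
            return holder2; // no need to copy count and sum into holder1
        }
        holder1.count += holder2.count;
        holder1.sum += holder2.sum;
        return holder1;
    }
}
```

Returning holder2 directly is only safe if the caller does not reuse or mutate holder2 afterwards, since the result then aliases it.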
Had some more comments, will do another pass of review
@Override
public byte[] getCacheKey()
{
  byte[] fieldNameBytes = com.metamx.common.StringUtils.toUtf8(fieldName);
Should remove the com.metamx.common. prefix and use the StringUtils within Druid.
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.metamx.common.IAE;
This should use io.druid.java.util.common.IAE instead.
public double compute()
{
  if (count == 0) {
    throw new IllegalStateException("should not be empty holder");
I think this should return NaN or 0 instead of throwing the exception. It's possible for an aggregator to not receive any values (suppose the AvgAggregator is wrapped in a FilteredAggregator that doesn't match any rows in a particular segment).
Can you also add a test for this case?
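A minimal sketch of the empty-holder behavior requested above, assuming NaN is chosen rather than 0. EmptySafeAvg is a hypothetical class written for illustration, not the PR's collector.

```java
// Hypothetical accumulator; shows compute() tolerating an aggregator that saw no rows.
final class EmptySafeAvg {
    private long count;
    private double sum;

    void add(double value) {
        count++;
        sum += value;
    }

    /** Returns NaN for an empty accumulator instead of throwing. */
    double compute() {
        if (count == 0) {
            return Double.NaN;
        }
        return sum / count;
    }

    public static void main(String[] args) {
        EmptySafeAvg avg = new EmptySafeAvg();
        System.out.println(Double.isNaN(avg.compute())); // true: no values received yet
        avg.add(2.0);
        avg.add(4.0);
        System.out.println(avg.compute()); // 3.0
    }
}
```

A test for the filtered case could assert exactly the first condition: an accumulator that never received a value computes to NaN rather than raising an exception.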
  return Longs.BYTES + Doubles.BYTES;
}

long count; // number of elements
let's have these use getters/setters
{
  long count = buf.getLong(position + COUNT_OFFSET);
  double sum = buf.getDouble(position + SUM_OFFSET);
  return (float) sum / count;
this should check for count == 0
{
  long count = buf.getLong(position + COUNT_OFFSET);
  double sum = buf.getDouble(position + SUM_OFFSET);
  return (long) sum / count;
this should check for count == 0
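The two buffer reads above could guard against an empty slot roughly as follows. This is a sketch under assumptions: the offset values, the class name, and the choice of NaN and 0 as empty results are not taken from the PR. Without the guard, the long variant would throw an ArithmeticException on division by zero, while the float variant would return NaN or an infinity.

```java
import java.nio.ByteBuffer;

// Hypothetical reader; the buffer layout (count first, then sum) is an assumption.
final class BufferedAvgReader {
    static final int COUNT_OFFSET = 0;
    static final int SUM_OFFSET = Long.BYTES;

    static float getFloat(ByteBuffer buf, int position) {
        long count = buf.getLong(position + COUNT_OFFSET);
        if (count == 0) {
            return Float.NaN; // nothing aggregated into this slot
        }
        double sum = buf.getDouble(position + SUM_OFFSET);
        return (float) (sum / count); // divide in double precision, then narrow
    }

    static long getLong(ByteBuffer buf, int position) {
        long count = buf.getLong(position + COUNT_OFFSET);
        if (count == 0) {
            return 0L; // a long has no NaN, so 0 is used as the empty value here
        }
        double sum = buf.getDouble(position + SUM_OFFSET);
        return (long) (sum / count);
    }
}
```

The casts are parenthesized around the whole division so the intent is explicit; in the original excerpts the cast binds to sum alone.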
@kaijianding Thanks for the PR, these are all the comments I have for now
@Override
public int compare(AvgAggregatorCollector o1, AvgAggregatorCollector o2)
{
  int compare = Longs.compare(o1.count, o2.count);
This comparator should compare the computed averages from o1 and o2 instead of comparing the count/sum separately; otherwise anything sorting by an AvgAggregator could get results in an incorrect order, e.g.:
AvgAgg(count=5, sum=100) should be less than AvgAgg(count=1, sum=500000)
Can you add a test for this behavior where an AvgAggregator determines the sorting order?
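A sketch of a comparator that orders by the computed average, using the example values from the comment above. AvgCollectorSketch is a hypothetical simplification of the PR's AvgAggregatorCollector, and returning NaN for an empty collector is an assumption.

```java
import java.util.Comparator;

// Hypothetical simplification of the collector; only count and sum are modeled.
final class AvgCollectorSketch {
    final long count;
    final double sum;

    AvgCollectorSketch(long count, double sum) {
        this.count = count;
        this.sum = sum;
    }

    double compute() {
        return count == 0 ? Double.NaN : sum / count;
    }

    /** Orders collectors by their average; Double.compare places NaN after all other values. */
    static final Comparator<AvgCollectorSketch> BY_AVERAGE =
        Comparator.comparingDouble(AvgCollectorSketch::compute);

    public static void main(String[] args) {
        AvgCollectorSketch small = new AvgCollectorSketch(5, 100);    // average 20.0
        AvgCollectorSketch large = new AvgCollectorSketch(1, 500000); // average 500000.0
        System.out.println(BY_AVERAGE.compare(small, large) < 0);     // true
    }
}
```

Comparing counts first would rank the count=5 collector above the count=1 collector, which is the incorrect ordering the comment describes.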
@kaijianding, is there any particular reason you didn't follow up on this?
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@druid.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
This PR is inspired by #2525.
The avg aggregator can be used in both the ingestion phase and the query phase.
The pre-averaged column can then be used again in doubleSum/... etc. This is useful if a user only wants to average at rollup, but treat the result as a normal double metric at query time.