fix groupBy caching to work with renamed aggregators
Issue - when storing results in the cache, we store the event map, which
maps aggregator names to values. When someone fires the same query
after renaming the aggregators, the cache key stays the same, but the
cached event still maps metric values to the old names, which leads to
wrong results.
Fix - modify the cache to store not the raw event but only the actual
list of values.
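
To see why this corrupts results, here is a minimal, hypothetical sketch (the class and variable names are illustrative, not Druid code). For typical aggregators the cache key covers the input field and aggregator type but not the output name, so a query whose aggregators were merely renamed hits the stale entry, whose event map is still keyed by the old names:

import com.google.common.collect.ImmutableMap;

import java.util.Map;

// Hypothetical illustration of the bug -- not Druid's actual classes.
public class RenamedAggCacheBugSketch
{
  public static void main(String[] args)
  {
    // The old code cached the raw event map, keyed by the old output name:
    Map<String, Object> cachedEvent = ImmutableMap.<String, Object>of("imps", 5);

    // A query renamed to use "impressions" produces the same cache key,
    // reads this entry back, and then looks up the new name:
    System.out.println(cachedEvent.get("impressions")); // null -> wrong results
  }
}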

review comments + fix dimension renaming

review comment
nishantmonu51 committed Jul 9, 2015
1 parent 31d05e3 commit 184b12b
Showing 2 changed files with 148 additions and 17 deletions.
@@ -413,7 +413,10 @@ public CacheStrategy<Row, Object, GroupByQuery> getCacheStrategy(final GroupByQuery query)
   {
     return new CacheStrategy<Row, Object, GroupByQuery>()
     {
+      private static final byte CACHE_STRATEGY_VERSION = 0x1;
+      private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
+      private final List<DimensionSpec> dims = query.getDimensions();


       @Override
       public byte[] computeCacheKey(GroupByQuery query)
@@ -435,7 +438,7 @@ public byte[] computeCacheKey(GroupByQuery query)

         ByteBuffer buffer = ByteBuffer
             .allocate(
-                1
+                2
                 + granularityBytes.length
                 + filterBytes.length
                 + aggregatorBytes.length
@@ -444,6 +447,7 @@ public byte[] computeCacheKey(GroupByQuery query)
                 + limitBytes.length
             )
             .put(GROUPBY_QUERY)
+            .put(CACHE_STRATEGY_VERSION)
             .put(granularityBytes)
             .put(filterBytes)
             .put(aggregatorBytes);
@@ -474,10 +478,15 @@ public Object apply(Row input)
           {
             if (input instanceof MapBasedRow) {
               final MapBasedRow row = (MapBasedRow) input;
-              final List<Object> retVal = Lists.newArrayListWithCapacity(2);
+              final List<Object> retVal = Lists.newArrayListWithCapacity(1 + dims.size() + aggs.size());
               retVal.add(row.getTimestamp().getMillis());
-              retVal.add(row.getEvent());
-
+              Map<String, Object> event = row.getEvent();
+              for (DimensionSpec dim : dims) {
+                retVal.add(event.get(dim.getOutputName()));
+              }
+              for (AggregatorFactory agg : aggs) {
+                retVal.add(event.get(agg.getName()));
+              }
               return retVal;
             }

@@ -500,21 +509,26 @@ public Row apply(Object input)

               DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue());

-              Iterator<AggregatorFactory> aggsIter = aggs.iterator();
-
-              Map<String, Object> event = jsonMapper.convertValue(
-                  results.next(),
-                  new TypeReference<Map<String, Object>>()
-                  {
-                  }
-              );
+              Map<String, Object> event = Maps.newLinkedHashMap();
+              Iterator<DimensionSpec> dimsIter = dims.iterator();
+              while (dimsIter.hasNext() && results.hasNext()) {
+                final DimensionSpec factory = dimsIter.next();
+                event.put(factory.getOutputName(), results.next());
+              }

-              while (aggsIter.hasNext()) {
+              Iterator<AggregatorFactory> aggsIter = aggs.iterator();
+              while (aggsIter.hasNext() && results.hasNext()) {
                 final AggregatorFactory factory = aggsIter.next();
-                Object agg = event.get(factory.getName());
-                if (agg != null) {
-                  event.put(factory.getName(), factory.deserialize(agg));
-                }
+                event.put(factory.getName(), factory.deserialize(results.next()));
               }

+              if (dimsIter.hasNext() || aggsIter.hasNext() || results.hasNext()) {
+                throw new ISE(
+                    "Found left over objects while reading from cache!! dimsIter[%s] aggsIter[%s] results[%s]",
+                    dimsIter.hasNext(),
+                    aggsIter.hasNext(),
+                    results.hasNext()
+                );
+              }
+
               return new MapBasedRow(
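
For intuition, here is a stripped-down sketch of the new round-trip, with plain String names standing in for Druid's DimensionSpec and AggregatorFactory (the class and helper names are illustrative, not part of the codebase). Values are written and read positionally in the order the query declares them, so the rebuilt event always uses whatever output names the currently running query carries:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stripped-down sketch of the positional cache round-trip -- not Druid's code.
public class PositionalCacheSketch
{
  // Analogue of prepareForCache: flatten [timestamp, dims..., aggs...] in query order.
  static List<Object> toCacheValue(long timestampMillis, Map<String, Object> event,
                                   List<String> dimNames, List<String> aggNames)
  {
    final List<Object> out = new ArrayList<>(1 + dimNames.size() + aggNames.size());
    out.add(timestampMillis);
    for (String dim : dimNames) {
      out.add(event.get(dim));
    }
    for (String agg : aggNames) {
      out.add(event.get(agg));
    }
    return out;
  }

  // Analogue of pullFromCache: rebuild the event positionally using the
  // *current* query's output names, so renamed queries get correct values.
  static Map<String, Object> fromCacheValue(List<Object> cached,
                                            List<String> dimNames, List<String> aggNames)
  {
    final Iterator<Object> results = cached.iterator();
    results.next(); // timestamp; the real code turns this back into a DateTime
    final Map<String, Object> event = new LinkedHashMap<>();
    for (String dim : dimNames) {
      event.put(dim, results.next());
    }
    for (String agg : aggNames) {
      event.put(agg, results.next());
    }
    if (results.hasNext()) {
      throw new IllegalStateException("Found left over objects while reading from cache!");
    }
    return event;
  }

  public static void main(String[] args)
  {
    final Map<String, Object> row = new LinkedHashMap<>();
    row.put("output", "a");
    row.put("imps", 1);
    List<Object> cached = toCacheValue(0L, row, Arrays.asList("output"), Arrays.asList("imps"));

    // Reading back with renamed outputs binds the same values to the new names:
    System.out.println(fromCacheValue(cached, Arrays.asList("output2"), Arrays.asList("impressions")));
    // -> {output2=a, impressions=1}
  }
}

Note the design choice in computeCacheKey as well: growing the key prefix from 1 to 2 bytes and stamping CACHE_STRATEGY_VERSION into it changes every groupBy cache key, which effectively invalidates entries written in the old map-based layout rather than trying to read them with the new positional decoder.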
server/src/test/java/io/druid/client/CachingClusteredClientTest.java (117 additions, 0 deletions)
@@ -2363,4 +2363,121 @@ public void testTimeBoundaryCachingWhenTimeIsInteger() throws Exception
         makeTimeBoundaryResult(new DateTime("1970-01-05T01"), new DateTime("1970-01-05T01"), null)
     );
   }
+
+  @Test
+  public void testGroupByCachingRenamedAggs() throws Exception
+  {
+    GroupByQuery.Builder builder = new GroupByQuery.Builder()
+        .setDataSource(DATA_SOURCE)
+        .setQuerySegmentSpec(SEG_SPEC)
+        .setDimFilter(DIM_FILTER)
+        .setGranularity(GRANULARITY)
+        .setDimensions(Arrays.<DimensionSpec>asList(new DefaultDimensionSpec("a", "output")))
+        .setAggregatorSpecs(AGGS)
+        .setContext(CONTEXT);
+
+    testQueryCaching(
+        client,
+        builder.build(),
+        new Interval("2011-01-01/2011-01-02"),
+        makeGroupByResults(new DateTime("2011-01-01"), ImmutableMap.of("output", "a", "rows", 1, "imps", 1, "impers", 1)),
+
+        new Interval("2011-01-02/2011-01-03"),
+        makeGroupByResults(new DateTime("2011-01-02"), ImmutableMap.of("output", "b", "rows", 2, "imps", 2, "impers", 2)),
+
+        new Interval("2011-01-05/2011-01-10"),
+        makeGroupByResults(
+            new DateTime("2011-01-05"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
+            new DateTime("2011-01-06"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
+            new DateTime("2011-01-07"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5),
+            new DateTime("2011-01-08"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6),
+            new DateTime("2011-01-09"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7)
+        ),
+
+        new Interval("2011-01-05/2011-01-10"),
+        makeGroupByResults(
+            new DateTime("2011-01-05T01"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
+            new DateTime("2011-01-06T01"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
+            new DateTime("2011-01-07T01"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5),
+            new DateTime("2011-01-08T01"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6),
+            new DateTime("2011-01-09T01"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7)
+        )
+    );
+
+    Supplier<GroupByQueryConfig> configSupplier = new Supplier<GroupByQueryConfig>()
+    {
+      @Override
+      public GroupByQueryConfig get()
+      {
+        return new GroupByQueryConfig();
+      }
+    };
+    QueryRunner runner = new FinalizeResultsQueryRunner(
+        client,
+        new GroupByQueryQueryToolChest(
+            configSupplier,
+            jsonMapper,
+            new GroupByQueryEngine(
+                configSupplier,
+                new StupidPool<>(
+                    new Supplier<ByteBuffer>()
+                    {
+                      @Override
+                      public ByteBuffer get()
+                      {
+                        return ByteBuffer.allocate(1024 * 1024);
+                      }
+                    }
+                )
+            ),
+            TestQueryRunners.pool,
+            QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+        )
+    );
+    HashMap<String, Object> context = new HashMap<String, Object>();
+    TestHelper.assertExpectedObjects(
+        makeGroupByResults(
+            new DateTime("2011-01-05T"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
+            new DateTime("2011-01-05T01"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
+            new DateTime("2011-01-06T"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
+            new DateTime("2011-01-06T01"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
+            new DateTime("2011-01-07T"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5),
+            new DateTime("2011-01-07T01"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5),
+            new DateTime("2011-01-08T"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6),
+            new DateTime("2011-01-08T01"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6),
+            new DateTime("2011-01-09T"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7),
+            new DateTime("2011-01-09T01"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7)
+        ),
+        runner.run(
+            builder.setInterval("2011-01-05/2011-01-10")
+                   .build(),
+            context
+        ),
+        ""
+    );
+
+    TestHelper.assertExpectedObjects(
+        makeGroupByResults(
+            new DateTime("2011-01-05T"), ImmutableMap.of("output2", "c", "rows2", 3, "imps", 3, "impers2", 3),
+            new DateTime("2011-01-05T01"), ImmutableMap.of("output2", "c", "rows2", 3, "imps", 3, "impers2", 3),
+            new DateTime("2011-01-06T"), ImmutableMap.of("output2", "d", "rows2", 4, "imps", 4, "impers2", 4),
+            new DateTime("2011-01-06T01"), ImmutableMap.of("output2", "d", "rows2", 4, "imps", 4, "impers2", 4),
+            new DateTime("2011-01-07T"), ImmutableMap.of("output2", "e", "rows2", 5, "imps", 5, "impers2", 5),
+            new DateTime("2011-01-07T01"), ImmutableMap.of("output2", "e", "rows2", 5, "imps", 5, "impers2", 5),
+            new DateTime("2011-01-08T"), ImmutableMap.of("output2", "f", "rows2", 6, "imps", 6, "impers2", 6),
+            new DateTime("2011-01-08T01"), ImmutableMap.of("output2", "f", "rows2", 6, "imps", 6, "impers2", 6),
+            new DateTime("2011-01-09T"), ImmutableMap.of("output2", "g", "rows2", 7, "imps", 7, "impers2", 7),
+            new DateTime("2011-01-09T01"), ImmutableMap.of("output2", "g", "rows2", 7, "imps", 7, "impers2", 7)
+        ),
+        runner.run(
+            builder.setInterval("2011-01-05/2011-01-10")
+                   .setDimensions(Arrays.<DimensionSpec>asList(new DefaultDimensionSpec("a", "output2")))
+                   .setAggregatorSpecs(RENAMED_AGGS)
+                   .build(),
+            context
+        ),
+        "renamed aggregators test"
+    );
+  }
+
 }
