diff --git a/processing/src/main/java/org/apache/druid/query/Result.java b/processing/src/main/java/org/apache/druid/query/Result.java
index c1ec3a3a5ba9..9ec75ad40a02 100644
--- a/processing/src/main/java/org/apache/druid/query/Result.java
+++ b/processing/src/main/java/org/apache/druid/query/Result.java
@@ -24,6 +24,9 @@
 import org.apache.druid.guice.annotations.PublicApi;
 import org.joda.time.DateTime;
 
+import javax.annotation.Nullable;
+import java.util.Comparator;
+import java.util.Objects;
 import java.util.function.Function;
 
 /**
@@ -33,11 +36,12 @@ public class Result<T> implements Comparable<Result<T>>
 {
   public static String MISSING_SEGMENTS_KEY = "missingSegments";
 
+  @Nullable
   private final DateTime timestamp;
   private final T value;
 
   @JsonCreator
-  public Result(@JsonProperty("timestamp") DateTime timestamp, @JsonProperty("result") T value)
+  public Result(@JsonProperty("timestamp") @Nullable DateTime timestamp, @JsonProperty("result") T value)
   {
     this.timestamp = timestamp;
     this.value = value;
@@ -51,10 +55,12 @@ public <T2> Result<T2> map(Function<? super T, ? extends T2> mapper)
   @Override
   public int compareTo(Result<T> tResult)
   {
-    return timestamp.compareTo(tResult.timestamp);
+    // timestamp is null for grandTotal which should come last.
+    return Comparator.nullsLast(DateTime::compareTo).compare(this.timestamp, tResult.timestamp);
   }
 
   @JsonProperty
+  @Nullable
   public DateTime getTimestamp()
   {
     return timestamp;
@@ -78,22 +84,22 @@ public boolean equals(Object o)
 
     Result result = (Result) o;
 
-    if (timestamp != null ? !(timestamp.isEqual(result.timestamp) && timestamp.getZone().getOffset(timestamp) == result.timestamp.getZone().getOffset(result.timestamp)) : result.timestamp != null) {
-      return false;
-    }
-    if (value != null ? !value.equals(result.value) : result.value != null) {
+    if (timestamp != null && result.timestamp != null) {
+      if (!timestamp.isEqual(result.timestamp)
+          && timestamp.getZone().getOffset(timestamp) == result.timestamp.getZone().getOffset(result.timestamp)) {
+        return false;
+      }
+    } else if (timestamp == null ^ result.timestamp == null) {
       return false;
     }
 
-    return true;
+    return Objects.equals(value, result.value);
   }
 
   @Override
   public int hashCode()
   {
-    int result = timestamp != null ? timestamp.hashCode() : 0;
-    result = 31 * result + (value != null ? value.hashCode() : 0);
-    return result;
+    return Objects.hash(timestamp, value);
   }
 
   @Override
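For context, and not part of the patch: a minimal standalone sketch of the ordering guarantee the new compareTo relies on. Comparator.nullsLast sorts non-null Joda DateTimes chronologically and places the null (grandTotal) entry at the end; the class and variable names below are illustrative only.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

public class NullsLastDemo
{
  public static void main(String[] args)
  {
    // A null timestamp stands in for the grandTotal row.
    List<DateTime> timestamps = new ArrayList<>();
    timestamps.add(null);
    timestamps.add(new DateTime(2018, 1, 2, 0, 0, DateTimeZone.UTC));
    timestamps.add(new DateTime(2018, 1, 1, 0, 0, DateTimeZone.UTC));

    // Same comparator shape as the patched Result#compareTo:
    // non-null timestamps sort chronologically, null sorts last.
    timestamps.sort(Comparator.nullsLast(DateTime::compareTo));

    System.out.println(timestamps); // [2018-01-01..., 2018-01-02..., null]
  }
}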
diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index d625c318ce2a..0ae9a707c19b 100644
--- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -52,7 +53,6 @@
 import org.apache.druid.query.groupby.RowBasedColumnSelectorFactory;
 import org.joda.time.DateTime;
 
-import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -303,7 +303,12 @@ public Function<Result<TimeseriesResultValue>, Object> prepareForCache(boolean isResultLevelCache)
         TimeseriesResultValue results = input.getValue();
         final List<Object> retVal = Lists.newArrayListWithCapacity(1 + aggs.size());
 
-        retVal.add(input.getTimestamp().getMillis());
+        // Timestamp can be null if grandTotal is true.
+        if (isResultLevelCache) {
+          retVal.add(input.getTimestamp() == null ? null : input.getTimestamp().getMillis());
+        } else {
+          retVal.add(Preconditions.checkNotNull(input.getTimestamp(), "timestamp of input[%s]", input).getMillis());
+        }
         for (AggregatorFactory agg : aggs) {
           retVal.add(results.getMetric(agg.getName()));
         }
@@ -324,7 +329,7 @@ public Function<Object, Result<TimeseriesResultValue>> pullFromCache(boolean isResultLevelCache)
       private final Granularity granularity = query.getGranularity();
 
       @Override
-      public Result<TimeseriesResultValue> apply(@Nullable Object input)
+      public Result<TimeseriesResultValue> apply(Object input)
       {
         List<Object> results = (List<Object>) input;
         final Map<String, Object> retVal = Maps.newLinkedHashMap();
@@ -332,7 +337,13 @@ public Result<TimeseriesResultValue> apply(@Nullable Object input)
         Iterator<AggregatorFactory> aggsIter = aggs.iterator();
         Iterator<Object> resultIter = results.iterator();
 
-        DateTime timestamp = granularity.toDateTime(((Number) resultIter.next()).longValue());
+        final Number timestampNumber = (Number) resultIter.next();
+        final DateTime timestamp;
+        if (isResultLevelCache) {
+          timestamp = timestampNumber == null ? null : granularity.toDateTime(timestampNumber.longValue());
+        } else {
+          timestamp = granularity.toDateTime(Preconditions.checkNotNull(timestampNumber, "timestamp").longValue());
+        }
 
         CacheStrategy.fetchAggregatorsFromCache(
             aggsIter,
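As a rough illustration, not taken from the patch: the result-level cache entry is a plain list whose first element is the timestamp in millis, or null for a grandTotal row, followed by the aggregator values. A hypothetical Jackson round-trip shows why a null first element must survive serialization for pullFromCache(true) to rebuild the grandTotal result.

import java.util.Arrays;
import java.util.List;

import com.fasterxml.jackson.databind.ObjectMapper;

public class CacheEntryDemo
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    // First element: timestamp millis, or null for grandTotal; rest: aggregator values.
    List<Object> entry = Arrays.<Object>asList(null, 2, 3);

    byte[] cached = mapper.writeValueAsBytes(entry);
    List<?> restored = mapper.readValue(cached, List.class);

    // JSON null round-trips as Java null, so the cache-pulling side can
    // rebuild a Result with a null timestamp instead of throwing an NPE.
    System.out.println(restored.get(0) == null); // true
  }
}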
diff --git a/processing/src/test/java/org/apache/druid/query/ResultTest.java b/processing/src/test/java/org/apache/druid/query/ResultTest.java
new file mode 100644
index 000000000000..7fe6815549c2
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/ResultTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ResultTest
+{
+  @Test
+  public void testCompareNullTimestamp()
+  {
+    final Result<Object> nullTimestamp = new Result<>(null, null);
+    final Result<Object> nullTimestamp2 = new Result<>(null, null);
+    final Result<Object> nonNullTimestamp = new Result<>(DateTimes.nowUtc(), null);
+
+    Assert.assertEquals(0, nullTimestamp.compareTo(nullTimestamp2));
+    Assert.assertEquals(1, nullTimestamp.compareTo(nonNullTimestamp));
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
index 304ac9b5ff1f..89fdbb7a8674 100644
--- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
@@ -133,6 +133,23 @@ public void testCacheStrategy() throws Exception
 
     Result<TimeseriesResultValue> fromResultLevelCacheRes = strategy.pullFromCache(true).apply(fromResultLevelCacheValue);
     Assert.assertEquals(result2, fromResultLevelCacheRes);
+
+    final Result<TimeseriesResultValue> result3 = new Result<>(
+        // null timestamp similar to grandTotal
+        null,
+        new TimeseriesResultValue(
+            ImmutableMap.of("metric1", 2, "metric0", 3, "complexMetric", "val1", "post", 10)
+        )
+    );
+
+    preparedResultLevelCacheValue = strategy.prepareForCache(true).apply(result3);
+    fromResultLevelCacheValue = objectMapper.readValue(
+        objectMapper.writeValueAsBytes(preparedResultLevelCacheValue),
+        strategy.getCacheObjectClazz()
+    );
+
+    fromResultLevelCacheRes = strategy.pullFromCache(true).apply(fromResultLevelCacheValue);
+    Assert.assertEquals(result3, fromResultLevelCacheRes);
   }
 
   @Test
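The runner change in the next file hinges on Druid's SequenceWrapper.after() hook, which fires once the wrapped sequence is fully consumed or fails; that is where the cache is flushed or abandoned. A rough standalone sketch, assuming the Sequences.simple/Sequences.wrap/toList utilities in org.apache.druid.java.util.common.guava behave as in recent Druid versions; nothing here is from the patch itself.

import java.util.Arrays;

import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.SequenceWrapper;
import org.apache.druid.java.util.common.guava.Sequences;

public class SequenceWrapperDemo
{
  public static void main(String[] args)
  {
    Sequence<Integer> wrapped = Sequences.wrap(
        Sequences.simple(Arrays.asList(1, 2, 3)),
        new SequenceWrapper()
        {
          @Override
          public void after(boolean isDone, Throwable thrown)
          {
            // Runs exactly once, after the sequence is fully consumed (or fails).
            // In ResultLevelCachingQueryRunner this is where populateResults()
            // or stopPopulating() is invoked.
            System.out.println("after: isDone=" + isDone + ", thrown=" + thrown);
          }
        }
    );

    wrapped.toList(); // prints: after: isDone=true, thrown=null
  }
}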
diff --git a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
index 6a303b8a6df7..6a9a64091bb7 100644
--- a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
+++ b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
@@ -108,43 +108,46 @@ public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
       }
       final Function<T, Object> cacheFn = strategy.prepareForCache(true);
 
-      return Sequences.wrap(Sequences.map(
-          resultFromClient,
-          new Function<T, T>()
+      return Sequences.wrap(
+          Sequences.map(
+              resultFromClient,
+              new Function<T, T>()
+              {
+                @Override
+                public T apply(T input)
+                {
+                  if (resultLevelCachePopulator.isShouldPopulate()) {
+                    resultLevelCachePopulator.cacheResultEntry(input, cacheFn);
+                  }
+                  return input;
+                }
+              }
+          ),
+          new SequenceWrapper()
           {
            @Override
-            public T apply(T input)
+            public void after(boolean isDone, Throwable thrown)
            {
-              if (resultLevelCachePopulator.isShouldPopulate()) {
-                resultLevelCachePopulator.cacheResultEntry(resultLevelCachePopulator, input, cacheFn);
+              Preconditions.checkNotNull(
+                  resultLevelCachePopulator,
+                  "ResultLevelCachePopulator cannot be null during cache population"
+              );
+              if (thrown != null) {
+                log.error(
+                    thrown,
+                    "Error while preparing for result level caching for query %s with error %s ",
+                    query.getId(),
+                    thrown.getMessage()
+                );
+              } else if (resultLevelCachePopulator.isShouldPopulate()) {
+                // The resultset identifier and its length is cached along with the resultset
+                resultLevelCachePopulator.populateResults();
+                log.debug("Cache population complete for query %s", query.getId());
              }
-              return input;
+              resultLevelCachePopulator.stopPopulating();
            }
          }
-      ), new SequenceWrapper()
-      {
-        @Override
-        public void after(boolean isDone, Throwable thrown)
-        {
-          Preconditions.checkNotNull(
-              resultLevelCachePopulator,
-              "ResultLevelCachePopulator cannot be null during cache population"
-          );
-          if (thrown != null) {
-            log.error(
-                thrown,
-                "Error while preparing for result level caching for query %s with error %s ",
-                query.getId(),
-                thrown.getMessage()
-            );
-          } else if (resultLevelCachePopulator.isShouldPopulate()) {
-            // The resultset identifier and its length is cached along with the resultset
-            resultLevelCachePopulator.populateResults();
-            log.debug("Cache population complete for query %s", query.getId());
-          }
-          resultLevelCachePopulator.cacheObjectStream = null;
-        }
-      });
+      );
     }
   } else {
     return baseRunner.run(
@@ -234,20 +237,14 @@ private ResultLevelCachePopulator createResultLevelCachePopulator(
     }
   }
 
-  public class ResultLevelCachePopulator
+  private class ResultLevelCachePopulator
   {
     private final Cache cache;
     private final ObjectMapper mapper;
     private final Cache.NamedKey key;
     private final CacheConfig cacheConfig;
-    private ByteArrayOutputStream cacheObjectStream = new ByteArrayOutputStream();
-
-    public boolean isShouldPopulate()
-    {
-      return shouldPopulate;
-    }
-
-    private boolean shouldPopulate;
+    @Nullable
+    private ByteArrayOutputStream cacheObjectStream;
 
     private ResultLevelCachePopulator(
         Cache cache,
@@ -261,29 +258,35 @@ private ResultLevelCachePopulator(
       this.mapper = mapper;
      this.key = key;
      this.cacheConfig = cacheConfig;
-      this.shouldPopulate = shouldPopulate;
+      this.cacheObjectStream = shouldPopulate ? new ByteArrayOutputStream() : null;
+    }
+
+    boolean isShouldPopulate()
+    {
+      return cacheObjectStream != null;
+    }
+
+    void stopPopulating()
+    {
+      cacheObjectStream = null;
    }

    private void cacheResultEntry(
-        ResultLevelCachePopulator resultLevelCachePopulator,
        T resultEntry,
        Function<T, Object> cacheFn
    )
    {
-
+      Preconditions.checkNotNull(cacheObjectStream, "cacheObjectStream");
      int cacheLimit = cacheConfig.getResultLevelCacheLimit();
-      try (JsonGenerator gen = mapper.getFactory().createGenerator(resultLevelCachePopulator.cacheObjectStream)) {
+      try (JsonGenerator gen = mapper.getFactory().createGenerator(cacheObjectStream)) {
        gen.writeObject(cacheFn.apply(resultEntry));
-        if (cacheLimit > 0 && resultLevelCachePopulator.cacheObjectStream.size() > cacheLimit) {
-          shouldPopulate = false;
-          resultLevelCachePopulator.cacheObjectStream = null;
-          return;
+        if (cacheLimit > 0 && cacheObjectStream.size() > cacheLimit) {
+          stopPopulating();
        }
      }
      catch (IOException ex) {
        log.error(ex, "Failed to retrieve entry to be cached. Result Level caching will not be performed!");
-        shouldPopulate = false;
-        resultLevelCachePopulator.cacheObjectStream = null;
+        stopPopulating();
      }
    }
@@ -292,7 +295,7 @@ public void populateResults()
      ResultLevelCacheUtil.populate(
          cache,
          key,
-          cacheObjectStream.toByteArray()
+          Preconditions.checkNotNull(cacheObjectStream, "cacheObjectStream").toByteArray()
      );
    }
  }
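Finally, a standalone sketch of the state pattern the refactored populator uses; the names below are mine, not Druid's. The nullable output buffer itself serves as the "should populate" flag, so there is no separate boolean to keep in sync with the stream, and stopping population also releases any partially written bytes.

import java.io.ByteArrayOutputStream;

public class PopulatorStateDemo
{
  // Non-null while results are still being collected; null once populating stops.
  private ByteArrayOutputStream buffer;

  PopulatorStateDemo(boolean shouldPopulate)
  {
    this.buffer = shouldPopulate ? new ByteArrayOutputStream() : null;
  }

  boolean isShouldPopulate()
  {
    // A single source of truth: populate iff the buffer still exists.
    return buffer != null;
  }

  void stopPopulating()
  {
    buffer = null; // also frees the partially written bytes
  }

  public static void main(String[] args)
  {
    PopulatorStateDemo populator = new PopulatorStateDemo(true);
    System.out.println(populator.isShouldPopulate()); // true
    populator.stopPopulating();
    System.out.println(populator.isShouldPopulate()); // false
  }
}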