Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix resultLevelCache for timeseries with grandTotal #7624

Merged
merged 4 commits into from
May 10, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions processing/src/main/java/org/apache/druid/query/Result.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.apache.druid.guice.annotations.PublicApi;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.Objects;
import java.util.function.Function;

/**
Expand All @@ -33,11 +36,12 @@ public class Result<T> implements Comparable<Result<T>>
{
public static String MISSING_SEGMENTS_KEY = "missingSegments";

@Nullable
private final DateTime timestamp;
private final T value;

@JsonCreator
public Result(@JsonProperty("timestamp") DateTime timestamp, @JsonProperty("result") T value)
public Result(@JsonProperty("timestamp") @Nullable DateTime timestamp, @JsonProperty("result") T value)
{
this.timestamp = timestamp;
this.value = value;
Expand All @@ -51,10 +55,12 @@ public <U> Result<U> map(Function<? super T, ? extends U> mapper)
@Override
public int compareTo(Result<T> tResult)
{
return timestamp.compareTo(tResult.timestamp);
// timestamp is null for grandTotal which should come last.
return Comparator.nullsLast(DateTime::compareTo).compare(this.timestamp, tResult.timestamp);
}

@JsonProperty
@Nullable
public DateTime getTimestamp()
{
return timestamp;
Expand All @@ -78,22 +84,22 @@ public boolean equals(Object o)

Result result = (Result) o;

if (timestamp != null ? !(timestamp.isEqual(result.timestamp) && timestamp.getZone().getOffset(timestamp) == result.timestamp.getZone().getOffset(result.timestamp)) : result.timestamp != null) {
return false;
}
if (value != null ? !value.equals(result.value) : result.value != null) {
if (timestamp != null && result.timestamp != null) {
if (!timestamp.isEqual(result.timestamp)
&& timestamp.getZone().getOffset(timestamp) == result.timestamp.getZone().getOffset(result.timestamp)) {
return false;
}
} else if (timestamp == null ^ result.timestamp == null) {
return false;
}

return true;
return Objects.equals(value, result.value);
}

@Override
public int hashCode()
{
int result = timestamp != null ? timestamp.hashCode() : 0;
result = 31 * result + (value != null ? value.hashCode() : 0);
return result;
return Objects.hash(timestamp, value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -52,7 +53,6 @@
import org.apache.druid.query.groupby.RowBasedColumnSelectorFactory;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -303,7 +303,12 @@ public Function<Result<TimeseriesResultValue>, Object> prepareForCache(boolean i
TimeseriesResultValue results = input.getValue();
final List<Object> retVal = Lists.newArrayListWithCapacity(1 + aggs.size());

retVal.add(input.getTimestamp().getMillis());
// Timestamp can be null if grandTotal is true.
if (isResultLevelCache) {
retVal.add(input.getTimestamp() == null ? null : input.getTimestamp().getMillis());
} else {
retVal.add(Preconditions.checkNotNull(input.getTimestamp(), "timestamp of input[%s]", input).getMillis());
}
for (AggregatorFactory agg : aggs) {
retVal.add(results.getMetric(agg.getName()));
}
Expand All @@ -324,15 +329,21 @@ public Function<Object, Result<TimeseriesResultValue>> pullFromCache(boolean isR
private final Granularity granularity = query.getGranularity();

@Override
public Result<TimeseriesResultValue> apply(@Nullable Object input)
public Result<TimeseriesResultValue> apply(Object input)
{
List<Object> results = (List<Object>) input;
Map<String, Object> retVal = Maps.newLinkedHashMap();

Iterator<AggregatorFactory> aggsIter = aggs.iterator();
Iterator<Object> resultIter = results.iterator();

DateTime timestamp = granularity.toDateTime(((Number) resultIter.next()).longValue());
final Number timestampNumber = (Number) resultIter.next();
final DateTime timestamp;
if (isResultLevelCache) {
timestamp = timestampNumber == null ? null : granularity.toDateTime(timestampNumber.longValue());
} else {
timestamp = granularity.toDateTime(Preconditions.checkNotNull(timestampNumber, "timestamp").longValue());
}

while (aggsIter.hasNext() && resultIter.hasNext()) {
final AggregatorFactory factory = aggsIter.next();
Expand Down
38 changes: 38 additions & 0 deletions processing/src/test/java/org/apache/druid/query/ResultTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query;

import org.apache.druid.java.util.common.DateTimes;
import org.junit.Assert;
import org.junit.Test;

public class ResultTest
{
  /**
   * Verifies {@link Result#compareTo} ordering when the timestamp is null.
   * A null timestamp represents the "grandTotal" row of a timeseries query,
   * which must sort after every time-stamped result (Result.compareTo uses
   * {@code Comparator.nullsLast}).
   */
  @Test
  public void testCompareNullTimestamp()
  {
    final Result<Object> nullTimestamp = new Result<>(null, null);
    final Result<Object> nullTimestamp2 = new Result<>(null, null);
    final Result<Object> nonNullTimestamp = new Result<>(DateTimes.nowUtc(), null);

    // Two null-timestamp (grandTotal) results compare as equal.
    Assert.assertEquals(0, nullTimestamp.compareTo(nullTimestamp2));
    // The compareTo contract only guarantees the sign, not the magnitude,
    // so assert on the sign rather than pinning the exact value 1.
    Assert.assertTrue(nullTimestamp.compareTo(nonNullTimestamp) > 0);
    // Symmetry: a time-stamped result sorts before the grandTotal row.
    Assert.assertTrue(nonNullTimestamp.compareTo(nullTimestamp) < 0);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,23 @@ public void testCacheStrategy() throws Exception

Result<TimeseriesResultValue> fromResultLevelCacheRes = strategy.pullFromCache(true).apply(fromResultLevelCacheValue);
Assert.assertEquals(result2, fromResultLevelCacheRes);

final Result<TimeseriesResultValue> result3 = new Result<>(
// null timestamp similar to grandTotal
null,
new TimeseriesResultValue(
ImmutableMap.of("metric1", 2, "metric0", 3, "post", 10)
)
);

preparedResultLevelCacheValue = strategy.prepareForCache(true).apply(result3);
fromResultLevelCacheValue = objectMapper.readValue(
objectMapper.writeValueAsBytes(preparedResultLevelCacheValue),
strategy.getCacheObjectClazz()
);

fromResultLevelCacheRes = strategy.pullFromCache(true).apply(fromResultLevelCacheValue);
Assert.assertEquals(result3, fromResultLevelCacheRes);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,43 +108,46 @@ public Sequence<T> run(QueryPlus queryPlus, Map responseContext)
}
final Function<T, Object> cacheFn = strategy.prepareForCache(true);

return Sequences.wrap(Sequences.map(
resultFromClient,
new Function<T, T>()
return Sequences.wrap(
Sequences.map(
resultFromClient,
new Function<T, T>()
{
@Override
public T apply(T input)
{
if (resultLevelCachePopulator.isShouldPopulate()) {
resultLevelCachePopulator.cacheResultEntry(input, cacheFn);
}
return input;
}
}
),
new SequenceWrapper()
{
@Override
public T apply(T input)
public void after(boolean isDone, Throwable thrown)
{
if (resultLevelCachePopulator.isShouldPopulate()) {
resultLevelCachePopulator.cacheResultEntry(resultLevelCachePopulator, input, cacheFn);
Preconditions.checkNotNull(
resultLevelCachePopulator,
"ResultLevelCachePopulator cannot be null during cache population"
);
if (thrown != null) {
log.error(
thrown,
"Error while preparing for result level caching for query %s with error %s ",
query.getId(),
thrown.getMessage()
);
} else if (resultLevelCachePopulator.isShouldPopulate()) {
// The resultset identifier and its length is cached along with the resultset
resultLevelCachePopulator.populateResults();
log.debug("Cache population complete for query %s", query.getId());
}
return input;
resultLevelCachePopulator.stopPopulating();
}
}
), new SequenceWrapper()
{
@Override
public void after(boolean isDone, Throwable thrown)
{
Preconditions.checkNotNull(
resultLevelCachePopulator,
"ResultLevelCachePopulator cannot be null during cache population"
);
if (thrown != null) {
log.error(
thrown,
"Error while preparing for result level caching for query %s with error %s ",
query.getId(),
thrown.getMessage()
);
} else if (resultLevelCachePopulator.isShouldPopulate()) {
// The resultset identifier and its length is cached along with the resultset
resultLevelCachePopulator.populateResults();
log.debug("Cache population complete for query %s", query.getId());
}
resultLevelCachePopulator.cacheObjectStream = null;
}
});
);
}
} else {
return baseRunner.run(
Expand Down Expand Up @@ -234,20 +237,14 @@ private ResultLevelCachePopulator createResultLevelCachePopulator(
}
}

public class ResultLevelCachePopulator
private class ResultLevelCachePopulator
{
private final Cache cache;
private final ObjectMapper mapper;
private final Cache.NamedKey key;
private final CacheConfig cacheConfig;
private ByteArrayOutputStream cacheObjectStream = new ByteArrayOutputStream();

public boolean isShouldPopulate()
{
return shouldPopulate;
}

private boolean shouldPopulate;
@Nullable
private ByteArrayOutputStream cacheObjectStream;

private ResultLevelCachePopulator(
Cache cache,
Expand All @@ -261,29 +258,35 @@ private ResultLevelCachePopulator(
this.mapper = mapper;
this.key = key;
this.cacheConfig = cacheConfig;
this.shouldPopulate = shouldPopulate;
this.cacheObjectStream = shouldPopulate ? new ByteArrayOutputStream() : null;
}

boolean isShouldPopulate()
{
return cacheObjectStream != null;
}

void stopPopulating()
{
cacheObjectStream = null;
}

private void cacheResultEntry(
ResultLevelCachePopulator resultLevelCachePopulator,
T resultEntry,
Function<T, Object> cacheFn
)
{

Preconditions.checkNotNull(cacheObjectStream, "cacheObjectStream");
int cacheLimit = cacheConfig.getResultLevelCacheLimit();
try (JsonGenerator gen = mapper.getFactory().createGenerator(resultLevelCachePopulator.cacheObjectStream)) {
try (JsonGenerator gen = mapper.getFactory().createGenerator(cacheObjectStream)) {
gen.writeObject(cacheFn.apply(resultEntry));
if (cacheLimit > 0 && resultLevelCachePopulator.cacheObjectStream.size() > cacheLimit) {
shouldPopulate = false;
resultLevelCachePopulator.cacheObjectStream = null;
return;
if (cacheLimit > 0 && cacheObjectStream.size() > cacheLimit) {
stopPopulating();
}
}
catch (IOException ex) {
log.error(ex, "Failed to retrieve entry to be cached. Result Level caching will not be performed!");
shouldPopulate = false;
resultLevelCachePopulator.cacheObjectStream = null;
stopPopulating();
}
}

Expand All @@ -292,7 +295,7 @@ public void populateResults()
ResultLevelCacheUtil.populate(
cache,
key,
cacheObjectStream.toByteArray()
Preconditions.checkNotNull(cacheObjectStream, "cacheObjectStream").toByteArray()
);
}
}
Expand Down