Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Granularity: Introduce primitive-typed bucketStart, increment methods. #10904

Merged
merged 3 commits into from Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -44,12 +44,24 @@ public DateTimeFormatter getFormatter(Formatter type)
throw new UnsupportedOperationException("This method should not be invoked for this granularity type");
}

@Override
public long increment(long time)
{
  // A single bucket spans all of time, so the "next" bucket start saturates
  // at the maximum supported instant regardless of the input.
  // NOTE(review): presumably the ALL granularity — confirm from the class name.
  final DateTime endOfTime = DateTimes.MAX;
  return endOfTime.getMillis();
}

@Override
public DateTime increment(DateTime time)
{
  // One all-encompassing bucket: incrementing always lands on the maximum
  // supported instant, independent of the argument.
  final DateTime endOfTime = DateTimes.MAX;
  return endOfTime;
}

@Override
public long bucketStart(long time)
{
  // The single bucket covering all of time begins at the minimum supported
  // instant, independent of the argument.
  final DateTime startOfTime = DateTimes.MIN;
  return startOfTime.getMillis();
}

@Override
public DateTime bucketStart(DateTime time)
{
Expand Down
Expand Up @@ -71,22 +71,33 @@ public DateTimeFormatter getFormatter(Formatter type)
throw new UnsupportedOperationException("This method should not be invoked for this granularity type");
}

@Override
public long increment(long time)
{
  // Advance by one fixed-width bucket.
  // NOTE(review): assumes the `duration` field holds the bucket width in
  // milliseconds — confirm against the enclosing class.
  final long nextBucketMillis = time + duration;
  return nextBucketMillis;
}

@Override
public DateTime increment(DateTime time)
{
return time.plus(getDuration());
return time.plus(duration);
}

@Override
public DateTime bucketStart(DateTime time)
public long bucketStart(long t)
{
long t = time.getMillis();
final long duration = getDurationMillis();
long offset = t % duration - origin;
if (offset < 0) {
offset += duration;
}
return new DateTime(t - offset, time.getChronology());
return t - offset;
}

@Override
public DateTime bucketStart(DateTime time)
{
  // Delegate to the primitive-typed overload, then rebuild a DateTime that
  // preserves the caller's chronology (and therefore its time zone).
  final long startMillis = bucketStart(time.getMillis());
  return new DateTime(startMillis, time.getChronology());
}

@Override
Expand Down
Expand Up @@ -132,8 +132,12 @@ public static List<Granularity> granularitiesFinerThan(final Granularity gran0)

public abstract DateTimeFormatter getFormatter(Formatter type);

public abstract long increment(long time);

public abstract DateTime increment(DateTime time);

public abstract long bucketStart(long time);

public abstract DateTime bucketStart(DateTime time);

public abstract DateTime toDate(String filePath, Formatter formatter);
Expand Down
Expand Up @@ -42,12 +42,24 @@ public DateTimeFormatter getFormatter(Formatter type)
throw new UnsupportedOperationException("This method should not be invoked for this granularity type");
}

@Override
public long increment(long time)
{
  // Buckets here are a single millisecond wide, so the next bucket
  // begins exactly 1 ms after the given instant.
  final long nextMillis = time + 1L;
  return nextMillis;
}

@Override
public DateTime increment(DateTime time)
{
  // Step forward by exactly one millisecond, keeping the input's chronology.
  final DateTime next = time.plus(1);
  return next;
}

@Override
// Every instant is its own bucket start at this granularity, so the
// timestamp is returned unchanged (identity mapping).
public long bucketStart(long time)
{
  return time;
}

@Override
public DateTime bucketStart(DateTime time)
{
Expand Down
Expand Up @@ -109,6 +109,18 @@ public DateTimeFormatter getFormatter(Formatter type)
}
}

@Override
public long bucketStart(long time)
{
  // Floor the timestamp to the start of its period bucket. The heavy
  // lifting (including any compound-period handling) lives in truncate().
  final long flooredMillis = truncate(time);
  return flooredMillis;
}

@Override
public long increment(long t)
{
  // Add a single period via this granularity's chronology so calendar-aware
  // fields (months, DST transitions, etc.) are handled correctly.
  final long incremented = chronology.add(period, t, 1);
  return incremented;
}

@Override
public DateTime increment(DateTime time)
{
Expand Down Expand Up @@ -219,11 +231,6 @@ private static boolean isCompoundPeriod(Period period)
return false;
}

private long increment(long t)
{
return chronology.add(period, t, 1);
}

private long truncate(long t)
{
if (isCompound) {
Expand Down
Expand Up @@ -754,7 +754,7 @@ private Map<Interval, Optional<HyperLogLogCollector>> collectIntervalsAndShardSp
hllCollectors.computeIfAbsent(interval, intv -> Optional.of(HyperLogLogCollector.makeLatestCollector()));

List<Object> groupKey = Rows.toGroupKey(
queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
queryGranularity.bucketStart(inputRow.getTimestampFromEpoch()),
inputRow
);
hllCollectors.get(interval).get()
Expand Down
Expand Up @@ -198,9 +198,9 @@ private Map<Interval, byte[]> determineCardinalities(
DateTime timestamp = inputRow.getTimestamp();
final Interval interval;
if (granularitySpec.inputIntervals().isEmpty()) {
interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
interval = granularitySpec.getSegmentGranularity().bucket(timestamp);
} else {
final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
final Optional<Interval> optInterval = granularitySpec.bucketInterval(timestamp);
// this interval must exist since it passed the rowFilter
assert optInterval.isPresent();
interval = optInterval.get();
Expand Down
Expand Up @@ -50,7 +50,6 @@
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -369,8 +368,8 @@ public boolean accept(Interval interval, String partitionDimensionValue, InputRo

private long getBucketTimestamp(InputRow inputRow)
{
DateTime timestamp = inputRow.getTimestamp();
return queryGranularity.bucketStart(timestamp).getMillis();
final long timestamp = inputRow.getTimestampFromEpoch();
return queryGranularity.bucketStart(timestamp);
}

@Override
Expand Down
Expand Up @@ -42,8 +42,8 @@ public ResultGranularTimestampComparator(Granularity granularity)
public int compare(Result<T> r1, Result<T> r2)
{
return Longs.compare(
gran.bucketStart(r1.getTimestamp()).getMillis(),
gran.bucketStart(r2.getTimestamp()).getMillis()
gran.bucketStart(r1.getTimestamp().getMillis()),
gran.bucketStart(r2.getTimestamp().getMillis())
);
}

Expand Down
Expand Up @@ -20,15 +20,13 @@
package org.apache.druid.query.expression;

import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.math.expr.ExprType;
import org.joda.time.DateTime;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -80,12 +78,12 @@ public ExprEval eval(final ObjectBinding bindings)
// Return null if the argument is null.
return ExprEval.of(null);
}
DateTime argTime = DateTimes.utc(eval.asLong());
DateTime bucketStartTime = granularity.bucketStart(argTime);
if (argTime.equals(bucketStartTime)) {
return ExprEval.of(bucketStartTime.getMillis());
long argTime = eval.asLong();
long bucketStartTime = granularity.bucketStart(argTime);
if (argTime == bucketStartTime) {
return ExprEval.of(bucketStartTime);
}
return ExprEval.of(granularity.increment(bucketStartTime).getMillis());
return ExprEval.of(granularity.increment(bucketStartTime));
}

@Override
Expand Down Expand Up @@ -148,12 +146,12 @@ static class TimestampCeilDynamicExpr extends ExprMacroTable.BaseScalarMacroFunc
public ExprEval eval(final ObjectBinding bindings)
{
final PeriodGranularity granularity = getGranularity(args, bindings);
DateTime argTime = DateTimes.utc(args.get(0).eval(bindings).asLong());
DateTime bucketStartTime = granularity.bucketStart(argTime);
if (argTime.equals(bucketStartTime)) {
return ExprEval.of(bucketStartTime.getMillis());
long argTime = args.get(0).eval(bindings).asLong();
long bucketStartTime = granularity.bucketStart(argTime);
if (argTime == bucketStartTime) {
return ExprEval.of(bucketStartTime);
}
return ExprEval.of(granularity.increment(bucketStartTime).getMillis());
return ExprEval.of(granularity.increment(bucketStartTime));
}

@Override
Expand Down
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.query.expression;

import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.math.expr.Expr;
Expand Down Expand Up @@ -105,7 +104,7 @@ public ExprEval eval(final ObjectBinding bindings)
// Return null if the argument is null.
return ExprEval.of(null);
}
return ExprEval.of(granularity.bucketStart(DateTimes.utc(eval.asLong())).getMillis());
return ExprEval.of(granularity.bucketStart(eval.asLong()));
}

@Override
Expand Down Expand Up @@ -141,7 +140,7 @@ public <T> ExprVectorProcessor<T> buildVectorized(VectorInputBindingInspector in
@Override
public long apply(long input)
{
return granularity.bucketStart(DateTimes.utc(input)).getMillis();
return granularity.bucketStart(input);
}
};

Expand Down Expand Up @@ -183,7 +182,7 @@ public static class TimestampFloorDynamicExpr extends ExprMacroTable.BaseScalarM
public ExprEval eval(final ObjectBinding bindings)
{
final PeriodGranularity granularity = computeGranularity(args, bindings);
return ExprEval.of(granularity.bucketStart(DateTimes.utc(args.get(0).eval(bindings).asLong())).getMillis());
return ExprEval.of(granularity.bucketStart(args.get(0).eval(bindings).asLong()));
}

@Override
Expand Down
Expand Up @@ -126,7 +126,7 @@ public byte[] getCacheKey()
@Override
public String apply(long value)
{
final long truncated = granularity.bucketStart(DateTimes.utc(value)).getMillis();
final long truncated = granularity.bucketStart(value);
return formatter == null ? String.valueOf(truncated) : formatter.print(truncated);
}

Expand Down
Expand Up @@ -675,8 +675,8 @@ private Comparator<ResultRow> getTimeComparator(boolean granular)

if (granular) {
return (lhs, rhs) -> Longs.compare(
getGranularity().bucketStart(DateTimes.utc(lhs.getLong(0))).getMillis(),
getGranularity().bucketStart(DateTimes.utc(rhs.getLong(0))).getMillis()
getGranularity().bucketStart(lhs.getLong(0)),
getGranularity().bucketStart(rhs.getLong(0))
);
} else {
return NON_GRANULAR_TIME_COMP;
Expand Down
Expand Up @@ -45,7 +45,6 @@
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
Expand All @@ -67,10 +66,10 @@ public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> creat
{
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
final Granularity gran = query.getGranularity();
final DateTime timeStart = query.getIntervals().get(0).getStart();
final long timeStart = query.getIntervals().get(0).getStartMillis();
final boolean combine = subquery == null;

DateTime granTimeStart = timeStart;
long granTimeStart = timeStart;
if (!(Granularities.ALL.equals(gran))) {
granTimeStart = gran.bucketStart(timeStart);
}
Expand Down Expand Up @@ -117,7 +116,7 @@ public String apply(DimensionSpec input)
.withDimensionsSpec(new DimensionsSpec(dimensionSchemas, null, null))
.withMetrics(aggs.toArray(new AggregatorFactory[0]))
.withQueryGranularity(gran)
.withMinTimestamp(granTimeStart.getMillis())
.withMinTimestamp(granTimeStart)
.build();


Expand Down
Expand Up @@ -88,7 +88,7 @@ private long adjustTimestamp(final ResultRow row)
if (query.getGranularity() instanceof AllGranularity) {
return row.getLong(0);
} else {
return query.getGranularity().bucketStart(DateTimes.utc(row.getLong(0))).getMillis();
return query.getGranularity().bucketStart(row.getLong(0));
}
}
}
Expand Up @@ -448,7 +448,7 @@ private static TimestampExtractFunction makeTimestampExtractFunction(
if (query.getGranularity() instanceof AllGranularity) {
return row -> query.getIntervals().get(0).getStartMillis();
} else {
return row -> query.getGranularity().bucketStart(DateTimes.utc(row.getLong(0))).getMillis();
return row -> query.getGranularity().bucketStart(row.getLong(0));
}
}
} else {
Expand Down
Expand Up @@ -127,7 +127,7 @@ public Cursor apply(final Interval inputInterval)
final long timeStart = Math.max(interval.getStartMillis(), inputInterval.getStartMillis());
final long timeEnd = Math.min(
interval.getEndMillis(),
gran.increment(inputInterval.getStart()).getMillis()
gran.increment(inputInterval.getStartMillis())
);

if (descending) {
Expand Down
Expand Up @@ -590,7 +590,7 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row)

long truncated = 0;
if (row.getTimestamp() != null) {
truncated = gran.bucketStart(row.getTimestamp()).getMillis();
truncated = gran.bucketStart(row.getTimestampFromEpoch());
}
IncrementalIndexRow incrementalIndexRow = IncrementalIndexRow.createTimeAndDimswithDimsKeySize(
Math.max(truncated, minTimestamp),
Expand Down
Expand Up @@ -333,7 +333,7 @@ private class IncrementalIndexCursor implements Cursor
cursorIterable = index.getFacts().timeRangeIterable(
descending,
timeStart,
Math.min(actualInterval.getEndMillis(), gran.increment(interval.getStart()).getMillis())
Math.min(actualInterval.getEndMillis(), gran.increment(interval.getStartMillis()))
);
emptyRange = !cursorIterable.iterator().hasNext();
time = gran.toDateTime(interval.getStartMillis());
Expand Down