emptyEventsCopy = new HashMap<>(emptyEvents);
+
+ // Convert key to a full dummy event (key + dummy metrics).
+ dims.forEach((dim, value) -> emptyEventsCopy.put(dim, value));
+
+ r = computeMovingAverage(new MapBasedRow(cache.getDateTime(), emptyEventsCopy), true);
+ if (r != null) {
+ return r;
+ }
+ }
+
+ seenKeys.clear();
+ averagersKeysIter = null;
+ cache = null;
+ }
+
+ if (cacheIter == null && yielder.isDone()) {
+ // we should never get here. For some reason, there is
+ // no more work to do, so continuing to iterate will infinite loop
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Compute and add any moving average columns.
+ *
+ * Normally, the row passed in will be added to all the {@link Averager}'s and then results pulled
+ * from each averager. If skip is true, then the incoming row is actually a dummy value due to
+ * no data being present for this dimension combination in the current bucket. When this happens,
+ * {@link Averager#skip()} should be called instead of {@link Averager#addElement(Map, Map)}()} to force proper
+ * decaying of the average values.
+ *
+ *
Usually, the contents of key will be contained by the row R being passed in, but in the case of a
+ * dummy row, it's possible that the dimensions will be known but the row empty. Hence, the values are
+ * passed as two separate arguments.
+ *
+ * @param r The Row to operate on
+ * @param skip Indicates whether skip or add should be called
+ *
+ * @return The updated row containing averager results, or null if no averagers computed a result
+ */
+ @Nullable
+ private Row computeMovingAverage(MapBasedRow r, boolean skip)
+ {
+ Map event = r.getEvent();
+ Map result = new HashMap<>(event);
+ Map key = MovingAverageHelper.getDimKeyFromRow(dims, r);
+
+ List> avg = averagers.get(key);
+
+ // Initialize key's averagers.
+ if (avg == null) {
+ avg = averagerFactories.stream().map(af -> af.createAverager()).collect(Collectors.toList());
+ averagers.put(key, avg);
+ }
+
+ if (!skip) {
+ avg.forEach(af -> af.addElement(event, aggMap));
+ } else {
+ avg.forEach(af -> af.skip());
+ }
+
+ avg.forEach(af -> result.put(af.getName(), af.getResult()));
+
+ // At least one non-dimension value must be in the record for it to be valid.
+ if (result.entrySet().stream().anyMatch(e -> !key.containsKey(e.getKey()) && e.getValue() != null)) {
+ result.putAll(event);
+ return new MapBasedRow(r.getTimestamp(), result);
+ } else {
+ // No averagers returned anything. All buckets must be empty.
+ // skip this row.
+ return null;
+ }
+ }
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQuery.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQuery.java
new file mode 100644
index 000000000000..38fc1ebcc124
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQuery.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.groupby.having.HavingSpec;
+import org.apache.druid.query.groupby.orderby.LimitSpec;
+import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.query.spec.QuerySegmentSpec;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class that defines druid MovingAverage query fields
+ */
+@JsonTypeName("movingAverage")
+public class MovingAverageQuery extends BaseQuery
+{
+
+ public static final String MOVING_AVG_QUERY_TYPE = "movingAverage";
+ public static final String CTX_KEY_SORT_BY_DIMS_FIRST = "sortByDimsFirst";
+
+ private final LimitSpec limitSpec;
+ private final HavingSpec havingSpec;
+ private final DimFilter dimFilter;
+ private final Function, Sequence> limitFn;
+ private final Granularity granularity;
+ private final List dimensions;
+ private final List aggregatorSpecs;
+ private final List postAggregatorSpecs;
+ private final List> averagerSpecs;
+ private final List postAveragerSpecs;
+
+ @JsonCreator
+ public MovingAverageQuery(
+ @JsonProperty("dataSource") DataSource dataSource,
+ @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
+ @JsonProperty("filter") DimFilter dimFilter,
+ @JsonProperty("granularity") Granularity granularity,
+ @JsonProperty("dimensions") List dimensions,
+ @JsonProperty("aggregations") List aggregatorSpecs,
+ @JsonProperty("postAggregations") List postAggregatorSpecs,
+ @JsonProperty("having") HavingSpec havingSpec,
+ @JsonProperty("averagers") List> averagerSpecs,
+ @JsonProperty("postAveragers") List postAveragerSpecs,
+ @JsonProperty("limitSpec") LimitSpec limitSpec,
+ @JsonProperty("context") Map context
+ )
+ {
+ super(dataSource, querySegmentSpec, false, context);
+
+ //TBD: Implement null awareness to respect the contract of this flag.
+ Preconditions.checkArgument(NullHandling.replaceWithDefault(), "movingAverage does not support druid.generic.useDefaultValueForNull=false");
+
+ this.dimFilter = dimFilter;
+ this.granularity = granularity;
+ this.dimensions = dimensions == null ? ImmutableList.of() : dimensions;
+ for (DimensionSpec spec : this.dimensions) {
+ Preconditions.checkArgument(spec != null, "dimensions has null DimensionSpec");
+ }
+ this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.of() : aggregatorSpecs;
+ this.postAggregatorSpecs = postAggregatorSpecs == null ? ImmutableList.of() : postAggregatorSpecs;
+ this.averagerSpecs = averagerSpecs == null ? ImmutableList.of() : averagerSpecs;
+ this.postAveragerSpecs = postAveragerSpecs == null ? ImmutableList.of() : postAveragerSpecs;
+ this.havingSpec = havingSpec;
+ this.limitSpec = (limitSpec == null) ? NoopLimitSpec.INSTANCE : limitSpec;
+
+ Preconditions.checkNotNull(this.granularity, "Must specify a granularity");
+
+ verifyOutputNames(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);
+
+ // build combined list of aggregators and averagers so that limit spec building is happy
+ List combinedAggregatorSpecs = new ArrayList<>();
+ combinedAggregatorSpecs.addAll(this.aggregatorSpecs);
+ for (AveragerFactory, ?> avg : this.averagerSpecs) {
+ combinedAggregatorSpecs.add(new AveragerFactoryWrapper(avg, ""));
+ }
+
+ Function, Sequence> postProcFn =
+ this.limitSpec.build(
+ this.dimensions,
+ combinedAggregatorSpecs,
+ this.postAggregatorSpecs,
+ this.granularity,
+ getContextSortByDimsFirst()
+ );
+
+ if (havingSpec != null) {
+ postProcFn = Functions.compose(
+ postProcFn,
+ new Function, Sequence>()
+ {
+ @Override
+ public Sequence apply(Sequence input)
+ {
+ return Sequences.filter(
+ input,
+ new Predicate()
+ {
+ @Override
+ public boolean apply(Row input)
+ {
+ return MovingAverageQuery.this.havingSpec.eval(input);
+ }
+ }
+ );
+ }
+ }
+ );
+ }
+
+ this.limitFn = postProcFn;
+
+ }
+
+ private static void verifyOutputNames(
+ List dimensions,
+ List aggregators,
+ List postAggregators
+ )
+ {
+
+ final Set outputNames = new HashSet<>();
+ for (DimensionSpec dimension : dimensions) {
+ if (!outputNames.add(dimension.getOutputName())) {
+ throw new IAE("Duplicate output name[%s]", dimension.getOutputName());
+ }
+ }
+
+ for (AggregatorFactory aggregator : aggregators) {
+ if (!outputNames.add(aggregator.getName())) {
+ throw new IAE("Duplicate output name[%s]", aggregator.getName());
+ }
+ }
+
+ for (PostAggregator postAggregator : postAggregators) {
+ if (!outputNames.add(postAggregator.getName())) {
+ throw new IAE("Duplicate output name[%s]", postAggregator.getName());
+ }
+ }
+ }
+
+ /**
+ * A private constructor that avoids all of the various state checks. Used by the with*() methods where the checks
+ * have already passed in order for the object to exist.
+ */
+ private MovingAverageQuery(
+ DataSource dataSource,
+ QuerySegmentSpec querySegmentSpec,
+ DimFilter dimFilter,
+ Granularity granularity,
+ List dimensions,
+ List aggregatorSpecs,
+ List> averagerSpecs,
+ List postAggregatorSpecs,
+ List postAveragerSpecs,
+ HavingSpec havingSpec,
+ LimitSpec orderBySpec,
+ Function, Sequence> limitFn,
+ Map context
+ )
+ {
+ super(dataSource, querySegmentSpec, false, context);
+
+ this.dimFilter = dimFilter;
+ this.granularity = granularity;
+ this.dimensions = dimensions;
+ this.aggregatorSpecs = aggregatorSpecs;
+ this.averagerSpecs = averagerSpecs;
+ this.postAggregatorSpecs = postAggregatorSpecs;
+ this.postAveragerSpecs = postAveragerSpecs;
+ this.havingSpec = havingSpec;
+ this.limitSpec = orderBySpec;
+ this.limitFn = limitFn;
+ }
+
+ @Override
+ public boolean hasFilters()
+ {
+ return dimFilter != null;
+ }
+
+ @Override
+ public String getType()
+ {
+ return MOVING_AVG_QUERY_TYPE;
+ }
+
+ @JsonIgnore
+ public boolean getContextSortByDimsFirst()
+ {
+ return getContextBoolean(CTX_KEY_SORT_BY_DIMS_FIRST, false);
+ }
+
+ @Override
+ @JsonProperty
+ public DimFilter getFilter()
+ {
+ return dimFilter;
+ }
+
+ @Override
+ @JsonProperty
+ public Granularity getGranularity()
+ {
+ return granularity;
+ }
+
+ @JsonProperty
+ public List getDimensions()
+ {
+ return dimensions;
+ }
+
+ @JsonProperty("aggregations")
+ public List getAggregatorSpecs()
+ {
+ return aggregatorSpecs;
+ }
+
+ @JsonProperty("averagers")
+ public List> getAveragerSpecs()
+ {
+ return averagerSpecs;
+ }
+
+ @JsonProperty("postAggregations")
+ public List getPostAggregatorSpecs()
+ {
+ return postAggregatorSpecs;
+ }
+
+ @JsonProperty("postAveragers")
+ public List getPostAveragerSpecs()
+ {
+ return postAveragerSpecs;
+ }
+
+ @JsonProperty("having")
+ public HavingSpec getHavingSpec()
+ {
+ return havingSpec;
+ }
+
+ @JsonProperty
+ public LimitSpec getLimitSpec()
+ {
+ return limitSpec;
+ }
+
+ @Override
+ public MovingAverageQuery withOverriddenContext(Map contextOverride)
+ {
+ return new MovingAverageQuery(
+ getDataSource(),
+ getQuerySegmentSpec(),
+ dimFilter,
+ granularity,
+ dimensions,
+ aggregatorSpecs,
+ averagerSpecs,
+ postAggregatorSpecs,
+ postAveragerSpecs,
+ havingSpec,
+ limitSpec,
+ limitFn,
+ computeOverridenContext(contextOverride)
+ );
+ }
+
+ @Override
+ public MovingAverageQuery withQuerySegmentSpec(QuerySegmentSpec spec)
+ {
+ return new MovingAverageQuery(
+ getDataSource(),
+ spec,
+ dimFilter,
+ granularity,
+ dimensions,
+ aggregatorSpecs,
+ averagerSpecs,
+ postAggregatorSpecs,
+ postAveragerSpecs,
+ havingSpec,
+ limitSpec,
+ limitFn,
+ getContext()
+ );
+ }
+
+ @Override
+ public Query withDataSource(DataSource dataSource)
+ {
+ return new MovingAverageQuery(
+ dataSource,
+ getQuerySegmentSpec(),
+ dimFilter,
+ granularity,
+ dimensions,
+ aggregatorSpecs,
+ averagerSpecs,
+ postAggregatorSpecs,
+ postAveragerSpecs,
+ havingSpec,
+ limitSpec,
+ limitFn,
+ getContext()
+ );
+ }
+
+ public Query withPostAveragers(List postAveragerSpecs)
+ {
+ return new MovingAverageQuery(
+ getDataSource(),
+ getQuerySegmentSpec(),
+ dimFilter,
+ granularity,
+ dimensions,
+ aggregatorSpecs,
+ averagerSpecs,
+ postAggregatorSpecs,
+ postAveragerSpecs,
+ havingSpec,
+ limitSpec,
+ limitFn,
+ getContext()
+ );
+ }
+
+ public Sequence applyLimit(Sequence results)
+ {
+ return limitFn.apply(results);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryMetrics.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryMetrics.java
new file mode 100644
index 000000000000..6b9f39ad0f2e
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryMetrics.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.query.QueryMetrics;
+
+public interface MovingAverageQueryMetrics extends QueryMetrics
+{
+ /**
+ * Sets the size of {@link MovingAverageQuery#getDimensions()} of the given query as dimension.
+ */
+ void numDimensions(MovingAverageQuery query);
+
+ /**
+ * Sets the number of metrics of the given groupBy query as dimension.
+ */
+ void numMetrics(MovingAverageQuery query);
+
+ /**
+ * Sets the number of "complex" metrics of the given groupBy query as dimension. By default it is assumed that
+ * "complex" metric is a metric of not long or double type, but it could be redefined in the implementation of this
+ * method.
+ */
+ void numComplexMetrics(MovingAverageQuery query);
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryMetricsFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryMetricsFactory.java
new file mode 100644
index 000000000000..db344a0f0ec5
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryMetricsFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+/**
+ * Implementations could be injected using
+ *
+ * PolyBind
+ * .optionBinder(binder, Key.get(MovingAverageQueryMetricsFactory.class))
+ * .addBinding("myCustomMovingAverageQueryMetricsFactory")
+ * .to(MyCustomMovingAverageQueryMetricsFactory.class);
+ *
+ * And then setting property:
+ * druid.query.movingavgquery.queryMetricsFactory=myCustomMovingAverageQueryMetricsFactory
+ */
+public interface MovingAverageQueryMetricsFactory
+{
+ MovingAverageQueryMetrics makeMetrics();
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryModule.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryModule.java
new file mode 100644
index 000000000000..9655678680da
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryModule.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import com.google.inject.multibindings.MapBinder;
+import org.apache.druid.guice.DruidBinders;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryToolChest;
+
+import java.util.Collections;
+import java.util.List;
+
+public class MovingAverageQueryModule implements DruidModule
+{
+
+ @Override
+ public void configure(Binder binder)
+ {
+ MapBinder, QueryToolChest> toolChests = DruidBinders.queryToolChestBinder(binder);
+
+ //Bind the query toolchest to the query class and add the binding to toolchest
+ toolChests.addBinding(MovingAverageQuery.class).to(MovingAverageQueryToolChest.class);
+
+ //Bind the query toolchest to binder
+ binder.bind(MovingAverageQueryToolChest.class).in(LazySingleton.class);
+ }
+
+ @Override
+ public List extends Module> getJacksonModules()
+ {
+ return Collections.singletonList(new SimpleModule("MovingAverageQueryModule")
+ .registerSubtypes(new NamedType(
+ MovingAverageQuery.class,
+ "movingAverage"
+ )));
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java
new file mode 100644
index 000000000000..8834d0dd42f4
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.Result;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnionDataSource;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.server.QueryStats;
+import org.apache.druid.server.RequestLogLine;
+import org.apache.druid.server.log.RequestLogger;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * The QueryRunner for MovingAverage query.
+ * High level flow:
+ * 1. Invokes an inner groupBy query (Or timeseries for no dimensions scenario) to get Aggregations/PostAggregtions.
+ * 2. Result is passed to {@link RowBucketIterable}, which groups rows of all dimension combinations into period-based (e.g. daily) buckets of rows ({@link RowBucket}).
+ * 3. The sequence is passed to {@link MovingAverageIterable}, which performs the main part of the query of adding Averagers computation into the records.
+ * 4. Finishes up by applying post averagers, removing redundant dates, and applying post phases (having, sorting, limits).
+ */
+public class MovingAverageQueryRunner implements QueryRunner
+{
+
+ public static final String QUERY_FAIL_TIME = "queryFailTime";
+ public static final String QUERY_TOTAL_BYTES_GATHERED = "queryTotalBytesGathered";
+
+ private final QuerySegmentWalker walker;
+ private final RequestLogger requestLogger;
+
+ public MovingAverageQueryRunner(
+ @Nullable QuerySegmentWalker walker,
+ RequestLogger requestLogger
+ )
+ {
+ this.walker = walker;
+ this.requestLogger = requestLogger;
+ }
+
+ @Override
+ public Sequence run(QueryPlus query, Map responseContext)
+ {
+
+ MovingAverageQuery maq = (MovingAverageQuery) query.getQuery();
+ List intervals;
+ final Period period;
+
+ // Get the largest bucket from the list of averagers
+ Optional opt =
+ maq.getAveragerSpecs().stream().map(AveragerFactory::getNumBuckets).max(Integer::compare);
+ int buckets = opt.orElse(0);
+
+ //Extend the interval beginning by specified bucket - 1
+ if (maq.getGranularity() instanceof PeriodGranularity) {
+ period = ((PeriodGranularity) maq.getGranularity()).getPeriod();
+ int offset = buckets <= 0 ? 0 : (1 - buckets);
+ intervals = maq.getIntervals()
+ .stream()
+ .map(i -> new Interval(i.getStart().withPeriodAdded(period, offset), i.getEnd()))
+ .collect(Collectors.toList());
+ } else {
+ throw new ISE("Only PeriodGranulaity is supported for movingAverage queries");
+ }
+
+ Sequence resultsSeq;
+ DataSource dataSource = maq.getDataSource();
+ if (maq.getDimensions() != null && !maq.getDimensions().isEmpty() &&
+ (dataSource instanceof TableDataSource || dataSource instanceof UnionDataSource ||
+ dataSource instanceof QueryDataSource)) {
+ // build groupBy query from movingAverage query
+ GroupByQuery.Builder builder = GroupByQuery.builder()
+ .setDataSource(dataSource)
+ .setInterval(intervals)
+ .setDimFilter(maq.getFilter())
+ .setGranularity(maq.getGranularity())
+ .setDimensions(maq.getDimensions())
+ .setAggregatorSpecs(maq.getAggregatorSpecs())
+ .setPostAggregatorSpecs(maq.getPostAggregatorSpecs())
+ .setContext(maq.getContext());
+ GroupByQuery gbq = builder.build();
+
+ HashMap gbqResponse = new HashMap<>();
+ gbqResponse.put(QUERY_FAIL_TIME, System.currentTimeMillis() + QueryContexts.getTimeout(gbq));
+ gbqResponse.put(QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());
+
+ Sequence results = gbq.getRunner(walker).run(QueryPlus.wrap(gbq), gbqResponse);
+ try {
+ // use localhost for remote address
+ requestLogger.logNativeQuery(RequestLogLine.forNative(
+ gbq,
+ DateTimes.nowUtc(),
+ "127.0.0.1",
+ new QueryStats(
+ ImmutableMap.of(
+ "query/time", 0,
+ "query/bytes", 0,
+ "success", true
+ ))
+ ));
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+
+ resultsSeq = results;
+ } else {
+ // no dimensions, so optimize this as a TimeSeries
+ TimeseriesQuery tsq = new TimeseriesQuery(
+ dataSource,
+ new MultipleIntervalSegmentSpec(intervals),
+ false,
+ null,
+ maq.getFilter(),
+ maq.getGranularity(),
+ maq.getAggregatorSpecs(),
+ maq.getPostAggregatorSpecs(),
+ 0,
+ maq.getContext()
+ );
+ HashMap tsqResponse = new HashMap<>();
+ tsqResponse.put(QUERY_FAIL_TIME, System.currentTimeMillis() + QueryContexts.getTimeout(tsq));
+ tsqResponse.put(QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());
+
+ Sequence> results = tsq.getRunner(walker).run(QueryPlus.wrap(tsq), tsqResponse);
+ try {
+ // use localhost for remote address
+ requestLogger.logNativeQuery(RequestLogLine.forNative(
+ tsq,
+ DateTimes.nowUtc(),
+ "127.0.0.1",
+ new QueryStats(
+ ImmutableMap.of(
+ "query/time", 0,
+ "query/bytes", 0,
+ "success", true
+ ))
+ ));
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+
+ resultsSeq = Sequences.map(results, new TimeseriesResultToRow());
+ }
+
+ // Process into period buckets
+ Sequence bucketedMovingAvgResults =
+ Sequences.simple(new RowBucketIterable(resultsSeq, intervals, period));
+
+ // Apply the windows analysis functions
+ Sequence movingAvgResults = Sequences.simple(
+ new MovingAverageIterable(
+ bucketedMovingAvgResults,
+ maq.getDimensions(),
+ maq.getAveragerSpecs(),
+ maq.getPostAggregatorSpecs(),
+ maq.getAggregatorSpecs()
+ ));
+
+ // Apply any postAveragers
+ Sequence movingAvgResultsWithPostAveragers =
+ Sequences.map(movingAvgResults, new PostAveragerAggregatorCalculator(maq));
+
+ // remove rows outside the reporting window
+ List reportingIntervals = maq.getIntervals();
+ movingAvgResults =
+ Sequences.filter(
+ movingAvgResultsWithPostAveragers,
+ row -> reportingIntervals.stream().anyMatch(i -> i.contains(row.getTimestamp()))
+ );
+
+ // Apply any having, sorting, and limits
+ movingAvgResults = ((MovingAverageQuery) maq).applyLimit(movingAvgResults);
+
+ return movingAvgResults;
+
+ }
+
+ static class TimeseriesResultToRow implements Function, Row>
+ {
+ @Override
+ public Row apply(Result lookbackResult)
+ {
+ Map event = lookbackResult.getValue().getBaseObject();
+ MapBasedRow row = new MapBasedRow(lookbackResult.getTimestamp(), event);
+ return row;
+ }
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryToolChest.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryToolChest.java
new file mode 100644
index 000000000000..b0e14affaf5b
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryToolChest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.MetricManipulationFn;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.server.log.RequestLogger;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The QueryToolChest for MovingAverage Query
+ */
+public class MovingAverageQueryToolChest extends QueryToolChest
+{
+
+ private final QuerySegmentWalker walker;
+ private final RequestLogger requestLogger;
+
+ private final MovingAverageQueryMetricsFactory movingAverageQueryMetricsFactory;
+
+ /**
+ * Construct a MovingAverageQueryToolChest for processing moving-average queries.
+ * MovingAverage queries are expected to be processed on broker nodes and never hit historical nodes.
+ *
+ * @param walker
+ * @param requestLogger
+ */
+ @Inject
+ public MovingAverageQueryToolChest(Provider walker, RequestLogger requestLogger)
+ {
+
+ this.walker = walker.get();
+ this.requestLogger = requestLogger;
+ this.movingAverageQueryMetricsFactory = DefaultMovingAverageQueryMetricsFactory.instance();
+ }
+
+ @Override
+ public QueryRunner mergeResults(QueryRunner runner)
+ {
+ return new MovingAverageQueryRunner(walker, requestLogger);
+ }
+
+ @Override
+ public QueryMetrics super MovingAverageQuery> makeMetrics(MovingAverageQuery query)
+ {
+ MovingAverageQueryMetrics movingAverageQueryMetrics = movingAverageQueryMetricsFactory.makeMetrics();
+ movingAverageQueryMetrics.query(query);
+ return movingAverageQueryMetrics;
+ }
+
+ @Override
+ public Function makePostComputeManipulatorFn(MovingAverageQuery query, MetricManipulationFn fn)
+ {
+
+ return new Function()
+ {
+
+ @Override
+ public Row apply(Row result)
+ {
+ MapBasedRow mRow = (MapBasedRow) result;
+ final Map values = new HashMap<>(mRow.getEvent());
+
+ for (AggregatorFactory agg : query.getAggregatorSpecs()) {
+ Object aggVal = values.get(agg.getName());
+ if (aggVal != null) {
+ values.put(agg.getName(), fn.manipulate(agg, aggVal));
+ } else {
+ values.put(agg.getName(), null);
+ }
+ }
+
+ for (AveragerFactory, ?> avg : query.getAveragerSpecs()) {
+ Object aggVal = values.get(avg.getName());
+ if (aggVal != null) {
+ values.put(avg.getName(), fn.manipulate(new AveragerFactoryWrapper<>(avg, avg.getName() + "_"), aggVal));
+ } else {
+ values.put(avg.getName(), null);
+ }
+ }
+
+ return new MapBasedRow(result.getTimestamp(), values);
+
+ }
+ };
+
+ }
+
+
+ @Override
+ public TypeReference getResultTypeReference()
+ {
+ return new TypeReference()
+ {
+ };
+ }
+
+ @Override
+ public Function makePreComputeManipulatorFn(MovingAverageQuery query, MetricManipulationFn fn)
+ {
+ return Functions.identity();
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/PostAveragerAggregatorCalculator.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/PostAveragerAggregatorCalculator.java
new file mode 100644
index 000000000000..5af34871dc4c
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/PostAveragerAggregatorCalculator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.query.aggregation.PostAggregator;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Function that can be applied to a Sequence to calculate PostAverager results
+ */
+public class PostAveragerAggregatorCalculator implements Function
+{
+
+ private final List postAveragers;
+
+ public PostAveragerAggregatorCalculator(MovingAverageQuery maq)
+ {
+ this.postAveragers = maq.getPostAveragerSpecs();
+ }
+
+ @Override
+ public Row apply(final Row row)
+ {
+ if (postAveragers.isEmpty()) {
+ return row;
+ }
+
+ final Map newMap;
+
+ newMap = Maps.newLinkedHashMap(((MapBasedRow) row).getEvent());
+
+ for (PostAggregator postAverager : postAveragers) {
+ boolean allColsPresent = postAverager.getDependentFields().stream().allMatch(c -> newMap.get(c) != null);
+ newMap.put(postAverager.getName(), allColsPresent ? postAverager.compute(newMap) : null);
+ }
+
+ return new MapBasedRow(row.getTimestamp(), newMap);
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/RowBucket.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/RowBucket.java
new file mode 100644
index 000000000000..fa614fa4218e
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/RowBucket.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.Row;
+import org.joda.time.DateTime;
+
+import java.util.List;
+
+/**
+ * Represents a set of rows for a specific date
+ * Each RowBucket is an element in a list (holds a pointer to the next RowBucket)
+ */
+public class RowBucket
+{
+ private final DateTime dateTime;
+ private final List rows;
+ private RowBucket nextBucket = null;
+
+ public RowBucket(DateTime dateTime, List rows)
+ {
+ this.dateTime = dateTime;
+ this.rows = rows;
+ }
+
+ public DateTime getDateTime()
+ {
+ return dateTime;
+ }
+
+ public List getRows()
+ {
+ return rows;
+ }
+
+ public RowBucket getNextBucket()
+ {
+ return nextBucket;
+ }
+
+ public void setNextBucket(RowBucket nextRow)
+ {
+ this.nextBucket = nextRow;
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/RowBucketIterable.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/RowBucketIterable.java
new file mode 100644
index 000000000000..308d5551c866
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/RowBucketIterable.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * An iterator which takes list of rows ({@link Sequence}) and generates a new list of {@link RowBucket}s from it.
+ *
+ * It calls {@link BucketingAccumulator} for naive bucketing to buckets of periods,
+ * But does more subtle logic to cover edge cases, such as:
+ * - Handling periods with no rows.
+ * - Handling last record.
+ *
+ * Please notice this is being called by {@link MovingAverageIterable.MovingAverageIterator#internalNext()}
+ * and the logic for skipping records is comprised by the interaction between the two classes.
+ */
+public class RowBucketIterable implements Iterable
+{
+
+ public final Sequence seq;
+ private List intervals;
+ private Period period;
+
+ public RowBucketIterable(Sequence seq, List intervals, Period period)
+ {
+ this.seq = seq;
+ this.period = period;
+ this.intervals = intervals;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Iterable#iterator()
+ */
+ @Override
+ public Iterator iterator()
+ {
+ return new RowBucketIterator(seq, intervals, period);
+ }
+
+ static class RowBucketIterator implements Iterator
+ {
+ private Yielder yielder;
+ private DateTime endTime;
+ private DateTime expectedBucket;
+ private Period period;
+ private int intervalIndex = 0;
+ private List intervals;
+ private boolean processedLastRow = false;
+ private boolean processedExtraRow = false;
+
+ public RowBucketIterator(Sequence rows, List intervals, Period period)
+ {
+ this.period = period;
+ this.intervals = intervals;
+ expectedBucket = intervals.get(intervalIndex).getStart();
+ endTime = intervals.get(intervals.size() - 1).getEnd();
+ yielder = rows.toYielder(null, new BucketingAccumulator());
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#hasNext()
+ */
+ @Override
+ public boolean hasNext()
+ {
+ return expectedBucket.compareTo(endTime) < 0 || !this.yielder.isDone();
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#next()
+ */
+ @Override
+ public RowBucket next()
+ {
+ RowBucket currentBucket = yielder.get();
+
+ // Iterate to next interval
+ if (expectedBucket.compareTo(intervals.get(intervalIndex).getEnd()) >= 0) {
+ intervalIndex++;
+ if (intervalIndex < intervals.size()) {
+ expectedBucket = intervals.get(intervalIndex).getStart();
+ }
+ }
+ // currentBucket > expectedBucket (No rows found for period). Iterate to next period.
+ if (currentBucket != null && currentBucket.getDateTime().compareTo(expectedBucket) > 0) {
+ currentBucket = new RowBucket(expectedBucket, Collections.emptyList());
+ expectedBucket = expectedBucket.plus(period);
+ return currentBucket;
+ }
+
+ if (!yielder.isDone()) {
+ // standard case. return regular row
+ yielder = yielder.next(currentBucket);
+ expectedBucket = expectedBucket.plus(period);
+ return currentBucket;
+ } else if (!processedLastRow && yielder.get() != null && yielder.get().getNextBucket() == null) {
+ // yielder.isDone, processing last row
+ processedLastRow = true;
+ expectedBucket = expectedBucket.plus(period);
+ return currentBucket;
+ } else if (!processedExtraRow && yielder.get() != null && yielder.get().getNextBucket() != null) {
+ RowBucket lastRow = yielder.get().getNextBucket();
+
+ if (lastRow.getDateTime().compareTo(expectedBucket) > 0) {
+ lastRow = new RowBucket(expectedBucket, Collections.emptyList());
+ expectedBucket = expectedBucket.plus(period);
+ return lastRow;
+ }
+
+ // yielder is done, processing newBucket
+ processedExtraRow = true;
+ expectedBucket = expectedBucket.plus(period);
+ return lastRow;
+ } else if (expectedBucket.compareTo(endTime) < 0) {
+ // add any trailing blank rows
+ currentBucket = new RowBucket(expectedBucket, Collections.emptyList());
+ expectedBucket = expectedBucket.plus(period);
+ return currentBucket;
+ } else {
+ // we should never get here
+ throw new NoSuchElementException();
+ }
+
+ }
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/Averager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/Averager.java
new file mode 100644
index 000000000000..506380cac1bb
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/Averager.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.apache.druid.query.aggregation.AggregatorFactory;
+
+import java.util.Map;
+
+/**
+ * Interface for an averager
+ *
+ * @param The return type of the averager
+ */
+public interface Averager
+{
+ /**
+ * Add a row to the window being operated on
+ *
+ * @param e The row to add
+ * @param aggMap The Map of AggregatorFactory used to determine if the metric should to be finalized
+ */
+ void addElement(Map e, Map aggMap);
+
+ /**
+ * There is a missing row, so record a missing entry in the window
+ */
+ void skip();
+
+ /**
+ * Compute the resulting "average" over the collected window
+ *
+ * @return the "average" over the window of buckets
+ */
+ R getResult();
+
+ /**
+ * @return the name
+ */
+ String getName();
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/AveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/AveragerFactory.java
new file mode 100644
index 000000000000..a3c82781b233
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/AveragerFactory.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Interface representing Averager in the movingAverage query.
+ *
+ * @param Type returned by the underlying averager.
+ * @param Type of finalized value.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "constant", value = ConstantAveragerFactory.class),
+ @JsonSubTypes.Type(name = "doubleMean", value = DoubleMeanAveragerFactory.class),
+ @JsonSubTypes.Type(name = "doubleMeanNoNulls", value = DoubleMeanNoNullAveragerFactory.class),
+ @JsonSubTypes.Type(name = "doubleMax", value = DoubleMaxAveragerFactory.class),
+ @JsonSubTypes.Type(name = "doubleMin", value = DoubleMinAveragerFactory.class),
+ @JsonSubTypes.Type(name = "longMean", value = LongMeanAveragerFactory.class),
+ @JsonSubTypes.Type(name = "longMeanNoNulls", value = LongMeanNoNullAveragerFactory.class),
+ @JsonSubTypes.Type(name = "longMax", value = LongMaxAveragerFactory.class),
+ @JsonSubTypes.Type(name = "longMin", value = LongMinAveragerFactory.class)
+})
+public interface AveragerFactory
+{
+ int DEFAULT_PERIOD = 1;
+
+ /**
+ * Gets the column name that will be populated by the Averager
+ *
+ * @return The column name
+ */
+ String getName();
+
+ /**
+ * Returns the window size over which the averaging calculations will be
+ * performed. Size is computed in terms of buckets rather than absolute time.
+ *
+ * @return The window size
+ */
+ int getNumBuckets();
+
+ /**
+ * Returns the cycle size (number of periods to skip during averaging calculations).
+ *
+ * @return The cycle size
+ */
+ int getCycleSize();
+
+ /**
+ * Create an Averager for a specific dimension combination.
+ *
+ * @return The {@link Averager}
+ */
+ Averager createAverager();
+
+ /**
+ * Gets the list of dependent fields that will be used by this Averager. Most
+ * {@link Averager}s depend on only a single field from the underlying query, but
+ * that is not required. This method allow the required fields to be communicated
+ * back to the main query so that validation to enforce the fields presence can
+ * be accomplished.
+ *
+ * @return A list of field names
+ */
+ List getDependentFields();
+
+ /**
+ * Returns a {@link Comparator} that can be used to compare result values for
+ * purposes of sorting the end result of the query.
+ *
+ * @return A {@link Comparator}
+ */
+ Comparator getComparator();
+
+ /**
+ * Finalize result value.
+ *
+ * @param val the value to finalize.
+ *
+ * @return The finalized value.
+ */
+ F finalizeComputation(R val);
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/BaseAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/BaseAverager.java
new file mode 100644
index 000000000000..0c236b899ea2
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/BaseAverager.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.apache.druid.query.aggregation.AggregatorFactory;
+
+import java.lang.reflect.Array;
+import java.util.Map;
+
+/**
+ * Common base class available for use by averagers. The base class implements methods that
+ * capture incoming and skipped rows and store them in an array, to be used later for
+ * calculating the actual value.
+ *
+ * @param The type of intermediate value to be retrieved from the row and stored
+ * @param The type of result the averager is expected to produce
+ */
+public abstract class BaseAverager implements Averager
+{
+
+ final int numBuckets;
+ final int cycleSize;
+ private final String name;
+ private final String fieldName;
+ final I[] buckets;
+ private int index;
+
+ /**
+ * {@link BaseAverager#startFrom} is needed because `buckets` field is a fixed array, not a list.
+ * It makes computeResults() start from the correct bucket in the array.
+ */
+ int startFrom = 0;
+
+ /**
+ * @param storageType The class to use for storing intermediate values
+ * @param numBuckets The number of buckets to include in the window being aggregated
+ * @param name The name of the resulting metric
+ * @param fieldName The field to extra from incoming rows and stored in the window cache
+ * @param cycleSize Cycle group size. Used to calculate day-of-week option. Default=1 (single element in group).
+ */
+ public BaseAverager(Class storageType, int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ this.numBuckets = numBuckets;
+ this.name = name;
+ this.fieldName = fieldName;
+ this.index = 0;
+ @SuppressWarnings("unchecked")
+ final I[] array = (I[]) Array.newInstance(storageType, numBuckets);
+ this.buckets = array;
+ this.cycleSize = cycleSize;
+ }
+
+
+ /* (non-Javadoc)
+ * @see Averager#addElement(java.util.Map, java.util.Map)
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void addElement(Map e, Map a)
+ {
+ Object metric = e.get(fieldName);
+ I finalMetric;
+ if (a.containsKey(fieldName)) {
+ AggregatorFactory af = a.get(fieldName);
+ finalMetric = metric != null ? (I) af.finalizeComputation(metric) : null;
+ } else {
+ finalMetric = (I) metric;
+ }
+ buckets[index++] = finalMetric;
+ index %= numBuckets;
+ }
+
+ /* (non-Javadoc)
+ * @see Averager#skip()
+ */
+ @Override
+ public void skip()
+ {
+ buckets[index++] = null;
+ index %= numBuckets;
+ }
+
+ /* (non-Javadoc)
+ * @see Averager#getResult()
+ */
+ @Override
+ public R getResult()
+ {
+ if (!hasData()) {
+ return null;
+ }
+ return computeResult();
+ }
+
+ /**
+ * Compute the result value to be returned by getResult.
+ *
+ * This routine will only be called when there is valid data within the window
+ * and doesn't need to worry about detecting the case where no data should be returned.
+ *
+ *
+ * The method typically should use {@link #getBuckets()} to retrieve the set of buckets
+ * within the window and then compute a value based on those. It should expect nulls within
+ * the array, indicating buckets where no row was found for the dimension combination. It is
+ * up to the actual implementation to determin how to evaluate those nulls.
+ *
+ *
+ * The type returned is NOT required to be the same type as the intermediary value. For example,
+ * the intermediate value could be a Sketch, but the result a Long.
+ *
+ * @return the computed result
+ */
+ protected abstract R computeResult();
+
+ /* (non-Javadoc)
+ * @see Averager#getName()
+ */
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+
+ /**
+ * Returns the fieldname to be extracted from any event rows passed in and stored
+ * for use computing the windowed function.
+ *
+ * @return the fieldName
+ */
+ public String getFieldName()
+ {
+ return fieldName;
+ }
+
+ /**
+ * @return the numBuckets
+ */
+ public int getNumBuckets()
+ {
+ return numBuckets;
+ }
+
+ /**
+ * @return the cycleSize
+ */
+ public int getCycleSize()
+ {
+ return cycleSize;
+ }
+
+ /**
+ * @return the array of buckets
+ */
+ protected I[] getBuckets()
+ {
+ return buckets;
+ }
+
+ /**
+ * Determines wheter any data is present. If all the buckets are empty (not "0"), then
+ * no value should be returned from the Averager, as there were not valid rows within the window.
+ *
+ * @return true if any non-null values available
+ */
+ protected boolean hasData()
+ {
+ for (Object b : buckets) {
+ if (b != null) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/BaseAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/BaseAveragerFactory.java
new file mode 100644
index 000000000000..831000fed5c1
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/BaseAveragerFactory.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Common base class for AveragerFactories
+ *
+ * @param Base type that the averager should return as a result
+ * @param Type that that is returned from finalization
+ */
+public abstract class BaseAveragerFactory implements AveragerFactory
+{
+
+ protected String name;
+ protected String fieldName;
+ protected int numBuckets;
+ protected int cycleSize;
+
+ /**
+ * Constructor.
+ *
+ * @param name Name of the Averager
+ * @param numBuckets Number of buckets in the analysis window
+ * @param fieldName Field from incoming events to include in the analysis
+ * @param cycleSize Cycle group size. Used to calculate day-of-week option. Default=1 (single element in group).
+ */
+ public BaseAveragerFactory(String name, int numBuckets, String fieldName, Integer cycleSize)
+ {
+ this.name = name;
+ this.numBuckets = numBuckets;
+ this.fieldName = fieldName;
+ this.cycleSize = (cycleSize != null) ? cycleSize : DEFAULT_PERIOD;
+ Preconditions.checkNotNull(name, "Must have a valid, non-null averager name");
+ Preconditions.checkNotNull(fieldName, "Must have a valid, non-null field name");
+ Preconditions.checkArgument(this.cycleSize > 0, "Cycle size must be greater than zero");
+ Preconditions.checkArgument(numBuckets > 0, "Bucket size must be greater than zero");
+ Preconditions.checkArgument(!(this.cycleSize > numBuckets), "Cycle size must be less than the bucket size");
+ Preconditions.checkArgument(numBuckets % this.cycleSize == 0, "cycleSize must devide numBuckets without a remainder");
+ }
+
+ @Override
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @JsonProperty
+ public String getFieldName()
+ {
+ return fieldName;
+ }
+
+ @Override
+ @JsonProperty("buckets")
+ public int getNumBuckets()
+ {
+ return numBuckets;
+ }
+
+ @Override
+ @JsonProperty("cycleSize")
+ public int getCycleSize()
+ {
+ return cycleSize;
+ }
+
+ @Override
+ public List getDependentFields()
+ {
+ return Collections.singletonList(fieldName);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public F finalizeComputation(R val)
+ {
+ return (F) val;
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ComparableAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ComparableAveragerFactory.java
new file mode 100644
index 000000000000..0463d55b97da
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ComparableAveragerFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import java.util.Comparator;
+
+/**
+ * Base averager factory that adds a default comparable method.
+ *
+ * @param return type
+ * @param finalized type
+ */
+public abstract class ComparableAveragerFactory, F> extends BaseAveragerFactory
+{
+ /**
+ * Constructor.
+ *
+ * @param name Name of the Averager
+ * @param numBuckets Number of buckets in the analysis window
+ * @param fieldName Field from incoming events to include in the analysis
+ * @param cycleSize Cycle group size. Used to calculate day-of-week option. Default=1 (single element in group).
+ */
+ public ComparableAveragerFactory(String name, int numBuckets, String fieldName, Integer cycleSize)
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ return Comparator.naturalOrder();
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ConstantAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ConstantAverager.java
new file mode 100644
index 000000000000..bc76c99610f9
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ConstantAverager.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.apache.druid.query.aggregation.AggregatorFactory;
+
+import java.util.Map;
+
+/**
+ * The constant averager.Created soley for incremental development and wiring things up.
+ */
+public class ConstantAverager implements Averager
+{
+
+ private String name;
+ private float retval;
+
+ /**
+ * @param n
+ * @param name
+ * @param retval
+ */
+ public ConstantAverager(int n, String name, float retval)
+ {
+ this.name = name;
+ this.retval = retval;
+ }
+
+ /* (non-Javadoc)
+ * @see Averager#getResult()
+ */
+ @Override
+ public Float getResult()
+ {
+ return retval;
+ }
+
+ /* (non-Javadoc)
+ * @see Averager#getName()
+ */
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+
+ /* (non-Javadoc)
+ * @see Averager#addElement(java.util.Map, java.util.Map)
+ */
+ @Override
+ public void addElement(Map e, Map a)
+ {
+ // since we return a constant, no need to read from the event
+ }
+
+ /* (non-Javadoc)
+ * @see Averager#skip()
+ */
+ @Override
+ public void skip()
+ {
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ConstantAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ConstantAveragerFactory.java
new file mode 100644
index 000000000000..45339c37058b
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ConstantAveragerFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Implementation of AveragerFacvtory created solely for incremental development
+ */
+
+public class ConstantAveragerFactory implements AveragerFactory
+{
+
+ private String name;
+ private int numBuckets;
+ private float retval;
+
+ @JsonCreator
+ public ConstantAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("retval") float retval
+ )
+ {
+ this.name = name;
+ this.numBuckets = numBuckets;
+ this.retval = retval;
+ }
+
+ @Override
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ @JsonProperty("buckets")
+ public int getNumBuckets()
+ {
+ return numBuckets;
+ }
+
+ @JsonProperty
+ public float getRetval()
+ {
+ return retval;
+ }
+
+ @Override
+ public Averager createAverager()
+ {
+ return new ConstantAverager(numBuckets, name, retval);
+ }
+
+ @Override
+ public List getDependentFields()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ return Comparator.naturalOrder();
+ }
+
+ @Override
+ public int getCycleSize()
+ {
+ return 1;
+ }
+
+ @Override
+ public Float finalizeComputation(Float val)
+ {
+ return val;
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAverager.java
new file mode 100644
index 000000000000..5e25617025b6
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAverager.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class DoubleMaxAverager extends BaseAverager
+{
+
+ public DoubleMaxAverager(int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ super(Number.class, numBuckets, name, fieldName, cycleSize);
+ }
+
+ @Override
+ protected Double computeResult()
+ {
+ double result = Double.NEGATIVE_INFINITY;
+
+ for (int i = 0; i < numBuckets; i += cycleSize) {
+ if (buckets[(i + startFrom) % numBuckets] != null) {
+ result = Double.max(result, (buckets[(i + startFrom) % numBuckets]).doubleValue());
+ }
+ }
+
+ startFrom++;
+ return result;
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAveragerFactory.java
new file mode 100644
index 000000000000..1e82f09e9e95
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAveragerFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class DoubleMaxAveragerFactory extends ComparableAveragerFactory
+{
+
+ @JsonCreator
+ public DoubleMaxAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("cycleSize") Integer cycleSize,
+ @JsonProperty("fieldName") String fieldName
+ )
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Averager createAverager()
+ {
+ return new DoubleMaxAverager(numBuckets, name, fieldName, cycleSize);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAverager.java
new file mode 100644
index 000000000000..be9292c94c7b
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAverager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class DoubleMeanAverager extends BaseAverager
+{
+
+ public DoubleMeanAverager(int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ super(Number.class, numBuckets, name, fieldName, cycleSize);
+ }
+
+ @Override
+ protected Double computeResult()
+ {
+ double result = 0.0;
+ int validBuckets = 0;
+
+ for (int i = 0; i < numBuckets; i += cycleSize) {
+ if (buckets[(i + startFrom) % numBuckets] != null) {
+ result += (buckets[(i + startFrom) % numBuckets]).doubleValue();
+ } else {
+ result += 0.0;
+ }
+ validBuckets++;
+ }
+
+ startFrom++;
+ return result / validBuckets;
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAveragerFactory.java
new file mode 100644
index 000000000000..58f544671a96
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAveragerFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class DoubleMeanAveragerFactory extends ComparableAveragerFactory
+{
+
+ @JsonCreator
+ public DoubleMeanAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("cycleSize") Integer cycleSize,
+ @JsonProperty("fieldName") String fieldName
+ )
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Averager createAverager()
+ {
+ return new DoubleMeanAverager(numBuckets, name, fieldName, cycleSize);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAverager.java
new file mode 100644
index 000000000000..573f12a9e8b8
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAverager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class DoubleMeanNoNullAverager extends BaseAverager
+{
+
+ public DoubleMeanNoNullAverager(int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ super(Number.class, numBuckets, name, fieldName, cycleSize);
+ }
+
+ @Override
+ protected Double computeResult()
+ {
+ double result = 0.0;
+ int validBuckets = 0;
+
+ for (int i = 0; i < numBuckets; i += cycleSize) {
+ if (buckets[(i + startFrom) % numBuckets] != null) {
+ result += (buckets[(i + startFrom) % numBuckets]).doubleValue();
+ validBuckets++;
+ }
+ }
+
+ startFrom++;
+ return result / validBuckets;
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAveragerFactory.java
new file mode 100644
index 000000000000..d6e11893a5e7
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAveragerFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class DoubleMeanNoNullAveragerFactory extends ComparableAveragerFactory
+{
+ @JsonCreator
+ public DoubleMeanNoNullAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("cycleSize") Integer cycleSize,
+ @JsonProperty("fieldName") String fieldName
+ )
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Averager createAverager()
+ {
+ return new DoubleMeanNoNullAverager(numBuckets, name, fieldName, cycleSize);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAverager.java
new file mode 100644
index 000000000000..d108feed0224
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAverager.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class DoubleMinAverager extends BaseAverager
+{
+
+ public DoubleMinAverager(int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ super(Number.class, numBuckets, name, fieldName, cycleSize);
+ }
+
+ @Override
+ protected Double computeResult()
+ {
+ double result = Double.POSITIVE_INFINITY;
+
+ for (int i = 0; i < numBuckets; i += cycleSize) {
+ if (buckets[(i + startFrom) % numBuckets] != null) {
+ result = Double.min(result, (buckets[(i + startFrom) % numBuckets]).doubleValue());
+ }
+ }
+
+ startFrom++;
+ return result;
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAveragerFactory.java
new file mode 100644
index 000000000000..35a783b2235d
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAveragerFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class DoubleMinAveragerFactory extends ComparableAveragerFactory
+{
+ @JsonCreator
+ public DoubleMinAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("cycleSize") Integer cycleSize,
+ @JsonProperty("fieldName") String fieldName
+ )
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Averager createAverager()
+ {
+ return new DoubleMinAverager(numBuckets, name, fieldName, cycleSize);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMaxAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMaxAverager.java
new file mode 100644
index 000000000000..a45503ca58c1
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMaxAverager.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class LongMaxAverager extends BaseAverager
+{
+
+ public LongMaxAverager(int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ super(Number.class, numBuckets, name, fieldName, cycleSize);
+ }
+
+ @Override
+ protected Long computeResult()
+ {
+ long result = Long.MIN_VALUE;
+
+ for (int i = 0; i < numBuckets; i += cycleSize) {
+ if (buckets[(i + startFrom) % numBuckets] != null) {
+ result = Long.max(result, (buckets[(i + startFrom) % numBuckets]).longValue());
+ }
+ }
+
+ startFrom++;
+ return result;
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMaxAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMaxAveragerFactory.java
new file mode 100644
index 000000000000..847bbcb9e341
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMaxAveragerFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class LongMaxAveragerFactory extends ComparableAveragerFactory
+{
+ @JsonCreator
+ public LongMaxAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("cycleSize") Integer cycleSize,
+ @JsonProperty("fieldName") String fieldName
+ )
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Averager createAverager()
+ {
+ return new LongMaxAverager(numBuckets, name, fieldName, cycleSize);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanAverager.java
new file mode 100644
index 000000000000..a5919d727f78
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanAverager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class LongMeanAverager extends BaseAverager
+{
+
+ public LongMeanAverager(int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ super(Number.class, numBuckets, name, fieldName, cycleSize);
+ }
+
+ @Override
+ protected Double computeResult()
+ {
+ long result = 0;
+ int validBuckets = 0;
+
+ for (int i = 0; i < numBuckets; i += cycleSize) {
+ if (buckets[(i + startFrom) % numBuckets] != null) {
+ result += (buckets[(i + startFrom) % numBuckets]).longValue();
+ } else {
+ result += 0;
+ }
+ validBuckets++;
+ }
+
+ startFrom++;
+ return ((double) result) / validBuckets;
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanAveragerFactory.java
new file mode 100644
index 000000000000..d02e06d96173
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanAveragerFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class LongMeanAveragerFactory extends ComparableAveragerFactory
+{
+
+ @JsonCreator
+ public LongMeanAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("cycleSize") Integer cycleSize,
+ @JsonProperty("fieldName") String fieldName
+ )
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Averager createAverager()
+ {
+ return new LongMeanAverager(numBuckets, name, fieldName, cycleSize);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAverager.java
new file mode 100644
index 000000000000..ecdd17a6f265
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAverager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class LongMeanNoNullAverager extends BaseAverager
+{
+
+ public LongMeanNoNullAverager(int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ super(Number.class, numBuckets, name, fieldName, cycleSize);
+ }
+
+ @Override
+ protected Double computeResult()
+ {
+ long result = 0;
+ int validBuckets = 0;
+
+ for (int i = 0; i < numBuckets; i += cycleSize) {
+ if (buckets[(i + startFrom) % numBuckets] != null) {
+ result += (buckets[(i + startFrom) % numBuckets]).longValue();
+ validBuckets++;
+ }
+ }
+
+ startFrom++;
+ return ((double) result) / validBuckets;
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAveragerFactory.java
new file mode 100644
index 000000000000..03ad7d1e654c
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAveragerFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class LongMeanNoNullAveragerFactory extends ComparableAveragerFactory
+{
+
+ @JsonCreator
+ public LongMeanNoNullAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("cycleSize") Integer cycleSize,
+ @JsonProperty("fieldName") String fieldName
+ )
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Averager createAverager()
+ {
+ return new LongMeanNoNullAverager(numBuckets, name, fieldName, cycleSize);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMinAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMinAverager.java
new file mode 100644
index 000000000000..cc999e6b9abf
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMinAverager.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class LongMinAverager extends BaseAverager
+{
+
+ public LongMinAverager(int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ super(Number.class, numBuckets, name, fieldName, cycleSize);
+ }
+
+ @Override
+ protected Long computeResult()
+ {
+ long result = Long.MAX_VALUE;
+
+ for (int i = 0; i < numBuckets; i += cycleSize) {
+ if (buckets[(i + startFrom) % numBuckets] != null) {
+ result = Long.min(result, (buckets[(i + startFrom) % numBuckets]).longValue());
+ }
+ }
+
+ startFrom++;
+ return result;
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMinAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMinAveragerFactory.java
new file mode 100644
index 000000000000..ff2562541172
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMinAveragerFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class LongMinAveragerFactory extends ComparableAveragerFactory
+{
+
+ @JsonCreator
+ public LongMinAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("cycleSize") int cycleSize,
+ @JsonProperty("fieldName") String fieldName
+ )
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Averager createAverager()
+ {
+ return new LongMinAverager(numBuckets, name, fieldName, cycleSize);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/moving-average-query/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
new file mode 100644
index 000000000000..ec70e7d9c464
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.druid.query.movingaverage.MovingAverageQueryModule
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageIterableTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageIterableTest.java
new file mode 100644
index 000000000000..3acc1f71a46f
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageIterableTest.java
@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.query.movingaverage.averagers.ConstantAveragerFactory;
+import org.apache.druid.query.movingaverage.averagers.LongMeanAveragerFactory;
+import org.joda.time.DateTime;
+import org.joda.time.chrono.ISOChronology;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ *
+ */
+public class MovingAverageIterableTest
+{
+ private static final DateTime JAN_1 = new DateTime(2017, 1, 1, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+ private static final DateTime JAN_2 = new DateTime(2017, 1, 2, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+ private static final DateTime JAN_3 = new DateTime(2017, 1, 3, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+ private static final DateTime JAN_4 = new DateTime(2017, 1, 4, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+ private static final DateTime JAN_5 = new DateTime(2017, 1, 5, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+ private static final DateTime JAN_6 = new DateTime(2017, 1, 6, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+
+ private static final String GENDER = "gender";
+ private static final String AGE = "age";
+ private static final String COUNTRY = "country";
+
+ private static final Map dims1 = new HashMap<>();
+ private static final Map dims2 = new HashMap<>();
+ private static final Map dims3 = new HashMap<>();
+
+ static {
+ dims1.put(GENDER, "m");
+ dims1.put(AGE, "10");
+ dims1.put(COUNTRY, "US");
+
+ dims2.put(GENDER, "f");
+ dims2.put(AGE, "8");
+ dims2.put(COUNTRY, "US");
+
+ dims3.put(GENDER, "u");
+ dims3.put(AGE, "5");
+ dims3.put(COUNTRY, "UK");
+ }
+
+ @Test
+ public void testNext()
+ {
+
+ List dims = Arrays.asList(
+ new DefaultDimensionSpec(GENDER, GENDER),
+ new DefaultDimensionSpec(AGE, AGE),
+ new DefaultDimensionSpec(COUNTRY, COUNTRY)
+ );
+
+ Sequence dayBuckets = Sequences.simple(Arrays.asList(
+ new RowBucket(JAN_1, Arrays.asList(
+ new MapBasedRow(JAN_1, dims1),
+ new MapBasedRow(JAN_1, dims2)
+ )),
+ new RowBucket(JAN_2, Collections.singletonList(
+ new MapBasedRow(JAN_2, dims1)
+ )),
+ new RowBucket(JAN_3, Collections.emptyList()),
+ new RowBucket(JAN_4, Arrays.asList(
+ new MapBasedRow(JAN_4, dims2),
+ new MapBasedRow(JAN_4, dims3)
+ ))
+ ));
+
+ Iterable iterable = new MovingAverageIterable(
+ dayBuckets,
+ dims,
+ Collections.singletonList(new ConstantAveragerFactory("noop", 1, 1.1f)),
+ Collections.emptyList(),
+ Collections.emptyList()
+ );
+
+ Iterator iter = iterable.iterator();
+
+ assertTrue(iter.hasNext());
+ Row r = iter.next();
+ assertEquals(JAN_1, r.getTimestamp());
+ assertEquals("m", r.getRaw(GENDER));
+
+ assertTrue(iter.hasNext());
+ r = iter.next();
+ assertEquals(JAN_1, r.getTimestamp());
+ assertEquals("f", r.getRaw(GENDER));
+
+ assertTrue(iter.hasNext());
+ r = iter.next();
+ assertEquals(JAN_2, r.getTimestamp());
+ assertEquals("m", r.getRaw(GENDER));
+
+ assertTrue(iter.hasNext());
+ r = iter.next();
+ assertEquals(JAN_2, r.getTimestamp());
+ assertEquals("f", r.getRaw(GENDER));
+
+ assertTrue(iter.hasNext());
+ r = iter.next();
+ Row r2 = r;
+ assertEquals(JAN_3, r.getTimestamp());
+ assertEquals("US", r.getRaw(COUNTRY));
+
+ assertTrue(iter.hasNext());
+ r = iter.next();
+ assertEquals(JAN_3, r.getTimestamp());
+ assertEquals("US", r.getRaw(COUNTRY));
+ assertThat(r.getRaw(AGE), not(equalTo(r2.getRaw(AGE))));
+
+ assertTrue(iter.hasNext());
+ r = iter.next();
+ assertEquals(JAN_4, r.getTimestamp());
+ assertEquals("f", r.getRaw(GENDER));
+
+ assertTrue(iter.hasNext());
+ r = iter.next();
+ assertEquals(JAN_4, r.getTimestamp());
+ assertEquals("u", r.getRaw(GENDER));
+
+ assertTrue(iter.hasNext());
+ r = iter.next();
+ assertEquals(JAN_4, r.getTimestamp());
+ assertEquals("m", r.getRaw(GENDER));
+
+ assertFalse(iter.hasNext());
+ }
+
+ @Test
+ public void testAveraging()
+ {
+
+ Map event1 = new HashMap<>();
+ Map event2 = new HashMap<>();
+ Map event3 = new HashMap<>();
+ Map event4 = new HashMap<>();
+
+ List ds = new ArrayList<>();
+ ds.add(new DefaultDimensionSpec("gender", "gender"));
+
+ event1.put("gender", "m");
+ event1.put("pageViews", 10L);
+ Row row1 = new MapBasedRow(JAN_1, event1);
+
+ event2.put("gender", "m");
+ event2.put("pageViews", 20L);
+ Row row2 = new MapBasedRow(JAN_2, event2);
+
+ event3.put("gender", "m");
+ event3.put("pageViews", 30L);
+ Row row3 = new MapBasedRow(JAN_3, event3);
+
+ event4.put("gender", "f");
+ event4.put("pageViews", 40L);
+ Row row4 = new MapBasedRow(JAN_3, event4);
+
+ float retval = 14.5f;
+
+ Sequence seq = Sequences.simple(Arrays.asList(
+ new RowBucket(JAN_1, Collections.singletonList(row1)),
+ new RowBucket(JAN_2, Collections.singletonList(row2)),
+ new RowBucket(JAN_3, Arrays.asList(row3, row4))
+ ));
+
+ Iterator iter = new MovingAverageIterable(seq, ds, Arrays.asList(
+ new ConstantAveragerFactory("costPageViews", 7, retval),
+ new LongMeanAveragerFactory("movingAvgPageViews", 7, 1, "pageViews")
+ ),
+ Collections.emptyList(),
+ Collections.singletonList(new LongSumAggregatorFactory("pageViews",
+ "pageViews"
+ ))
+ ).iterator();
+
+ assertTrue(iter.hasNext());
+ Row caResult = iter.next();
+
+ assertEquals(JAN_1, caResult.getTimestamp());
+ assertEquals("m", (caResult.getDimension("gender")).get(0));
+ assertEquals(retval, caResult.getMetric("costPageViews").floatValue(), 0.0f);
+ assertEquals(1.4285715f, caResult.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ caResult = iter.next();
+ assertEquals("m", (caResult.getDimension("gender")).get(0));
+ assertEquals(4.285714f, caResult.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ caResult = iter.next();
+ assertEquals("m", (caResult.getDimension("gender")).get(0));
+ assertEquals(8.571428f, caResult.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ caResult = iter.next();
+ assertEquals("f", (caResult.getDimension("gender")).get(0));
+ assertEquals(5.714285850f, caResult.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertFalse(iter.hasNext());
+
+ }
+
+
+ @Test
+ public void testCompleteData()
+ {
+
+ Map event1 = new HashMap<>();
+ Map event2 = new HashMap<>();
+ Map event3 = new HashMap<>();
+
+ event1.put("gender", "m");
+ event1.put("pageViews", 10L);
+ event2.put("gender", "f");
+ event2.put("pageViews", 20L);
+ event3.put("gender", "u");
+ event3.put("pageViews", 30L);
+
+ List ds = new ArrayList<>();
+ ds.add(new DefaultDimensionSpec("gender", "gender"));
+
+ Row jan1Row1 = new MapBasedRow(JAN_1, event1);
+ Row jan1Row2 = new MapBasedRow(JAN_1, event2);
+ Row jan1Row3 = new MapBasedRow(JAN_1, event3);
+
+ Row jan2Row1 = new MapBasedRow(JAN_2, event1);
+ Row jan2Row2 = new MapBasedRow(JAN_2, event2);
+ Row jan2Row3 = new MapBasedRow(JAN_2, event3);
+
+ Sequence seq = Sequences.simple(Arrays.asList(
+ new RowBucket(JAN_1, Arrays.asList(jan1Row1, jan1Row2, jan1Row3)),
+ new RowBucket(JAN_2, Arrays.asList(jan2Row1, jan2Row2, jan2Row3))
+ ));
+
+ Iterator iter = new MovingAverageIterable(seq, ds, Collections.singletonList(
+ new LongMeanAveragerFactory("movingAvgPageViews", 2, 1, "pageViews")),
+ Collections.emptyList(),
+ Collections.singletonList(new LongSumAggregatorFactory("pageViews",
+ "pageViews"
+ ))
+ ).iterator();
+
+ assertTrue(iter.hasNext());
+ Row result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertFalse(iter.hasNext());
+
+ }
+
+ // no injection if the data missing at the begining
+ @Test
+ public void testMissingDataAtBeginning()
+ {
+
+ Map event1 = new HashMap<>();
+ Map event2 = new HashMap<>();
+ Map event3 = new HashMap<>();
+
+ event1.put("gender", "m");
+ event1.put("pageViews", 10L);
+ event2.put("gender", "f");
+ event2.put("pageViews", 20L);
+ event3.put("gender", "u");
+ event3.put("pageViews", 30L);
+
+ List ds = new ArrayList<>();
+ ds.add(new DefaultDimensionSpec("gender", "gender"));
+
+ Row jan1Row1 = new MapBasedRow(JAN_1, event1);
+
+ Row jan2Row1 = new MapBasedRow(JAN_2, event1);
+ Row jan2Row2 = new MapBasedRow(JAN_2, event2);
+ Row jan2Row3 = new MapBasedRow(JAN_2, event3);
+
+ Sequence seq = Sequences.simple(Arrays.asList(
+ new RowBucket(JAN_1, Collections.singletonList(jan1Row1)),
+ new RowBucket(JAN_2, Arrays.asList(jan2Row1, jan2Row2, jan2Row3))
+ ));
+
+ Iterator iter = new MovingAverageIterable(seq, ds, Collections.singletonList(
+ new LongMeanAveragerFactory("movingAvgPageViews", 2, 1, "pageViews")),
+ Collections.emptyList(),
+ Collections.singletonList(new LongSumAggregatorFactory("pageViews",
+ "pageViews"
+ ))
+ ).iterator();
+
+ assertTrue(iter.hasNext());
+ Row result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertFalse(iter.hasNext());
+ }
+
+ // test injection when the data is missing at the end
+ @Test
+ public void testMissingDataAtTheEnd()
+ {
+
+ Map event1 = new HashMap<>();
+ Map event2 = new HashMap<>();
+ Map event3 = new HashMap<>();
+
+ event1.put("gender", "m");
+ event1.put("pageViews", 10L);
+ event2.put("gender", "f");
+ event2.put("pageViews", 20L);
+ event3.put("gender", "u");
+ event3.put("pageViews", 30L);
+
+ List ds = new ArrayList<>();
+ ds.add(new DefaultDimensionSpec("gender", "gender"));
+
+ Row jan1Row1 = new MapBasedRow(JAN_1, event1);
+ Row jan1Row2 = new MapBasedRow(JAN_1, event2);
+ Row jan1Row3 = new MapBasedRow(JAN_1, event3);
+ Row jan2Row1 = new MapBasedRow(JAN_2, event1);
+
+ Sequence seq = Sequences.simple(Arrays.asList(
+ new RowBucket(JAN_1, Arrays.asList(jan1Row1, jan1Row2, jan1Row3)),
+ new RowBucket(JAN_2, Collections.singletonList(jan2Row1))
+ ));
+
+ Iterator iter = new MovingAverageIterable(seq, ds, Collections.singletonList(
+ new LongMeanAveragerFactory("movingAvgPageViews", 2, 1, "pageViews")),
+ Collections.emptyList(),
+ Collections.singletonList(new LongSumAggregatorFactory("pageViews",
+ "pageViews"
+ ))
+ ).iterator();
+
+ assertTrue(iter.hasNext());
+ Row result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertFalse(iter.hasNext());
+ }
+
+ // test injection when the data is missing in the middle
+ @Test
+ public void testMissingDataAtMiddle()
+ {
+
+ Map eventM = new HashMap<>();
+ Map eventF = new HashMap<>();
+ Map eventU = new HashMap<>();
+ Map event4 = new HashMap<>();
+
+ eventM.put("gender", "m");
+ eventM.put("pageViews", 10L);
+ eventF.put("gender", "f");
+ eventF.put("pageViews", 20L);
+ eventU.put("gender", "u");
+ eventU.put("pageViews", 30L);
+
+ List ds = new ArrayList<>();
+ ds.add(new DefaultDimensionSpec("gender", "gender"));
+
+ Row jan1Row1M = new MapBasedRow(JAN_1, eventM);
+ Row jan1Row2F = new MapBasedRow(JAN_1, eventF);
+ Row jan1Row3U = new MapBasedRow(JAN_1, eventU);
+ Row jan2Row1M = new MapBasedRow(JAN_2, eventM);
+ Row jan3Row1M = new MapBasedRow(JAN_3, eventM);
+ Row jan3Row2F = new MapBasedRow(JAN_3, eventF);
+ Row jan3Row3U = new MapBasedRow(JAN_3, eventU);
+ Row jan4Row1M = new MapBasedRow(JAN_4, eventM);
+
+ Sequence seq = Sequences.simple(Arrays.asList(
+ new RowBucket(JAN_1, Arrays.asList(jan1Row1M, jan1Row2F, jan1Row3U)),
+ new RowBucket(JAN_2, Collections.singletonList(jan2Row1M)),
+ new RowBucket(JAN_3, Arrays.asList(jan3Row1M, jan3Row2F, jan3Row3U)),
+ new RowBucket(JAN_4, Collections.singletonList(jan4Row1M))
+ ));
+
+ Iterator iter = new MovingAverageIterable(seq, ds, Collections.singletonList(
+ new LongMeanAveragerFactory("movingAvgPageViews", 3, 1, "pageViews")),
+ Collections.emptyList(),
+ Collections.singletonList(new LongSumAggregatorFactory("pageViews",
+ "pageViews"
+ ))
+ ).iterator();
+
+ // Jan 1
+ assertTrue(iter.hasNext());
+ Row result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ // Jan 2
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ // Jan 3
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_3, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_3, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_3, (result.getTimestamp()));
+
+ // Jan 4
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_4, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_4, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_4, (result.getTimestamp()));
+
+ assertFalse(iter.hasNext());
+ }
+
+ @Test
+ public void testMissingDaysAtBegining()
+ {
+
+ Map event1 = new HashMap<>();
+ Map event2 = new HashMap<>();
+
+ List ds = new ArrayList<>();
+ ds.add(new DefaultDimensionSpec("gender", "gender"));
+
+ event1.put("gender", "m");
+ event1.put("pageViews", 10L);
+ Row row1 = new MapBasedRow(JAN_3, event1);
+
+ event2.put("gender", "m");
+ event2.put("pageViews", 20L);
+ Row row2 = new MapBasedRow(JAN_4, event2);
+
+ Sequence seq = Sequences.simple(Arrays.asList(
+ new RowBucket(JAN_1, Collections.emptyList()),
+ new RowBucket(JAN_2, Collections.emptyList()),
+ new RowBucket(JAN_3, Collections.singletonList(row1)),
+ new RowBucket(JAN_4, Collections.singletonList(row2))
+ ));
+
+ Iterator