Skip to content

Commit

Permalink
Supporting [any] operator in the built-in aggregators (#9912)
Browse files Browse the repository at this point in the history
* Supporting [any] operator in the built-in aggregators

* Parametrizing aggrSpec test. Adding client test.

* Test extension for EE
  • Loading branch information
tombujok committed Feb 17, 2017
1 parent 7495b3b commit 7d73d12
Show file tree
Hide file tree
Showing 29 changed files with 425 additions and 205 deletions.
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2008-2017, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.client.aggregation;

import com.hazelcast.aggregation.AggregatorsSpecTest;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.After;
import org.junit.experimental.categories.Category;

import static com.hazelcast.spi.properties.GroupProperty.AGGREGATION_ACCUMULATION_PARALLEL_EVALUATION;
import static com.hazelcast.spi.properties.GroupProperty.PARTITION_COUNT;

@Category({QuickTest.class, ParallelTest.class})
public class ClientAggregatorsSpecTest extends AggregatorsSpecTest {

private TestHazelcastFactory factory;

@Override
protected <K, V> IMap<K, V> getMapWithNodeCount(int nodeCount, boolean parallelAccumulation) {
if (nodeCount < 1) {
throw new IllegalArgumentException("node count < 1");
}

MapConfig mapConfig = new MapConfig()
.setName("aggr")
.setInMemoryFormat(inMemoryFormat);

Config config = new Config()
.setProperty(PARTITION_COUNT.getName(), String.valueOf(nodeCount))
.setProperty(AGGREGATION_ACCUMULATION_PARALLEL_EVALUATION.getName(), String.valueOf(parallelAccumulation))
.addMapConfig(mapConfig);

factory = new TestHazelcastFactory();
for (int i = 0; i < nodeCount; i++) {
factory.newHazelcastInstance(config);
}
HazelcastInstance instance = factory.newHazelcastClient();
return instance.getMap("aggr");
}

@After
public void teardown() {
factory.terminateAll();
}

}
Expand Up @@ -18,17 +18,30 @@

import com.hazelcast.aggregation.Aggregator;
import com.hazelcast.query.impl.Extractable;
import com.hazelcast.query.impl.getters.MultiResult;

import java.util.List;
import java.util.Map;

/**
* Abstract class providing convenience for concrete implementations of an {@link Aggregator}
* It also provides the extract() method that enables extracting values of the given attributePath from the input.
* It provides built-in extraction capabilities that may be used in the accumulation phase.
* <p>
* Extraction rules:
* <ul>
* <li>If the attributePath is not null and the input object is an instance of Extractable the value will be extracted
* from the attributePath in the input object and accumulated instead of the whole input object.
* </li>
* <li>If the attributePath is null and the input object is an instance of Map.Entry the Map.Entry.getValue() will be
* accumulated instead of the whole input object.
* </li>
* </ul>
*
* @param <I> input type
* @param <E> extracted value type
* @param <R> result type
*/
public abstract class AbstractAggregator<I, R> extends Aggregator<I, R> {
public abstract class AbstractAggregator<I, E, R> extends Aggregator<I, R> {

protected String attributePath;

Expand All @@ -40,11 +53,25 @@ public AbstractAggregator(String attributePath) {
this.attributePath = attributePath;
}

@Override
public final void accumulate(I entry) {
E extractedValue = extract(entry);
if (extractedValue instanceof MultiResult) {
@SuppressWarnings("unchecked")
List<E> results = ((MultiResult<E>) extractedValue).getResults();
for (E o : results) {
accumulateExtracted(o);
}
} else {
accumulateExtracted(extractedValue);
}
}

/**
* Extract the value of the given attributePath from the given entry.
*/
@SuppressWarnings("unchecked")
protected final <T> T extract(I input) {
private <T> T extract(I input) {
if (attributePath == null) {
if (input instanceof Map.Entry) {
return (T) ((Map.Entry) input).getValue();
Expand All @@ -54,4 +81,15 @@ protected final <T> T extract(I input) {
}
throw new IllegalArgumentException("Can't extract " + attributePath + " from the given input");
}

/**
* Accumulates a single extracted value.
* This method may be called multiple times per accumulated entry if the attributePath contains [any] operator.
*
* @param value If attributePath is not null the methods accumulates the value extracted from the attributePath.
* If attributePath is null and the input object is a Map.Entry the method accumulates Map.Entry.getValue().
* Otherwise the method accumulates the input value as-is.
*/
protected abstract void accumulateExtracted(E value);

}
Expand Up @@ -24,7 +24,7 @@
import java.io.IOException;
import java.math.BigDecimal;

public final class BigDecimalAverageAggregator<I> extends AbstractAggregator<I, BigDecimal>
public final class BigDecimalAverageAggregator<I> extends AbstractAggregator<I, BigDecimal, BigDecimal>
implements IdentifiedDataSerializable {

private BigDecimal sum = BigDecimal.ZERO;
Expand All @@ -39,11 +39,9 @@ public BigDecimalAverageAggregator(String attributePath) {
}

@Override
public void accumulate(I input) {
protected void accumulateExtracted(BigDecimal value) {
count++;

BigDecimal extractedValue = (BigDecimal) extract(input);
sum = sum.add(extractedValue);
sum = sum.add(value);
}

@Override
Expand Down
Expand Up @@ -24,7 +24,8 @@
import java.io.IOException;
import java.math.BigDecimal;

public final class BigDecimalSumAggregator<I> extends AbstractAggregator<I, BigDecimal> implements IdentifiedDataSerializable {
public final class BigDecimalSumAggregator<I> extends AbstractAggregator<I, BigDecimal, BigDecimal>
implements IdentifiedDataSerializable {

private BigDecimal sum = BigDecimal.ZERO;

Expand All @@ -37,9 +38,8 @@ public BigDecimalSumAggregator(String attributePath) {
}

@Override
public void accumulate(I entry) {
BigDecimal extractedValue = (BigDecimal) extract(entry);
sum = sum.add(extractedValue);
public void accumulateExtracted(BigDecimal value) {
sum = sum.add(value);
}

@Override
Expand Down
Expand Up @@ -25,7 +25,7 @@
import java.math.BigDecimal;
import java.math.BigInteger;

public final class BigIntegerAverageAggregator<I> extends AbstractAggregator<I, BigDecimal>
public final class BigIntegerAverageAggregator<I> extends AbstractAggregator<I, BigInteger, BigDecimal>
implements IdentifiedDataSerializable {

private BigInteger sum = BigInteger.ZERO;
Expand All @@ -40,11 +40,9 @@ public BigIntegerAverageAggregator(String attributePath) {
}

@Override
public void accumulate(I entry) {
public void accumulateExtracted(BigInteger value) {
count++;

BigInteger extractedValue = (BigInteger) extract(entry);
sum = sum.add(extractedValue);
sum = sum.add(value);
}

@Override
Expand Down
Expand Up @@ -24,7 +24,8 @@
import java.io.IOException;
import java.math.BigInteger;

public final class BigIntegerSumAggregator<I> extends AbstractAggregator<I, BigInteger> implements IdentifiedDataSerializable {
public final class BigIntegerSumAggregator<I> extends AbstractAggregator<I, BigInteger, BigInteger>
implements IdentifiedDataSerializable {

private BigInteger sum = BigInteger.ZERO;

Expand All @@ -37,9 +38,8 @@ public BigIntegerSumAggregator(String attributePath) {
}

@Override
public void accumulate(I entry) {
BigInteger extractedValue = (BigInteger) extract(entry);
sum = sum.add(extractedValue);
public void accumulateExtracted(BigInteger value) {
sum = sum.add(value);
}

@Override
Expand Down
Expand Up @@ -23,7 +23,7 @@

import java.io.IOException;

public final class CountAggregator<I> extends AbstractAggregator<I, Long> implements IdentifiedDataSerializable {
public final class CountAggregator<I> extends AbstractAggregator<I, Object, Long> implements IdentifiedDataSerializable {
private long count;

public CountAggregator() {
Expand All @@ -35,7 +35,7 @@ public CountAggregator(String attributePath) {
}

@Override
public void accumulate(I entry) {
public void accumulateExtracted(Object value) {
count++;
}

Expand Down
Expand Up @@ -25,7 +25,7 @@
import java.util.HashSet;
import java.util.Set;

public final class DistinctValuesAggregator<I, R> extends AbstractAggregator<I, Set<R>> implements IdentifiedDataSerializable {
public final class DistinctValuesAggregator<I, R> extends AbstractAggregator<I, R, Set<R>> implements IdentifiedDataSerializable {
Set<R> values = new HashSet<R>();

public DistinctValuesAggregator() {
Expand All @@ -37,9 +37,8 @@ public DistinctValuesAggregator(String attributePath) {
}

@Override
public void accumulate(I entry) {
R extractedValue = (R) extract(entry);
values.add(extractedValue);
public void accumulateExtracted(R value) {
values.add(value);
}

@Override
Expand Down
Expand Up @@ -23,7 +23,8 @@

import java.io.IOException;

public final class DoubleAverageAggregator<I> extends AbstractAggregator<I, Double> implements IdentifiedDataSerializable {
public final class DoubleAverageAggregator<I> extends AbstractAggregator<I, Double, Double>
implements IdentifiedDataSerializable {

private double sum;

Expand All @@ -38,10 +39,9 @@ public DoubleAverageAggregator(String attributePath) {
}

@Override
public void accumulate(I entry) {
public void accumulateExtracted(Double value) {
count++;
Double extractedValue = (Double) extract(entry);
sum += extractedValue;
sum += value;
}

@Override
Expand Down
Expand Up @@ -23,7 +23,8 @@

import java.io.IOException;

public final class DoubleSumAggregator<I> extends AbstractAggregator<I, Double> implements IdentifiedDataSerializable {
public final class DoubleSumAggregator<I> extends AbstractAggregator<I, Double, Double>
implements IdentifiedDataSerializable {

private double sum;

Expand All @@ -36,9 +37,8 @@ public DoubleSumAggregator(String attributePath) {
}

@Override
public void accumulate(I entry) {
Double extractedValue = (Double) extract(entry);
sum += extractedValue;
public void accumulateExtracted(Double value) {
sum += value;
}

@Override
Expand Down
Expand Up @@ -23,7 +23,7 @@

import java.io.IOException;

public final class FixedSumAggregator<I> extends AbstractAggregator<I, Long> implements IdentifiedDataSerializable {
public final class FixedSumAggregator<I> extends AbstractAggregator<I, Number, Long> implements IdentifiedDataSerializable {

private long sum;

Expand All @@ -36,9 +36,8 @@ public FixedSumAggregator(String attributePath) {
}

@Override
public void accumulate(I entry) {
Number extractedValue = (Number) extract(entry);
sum += extractedValue.longValue();
public void accumulateExtracted(Number value) {
sum += value.longValue();
}

@Override
Expand Down
Expand Up @@ -23,7 +23,8 @@

import java.io.IOException;

public final class FloatingPointSumAggregator<I> extends AbstractAggregator<I, Double> implements IdentifiedDataSerializable {
public final class FloatingPointSumAggregator<I> extends AbstractAggregator<I, Number, Double>
implements IdentifiedDataSerializable {

private double sum;

Expand All @@ -36,9 +37,8 @@ public FloatingPointSumAggregator(String attributePath) {
}

@Override
public void accumulate(I entry) {
Number extractedValue = (Number) extract(entry);
sum += extractedValue.doubleValue();
public void accumulateExtracted(Number value) {
sum += value.doubleValue();
}

@Override
Expand Down
Expand Up @@ -23,7 +23,8 @@

import java.io.IOException;

public final class IntegerAverageAggregator<I> extends AbstractAggregator<I, Double> implements IdentifiedDataSerializable {
public final class IntegerAverageAggregator<I> extends AbstractAggregator<I, Integer, Double>
implements IdentifiedDataSerializable {

private long sum;

Expand All @@ -38,10 +39,9 @@ public IntegerAverageAggregator(String attributePath) {
}

@Override
public void accumulate(I entry) {
public void accumulateExtracted(Integer value) {
count++;
Integer extractedValue = (Integer) extract(entry);
sum += extractedValue;
sum += value;
}

@Override
Expand Down

0 comments on commit 7d73d12

Please sign in to comment.