Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
3a5b842
Temp commit
akshaisarma Dec 14, 2017
7af5721
Moved operations out
akshaisarma Dec 15, 2017
d381d74
Moving operations
akshaisarma Dec 15, 2017
b978b88
Runner
akshaisarma Dec 15, 2017
64c1830
Cleaning up aggregation
akshaisarma Dec 15, 2017
c6af195
Working on tests
akshaisarma Dec 16, 2017
6b72c13
Working on tests
akshaisarma Dec 19, 2017
f900de0
Working on Querier. Isolated and moved tests
akshaisarma Dec 20, 2017
8586cff
Fixing Raw to not auto reset
akshaisarma Dec 20, 2017
c1bc4b2
Using a DualSketch concept
akshaisarma Dec 21, 2017
cfcea96
Windowing outline
akshaisarma Dec 30, 2017
fee3016
Broke up Error
akshaisarma Dec 30, 2017
8b4391f
Starting to move Strategy into Scheme. Closable
akshaisarma Jan 3, 2018
c89c7eb
Breaking out a Queryable
akshaisarma Jan 3, 2018
64c85d5
Breaking out a Queryable
akshaisarma Jan 3, 2018
6cbc66c
Calling it monoidal
akshaisarma Jan 4, 2018
ae13c25
Adding windows. Renaming result.Metadata to result.Meta
akshaisarma Jan 10, 2018
c7a1fb0
Fixing aggregation size. Adding rate limiting. Working on docs
akshaisarma Jan 11, 2018
07cb362
Fixing defaults. Tweaking windowing
akshaisarma Jan 12, 2018
5f76a8b
Docs and cleanup
akshaisarma Jan 13, 2018
09e1127
Test fixing
akshaisarma Jan 16, 2018
ef486b7
Metadata reconciling
akshaisarma Jan 17, 2018
822ac82
Fixing tests and bugs
akshaisarma Jan 17, 2018
5f69729
Fixing more tests
akshaisarma Jan 17, 2018
64dcbf0
Fixing FilterOperationsTest
akshaisarma Jan 17, 2018
5663721
Pulling out utilities. Refactoring. Fixing more tests
akshaisarma Jan 17, 2018
746a2fa
Tests pass
akshaisarma Jan 18, 2018
4053ada
Sketches, PubSub, Result and Common coverage
akshaisarma Jan 18, 2018
4346121
Refactoring and moving enums into parsing and qualifying them
akshaisarma Jan 19, 2018
ec0e3dc
Testing aggregations
akshaisarma Jan 19, 2018
90f88ce
Moving tests around
akshaisarma Jan 19, 2018
a729d2d
Window tests
akshaisarma Jan 20, 2018
d1eb5cc
Fixing windowing init bugs. Finding Scheme tests
akshaisarma Jan 20, 2018
bd86e29
Additive Tumbling
akshaisarma Jan 20, 2018
2a66f7e
Rate Limit and RunningQuery tests
akshaisarma Jan 23, 2018
0152fc7
Adding a flag to disable windows
akshaisarma Jan 23, 2018
49e252a
Moving some test helpers. Basic Window test
akshaisarma Jan 24, 2018
9da7656
Basic tests
akshaisarma Jan 24, 2018
9970fbf
Moving errors around. Removing the chained error collecting in Querier
akshaisarma Jan 24, 2018
4d129d4
Fixing reactive window creation and avoiding window modification
akshaisarma Jan 24, 2018
648cb7c
SlidingRecord tests
akshaisarma Jan 24, 2018
5a1a261
Adding unless guard for Validator. Adding tests for Basic, Reactive, …
akshaisarma Jan 25, 2018
e6d11e8
All windowing tests
akshaisarma Jan 25, 2018
619e160
Fixing Rate limiting bugs. Adding some missing tests. Adding some Que…
akshaisarma Jan 25, 2018
bc82447
Finishing up tests
akshaisarma Jan 25, 2018
554c3a7
Fixing checkstyles on test sources
akshaisarma Jan 25, 2018
6f9917d
Making sure Query is readonly after configure
akshaisarma Jan 26, 2018
7e05b39
Making initialize not auto called for querier
akshaisarma Jan 29, 2018
fd46d7a
Fixing dropped coverage by removing unneeded method
akshaisarma Jan 29, 2018
b3bd480
Fixing concept name. Making the check for isClosed not done on consume
akshaisarma Jan 31, 2018
f3be104
Review changes
akshaisarma Feb 2, 2018
7c448c0
Removing Raw closed check for consuming and combining
akshaisarma Feb 2, 2018
d021975
Validating after merging
akshaisarma Feb 2, 2018
705b0bc
Fixing docs for Querier
akshaisarma Feb 2, 2018
fd8a39f
Moving time checking to RunningQuery
akshaisarma Feb 6, 2018
50bfe6f
Adding a way to get the original validator
akshaisarma Feb 7, 2018
c5dfd1b
Dual Sketch bug fix
akshaisarma Feb 8, 2018
8b7a84a
Fixing bad javadoc
akshaisarma Feb 8, 2018
ba4e7b7
Making all windows only be closed on their criteria. Restoring RAW ch…
akshaisarma Feb 9, 2018
bca724e
Removing the forcing of windows. Adding new errors
akshaisarma Feb 10, 2018
612521e
Review comments
akshaisarma Feb 10, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ coverage:
doc:
mvn clean javadoc:javadoc

cc: see-coverage

see-coverage: coverage
cd target/site/clover; python -m SimpleHTTPServer

cd: see-doc

see-doc: doc
cd target/site/apidocs; python -m SimpleHTTPServer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,21 @@
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.operations.aggregations;
package com.yahoo.bullet.aggregations;

import com.yahoo.bullet.BulletConfig;
import com.yahoo.bullet.Utilities;
import com.yahoo.bullet.operations.aggregations.sketches.ThetaSketch;
import com.yahoo.bullet.aggregations.sketches.ThetaSketch;
import com.yahoo.bullet.common.BulletConfig;
import com.yahoo.bullet.common.BulletError;
import com.yahoo.bullet.common.Utilities;
import com.yahoo.bullet.parsing.Aggregation;
import com.yahoo.bullet.parsing.Error;
import com.yahoo.bullet.record.BulletRecord;
import com.yahoo.bullet.result.Clip;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.ResizeFactor;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static java.util.Collections.singletonList;

Expand All @@ -34,13 +35,14 @@ public class CountDistinct extends KMVStrategy<ThetaSketch> {
private final String newName;

/**
* Constructor that requires an {@link Aggregation}.
* Constructor that requires an {@link Aggregation} and a {@link BulletConfig} configuration.
*
* @param aggregation An {@link Aggregation} with valid fields and attributes for this aggregation type.
* @param config The config that has relevant configs for this strategy.
*/
@SuppressWarnings("unchecked")
public CountDistinct(Aggregation aggregation) {
super(aggregation);
public CountDistinct(Aggregation aggregation, BulletConfig config) {
super(aggregation, config);
Map<String, Object> attributes = aggregation.getAttributes();

ResizeFactor resizeFactor = getResizeFactor(BulletConfig.COUNT_DISTINCT_AGGREGATION_SKETCH_RESIZE_FACTOR);
Expand All @@ -54,8 +56,8 @@ public CountDistinct(Aggregation aggregation) {
}

@Override
public List<Error> initialize() {
return Utilities.isEmpty(fields) ? singletonList(REQUIRES_FIELD_ERROR) : null;
public Optional<List<BulletError>> initialize() {
return Utilities.isEmpty(fields) ? Optional.of(singletonList(REQUIRES_FIELD_ERROR)) : Optional.empty();
}

@Override
Expand All @@ -65,14 +67,24 @@ public void consume(BulletRecord data) {
}

@Override
public Clip getAggregation() {
Clip result = super.getAggregation();
// One record only
BulletRecord record = result.getRecords().get(0);
record.rename(ThetaSketch.COUNT_FIELD, newName);
public Clip getResult() {
Clip result = super.getResult();
renameInPlace(result.getRecords());
return result;
}

@Override
public List<BulletRecord> getRecords() {
return renameInPlace(super.getRecords());
}

private List<BulletRecord> renameInPlace(List<BulletRecord> records) {
// One record only.
BulletRecord record = records.get(0);
record.rename(ThetaSketch.COUNT_FIELD, newName);
return records;
}

/**
* Convert a String family into a {@link Family}. This is used to recognize a user input family choice.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,26 @@
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.operations.aggregations;
package com.yahoo.bullet.aggregations;

import com.yahoo.bullet.BulletConfig;
import com.yahoo.bullet.Utilities;
import com.yahoo.bullet.operations.AggregationOperations.DistributionType;
import com.yahoo.bullet.operations.aggregations.sketches.QuantileSketch;
import com.yahoo.bullet.aggregations.sketches.QuantileSketch;
import com.yahoo.bullet.common.BulletConfig;
import com.yahoo.bullet.common.BulletError;
import com.yahoo.bullet.common.Utilities;
import com.yahoo.bullet.parsing.Aggregation;
import com.yahoo.bullet.parsing.Error;
import com.yahoo.bullet.parsing.Specification;
import com.yahoo.bullet.record.BulletRecord;
import lombok.Getter;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static com.yahoo.bullet.parsing.Error.makeError;
import static com.yahoo.bullet.common.Utilities.extractFieldAsNumber;
import static com.yahoo.bullet.common.BulletError.makeError;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;

Expand All @@ -30,6 +31,29 @@
* configured for the sketch, the normalized rank error can be determined and tightly bound.
*/
public class Distribution extends SketchingStrategy<QuantileSketch> {
@Getter
public enum Type {
QUANTILE("QUANTILE"),
PMF("PMF"),
CDF("CDF");

private String name;

Type(String name) {
this.name = name;
}

/**
* Checks to see if this String represents this enum.
*
* @param name The String version of the enum.
* @return true if the name represents this enum.
*/
public boolean isMe(String name) {
return this.name.equals(name);
}
}

// Distribution fields
public static final String TYPE = "type";
public static final String POINTS = "points";
Expand All @@ -48,34 +72,35 @@ public class Distribution extends SketchingStrategy<QuantileSketch> {
private Aggregation aggregation;

// Validation
public static final Map<String, DistributionType> SUPPORTED_DISTRIBUTION_TYPES = new HashMap<>();
public static final Map<String, Type> SUPPORTED_DISTRIBUTION_TYPES = new HashMap<>();
static {
SUPPORTED_DISTRIBUTION_TYPES.put(DistributionType.QUANTILE.getName(), DistributionType.QUANTILE);
SUPPORTED_DISTRIBUTION_TYPES.put(DistributionType.PMF.getName(), DistributionType.PMF);
SUPPORTED_DISTRIBUTION_TYPES.put(DistributionType.CDF.getName(), DistributionType.CDF);
SUPPORTED_DISTRIBUTION_TYPES.put(Type.QUANTILE.getName(), Type.QUANTILE);
SUPPORTED_DISTRIBUTION_TYPES.put(Type.PMF.getName(), Type.PMF);
SUPPORTED_DISTRIBUTION_TYPES.put(Type.CDF.getName(), Type.CDF);
}
public static final Error REQUIRES_TYPE_ERROR =
public static final BulletError REQUIRES_TYPE_ERROR =
makeError("The DISTRIBUTION type requires specifying a type", "Please set type to one of: " +
SUPPORTED_DISTRIBUTION_TYPES.keySet().stream().collect(Collectors.joining(", ")));
public static final Error REQUIRES_POINTS_ERROR =
public static final BulletError REQUIRES_POINTS_ERROR =
makeError("The DISTRIBUTION type requires at least one point specified in attributes",
"Please add a list of numeric points with points, OR " +
"specify a number of equidistant points to generate with numberOfPoints OR " +
"specify a range to generate points for with start, end and increment (start < end, increment > 0)");
public static final Error REQUIRES_POINTS_PROPER_RANGE =
makeError(DistributionType.QUANTILE.getName() + " requires points in the proper range",
public static final BulletError REQUIRES_POINTS_PROPER_RANGE =
makeError(Type.QUANTILE.getName() + " requires points in the proper range",
"Please add or generate points: 0 <= point <= 1");
public static final Error REQUIRES_ONE_FIELD_ERROR =
public static final BulletError REQUIRES_ONE_FIELD_ERROR =
makeError("The aggregation type requires exactly one field", "Please add exactly one field to fields");

/**
* Constructor that requires an {@link Aggregation}.
* Constructor that requires an {@link Aggregation} and a {@link BulletConfig} configuration.
*
* @param aggregation An {@link Aggregation} with valid fields and attributes for this aggregation type.
* @param config The config that has relevant configs for this strategy.
*/
@SuppressWarnings("unchecked")
public Distribution(Aggregation aggregation) {
super(aggregation);
public Distribution(Aggregation aggregation, BulletConfig config) {
super(aggregation, config);
entries = config.getAs(BulletConfig.DISTRIBUTION_AGGREGATION_SKETCH_ENTRIES, Integer.class);
rounding = config.getAs(BulletConfig.DISTRIBUTION_AGGREGATION_GENERATED_POINTS_ROUNDING, Integer.class);

Expand All @@ -87,46 +112,47 @@ public Distribution(Aggregation aggregation) {
}

@Override
public List<Error> initialize() {
public Optional<List<BulletError>> initialize() {
if (Utilities.isEmpty(fields) || fields.size() != 1) {
return singletonList(REQUIRES_ONE_FIELD_ERROR);
return Optional.of(singletonList(REQUIRES_ONE_FIELD_ERROR));
}

Map<String, Object> attributes = aggregation.getAttributes();
if (Utilities.isEmpty(attributes)) {
return singletonList(REQUIRES_TYPE_ERROR);
return Optional.of(singletonList(REQUIRES_TYPE_ERROR));
}

String typeString = Utilities.getCasted(attributes, TYPE, String.class);
DistributionType type = SUPPORTED_DISTRIBUTION_TYPES.get(typeString);
Type type = SUPPORTED_DISTRIBUTION_TYPES.get(typeString);
if (type == null) {
return singletonList(REQUIRES_TYPE_ERROR);
return Optional.of(singletonList(REQUIRES_TYPE_ERROR));
}

// Try to initialize sketch now
sketch = getSketch(entries, maxPoints, rounding, type, attributes);

if (sketch == null) {
return type == DistributionType.QUANTILE ? asList(REQUIRES_POINTS_ERROR, REQUIRES_POINTS_PROPER_RANGE) :
singletonList(REQUIRES_POINTS_ERROR);
return Optional.of(type == Type.QUANTILE ?
asList(REQUIRES_POINTS_ERROR, REQUIRES_POINTS_PROPER_RANGE) :
singletonList(REQUIRES_POINTS_ERROR));
}

// Initialize field since we have exactly 1
field = fields.get(0);

return null;
return Optional.empty();
}

@Override
public void consume(BulletRecord data) {
Number value = Specification.extractFieldAsNumber(field, data);
Number value = extractFieldAsNumber(field, data);
if (value != null) {
sketch.update(value.doubleValue());
}
}

private static QuantileSketch getSketch(int entries, int maxPoints, int rounding,
DistributionType type, Map<String, Object> attributes) {
Type type, Map<String, Object> attributes) {
int equidistantPoints = getNumberOfEquidistantPoints(attributes);
if (equidistantPoints > 0) {
return new QuantileSketch(entries, rounding, type, Math.min(equidistantPoints, maxPoints));
Expand All @@ -151,10 +177,9 @@ private static QuantileSketch getSketch(int entries, int maxPoints, int rounding
return new QuantileSketch(entries, type, cleanedPoints);
}

private static boolean invalidBounds(DistributionType type, double[] points) {
private static boolean invalidBounds(Type type, double[] points) {
// No points or if type is QUANTILE, invalid range if the start < 0 or end > 1
return points.length < 1 || (type == DistributionType.QUANTILE && (points[0] < 0.0 ||
points[points.length - 1] > 1.0));
return points.length < 1 || (type == Type.QUANTILE && (points[0] < 0.0 || points[points.length - 1] > 1.0));
}

// Point generation methods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.operations.aggregations;
package com.yahoo.bullet.aggregations;

import com.yahoo.bullet.Utilities;
import com.yahoo.bullet.operations.SerializerDeserializer;
import com.yahoo.bullet.operations.aggregations.grouping.GroupData;
import com.yahoo.bullet.operations.aggregations.grouping.GroupOperation;
import com.yahoo.bullet.aggregations.grouping.GroupData;
import com.yahoo.bullet.aggregations.grouping.GroupOperation;
import com.yahoo.bullet.common.BulletError;
import com.yahoo.bullet.common.SerializerDeserializer;
import com.yahoo.bullet.common.Utilities;
import com.yahoo.bullet.parsing.Aggregation;
import com.yahoo.bullet.parsing.Error;
import com.yahoo.bullet.record.BulletRecord;
import com.yahoo.bullet.result.Clip;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static java.util.Collections.singletonList;
Expand All @@ -27,7 +29,7 @@ public class GroupAll implements Strategy {

private Set<GroupOperation> operations;
/**
* Constructor that takes in an {@link Aggregation}. Requires the aggregation to have generated its group operations.
* Constructor that requires an {@link Aggregation}.
*
* @param aggregation The {@link Aggregation} that specifies how and what this will compute.
*/
Expand All @@ -38,9 +40,9 @@ public GroupAll(Aggregation aggregation) {
}

@Override
public List<Error> initialize() {
public Optional<List<BulletError>> initialize() {
if (Utilities.isEmpty(operations)) {
return singletonList(GroupOperation.REQUIRES_FIELD_OR_OPERATION_ERROR);
return Optional.of(singletonList(GroupOperation.REQUIRES_FIELD_OR_OPERATION_ERROR));
}
return GroupOperation.checkOperations(operations);
}
Expand All @@ -51,17 +53,29 @@ public void consume(BulletRecord data) {
}

@Override
public void combine(byte[] serializedAggregation) {
data.combine(serializedAggregation);
public void combine(byte[] data) {
this.data.combine(data);
}

@Override
public byte[] getSerializedAggregation() {
public byte[] getData() {
return SerializerDeserializer.toBytes(data);
}

@Override
public Clip getAggregation() {
return Clip.of(data.getMetricsAsBulletRecord());
public Clip getResult() {
return Clip.of(getRecords());
}

@Override
public List<BulletRecord> getRecords() {
List<BulletRecord> list = new ArrayList<>();
list.add(data.getMetricsAsBulletRecord());
return list;
}

@Override
public void reset() {
data = new GroupData(operations);
}
}
Loading