Windowing and general refactoring #11
Nice. I've successfully used it in my spark for windowless cases.
Sent out comments for my first round of looking.
   * @param config The config that has relevant configs for this strategy. It is unused currently.
   */
- public GroupAll(Aggregation aggregation) {
+ public GroupAll(Aggregation aggregation, BulletConfig config) {
config is never used.
Just wanted to keep the constructors the same. Will remove
@Override
public List<BulletRecord> getRecords() {
    List<BulletRecord> asList = new ArrayList<>();
Rename it? It has the same name as the method Arrays.asList.
protected boolean updated = false;
protected boolean unioned = false;
// While this class could implement Monoidal, it does not need the full breadth of those methods and it would
// receiving data as BulletRecord for one off operations, which is cumbersome.
Agree. Remember to remove it before merging.
Remove what? This comment?
 * {@link #collect()} before performing any operations that change data in the sketch such as getting metadata or
 * results or resetting.
 */
public abstract class DualSketch extends Sketch {
Can you add:
protected Map<String, Object> addMetadata(Map<String, String> conceptKeys) {
    collect();
    return super.addMetadata(conceptKeys);
}
That way, the subclasses don't need to call collect in their addMetadata methods.
I didn't want anything in DualSketch that called collect. I just wanted it to be about managing the metadata related to how collect is idempotent. KMVSketch already does it for Theta and Tuple, so this will really only affect QuantileSketch.
I guess right now the lowest class in the class hierarchy that has a method defined is responsible for calling collect. Maybe it's easier to understand to keep it that way?
 * Default constructor. GSON recommended.
 */
public Query() {
    filters = null;
Default value is already null?
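For context on the point above: Java initializes reference-typed instance fields to null before the constructor body runs, so an explicit `filters = null;` in a no-argument constructor has no effect. A minimal sketch, assuming a trimmed-down stand-in for the Query class (the class and field shapes here are illustrative, not the project's actual code):

```java
import java.util.List;

public class DefaultNullSketch {
    // Hypothetical, trimmed-down stand-in for the Query class in the diff.
    public static class Query {
        // No initializer: reference fields default to null.
        public List<String> filters;

        public Query() {
            // Intentionally empty. Writing `filters = null;` here would be redundant.
        }
    }

    public static void main(String[] args) {
        // The field is null even though the constructor never assigns it.
        System.out.println(new Query().filters == null);
    }
}
```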
    timestampKey = config.getAs(BulletConfig.RECORD_INJECT_TIMESTAMP_KEY, String.class);
}

boolean isRateLimited = config.getAs(BulletConfig.RATE_LIMIT_ENABLE, Boolean.class);
s/isRateLimited/isRateLimitEnabled
private BulletConfig config;
private Map<String, String> metaKeys;
private String timestampKey;
private boolean haveData = false;
s/haveData/hasData
@Override
public void consume(BulletRecord data) {
    if (isClosed() || data == null) {
For consume and combine, RAW checks isClosed, but the others don't. Is it because there is no sketch in RAW? It's kind of inconsistent with the other queries.
Yup, I can remove it. You're right, the inconsistency is weird. Since we made the change to make Querier not do any checking, we should remove it. It was just there as a safety check so that users who don't honor isClosed (or isClosedForPartition) can't keep consuming and cause OOM issues.
private boolean timeIsUp() {
    // Never add to query.getDuration since it can be infinite (Long.MAX_VALUE)
    return System.currentTimeMillis() - runningQuery.getStartTime() >= runningQuery.getQuery().getDuration();
Could you extract this function into the RunningQuery class and change this to runningQuery.timeIsUp()? That way we can mock it when writing tests.
Good idea. Will do
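The code comment in the hunk above warns against adding to the duration because it can be Long.MAX_VALUE. A minimal sketch of why the subtraction form is safe while the addition form overflows, assuming a hypothetical stand-in class (RunningQuerySketch, its fields, and the isTimedOut(long now) overload are illustrative and not the project's RunningQuery):

```java
public class RunningQuerySketch {
    private final long startTime;
    private final long duration;

    public RunningQuerySketch(long startTime, long duration) {
        this.startTime = startTime;
        this.duration = duration;
    }

    // Overflow-safe form: subtract the start time instead of adding the duration.
    // Taking `now` as a parameter is an assumption that keeps the check testable.
    public boolean isTimedOut(long now) {
        return now - startTime >= duration;
    }

    // Convenience overload that reads the system clock.
    public boolean isTimedOut() {
        return isTimedOut(System.currentTimeMillis());
    }

    // Shown only to illustrate the bug: startTime + duration wraps around
    // to a negative number when duration is Long.MAX_VALUE.
    public static boolean naiveIsTimedOut(long now, long startTime, long duration) {
        return now >= startTime + duration;
    }

    public static void main(String[] args) {
        RunningQuerySketch infinite = new RunningQuerySketch(1000L, Long.MAX_VALUE);
        System.out.println("safe:  " + infinite.isTimedOut(2000L));
        System.out.println("naive: " + naiveIsTimedOut(2000L, 1000L, Long.MAX_VALUE));
    }
}
```

With an "infinite" duration, the safe check reports that the query has not timed out, while the naive addition wraps around and wrongly reports a timeout.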
 *
 * @return A boolean denoting whether this query has timed out.
 */
public boolean isTimedOut() {
isTimeOut?
I think isTimedOut sounds better with RunningQuery.isTimedOut
}

@Test
public void testTimingOut() throws Exception {
Mock System.currentTimeMillis() and get rid of the sleep, if that's not too difficult? This test may fail due to system clock problems.
I don't want to use things like Powermock to mock static methods if possible. Any other suggestions? I agree it may fail. By the way, there are multiple tests that use this kind of sleep in the Tumbling window tests.
I don't have any better suggestions. Then just leave it as is.
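The thread leaves the sleep-based test in place. One common alternative that avoids mocking static methods is to inject the time source. A minimal sketch, assuming a hypothetical TimedQuery class that takes a java.util.function.LongSupplier as its clock (none of these names come from the PR):

```java
import java.util.function.LongSupplier;

public class ClockInjectionSketch {
    // Hypothetical query wrapper that reads time from an injected supplier.
    public static class TimedQuery {
        private final long startTime;
        private final long duration;
        private final LongSupplier clock;

        public TimedQuery(long duration, LongSupplier clock) {
            this.clock = clock;
            this.duration = duration;
            this.startTime = clock.getAsLong();
        }

        public boolean isTimedOut() {
            // Subtraction form, so an "infinite" duration cannot overflow.
            return clock.getAsLong() - startTime >= duration;
        }
    }

    public static void main(String[] args) {
        // In a test, a mutable fake clock replaces Thread.sleep.
        long[] fakeNow = {0L};
        TimedQuery query = new TimedQuery(100L, () -> fakeNow[0]);
        System.out.println(query.isTimedOut());
        fakeNow[0] = 100L; // "advance" time instantly
        System.out.println(query.isTimedOut());
    }
}
```

Production code would pass `System::currentTimeMillis` as the supplier, so behavior there is unchanged while tests become deterministic.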
  // No points or if type is QUANTILE, invalid range if the start < 0 or end > 1
- return points.length < 1 || (type == DistributionType.QUANTILE && (points[0] < 0.0 ||
+ return points.length < 1 || (type == Type.QUANTILE && (points[0] < 0.0 ||
  points[points.length - 1] > 1.0));
Indentation?
makeError("This aggregation type requires at least one field", REQUIRES_FEED_RESOLUTION);

/**
 * Returns false if more data will be not be consumed or combined. This method can be used to avoid passing more
Typo.
int maxMapSize = config.getAs(BulletConfig.TOP_K_AGGREGATION_SKETCH_ENTRIES, Integer.class);
Number threshold = getThreshold(attributes);

int maxSize = config.getAs(BulletConfig.AGGREGATION_MAX_SIZE, Integer.class);
Where is this used?
It is not. Removing
return metadata;
@Override
protected void collectUnionSketch() {
    merged = unionSketch.getResult();
So here, after collect() is called, we are not resetting unionSketch and updateSketch. That may produce incorrect results if a user:
- calls getResult() (which calls collect())
- calls update()
- calls getResult() again
In this case, all the data that was in the sketch before the first call to getResult() will be "counted" twice: since updateSketch was never reset, that data is still in there and will be merged into "merged" again.
Maybe reset those sketches after collect() is called?
Is this a use case we want to worry about?
(Same thing for ThetaSketch.)
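The scenario described above can be reproduced with a toy model. This is a minimal sketch under stated assumptions: CountingDualSketch uses plain maps as additive "sketches" and is not the DataSketches API or the PR's DualSketch; it only mimics an update store that is folded into a union store on every collect.

```java
import java.util.HashMap;
import java.util.Map;

public class CollectResetSketch {
    // Toy additive "sketch": item -> count. Hypothetical, for illustration only.
    public static class CountingDualSketch {
        private final Map<String, Long> updateSketch = new HashMap<>();
        private final Map<String, Long> unionSketch = new HashMap<>();
        private final boolean resetAfterCollect;

        public CountingDualSketch(boolean resetAfterCollect) {
            this.resetAfterCollect = resetAfterCollect;
        }

        public void update(String item) {
            updateSketch.merge(item, 1L, Long::sum);
        }

        // Folds everything in the update store into the union store.
        private void collect() {
            updateSketch.forEach((item, count) -> unionSketch.merge(item, count, Long::sum));
            if (resetAfterCollect) {
                updateSketch.clear(); // prevents already-merged data from being merged again
            }
        }

        public long getCount(String item) {
            collect();
            return unionSketch.getOrDefault(item, 0L);
        }
    }

    public static void main(String[] args) {
        for (boolean reset : new boolean[] {false, true}) {
            CountingDualSketch sketch = new CountingDualSketch(reset);
            sketch.update("a");
            long first = sketch.getCount("a");
            sketch.update("a");
            long second = sketch.getCount("a");
            System.out.println("reset=" + reset + " first=" + first + " second=" + second);
        }
    }
}
```

Without the reset, the second read reports 3 for two updates, because the first update is merged twice. With the reset, it reports 2. Whether the real sketches behave this way depends on how their collect paths are implemented, which the thread does not settle.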
    addIfNonNull(metadata, conceptKeys, Concept.SKETCH_THETA, this::getTheta);
    return metadata;
}
It looks like getStandardDeviations() (below) will give weird standard deviations if it's called before collect() or getResults(). Maybe add a warning about this to the comments for this function?
It's a private method, so no one else can call it. Only this class controls when it is called, and it calls it after calling merge().
"Please set \"type\" to one of: \"TIME\" or \"RECORD\"");
public static final BulletError IMPROPER_EVERY = makeError("The \"every\" field was missing or had bad values",
"Please set \"every\" to a positive integer");
public static final BulletError IMPROPER_INCLUDE = makeError("The \"include\" field has to match \"emit\" or not be set",
Include can be "ALL"... maybe mention that here as well.
int minEmitTime = config.getAs(BulletConfig.WINDOW_MIN_EMIT_EVERY, Integer.class);
// Clamp upward to minimum
if (every.intValue() < minEmitTime) {
    emit.put(EMIT_EVERY_FIELD, minEmitTime);
Log a warning here?
No, we don't want to log things when we apply configuration. Otherwise, logs will be cluttered with query values being changed (like duration etc).
@Override
public Optional<List<BulletError>> initialize() {
    if (emitType == null || emitType == Unit.ALL) {
If they omit "window" from the query completely, will this return an error? If window is omitted, GSON will parse it to null, then configure calls getUnit() to set emitType, which sets it to null if there is no window? I think I recall us discussing how to handle an omitted "window" optimally, but I don't see that happening here.
If the window is null, the Query class will not call initialize on it.
// to t's type causing the stream to be empty, the any/all/none matches will behave as above.
// For example:
// SOME_LONG_VALUE EQ [1.23, 35.2] will be false
// SOME_LONG_VALUE NE [1.23. 425.3] will be false
It looks to me like this case would throw a runtime exception. NE would do (i -> t.compareTo(i)) where "t" is a long. Then TypedObject.compareTo would try to cast the double to a Long and throw a runtime exception?
It would if it were longs, but these are actually all strings since our filter values are a list of strings. See the cast function in this class where the arg is a List<String>. In the future, if we want to add bidirectional casting (force the data to the type of the filter value provided by the user) then we'd have to worry about this stuff.
 * supplier produces null, it is not added.
 */
public static void addIfNonNull(Map<String, Object> meta, Map<String, String> names, Concept concept,
                                Supplier<Object> supplier) {
Why use a Supplier? Why not just pass the object?
Because getting that Object might be expensive and we only want to run the getting of it if the key had a mapping. That is, if the user of this did not want a particular meta key, we shouldn't needlessly run possibly expensive code (like getting Sketch metadata etc).
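The reasoning above (only compute the value if the key has a mapping) can be sketched as follows. The signature mirrors the one in the diff, but the method body, the Concept enum values other than SKETCH_THETA, the lookup by `concept.name()`, and the call counter are assumptions made for illustration:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class LazyMetadataSketch {
    // Hypothetical stand-in for the Concept enum referenced in the diff.
    public enum Concept { SKETCH_THETA, SKETCH_FAMILY }

    // Counts how often the "expensive" computation actually runs.
    public static int expensiveCalls = 0;

    public static Object expensiveTheta() {
        expensiveCalls++;
        return 0.5;
    }

    // Assumed body: look up the requested key first, and only then run the supplier.
    public static void addIfNonNull(Map<String, Object> meta, Map<String, String> names, Concept concept,
                                    Supplier<Object> supplier) {
        String key = names.get(concept.name());
        if (key == null) {
            return; // the caller did not ask for this concept, so the supplier never runs
        }
        Object value = supplier.get();
        if (value != null) {
            meta.put(key, value);
        }
    }

    public static void main(String[] args) {
        Map<String, String> names = new HashMap<>();
        names.put(Concept.SKETCH_FAMILY.name(), "family"); // theta is deliberately not requested
        Map<String, Object> meta = new HashMap<>();
        addIfNonNull(meta, names, Concept.SKETCH_THETA, LazyMetadataSketch::expensiveTheta);
        addIfNonNull(meta, names, Concept.SKETCH_FAMILY, () -> "QuickSelect");
        System.out.println(meta + " expensive calls: " + expensiveCalls);
    }
}
```

Passing the object directly would force `expensiveTheta()` to run at the call site even when no key is mapped for it. With the supplier, the counter stays at zero in this run.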