New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-10650] Windmill implementation for TimestampOrderedState #12864
Conversation
47d0139
to
a3057b9
Compare
R: @dpmills |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this unblock any ValidatesRunner
tests?
* Tracker for the ids used in an ordered list. | ||
* | ||
* <p>Windmill accepts an int64 id for each timestamped-element in the list. Unique elements are | ||
* identified by the pair of timestamp and id. This means that tow unique elements e1, e2 must |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* identified by the pair of timestamp and id. This means that tow unique elements e1, e2 must | |
* identified by the pair of timestamp and id. This means that two unique elements e1, e2 must |
* | ||
* <p>Windmill accepts an int64 id for each timestamped-element in the list. Unique elements are | ||
* identified by the pair of timestamp and id. This means that tow unique elements e1, e2 must | ||
* have different (ts1, id1), (ts2, id2) pairs. To accomplish this we bucket time into five-minute |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should call out that in the case where there is no unique id, there is loss of information.
Maybe should think about configuration which has to be explicitly overwritten to allow for inserts of two identical elements.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand. These ids are not visible to the user, rather they are an internal implementation detail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, my misunderstanding... where is the id created ? Is it based on the object hash?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. A set of available id ranges is stored in an (internal) ValueState, and this class picks a free id from the range.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah i see that now further down. Nice solution.
public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, Instant limitTimestamp) { | ||
throw new UnsupportedOperationException( | ||
String.format("%s is not supported", OrderedListState.class.getSimpleName())); | ||
public Iterable<TimestampedValue<T>> readRange( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be useful to have a readAndRemoveRange op. When in OnTimer doing process up to this point work, the next step after reading is to clear. So this would save having to make two calls.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe, but right now I think that would be premature. Users only get charged for bytes transferred, so it's unclear that this would save end users much.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I would like to see this in the OrderedListState API, as it would reduce the code the dev needs to write...
private static class WindmillOrderedList<T> extends SimpleWindmillState | ||
implements OrderedListState<T> { | ||
// Coder for closed-open ranges. | ||
private static class RangeCoder<T extends Comparable> extends CustomCoder<Range<T>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably extend StructuredCoder instead of CustomCoder, although it may not matter if it's never serialized. If you leave it as CustomCoder, implement hashCode and equals
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
} | ||
|
||
private static class RangeSetCoder<T extends Comparable> extends CustomCoder<RangeSet<T>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could simplify this a little with DelegateCoder
insertBuilder.addEntries( | ||
SortedListEntry.newBuilder() | ||
.setValue(elementStream.toByteString()) | ||
.setSortKey(elem.getTimestamp().getMillis() * 1000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For all of these "millis * 1000" calls, use WindmillTimeUtils.harnessToWindmillTimestamp()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
for (Range<Instant> current = getTrackedRange(tsRange.lowerEndpoint()); | ||
current.lowerEndpoint().isBefore(tsRange.upperEndpoint()); | ||
current = getTrackedRange(current.lowerEndpoint().plus(RESOLUTION))) { | ||
// TODO: shouldn't need to iterate over all ranges. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put your username or a JIRA issue in the TODOs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
try { | ||
T value = elemCoder.decode(entry.getValue().newInput(), Coder.Context.OUTER); | ||
entryList.addWeighted( | ||
TimestampedValue.of(value, Instant.ofEpochMilli(entry.getSortKey() / 1000)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use WindmillTimeUtils.windmillToHarnessTimestamp(). Current code is wrong for negative timestamps.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Run Dataflow ValidatesRunner |
51f72ea
to
49ab7e6
Compare
Run Dataflow ValidatesRunner |
1 similar comment
Run Dataflow ValidatesRunner |
Run Dataflow ValidatesRunner |
1 similar comment
Run Dataflow ValidatesRunner |
Run Dataflow ValidatesRunner |
Run Dataflow ValidatesRunner |
1 similar comment
Run Dataflow ValidatesRunner |
No description provided.