Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-10141] HL7v2 io timestamps / watermark estimate #11862

Closed
wants to merge 8 commits into from

Conversation

jaketf
Copy link

@jaketf jaketf commented May 29, 2020

Use timestamps / watermark estimate in HL7v2IO.[Read, ListHL7v2Messages] and provide interface to opt-in to TimestampedValue<HL7v2Message> to preserve the existing interface.

R: @lukecwik


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- Build Status --- --- Build Status
Java Build Status Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
--- Build Status
Build Status
Build Status
Build Status
Build Status
--- --- Build Status
XLang --- --- --- Build Status --- --- Build Status

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website
Non-portable Build Status Build Status
Build Status
Build Status Build Status
Portable --- Build Status --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@@ -179,25 +189,51 @@ public static ListHL7v2Messages readAll(List<String> hl7v2Stores) {
return new ListHL7v2Messages(StaticValueProvider.of(hl7v2Stores), StaticValueProvider.of(null));
}

/** Read all HL7v2 Messages from multiple stores as sendTime {@link TimestampedValue}s. */
public static ListTimestampedHL7v2Messages readAllWithTimestamps(List<String> hl7v2Stores) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding static methods for each variant, any reason to not make ListHL7v2Messages be a builder so someone could invoke withMessageTimestamps?

This would have also make sense to make withFilter a builder like method on ListHL7v2Messages

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a good point.

Unfortunately, I think having adding replacing the existing static methods with withFilter builder would be interface breaking. I've added this method to ListHL7v2Messages as it might be more natural for some to use.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can keep the static methods that exist and transition them to use the builder underneath so you don't have to break the interface. This is done in other IOs for common scenarios.

@jaketf jaketf requested a review from lukecwik June 3, 2020 02:48
@aaltay
Copy link
Member

aaltay commented Jun 12, 2020

retest this please

2 similar comments
@aaltay
Copy link
Member

aaltay commented Jun 18, 2020

retest this please

@aaltay
Copy link
Member

aaltay commented Jun 18, 2020

retest this please

@lukecwik
Copy link
Member

Sorry for the long delay.

* @param initialSplitDuration the initial split duration
* @return the list hl 7 v 2 messages
*/
ListHL7v2Messages withInitialSplitDuration(Duration initialSplitDuration) {
this.initialSplitDuration = initialSplitDuration;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please return a new version of this transform instead of mutating the existing version.

It is not uncommon for users to take one transform apply it to the graph once and then call withX on it and apply it elsewhere. We wouldn't want the second mutation to ever impact the first application of the transform and it is easy to guard against it using the @autovalue builders.

* disjoint sendTime filters of the specified duration.
*
* @param initialSplitDuration the initial split duration
* @return the list hl 7 v 2 messages
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

* or the input element's timestamp.
*
* @param timestampMethod the timestamp method
* @return the list hl 7 v 2 messages
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Comment on lines +494 to +495
* Controls if the output elements will be assigned a timestamp beased on the sendTime property
* or the input element's timestamp.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Controls if the output elements will be assigned a timestamp beased on the sendTime property
* or the input element's timestamp.
* Controls if the output elements will be assigned a timestamp beased on the {@code sendTime} property
* or the input element's timestamp.

* @param timestampMethod the timestamp method
* @return the list hl 7 v 2 messages
*/
ListHL7v2Messages withTimestampMethod(TimestampMethod timestampMethod) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about withOutputTimestampMethod, also check that the argument is non-null?

nit: I'm torn between using an enum or just having the two variants between methods. Your call. Having two variants prevents the user from passing in null here.

* <p>Message Listing Message Listing with {@link HL7v2IO.ListHL7v2Messages} supports batch use
* cases where you want to process all the messages in an HL7v2 store or those matching a
* filter @see <a
* <p>Message Listing Message Listing with {@link ListHL7v2Messages} and {@link ListHL7v2Messages}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Suggested change
* <p>Message Listing Message Listing with {@link ListHL7v2Messages} and {@link ListHL7v2Messages}
* <p>Message Listing with {@link ListHL7v2Messages} and {@link ListHL7v2Messages}

outputReceiver.outputWithTimestamp(msg, Instant.parse(msg.getSendTime()));
break;
case INPUT_ELEMENT:
outputReceiver.outputWithTimestamp(msg, context.timestamp());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should use output(msg) and not outputWithTimestamp since output() by default does this. This also allows you to not have to take a ProcessContext parameter.

@@ -509,13 +561,17 @@ public ListHL7v2Messages withInitialSplitDuration(Duration initialSplitDuration)
* @param filter the filter
*/
ListHL7v2MessagesFn(String filter) {
this(StaticValueProvider.of(filter), null);
this(StaticValueProvider.of(filter), null, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We pass in null to the constructor and don't handle that case in the switch statement. How about passing in the default?

private Duration initialSplitDuration;
private TimestampMethod timestampMethod;

enum TimestampMethod {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment?

@lukecwik
Copy link
Member

Ping?

@aaltay
Copy link
Member

aaltay commented Sep 1, 2020

@jaketf - Is this PR still active?

@jaketf jaketf marked this pull request as draft September 1, 2020 19:23
@jaketf
Copy link
Author

jaketf commented Sep 1, 2020

apologies I'm not working w/ healthcare clients these days and unable to prioritize this.
I can take a look at this again after my current engagements end (~october)

@stale
Copy link

stale bot commented Nov 7, 2020

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Nov 7, 2020
@stale
Copy link

stale bot commented Nov 16, 2020

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@stale stale bot closed this Nov 16, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants