Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NEMO-252] Fix CreatViewTransform to emit windowed materialized data #141

Merged
merged 15 commits into from Nov 2, 2018

Conversation

taegeonum
Copy link
Contributor

@taegeonum taegeonum commented Nov 1, 2018

JIRA: NEMO-252: Fix CreatViewTransform to emit windowed materialized data

Major changes:

  • Fix CreateViewTransform to collect windowed data and emit them by applying a view function

Minor changes

  • Fix emitting output watermarks in GroupByKeyAndWindowDoFnTransform

Tests for the changes:

  • CreateViewTransformTest that tests materialized data in windows

Copy link
Contributor

@johnyangk johnyangk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @taegeonum. I've left comments.

private final MultiView<Object> multiView;
private final Map<BoundedWindow, List<I>> windowListMap;

// TODO #XXX: we should remove this variable by refactoring broadcast worker for side input
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create a JIRA issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

iterator.remove();
isEmitted = true;

if (outputTimestamp > entry.getKey().maxTimestamp().getMillis()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

outputTimeStamp = Math.max(outputTimeStamp, entry.getKey().maxTimestamp().getMillis()) ?

}

final Iterator<Map.Entry<BoundedWindow, List<I>>> iterator = windowListMap.entrySet().iterator();
long outputTimestamp = Long.MAX_VALUE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxOutputTimeStampOfEmittedWindows?


// TODO #XXX: we should remove this variable by refactoring broadcast worker for side input
private boolean isEmitted = false;
private long outputWatermark;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lastEmittedWatermark?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will change it to currentOutputWatermark

final Instant processingTime,
final Instant synchronizedTime) {
keyToValues.forEach((key, val) -> {
long outputTimestamp = Long.MAX_VALUE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxOutputTimeStampOfEmittedWindows?


for (final Map.Entry<K, List<WindowedValue<InputT>>> entry : keyToValues.entrySet()) {
final K key = entry.getKey();
final List<WindowedValue<InputT>> val = entry.getValue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val -> values

final CreateViewTransform<String, Integer> viewTransform =
new CreateViewTransform(new SumViewFn());

final Instant ts1 = new Instant(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Share code with GroupByKeyAndWindowDoFnTransformTest?


@Override
public Materialization<Materializations.MultimapView<Void, String>> getMaterialization() {
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw new UnSupportedOperation?

Copy link
Contributor Author

@taegeonum taegeonum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johnyangk Thanks for the review! I've addressed your comments

private final MultiView<Object> multiView;
private final Map<BoundedWindow, List<I>> windowListMap;

// TODO #XXX: we should remove this variable by refactoring broadcast worker for side input
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.


// TODO #XXX: we should remove this variable by refactoring broadcast worker for side input
private boolean isEmitted = false;
private long outputWatermark;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will change it to currentOutputWatermark

johnyangk
johnyangk previously approved these changes Nov 2, 2018
@johnyangk johnyangk merged commit a726a5e into apache:master Nov 2, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants