From b716615e9cfad5352131101e98e5a53f70fc52e6 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 7 Mar 2017 14:24:12 -0800 Subject: [PATCH] Populate display data on Window.Assign --- .../beam/sdk/transforms/windowing/Window.java | 89 ++++++++++++------- 1 file changed, 57 insertions(+), 32 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index 94870ff23116..d9be1ac68eae 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -487,38 +487,14 @@ public PCollection expand(PCollection input) { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - if (windowFn != null) { - builder - .add(DisplayData.item("windowFn", windowFn.getClass()) - .withLabel("Windowing Function")) - .include("windowFn", windowFn); - } - - if (allowedLateness != null) { - builder.addIfNotDefault(DisplayData.item("allowedLateness", allowedLateness) - .withLabel("Allowed Lateness"), - Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); - } - - if (trigger != null && !(trigger instanceof DefaultTrigger)) { - builder.add(DisplayData.item("trigger", trigger.toString()) - .withLabel("Trigger")); - } - - if (mode != null) { - builder.add(DisplayData.item("accumulationMode", mode.toString()) - .withLabel("Accumulation Mode")); - } - - if (closingBehavior != null) { - builder.add(DisplayData.item("closingBehavior", closingBehavior.toString()) - .withLabel("Window Closing Behavior")); - } - - if (outputTimeFn != null) { - builder.add(DisplayData.item("outputTimeFn", outputTimeFn.getClass()) - .withLabel("Output Time Function")); - } + populateWindowingStrategyDisplayData( + WindowingStrategy.of(windowFn) + .withAllowedLateness(allowedLateness) + .withClosingBehavior(closingBehavior) + .withTrigger(trigger) + .withOutputTimeFn(outputTimeFn) + .withMode(mode), + builder); } @Override @@ -557,6 +533,55 @@ public PCollection expand(PCollection input) { public WindowFn getWindowFn() { return updatedStrategy.getWindowFn(); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + populateWindowingStrategyDisplayData(updatedStrategy, builder); + } + } + + private static void populateWindowingStrategyDisplayData( + WindowingStrategy windowingStrategy, DisplayData.Builder builder) { + if (windowingStrategy.getWindowFn() != null) { + builder + .add( + DisplayData.item("windowFn", windowingStrategy.getWindowFn().getClass()) + .withLabel("Windowing Function")) + .include("windowFn", windowingStrategy.getWindowFn()); + } + + if (windowingStrategy.getAllowedLateness() != null) { + builder.addIfNotDefault( + DisplayData.item("allowedLateness", windowingStrategy.getAllowedLateness()) + .withLabel("Allowed Lateness"), + Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); + } + + if (windowingStrategy.getTrigger() != null + && !(windowingStrategy.getTrigger() instanceof DefaultTrigger)) { + builder.add( + DisplayData.item("trigger", windowingStrategy.getTrigger().toString()) + .withLabel("Trigger")); + } + + if (windowingStrategy.getMode() != null) { + builder.add( + DisplayData.item("accumulationMode", windowingStrategy.getMode().toString()) + .withLabel("Accumulation Mode")); + } + + if (windowingStrategy.getClosingBehavior() != null) { + builder.add( + DisplayData.item("closingBehavior", windowingStrategy.getClosingBehavior().toString()) + .withLabel("Window Closing Behavior")); + } + + if (windowingStrategy.getOutputTimeFn() != null) { + builder.add( + DisplayData.item("outputTimeFn", windowingStrategy.getOutputTimeFn().getClass()) + .withLabel("Output Time Function")); + } } /**