Skip to content

Commit

Permalink
Add display data to windowing transforms
Browse files Browse the repository at this point in the history
  • Loading branch information
swegner committed Apr 14, 2016
1 parent 96765f1 commit 526bf2a
Show file tree
Hide file tree
Showing 32 changed files with 553 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.apache.beam.sdk.util.ExecutableTrigger;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;

import org.joda.time.Instant;
Expand Down Expand Up @@ -112,4 +113,13 @@ public void onOnlyFiring(TriggerContext context) throws Exception {
subtrigger.invokeOnFire(context);
}
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder("AfterAll.of(");
Joiner.on(", ").appendTo(builder, subTriggers);
builder.append(")");

return builder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@

import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.PeriodFormat;
import org.joda.time.format.PeriodFormatter;

import java.util.List;
import java.util.Locale;
import java.util.Objects;

import javax.annotation.Nullable;

/**
Expand All @@ -59,6 +61,8 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger {
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
"delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder()));

private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH);

/**
* To complete an implementation, return the desired time from the TriggerContext.
*/
Expand Down Expand Up @@ -276,6 +280,11 @@ public boolean equals(Object object) {
public int hashCode() {
return Objects.hash(delay);
}

@Override
public String toString() {
return PERIOD_FORMATTER.print(delay.toPeriod());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.util.ExecutableTrigger;

import com.google.common.base.Joiner;
import org.joda.time.Instant;

import java.util.Arrays;
Expand Down Expand Up @@ -127,6 +128,15 @@ public void onFire(Trigger.TriggerContext context) throws Exception {
updateFinishedState(context);
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder("AfterEach.inOrder(");
Joiner.on(", ").appendTo(builder, subTriggers);
builder.append(")");

return builder.toString();
}

private void updateFinishedState(TriggerContext context) {
context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.apache.beam.sdk.util.ExecutableTrigger;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;

import org.joda.time.Instant;
Expand Down Expand Up @@ -108,6 +109,15 @@ protected void onOnlyFiring(TriggerContext context) throws Exception {
}
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder("AfterFirst.of(");
Joiner.on(", ").appendTo(builder, subTriggers);
builder.append(")");

return builder.toString();
}

private void updateFinishedStatus(TriggerContext c) {
boolean anyFinished = false;
for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import java.util.List;
import java.util.Objects;

import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -74,7 +73,15 @@ protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {

@Override
public String toString() {
return "AfterProcessingTime.pastFirstElementInPane(" + timestampMappers + ")";
StringBuilder builder = new StringBuilder("AfterProcessingTime.pastFirstElementInPane()");
for (SerializableFunction<Instant, Instant> delayFn : timestampMappers) {
builder
.append(".plusDelayOf(")
.append(delayFn)
.append(")");
}

return builder.toString();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
@Experimental(Experimental.Kind.TRIGGER)
public class AfterWatermark {

private static final String TO_STRING = "AfterWatermark.pastEndOfWindow()";

// Static factory class.
private AfterWatermark() {}

Expand Down Expand Up @@ -258,6 +260,26 @@ public void onFire(Trigger.TriggerContext context) throws Exception {
}
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder(TO_STRING);

Trigger earlyTrigger = subTriggers.get(EARLY_INDEX);
if (!(earlyTrigger instanceof NeverTrigger)) {
builder
.append(".withEarlyFirings(")
.append(earlyTrigger)
.append(")");
}

builder
.append(".withLateFirings(")
.append(subTriggers.get(LATE_INDEX))
.append(")");

return builder.toString();
}

private void onNonLateFiring(Trigger.TriggerContext context) throws Exception {
// We have not yet transitioned to late firings.
ExecutableTrigger earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX);
Expand Down Expand Up @@ -368,7 +390,7 @@ public FromEndOfWindow getContinuationTrigger(List<Trigger> continuationTriggers

@Override
public String toString() {
return "AfterWatermark.pastEndOfWindow()";
return TO_STRING;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms.windowing;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.display.DisplayData;

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
Expand All @@ -35,14 +36,16 @@
*/
public class CalendarWindows {

private static final DateTime DEFAULT_START_DATE = new DateTime(0, DateTimeZone.UTC);

/**
* Returns a {@link WindowFn} that windows elements into periods measured by days.
*
* <p>For example, {@code CalendarWindows.days(1)} will window elements into
* separate windows for each day.
*/
public static DaysWindows days(int number) {
return new DaysWindows(number, new DateTime(0, DateTimeZone.UTC), DateTimeZone.UTC);
return new DaysWindows(number, DEFAULT_START_DATE, DateTimeZone.UTC);
}

/**
Expand All @@ -54,7 +57,7 @@ public static DaysWindows days(int number) {
public static DaysWindows weeks(int number, int startDayOfWeek) {
return new DaysWindows(
7 * number,
new DateTime(0, DateTimeZone.UTC).withDayOfWeek(startDayOfWeek),
DEFAULT_START_DATE.withDayOfWeek(startDayOfWeek),
DateTimeZone.UTC);
}

Expand All @@ -67,7 +70,7 @@ public static DaysWindows weeks(int number, int startDayOfWeek) {
* and the first window begins in January 2014.
*/
public static MonthsWindows months(int number) {
return new MonthsWindows(number, 1, new DateTime(0, DateTimeZone.UTC), DateTimeZone.UTC);
return new MonthsWindows(number, 1, DEFAULT_START_DATE, DateTimeZone.UTC);
}

/**
Expand All @@ -79,7 +82,7 @@ public static MonthsWindows months(int number) {
* America/Los_Angeles time zone.
*/
public static YearsWindows years(int number) {
return new YearsWindows(number, 1, 1, new DateTime(0, DateTimeZone.UTC), DateTimeZone.UTC);
return new YearsWindows(number, 1, 1, DEFAULT_START_DATE, DateTimeZone.UTC);
}

/**
Expand Down Expand Up @@ -142,6 +145,14 @@ public boolean isCompatible(WindowFn<?, ?> other) {
&& timeZone == that.timeZone;
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder
.add("numDays", number)
.addIfNotDefault("startDate", new DateTime(startDate, timeZone).toInstant(),
new DateTime(DEFAULT_START_DATE, DateTimeZone.UTC).toInstant());
}

public int getNumber() {
return number;
}
Expand Down Expand Up @@ -229,6 +240,14 @@ public boolean isCompatible(WindowFn<?, ?> other) {
&& timeZone == that.timeZone;
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder
.add("numMonths", number)
.addIfNotDefault("startDate", new DateTime(startDate, timeZone).toInstant(),
new DateTime(DEFAULT_START_DATE, DateTimeZone.UTC).toInstant());
}

public int getNumber() {
return number;
}
Expand Down Expand Up @@ -325,6 +344,14 @@ public boolean isCompatible(WindowFn<?, ?> other) {
&& timeZone == that.timeZone;
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder
.add("numYears", number)
.addIfNotDefault("startDate", new DateTime(startDate, timeZone).toInstant(),
new DateTime(DEFAULT_START_DATE, DateTimeZone.UTC).toInstant());
}

public DateTimeZone getTimeZone() {
return timeZone;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,9 @@ private boolean endOfWindowReached(Trigger.TriggerContext context) {

@Override
public void onFire(Trigger.TriggerContext context) throws Exception { }

@Override
public String toString() {
return "Repeatedly.forever(AfterWatermark.pastEndOfWindow)";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms.windowing;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.display.DisplayData;

import org.joda.time.Duration;
import org.joda.time.Instant;
Expand Down Expand Up @@ -82,6 +83,13 @@ public IntervalWindow assignWindow(Instant timestamp) {
return new IntervalWindow(new Instant(start), size);
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder
.add("size", size)
.addIfNotDefault("offset", offset, Duration.ZERO);
}

@Override
public Coder<IntervalWindow> windowCoder() {
return IntervalWindow.getCoder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ public void onFire(Trigger.TriggerContext context) throws Exception {
updateFinishedState(context);
}

@Override
public String toString() {
return String.format("%s.orFinally(%s)", subTriggers.get(ACTUAL), subTriggers.get(UNTIL));
}

private void updateFinishedState(TriggerContext c) throws Exception {
boolean anyStillFinished = false;
for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class Repeatedly extends Trigger {
private static final int REPEATED = 0;

/**
* Create a composite trigger that repeatedly executes the trigger {@code toRepeat}, firing each
* Create a composite trigger that repeatedly executes the trigger {@code repeated}, firing each
* time it fires and ignoring any indications to finish.
*
* <p>Unless used with {@link Trigger#orFinally} the composite trigger will never finish.
Expand Down Expand Up @@ -92,6 +92,11 @@ public void onFire(TriggerContext context) throws Exception {
}
}

@Override
public String toString() {
return String.format("Repeatedly.forever(%s)", subTriggers.get(REPEATED));
}

private ExecutableTrigger getRepeated(TriggerContext context) {
return context.trigger().subTrigger(REPEATED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.display.DisplayData;

import org.joda.time.Duration;

Expand Down Expand Up @@ -97,6 +98,11 @@ public Duration getGapDuration() {
return gapDuration;
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.add("gapDuration", gapDuration);
}

@Override
public boolean equals(Object object) {
if (!(object instanceof Sessions)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.display.DisplayData;

import org.joda.time.Duration;
import org.joda.time.Instant;
Expand Down Expand Up @@ -139,6 +140,14 @@ public boolean isCompatible(WindowFn<?, ?> other) {
return equals(other);
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder
.add("size", size)
.add("period", period)
.add("offset", offset);
}

/**
* Return the last start of a sliding window that contains the timestamp.
*/
Expand Down

0 comments on commit 526bf2a

Please sign in to comment.