From a27580be238ab00f6866cde7d08a62c59cba215d Mon Sep 17 00:00:00 2001 From: Lalit Yadav Date: Tue, 19 May 2026 00:00:53 -0500 Subject: [PATCH 1/4] feat: Add Java LogElements transform --- .../beam/sdk/transforms/LogElements.java | 207 ++++++++++++++++++ .../beam/sdk/transforms/LogElementsTest.java | 86 ++++++++ 2 files changed, 293 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java new file mode 100644 index 000000000000..76ea7676142d --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.util.Objects; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.PCollection; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +/** + * {@link PTransform} for logging elements of a {@link PCollection}. + * + *

Each element is logged and then emitted unchanged. + * + *

{@code
+ * PCollection words = ...;
+ * PCollection loggedWords = words.apply(LogElements.info().withPrefix("word: "));
+ * }
+ * + * @param the element type of the input {@link PCollection} + */ +public class LogElements extends PTransform, PCollection> { + private static final Logger LOG = LoggerFactory.getLogger(LogElements.class); + + private final Level level; + private final String prefix; + private final boolean withTimestamp; + private final boolean withWindow; + private final boolean withPaneInfo; + + /** Returns a {@link LogElements} transform that logs elements at {@link Level#TRACE}. */ + public static LogElements trace() { + return of(Level.TRACE); + } + + /** Returns a {@link LogElements} transform that logs elements at {@link Level#DEBUG}. */ + public static LogElements debug() { + return of(Level.DEBUG); + } + + /** Returns a {@link LogElements} transform that logs elements at {@link Level#INFO}. */ + public static LogElements info() { + return of(Level.INFO); + } + + /** Returns a {@link LogElements} transform that logs elements at {@link Level#WARN}. */ + public static LogElements warn() { + return of(Level.WARN); + } + + /** Returns a {@link LogElements} transform that logs elements at {@link Level#ERROR}. */ + public static LogElements error() { + return of(Level.ERROR); + } + + /** Returns a {@link LogElements} transform that logs elements at the given level. */ + public static LogElements of(Level level) { + return new LogElements<>(level, "", false, false, false); + } + + private LogElements( + Level level, String prefix, boolean withTimestamp, boolean withWindow, boolean withPaneInfo) { + this.level = Objects.requireNonNull(level, "level"); + this.prefix = Objects.requireNonNull(prefix, "prefix"); + this.withTimestamp = withTimestamp; + this.withWindow = withWindow; + this.withPaneInfo = withPaneInfo; + } + + /** Returns a new {@link LogElements} transform with the given prefix before each element. */ + public LogElements withPrefix(String prefix) { + return new LogElements<>(level, prefix, withTimestamp, withWindow, withPaneInfo); + } + + /** Returns a new {@link LogElements} transform that logs each element's timestamp. */ + public LogElements withTimestamp() { + return new LogElements<>(level, prefix, true, withWindow, withPaneInfo); + } + + /** Returns a new {@link LogElements} transform that logs each element's window. */ + public LogElements withWindow() { + return new LogElements<>(level, prefix, withTimestamp, true, withPaneInfo); + } + + /** Returns a new {@link LogElements} transform that logs each element's pane info. */ + public LogElements withPaneInfo() { + return new LogElements<>(level, prefix, withTimestamp, withWindow, true); + } + + @Override + public PCollection expand(PCollection input) { + return input.apply("Log", ParDo.of(new LoggingFn<>(this))).setCoder(input.getCoder()); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .add(DisplayData.item("level", level.name()).withLabel("Log Level")) + .addIfNotDefault(DisplayData.item("prefix", prefix).withLabel("Prefix"), "") + .addIfNotDefault( + DisplayData.item("withTimestamp", withTimestamp).withLabel("Log Timestamp"), false) + .addIfNotDefault(DisplayData.item("withWindow", withWindow).withLabel("Log Window"), false) + .addIfNotDefault( + DisplayData.item("withPaneInfo", withPaneInfo).withLabel("Log Pane Info"), false); + } + + static String formatForLogging( + @Nullable Object element, + String prefix, + boolean withTimestamp, + boolean withWindow, + boolean withPaneInfo, + Instant timestamp, + BoundedWindow window, + PaneInfo paneInfo) { + StringBuilder builder = new StringBuilder(prefix).append(element); + if (withTimestamp) { + builder.append(", timestamp=").append(timestamp); + } + if (withWindow) { + builder.append(", window=").append(window); + } + if (withPaneInfo) { + builder.append(", paneInfo=").append(paneInfo); + } + return builder.toString(); + } + + private static void log(Level level, String message) { + switch (level) { + case TRACE: + LOG.trace("{}", message); + break; + case DEBUG: + LOG.debug("{}", message); + break; + case INFO: + LOG.info("{}", message); + break; + case WARN: + LOG.warn("{}", message); + break; + case ERROR: + default: + LOG.error("{}", message); + } + } + + private static class LoggingFn extends DoFn { + private final Level level; + private final String prefix; + private final boolean withTimestamp; + private final boolean withWindow; + private final boolean withPaneInfo; + + private LoggingFn(LogElements transform) { + this.level = transform.level; + this.prefix = transform.prefix; + this.withTimestamp = transform.withTimestamp; + this.withWindow = transform.withWindow; + this.withPaneInfo = transform.withPaneInfo; + } + + @ProcessElement + public void processElement( + @Element T element, + @DoFn.Timestamp Instant timestamp, + BoundedWindow window, + PaneInfo paneInfo, + OutputReceiver receiver) { + log( + level, + formatForLogging( + element, + prefix, + withTimestamp, + withWindow, + withPaneInfo, + timestamp, + window, + paneInfo)); + receiver.output(element); + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java new file mode 100644 index 000000000000..d78be4703a7a --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; + +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.event.Level; + +/** Tests for {@link LogElements}. */ +@RunWith(JUnit4.class) +public class LogElementsTest { + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + @Test + @Category(NeedsRunner.class) + public void testLogElementsPreservesElements() { + List elements = Arrays.asList("a", "b", "c"); + + PCollection output = + pipeline.apply(Create.of(elements)).apply(LogElements.info()); + + PAssert.that(output).containsInAnyOrder(elements); + pipeline.run(); + } + + @Test + public void testFormatForLoggingIncludesConfiguredMetadata() { + Instant timestamp = new Instant(0); + IntervalWindow window = new IntervalWindow(timestamp, Duration.standardMinutes(1)); + + String message = + LogElements.formatForLogging( + "a", "row: ", true, true, true, timestamp, window, PaneInfo.NO_FIRING); + + assertThat(message, containsString("row: a")); + assertThat(message, containsString("timestamp=1970-01-01T00:00:00.000Z")); + assertThat( + message, containsString("window=[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)")); + assertThat(message, containsString("paneInfo=PaneInfo.NO_FIRING")); + } + + @Test + public void testDisplayData() { + DisplayData displayData = + DisplayData.from( + LogElements.of(Level.WARN).withPrefix("row: ").withTimestamp().withWindow()); + + assertThat(displayData, hasDisplayItem("level", "WARN")); + assertThat(displayData, hasDisplayItem("prefix", "row: ")); + assertThat(displayData, hasDisplayItem("withTimestamp", true)); + assertThat(displayData, hasDisplayItem("withWindow", true)); + } +} From c34d3eef23312db98baedb8342dcd51d7849798c Mon Sep 17 00:00:00 2001 From: Lalit Yadav Date: Tue, 19 May 2026 00:33:38 -0500 Subject: [PATCH 2/4] Addressed LogElements review feedback --- .../beam/sdk/transforms/LogElements.java | 51 +++++++++---- .../beam/sdk/transforms/LogElementsTest.java | 72 ++++++++++++++++++- 2 files changed, 109 insertions(+), 14 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java index 76ea7676142d..d73f31a0bf2e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java @@ -110,7 +110,7 @@ public LogElements withPaneInfo() { @Override public PCollection expand(PCollection input) { - return input.apply("Log", ParDo.of(new LoggingFn<>(this))).setCoder(input.getCoder()); + return input.apply("Log", ParDo.of(new LoggingFn<>(this))); } @Override @@ -163,11 +163,34 @@ private static void log(Level level, String message) { LOG.warn("{}", message); break; case ERROR: - default: LOG.error("{}", message); + break; + default: + throw unsupportedLogLevel(level); } } + private static boolean isLoggingEnabled(Level level) { + switch (level) { + case TRACE: + return LOG.isTraceEnabled(); + case DEBUG: + return LOG.isDebugEnabled(); + case INFO: + return LOG.isInfoEnabled(); + case WARN: + return LOG.isWarnEnabled(); + case ERROR: + return LOG.isErrorEnabled(); + default: + throw unsupportedLogLevel(level); + } + } + + private static IllegalArgumentException unsupportedLogLevel(Level level) { + return new IllegalArgumentException("Unsupported log level: " + level); + } + private static class LoggingFn extends DoFn { private final Level level; private final String prefix; @@ -190,17 +213,19 @@ public void processElement( BoundedWindow window, PaneInfo paneInfo, OutputReceiver receiver) { - log( - level, - formatForLogging( - element, - prefix, - withTimestamp, - withWindow, - withPaneInfo, - timestamp, - window, - paneInfo)); + if (isLoggingEnabled(level)) { + log( + level, + formatForLogging( + element, + prefix, + withTimestamp, + withWindow, + withPaneInfo, + timestamp, + window, + paneInfo)); + } receiver.output(element); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java index d78be4703a7a..ecffe733d83d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java @@ -21,8 +21,11 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; +import java.io.Serializable; import java.util.Arrays; import java.util.List; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -43,6 +46,7 @@ @RunWith(JUnit4.class) public class LogElementsTest { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(LogElements.class); @Test @Category(NeedsRunner.class) @@ -72,15 +76,81 @@ public void testFormatForLoggingIncludesConfiguredMetadata() { assertThat(message, containsString("paneInfo=PaneInfo.NO_FIRING")); } + @Test + @Category(NeedsRunner.class) + public void testLogElementsLogsAtConfiguredLevel() { + pipeline + .apply("CreateTrace", Create.of("trace-element")) + .apply("LogTrace", LogElements.of(Level.TRACE).withPrefix("trace: ")); + pipeline + .apply("CreateDebug", Create.of("debug-element")) + .apply("LogDebug", LogElements.of(Level.DEBUG).withPrefix("debug: ")); + pipeline + .apply("CreateInfo", Create.of("info-element")) + .apply("LogInfo", LogElements.of(Level.INFO).withPrefix("info: ")); + pipeline + .apply("CreateWarn", Create.of("warn-element")) + .apply("LogWarn", LogElements.of(Level.WARN).withPrefix("warn: ")); + pipeline + .apply("CreateError", Create.of("error-element")) + .apply("LogError", LogElements.of(Level.ERROR).withPrefix("error: ")); + + pipeline.run(); + + expectedLogs.verifyTrace("trace: trace-element"); + expectedLogs.verifyDebug("debug: debug-element"); + expectedLogs.verifyInfo("info: info-element"); + expectedLogs.verifyWarn("warn: warn-element"); + expectedLogs.verifyError("error: error-element"); + } + + @Test + @Category(NeedsRunner.class) + public void testLogElementsDoesNotFormatWhenLevelDisabled() { + java.util.logging.Logger.getLogger(LogElements.class.getName()) + .setLevel(java.util.logging.Level.OFF); + ThrowsOnToString element = new ThrowsOnToString(); + + PCollection output = + pipeline + .apply(Create.of(element).withCoder(SerializableCoder.of(ThrowsOnToString.class))) + .apply(LogElements.of(Level.INFO)); + + PAssert.that(output).containsInAnyOrder(element); + pipeline.run(); + } + @Test public void testDisplayData() { DisplayData displayData = DisplayData.from( - LogElements.of(Level.WARN).withPrefix("row: ").withTimestamp().withWindow()); + LogElements.of(Level.WARN) + .withPrefix("row: ") + .withTimestamp() + .withWindow() + .withPaneInfo()); assertThat(displayData, hasDisplayItem("level", "WARN")); assertThat(displayData, hasDisplayItem("prefix", "row: ")); assertThat(displayData, hasDisplayItem("withTimestamp", true)); assertThat(displayData, hasDisplayItem("withWindow", true)); + assertThat(displayData, hasDisplayItem("withPaneInfo", true)); + } + + private static class ThrowsOnToString implements Serializable { + @Override + public boolean equals(Object obj) { + return obj instanceof ThrowsOnToString; + } + + @Override + public int hashCode() { + return ThrowsOnToString.class.hashCode(); + } + + @Override + public String toString() { + throw new AssertionError("LogElements should not format elements when logging is disabled."); + } } } From cded2f4cbed59b650aad33b21c5abadb7a21e445 Mon Sep 17 00:00:00 2001 From: Lalit Yadav Date: Tue, 19 May 2026 10:07:36 -0500 Subject: [PATCH 3/4] Add LogElements tests --- .../beam/sdk/transforms/LogElementsTest.java | 64 +++---------------- 1 file changed, 9 insertions(+), 55 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java index ecffe733d83d..dc7f238aa449 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java @@ -21,11 +21,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; -import java.io.Serializable; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -46,7 +43,6 @@ @RunWith(JUnit4.class) public class LogElementsTest { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); - @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(LogElements.class); @Test @Category(NeedsRunner.class) @@ -78,46 +74,21 @@ public void testFormatForLoggingIncludesConfiguredMetadata() { @Test @Category(NeedsRunner.class) - public void testLogElementsLogsAtConfiguredLevel() { - pipeline - .apply("CreateTrace", Create.of("trace-element")) - .apply("LogTrace", LogElements.of(Level.TRACE).withPrefix("trace: ")); - pipeline - .apply("CreateDebug", Create.of("debug-element")) - .apply("LogDebug", LogElements.of(Level.DEBUG).withPrefix("debug: ")); - pipeline - .apply("CreateInfo", Create.of("info-element")) - .apply("LogInfo", LogElements.of(Level.INFO).withPrefix("info: ")); - pipeline - .apply("CreateWarn", Create.of("warn-element")) - .apply("LogWarn", LogElements.of(Level.WARN).withPrefix("warn: ")); - pipeline - .apply("CreateError", Create.of("error-element")) - .apply("LogError", LogElements.of(Level.ERROR).withPrefix("error: ")); + public void testLogElementsSupportsConfiguredLevels() { + assertPreservesElement("Trace", LogElements.trace(), "trace-element"); + assertPreservesElement("Debug", LogElements.debug(), "debug-element"); + assertPreservesElement("Info", LogElements.info(), "info-element"); + assertPreservesElement("Warn", LogElements.warn(), "warn-element"); + assertPreservesElement("Error", LogElements.error(), "error-element"); pipeline.run(); - - expectedLogs.verifyTrace("trace: trace-element"); - expectedLogs.verifyDebug("debug: debug-element"); - expectedLogs.verifyInfo("info: info-element"); - expectedLogs.verifyWarn("warn: warn-element"); - expectedLogs.verifyError("error: error-element"); } - @Test - @Category(NeedsRunner.class) - public void testLogElementsDoesNotFormatWhenLevelDisabled() { - java.util.logging.Logger.getLogger(LogElements.class.getName()) - .setLevel(java.util.logging.Level.OFF); - ThrowsOnToString element = new ThrowsOnToString(); - - PCollection output = - pipeline - .apply(Create.of(element).withCoder(SerializableCoder.of(ThrowsOnToString.class))) - .apply(LogElements.of(Level.INFO)); + private void assertPreservesElement(String name, LogElements transform, String element) { + PCollection output = + pipeline.apply("Create" + name, Create.of(element)).apply("Log" + name, transform); PAssert.that(output).containsInAnyOrder(element); - pipeline.run(); } @Test @@ -136,21 +107,4 @@ public void testDisplayData() { assertThat(displayData, hasDisplayItem("withWindow", true)); assertThat(displayData, hasDisplayItem("withPaneInfo", true)); } - - private static class ThrowsOnToString implements Serializable { - @Override - public boolean equals(Object obj) { - return obj instanceof ThrowsOnToString; - } - - @Override - public int hashCode() { - return ThrowsOnToString.class.hashCode(); - } - - @Override - public String toString() { - throw new AssertionError("LogElements should not format elements when logging is disabled."); - } - } } From 8a90b9667dc8aae852e6ceda20a68037d8e7fd8b Mon Sep 17 00:00:00 2001 From: Lalit Yadav Date: Wed, 20 May 2026 14:54:38 -0500 Subject: [PATCH 4/4] Add LogElements level logging test --- .../beam/sdk/transforms/LogElementsTest.java | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java index dc7f238aa449..aaba169018c1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.List; +import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -43,6 +44,7 @@ @RunWith(JUnit4.class) public class LogElementsTest { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(LogElements.class); @Test @Category(NeedsRunner.class) @@ -74,21 +76,24 @@ public void testFormatForLoggingIncludesConfiguredMetadata() { @Test @Category(NeedsRunner.class) - public void testLogElementsSupportsConfiguredLevels() { - assertPreservesElement("Trace", LogElements.trace(), "trace-element"); - assertPreservesElement("Debug", LogElements.debug(), "debug-element"); - assertPreservesElement("Info", LogElements.info(), "info-element"); - assertPreservesElement("Warn", LogElements.warn(), "warn-element"); - assertPreservesElement("Error", LogElements.error(), "error-element"); + public void testLogElementsLogsAtConfiguredLevels() { + applyLogElements("Trace", LogElements.trace().withPrefix("trace: "), "trace-element"); + applyLogElements("Debug", LogElements.debug().withPrefix("debug: "), "debug-element"); + applyLogElements("Info", LogElements.info().withPrefix("info: "), "info-element"); + applyLogElements("Warn", LogElements.warn().withPrefix("warn: "), "warn-element"); + applyLogElements("Error", LogElements.error().withPrefix("error: "), "error-element"); pipeline.run(); - } - private void assertPreservesElement(String name, LogElements transform, String element) { - PCollection output = - pipeline.apply("Create" + name, Create.of(element)).apply("Log" + name, transform); + expectedLogs.verifyTrace("trace: trace-element"); + expectedLogs.verifyDebug("debug: debug-element"); + expectedLogs.verifyInfo("info: info-element"); + expectedLogs.verifyWarn("warn: warn-element"); + expectedLogs.verifyError("error: error-element"); + } - PAssert.that(output).containsInAnyOrder(element); + private void applyLogElements(String name, LogElements transform, String element) { + pipeline.apply("Create" + name, Create.of(element)).apply("Log" + name, transform); } @Test