
[Bug]: Running Java pipeline with JDBCIO when using a window causes a ClassCastException #21882

@sshahar1


What happened?

I have a pipeline that reads data, windows it, groups, aggregates, and writes to a relational DB.
The code (in Kotlin) is below:

pipeline
    .apply(
        "WaitForNewFiles",
        FileIO.match()
            .filepattern(inputPath)
            .continuously(
                // Check for new files every 2 minute
                Duration.standardMinutes(2),
                // Never stop checking for new files
                Watch.Growth.never()
            )
    )
    .apply("GetFiles", FileIO.readMatches())
    .apply(TextIO.readFiles())
    .apply("ToTuple", ParDo.of(toTuple))
    .apply(
        "WaitForLines",
        Window.into<T>(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO)
    )
    .apply("Filter", Filter.by(Matcher()))
    .apply("Flatten", ParDo.of(FlattenRecords()))
    .setCoder(AvroCoder.of(MyRecord::class.java))
    .apply(
        "Insert to database",
        JdbcIO.write<MyRecord>()
            .withDataSourceProviderFn(dsProvider)
            .withStatement(cloud_sql_rome_records_insert)
            .withPreparedStatementSetter(preparedSetter())
    )

Up until upgrading to Apache Beam 2.37 this worked, but after the upgrade we started getting the following exception (and a lot of them):

java.lang.ClassCastException: class org.apache.beam.sdk.transforms.windowing.GlobalWindow cannot be cast to class org.apache.beam.sdk.transforms.windowing.IntervalWindow (org.apache.beam.sdk.transforms.windowing.GlobalWindow and org.apache.beam.sdk.transforms.windowing.IntervalWindow are in unnamed module of loader 'app')
at org.apache.beam.sdk.transforms.windowing.IntervalWindow$IntervalWindowCoder.registerByteSizeObserver(IntervalWindow.java:142)
at org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:211)
at org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:60)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:640)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
at org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output(SimpleDoFnRunner.java:326)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output(SimpleDoFnRunner.java:321)
at org.apache.beam.sdk.io.jdbc.JdbcIO$1.finish(JdbcIO.java:1613)

The new code introduced in JdbcIO in 2.37 includes:

@FinishBundle
public void finish(FinishBundleContext c) {
  if (outputList != null && outputList.size() > 0) {
    c.output(outputList, Instant.now(), GlobalWindow.INSTANCE);
  }
  outputList = null;
}

So, from what I understand, the @FinishBundle output is emitted into GlobalWindow.INSTANCE while the write's input PCollection is still in fixed (interval) windows, and the two window types conflict when the output is encoded with the IntervalWindowCoder, which is where the ClassCastException comes from.
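
A possible workaround (an untested sketch; the "BackToGlobalWindow" step name and trigger choice are just illustrative) might be to re-window the records into the global window right before the JDBC write, so that the GlobalWindow emitted in finish() matches the window coder of the write's input. GlobalWindows, Repeatedly, and AfterPane are all in org.apache.beam.sdk.transforms.windowing.

    // Untested workaround sketch: switch back to the global window before JdbcIO.write,
    // so the GlobalWindow emitted in its @FinishBundle matches the input's window coder.
    .apply(
        "BackToGlobalWindow",
        Window.into<MyRecord>(GlobalWindows())
            // Fire for every element so records keep flowing in streaming mode; tune as needed.
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO)
    )
    .apply(
        "Insert to database",
        JdbcIO.write<MyRecord>()
            .withDataSourceProviderFn(dsProvider)
            .withStatement(cloud_sql_rome_records_insert)
            .withPreparedStatementSetter(preparedSetter())
    )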

This is obviously a very vague exception, and it should at least be addressed in the documentation.

Thanks in advance

Issue Priority

Priority: 2

Issue Component

Component: io-java-jdbc
