NullPointerException with ValueState of custom class #18863

@kennknowles

Description

This issue was introduced in Beam 2.4.0. It breaks my existing stateful-processing implementation when unit tests run on the direct runner.

I wrote a DoFn that holds a ValueState of a custom class, which acts as a cache. In short, the first element runs through the DoFn just fine, but a NullPointerException is thrown before the second element reaches the DoFn. If I switch to a ValueState of String, everything works fine.

A simplified version of the DoFn looks like the following:


public class GetPredictionsFn
    extends DoFn<KV<List<String>, String>, KV<String, ArrayList<String>>> {

    @StateId("cache")
    private final StateSpec<ValueState<PredictionResponseDto>> cache =
        StateSpecs.value(AvroCoder.of(PredictionResponseDto.class));

    ...

    @ProcessElement
    public void processElement(
            ProcessContext c,
            @StateId("cache") ValueState<PredictionResponseDto> cache)
            throws IOException {
        Gson gson = new Gson();

        System.out.println("here");

        List<String> contentIds = c.element().getKey();
        String sessionId = c.element().getValue();
        System.out.println(sessionId);

        ArrayList<String> strItems = null;

        try {
            PredictionResponseDto parsedResponse = cache.read();
//            strItems = parsedResponse.getStrItems();
            if (parsedResponse == null) {
                throw new RuntimeException();
            }
            System.out.println("read");
            System.out.println(gson.toJson(parsedResponse));
        }
        catch (Exception e) {
            String jsonStr = "{\"predictions\":[{\"output\":[19]}],\"strItems\":[\"1.9\"],\"timestamp\":1526485646091}";

            PredictionResponseDto parsedResponse = PredictionResponseDto.parseJson(jsonStr);
            strItems = parsedResponse.getStrItems();
            System.out.println("write");
            System.out.println(gson.toJson(parsedResponse));
            System.out.println(gson.toJson(strItems));
            cache.write(parsedResponse);
            PredictionResponseDto parsedResponse1 = cache.read();
            System.out.println(gson.toJson(parsedResponse1));
        }

        KV<String, ArrayList<String>> predictionKv = KV.of(sessionId, strItems);

        System.out.println(gson.toJson(predictionKv));

        c.output(predictionKv);
    }
}

And the test pipeline looks like the following:


// Create a test pipeline.
Pipeline p = Pipeline.create();

// Create timestamps.
// currentTime needs to be set to the past, otherwise the first window will not be set to
// currentTime + expiry, but to creation time of the pipeline + expiry.
Instant currentTime = new Instant(0L);
Instant onTime = currentTime
        .plus(Duration.standardSeconds(streamingOptions.getCacheExpiry()))
        .minus(Duration.standardSeconds(1));
Instant lateTime = currentTime
        .plus(Duration.standardSeconds(streamingOptions.getCacheExpiry()))
        .plus(Duration.standardSeconds(1));

// Create stream input.
TestStream<KV<List<String>, String>> events =
        TestStream.create(KvCoder.of(ListCoder.of(
                StringUtf8Coder.of()), StringUtf8Coder.of()))
        // set the next timestamp to be currentTime
        .advanceWatermarkTo(currentTime)
        .addElements(KV.of(Arrays.asList("1"), "1"))
        .advanceWatermarkTo(onTime)
        .addElements(KV.of(Arrays.asList("1"), "2"))
        .addElements(KV.of(Arrays.asList("1"), "3"))
        .advanceWatermarkTo(lateTime)
        .addElements(KV.of(Arrays.asList("1"), "4"))
        .advanceWatermarkToInfinity();

PCollection<KV<String, ArrayList<String>>> output =
        p.apply(events).apply(Window.into(FixedWindows.of(
                Duration.standardSeconds(streamingOptions.getCacheExpiry()))))
        .apply(ParDo.of(doFn));

PAssert.that(output).containsInAnyOrder(expect);

The stack trace looks like:


org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException: in ca.cbc.recsysdataflow.PredictionResponseDto in array in ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction in array null of array in field key of ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction of array in field predictions of ca.cbc.recsysdataflow.PredictionResponseDto
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:342)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:312)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:206)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at ca.cbc.recsysdataflow.test.transforms.streaming.GetPredictionsCacheTest.testCache(GetPredictionsCacheTest.java:156)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)
Caused by: java.lang.NullPointerException: in ca.cbc.recsysdataflow.PredictionResponseDto in array in ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction in array null of array in field key of ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction of array in field predictions of ca.cbc.recsysdataflow.PredictionResponseDto
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
    at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
    at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.uncheckedClone(InMemoryStateInternals.java:648)
    at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.access$000(InMemoryStateInternals.java:62)
    at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:221)
    at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:181)
    at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals$CopyOnAccessInMemoryStateTable$CopyOnBindBinderFactory$1.bindValue(CopyOnAccessInMemoryStateInternals.java:301)
    at org.apache.beam.runners.direct.repackaged.runners.core.StateTags$2.bindValue(StateTags.java:71)
    at org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:278)
    at org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:267)
    at org.apache.beam.runners.direct.repackaged.runners.core.StateTags$SimpleStateTag.bind(StateTags.java:324)
    at org.apache.beam.runners.direct.repackaged.runners.core.StateTable.get(StateTable.java:60)
    at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:127)
    at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:121)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.state(SimpleDoFnRunner.java:518)
    at ca.cbc.recsysdataflow.transforms.streaming.GetPredictionsFn$DoFnInvoker.invokeProcessElement(Unknown Source)
Caused by: java.lang.NullPointerException
    at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:71)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
    at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
    at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:192)
    at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:68)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
    at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
    at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
    at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.uncheckedClone(InMemoryStateInternals.java:648)
    at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.access$000(InMemoryStateInternals.java:62)
    at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:221)
    at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:181)
    at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals$CopyOnAccessInMemoryStateTable$CopyOnBindBinderFactory$1.bindValue(CopyOnAccessInMemoryStateInternals.java:301)
    at org.apache.beam.runners.direct.repackaged.runners.core.StateTags$2.bindValue(StateTags.java:71)
    at org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:278)
    at org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:267)
    at org.apache.beam.runners.direct.repackaged.runners.core.StateTags$SimpleStateTag.bind(StateTags.java:324)
    at org.apache.beam.runners.direct.repackaged.runners.core.StateTable.get(StateTable.java:60)
    at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:127)
    at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:121)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.state(SimpleDoFnRunner.java:518)
    at ca.cbc.recsysdataflow.transforms.streaming.GetPredictionsFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:72)
    at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:179)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
    at org.apache.beam.runners.direct.StatefulParDoEvaluatorFactory$StatefulParDoEvaluator.processElement(StatefulParDoEvaluatorFactory.java:245)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
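
Reading the trace, the exception message points at a null array in field key of a Prediction inside predictions, and the failing frames show the direct runner cloning the stored state value through AvroCoder.encode when the state is bound for the next element. The JSON written to the cache has "output" but no "key" in its prediction, so that field is presumably left null after parsing. To confirm which fields are null before calling cache.write, something like the following could help. It is only a diagnostic sketch using plain reflection: the class NullFieldFinder and the two stand-in DTO classes are hypothetical, and the field types are guesses, since the real PredictionResponseDto is not shown here.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class NullFieldFinder {

    // Hypothetical stand-ins for the real DTOs; field types are assumptions.
    static class Prediction {
        List<Integer> key;     // stays null when the JSON has no "key"
        List<Integer> output;
    }

    static class PredictionResponseDto {
        List<Prediction> predictions = new ArrayList<>();
        List<String> strItems = new ArrayList<>();
        long timestamp;
    }

    /** Returns the paths of all null reference fields reachable from root. */
    static List<String> findNullFields(Object root) {
        List<String> nullPaths = new ArrayList<>();
        walk(root, root.getClass().getSimpleName(), nullPaths);
        return nullPaths;
    }

    // Depth-first walk over fields and collection elements.
    // No cycle detection: only suitable for tree-shaped DTOs.
    private static void walk(Object value, String path, List<String> nullPaths) {
        if (value == null) {
            nullPaths.add(path);
            return;
        }
        if (value instanceof Collection) {
            int index = 0;
            for (Object item : (Collection<?>) value) {
                walk(item, path + "[" + index + "]", nullPaths);
                index++;
            }
            return;
        }
        Class<?> type = value.getClass();
        // Do not descend into JDK types such as String or boxed numbers.
        if (type.getName().startsWith("java.")) {
            return;
        }
        for (Field field : type.getDeclaredFields()) {
            // Static, synthetic and primitive fields cannot be the null in question.
            if (Modifier.isStatic(field.getModifiers())
                    || field.isSynthetic()
                    || field.getType().isPrimitive()) {
                continue;
            }
            field.setAccessible(true);
            try {
                walk(field.get(value), path + "." + field.getName(), nullPaths);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        // Mirrors the JSON in the DoFn: one prediction with "output" and no "key".
        Prediction prediction = new Prediction();
        prediction.output = Arrays.asList(19);

        PredictionResponseDto dto = new PredictionResponseDto();
        dto.predictions.add(prediction);
        dto.strItems.add("1.9");
        dto.timestamp = 1526485646091L;

        for (String path : findNullFields(dto)) {
            System.out.println("null field: " + path);
        }
    }
}
```

For the stand-in above this reports PredictionResponseDto.predictions[0].key. If the same field turns out to be null in the real class, initializing it to an empty list, or marking it with Avro's org.apache.avro.reflect.Nullable annotation, may be worth trying. I have not verified either against Beam 2.4.0.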

Imported from Jira BEAM-4301. Original Jira may contain additional context.
Reported by: sx5640.
