
[Bug]: Not able to run Beam batch pipeline using Spark runner on GCP Dataproc cluster  #25608

@pwiecek

Description

What happened?

Hello,

I wanted to run a simple Beam batch pipeline as a Spark job on a Dataproc cluster. Unfortunately, there is a problem with lambda deserialization:

23/02/23 11:50:15 INFO SparkRunner$Evaluator: Evaluating org.apache.beam.sdk.transforms.View$ToListViewDoFn@14b48f39
23/02/23 11:50:15 INFO SparkRunner$Evaluator: Evaluating View.CreatePCollectionView
23/02/23 11:50:20 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1) (beam-spark-poc-XXXXXXXXXXXXXXXXXXXXXXXXXXXX executor 1): java.io.IOException: unexpected exception type
        at java.base/java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1512)
        at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1142)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2237)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2134)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1675)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2134)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1675)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.reflect.InvocationTargetException
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:237)
        at java.base/jdk.internal.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1136)
        ... 35 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
        at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.$deserializeLambda$(GroupNonMergingWindowsFunctions.java:50)
        ... 44 more
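The `Invalid lambda deserialization` error comes from Java's `SerializedLambda` mechanism: deserializing a serializable lambda invokes the synthetic `$deserializeLambda$` method on the capturing class, and that call fails when the class available on the deserializing side does not match the one that serialized the lambda (for example, because the jar ends up loaded by two different classloaders). A minimal sketch of the round trip outside Beam/Spark, with illustrative class and method names:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaSerDemo {

    // A serializable lambda type: instances are written out as
    // java.lang.invoke.SerializedLambda.
    interface SerFn<A, B> extends Function<A, B>, Serializable {}

    // Serialize a lambda and read it back, roughly what Spark's
    // JavaSerializer does when it ships a task to an executor.
    static int roundTrip() throws Exception {
        SerFn<String, Integer> fn = String::length;

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(fn);
        }

        // readObject() triggers SerializedLambda.readResolve(), which calls
        // the synthetic $deserializeLambda$ method of the capturing class
        // (here LambdaSerDemo). If the capturing class on this side does not
        // match the one that produced the bytes, that call throws
        // IllegalArgumentException: Invalid lambda deserialization.
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            @SuppressWarnings("unchecked")
            SerFn<String, Integer> back = (SerFn<String, Integer>) ois.readObject();
            return back.apply("hello");
        }
    }

    public static void main(String[] args) throws Exception {
        // Same classloader on both ends, so this succeeds and prints 5.
        System.out.println(roundTrip());
    }
}
```

In the failing job the two ends are different JVMs: the executor must load `GroupNonMergingWindowsFunctions` from the same (single) copy of the Beam jar for its `$deserializeLambda$` to accept the lambda.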

Test job source code:

Pipeline pipeline = Pipeline.create(options);

pipeline
    .apply(TextIO.read().from("gs://<FILE_PATH>"))
    .apply("Do something", ParDo.of(new MessageConverterFn()))
    .apply(TextIO.write().to("wordcounts"));

public static class MessageConverterFn extends DoFn<String, String> {

    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(ProcessContext c) {
        String line = c.element();
        c.output(line);
    }
}

Beam version: 2.45.0
Spark version: 3.3.0
Java version: 11
Dataproc image version: 2.1.4-debian11 (Spark 3.3.0, Java 11)

When I replace TextIO.write() with simple logging to the screen, the job works. I don't know where this problem comes from; the Java and Spark versions match.

.apply(Println.<String>stdout());

public static <T> Println<T> stdout() {
    return new Println<>(new DoFn<T, Void>() {

        private static final long serialVersionUID = -313060014379406773L;

        @ProcessElement
        public void processElement(@Element T element) {
            System.out.println(element);
        }
    });
}

Spark submit command:

spark-submit --class <MY_CLASS> --master yarn --deploy-mode client --jars gs://<MY_JAR>.jar gs://<MY_JAR>.jar --runner=SparkRunner
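One thing worth checking (an assumption on my part, not a confirmed fix): the command supplies the same jar twice, once via `--jars` and once as the application jar. That can put two copies of the Beam classes on the executor under different classloaders, so the capturing class seen at deserialization no longer matches the one that serialized the lambda. A sketch of the submit command without the duplicate, since the application jar is shipped to executors anyway:

```shell
# Same placeholders as above; only the redundant --jars entry is dropped.
spark-submit \
  --class <MY_CLASS> \
  --master yarn \
  --deploy-mode client \
  gs://<MY_JAR>.jar \
  --runner=SparkRunner
```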

Of course, the same code works in the IDE and on a local Spark cluster on my laptop.
Any ideas?

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
