[SPARK-40535][SQL] Fix bug the buffer of AggregatingAccumulator will not be created if the input rows is empty #37977

Closed

beliefer wants to merge 2 commits into apache:master from beliefer:SPARK-37203_followup

Conversation

@beliefer
Contributor

What changes were proposed in this pull request?

When `AggregatingAccumulator` serializes its aggregate buffer, it may throw a NullPointerException. The following test case reproduces the error (note that `spark.range(1, 10, 1, 10)` spreads 9 rows over 10 partitions, so at least one partition is empty):

val namedObservation = Observation("named")
val df = spark.range(1, 10, 1, 10)
val observed_df = df.observe(
  namedObservation, percentile_approx($"id", lit(0.5), lit(100)).as("percentile_approx_val"))
observed_df.collect()

This throws the following exception:

13:45:10.976 ERROR org.apache.spark.util.Utils: Exception encountered
java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.getBufferObject(interfaces.scala:641)
	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.getBufferObject(interfaces.scala:602)
	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:624)
	at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:205)
	at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:33)
	at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:186)
	at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1245)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2(TaskResult.scala:55)
	at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2$adapted(TaskResult.scala:55)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$1(TaskResult.scala:55)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1456)
	at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:51)
	at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:663)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Why are the changes needed?

This fixes a bug. After investigation, the root cause is that the aggregation buffer of `AggregatingAccumulator` is never created when a task's input partition is empty, yet serialization still dereferences it.
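For context, here is a simplified sketch of the kind of null guard this change adds in `AggregatingAccumulator.withBufferSerialized`; the names `buffer` and `typedImperatives` follow the stack trace above, and the exact change is in the diff:

```scala
// Sketch only: the buffer is created lazily on the first input row, so a
// task over an empty partition reaches serialization with buffer == null.
override def withBufferSerialized(): AggregatingAccumulator = {
  if (buffer != null) {
    var i = 0
    // Serialize every TypedImperativeAggregate buffer in place before the
    // accumulator is shipped back to the driver with the task result.
    while (i < typedImperatives.length) {
      typedImperatives(i).serializeAggregateBufferInPlace(buffer)
      i += 1
    }
  }
  this
}
```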

Does this PR introduce any user-facing change?

Yes.
Users will see the correct results instead of an NPE.

How was this patch tested?

New test case.
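For reference, a regression test along these lines exercises the empty-partition path (a sketch only, assuming a suite extending `QueryTest` with `SharedSparkSession` and `testImplicits` imported; the test actually added may differ):

```scala
test("SPARK-40535: AggregatingAccumulator with empty input partitions") {
  val namedObservation = Observation("named")
  // 9 rows over 10 partitions: at least one partition is empty, so its
  // AggregatingAccumulator never creates an aggregation buffer.
  val df = spark.range(1, 10, 1, 10)
  val observed_df = df.observe(
    namedObservation,
    percentile_approx($"id", lit(0.5), lit(100)).as("percentile_approx_val"))
  // Before the fix, collect() failed with the NPE above while the task
  // result (including the accumulator) was being serialized.
  observed_df.collect()
  // The observed metric is now returned: the approximate median of 1..9.
  assert(namedObservation.get === Map("percentile_approx_val" -> 5))
}
```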

@github-actions github-actions bot added the SQL label Sep 23, 2022
@beliefer
Contributor Author

ping @MaxGekk cc @cloud-fan

@beliefer beliefer changed the title from "[SPARK-37203][SQL][FOLLOWUP] Fix bug the buffer of AggregatingAccumulator will not be created if the input rows is empty" to "[SPARK-40535][SQL] Fix bug the buffer of AggregatingAccumulator will not be created if the input rows is empty" Sep 23, 2022
@MaxGekk
Member

LGTM, I have checked the example from SPARK-40535:

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df = spark.range(1,10,1,11)
df: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> df.observe("collectedList", collect_list("id")).collect()
res0: Array[Long] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

Waiting for CI.

@MaxGekk
Member

MaxGekk commented Sep 23, 2022

+1, LGTM. Merging to master/3.3.
Thank you, @beliefer.

MaxGekk pushed a commit that referenced this pull request Sep 23, 2022
…not be created if the input rows is empty

Closes #37977 from beliefer/SPARK-37203_followup.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 7bbd975)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
@MaxGekk MaxGekk closed this in 7bbd975 Sep 23, 2022
@beliefer
Copy link
Contributor Author

@MaxGekk Thank you for your ping, and for the chance to fix my bug.

yxheartipp pushed a commit to yxheartipp/spark that referenced this pull request Feb 25, 2025
…not be created if the input rows is empty
yxheartipp added a commit to Kyligence/spark that referenced this pull request Feb 26, 2025
…not be created if the input rows is empty (#800)
yxheartipp pushed a commit to yxheartipp/spark that referenced this pull request Feb 26, 2025
…not be created if the input rows is empty
yxheartipp added a commit to Kyligence/spark that referenced this pull request Feb 27, 2025
…not be created if the input rows is empty (#801)
yhcast0 pushed a commit to yhcast0/spark that referenced this pull request Mar 6, 2025
…not be created if the input rows is empty (Kyligence#801)
yhcast0 pushed a commit to Kyligence/spark that referenced this pull request Mar 6, 2025
…not be created if the input rows is empty (#801)