
[BUG] orc_test.py::test_orc_scan_with_aggregate_pushdown fails with a standalone cluster on spark 3.3.0 #10099

Closed
NvTimLiu opened this issue Dec 27, 2023 · 12 comments

@NvTimLiu
Collaborator

NvTimLiu commented Dec 27, 2023

Describe the bug
orc_test.py::test_orc_scan_with_aggregate_pushdown FAILED in the UCX/MULTITHREAD tests.

This failure looks like an environment-related issue, because it:

  • only FAILED on the UCX/MULTITHREAD cluster above (EGX06), and
  • PASSED on other spark-3.3.0+ standalone clusters.

Note: we did not observe these failures before, because we previously ran spark-3.1.2 JDK8 tests on EGX06, and spark-3.1.2 SKIPS orc_test.py::test_orc_scan_with_aggregate_pushdown.

After switching EGX06 to JDK17/spark-3.3.0, these tests FAILED.

 ----------------------------- Captured stdout call -----------------------------
 ### CPU RUN ###
 ### GPU RUN ###
 __________________ test_orc_scan_with_aggregate_pushdown[MIN] __________________
 
 spark_tmp_path = '/tmp/pyspark_tests//spark-egx-06-master-30635-1824865905/'
 aggregate = 'MIN'
 
     @pytest.mark.skipif(is_before_spark_330(), reason='Aggregate push down on ORC is a new feature of Spark 330')
     @pytest.mark.parametrize('aggregate', _aggregate_orc_list)
     @allow_non_gpu(any = True)
     def test_orc_scan_with_aggregate_pushdown(spark_tmp_path, aggregate):
         """
         Spark(330,_) allows aggregate pushdown on ORC by enabling spark.sql.orc.aggregatePushdown.
         When the spark configuration is enabled we check the following:
         ---------------------------+
         | Aggregate | FallBack CPU |
         +-----------+--------------+
         |   COUNT   |      Y       |
         |    MIN    |      Y       |
         |    MAX    |      Y       |
         """
         data_path = spark_tmp_path + '/ORC_DATA/pushdown_00.orc'
     
         # fallback to CPU
 >       assert_cpu_and_gpu_are_equal_collect_with_capture(
             lambda spark: _do_orc_scan_with_agg(spark, data_path, aggregate),
             exist_classes="BatchScanExec",
             non_exist_classes="GpuBatchScanExec",
             conf=_orc_aggregate_pushdown_enabled_conf)
 
 ../../src/main/python/orc_test.py:709: 
 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
 ../../src/main/python/asserts.py:405: in assert_cpu_and_gpu_are_equal_collect_with_capture
     from_gpu, gpu_df = with_gpu_session(bring_back, conf=conf)
 ../../src/main/python/spark_session.py:164: in with_gpu_session
     return with_spark_session(func, conf=copy)
 /opt/miniconda3/lib/python3.8/contextlib.py:75: in inner
     return func(*args, **kwds)
 ../../src/main/python/spark_session.py:131: in with_spark_session
     ret = func(_spark)
 ../../src/main/python/asserts.py:213: in bring_back
     return (df.collect(), df)
 /var/lib/jenkins/spark/spark-3.3.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/dataframe.py:817: in collect
     sock_info = self._jdf.collectToPython()
 /var/lib/jenkins/spark/spark-3.3.0-bin-hadoop3.2/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321: in __call__
     return_value = get_return_value(
 /var/lib/jenkins/spark/spark-3.3.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/utils.py:190: in deco
     return f(*a, **kw)
 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
 
 answer = 'xro4496884'
 gateway_client = <py4j.clientserver.JavaClient object at 0x7f1bcebc4460>
 target_id = 'o4496883', name = 'collectToPython'
 
     def get_return_value(answer, gateway_client, target_id=None, name=None):
         """Converts an answer received from the Java gateway into a Python object.
     
         For example, string representation of integers are converted to Python
         integer, string representation of objects are converted to JavaObject
         instances, etc.
     
         :param answer: the string returned by the Java gateway
         :param gateway_client: the gateway client used to communicate with the Java
             Gateway. Only necessary if the answer is a reference (e.g., object,
             list, map)
         :param target_id: the name of the object from which the answer comes from
             (e.g., *object1* in `object1.hello()`). Optional.
         :param name: the name of the member from which the answer comes from
             (e.g., *hello* in `object1.hello()`). Optional.
         """
         if is_error(answer)[0]:
             if len(answer) > 1:
                 type = answer[1]
                 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                 if answer[1] == REFERENCE_TYPE:
 >                   raise Py4JJavaError(
                         "An error occurred while calling {0}{1}{2}.\n".
                         format(target_id, ".", name), value)
 E                   py4j.protocol.Py4JJavaError: An error occurred while calling o4496883.collectToPython.
 E                   : org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 59502.0 failed 1 times, most recent failure: Lost task 10.0 in stage 59502.0 (TID 2007282) (10.136.6.4 executor 0): org.apache.spark.SparkException: Cannot read columns statistics in file: file:/tmp/pyspark_tests/spark-egx-06-master-30635-1824865905/ORC_DATA/pushdown_00.orc/93-48f7-9de9-b06e2a7543a6-c000.snappy.orc. Please consider disabling ORC aggregate push down by setting 'spark.sql.orc.aggregatePushdown' to false.
 E                   	at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.createAggInternalRowFromFooter(OrcUtils.scala:429)
 E                   	at org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory$$2(OrcPartitionReaderFactory.scala:217)
 E                   	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2764)
 E                   	at org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory$te(OrcPartitionReaderFactory.scala:214)
 E                   	at org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory$tionReaderFactory.scala:213)
 E                   	at org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory$onReaderFactory.scala:226)
 E                   	at org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory$onReaderFactory.scala:211)
 E                   	at org.apache.spark.sql.execution.datasources.v2.PartitionedFileReader.get(FilePartitionReaderFactory.scala:57)
 E                   	at org.apache.spark.sql.execution.datasources.v2.FilePartitionReader.get(FilePartitionReader.scala:89)
 E                   	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.next(DataSourceRDD.scala:133)
 E                   	at org.apache.spark.sql.execution.datasources.v2.MetricsBatchIterator.next(DataSourceRDD.scala:179)
 E                   	at org.apache.spark.sql.execution.datasources.v2.MetricsBatchIterator.next(DataSourceRDD.scala:176)
 E                   	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.next(DataSourceRDD.scala:67)
 E                   	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
 E                   	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
 E                   	at com.nvidia.spark.rapids.CollectTimeIterator.$anonfun$next$1(GpuExec.scala:183)
 E                   	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
 E                   	at com.nvidia.spark.rapids.CollectTimeIterator.next(GpuExec.scala:182)
 E                   	at com.nvidia.spark.rapids.AbstractGpuCoalesceIterator.getHasOnDeck(GpuCoalesceBatches.scala:314)
 E                   	at com.nvidia.spark.rapids.AbstractGpuCoalesceIterator.hasNext(GpuCoalesceBatches.scala:330)
 E                   	at com.nvidia.spark.rapids.AbstractProjectSplitIterator.hasNext(basicPhysicalOperators.scala:233)
 E                   	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
 E                   	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
 E                   	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
 E                   	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
 E                   	at com.nvidia.spark.rapids.GpuMergeAggregateIterator.$anonfun$next$2(GpuAggregateExec.scala:751)
 E                   	at scala.Option.getOrElse(Option.scala:189)
 E                   	at com.nvidia.spark.rapids.GpuMergeAggregateIterator.next(GpuAggregateExec.scala:749)
 E                   	at com.nvidia.spark.rapids.GpuMergeAggregateIterator.next(GpuAggregateExec.scala:711)
 E                   	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
 E                   	at com.nvidia.spark.rapids.DynamicGpuPartialSortAggregateIterator.regateExec.scala:2042)
 E                   	at scala.Option.map(Option.scala:230)
 E                   	at com.nvidia.spark.rapids.DynamicGpuPartialSortAggregateIterator.next(GpuAggregateExec.scala:2042)
 E                   	at com.nvidia.spark.rapids.DynamicGpuPartialSortAggregateIterator.next(GpuAggregateExec.scala:1906)
 E                   	at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$GpuShuffleExchangeExecBase.scala:333)
 E                   	at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$ffleExchangeExecBase.scala:355)
 E                   	at scala.collection.Iterator.foreach(Iterator.scala:943)
 E                   	at scala.collection.Iterator.foreach$(Iterator.scala:943)
 E                   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
 E                   	at org.apache.spark.sql.rapids.RapidsCachingWriter.write(RapidsShuffleInternalManagerBase.scala:956)
 E                   	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
 E                   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
 E                   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
 E                   	at org.apache.spark.scheduler.Task.run(Task.scala:136)
 E                   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
 E                   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
 E                   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
 E                   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
 E                   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
 E                   	at java.base/java.lang.Thread.run(Thread.java:833)
 E                   Caused by: java.util.NoSuchElementException
 E                   	at java.base/java.util.LinkedList.removeFirst(LinkedList.java:274)
 E                   	at java.base/java.util.LinkedList.remove(LinkedList.java:689)
 E                   	at org.apache.spark.sql.execution.datasources.orc.OrcFooterReader.convertStatistics(OrcFooterReader.java:54)
 E                   	at org.apache.spark.sql.execution.datasources.orc.OrcFooterReader.readStatistics(OrcFooterReader.java:45)
 E                   	at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.createAggInternalRowFromFooter(OrcUtils.scala:425)
 E                   	... 49 more
 E                   
 E                   Driver stacktrace:
 E                   	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
 E                   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
 E                   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
 E                   	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
 E                   	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
 E                   	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
 E                   	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
 E                   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
 E                   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
 E                   	at scala.Option.foreach(Option.scala:407)
 E                   	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
 E                   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
 E                   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
 E                   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
 E                   	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
 E                   	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
 E                   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
 E                   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
 E                   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
 E                   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
 E                   	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
 E                   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 E                   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
 E                   	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
 E                   	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
 E                   	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:424)
 E                   	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3688)
 E                   	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
 E                   	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
 E                   	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
 E                   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
 E                   	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
 E                   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
 E                   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
 E                   	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
 E                   	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
 E                   	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3685)
 E                   	at jdk.internal.reflect.GeneratedMethodAccessor100.invoke(Unknown Source)
 E                   	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 E                   	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
 E                   	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 E                   	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 E                   	at py4j.Gateway.invoke(Gateway.java:282)
 E                   	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 E                   	at py4j.commands.CallCommand.execute(CallCommand.java:79)
 E                   	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
 E                   	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
 E                   	at java.base/java.lang.Thread.run(Thread.java:833)
 E                   Caused by: org.apache.spark.SparkException: Cannot read columns statistics in file: file:/tmp/x-06-master-30635-1824865905/ORC_DATA/pushdown_00.orc/part-00000-46ff1915-6b93-48f7-9de9-b06e2a7543a6-c000.snappy.orc. Please consider disabling ORC aggregate push down by setting 'spark.sql.orc.aggregatePushdown' to false.
 E                   	at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.createAggInternalRowFromFooter(OrcUtils.scala:429)
 E                   	at org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory$$2(OrcPartitionReaderFactory.scala:217)
 E                   	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2764)
 E                   	at org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory$te(OrcPartitionReaderFactory.scala:214)
 E                   	at org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory$tionReaderFactory.scala:213)
 E                   	at org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory$onReaderFactory.scala:226)
 E                   	at org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory$onReaderFactory.scala:211)
 E                   	at org.apache.spark.sql.execution.datasources.v2.PartitionedFileReader.get(FilePartitionReaderFactory.scala:57)
 E                   	at org.apache.spark.sql.execution.datasources.v2.FilePartitionReader.get(FilePartitionReader.scala:89)
 E                   	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.next(DataSourceRDD.scala:133)
 E                   	at org.apache.spark.sql.execution.datasources.v2.MetricsBatchIterator.next(DataSourceRDD.scala:179)
 E                   	at org.apache.spark.sql.execution.datasources.v2.MetricsBatchIterator.next(DataSourceRDD.scala:176)
 E                   	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.next(DataSourceRDD.scala:67)
 E                   	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
 E                   	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
 E                   	at com.nvidia.spark.rapids.CollectTimeIterator.$anonfun$next$1(GpuExec.scala:183)
 E                   	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
 E                   	at com.nvidia.spark.rapids.CollectTimeIterator.next(GpuExec.scala:182)
 E                   	at com.nvidia.spark.rapids.AbstractGpuCoalesceIterator.getHasOnDeck(GpuCoalesceBatches.scala:314)
 E                   	at com.nvidia.spark.rapids.AbstractGpuCoalesceIterator.hasNext(GpuCoalesceBatches.scala:330)
 E                   	at com.nvidia.spark.rapids.AbstractProjectSplitIterator.hasNext(basicPhysicalOperators.scala:233)
 E                   	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
 E                   	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
 E                   	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
 E                   	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
 E                   	at com.nvidia.spark.rapids.GpuMergeAggregateIterator.$anonfun$next$2(GpuAggregateExec.scala:751)
 E                   	at scala.Option.getOrElse(Option.scala:189)
 E                   	at com.nvidia.spark.rapids.GpuMergeAggregateIterator.next(GpuAggregateExec.scala:749)
 E                   	at com.nvidia.spark.rapids.GpuMergeAggregateIterator.next(GpuAggregateExec.scala:711)
 E                   	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
 E                   	at com.nvidia.spark.rapids.DynamicGpuPartialSortAggregateIterator.regateExec.scala:2042)
 E                   	at scala.Option.map(Option.scala:230)
 E                   	at com.nvidia.spark.rapids.DynamicGpuPartialSortAggregateIterator.next(GpuAggregateExec.scala:2042)
 E                   	at com.nvidia.spark.rapids.DynamicGpuPartialSortAggregateIterator.next(GpuAggregateExec.scala:1906)
 E                   	at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$GpuShuffleExchangeExecBase.scala:333)
 E                   	at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$ffleExchangeExecBase.scala:355)
 E                   	at scala.collection.Iterator.foreach(Iterator.scala:943)
 E                   	at scala.collection.Iterator.foreach$(Iterator.scala:943)
 E                   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
 E                   	at org.apache.spark.sql.rapids.RapidsCachingWriter.write(RapidsShuffleInternalManagerBase.scala:956)
 E                   	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
 E                   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
 E                   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
 E                   	at org.apache.spark.scheduler.Task.run(Task.scala:136)
 E                   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
 E                   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
 E                   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
 E                   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
 E                   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
 E                   	... 1 more
 E                   Caused by: java.util.NoSuchElementException
 E                   	at java.base/java.util.LinkedList.removeFirst(LinkedList.java:274)
 E                   	at java.base/java.util.LinkedList.remove(LinkedList.java:689)
 E                   	at org.apache.spark.sql.execution.datasources.orc.OrcFooterReader.convertStatistics(OrcFooterReader.java:54)
 E                   	at org.apache.spark.sql.execution.datasources.orc.OrcFooterReader.readStatistics(OrcFooterReader.java:45)
 E                   	at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.createAggInternalRowFromFooter(OrcUtils.scala:425)
 E                   	... 49 more
  /var/lib/jenkins/spark/spark-3.3.0-bin-hadoop3.2/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py:326: Py4JJavaError

 =========================== short test summary info ============================
FAILED ../../src/main/python/orc_test.py::test_orc_scan_with_aggregate_pushdown[COUNT][DATAGEN_SEED=1703470139, ALLOW_NON_GPU(ANY)]
FAILED ../../src/main/python/orc_test.py::test_orc_scan_with_aggregate_pushdown[MAX][DATAGEN_SEED=1703470139, ALLOW_NON_GPU(ANY)]
FAILED ../../src/main/python/orc_test.py::test_orc_scan_with_aggregate_pushdown[MIN][DATAGEN_SEED=1703470139, INJECT_OOM, ALLOW_NON_GPU(ANY)]
= 3 failed, 20020 passed, 1728 skipped, 406 xfailed, 290 xpassed, 8605 warnings in 22283.43s (6:11:23) =

Environment details (please complete the following information)

  • EGX06
  • UCX/MULTITHREAD
  • spark-3.3.0+
  • JDK17
  • standalone cluster

Additional context

  • Only FAILED on the cluster above (EGX06, UCX/MULTITHREAD).
  • PASSED on another JDK17 spark-3.3.0+ standalone cluster.
@NvTimLiu NvTimLiu added the bug and ? - Needs Triage labels on Dec 27, 2023
@abellina
Collaborator

@NvTimLiu this has nothing to do with UCX or the environment. I got it to happen on spark-3.3.0 locally. It comes down to pointing the Spark master at a standalone cluster:

$SPARK_HOME/sbin/start-master.sh -h 127.0.0.1
$SPARK_HOME/sbin/start-worker.sh spark://127.0.0.1:7077
 PYSP_TEST_spark_master=spark://127.0.0.1:7077 DATAGEN_SEED=1703470139 ./run_pyspark_from_build.sh -k test_orc_scan_with_aggregate_pushdown

Yields:

Caused by: org.apache.spark.SparkException: Cannot read columns statistics in file: file:/tmp/pyspark_tests/alebox-master-5381-1945477566/ORC_DATA/pushdown_00.orc/part-00000-f3e7ee2a-aa05-427e-a10e-d9cc0cf3b64f-c000.snappy.orc. Please consider disabling ORC aggregate push down by setting 'spark.sql.orc.aggregatePushdown' to false.

To me this just suggests that either the test or the distributed nature of the cluster is somehow causing the ORC metadata not to be read.

@abellina
Collaborator

The issue looks to be on the ORC write path. If I disable the GPU for writing the ORC file, the test works.

If I write with the GPU and read with the CPU, it fails.
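
For reference, here is a minimal PySpark sketch of that GPU-write/CPU-read isolation (not from this thread; the path and data shape are illustrative, spark.rapids.sql.enabled is assumed to be the runtime on/off switch for the RAPIDS plugin, and spark.sql.orc.aggregatePushdown is the config quoted in the error above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("orc-agg-pushdown-isolation").getOrCreate()
data_path = "/tmp/ORC_DATA/pushdown_repro.orc"  # hypothetical output path

# Write with the GPU enabled: 10 rows spread over 16 partitions leaves several
# partitions empty, which is the suspected trigger for the bad footer.
spark.conf.set("spark.rapids.sql.enabled", "true")   # assumed plugin switch
spark.range(0, 10).selectExpr("id", "id % 4 as p") \
    .repartition(16) \
    .write.mode("overwrite").orc(data_path)

# Read back on the CPU with aggregate pushdown enabled: the MIN is answered
# from the ORC footer statistics, so a footer missing column statistics should
# reproduce the "Cannot read columns statistics" failure above.
spark.conf.set("spark.rapids.sql.enabled", "false")
spark.conf.set("spark.sql.orc.aggregatePushdown", "true")
spark.read.orc(data_path).selectExpr("min(p)").show()

Flipping the first conf.set to "false" (CPU write) should make the read succeed, matching the observation above.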

@abellina abellina changed the title [BUG] orc_test.py::test_orc_scan_with_aggregate_pushdown FAILED against UCX/MULTITHREAD tests [BUG] orc_test.py::test_orc_scan_with_aggregate_pushdown fails with a standalone cluster on spark 3.3.0 Dec 27, 2023
@ttnghia
Collaborator

ttnghia commented Dec 27, 2023

Do you know why it is failing in standalone cluster mode but not in local mode?

@abellina
Collaborator

I don't. I suspect it has to do with the different partitioning we are likely hitting in standalone mode, i.e. a different row count per writer. I am not sure whether there is conditional code that skips writing the metadata in some scenarios, but that is what it looks like.

@abellina
Collaborator

E.g. could some partitions have 0 rows, and does that mean that the ORC writer for the GPU behaves differently?
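
A quick way to check that hypothesis (an assumption, not something run in this thread) is to count rows per partition before the write; any zero means that task's ORC writer is handed an empty batch:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Same illustrative shape as above: a handful of rows spread over many tasks.
df = spark.range(0, 10).selectExpr("id", "id % 4 as p").repartition(16)

# Count the rows in each partition; a 0 here means that partition's ORC writer
# receives an empty batch and produces an empty part file.
rows_per_partition = df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
print(rows_per_partition)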

@ttnghia
Collaborator

ttnghia commented Dec 27, 2023

Oh yeah, that may be the issue. I looked at the written files and indeed one of them has 0 rows. I'm going to check that in the cudf code.

@sameerz sameerz removed the ? - Needs Triage label on Dec 27, 2023
@ttnghia
Collaborator

ttnghia commented Dec 27, 2023

So for the problematic file:

File Version: 0.12 with FUTURE by Unknown(5) 
Rows: 0
Compression: SNAPPY
Compression size: 262144
Calendar: Julian/Gregorian
Type: struct<id:bigint,p:bigint>

Stripe Statistics:

File Statistics:

Stripes:

File length: 68 bytes
Padding length: 0 bytes
Padding ratio: 0%

It seems that there are at least 2 problems:

  • Compression size: 262144: This is incorrect, since there is no data.
  • There are no stripes/column statistics.
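
For completeness, a small inspection sketch (assuming a recent pyarrow with its ORC reader installed; the directory is hypothetical) that flags part files like the one dumped above, i.e. files whose footer reports 0 rows and no stripes:

import glob
import pyarrow.orc as orc

data_path = "/tmp/ORC_DATA/pushdown_repro.orc"  # hypothetical output directory
for part in sorted(glob.glob(data_path + "/part-*.orc")):
    f = orc.ORCFile(part)
    # A part file reporting 0 rows and 0 stripes carries no column statistics
    # in its footer, which is what trips up Spark's aggregate pushdown.
    print(f"{part}: rows={f.nrows}, stripes={f.nstripes}, compression={f.compression}")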

@ttnghia
Collaborator

ttnghia commented Dec 27, 2023

I've filed a cudf issue: rapidsai/cudf#14675

@abellina abellina assigned ttnghia and unassigned abellina Dec 27, 2023
@abellina
Collaborator

Thanks @ttnghia !!

@ttnghia
Collaborator

ttnghia commented Jan 4, 2024

Verified that rapidsai/cudf#14707 fixes this.

@ttnghia
Collaborator

ttnghia commented Jan 9, 2024

@NvTimLiu The cudf PR above is merged. Please verify in your new environment and close this if the issue no longer exists.

@NvTimLiu
Collaborator Author

Tests PASS now, closing.
