StarrocksInternalException thrown while filtering in Spark #83

Open
xiaoqubb opened this issue Sep 7, 2023 · 0 comments

xiaoqubb commented Sep 7, 2023

I used Spark to load data from StarRocks and from a Delta Lake data lake for a comparative analysis. The dataset has 1.2 billion rows. Delta Lake processed it normally in 4 seconds, while StarRocks threw an exception after 3 minutes 29 seconds. My test procedure was as follows:
Delta Lake test:
%spark
val df2 = spark.read.format("delta").load("hdfs://172.16.xx.xxx:9000/users/.d/202309/8709141d4cac4a1a8861b26be198392b_A卷数据12亿测试.d");
df2.filter("A103_VALUE == '3'").count()

res9: Long = 203581440
Took 4 sec.

StarRocks test:
%spark
val df4 = spark.read.format("starrocks")
.option("starrocks.table.identifier", "ssb.biga")
.option("starrocks.fe.http.url", "172.16.xxx.xxx:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://172.16.xxx.xxx:9030")
.option("starrocks.user", "root")
.option("starrocks.password", "")
.load();
df4.filter("A103_VALUE == '3'").count()

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 42.0 failed 4 times, most recent failure: Lost task 0.3 in stage 42.0 (TID 1377) (10.245.222.145 executor 4): com.starrocks.connector.spark.exception.StarrocksInternalException: StarRocks server StarRocks BE{host='172.16.11.63', port=9060} internal failed, status code [CANCELLED] error message is [Set cancelled by MemoryScratchSinkOperator]
at com.starrocks.connector.spark.backend.BackendClient.getNext(BackendClient.java:185)
at com.starrocks.connector.spark.rdd.ScalaValueReader.hasNext(ScalaValueReader.scala:197)
at com.starrocks.connector.spark.rdd.AbstractStarrocksRDDIterator.hasNext(AbstractStarrocksRDDIterator.scala:58)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: com.starrocks.connector.spark.exception.StarrocksInternalException: StarRocks server StarRocks BE{host='172.16.11.63', port=9060} internal failed, status code [CANCELLED] error message is [Set cancelled by MemoryScratchSinkOperator]
at com.starrocks.connector.spark.backend.BackendClient.getNext(BackendClient.java:185)
at com.starrocks.connector.spark.rdd.ScalaValueReader.hasNext(ScalaValueReader.scala:197)
at com.starrocks.connector.spark.rdd.AbstractStarrocksRDDIterator.hasNext(AbstractStarrocksRDDIterator.scala:58)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
SPARK JOB ERROR
Took 3 min 29 sec.

I know StarRocks supports passing a filter expression through starrocks.filter.query to reduce the amount of data returned in the DataFrame, but that approach is not always a good fit for interactive exploration. With the same data volume and the same operations, Delta Lake returns in 4 seconds; I believe its connector supports predicate pushdown, whereas this connector pulls all of the data into the Spark execution engine before filtering, which leads to the exception.
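
For reference, a minimal sketch of that workaround: it reuses the table, connection options, and column from the test above, and assumes the predicate passed to starrocks.filter.query is an expression StarRocks accepts in a WHERE clause, so the filtering happens on the BE side before rows are shipped to Spark.

%spark
// Sketch: push the predicate down via starrocks.filter.query so StarRocks filters rows
// before returning them to Spark (options copied from the failing test above).
val df5 = spark.read.format("starrocks")
  .option("starrocks.table.identifier", "ssb.biga")
  .option("starrocks.fe.http.url", "172.16.xxx.xxx:8030")
  .option("starrocks.fe.jdbc.url", "jdbc:mysql://172.16.xxx.xxx:9030")
  .option("starrocks.user", "root")
  .option("starrocks.password", "")
  .option("starrocks.filter.query", "A103_VALUE = '3'") // evaluated by StarRocks, not by Spark
  .load()
df5.count()

This avoids scanning and transferring all 1.2 billion rows, but as noted above it requires knowing the predicate up front, which is what makes it awkward for interactive exploration.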
