[Problem] Inconsistent blocks when reading shuffle data #198

Closed · zuston opened this issue Sep 5, 2022 · 43 comments
zuston (Member) commented Sep 5, 2022

I found that some tasks of Spark jobs throw an exception about an inconsistent block count. The stack trace is as follows:

22/09/03 15:29:21 ERROR Executor: Exception in task 330.0 in stage 9.0 (TID 59001)
org.apache.uniffle.common.exception.RssException: Blocks read inconsistent: expected 30000 blocks, actual 15636 blocks
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.checkProcessedBlockIds(ShuffleReadClientImpl.java:215)
	at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:135)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)

I didn't find any error/warn log on the shuffle server that stored the corresponding partition data.

We don't set any replica config and directly use the MEMORY_LOCALFILE storage type. Could this exception be caused by a disk error?

jerqi (Contributor) commented Sep 5, 2022

It may be a bug in our RSS. I previously tried to fix similar problems in #40 and Tencent/Firestorm#92. Could you reproduce this problem? Are the missing blocks in disk storage or memory storage?

zuston (Member, Author) commented Sep 5, 2022

> Could you reproduce this problem?

It occurs in our users' jobs; I may have to write test Spark code to reproduce it.

> Are the missing blocks in disk storage or memory storage?

I don't know which storage the missing blocks belong to. But I compared the request count on the shuffle server with the request log entries in the tasks, and they are equal.

zuston (Member, Author) commented Sep 5, 2022

> It may be a bug in our RSS. I previously tried to fix similar problems in #40 and Tencent/Firestorm#92.

It looks like this problem is not the same as the one you mentioned. This problem is caused by some blocks being missing.

smallzhongfeng (Contributor) commented:

22/09/05 20:58:01 INFO impl.HdfsShuffleReadHandler: Read index files hdfs://ns1/tmp/4/308-308/xxxx-19999_0.index for 2 ms
22/09/05 20:58:01 WARN util.RssUtils: Read index data under flow
java.nio.BufferUnderflowException
	at java.nio.Buffer.nextGetIndex(Buffer.java:506)
	at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:412)
	at org.apache.uniffle.common.util.RssUtils.transIndexDataToSegments(RssUtils.java:204)
	at org.apache.uniffle.common.util.RssUtils.transIndexDataToSegments(RssUtils.java:188)
	at org.apache.uniffle.storage.handler.impl.DataSkippableReadHandler.readShuffleData(DataSkippableReadHandler.java:67)
	at org.apache.uniffle.storage.handler.impl.HdfsClientReadHandler.readShuffleData(HdfsClientReadHandler.java:135)
	at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:112)
	at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:140)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:195)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:131)
	at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:100)
	at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:213)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)

We seem to have the same problem, but the reason for the inconsistent block count is that an exception occurred while reading the index file.

jerqi (Contributor) commented Sep 6, 2022

> transIndexDataToSegments

It seems that we read an incomplete index file.

zuston (Member, Author) commented Sep 6, 2022

> It seems that we read an incomplete index file.

We should fail fast when encountering BufferUnderflowException instead of ignoring it:

LOGGER.warn("Read index data under flow", ue);
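
For context, the parsing path being discussed looks roughly like the sketch below. This is an illustration only, not the actual Uniffle RssUtils.transIndexDataToSegments source; the field layout and type names are assumptions. The point at issue is the catch block: today the under-flow is logged and the partial result is returned, and the proposal is to rethrow instead.

import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the index record; the real field layout may differ.
record IndexSegment(long offset, int length, int uncompressLength, long crc, long blockId, long taskAttemptId) {}

class IndexParseSketch {
  static List<IndexSegment> parseIndex(ByteBuffer indexData) {
    List<IndexSegment> segments = new ArrayList<>();
    try {
      while (indexData.hasRemaining()) {
        segments.add(new IndexSegment(
            indexData.getLong(),   // offset into the data file
            indexData.getInt(),    // compressed length
            indexData.getInt(),    // uncompressed length
            indexData.getLong(),   // crc
            indexData.getLong(),   // blockId
            indexData.getLong())); // taskAttemptId
      }
    } catch (BufferUnderflowException ue) {
      // Current behavior: warn and return whatever was parsed so far.
      // The "fail fast" proposal would rethrow (or wrap) the exception here instead.
      System.err.println("Read index data under flow: " + ue);
    }
    return segments;
  }
}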

jerqi (Contributor) commented Sep 6, 2022

For MEMORY_HDFS or MEMORY_LOCALFILE, it's normal to read incomplete index data, so we choose to ignore it instead of failing fast.
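
For readers following along: ignoring the under-flow is tolerable only because the reader has a final safety net that compares the expected block IDs against the processed ones (the check that produced the error in the original report, ShuffleReadClientImpl.checkProcessedBlockIds). A rough sketch of that check, not the actual code (the real implementation tracks block IDs differently):

import java.util.Set;

class BlockCheckSketch {
  // If any expected block was never read, fail at the end of the iteration.
  static void checkProcessedBlockIds(Set<Long> expectedBlockIds, Set<Long> processedBlockIds) {
    if (!processedBlockIds.containsAll(expectedBlockIds)) {
      throw new RuntimeException("Blocks read inconsistent: expected "
          + expectedBlockIds.size() + " blocks, actual " + processedBlockIds.size() + " blocks");
    }
  }
}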

smallzhongfeng (Contributor) commented:

> We should fail fast when encountering BufferUnderflowException instead of ignoring it.

What I am curious about is whether this is caused by the stream's data not being flushed normally or by the stream being closed abnormally.

jerqi (Contributor) commented Sep 6, 2022

> What I am curious about is whether this is caused by the stream's data not being flushed normally or by the stream being closed abnormally.

Maybe the shuffle index is still being written.

zuston (Member, Author) commented Sep 6, 2022

> For MEMORY_HDFS or MEMORY_LOCALFILE, it's normal to read incomplete index data, so we choose to ignore it instead of failing fast.

Oh, that makes sense.

> Maybe the shuffle index is still being written.

First, maybe we should reproduce this exception by making the reader and writer operate on the file at the same time.

zuston (Member, Author) commented Sep 6, 2022

Revisit: do we need to fail fast?

As far as I know, the BufferUnderflowException is caused by an incomplete FileBasedShuffleSegment. The local-file reader always gets n complete FileBasedShuffleSegment records, so it won't throw this exception. The HDFS index read, however, may return incomplete data (this happens when data is spilled from memory to HDFS), so we should apply the same truncation as the local index file read. After that, we should fail fast at

LOGGER.warn("Read index data under flow", ue);

Is the inconsistent block count caused by this BufferUnderflowException?

Spilling is a common operation in Uniffle and it already filters blocks, so an inconsistent block count would mean there is a bug in spilling.
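
As a rough illustration of the "truncate like the local index read" idea mentioned above (the class name is hypothetical, and the 40-byte record size is an assumption about the serialized segment layout, not taken from the Uniffle source):

import java.nio.ByteBuffer;

class IndexTruncateSketch {
  // Assumed fixed size of one serialized index record: 4 longs + 2 ints = 40 bytes.
  static final int SEGMENT_SIZE = 8 + 4 + 4 + 8 + 8 + 8;

  // Drop any trailing partial record before parsing, so an index file that is
  // still being written (or only partially flushed) never underflows the buffer.
  static ByteBuffer truncateToWholeSegments(ByteBuffer indexData) {
    int usable = (indexData.remaining() / SEGMENT_SIZE) * SEGMENT_SIZE;
    ByteBuffer truncated = indexData.duplicate();
    truncated.limit(truncated.position() + usable);
    return truncated;
  }
}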

leixm (Contributor) commented Sep 7, 2022

Our production environment also hit this exception once, but so far we have not been able to reproduce it.

leixm (Contributor) commented Sep 7, 2022

I think maybe the shuffle server is still writing this file and the output stream has not been closed yet.

leixm (Contributor) commented Sep 11, 2022

Can you help me understand #issue_63? @jerqi

> When we use the storageType MEMORY_HDFS or MEMORY_LOCALFILE_HDFS, after we read data from memory, the data may be flushed to HDFS. So we could read incomplete index data.

org.apache.uniffle.storage.handler.impl.HdfsFileWriter#writeIndex is a complete operation; it looks like it shouldn't write an incomplete index file.

leixm (Contributor) commented Sep 11, 2022

In our production environment, certain tasks hit this problem during a certain period of time. I am trying to reproduce the situation and keep the index file and data file.

jerqi (Contributor) commented Sep 13, 2022

> org.apache.uniffle.storage.handler.impl.HdfsFileWriter#writeIndex is a complete operation; it looks like it shouldn't write an incomplete index file.

If you use MEMORY_HDFS or MEMORY_LOCALFILE_HDFS, you read data from memory first and then from HDFS, but the in-memory data may be flushed in between, so you can read incomplete data that was flushed from memory to HDFS.

zuston (Member, Author) commented Sep 13, 2022

After collecting all the failed tasks whose exception is related to "org.apache.uniffle.common.exception.RssException: Blocks read inconsistent", I found these exceptions are caused by DEADLINE_EXCEEDED from the gRPC remote call, which Uniffle catches without rethrowing. This confused us into thinking it was a write/read bug.

I think I will raise a PR to fix it.

zuston (Member, Author) commented Sep 14, 2022

Following up on this problem.

I found that the gRPC client sometimes throws a DEADLINE exception like the following:

org.apache.uniffle.common.exception.RssException: Failed to read shuffle data with ShuffleServerGrpcClient for host[10.67.67.68], port[21000] due to DEADLINE_EXCEEDED: deadline exceeded after 59.999946594s. [closed=[], committed=[remote_addr=10.67.67.68/10.67.67.68:21000]]
	at org.apache.uniffle.storage.handler.impl.LocalFileClientRemoteReadHandler.readShuffleData(LocalFileClientRemoteReadHandler.java:88)
	at org.apache.uniffle.storage.handler.impl.DataSkippableReadHandler.readShuffleData(DataSkippableReadHandler.java:83)
	at org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler.readShuffleData(LocalFileClientReadHandler.java:79)
	at org.apache.uniffle.storage.handler.impl.LocalFileQuorumClientReadHandler.readShuffleData(LocalFileQuorumClientReadHandler.java:79)
	at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:112)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:195)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:131)
	at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:101)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:238)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage20.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage20.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
	at org.apache.spark.sql.execution.joins.SortMergeFullOuterJoinScanner.advancedRight(SortMergeJoinExec.scala:1000)
	at org.apache.spark.sql.execution.joins.SortMergeFullOuterJoinScanner.<init>(SortMergeJoinExec.scala:975)
	at org.apache.spark.sql.execution.joins.SortMergeJoinExec.$anonfun$doExecute$1(SortMergeJoinExec.scala:220)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
22/09/13 10:43:48 ERROR ComposedClientReadHandler: Failed to read shuffle data from WARM handler

I found that the response had been sent by the shuffle server, but the client side still threw the exception. What could cause this? Network? GC?

Did you meet similar problems? @jerqi

jerqi (Contributor) commented Sep 14, 2022

> I found that the response had been sent by the shuffle server, but the client side still threw the exception. What could cause this? Network? GC?

The response may not have been sent by the shuffle server in time.

zuston (Member, Author) commented Sep 14, 2022

> The response may not have been sent by the shuffle server in time.

Do we have some way to avoid this problem?

leixm (Contributor) commented Sep 14, 2022

> Do we have some way to avoid this problem?

This is a response timeout, which should be caused by high load on the shuffle server and slow responses.

leixm (Contributor) commented Sep 14, 2022

We can increase the RPC timeout and look into why the shuffle server responds slowly.
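
For reference, increasing the client-side RPC timeout is a Spark client config change, roughly like the line below. The key name and default are assumptions here (they may differ between Uniffle versions), so verify against the client configuration guide for your release:

# spark-defaults.conf (illustrative; assumed key name, default believed to be 60000 ms, matching the ~60s deadline above)
spark.rss.client.rpc.timeout.ms 120000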

zuston (Member, Author) commented Sep 14, 2022

> This is a response timeout, which should be caused by high load on the shuffle server and slow responses.

But I didn't find any anomaly in the gRPC metrics. How did you determine that this problem is caused by a heavily loaded shuffle server?

zuston (Member, Author) commented Sep 14, 2022

> We can increase the RPC timeout and look into why the shuffle server responds slowly.

Increasing the RPC timeout is not a fundamental fix. And I found that the RPC had been accepted by the shuffle server (the "handled" log is shown); maybe it got stuck on sending the response.

leixm (Contributor) commented Sep 14, 2022

> But I didn't find any anomaly in the gRPC metrics. How did you determine that this problem is caused by a heavily loaded shuffle server?

We had a similar RPC timeout exception before; it appeared in jobs shuffling about 10T of data. The investigation found that inflush_memory and used_memory were too high, which caused the client to frequently retry sending data and requesting buffers.

leixm (Contributor) commented Sep 14, 2022

> Increasing the RPC timeout is not a fundamental fix. And I found that the RPC had been accepted by the shuffle server (the "handled" log is shown); maybe it got stuck on sending the response.

You're right, it's not only the service response but also the network I/O, etc.

zuston (Member, Author) commented Sep 14, 2022

> The investigation found that inflush_memory and used_memory were too high, which caused the client to frequently retry sending data and requesting buffers.

Yes, a failed require-memory call will also cause this exception, but I didn't find any require-memory failures on the server side.

Besides, I have submitted a new PR, #218, to fix the problem you mentioned.

Gustfh commented Oct 21, 2022

We also met this error, but it was in fact an RPC timeout. In composed-client mode, the exception from one read attempt is caught and the client then tries to read from another storage; if that result is empty, the exception is gone. At the end, the client checks the blocks map and reports the inconsistent-blocks issue. So I think there is exception confusion: some cases are not really an inconsistent-blocks issue, just a read timeout, perhaps caused by a highly loaded shuffle server. I suggest changing the code to raise a different exception for this case to avoid the confusion.
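
A minimal sketch of that suggestion (hypothetical names, not the actual ComposedClientReadHandler code): remember the last swallowed per-storage failure and surface it when the final block-count check fails, instead of reporting only a generic inconsistency.

import java.util.Set;

class ReadFailureTrackingSketch {
  private Exception lastReadFailure;  // last swallowed per-storage read error, if any

  byte[] readWithFallback(StorageReader[] handlers) {
    for (StorageReader handler : handlers) {
      try {
        byte[] data = handler.read();
        if (data != null) {
          return data;
        }
      } catch (Exception e) {
        lastReadFailure = e;  // remember instead of silently falling through to the next storage
      }
    }
    return null;
  }

  void checkProcessedBlockIds(Set<Long> expected, Set<Long> processed) {
    if (!processed.containsAll(expected)) {
      if (lastReadFailure != null) {
        // Surface the real cause (e.g. an RPC timeout) rather than a generic message.
        throw new IllegalStateException("Blocks missing after a failed storage read", lastReadFailure);
      }
      throw new IllegalStateException(
          "Blocks read inconsistent: expected " + expected.size() + ", actual " + processed.size());
    }
  }

  interface StorageReader {
    byte[] read() throws Exception;
  }
}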

zuston (Member, Author) commented Oct 21, 2022

> I suggest changing the code to raise a different exception for this case to avoid the confusion.

+1.

jerqi (Contributor) commented Oct 21, 2022

+1, you can raise a PR if you have time.

jerqi (Contributor) commented Oct 25, 2022

What you want seems to be implemented in PR #276, right?

zuston (Member, Author) commented Oct 25, 2022

> What you want seems to be implemented in PR #276, right?

#276 only fixes the block inconsistency with multiple replicas. For this issue, I think we should just fail fast when encountering an RPC timeout.

zuston closed this as completed on Jan 19, 2023.
smlHao commented Jul 13, 2023

Hi, I have met the same error. My server conf sets rss.storage.type MEMORY_LOCALFILE_HDFS, there are 10 million records, and the Spark SQL is:

insert into ssb_new.lineorder_partition_sml_parquet_partition9
select lo_orderkey, lo_linenumber, lo_custkey,
lo_partkey, lo_suppkey, lo_orderpriority,
lo_shippriority, lo_quantity, lo_extendedprice,
lo_ordtotalprice, lo_discount, lo_revenue,
lo_tax, lo_commitdate, lo_shipmode, lo_supplycost,
lo_orderdate, lo_orderdate
from ssb_new.lineorder_partition_8 DISTRIBUTE BY lo_orderdate;

The driver logs have this warning:

2023-07-13 07:02:18,626 [task-result-getter-1] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 16.0 in stage 3.0 (TID 949) (182.166.8.117 executor 6): org.apache.uniffle.common.exception.RssException: Failed to read shuffle data from HOT handler
at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:102)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:236)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:162)
at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:104)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:243)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.uniffle.common.exception.RssException: Failed to read in memory shuffle data with ShuffleServerGrpcClient for host[172.100.3.70], port[20000] due to io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 59.999773883s. [closed=[], committed=[remote_addr=172.100.3.70/172.100.3.70:20000]]
at org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler.readShuffleData(MemoryClientReadHandler.java:77)
at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:100)
... 25 more

There is no full GC, but this error appeared in server.out:

WARNING: Stream Error
io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:172)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$FlowState.cancel(DefaultHttp2RemoteFlowController.java:481)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$1.onStreamClosed(DefaultHttp2RemoteFlowController.java:105)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection.notifyClosed(DefaultHttp2Connection.java:357)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.removeFromActiveStreams(DefaultHttp2Connection.java:1007)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.deactivate(DefaultHttp2Connection.java:963)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:515)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:521)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.closeStream(Http2ConnectionHandler.java:613)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.processRstStreamWriteResult(Http2ConnectionHandler.java:894)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.access$1000(Http2ConnectionHandler.java:66)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$4.operationComplete(Http2ConnectionHandler.java:809)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$4.operationComplete(Http2ConnectionHandler.java:806)
at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
at io.grpc.netty.shaded.io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717)
at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272)
at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:352)
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel.writeBytesMultiple(AbstractEpollStreamChannel.java:305)
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel.doWriteMultiple(AbstractEpollStreamChannel.java:510)
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel.doWrite(AbstractEpollStreamChannel.java:422)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:949)
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.flush0(AbstractEpollChannel.java:557)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:913)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:189)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:967)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannel.flush(AbstractChannel.java:254)
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.flush(WriteQueue.java:147)
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.access$000(WriteQueue.java:34)
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue$1.run(WriteQueue.java:46)
at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)

@zuston @leixm Hi, I find that ShuffleServerNettyHandler.RPC_TIMEOUT is hard-coded to 60000 milliseconds.

Do you have any way to find the cause or solve this?

zuston (Member, Author) commented Jul 17, 2023

The key point is the client request timeout. I think you should check the shuffle-server metrics to find out whether the gRPC thread pool is large enough and whether the gRPC blocking queue size is high.

smlHao commented Jul 17, 2023

> The key point is the client request timeout. I think you should check the shuffle-server metrics to find out whether the gRPC thread pool is large enough and whether the gRPC blocking queue size is high.

@zuston Hi, thanks for the fast reply! Do you mean checking the shuffle-server metrics grpc_server_executor_active_threads and grpc_server_executor_blocking_queue_size? Which key in the shuffle-server conf would need to be adjusted?

zuston (Member, Author) commented Jul 17, 2023

Let's look at the metrics first.

smlHao commented Jul 17, 2023

> Let's look at the metrics first.

OK, I will rerun the SQL and check the metrics. Thanks!

smlHao commented Jul 17, 2023

Hi, I reran the SQL but couldn't reproduce the error. The metrics dashboard is as follows:

(screenshot of the shuffle-server gRPC metrics dashboard)

server.conf :
rss.rpc.server.port 20000
rss.jetty.http.port 20001
rss.storage.basePath /app/rss-0.7.1/data
rss.storage.type MEMORY_LOCALFILE_HDFS
rss.coordinator.quorum 172.100.3.70:19999,172.100.3.71:19999,172.100.3.72:19999
rss.server.disk.capacity 50g

rss.server.flush.thread.alive 5
rss.server.flush.threadPool.size 10
rss.server.buffer.capacity 40g
rss.server.read.buffer.capacity 20g
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
rss.server.commit.timeout 600000
rss.server.app.expired.withoutHeartbeat 120000
rss.server.disk.capacity 50g
rss.server.flush.cold.storage.threshold.size 100g

zuston (Member, Author) commented Jul 17, 2023

From the dashboard metrics, it looks good. Please attach pictures when this error happens and then ping me. Thanks.

smlHao commented Jul 17, 2023

> From the dashboard metrics, it looks good. Please attach pictures when this error happens and then ping me. Thanks.

@zuston OK, I will increase the data and test with the same conf settings to see whether the error can be reproduced. If it happens, I will attach pictures. Are there other metrics to check? Thanks!

smlHao commented Jul 20, 2023

> From the dashboard metrics, it looks good. Please attach pictures when this error happens and then ping me. Thanks.

@zuston Hi, when a huge table joins a huge table, the shuffle server has blocked threads. Is that expected?

(screenshots of shuffle-server thread and metrics dashboards)

The executor has no daemon thread holding and has no error log.

(executor screenshots)

jerqi (Contributor) commented Jul 20, 2023

Could you create a new issue?

smlHao commented Jul 20, 2023

> Could you create a new issue?

OK, thanks!
