
[CH] Make shuffle writer exit gracefully when tasks in executors are killed #5823

Open
taiyang-li opened this issue May 21, 2024 · 2 comments
taiyang-li commented May 21, 2024

Backend

CH (ClickHouse)

Bug description

When Spark speculative execution is enabled, some tasks may be killed by the driver because another attempt has already finished successfully.
Those killed tasks are marked as failed because the shuffle writer does not exit gracefully.


Spark version

Spark-3.3.x

Spark configurations

--conf spark.speculation=true
    --conf spark.speculation.multiplier=1.1
    --conf spark.speculation.quantile=0.5

System information

No response

Relevant logs

2024-05-22 14:33:45.853 <Information> LocalExecutor: build pipeline 0.314 ms; create executor 0.026 ms;
2024-05-22 14:33:47.230 <Debug> jni: [{"id":1,"name":"kFilter","time":2068,"input_wait_time":2748222,"output_wait_time":1767,"steps":[{"name":"Filter","description":"WHERE","processors":[{"name":"FilterTransform","time":1762,"output_rows":605146,"output_bytes":10892628,"input_rows":605146,"input_bytes":10892628}]},{"name":"Expression","description":"Remove nullable properties","processors":[{"name":"ExpressionTransform","time":306,"output_rows":605146,"output_bytes":10287482,"input_rows":605146,"input_bytes":10892628}]}]},{"id":0,"name":"kRead","time":1332017,"input_wait_time":770,"output_wait_time":3352,"steps":[{"name":"SubstraitFileSourceStep","description":"read local files","processors":[{"name":"SubstraitFileSource","time":1332017,"output_rows":605146,"output_bytes":10892628,"input_rows":0,"input_bytes":0}]}]}]
2024/05/22 14:33:47.244 INFO [dispatcher-Executor] spark.executor.Executor: Executor is trying to kill task 91.0 in stage 6.0 (TID 2398), reason: another attempt succeeded
2024/05/22 14:33:47.245 ERROR [Executor task launch worker for task 91.0 in stage 6.0 (TID 2398)] celeborn.client.ShuffleClientImpl: Exception raised while registering shuffle 6 with 136 mapper and 200 partitions.
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1039)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:248)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
	at org.apache.celeborn.common.util.ThreadUtils$.awaitResult(ThreadUtils.scala:233)
	at org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:74)
	at org.apache.celeborn.common.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:89)
	at org.apache.celeborn.client.ShuffleClientImpl.lambda$registerShuffle$2(ShuffleClientImpl.java:473)
	at org.apache.celeborn.client.ShuffleClientImpl.registerShuffleInternal(ShuffleClientImpl.java:528)
	at org.apache.celeborn.client.ShuffleClientImpl.registerShuffle(ShuffleClientImpl.java:468)
	at org.apache.celeborn.client.ShuffleClientImpl.lambda$getPartitionLocation$4(ShuffleClientImpl.java:512)
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
	at org.apache.celeborn.common.util.JavaUtils$ConcurrentHashMapForJDK8.computeIfAbsent(JavaUtils.java:472)
	at org.apache.celeborn.client.ShuffleClientImpl.getPartitionLocation(ShuffleClientImpl.java:511)
	at org.apache.celeborn.client.ShuffleClientImpl.pushOrMergeData(ShuffleClientImpl.java:794)
	at org.apache.celeborn.client.ShuffleClientImpl.mergeData(ShuffleClientImpl.java:1189)
	at org.apache.spark.shuffle.CelebornPartitionPusher.pushPartitionData(CelebornPartitionPusher.scala:61)
	at org.apache.gluten.vectorized.CHShuffleSplitterJniWrapper.stop(Native Method)
	at org.apache.spark.shuffle.CHCelebornHashBasedColumnarShuffleWriter.internalWrite(CHCelebornHashBasedColumnarShuffleWriter.scala:120)
	at org.apache.spark.shuffle.CelebornHashBasedColumnarShuffleWriter.write(CelebornHashBasedColumnarShuffleWriter.scala:91)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Exception in thread "Executor task launch worker for task 91.0 in stage 6.0 (TID 2398)" 2024-05-22 14:33:47.263 <Error> local_engine: Enter java exception handle.
org.apache.celeborn.common.exception.CelebornIOException: Register shuffle failed for shuffle 6.
	at org.apache.celeborn.client.ShuffleClientImpl.pushOrMergeData(ShuffleClientImpl.java:797)
	at org.apache.celeborn.client.ShuffleClientImpl.mergeData(ShuffleClientImpl.java:1189)
	at org.apache.spark.shuffle.CelebornPartitionPusher.pushPartitionData(CelebornPartitionPusher.scala:61)
	at org.apache.gluten.vectorized.CHShuffleSplitterJniWrapper.stop(Native Method)
	at org.apache.spark.shuffle.CHCelebornHashBasedColumnarShuffleWriter.internalWrite(CHCelebornHashBasedColumnarShuffleWriter.scala:120)
	at org.apache.spark.shuffle.CelebornHashBasedColumnarShuffleWriter.write(CelebornHashBasedColumnarShuffleWriter.scala:91)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
2024-05-22 14:33:47.367 <Error> Allocator::free: Code: 50. DB::Exception: org.apache.celeborn.common.exception.CelebornIOException: Register shuffle failed for shuffle 6. (UNKNOWN_TYPE), Stack trace (when copying this message, always include the lines below):

0. ./contrib/llvm-project/libcxx/include/exception:141: Poco::Exception::Exception(String const&, int) @ 0x0000000012241c79
1. ./build_gcc/./src/Common/Exception.cpp:101: DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x000000000aebb7bb
2. ./contrib/llvm-project/libcxx/include/string:1499: DB::Exception::createRuntime(int, String&) @ 0x00000000060a1b0c
3. ./utils/extern-local-engine/jni/jni_common.h:110: void local_engine::safeCallVoidMethod<long>(JNIEnv_*, _jobject*, _jmethodID*, long) @ 0x000000000b3028c5
4. ./build_gcc/./utils/extern-local-engine/jni/ReservationListenerWrapper.cpp:58: local_engine::ReservationListenerWrapper::free(long) @ 0x000000000b3029c9
5. ./contrib/llvm-project/libcxx/include/__functional/function.h:0: ? @ 0x000000000ae8e010
6. ./build_gcc/./src/Common/Allocator.cpp:128: Allocator<false, false>::free(void*, unsigned long) @ 0x000000000ae8c787
7. ./src/IO/BufferWithOwnMemory.h:141: local_engine::CompressedWriteBuffer::~CompressedWriteBuffer() @ 0x000000000b2bef18
8. ./contrib/llvm-project/libcxx/include/__memory/shared_ptr.h:701: local_engine::CelebornPartitionWriter::unsafeEvictSinglePartition(bool, bool, unsigned long)::$_0::operator()() const @ 0x000000000b2b9049
9. ./src/Common/Stopwatch.h:72: local_engine::CelebornPartitionWriter::unsafeEvictSinglePartition(bool, bool, unsigned long) @ 0x000000000b2b8829
10. ./build_gcc/./utils/extern-local-engine/Shuffle/PartitionWriter.cpp:508: local_engine::CelebornPartitionWriter::unsafeEvictPartitions(bool, bool) @ 0x000000000b2b855e
11. ./build_gcc/./utils/extern-local-engine/Shuffle/PartitionWriter.cpp:582: local_engine::CelebornPartitionWriter::unsafeStop() @ 0x000000000b2b9231
12. ./build_gcc/./utils/extern-local-engine/Shuffle/PartitionWriter.cpp:321: local_engine::PartitionWriter::stop() @ 0x000000000b2b5a9a
13. ./build_gcc/./utils/extern-local-engine/Shuffle/CachedShuffleWriter.cpp:152: local_engine::CachedShuffleWriter::stop() @ 0x000000000b2a8343
14. ./contrib/llvm-project/libcxx/include/vector:537: Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_stop @ 0x0000000006092a60
 (version 24.5.1.1)
libc++abi: terminating due to uncaught exception of type DB::Exception: org.apache.celeborn.common.exception.CelebornIOException: Register shuffle failed for shuffle 6.
taiyang-li added the bug and triage labels on May 21, 2024
taiyang-li commented May 22, 2024

Root cause:

  1. With Spark speculative execution enabled, the current task is killed by the driver because "another attempt succeeded":
2024/05/22 14:33:47.244 INFO [dispatcher-Executor] spark.executor.Executor: Executor is trying to kill task 91.0 in stage 6.0 (TID 2398), reason: another attempt succeeded


  2. The Java thread is currently executing ShuffleClientImpl.doPushMergeData; the kill triggers a java.lang.InterruptedException, which the function catches and wraps in a new org.apache.celeborn.common.exception.CelebornIOException:
    // do push merged data
    try {
      if (!isPushTargetWorkerExcluded(batches.get(0).loc, wrappedCallback)) {
        if (!testRetryRevive || remainReviveTimes < 1) {
          assert dataClientFactory != null;
          TransportClient client = dataClientFactory.createClient(host, port);
          client.pushMergedData(mergedData, pushDataTimeout, wrappedCallback);
        } else {
          wrappedCallback.onFailure(
              new CelebornIOException(
                  StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE_PRIMARY,
                  new RuntimeException("Mock push merge data failed.")));
        }
      }
    } catch (Exception e) {
      logger.error(
          "Exception raised while pushing merged data for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {} location {}.",
          shuffleId,
          mapId,
          attemptId,
          Arrays.toString(partitionIds),
          groupedBatchId,
          Arrays.toString(batchIds),
          addressPair,
          e);
      wrappedCallback.onFailure(
          new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY, e));
    }
  3. At this point, the C++ code in the ClickHouse backend catches the java.lang.InterruptedException raised in the JVM and rethrows it as a DB::Exception:
libc++abi: terminating due to uncaught exception of type DB::Exception: org.apache.celeborn.common.exception.CelebornIOException: Register shuffle failed for shuffle 6.
#define LOCAL_ENGINE_JNI_JMETHOD_END(env) \
    if ((env)->ExceptionCheck()) \
    { \
        LOG_ERROR(&Poco::Logger::get("local_engine"), "Enter java exception handle."); \
        auto excp = (env)->ExceptionOccurred(); \
        (env)->ExceptionDescribe(); \
        (env)->ExceptionClear(); \
        jclass cls = (env)->GetObjectClass(excp); \
        jmethodID mid = env->GetMethodID(cls, "toString", "()Ljava/lang/String;"); \
        jstring jmsg = static_cast<jstring>((env)->CallObjectMethod(excp, mid)); \
        const char * nmsg = (env)->GetStringUTFChars(jmsg, NULL); \
        std::string msg = std::string(nmsg); \
        env->ReleaseStringUTFChars(jmsg, nmsg); \
        throw DB::Exception::createRuntime(DB::ErrorCodes::LOGICAL_ERROR, msg); \
    }
  4. In the JNI entry point Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_stop, a try/catch converts the DB::Exception back into a Java exception (GlutenException). However, while control transfers into the catch block, the resources in the try block are destroyed; this invokes the CompressedWriteBuffer destructor, which eventually calls ReservationListenerWrapper::free. Because the thread is already in the interrupted state, ReservationListenerWrapper::free throws again. In short, while destroying the try-block resources before the old exception is caught, a new exception is thrown; C++ then terminates the process immediately, the executor exits abnormally, and the task status is marked FAILED.
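The failure mode in step 4 is a classic C++ pitfall: if a destructor lets an exception escape while stack unwinding from another exception is already in progress, the runtime calls std::terminate. A minimal standalone sketch (illustrative only, not the Gluten code) of a destructor that never lets its own failure escape:

```cpp
#include <exception>
#include <iostream>
#include <stdexcept>

// Stand-in for a resource whose cleanup may fail, e.g. freeing
// off-heap memory through a callback into the JVM.
struct Releaser
{
    ~Releaser()
    {
        try
        {
            throw std::runtime_error("cleanup failed");
        }
        catch (const std::exception & e)
        {
            // Never let this escape: if stack unwinding is already in
            // progress, a second in-flight exception calls std::terminate.
            std::cerr << "warning: exception suppressed in destructor: " << e.what() << '\n';
        }
    }
};

bool run()
{
    try
    {
        Releaser r;
        // Triggers unwinding; ~Releaser runs while this exception is in flight.
        throw std::logic_error("original error");
    }
    catch (const std::logic_error &)
    {
        // The original exception reaches the handler only because
        // ~Releaser swallowed its own failure.
        return true;
    }
}
```

If ~Releaser rethrew instead of swallowing, the program would abort before the catch block ran, which is exactly what happened to the executor process here.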

taiyang-li commented May 22, 2024

Solution: avoid throwing exceptions from destructors, which is a well-known C++ guideline. Change the ReservationListenerWrapper::free interface so that if the JVM returns an exception, it clears the exception state and logs a warning instead of throwing a DB::Exception.
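A minimal sketch of that guideline (the names below are illustrative, not the actual Gluten code): the free path catches any error coming back from the JVM side, logs a warning, and returns normally instead of propagating.

```cpp
#include <iostream>
#include <stdexcept>

// Stand-in for the JNI call that notifies the JVM-side reservation
// listener; in the real code this can fail when the task thread has
// already been interrupted.
void notify_jvm_free(long size, bool jvm_failed)
{
    (void)size;
    if (jvm_failed)
        throw std::runtime_error("java.lang.InterruptedException");
}

// free() is reached from destructors during cleanup, so it must not
// throw: swallow the error, emit a warning, and report success/failure
// via the return value instead.
bool safe_free(long size, bool jvm_failed) noexcept
{
    try
    {
        notify_jvm_free(size, jvm_failed);
        return true;
    }
    catch (const std::exception & e)
    {
        std::cerr << "warning: ignoring exception while freeing memory: " << e.what() << '\n';
        return false;
    }
}
```

Marking the function noexcept documents the contract and turns any accidental escape into an immediate, debuggable terminate rather than a terminate buried inside unrelated unwinding.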

liuneng1994 pushed a commit that referenced this issue Jun 6, 2024
… in executors are killed or interrupted (#5839)

What changes were proposed in this pull request?

Changes:

- Clean code: remove useless JNIs and classes under cpp-ch
- Support cancel for all Gluten processors; it is triggered when a task is killed or shut down.
- Make sure freeing off-heap memory does not throw exceptions. Ref: https://zhuanlan.zhihu.com/p/65454580
(Fixes: #5787 #5823)

How was this patch tested?
Manual