From c157123d2280b2d455bf546eedcd98d7f4ba10dd Mon Sep 17 00:00:00 2001 From: Sanskar Modi Date: Wed, 6 May 2026 14:44:27 +0530 Subject: [PATCH 1/3] [CELEBORN-1577][FOLLOWUP] Fix backward compatiblity issue with interrupt shuffle --- .../org/apache/celeborn/client/ApplicationHeartbeater.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala index e91e236b6c4..f6aecf8e539 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala @@ -164,7 +164,7 @@ class ApplicationHeartbeater( } private def checkQuotaExceeds(response: CheckQuotaResponse): Unit = { - if (conf.quotaInterruptShuffleEnabled && !response.isAvailable) { + if (conf.quotaInterruptShuffleEnabled && !response.isAvailable && response.reason.nonEmpty) { cancelAllActiveStages( s"Application interrupted caused by storage quota exceeded, reason: ${response.reason}") } From 61c5084275ed78e6e32187d3d122d86afea1d608 Mon Sep 17 00:00:00 2001 From: Sanskar Modi Date: Tue, 2 Jun 2026 17:43:43 +0530 Subject: [PATCH 2/3] Fix code --- .../apache/celeborn/client/ApplicationHeartbeater.scala | 2 +- .../common/protocol/message/ControlMessages.scala | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala index f6aecf8e539..e91e236b6c4 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala @@ -164,7 +164,7 @@ class ApplicationHeartbeater( } private def checkQuotaExceeds(response: CheckQuotaResponse): Unit = { - if (conf.quotaInterruptShuffleEnabled && !response.isAvailable && response.reason.nonEmpty) { + if (conf.quotaInterruptShuffleEnabled && !response.isAvailable) { cancelAllActiveStages( s"Application interrupted caused by storage quota exceeded, reason: ${response.reason}") } diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 36f164d697e..3c7b8e0b7ad 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -1364,7 +1364,12 @@ object ControlMessages extends Logging { case HEARTBEAT_FROM_APPLICATION_RESPONSE_VALUE => val pbHeartbeatFromApplicationResponse = PbHeartbeatFromApplicationResponse.parseFrom(message.getPayload) - val pbCheckQuotaResponse = pbHeartbeatFromApplicationResponse.getCheckQuotaResponse + val checkQuotaResponse = if (pbHeartbeatFromApplicationResponse.hasCheckQuotaResponse) { + val pbCheckQuotaResponse = pbHeartbeatFromApplicationResponse.getCheckQuotaResponse + CheckQuotaResponse(pbCheckQuotaResponse.getAvailable, pbCheckQuotaResponse.getReason) + } else { + CheckQuotaResponse(isAvailable = true, "") + } HeartbeatFromApplicationResponse( StatusCode.fromValue(pbHeartbeatFromApplicationResponse.getStatus), pbHeartbeatFromApplicationResponse.getExcludedWorkersList.asScala @@ -1374,7 +1379,7 @@ object ControlMessages extends Logging { pbHeartbeatFromApplicationResponse.getShuttingWorkersList.asScala .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava, pbHeartbeatFromApplicationResponse.getRegisteredShufflesList, - CheckQuotaResponse(pbCheckQuotaResponse.getAvailable, pbCheckQuotaResponse.getReason)) + checkQuotaResponse) case CHECK_QUOTA_VALUE => val pbCheckAvailable = PbCheckQuota.parseFrom(message.getPayload) From e5c63c7da2ce97b514855664ebd6511ba775ec1b Mon Sep 17 00:00:00 2001 From: Sanskar Modi Date: Tue, 2 Jun 2026 18:36:37 +0530 Subject: [PATCH 3/3] Add tests --- .../protocol/message/ControlMessages.scala | 13 ++--- .../common/util/PbSerDeUtilsTest.scala | 53 ++++++++++++++++++- 2 files changed, 58 insertions(+), 8 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 3c7b8e0b7ad..e12d4b697f7 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -1364,12 +1364,13 @@ object ControlMessages extends Logging { case HEARTBEAT_FROM_APPLICATION_RESPONSE_VALUE => val pbHeartbeatFromApplicationResponse = PbHeartbeatFromApplicationResponse.parseFrom(message.getPayload) - val checkQuotaResponse = if (pbHeartbeatFromApplicationResponse.hasCheckQuotaResponse) { - val pbCheckQuotaResponse = pbHeartbeatFromApplicationResponse.getCheckQuotaResponse - CheckQuotaResponse(pbCheckQuotaResponse.getAvailable, pbCheckQuotaResponse.getReason) - } else { - CheckQuotaResponse(isAvailable = true, "") - } + val checkQuotaResponse = + if (pbHeartbeatFromApplicationResponse.hasCheckQuotaResponse) { + val pbCheckQuotaResponse = pbHeartbeatFromApplicationResponse.getCheckQuotaResponse + CheckQuotaResponse(pbCheckQuotaResponse.getAvailable, pbCheckQuotaResponse.getReason) + } else { + CheckQuotaResponse(isAvailable = true, "") + } HeartbeatFromApplicationResponse( StatusCode.fromValue(pbHeartbeatFromApplicationResponse.getStatus), pbHeartbeatFromApplicationResponse.getExcludedWorkersList.asScala diff --git a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala index 5b8fe9979a1..47b618b566e 100644 --- a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala +++ b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala @@ -31,10 +31,11 @@ import org.apache.hadoop.shaded.org.apache.commons.lang3.RandomStringUtils import org.apache.celeborn.CelebornFunSuite import org.apache.celeborn.common.identity.UserIdentifier import org.apache.celeborn.common.meta._ -import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionType, PbFileInfo, PbPackedWorkerResource, PbWorkerResource, StorageInfo} +import org.apache.celeborn.common.network.protocol.TransportMessage +import org.apache.celeborn.common.protocol.{MessageType, PartitionLocation, PartitionType, PbFileInfo, PbHeartbeatFromApplicationResponse, PbPackedWorkerResource, PbWorkerResource, StorageInfo} import org.apache.celeborn.common.protocol.PartitionLocation.Mode import org.apache.celeborn.common.protocol.message.{ControlMessages, StatusCode} -import org.apache.celeborn.common.protocol.message.ControlMessages.{GetReducerFileGroupResponse, WorkerResource} +import org.apache.celeborn.common.protocol.message.ControlMessages.{CheckQuotaResponse, GetReducerFileGroupResponse, HeartbeatFromApplicationResponse, WorkerResource} import org.apache.celeborn.common.quota.ResourceConsumption import org.apache.celeborn.common.util.PbSerDeUtils.{fromPbPackedPartitionLocationsPair, toPbPackedPartitionLocationsPair, toPbUserIdentifier} import org.apache.celeborn.common.write.LocationPushFailedBatches @@ -806,4 +807,52 @@ class PbSerDeUtilsTest extends CelebornFunSuite { assert(restoredFailedBatch.equals(failedBatch)) } + test("fromAndToHeartbeatFromApplicationResponse") { + val heartbeatFromApplicationResponse = HeartbeatFromApplicationResponse( + StatusCode.SUCCESS, + mockWorkers("host0").toList.asJava, + mockWorkers("host1").toList.asJava, + mockWorkers("host2").toList.asJava, + Array(Integer.valueOf(1)).toList.asJava, + CheckQuotaResponse(isAvailable = false, "test_reason")) + val toTransportHeartbeatFromApplicationResponse = + ControlMessages.toTransportMessage(heartbeatFromApplicationResponse) + val fromTransportHeartbeatFromApplicationResponse = + ControlMessages.fromTransportMessage(toTransportHeartbeatFromApplicationResponse) + .asInstanceOf[HeartbeatFromApplicationResponse] + + assert(fromTransportHeartbeatFromApplicationResponse.equals(heartbeatFromApplicationResponse)) + } + + test("HeartbeatFromApplicationResponse backward compatibility without checkQuotaResponse") { + val payload = PbHeartbeatFromApplicationResponse.newBuilder() + .setStatus(StatusCode.SUCCESS.getValue) + .addAllExcludedWorkers( + mockWorkers("host0").map(PbSerDeUtils.toPbWorkerInfo( + _, + true, + true)).toList.asJava) + .addAllUnknownWorkers( + mockWorkers("host1").map(PbSerDeUtils.toPbWorkerInfo( + _, + true, + true)).toList.asJava) + .addAllShuttingWorkers( + mockWorkers("host2").map(PbSerDeUtils.toPbWorkerInfo( + _, + true, + true)).toList.asJava) + .addAllRegisteredShuffles(Array(Integer.valueOf(1)).toList.asJava) + .build().toByteArray + val fromTransportHeartbeatFromApplicationResponse = ControlMessages.fromTransportMessage( + new TransportMessage(MessageType.HEARTBEAT_FROM_APPLICATION_RESPONSE, payload)) + .asInstanceOf[HeartbeatFromApplicationResponse] + assert( + fromTransportHeartbeatFromApplicationResponse.checkQuotaResponse.isAvailable.equals(true)) + assert(fromTransportHeartbeatFromApplicationResponse.checkQuotaResponse.reason.equals("")) + } + + def mockWorkers(host: String): Array[WorkerInfo] = { + Array(new WorkerInfo(host, -1, -1, -1, -1)) + } }