From 7545075f8dca3b5357edaec432784d255ebfdf0b Mon Sep 17 00:00:00 2001 From: Sanskar Modi Date: Tue, 22 Oct 2024 11:02:37 +0530 Subject: [PATCH] Fix spark2 --- .../spark/shuffle/celeborn/SparkShuffleManager.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java index 96d28acf4f8..52f383905b1 100644 --- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java +++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java @@ -117,15 +117,16 @@ public ShuffleHandle registerShuffle( String appId = SparkUtils.appUniqueId(dependency.rdd().context()); initializeLifecycleManager(appId); - lifecycleManager.registerAppShuffleDeterminate( - shuffleId, - !DeterministicLevel.INDETERMINATE().equals(dependency.rdd().getOutputDeterministicLevel())); - if (fallbackPolicyRunner.applyFallbackPolicies(dependency, lifecycleManager)) { logger.warn("Fallback to SortShuffleManager!"); sortShuffleIds.add(shuffleId); return sortShuffleManager().registerShuffle(shuffleId, numMaps, dependency); } else { + lifecycleManager.registerAppShuffleDeterminate( + shuffleId, + !DeterministicLevel.INDETERMINATE() + .equals(dependency.rdd().getOutputDeterministicLevel())); + return new CelebornShuffleHandle<>( appUniqueId, lifecycleManager.getHost(),