From ce3eb795f1b7d397c9825f08b7b6322e8f5a3a57 Mon Sep 17 00:00:00 2001
From: "Zhang, Liye"
Date: Tue, 15 Sep 2015 16:16:49 +0800
Subject: [PATCH] [SPARK-10608] disable reduce locality as default

---
 .../apache/spark/scheduler/DAGScheduler.scala |  2 +-
 .../spark/scheduler/DAGSchedulerSuite.scala   | 20 +++++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b4f90e8347894..5c6dbbec26008 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -186,7 +186,7 @@ class DAGScheduler(
 
   // Flag to control if reduce tasks are assigned preferred locations
   private val shuffleLocalityEnabled =
-    sc.getConf.getBoolean("spark.shuffle.reduceLocality.enabled", true)
+    sc.getConf.getBoolean("spark.shuffle.reduceLocality.enabled", false)
   // Number of map, reduce tasks above which we do not assign preferred locations
   // based on map output sizes. We limit the size of jobs for which assign preferred locations
   // as computing the top locations by size becomes expensive.
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 1c55f90ad9b44..ff3613d8164c0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1250,6 +1250,7 @@ class DAGSchedulerSuite
   }
 
   test("reduce tasks should be placed locally with map output") {
+    reduceLocalityEnvSet()
     // Create an shuffleMapRdd with 1 partition
     val shuffleMapRdd = new MyRDD(sc, 1, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
@@ -1270,6 +1271,7 @@ class DAGSchedulerSuite
   }
 
   test("reduce task locality preferences should only include machines with largest map outputs") {
+    reduceLocalityEnvSet()
     val numMapTasks = 4
     // Create an shuffleMapRdd with more partitions
     val shuffleMapRdd = new MyRDD(sc, numMapTasks, Nil)
@@ -1591,5 +1593,23 @@ class DAGSchedulerSuite
     info
   }
 
+  // Reset the original environment and change the SparkConf to enable reduce
+  // locality
+  private def reduceLocalityEnvSet(): Unit = {
+    scheduler.stop()
+    val reduceLocalityConf = new SparkConf
+    reduceLocalityConf.set("spark.shuffle.reduceLocality.enabled", "true")
+    sc = new SparkContext("local", "DAGSchedulerSuite", reduceLocalityConf)
+    sc.addSparkListener(sparkListener)
+    scheduler = new DAGScheduler(
+      sc,
+      taskScheduler,
+      sc.listenerBus,
+      mapOutputTracker,
+      blockManagerMaster,
+      sc.env)
+    dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler)
+  }
+
 }
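
Usage sketch (not part of the patch): with the default flipped to false, an
application that still wants reduce-task locality must opt in through the
"spark.shuffle.reduceLocality.enabled" key read by DAGScheduler above. A
minimal, self-contained example, assuming a local master and a placeholder
app name:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical opt-in example: re-enables the reduce-task locality
// preferences that this patch turns off by default. The config key is
// the one read in DAGScheduler; master and app name are placeholders.
object ReduceLocalityOptIn {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("ReduceLocalityOptIn")
      .set("spark.shuffle.reduceLocality.enabled", "true")
    val sc = new SparkContext(conf)
    try {
      // With the flag on, shuffles such as this reduceByKey have their
      // reduce tasks assigned preferred locations based on map output sizes.
      val counts = sc.parallelize(Seq("a", "b", "a"))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .collect()
      println(counts.mkString(", "))
    } finally {
      sc.stop()
    }
  }
}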