From 635ba0ef751c592d49636af72671a5f5913baa07 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Tue, 21 Nov 2023 10:32:03 +0800 Subject: [PATCH 1/3] Add naitve plan string and plan with stats --- cpp/velox/compute/WholeStageResultIterator.cc | 9 +++ docs/get-started/Velox.md | 78 +++++++++++++++++++ .../execution/WholeStageTransformer.scala | 11 ++- .../scala/io/glutenproject/GlutenConfig.scala | 15 ++++ 4 files changed, 112 insertions(+), 1 deletion(-) diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index f62564cd2e82..ec5cf2f53031 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -65,6 +65,7 @@ const std::string kAbandonPartialAggregationMinRows = const std::string kBloomFilterExpectedNumItems = "spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems"; const std::string kBloomFilterNumBits = "spark.gluten.sql.columnar.backend.velox.bloomFilter.numBits"; const std::string kBloomFilterMaxNumBits = "spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits"; +const std::string kPrintNativePlanWithStats = "spark.gluten.sql.columnar.backend.velox.printNativePlanWithStats"; // metrics const std::string kDynamicFiltersProduced = "dynamicFiltersProduced"; @@ -211,6 +212,14 @@ void WholeStageResultIterator::collectMetrics() { return; } + if (folly::to(getConfigValue(confMap_, kPrintNativePlanWithStats, "false"))) { + auto planWithStats = velox::exec::printPlanWithStats(*veloxPlan_.get(), task_->taskStats(), true); + std::ostringstream oss; + oss << "Native Plan with stats for: " << taskInfo_; + oss << "\n" << planWithStats << std::endl; + LOG(INFO) << oss.str(); + } + auto planStats = velox::exec::toPlanStats(task_->taskStats()); // Calculate the total number of metrics. int statsNum = 0; diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md index 75db952559cf..5a9afd550c10 100644 --- a/docs/get-started/Velox.md +++ b/docs/get-started/Velox.md @@ -791,6 +791,84 @@ If you want to disable Gluten UI, add a config when submitting `--conf spark.glu Gluten UI also supports Spark history server. Add gluten-ui jar into the history server classpath, e.g., $SPARK_HOME/jars, then restart history server. +## Native plan string + +Gluten supports inject native plan string into Spark explain with formatted mode by setting `--conf spark.gluten.sql.injectNativePlanStringToExplain=true`. +Here is an example, how Gluten show the native plan string. + +``` +(9) WholeStageCodegenTransformer (2) +Input [6]: [c1#0L, c2#1L, c3#2L, c1#3L, c2#4L, c3#5L] +Arguments: false +Native Plan: +-- Project[expressions: (n3_6:BIGINT, "n0_0"), (n3_7:BIGINT, "n0_1"), (n3_8:BIGINT, "n0_2"), (n3_9:BIGINT, "n1_0"), (n3_10:BIGINT, "n1_1"), (n3_11:BIGINT, "n1_2")] -> n3_6:BIGINT, n3_7:BIGINT, n3_8:BIGINT, n3_9:BIGINT, n3_10:BIGINT, n3_11:BIGINT + -- HashJoin[INNER n1_1=n0_1] -> n1_0:BIGINT, n1_1:BIGINT, n1_2:BIGINT, n0_0:BIGINT, n0_1:BIGINT, n0_2:BIGINT + -- TableScan[table: hive_table, range filters: [(c2, Filter(IsNotNull, deterministic, null not allowed))]] -> n1_0:BIGINT, n1_1:BIGINT, n1_2:BIGINT + -- ValueStream[] -> n0_0:BIGINT, n0_1:BIGINT, n0_2:BIGINT +``` + +## Native plan with stats + +Gluten supports print native plan with stats to executor system output stream by setting `--conf spark.gluten.sql.columnar.backend.velox.printNativePlanWithStats=true`. +Note that, the plan string with stats is task level which may cause executor log size big. Here is an example, how Gluten show the native plan string with stats. + +``` +I20231121 10:19:42.348845 90094332 WholeStageResultIterator.cc:220] Native Plan with stats for: [Stage: 1 TID: 16] +-- Project[expressions: (n3_6:BIGINT, "n0_0"), (n3_7:BIGINT, "n0_1"), (n3_8:BIGINT, "n0_2"), (n3_9:BIGINT, "n1_0"), (n3_10:BIGINT, "n1_1"), (n3_11:BIGINT, "n1_2")] -> n3_6:BIGINT, n3_7:BIGINT, n3_8:BIGINT, n3_9:BIGINT, n3_10:BIGINT, n3_11:BIGINT + Output: 27 rows (3.56KB, 3 batches), Cpu time: 10.58us, Blocked wall time: 0ns, Peak memory: 0B, Memory allocations: 0, Threads: 1 + queuedWallNanos sum: 2.00us, count: 1, min: 2.00us, max: 2.00us + runningAddInputWallNanos sum: 626ns, count: 1, min: 626ns, max: 626ns + runningFinishWallNanos sum: 0ns, count: 1, min: 0ns, max: 0ns + runningGetOutputWallNanos sum: 5.54us, count: 1, min: 5.54us, max: 5.54us + -- HashJoin[INNER n1_1=n0_1] -> n1_0:BIGINT, n1_1:BIGINT, n1_2:BIGINT, n0_0:BIGINT, n0_1:BIGINT, n0_2:BIGINT + Output: 27 rows (3.56KB, 3 batches), Cpu time: 223.00us, Blocked wall time: 0ns, Peak memory: 93.12KB, Memory allocations: 15 + HashBuild: Input: 10 rows (960B, 10 batches), Output: 0 rows (0B, 0 batches), Cpu time: 185.67us, Blocked wall time: 0ns, Peak memory: 68.00KB, Memory allocations: 2, Threads: 1 + distinctKey0 sum: 4, count: 1, min: 4, max: 4 + hashtable.capacity sum: 4, count: 1, min: 4, max: 4 + hashtable.numDistinct sum: 10, count: 1, min: 10, max: 10 + hashtable.numRehashes sum: 1, count: 1, min: 1, max: 1 + queuedWallNanos sum: 0ns, count: 1, min: 0ns, max: 0ns + rangeKey0 sum: 4, count: 1, min: 4, max: 4 + runningAddInputWallNanos sum: 1.27ms, count: 1, min: 1.27ms, max: 1.27ms + runningFinishWallNanos sum: 0ns, count: 1, min: 0ns, max: 0ns + runningGetOutputWallNanos sum: 1.29us, count: 1, min: 1.29us, max: 1.29us + H23/11/21 10:19:42 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 13) in 335 ms on 10.221.97.35 (executor driver) (1/10) +ashProbe: Input: 9 rows (864B, 3 batches), Output: 27 rows (3.56KB, 3 batches), Cpu time: 37.33us, Blocked wall time: 0ns, Peak memory: 25.12KB, Memory allocations: 13, Threads: 1 + dynamicFiltersProduced sum: 1, count: 1, min: 1, max: 1 + queuedWallNanos sum: 0ns, count: 1, min: 0ns, max: 0ns + runningAddInputWallNanos sum: 4.54us, count: 1, min: 4.54us, max: 4.54us + runningFinishWallNanos sum: 83ns, count: 1, min: 83ns, max: 83ns + runningGetOutputWallNanos sum: 29.08us, count: 1, min: 29.08us, max: 29.08us + -- TableScan[table: hive_table, range filters: [(c2, Filter(IsNotNull, deterministic, null not allowed))]] -> n1_0:BIGINT, n1_1:BIGINT, n1_2:BIGINT + Input: 9 rows (864B, 3 batches), Output: 9 rows (864B, 3 batches), Cpu time: 630.75us, Blocked wall time: 0ns, Peak memory: 2.44KB, Memory allocations: 63, Threads: 1, Splits: 3 + dataSourceWallNanos sum: 102.00us, count: 1, min: 102.00us, max: 102.00us + dynamicFiltersAccepted sum: 1, count: 1, min: 1, max: 1 + flattenStringDictionaryValues sum: 0, count: 1, min: 0, max: 0 + ioWaitNanos sum: 312.00us, count: 1, min: 312.00us, max: 312.00us + localReadBytes sum: 0B, count: 1, min: 0B, max: 0B + numLocalRead sum: 0, count: 1, min: 0, max: 0 + numPrefetch sum: 0, count: 1, min: 0, max: 0 + numRamRead sum: 0, count: 1, min: 0, max: 0 + numStorageRead sum: 6, count: 1, min: 6, max: 6 + overreadBytes sum: 0B, count: 1, min: 0B, max: 0B + prefetchBytes sum: 0B, count: 1, min: 0B, max: 0B + queryThreadIoLatency sum: 12, count: 1, min: 12, max: 12 + ramReadBytes sum: 0B, count: 1, min: 0B, max: 0B + runningAddInputWallNanos sum: 0ns, count: 1, min: 0ns, max: 0ns + runningFinishWallNanos sum: 125ns, count: 1, min: 125ns, max: 125ns + runningGetOutputWallNanos sum: 1.07ms, count: 1, min: 1.07ms, max: 1.07ms + skippedSplitBytes sum: 0B, count: 1, min: 0B, max: 0B + skippedSplits sum: 0, count: 1, min: 0, max: 0 + skippedStrides sum: 0, count: 1, min: 0, max: 0 + storageReadBytes sum: 3.44KB, count: 1, min: 3.44KB, max: 3.44KB + totalScanTime sum: 0ns, count: 1, min: 0ns, max: 0ns + -- ValueStream[] -> n0_0:BIGINT, n0_1:BIGINT, n0_2:BIGINT + Input: 0 rows (0B, 0 batches), Output: 10 rows (960B, 10 batches), Cpu time: 1.03ms, Blocked wall time: 0ns, Peak memory: 0B, Memory allocations: 0, Threads: 1 + runningAddInputWallNanos sum: 0ns, count: 1, min: 0ns, max: 0ns + runningFinishWallNanos sum: 54.62us, count: 1, min: 54.62us, max: 54.62us + runningGetOutputWallNanos sum: 1.10ms, count: 1, min: 1.10ms, max: 1.10ms +``` + # Gluten Implicits Gluten provides a helper class to get the fallback summary from a Spark Dataset. diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala index 9b67cf875333..9dd6b2e0ed1e 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala @@ -114,7 +114,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f def substraitPlan: PlanNode = { if (wholeStageTransformerContext.isDefined) { // TODO: remove this work around after we make `RelNode#toProtobuf` idempotent - // see `SubstraitContext#getCurrentLocalFileNode`. + // see `SubstraitContext#initSplitInfosIndex`. wholeStageTransformerContext.get.substraitContext.initSplitInfosIndex(0) wholeStageTransformerContext.get.root } else { @@ -173,6 +173,15 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f // See buildSparkPlanGraphNode in SparkPlanGraph.scala of Spark. override def nodeName: String = s"WholeStageCodegenTransformer ($transformStageId)" + override def verboseStringWithOperatorId(): String = { + val nativePlan = if (conf.getConf(GlutenConfig.INJECT_NATIVE_PLAN_STRING_TO_EXPLAIN)) { + s"Native Plan:\n${nativePlanString()}" + } else { + "" + } + super.verboseStringWithOperatorId() ++ nativePlan + } + private def generateWholeStageTransformContext(): WholeStageTransformContext = { val substraitContext = new SubstraitContext val childCtx = child diff --git a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala index d525f64e7743..fa0fbdf61d0e 100644 --- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala +++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala @@ -1351,4 +1351,19 @@ object GlutenConfig { "when executing. It is used to get substrait plan node and native plan string.") .booleanConf .createWithDefault(false) + + val INJECT_NATIVE_PLAN_STRING_TO_EXPLAIN = + buildConf("spark.gluten.sql.injectNativePlanStringToExplain") + .internal() + .doc("When true, Gluten will inject native plan tree to explain string inside " + + "`WholeStageTransformerContext`.") + .booleanConf + .createWithDefault(false) + + val PRINT_NATIVE_PLAN_WITH_STATS = + buildConf(s"$GLUTEN_CONFIG_PREFIX.velox.printNativePlanWithStats") + .internal() + .doc("When true, Gluten will print native plan with stats to executor system output stream.") + .booleanConf + .createWithDefault(false) } From 23144a046f552aa7216e9b5776a50e3e1254ef47 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Tue, 21 Nov 2023 15:28:41 +0800 Subject: [PATCH 2/3] address comments --- cpp/velox/compute/VeloxRuntime.cc | 4 ++-- cpp/velox/compute/WholeStageResultIterator.cc | 3 +-- cpp/velox/utils/ConfigExtractor.cc | 3 +++ cpp/velox/utils/ConfigExtractor.h | 4 ++++ .../src/main/scala/io/glutenproject/GlutenConfig.scala | 7 ------- 5 files changed, 10 insertions(+), 11 deletions(-) diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 31410e6f753c..9dbeceaf425a 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -37,7 +37,7 @@ VeloxRuntime::VeloxRuntime(const std::unordered_map& c void VeloxRuntime::parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo taskInfo) { taskInfo_ = taskInfo; - if (getConfigValue(confMap_, kDebugModeEnabled, "false") == "true") { + if (debugModeEnabled(confMap_)) { try { auto jsonPlan = substraitFromPbToJson("Plan", data, size); LOG(INFO) << std::string(50, '#') << " received substrait::Plan:"; @@ -84,7 +84,7 @@ std::shared_ptr VeloxRuntime::createResultIterator( const std::string& spillDir, const std::vector>& inputs, const std::unordered_map& sessionConf) { - if (getConfigValue(confMap_, kDebugModeEnabled, "false") == "true") { + if (debugModeEnabled(confMap_)) { LOG(INFO) << "VeloxRuntime session config:" << printConfig(confMap_); } diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index ec5cf2f53031..da33d9538a17 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -65,7 +65,6 @@ const std::string kAbandonPartialAggregationMinRows = const std::string kBloomFilterExpectedNumItems = "spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems"; const std::string kBloomFilterNumBits = "spark.gluten.sql.columnar.backend.velox.bloomFilter.numBits"; const std::string kBloomFilterMaxNumBits = "spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits"; -const std::string kPrintNativePlanWithStats = "spark.gluten.sql.columnar.backend.velox.printNativePlanWithStats"; // metrics const std::string kDynamicFiltersProduced = "dynamicFiltersProduced"; @@ -212,7 +211,7 @@ void WholeStageResultIterator::collectMetrics() { return; } - if (folly::to(getConfigValue(confMap_, kPrintNativePlanWithStats, "false"))) { + if (debugModeEnabled(confMap_)) { auto planWithStats = velox::exec::printPlanWithStats(*veloxPlan_.get(), task_->taskStats(), true); std::ostringstream oss; oss << "Native Plan with stats for: " << taskInfo_; diff --git a/cpp/velox/utils/ConfigExtractor.cc b/cpp/velox/utils/ConfigExtractor.cc index 78f22a52f20d..432b9a027542 100644 --- a/cpp/velox/utils/ConfigExtractor.cc +++ b/cpp/velox/utils/ConfigExtractor.cc @@ -36,4 +36,7 @@ std::string getConfigValue( return got->second; } +bool debugModeEnabled(const std::unordered_map& confMap) { + return getConfigValue(confMap, kDebugModeEnabled, "false") == "true"; +} } // namespace gluten diff --git a/cpp/velox/utils/ConfigExtractor.h b/cpp/velox/utils/ConfigExtractor.h index 10967f946796..a755c4158aee 100644 --- a/cpp/velox/utils/ConfigExtractor.h +++ b/cpp/velox/utils/ConfigExtractor.h @@ -23,6 +23,8 @@ #include #include +#include "config/GlutenConfig.h" + namespace gluten { std::string getConfigValue( @@ -30,4 +32,6 @@ std::string getConfigValue( const std::string& key, const std::optional& fallbackValue); +bool debugModeEnabled(const std::unordered_map& confMap); + } // namespace gluten diff --git a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala index fa0fbdf61d0e..ec19f8f1b6f1 100644 --- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala +++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala @@ -1359,11 +1359,4 @@ object GlutenConfig { "`WholeStageTransformerContext`.") .booleanConf .createWithDefault(false) - - val PRINT_NATIVE_PLAN_WITH_STATS = - buildConf(s"$GLUTEN_CONFIG_PREFIX.velox.printNativePlanWithStats") - .internal() - .doc("When true, Gluten will print native plan with stats to executor system output stream.") - .booleanConf - .createWithDefault(false) } From dc7323596363438bec2fee1287b14ebdefd15c3a Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Tue, 21 Nov 2023 15:30:07 +0800 Subject: [PATCH 3/3] docs --- docs/get-started/Velox.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md index 5a9afd550c10..acfec93fad37 100644 --- a/docs/get-started/Velox.md +++ b/docs/get-started/Velox.md @@ -809,7 +809,7 @@ Native Plan: ## Native plan with stats -Gluten supports print native plan with stats to executor system output stream by setting `--conf spark.gluten.sql.columnar.backend.velox.printNativePlanWithStats=true`. +Gluten supports print native plan with stats to executor system output stream by setting `--conf spark.gluten.sql.debug=true`. Note that, the plan string with stats is task level which may cause executor log size big. Here is an example, how Gluten show the native plan string with stats. ```