4 changes: 2 additions & 2 deletions cpp/velox/compute/VeloxRuntime.cc
```diff
@@ -37,7 +37,7 @@ VeloxRuntime::VeloxRuntime(const std::unordered_map<std::string, std::string>& c

 void VeloxRuntime::parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo taskInfo) {
   taskInfo_ = taskInfo;
-  if (getConfigValue(confMap_, kDebugModeEnabled, "false") == "true") {
+  if (debugModeEnabled(confMap_)) {
     try {
       auto jsonPlan = substraitFromPbToJson("Plan", data, size);
       LOG(INFO) << std::string(50, '#') << " received substrait::Plan:";
@@ -84,7 +84,7 @@ std::shared_ptr<ResultIterator> VeloxRuntime::createResultIterator(
     const std::string& spillDir,
     const std::vector<std::shared_ptr<ResultIterator>>& inputs,
     const std::unordered_map<std::string, std::string>& sessionConf) {
-  if (getConfigValue(confMap_, kDebugModeEnabled, "false") == "true") {
+  if (debugModeEnabled(confMap_)) {
    LOG(INFO) << "VeloxRuntime session config:" << printConfig(confMap_);
  }
```
8 changes: 8 additions & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
```diff
@@ -211,6 +211,14 @@ void WholeStageResultIterator::collectMetrics() {
     return;
   }

+  if (debugModeEnabled(confMap_)) {
+    auto planWithStats = velox::exec::printPlanWithStats(*veloxPlan_.get(), task_->taskStats(), true);
+    std::ostringstream oss;
+    oss << "Native Plan with stats for: " << taskInfo_;
+    oss << "\n" << planWithStats << std::endl;
+    LOG(INFO) << oss.str();
+  }
+
   auto planStats = velox::exec::toPlanStats(task_->taskStats());
   // Calculate the total number of metrics.
   int statsNum = 0;
```
3 changes: 3 additions & 0 deletions cpp/velox/utils/ConfigExtractor.cc
```diff
@@ -36,4 +36,7 @@ std::string getConfigValue(
   return got->second;
 }

+bool debugModeEnabled(const std::unordered_map<std::string, std::string>& confMap) {
+  return getConfigValue(confMap, kDebugModeEnabled, "false") == "true";
+}
 } // namespace gluten
```
4 changes: 4 additions & 0 deletions cpp/velox/utils/ConfigExtractor.h
```diff
@@ -23,11 +23,15 @@
 #include <string>
 #include <unordered_map>

+#include "config/GlutenConfig.h"
+
 namespace gluten {

 std::string getConfigValue(
     const std::unordered_map<std::string, std::string>& confMap,
     const std::string& key,
     const std::optional<std::string>& fallbackValue);

+bool debugModeEnabled(const std::unordered_map<std::string, std::string>& confMap);
+
 } // namespace gluten
```
78 changes: 78 additions & 0 deletions docs/get-started/Velox.md
@@ -791,6 +791,84 @@

Gluten UI also supports the Spark history server. Add the gluten-ui jar to the history server classpath, e.g., `$SPARK_HOME/jars`, then restart the history server.

## Native plan string

Gluten supports injecting the native plan string into Spark's formatted explain output by setting `--conf spark.gluten.sql.injectNativePlanStringToExplain=true`.
Here is an example of how Gluten shows the native plan string:

```
(9) WholeStageCodegenTransformer (2)
Input [6]: [c1#0L, c2#1L, c3#2L, c1#3L, c2#4L, c3#5L]
Arguments: false
Native Plan:
-- Project[expressions: (n3_6:BIGINT, "n0_0"), (n3_7:BIGINT, "n0_1"), (n3_8:BIGINT, "n0_2"), (n3_9:BIGINT, "n1_0"), (n3_10:BIGINT, "n1_1"), (n3_11:BIGINT, "n1_2")] -> n3_6:BIGINT, n3_7:BIGINT, n3_8:BIGINT, n3_9:BIGINT, n3_10:BIGINT, n3_11:BIGINT
-- HashJoin[INNER n1_1=n0_1] -> n1_0:BIGINT, n1_1:BIGINT, n1_2:BIGINT, n0_0:BIGINT, n0_1:BIGINT, n0_2:BIGINT
-- TableScan[table: hive_table, range filters: [(c2, Filter(IsNotNull, deterministic, null not allowed))]] -> n1_0:BIGINT, n1_1:BIGINT, n1_2:BIGINT
-- ValueStream[] -> n0_0:BIGINT, n0_1:BIGINT, n0_2:BIGINT
```
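
For context, a minimal self-contained way to produce output like the above might look like the sketch below. This is not part of the PR; the plugin class name (`io.glutenproject.GlutenPlugin`) and the off-heap settings are assumptions that vary by Gluten version and deployment.

```scala
import org.apache.spark.sql.SparkSession

object NativePlanExplainDemo {
  def main(args: Array[String]): Unit = {
    // Sketch only: plugin class name and off-heap sizing are assumptions
    // for a local Gluten deployment; adjust them to your environment.
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .config("spark.plugins", "io.glutenproject.GlutenPlugin")
      .config("spark.memory.offHeap.enabled", "true")
      .config("spark.memory.offHeap.size", "2g")
      .config("spark.gluten.sql.injectNativePlanStringToExplain", "true")
      .getOrCreate()

    spark.range(10).createOrReplaceTempView("t1")
    spark.range(10).createOrReplaceTempView("t2")

    // "formatted" explain mode is where the injected native plan string appears.
    spark.sql("SELECT * FROM t1 JOIN t2 ON t1.id = t2.id").explain("formatted")

    spark.stop()
  }
}
```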

## Native plan with stats

Gluten supports printing the native plan with stats to the executor's standard output stream by setting `--conf spark.gluten.sql.debug=true`.
Note that the plan string with stats is generated per task, which can significantly increase executor log size. Here is an example of how Gluten shows the native plan string with stats:

```
I20231121 10:19:42.348845 90094332 WholeStageResultIterator.cc:220] Native Plan with stats for: [Stage: 1 TID: 16]
-- Project[expressions: (n3_6:BIGINT, "n0_0"), (n3_7:BIGINT, "n0_1"), (n3_8:BIGINT, "n0_2"), (n3_9:BIGINT, "n1_0"), (n3_10:BIGINT, "n1_1"), (n3_11:BIGINT, "n1_2")] -> n3_6:BIGINT, n3_7:BIGINT, n3_8:BIGINT, n3_9:BIGINT, n3_10:BIGINT, n3_11:BIGINT
Output: 27 rows (3.56KB, 3 batches), Cpu time: 10.58us, Blocked wall time: 0ns, Peak memory: 0B, Memory allocations: 0, Threads: 1
queuedWallNanos sum: 2.00us, count: 1, min: 2.00us, max: 2.00us
runningAddInputWallNanos sum: 626ns, count: 1, min: 626ns, max: 626ns
runningFinishWallNanos sum: 0ns, count: 1, min: 0ns, max: 0ns
runningGetOutputWallNanos sum: 5.54us, count: 1, min: 5.54us, max: 5.54us
-- HashJoin[INNER n1_1=n0_1] -> n1_0:BIGINT, n1_1:BIGINT, n1_2:BIGINT, n0_0:BIGINT, n0_1:BIGINT, n0_2:BIGINT
Output: 27 rows (3.56KB, 3 batches), Cpu time: 223.00us, Blocked wall time: 0ns, Peak memory: 93.12KB, Memory allocations: 15
HashBuild: Input: 10 rows (960B, 10 batches), Output: 0 rows (0B, 0 batches), Cpu time: 185.67us, Blocked wall time: 0ns, Peak memory: 68.00KB, Memory allocations: 2, Threads: 1
distinctKey0 sum: 4, count: 1, min: 4, max: 4
hashtable.capacity sum: 4, count: 1, min: 4, max: 4
hashtable.numDistinct sum: 10, count: 1, min: 10, max: 10
hashtable.numRehashes sum: 1, count: 1, min: 1, max: 1
queuedWallNanos sum: 0ns, count: 1, min: 0ns, max: 0ns
rangeKey0 sum: 4, count: 1, min: 4, max: 4
runningAddInputWallNanos sum: 1.27ms, count: 1, min: 1.27ms, max: 1.27ms
runningFinishWallNanos sum: 0ns, count: 1, min: 0ns, max: 0ns
runningGetOutputWallNanos sum: 1.29us, count: 1, min: 1.29us, max: 1.29us
HashProbe: Input: 9 rows (864B, 3 batches), Output: 27 rows (3.56KB, 3 batches), Cpu time: 37.33us, Blocked wall time: 0ns, Peak memory: 25.12KB, Memory allocations: 13, Threads: 1
dynamicFiltersProduced sum: 1, count: 1, min: 1, max: 1
queuedWallNanos sum: 0ns, count: 1, min: 0ns, max: 0ns
runningAddInputWallNanos sum: 4.54us, count: 1, min: 4.54us, max: 4.54us
runningFinishWallNanos sum: 83ns, count: 1, min: 83ns, max: 83ns
runningGetOutputWallNanos sum: 29.08us, count: 1, min: 29.08us, max: 29.08us
-- TableScan[table: hive_table, range filters: [(c2, Filter(IsNotNull, deterministic, null not allowed))]] -> n1_0:BIGINT, n1_1:BIGINT, n1_2:BIGINT
Input: 9 rows (864B, 3 batches), Output: 9 rows (864B, 3 batches), Cpu time: 630.75us, Blocked wall time: 0ns, Peak memory: 2.44KB, Memory allocations: 63, Threads: 1, Splits: 3
dataSourceWallNanos sum: 102.00us, count: 1, min: 102.00us, max: 102.00us
dynamicFiltersAccepted sum: 1, count: 1, min: 1, max: 1
flattenStringDictionaryValues sum: 0, count: 1, min: 0, max: 0
ioWaitNanos sum: 312.00us, count: 1, min: 312.00us, max: 312.00us
localReadBytes sum: 0B, count: 1, min: 0B, max: 0B
numLocalRead sum: 0, count: 1, min: 0, max: 0
numPrefetch sum: 0, count: 1, min: 0, max: 0
numRamRead sum: 0, count: 1, min: 0, max: 0
numStorageRead sum: 6, count: 1, min: 6, max: 6
overreadBytes sum: 0B, count: 1, min: 0B, max: 0B
prefetchBytes sum: 0B, count: 1, min: 0B, max: 0B
queryThreadIoLatency sum: 12, count: 1, min: 12, max: 12
ramReadBytes sum: 0B, count: 1, min: 0B, max: 0B
runningAddInputWallNanos sum: 0ns, count: 1, min: 0ns, max: 0ns
runningFinishWallNanos sum: 125ns, count: 1, min: 125ns, max: 125ns
runningGetOutputWallNanos sum: 1.07ms, count: 1, min: 1.07ms, max: 1.07ms
skippedSplitBytes sum: 0B, count: 1, min: 0B, max: 0B
skippedSplits sum: 0, count: 1, min: 0, max: 0
skippedStrides sum: 0, count: 1, min: 0, max: 0
storageReadBytes sum: 3.44KB, count: 1, min: 3.44KB, max: 3.44KB
totalScanTime sum: 0ns, count: 1, min: 0ns, max: 0ns
-- ValueStream[] -> n0_0:BIGINT, n0_1:BIGINT, n0_2:BIGINT
Input: 0 rows (0B, 0 batches), Output: 10 rows (960B, 10 batches), Cpu time: 1.03ms, Blocked wall time: 0ns, Peak memory: 0B, Memory allocations: 0, Threads: 1
runningAddInputWallNanos sum: 0ns, count: 1, min: 0ns, max: 0ns
runningFinishWallNanos sum: 54.62us, count: 1, min: 54.62us, max: 54.62us
runningGetOutputWallNanos sum: 1.10ms, count: 1, min: 1.10ms, max: 1.10ms
```
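
Since `spark.gluten.sql.debug` is read when the native runtime is created, it needs to be set at session startup (e.g., via `--conf` at submit time). A minimal sketch, reusing the same assumed plugin class name as above:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: with the debug flag on, each task logs its native plan with
// stats (as shown above) to the executor log, so expect large logs.
val spark = SparkSession
  .builder()
  .config("spark.plugins", "io.glutenproject.GlutenPlugin") // assumed class name
  .config("spark.gluten.sql.debug", "true")
  .getOrCreate()
```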

# Gluten Implicits

Gluten provides a helper class to get the fallback summary from a Spark Dataset.
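A sketch of how such a helper is typically used follows; the import path and method name are assumptions to be verified against your Gluten version:

```scala
// Hypothetical import path; check your Gluten release.
import io.glutenproject.sql.GlutenImplicits._

val df = spark.sql("SELECT c1, count(*) FROM t GROUP BY c1")
// The fallback summary reports which operators in the plan fell back to
// vanilla Spark instead of executing natively.
println(df.fallbackSummary())
```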
Changes to `WholeStageTransformer`:

```diff
@@ -114,7 +114,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
   def substraitPlan: PlanNode = {
     if (wholeStageTransformerContext.isDefined) {
       // TODO: remove this work around after we make `RelNode#toProtobuf` idempotent
-      // see `SubstraitContext#getCurrentLocalFileNode`.
+      // see `SubstraitContext#initSplitInfosIndex`.
       wholeStageTransformerContext.get.substraitContext.initSplitInfosIndex(0)
       wholeStageTransformerContext.get.root
     } else {
@@ -173,6 +173,15 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
   // See buildSparkPlanGraphNode in SparkPlanGraph.scala of Spark.
   override def nodeName: String = s"WholeStageCodegenTransformer ($transformStageId)"

+  override def verboseStringWithOperatorId(): String = {
+    val nativePlan = if (conf.getConf(GlutenConfig.INJECT_NATIVE_PLAN_STRING_TO_EXPLAIN)) {
+      s"Native Plan:\n${nativePlanString()}"
+    } else {
+      ""
+    }
+    super.verboseStringWithOperatorId() ++ nativePlan
+  }
+
   private def generateWholeStageTransformContext(): WholeStageTransformContext = {
     val substraitContext = new SubstraitContext
     val childCtx = child
```
Changes to `GlutenConfig`:

```diff
@@ -1351,4 +1351,12 @@ object GlutenConfig {
       "when executing. It is used to get substrait plan node and native plan string.")
     .booleanConf
     .createWithDefault(false)
+
+  val INJECT_NATIVE_PLAN_STRING_TO_EXPLAIN =
+    buildConf("spark.gluten.sql.injectNativePlanStringToExplain")
+      .internal()
+      .doc("When true, Gluten will inject native plan tree to explain string inside " +
+        "`WholeStageTransformerContext`.")
+      .booleanConf
+      .createWithDefault(false)
 }
```