@@ -70,7 +70,9 @@ class CHIteratorApi extends IIteratorApi with Logging {
wsCxt.substraitContext.initLocalFilesNodesIndex(0)
wsCxt.substraitContext.setLocalFilesNodes(localFilesNodes)
val substraitPlan = wsCxt.root.toProtobuf
logDebug(s"The substrait plan for partition ${index}:\n${substraitPlan.toString}")
if (index < 3) {
logDebug(s"The substrait plan for partition ${index}:\n${substraitPlan.toString}")
}
NativePartition(index, substraitPlan.toByteArray)
}

@@ -31,7 +31,7 @@ import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
@@ -148,7 +148,8 @@ class CHSparkPlanExecApi extends ISparkPlanExecApi {
/**
* Create broadcast relation for BroadcastExchangeExec
*/
override def createBroadcastRelation(child: SparkPlan,
override def createBroadcastRelation(mode: BroadcastMode,
child: SparkPlan,
numOutputRows: SQLMetric,
dataSize: SQLMetric): BuildSideRelation = {
val countsAndBytes = child
@@ -175,7 +176,7 @@ class CHSparkPlanExecApi extends ISparkPlanExecApi {
}
numOutputRows += countsAndBytes.map(_._1).sum
dataSize += rawSize
ClickHouseBuildSideRelation(child.output, batches)
ClickHouseBuildSideRelation(mode, child.output, batches)
}

/**
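Note: the new mode: BroadcastMode parameter threads the broadcast mode from the exchange into the build-side relation instead of dropping it, which is what lets ClickHouseBuildSideRelation (further down) keep it alongside the output attributes. A minimal call-site sketch, assuming the caller is a columnar broadcast exchange with its mode, child and SQL metrics in scope; execApi, exchange and longMetric are illustrative names, not part of this patch:

// Sketch only: hand the exchange's BroadcastMode straight through to the backend API.
val relation: BuildSideRelation = execApi.createBroadcastRelation(
  exchange.mode,   // e.g. HashedRelationBroadcastMode(keys) or IdentityBroadcastMode
  exchange.child,  // build-side plan
  longMetric("numOutputRows"),
  longMetric("dataSize"))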
@@ -55,7 +55,7 @@ class CHTransformerApi extends ITransformerApi with Logging {
.asInstanceOf[ExpressionTransformer]
.doTransform(substraitContext.registeredFunction)
if (!node.isInstanceOf[SelectionNode]) {
logInfo("Expressions are not supported in HashPartitioning.")
logDebug("Expressions are not supported in HashPartitioning.")
false
} else {
true
@@ -17,14 +17,15 @@

package io.glutenproject.execution

import java.util

import com.google.protobuf.Any
import io.glutenproject.expression._
import io.glutenproject.substrait.{AggregationParams, SubstraitContext}
import io.glutenproject.substrait.`type`.{TypeBuilder, TypeNode}
import io.glutenproject.substrait.expression.{AggregateFunctionNode, ExpressionBuilder, ExpressionNode}
import io.glutenproject.substrait.extensions.ExtensionBuilder
import io.glutenproject.substrait.rel.{LocalFilesBuilder, RelBuilder, RelNode}
import java.util

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -80,7 +81,13 @@ case class CHHashAggregateExecTransformer(
val nameList = new util.ArrayList[String]()
for (attr <- aggregateResultAttributes) {
typeList.add(ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
nameList.add(ConverterUtils.genColumnNameWithExprId(attr))
val colName = if (aggregateAttributes.exists(_ == attr)) {
ConverterUtils.genColumnNameWithExprId(attr) +
"#Partial#" + ConverterUtils.getShortAttributeName(attr)
} else {
ConverterUtils.genColumnNameWithExprId(attr)
}
nameList.add(colName)
}
// The iterator index will be added in the path of LocalFiles.
val iteratorIndex: Long = context.nextIteratorIndex
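Note: the renaming above marks result attributes that belong to the aggregate expressions with a "#Partial#" suffix, so the partial-aggregate output schema handed to the native engine can tell them apart from plain grouping columns. A hedged illustration of which attributes get the marker; the query and attribute numbering are made up, and the exact generated strings depend on the ConverterUtils helpers, which this diff does not show:

// For e.g. SELECT l_shipmode, avg(l_quantity) FROM lineitem GROUP BY l_shipmode,
// the partial stage's aggregateResultAttributes are, schematically:
//   l_shipmode#1     -- grouping column, not in aggregateAttributes -> plain exprId-based name
//   sum#2, count#3   -- avg's partial buffers, in aggregateAttributes -> name + "#Partial#" + short name
aggregateResultAttributes.map { attr =>
  if (aggregateAttributes.contains(attr)) {
    ConverterUtils.genColumnNameWithExprId(attr) + "#Partial#" +
      ConverterUtils.getShortAttributeName(attr)
  } else {
    ConverterUtils.genColumnNameWithExprId(attr)
  }
}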
@@ -23,10 +23,14 @@ import io.glutenproject.execution.BroadCastHashJoinContext
import io.glutenproject.vectorized.StorageJoinBuilder

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.vectorized.ColumnarBatch

case class ClickHouseBuildSideRelation(output: Seq[Attribute], batches: Array[Array[Byte]])
case class ClickHouseBuildSideRelation(mode: BroadcastMode,
output: Seq[Attribute],
batches: Array[Array[Byte]])
extends BuildSideRelation with Logging {

override def deserialized: Iterator[ColumnarBatch] = Iterator.empty
@@ -43,4 +47,14 @@ case class ClickHouseBuildSideRelation(output: Seq[Attribute], batches: Array[Array[Byte]])
storageJoinBuilder.build()
this
}

/**
* Transform columnar broadcasted value to Array[InternalRow] by key and distinct.
* @return
*/
override def transform(key: Expression): Array[InternalRow] = {
val allBatches = batches.flatten
// convert broadcasted value to Array[InternalRow].
Array.empty
}
}
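Note: transform(key) above is still a stub, it flattens the serialized batches and then returns Array.empty. The doc comment says the intent is to turn the columnar broadcast value into Array[InternalRow] by key and distinct, so a rough sketch of that shape might look like the following; deserializeToRows is a hypothetical helper (this diff adds no batch-to-row path), and UnsafeProjection comes from org.apache.spark.sql.catalyst.expressions:

override def transform(key: Expression): Array[InternalRow] = {
  // Project the join key out of each build-side row; copy() because the
  // projection reuses its output buffer across rows.
  val keyProj = UnsafeProjection.create(Seq(key), output)
  batches.iterator
    .flatMap(bytes => deserializeToRows(bytes)) // hypothetical: serialized CH batch -> rows
    .map(row => keyProj(row).copy(): InternalRow)
    .toArray
    .distinct // "by key and distinct", per the doc comment above
}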
@@ -74,6 +74,7 @@ object DSV2BenchmarkTest {

val libPath = "/home/myubuntu/Works/c_cpp_projects/Kyligence-ClickHouse-1/" +
"cmake-build-release/utils/local-engine/libch.so"
// val libPath = "/usr/local/clickhouse/lib/libch.so"
val sessionBuilder = if (!configed) {
val sessionBuilderTmp1 = sessionBuilderTmp
.master("local[6]")
@@ -98,9 +99,13 @@ object DSV2BenchmarkTest {
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseSparkCatalog")
// .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
// .config("spark.io.compression.codec", "LZ4")
.config("spark.shuffle.compress", "true")
.config("spark.io.compression.codec", "snappy")
.config("spark.io.compression.codec", "SNAPPY")
.config("spark.io.compression.zstd.bufferSize", "32k")
.config("spark.io.compression.zstd.level", "1")
.config("spark.io.compression.zstd.bufferPool.enabled", "true")
.config("spark.io.compression.snappy.blockSize", "32k")
.config("spark.io.compression.lz4.blockSize", "32k")
.config("spark.reducer.maxSizeInFlight", "48m")
.config("spark.shuffle.file.buffer", "32k")
// .config("spark.gluten.sql.columnar.shuffleSplitDefaultSize", "8192")
@@ -109,7 +114,7 @@ object DSV2BenchmarkTest {
.config("spark.databricks.delta.properties.defaults.checkpointInterval", 5)
.config("spark.databricks.delta.stalenessLimit", 3600 * 1000)
// .config("spark.sql.execution.arrow.maxRecordsPerBatch", "20000")
.config("spark.gluten.sql.columnar.columnartorow", "true")
.config("spark.gluten.sql.columnar.columnartorow", "false")
.config("spark.gluten.sql.columnar.backend.lib", "ch")
.config("spark.gluten.sql.columnar.backend.ch.worker.id", "1")
.config("spark.gluten.sql.columnar.backend.ch.use.v2", "false")
@@ -125,7 +130,7 @@ object DSV2BenchmarkTest {
.config("spark.sql.autoBroadcastJoinThreshold", "10MB")
.config("spark.sql.exchange.reuse", "true")
.config("spark.gluten.sql.columnar.forceshuffledhashjoin", "true")
.config("spark.gluten.sql.columnar.coalesce.batches", "false")
.config("spark.gluten.sql.columnar.coalesce.batches", "true")
// .config("spark.gluten.sql.columnar.filescan", "true")
// .config("spark.sql.optimizeNullAwareAntiJoin", "false")
// .config("spark.sql.join.preferSortMergeJoin", "false")
@@ -135,7 +140,7 @@ object DSV2BenchmarkTest {
.config("spark.sql.parquet.columnarReaderBatchSize", "4096")
.config("spark.memory.offHeap.enabled", "true")
.config("spark.memory.offHeap.size", "21474836480")
.config("spark.shuffle.sort.bypassMergeThreshold", "20")
.config("spark.shuffle.sort.bypassMergeThreshold", "200")
.config("spark.local.dir", "/data1/gazelle-jni-warehouse/spark_local_dirs")
.config("spark.executor.heartbeatInterval", "240s")
.config("spark.network.timeout", "300s")
@@ -149,6 +154,8 @@ object DSV2BenchmarkTest {
// "org.apache.spark.sql.execution.vectorized.PublicOffHeapColumnVector")
// .config("spark.hadoop.io.file.buffer.size", "524288")
.config("spark.sql.codegen.comments", "true")
.config("spark.ui.retainedJobs", "2500")
.config("spark.ui.retainedStages", "5000")

if (!warehouse.isEmpty) {
sessionBuilderTmp1.config("spark.sql.warehouse.dir", warehouse)
@@ -179,6 +186,7 @@ object DSV2BenchmarkTest {
refreshClickHouseTable(spark)
}
// scalastyle:off println
Thread.sleep(1000)
println("start to query ... ")

// createClickHouseTablesAsSelect(spark)
@@ -224,14 +232,36 @@ object DSV2BenchmarkTest {
val df = spark.sql(
s"""
|SELECT
| sum(l_extendedprice * l_discount) AS revenue
| l_shipmode,
| sum(
| CASE WHEN o_orderpriority = '1-URGENT'
| OR o_orderpriority = '2-HIGH' THEN
| 1
| ELSE
| 0
| END) AS high_line_count,
| sum(
| CASE WHEN o_orderpriority <> '1-URGENT'
| AND o_orderpriority <> '2-HIGH' THEN
| 1
| ELSE
| 0
| END) AS low_line_count
|FROM
| ch_lineitem100
| orders,
| lineitem
|WHERE
| l_shipdate >= date'1994-01-01'
| AND l_shipdate < date'1994-01-01' + interval 1 year
| AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
| AND l_quantity < 24;
| o_orderkey = l_orderkey
| AND l_shipmode IN ('MAIL', 'SHIP')
| AND l_commitdate < l_receiptdate
| AND l_shipdate < l_commitdate
| AND l_receiptdate >= date'1994-01-01'
| AND l_receiptdate < date'1994-01-01' + interval 1 year
|GROUP BY
| l_shipmode
|ORDER BY
| l_shipmode;
|
|""".stripMargin) // .show(30, false)
df.explain(false)
val plan = df.queryExecution.executedPlan
@@ -323,7 +353,7 @@ object DSV2BenchmarkTest {
val tookTimeArr = ArrayBuffer[Long]()
val executedCnt = 1
val executeExplain = false
val sqlFilePath = "/data2/tpch-queries-ch100/"
val sqlFilePath = "/data2/tpch-queries-spark/"
for (i <- 1 to 22) {
if (i != 21) {
val sqlNum = "q" + "%02d".format(i)
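Note: the context lines above show the benchmark loop (tookTimeArr, executedCnt, sqlFilePath, q01..q22 with q21 skipped), now pointed at /data2/tpch-queries-spark/. A minimal, self-contained sketch of that timing loop, assuming one query per qNN.sql file and a SparkSession named spark; those assumptions are not taken from the diff itself:

import scala.collection.mutable.ArrayBuffer
import scala.io.Source

val sqlFilePath = "/data2/tpch-queries-spark/" // path taken from the hunk above
val tookTimeArr = ArrayBuffer[Long]()
for (i <- 1 to 22 if i != 21) {                // the diff's loop also skips q21
  val sqlNum = "q" + "%02d".format(i)
  val sqlStr = Source.fromFile(s"$sqlFilePath$sqlNum.sql", "UTF-8").mkString // assumed file layout
  val start = System.nanoTime()
  spark.sql(sqlStr).collect()                  // force execution
  tookTimeArr += (System.nanoTime() - start) / 1000000L // elapsed ms
}
println(s"average ${tookTimeArr.sum / tookTimeArr.size} ms over ${tookTimeArr.size} queries")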