Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ object Constants {
@deprecated
val TYPE_MODIFIER_DATE_AS_DOUBLE: TypeModifier =
new TypeModifier(TypeUtils.typeAccepts(_, DateType), DoubleType) {
override def name(): String = "date_as_double"

override def modValue(from: Any): Any = {
from match {
case v: Date => v.getTime.asInstanceOf[Double] / 86400.0d / 1000.0d
Expand All @@ -141,6 +143,8 @@ object Constants {
@deprecated
val TYPE_MODIFIER_INTEGER_AS_DOUBLE: TypeModifier =
new TypeModifier(TypeUtils.typeAccepts(_, IntegerType), DoubleType) {
override def name(): String = "integer_as_double"

override def modValue(from: Any): Any = {
from match {
case v: Int => v.asInstanceOf[Double]
Expand All @@ -151,6 +155,8 @@ object Constants {
@deprecated
val TYPE_MODIFIER_LONG_AS_DOUBLE: TypeModifier =
new TypeModifier(TypeUtils.typeAccepts(_, LongType), DoubleType) {
override def name(): String = "long_as_double"

override def modValue(from: Any): Any = {
from match {
case v: Long => v.asInstanceOf[Double]
Expand All @@ -161,6 +167,8 @@ object Constants {
@deprecated
val TYPE_MODIFIER_DATE_AS_STRING: TypeModifier =
new TypeModifier(TypeUtils.typeAccepts(_, DateType), StringType) {
override def name(): String = "date_as_string"

override def modValue(from: Any): Any = {
from match {
case v: Date => v.toString
Expand All @@ -170,6 +178,8 @@ object Constants {

val TYPE_MODIFIER_DECIMAL_AS_DOUBLE: TypeModifier =
new TypeModifier(TypeUtils.decimalAccepts, DoubleType) {
override def name(): String = "decimal_as_double"

override def modValue(from: Any): Any = {
from match {
case v: java.math.BigDecimal => v.doubleValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ trait DataGen {

/**
 * Converts values of columns whose [[DataType]] is accepted by `predicate` into the
 * target type `to`. Extends Serializable — instances may be serialized (NOTE(review):
 * presumably into Spark tasks; confirm against callers).
 */
abstract class TypeModifier(val predicate: DataType => Boolean, val to: DataType)
  extends Serializable {
  // Stable identifier of this modifier (e.g. "decimal_as_double"); suites use it
  // to tag generated-data directory names.
  def name(): String
  // Converts a single raw value (one accepted by `predicate`) into a value of type `to`.
  def modValue(value: Any): Any
}

/** A pass-through [[TypeModifier]]: accepts every type and returns values unchanged. */
class NoopModifier(t: DataType) extends TypeModifier(_ => true, t) {
  override def name(): String = "noop"
  override def modValue(value: Any): Any = value
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ abstract class Suite(
testMetricMapper
}

/**
 * Type modifiers applied when generating/registering data for this suite.
 * Currently only decimal-as-double, toggled by the `decimalAsDouble` flag.
 *
 * Note: the pasted diff contained both the old `List[TypeModifier]` and the new
 * `Seq[TypeModifier]` signature; the widened `Seq` return type is kept.
 */
private[integration] def typeModifiers(): Seq[TypeModifier] = {
  if (decimalAsDouble) List(TYPE_MODIFIER_DECIMAL_AS_DOUBLE) else List()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,19 @@ class ClickBenchSuite(
) {
import ClickBenchSuite._

// Fail fast on configurations the ClickBench suite does not support:
// only the parquet source, only scale factor 1, and no partitioned data.
require(
  Set("parquet").contains(dataSource),
  s"Data source type $dataSource is not supported by ClickBench suite")
require(dataScale == 1.0d, "ClickBench suite doesn't support scale factor other than 1")
require(!genPartitionedData, "ClickBench suite doesn't support generating partitioned data")

// Fixed location for ClickBench run history ("/tmp/clickbench-history" per companion constant).
override protected def historyWritePath(): String = HISTORY_WRITE_PATH

/** Absolute path under `dataDir` where generated ClickBench data is written. */
override private[integration] def dataWritePath(): String = {
  // Resolve the fixed relative directory against the configured base data dir.
  val basePath = new File(dataDir).toPath
  val target = basePath.resolve(DATA_WRITE_RELATIVE_PATH)
  target.toFile.getAbsolutePath
}

/** Builds the ClickBench data generator targeting [[dataWritePath]]. */
override private[integration] def createDataGen(): DataGen = {
  // The old call to the (now-deleted) companion helper `checkDataGenArgs` is dropped:
  // argument validation happens eagerly via the class-level `require` checks.
  new ClickBenchDataGen(dataWritePath())
}

Expand All @@ -112,15 +117,4 @@ private object ClickBenchSuite {
// Relative directory (under the configured data dir) for generated ClickBench data.
private val DATA_WRITE_RELATIVE_PATH = "clickbench-generated"
// Fixed location where ClickBench run history is written.
private val HISTORY_WRITE_PATH = "/tmp/clickbench-history"
// ClickBench defines 43 queries, identified q1..q43.
private val ALL_QUERY_IDS = (1 to 43).map(i => s"q$i").toArray

private def checkDataGenArgs(
dataSource: String,
scale: Double,
genPartitionedData: Boolean): Unit = {
require(
Set("parquet").contains(dataSource),
s"Data source type $dataSource is not supported by ClickBench suite")
require(scale == 1.0d, "ClickBench suite doesn't support scale factor other than 1")
require(!genPartitionedData, "ClickBench suite doesn't support generating partitioned data")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ class TpcdsSuite(
) {
import TpcdsSuite._

// Fail fast on unsupported data sources; TPC-DS supports parquet and delta only.
// (This inlines the validation formerly done by the companion's `checkDataGenArgs`,
// which this diff deletes — the stale call to it is removed here.)
require(
  Set("parquet", "delta").contains(dataSource),
  s"Data source type $dataSource is not supported by TPC-DS suite")

private val tableLayout = new TpcdsTableLayout(genPartitionedData)

Expand All @@ -90,9 +92,10 @@ class TpcdsSuite(
} else {
"non_partitioned"
}
val featureFlags = dataGenFeatures.map(feature => s"-$feature").mkString("")
val typeModifierFlags = typeModifiers().map(m => s"-${m.name()}").sorted.mkString("-")
val featureFlags = dataGenFeatures.map(feature => s"-$feature").sorted.mkString("")
val relative =
s"$TPCDS_WRITE_RELATIVE_PATH-$dataScale-$dataSource-$partitionedFlag$featureFlags"
s"$TPCDS_WRITE_RELATIVE_PATH-$dataScale-$dataSource-$partitionedFlag$typeModifierFlags$featureFlags"
new Path(dataDir, relative).toString
}

Expand All @@ -113,7 +116,8 @@ class TpcdsSuite(

// Human-readable suite name used for reporting.
override private[integration] def desc(): String = "TPC-DS"

/**
 * Creates tables from the TPC-DS layout, applying this suite's type modifiers
 * so the table schema matches the modified generated data.
 * (The pasted diff held both the old one-arg and new two-arg call; the new form is kept.)
 */
override def tableCreator(): TableCreator =
  TableCreator.createFromLayout(tableLayout, typeModifiers())

// Analyze all tables after creation (collects statistics for the optimizer).
override def tableAnalyzer0(): TableAnalyzer = TableAnalyzer.analyzeAll()
}
Expand Down Expand Up @@ -226,13 +230,4 @@ object TpcdsSuite {
"q99"
)
private val HISTORY_WRITE_PATH = "/tmp/tpcds-history"

private def checkDataGenArgs(
dataSource: String,
scale: Double,
genPartitionedData: Boolean): Unit = {
require(
Set("parquet", "delta").contains(dataSource),
s"Data source type $dataSource is not supported by TPC-DS suite")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,22 @@ class TpchSuite(
) {
import TpchSuite._

// Fail fast on unsupported configurations for the TPC-H suite:
// parquet/delta sources only, and no partitioned data generation.
require(
  Set("parquet", "delta").contains(dataSource),
  s"Data source type $dataSource is not supported by TPC-H suite")
require(!genPartitionedData, "TPC-H suite doesn't support generating partitioned data")

// Fixed location for TPC-H run history ("/tmp/tpch-history" per companion constant).
override protected def historyWritePath(): String = HISTORY_WRITE_PATH

/**
 * Absolute path under `dataDir` for generated TPC-H data. The directory name encodes
 * scale, source format, applied type modifiers, and data-gen features, so differently
 * configured datasets never collide.
 * (The pasted diff held both the old and new `relative` lines; the new form is kept.)
 */
override private[integration] def dataWritePath(): String = {
  // Sort flags so the resulting path is independent of declaration order,
  // consistent with the naming scheme TpcdsSuite uses.
  val typeModifierFlags = typeModifiers().map(m => s"-${m.name()}").sorted.mkString("-")
  val featureFlags = dataGenFeatures.map(feature => s"-$feature").sorted.mkString("")
  val relative =
    s"$TPCH_WRITE_RELATIVE_PATH-$dataScale-$dataSource$typeModifierFlags$featureFlags"
  new Path(dataDir, relative).toString
}

override private[integration] def createDataGen(): DataGen = {
checkDataGenArgs(dataSource, dataScale, genPartitionedData)
new TpchDataGen(
dataScale,
shufflePartitions,
Expand Down Expand Up @@ -139,14 +144,4 @@ object TpchSuite {
"q21",
"q22")
private val HISTORY_WRITE_PATH = "/tmp/tpch-history"

private def checkDataGenArgs(
dataSource: String,
scale: Double,
genPartitionedData: Boolean): Unit = {
require(
Set("parquet", "delta").contains(dataSource),
s"Data source type $dataSource is not supported by TPC-H suite")
require(!genPartitionedData, "TPC-H suite doesn't support generating partitioned data")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@
*/
package org.apache.gluten.integration.table

import org.apache.gluten.integration.{DataGen, TypeModifier}

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}

import org.apache.hadoop.fs.{FileSystem, Path}

import java.net.URI

import scala.collection.mutable.ArrayBuffer
class LayoutTableCreator(layout: TableLayout) extends TableCreator {

class LayoutTableCreator(layout: TableLayout, typeModifiers: Seq[TypeModifier])
extends TableCreator {
override def create(spark: SparkSession, source: String, dataPath: String): Unit = {
val uri = URI.create(dataPath)
val fs = FileSystem.get(uri, spark.sessionState.newHadoopConf())
Expand Down Expand Up @@ -51,14 +55,17 @@ class LayoutTableCreator(layout: TableLayout) extends TableCreator {
val schema = layout.getSchema(tableName)
val partitionColumns = layout.getPartitionColumns(tableName)

val nonPartitionColumns = schema.fields
val rowModifier = DataGen.getRowModifier(schema, typeModifiers)
val modifiedSchema = DataGen.modifySchema(schema, rowModifier)

val nonPartitionColumns = modifiedSchema.fields
.filterNot(f => partitionColumns.contains(f.name))
.map(f => s"${f.name} ${f.dataType.sql}")
.mkString(", ")

val partitionDDL =
if (partitionColumns.nonEmpty)
s"PARTITIONED BY (${partitionColumns.map(c => s"$c ${schema(c).dataType.sql}").mkString(", ")})"
s"PARTITIONED BY (${partitionColumns.map(c => s"$c ${modifiedSchema(c).dataType.sql}").mkString(", ")})"
else
""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.gluten.integration.table

import org.apache.gluten.integration.TypeModifier

import org.apache.spark.sql.SparkSession

trait TableCreator {
Expand All @@ -27,7 +29,7 @@ object TableCreator {
AutoTableCreator
}

/**
 * Builds a [[TableCreator]] that derives DDL from `layout`, applying `typeModifiers`
 * to the schema. (The pasted diff held both the old one-arg and new two-arg
 * definitions; the new two-arg form is kept.)
 */
def createFromLayout(layout: TableLayout, typeModifiers: Seq[TypeModifier]): TableCreator = {
  new LayoutTableCreator(layout, typeModifiers)
}
}
Loading