[GLUTEN-2919][VL] Support orc format in hive scan
yma11 committed Aug 28, 2023
1 parent b4f6ce7 commit 1c73a48
Showing 6 changed files with 332 additions and 3 deletions.
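
The new path can be exercised end to end with a plain Hive ORC table, as the GlutenHiveSQLQuerySuite added below does. A minimal sketch (assuming a SparkSession `spark` with the Gluten plugin and Hive support enabled, and keeping spark.sql.hive.convertMetastoreOrc=false so the table stays on the Hive scan path instead of being converted to the native ORC data source):

  spark.sql("SET spark.sql.hive.convertMetastoreOrc=false")
  spark.sql("CREATE TABLE test_orc (name STRING, favorite_color STRING) USING hive OPTIONS(fileFormat 'orc')")
  spark.sql("INSERT INTO test_orc VALUES('test_1', 'red')")
  // With this change the scan below is expected to be planned as HiveTableScanExecTransformer
  // using ReadFileFormat.OrcReadFormat.
  spark.sql("SELECT * FROM test_orc").show()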
3 changes: 3 additions & 0 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -788,6 +788,9 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
      case SubstraitFileFormatCase::kParquet:
        splitInfo->format = dwio::common::FileFormat::PARQUET;
        break;
      case SubstraitFileFormatCase::kOrc:
        splitInfo->format = dwio::common::FileFormat::ORC;
        break;
      default:
        splitInfo->format = dwio::common::FileFormat::UNKNOWN;
        break;
HiveTableScanExecTransformer.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.types.{ArrayType, MapType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils

import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
import org.apache.hadoop.mapred.TextInputFormat

import java.net.URI
@@ -122,6 +123,9 @@ class HiveTableScanExecTransformer(
            ReadFileFormat.JsonReadFormat
          case _ => ReadFileFormat.TextReadFormat
        }
      case Some(inputFormat)
          if ORC_INPUT_FORMAT_CLASS.isAssignableFrom(Utils.classForName(inputFormat)) =>
        ReadFileFormat.OrcReadFormat
      case _ => ReadFileFormat.UnknownFormat
    }
  }
@@ -152,6 +156,12 @@
        } else {
          ValidationResult.notOk("does not support complex type")
        }
      case ReadFileFormat.OrcReadFormat =>
        if (!hasComplexType) {
          ValidationResult.ok
        } else {
          ValidationResult.notOk("does not support complex type")
        }
      case _ => ValidationResult.notOk("Unknown file format")
    }
  }
@@ -209,6 +219,8 @@ object HiveTableScanExecTransformer {
  val DEFAULT_FIELD_DELIMITER: Char = 0x01
  val TEXT_INPUT_FORMAT_CLASS: Class[TextInputFormat] =
    Utils.classForName("org.apache.hadoop.mapred.TextInputFormat")
  val ORC_INPUT_FORMAT_CLASS: Class[OrcInputFormat] =
    Utils.classForName("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")

  def isHiveTableScan(plan: SparkPlan): Boolean = {
    plan.isInstanceOf[HiveTableScanExec]
VeloxTestSettings.scala
@@ -1046,5 +1046,6 @@ class VeloxTestSettings extends BackendTestSettings {
  enableSuite[GlutenSimpleShowCreateTableSuite]
  enableSuite[GlutenStatisticsCollectionSuite]
  enableSuite[FallbackStrategiesSuite]
  enableSuite[GlutenHiveSQLQuerySuite]
}
// scalastyle:on line.size.limit
GlutenHiveSQLQuerySuite.scala
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.execution

import io.glutenproject.execution.TransformSupport

import org.apache.spark.SparkConf
import org.apache.spark.internal.config
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, HiveUtils}
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}

import scala.reflect.ClassTag

class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
  private var _spark: SparkSession = null

  override def beforeAll(): Unit = {
    prepareWorkDir()
    if (_spark == null) {
      _spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    }

    _spark.sparkContext.setLogLevel("info")
  }

  override protected def spark: SparkSession = _spark

  protected def defaultSparkConf: SparkConf = {
    val conf = new SparkConf()
      .set("spark.master", "local[1]")
      .set("spark.sql.test", "")
      .set("spark.sql.testkey", "true")
      .set(SQLConf.CODEGEN_FALLBACK.key, "false")
      .set(SQLConf.CODEGEN_FACTORY_MODE.key, CodegenObjectFactoryMode.CODEGEN_ONLY.toString)
      .set(
        HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key,
        "org.apache.spark.sql.hive.execution.PairSerDe")
      // SPARK-8910
      .set(UI_ENABLED, false)
      .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
      // Hive changed the default of hive.metastore.disallow.incompatible.col.type.changes
      // from false to true. For details, see the JIRA HIVE-12320 and HIVE-17764.
      .set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes", "false")
      // Disable ConvertToLocalRelation for better test coverage. Test cases built on
      // LocalRelation will exercise the optimization rules better by disabling it as
      // this rule may potentially block testing of other optimization rules such as
      // ConstantPropagation etc.
      .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)

    conf.set(
      StaticSQLConf.WAREHOUSE_PATH,
      conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass.getCanonicalName)
  }

  /**
   * Get all the child plans of the input plans.
   *
   * @param plans the input plans.
   * @return all child plans, with WholeStageCodegen wrappers skipped.
   */
  def getChildrenPlan(plans: Seq[SparkPlan]): Seq[SparkPlan] = {
    if (plans.isEmpty) {
      return Seq()
    }

    val inputPlans: Seq[SparkPlan] = plans.map {
      case stage: ShuffleQueryStageExec => stage.plan
      case plan => plan
    }

    var newChildren: Seq[SparkPlan] = Seq()
    inputPlans.foreach {
      plan =>
        newChildren = newChildren ++ getChildrenPlan(plan.children)
        // To avoid duplication of WholeStageCodegenXXX and its children.
        if (!plan.nodeName.startsWith("WholeStageCodegen")) {
          newChildren = newChildren :+ plan
        }
    }
    newChildren
  }

  /**
   * Get the executed plan of a data frame.
   *
   * @param df the input dataframe.
   * @return a sequence of executed plans.
   */
  def getExecutedPlan(df: DataFrame): Seq[SparkPlan] = {
    df.queryExecution.executedPlan match {
      case exec: AdaptiveSparkPlanExec =>
        getChildrenPlan(Seq(exec.executedPlan))
      case plan =>
        getChildrenPlan(Seq(plan))
    }
  }

  def checkOperatorMatch[T <: TransformSupport](df: DataFrame)(implicit tag: ClassTag[T]): Unit = {
    val executedPlan = getExecutedPlan(df)
    assert(executedPlan.exists(plan => plan.getClass == tag.runtimeClass))
  }

  override def sparkConf: SparkConf = {
    defaultSparkConf
      .set("spark.plugins", "io.glutenproject.GlutenPlugin")
      .set("spark.default.parallelism", "1")
      .set("spark.memory.offHeap.enabled", "true")
      .set("spark.memory.offHeap.size", "1024MB")
  }

  test("hive orc scan") {
    withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "false") {
      sql("DROP TABLE IF EXISTS test_orc")
      sql("CREATE TABLE test_orc (name STRING, favorite_color STRING)" +
        " USING hive OPTIONS(fileFormat 'orc')")
      sql("INSERT INTO test_orc VALUES('test_1', 'red')")
      val df = spark.sql("select * from test_orc")
      checkAnswer(df, Seq(Row("test_1", "red")))
      checkOperatorMatch[HiveTableScanExecTransformer](df)
    }
    spark.sessionState.catalog.dropTable(
      TableIdentifier("test_orc"),
      ignoreIfNotExists = true,
      purge = false)
  }
}
VeloxTestSettings.scala
@@ -17,26 +17,26 @@
package io.glutenproject.utils.velox

import io.glutenproject.utils.BackendTestSettings

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{GlutenAnsiCastSuiteWithAnsiModeOff, GlutenAnsiCastSuiteWithAnsiModeOn, GlutenArithmeticExpressionSuite, GlutenBitwiseExpressionsSuite, GlutenCastSuite, GlutenCastSuiteWithAnsiModeOn, GlutenCollectionExpressionsSuite, GlutenComplexTypeSuite, GlutenConditionalExpressionSuite, GlutenDateExpressionsSuite, GlutenDecimalExpressionSuite, GlutenHashExpressionsSuite, GlutenIntervalExpressionsSuite, GlutenLiteralExpressionSuite, GlutenMathExpressionsSuite, GlutenMiscExpressionsSuite, GlutenNondeterministicSuite, GlutenNullExpressionsSuite, GlutenPredicateSuite, GlutenRandomSuite, GlutenRegexpExpressionsSuite, GlutenSortOrderExpressionsSuite, GlutenStringExpressionsSuite, GlutenTryCastSuite}
import org.apache.spark.sql.connector.{GlutenDataSourceV2DataFrameSessionCatalogSuite, GlutenDataSourceV2DataFrameSuite, GlutenDataSourceV2FunctionSuite, GlutenDataSourceV2SQLSessionCatalogSuite, GlutenDataSourceV2SQLSuite, GlutenDataSourceV2Suite, GlutenDeleteFromTableSuite, GlutenFileDataSourceV2FallBackSuite, GlutenKeyGroupedPartitioningSuite, GlutenLocalScanSuite, GlutenMetadataColumnSuite, GlutenSupportsCatalogOptionsSuite, GlutenTableCapabilityCheckSuite, GlutenWriteDistributionAndOrderingSuite}
import org.apache.spark.sql.errors.{GlutenQueryCompilationErrorsDSv2Suite, GlutenQueryCompilationErrorsSuite, GlutenQueryExecutionErrorsSuite, GlutenQueryParsingErrorsSuite}
import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite}
import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSQLWindowFunctionSuite, GlutenSameResultSuite, GlutenSortSuite, GlutenTakeOrderedAndProjectSuite}
import org.apache.spark.sql.execution.adaptive.GlutenAdaptiveQueryExecSuite
import org.apache.spark.sql.execution.datasources.{GlutenBucketingUtilsSuite, GlutenCSVReadSchemaSuite, GlutenDataSourceStrategySuite, GlutenDataSourceSuite, GlutenFileFormatWriterSuite, GlutenFileIndexSuite, GlutenFileMetadataStructSuite, GlutenFileSourceStrategySuite, GlutenHadoopFileLinesReaderSuite, GlutenHeaderCSVReadSchemaSuite, GlutenJsonReadSchemaSuite, GlutenMergedOrcReadSchemaSuite, GlutenMergedParquetReadSchemaSuite, GlutenOrcCodecSuite, GlutenOrcReadSchemaSuite, GlutenOrcV1AggregatePushDownSuite, GlutenOrcV2AggregatePushDownSuite, GlutenParquetCodecSuite, GlutenParquetReadSchemaSuite, GlutenParquetV1AggregatePushDownSuite, GlutenParquetV2AggregatePushDownSuite, GlutenPathFilterStrategySuite, GlutenPathFilterSuite, GlutenPruneFileSourcePartitionsSuite, GlutenVectorizedOrcReadSchemaSuite, GlutenVectorizedParquetReadSchemaSuite}
import org.apache.spark.sql.execution.datasources.binaryfile.GlutenBinaryFileFormatSuite
import org.apache.spark.sql.execution.datasources.csv.{GlutenCSVLegacyTimeParserSuite, GlutenCSVv1Suite, GlutenCSVv2Suite}
import org.apache.spark.sql.execution.datasources.exchange.GlutenValidateRequirementsSuite
import org.apache.spark.sql.execution.datasources.json.{GlutenJsonLegacyTimeParserSuite, GlutenJsonV1Suite, GlutenJsonV2Suite}
import org.apache.spark.sql.execution.datasources.orc.{GlutenOrcColumnarBatchReaderSuite, GlutenOrcFilterSuite, GlutenOrcPartitionDiscoverySuite, GlutenOrcSourceSuite, GlutenOrcV1FilterSuite, GlutenOrcV1PartitionDiscoverySuite, GlutenOrcV1QuerySuite, GlutenOrcV1SchemaPruningSuite, GlutenOrcV2QuerySuite, GlutenOrcV2SchemaPruningSuite}
import org.apache.spark.sql.execution.datasources.parquet.{GlutenParquetColumnIndexSuite, GlutenParquetCompressionCodecPrecedenceSuite, GlutenParquetDeltaByteArrayEncodingSuite, GlutenParquetDeltaEncodingInteger, GlutenParquetDeltaEncodingLong, GlutenParquetDeltaLengthByteArrayEncodingSuite, GlutenParquetEncodingSuite, GlutenParquetFieldIdIOSuite, GlutenParquetFileFormatV1Suite, GlutenParquetFileFormatV2Suite, GlutenParquetInteroperabilitySuite, GlutenParquetIOSuite, GlutenParquetProtobufCompatibilitySuite, GlutenParquetRebaseDatetimeV1Suite, GlutenParquetRebaseDatetimeV2Suite, GlutenParquetSchemaInferenceSuite, GlutenParquetSchemaSuite, GlutenParquetThriftCompatibilitySuite, GlutenParquetV1FilterSuite, GlutenParquetV1PartitionDiscoverySuite, GlutenParquetV1QuerySuite, GlutenParquetV1SchemaPruningSuite, GlutenParquetV2FilterSuite, GlutenParquetV2PartitionDiscoverySuite, GlutenParquetV2QuerySuite, GlutenParquetV2SchemaPruningSuite, GlutenParquetVectorizedSuite}
import org.apache.spark.sql.execution.datasources.parquet.{GlutenParquetColumnIndexSuite, GlutenParquetCompressionCodecPrecedenceSuite, GlutenParquetDeltaByteArrayEncodingSuite, GlutenParquetDeltaEncodingInteger, GlutenParquetDeltaEncodingLong, GlutenParquetDeltaLengthByteArrayEncodingSuite, GlutenParquetEncodingSuite, GlutenParquetFieldIdIOSuite, GlutenParquetFileFormatV1Suite, GlutenParquetFileFormatV2Suite, GlutenParquetIOSuite, GlutenParquetInteroperabilitySuite, GlutenParquetProtobufCompatibilitySuite, GlutenParquetRebaseDatetimeV1Suite, GlutenParquetRebaseDatetimeV2Suite, GlutenParquetSchemaInferenceSuite, GlutenParquetSchemaSuite, GlutenParquetThriftCompatibilitySuite, GlutenParquetV1FilterSuite, GlutenParquetV1PartitionDiscoverySuite, GlutenParquetV1QuerySuite, GlutenParquetV1SchemaPruningSuite, GlutenParquetV2FilterSuite, GlutenParquetV2PartitionDiscoverySuite, GlutenParquetV2QuerySuite, GlutenParquetV2SchemaPruningSuite, GlutenParquetVectorizedSuite}
import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, GlutenTextV2Suite}
import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite}
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
import org.apache.spark.sql.extension.{GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
import org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite, GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite, GlutenDDLSourceLoadSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE, GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite, GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite, GlutenPrunedScanSuite, GlutenResolvedDataSourceSuite, GlutenSaveLoadSuite, GlutenTableScanSuite}

// Some settings' line length exceeds 100
@@ -1124,5 +1124,6 @@ class VeloxTestSettings extends BackendTestSettings {
    .exclude("cases when literal is max")
  enableSuite[GlutenXPathFunctionsSuite]
  enableSuite[GlutenFallbackSuite]
  enableSuite[GlutenHiveSQLQuerySuite]
}
// scalastyle:on line.size.limit