Skip to content

Commit

Permalink
new conf
Browse files Browse the repository at this point in the history
  • Loading branch information
gengliangwang committed Jul 1, 2021
1 parent 94c1e3c commit ad97b52
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 1 deletion.
Expand Up @@ -2502,7 +2502,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
case ("float" | "real", Nil) => FloatType
case ("double", Nil) => DoubleType
case ("date", Nil) => DateType
case ("timestamp", Nil) => TimestampType
case ("timestamp", Nil) => SQLConf.get.timestampType
case ("string", Nil) => StringType
case ("character" | "char", length :: Nil) => CharType(length.getText.toInt)
case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)
Expand Down
Expand Up @@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.plans.logical.HintErrorHandler
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.types.{AtomicType, TimestampNTZType, TimestampType}
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -2820,6 +2821,23 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

object TimestampTypes extends Enumeration {
val TIMESTAMP_NTZ, TIMESTAMP_LTZ = Value
}

val TIMESTAMP_TYPE =
buildConf("spark.sql.timestampType")
.doc("Configures the default timestamp type of Spark SQL, including SQL DDL and Cast " +
"clause. Setting the configuration as TIMESTAMP_NTZ will use TIMESTAMP WITHOUT TIME " +
"ZONE as the default type while putting it as TIMESTAMP_LTZ will use TIMESTAMP WITH " +
"LOCAL TIME ZONE. Before the 3.2.0 release, Spark only supports the TIMESTAMP WITH " +
"LOCAL TIME ZONE type.")
.version("3.2.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(TimestampTypes.values.map(_.toString))
.createWithDefault(TimestampTypes.TIMESTAMP_LTZ.toString)

val DATETIME_JAVA8API_ENABLED = buildConf("spark.sql.datetime.java8API.enabled")
.doc("If the configuration property is set to true, java.time.Instant and " +
"java.time.LocalDate classes of Java 8 API are used as external types for " +
Expand Down Expand Up @@ -3897,6 +3915,15 @@ class SQLConf extends Serializable with Logging {

def ansiEnabled: Boolean = getConf(ANSI_ENABLED)

def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match {
case "TIMESTAMP_LTZ" =>
// For historical reason, the TimestampType maps to TIMESTAMP WITH LOCAL TIME ZONE
TimestampType

case "TIMESTAMP_NTZ" =>
TimestampNTZType
}

def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)

def serializerNestedSchemaPruningEnabled: Boolean =
Expand Down
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import java.sql.Timestamp
import java.time.LocalDateTime

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{StructField, StructType, TimestampNTZType, TimestampType}
import org.apache.spark.util.ResetSystemProperties

// Test suite for setting the default timestamp type of Spark SQL
class TimestampTypeSuite extends QueryTest with SharedSparkSession with ResetSystemProperties {
test("Create and Alter Table") {
Seq(("TIMESTAMP_NTZ", TimestampNTZType), ("TIMESTAMP_LTZ", TimestampType)).foreach {
case (v, dt) =>
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> v) {
withTable("t") {
sql("create table t(ts timestamp) using csv")
val expectedSchema = new StructType().add(StructField("ts", dt))
assert(spark.table("t").schema == expectedSchema)
sql("alter table t add column (ts2 timestamp)")
val expectedSchema2 = expectedSchema.add(StructField("ts2", dt))
assert(spark.table("t").schema == expectedSchema2)
}
}
}
}

test("Cast") {
val timestampStr = "2021-01-01 00:00:00"
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
val df = sql(s"select cast('$timestampStr' as timestamp) as ts")
val expectedSchema = new StructType().add(StructField("ts", TimestampNTZType))
assert(df.schema == expectedSchema)
val expectedAnswer = Row(LocalDateTime.parse(timestampStr.replace(" ", "T")))
checkAnswer(df, expectedAnswer)
}
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_LTZ") {
val df = sql(s"select cast('$timestampStr' as timestamp) as ts")
val expectedSchema = new StructType().add(StructField("ts", TimestampType))
assert(df.schema == expectedSchema)
val expectedAnswer = Row(Timestamp.valueOf(timestampStr))
checkAnswer(df, expectedAnswer)
}
}
}

0 comments on commit ad97b52

Please sign in to comment.