[SPARK-36135][SQL] Support TimestampNTZ type in file partitioning #33344

Closed · wants to merge 2 commits
@@ -867,6 +867,9 @@ object TypeCoercion extends TypeCoercionBase {
case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) =>
Some(TimestampType)

+     case (_: TimestampNTZType, _: DateType) | (_: DateType, _: TimestampNTZType) =>
Member Author: This is needed for a test case with mixed Date & TimestampNTZ partition columns.

+       Some(TimestampNTZType)

case (t1, t2) => findTypeForComplex(t1, t2, findTightestCommonType)
}
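A hedged sketch of what this rule enables, assuming a Spark build with `spark.sql.timestampType`; the paths and values below are illustrative, not from the PR:

```scala
// Partition values for column `a` are inferred as DateType for the first
// directory and as the session timestamp type for the second; with
// TIMESTAMP_NTZ configured, the new rule coerces the pair to TimestampNTZType
// instead of failing to find a common type.
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
spark.range(1).write.parquet("/tmp/t/a=2014-01-01")
spark.range(1).write.parquet("/tmp/t/a=2014-01-01 00%3A01%3A00")
spark.read.parquet("/tmp/t").printSchema()
// expected: column `a` resolves to timestamp_ntz
```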

@@ -94,7 +94,7 @@ public static void populate(WritableColumnVector col, InternalRow row, int fieldIdx) {
col.getChild(1).putLongs(0, capacity, c.microseconds);
} else if (t instanceof DateType) {
col.putInts(0, capacity, row.getInt(fieldIdx));
-   } else if (t instanceof TimestampType) {
+   } else if (t instanceof TimestampType || t instanceof TimestampNTZType) {
col.putLongs(0, capacity, row.getLong(fieldIdx));
} else {
throw new RuntimeException(String.format("DataType %s is not supported" +
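Why this one-line change is safe: Catalyst stores both timestamp flavors with the same physical layout, so the vectorized writer needs no new branch. A small sketch (real type names; the assertions reflect Catalyst's type sizes):

```scala
import org.apache.spark.sql.types.{TimestampNTZType, TimestampType}

// Both flavors are 8-byte microsecond counts in InternalRow / ColumnVector;
// only the interpretation (with vs. without a session time zone) differs,
// which is why populate() can write either with the same putLongs call.
assert(TimestampType.defaultSize == 8)
assert(TimestampNTZType.defaultSize == 8)
```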
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.getPartitionVa
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.unsafe.types.UTF8String
@@ -480,14 +481,20 @@ object PartitioningUtils {

    val timestampTry = Try {
      val unescapedRaw = unescapePathName(raw)
+     // the inferred data type is consistent with the default timestamp type
+     val timestampType = SQLConf.get.timestampType
      // try and parse the date, if no exception occurs this is a candidate to be resolved as
-     // TimestampType
-     timestampFormatter.parse(unescapedRaw)
+     // TimestampType or TimestampNTZType
+     timestampType match {
+       case TimestampType => timestampFormatter.parse(unescapedRaw)
+       case TimestampNTZType => timestampFormatter.parseWithoutTimeZone(unescapedRaw)
+     }

      // SPARK-23436: see comment for date
-     val timestampValue = Cast(Literal(unescapedRaw), TimestampType, Some(zoneId.getId)).eval()
+     val timestampValue = Cast(Literal(unescapedRaw), timestampType, Some(zoneId.getId)).eval()
      // Disallow TimestampType if the cast returned null
      require(timestampValue != null)
-     TimestampType
+     timestampType
    }

if (typeInference) {
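A hedged end-to-end sketch of what this hunk changes for users (the configuration key is from SQLConf; the directory layout is illustrative):

```scala
// With the session timestamp type set to TIMESTAMP_NTZ, a partition value such
// as ts=2014-01-01 00%3A01%3A00 is parsed via parseWithoutTimeZone and the
// column is inferred as TimestampNTZType rather than TimestampType.
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
val df = spark.read.parquet("/tmp/events")
df.schema("ts").dataType  // expected: TimestampNTZType
```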
@@ -522,11 +529,12 @@ object PartitioningUtils {
      case _: DecimalType => Literal(new JBigDecimal(value)).value
      case DateType =>
        Cast(Literal(value), DateType, Some(zoneId.getId)).eval()
-     case TimestampType =>
+     // Timestamp types
+     case dt if AnyTimestampType.acceptsType(dt) =>
        Try {
-         Cast(Literal(unescapePathName(value)), TimestampType, Some(zoneId.getId)).eval()
+         Cast(Literal(unescapePathName(value)), dt, Some(zoneId.getId)).eval()
        }.getOrElse {
-         Cast(Cast(Literal(value), DateType, Some(zoneId.getId)), TimestampType).eval()
+         Cast(Cast(Literal(value), DateType, Some(zoneId.getId)), dt).eval()
        }
      case dt => throw QueryExecutionErrors.typeUnsupportedError(dt)
    }
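For context, AnyTimestampType is the abstract type that matches both timestamp flavors, which is what lets a single case cover TimestampType and TimestampNTZType. A small sketch (acceptsType is package-internal in Catalyst, shown here purely for illustration):

```scala
import org.apache.spark.sql.types.{AnyTimestampType, DateType, TimestampNTZType, TimestampType}

AnyTimestampType.acceptsType(TimestampType)     // true
AnyTimestampType.acceptsType(TimestampNTZType)  // true
AnyTimestampType.acceptsType(DateType)          // false
```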
@@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
import java.math.BigInteger
import java.sql.Timestamp
-import java.time.{ZoneId, ZoneOffset}
-import java.util.{Calendar, Locale}
+import java.time.{LocalDateTime, ZoneId, ZoneOffset}
+import java.util.Locale

import com.google.common.io.Files
import org.apache.hadoop.fs.Path
@@ -32,12 +32,13 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneUTC
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.localDateTimeToMicros
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.TimestampTypes
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -82,12 +83,13 @@ abstract class ParquetPartitionDiscoverySuite
check("1.5", DoubleType)
check("hello", StringType)
check("1990-02-24", DateType)
check("1990-02-24 12:00:30", TimestampType)

val c = Calendar.getInstance(TimeZoneUTC)
c.set(1990, 1, 24, 12, 0, 30)
c.set(Calendar.MILLISECOND, 0)
check("1990-02-24 12:00:30", TimestampType, ZoneOffset.UTC)
// The inferred timestmap type is consistent with the value of `SQLConf.TIMESTAMP_TYPE`
Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach { tsType =>
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) {
check("1990-02-24 12:00:30", SQLConf.get.timestampType)
Contributor: nit: SQLConf.get.timestampType -> tsType

Member Author: tsType is an enum value in SQLConf, not a DataType.

check("1990-02-24 12:00:30", SQLConf.get.timestampType, ZoneOffset.UTC)
}
}

check(defaultPartitionName, NullType)
}
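To unpack the author's reply: the enum value configures the session, while SQLConf.get.timestampType resolves it to the Catalyst DataType that check() expects. A minimal sketch, setting the conf via its string form as the test's withSQLConf does:

```scala
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.TimestampTypes

val tsType = TimestampTypes.TIMESTAMP_NTZ             // enum value, not a DataType
SQLConf.get.setConfString(SQLConf.TIMESTAMP_TYPE.key, tsType.toString)
SQLConf.get.timestampType                             // TimestampNTZType, a DataType
```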
@@ -366,31 +368,44 @@ abstract class ParquetPartitionDiscoverySuite
s"hdfs://host:9000/path2"),
PartitionSpec.emptySpec)

-    // The cases below check the resolution for type conflicts.
-    val t1 = Timestamp.valueOf("2014-01-01 00:00:00.0").getTime * 1000
-    val t2 = Timestamp.valueOf("2014-01-01 00:01:00.0").getTime * 1000
-    // Values in column 'a' are inferred as null, date and timestamp each, and timestamp is set
-    // as a common type.
-    // Values in column 'b' are inferred as integer, decimal(22, 0) and null, and decimal(22, 0)
-    // is set as a common type.
-    check(Seq(
-      s"hdfs://host:9000/path/a=$defaultPartitionName/b=0",
-      s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111",
-      s"hdfs://host:9000/path/a=2014-01-01 00%3A01%3A00.0/b=$defaultPartitionName"),
-      PartitionSpec(
-        StructType(Seq(
-          StructField("a", TimestampType),
-          StructField("b", DecimalType(22, 0)))),
-        Seq(
-          Partition(
-            InternalRow(null, Decimal(0)),
-            s"hdfs://host:9000/path/a=$defaultPartitionName/b=0"),
-          Partition(
-            InternalRow(t1, Decimal(s"${Long.MaxValue}111")),
-            s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111"),
-          Partition(
-            InternalRow(t2, null),
-            s"hdfs://host:9000/path/a=2014-01-01 00%3A01%3A00.0/b=$defaultPartitionName"))))
+    // The inferred timestamp type is consistent with the value of `SQLConf.TIMESTAMP_TYPE`
+    Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach { tsType =>
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) {
+        // The cases below check the resolution for type conflicts.
+        val t1 = if (tsType == TimestampTypes.TIMESTAMP_LTZ) {
+          Timestamp.valueOf("2014-01-01 00:00:00.0").getTime * 1000
+        } else {
+          localDateTimeToMicros(LocalDateTime.parse("2014-01-01T00:00:00"))
+        }
+        val t2 = if (tsType == TimestampTypes.TIMESTAMP_LTZ) {
+          Timestamp.valueOf("2014-01-01 00:01:00.0").getTime * 1000
+        } else {
+          localDateTimeToMicros(LocalDateTime.parse("2014-01-01T00:01:00"))
+        }
+        // Values in column 'a' are inferred as null, date and timestamp each, and timestamp is set
+        // as a common type.
+        // Values in column 'b' are inferred as integer, decimal(22, 0) and null, and decimal(22, 0)
+        // is set as a common type.
+        check(Seq(
+          s"hdfs://host:9000/path/a=$defaultPartitionName/b=0",
+          s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111",
+          s"hdfs://host:9000/path/a=2014-01-01 00%3A01%3A00.0/b=$defaultPartitionName"),
+          PartitionSpec(
+            StructType(Seq(
+              StructField("a", SQLConf.get.timestampType),
Contributor: ditto

StructField("b", DecimalType(22, 0)))),
Seq(
Partition(
InternalRow(null, Decimal(0)),
s"hdfs://host:9000/path/a=$defaultPartitionName/b=0"),
Partition(
InternalRow(t1, Decimal(s"${Long.MaxValue}111")),
s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111"),
Partition(
InternalRow(t2, null),
s"hdfs://host:9000/path/a=2014-01-01 00%3A01%3A00.0/b=$defaultPartitionName"))))
}
}
}
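Why t1 and t2 are computed differently per branch, sketched with the helpers imported above (printed values depend on the JVM's default zone):

```scala
import java.sql.Timestamp
import java.time.LocalDateTime
import org.apache.spark.sql.catalyst.util.DateTimeUtils.localDateTimeToMicros

// LTZ: epoch-based, so the result shifts with the JVM's default time zone.
val ltzMicros = Timestamp.valueOf("2014-01-01 00:01:00.0").getTime * 1000
// NTZ: a zone-free wall-clock rendering; the same on every machine.
val ntzMicros = localDateTimeToMicros(LocalDateTime.parse("2014-01-01T00:01:00"))
// The two agree only when the default zone is UTC.
```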

test("parse partitions with type inference disabled") {
@@ -642,97 +657,121 @@ abstract class ParquetPartitionDiscoverySuite
}

test("Various partition value types") {
-    val row =
-      Row(
-        100.toByte,
-        40000.toShort,
-        Int.MaxValue,
-        Long.MaxValue,
-        1.5.toFloat,
-        4.5,
-        new java.math.BigDecimal(new BigInteger("212500"), 5),
-        new java.math.BigDecimal("2.125"),
-        java.sql.Date.valueOf("2015-05-23"),
-        new Timestamp(0),
-        "This is a string, /[]?=:",
-        "This is not a partition column")
-
-    // BooleanType is not supported yet
-    val partitionColumnTypes =
-      Seq(
-        ByteType,
-        ShortType,
-        IntegerType,
-        LongType,
-        FloatType,
-        DoubleType,
-        DecimalType(10, 5),
-        DecimalType.SYSTEM_DEFAULT,
-        DateType,
-        TimestampType,
-        StringType)
-
-    val partitionColumns = partitionColumnTypes.zipWithIndex.map {
-      case (t, index) => StructField(s"p_$index", t)
-    }
+    Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach { tsType =>
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) {
+        val ts = if (tsType == TimestampTypes.TIMESTAMP_LTZ) {
+          new Timestamp(0)
+        } else {
+          LocalDateTime.parse("1970-01-01T00:00:00")
+        }
+        val row =
+          Row(
+            100.toByte,
+            40000.toShort,
+            Int.MaxValue,
+            Long.MaxValue,
+            1.5.toFloat,
+            4.5,
+            new java.math.BigDecimal(new BigInteger("212500"), 5),
+            new java.math.BigDecimal("2.125"),
+            java.sql.Date.valueOf("2015-05-23"),
+            ts,
+            "This is a string, /[]?=:",
+            "This is not a partition column")
+
+        // BooleanType is not supported yet
+        val partitionColumnTypes =
+          Seq(
+            ByteType,
+            ShortType,
+            IntegerType,
+            LongType,
+            FloatType,
+            DoubleType,
+            DecimalType(10, 5),
+            DecimalType.SYSTEM_DEFAULT,
+            DateType,
+            SQLConf.get.timestampType,
Contributor: ditto

+            StringType)
+
+        val partitionColumns = partitionColumnTypes.zipWithIndex.map {
+          case (t, index) => StructField(s"p_$index", t)
+        }

-    val schema = StructType(partitionColumns :+ StructField(s"i", StringType))
-    val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)
+        val schema = StructType(partitionColumns :+ StructField(s"i", StringType))
+        val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)

-    withTempPath { dir =>
-      df.write.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
-      val fields = schema.map(f => Column(f.name).cast(f.dataType))
-      checkAnswer(spark.read.load(dir.toString).select(fields: _*), row)
-    }
+        withTempPath { dir =>
+          df.write
+            .format("parquet")
+            .partitionBy(partitionColumns.map(_.name): _*)
+            .save(dir.toString)
+          val fields = schema.map(f => Column(f.name).cast(f.dataType))
+          checkAnswer(spark.read.load(dir.toString).select(fields: _*), row)
+        }

-    withTempPath { dir =>
-      df.write.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
-        .format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
-      val fields = schema.map(f => Column(f.name).cast(f.dataType))
-      checkAnswer(spark.read.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
-        .load(dir.toString).select(fields: _*), row)
-    }
+        withTempPath { dir =>
+          df.write.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
+            .format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
+          val fields = schema.map(f => Column(f.name).cast(f.dataType))
+          checkAnswer(spark.read.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
+            .load(dir.toString).select(fields: _*), row)
+        }
+      }
+    }
  }
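A hedged round-trip sketch of what this test exercises under TIMESTAMP_NTZ (path and data illustrative; the LocalDateTime encoder assumes a Spark version with TimestampNTZ support):

```scala
import java.time.LocalDateTime
import spark.implicits._

spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
val df = Seq((LocalDateTime.parse("1970-01-01T00:00:00"), "v")).toDF("ts", "i")
df.write.partitionBy("ts").parquet("/tmp/p")
spark.read.parquet("/tmp/p").schema("ts").dataType  // expected: TimestampNTZType
```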

test("Various inferred partition value types") {
-    val row =
-      Row(
-        Long.MaxValue,
-        4.5,
-        new java.math.BigDecimal(new BigInteger("1" * 20)),
-        java.sql.Date.valueOf("2015-05-23"),
-        java.sql.Timestamp.valueOf("1990-02-24 12:00:30"),
-        "This is a string, /[]?=:",
-        "This is not a partition column")
-
-    val partitionColumnTypes =
-      Seq(
-        LongType,
-        DoubleType,
-        DecimalType(20, 0),
-        DateType,
-        TimestampType,
-        StringType)
-
-    val partitionColumns = partitionColumnTypes.zipWithIndex.map {
-      case (t, index) => StructField(s"p_$index", t)
-    }
+    Seq(TimestampTypes.TIMESTAMP_LTZ, TimestampTypes.TIMESTAMP_NTZ).foreach { tsType =>
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> tsType.toString) {
+        val ts = if (tsType == TimestampTypes.TIMESTAMP_LTZ) {
+          Timestamp.valueOf("1990-02-24 12:00:30")
+        } else {
+          LocalDateTime.parse("1990-02-24T12:00:30")
+        }
+        val row =
+          Row(
+            Long.MaxValue,
+            4.5,
+            new java.math.BigDecimal(new BigInteger("1" * 20)),
+            java.sql.Date.valueOf("2015-05-23"),
+            ts,
+            "This is a string, /[]?=:",
+            "This is not a partition column")
+
+        val partitionColumnTypes =
+          Seq(
+            LongType,
+            DoubleType,
+            DecimalType(20, 0),
+            DateType,
+            SQLConf.get.timestampType,
Contributor: ditto

+            StringType)
+
+        val partitionColumns = partitionColumnTypes.zipWithIndex.map {
+          case (t, index) => StructField(s"p_$index", t)
+        }

-    val schema = StructType(partitionColumns :+ StructField(s"i", StringType))
-    val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)
+        val schema = StructType(partitionColumns :+ StructField(s"i", StringType))
+        val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)

-    withTempPath { dir =>
-      df.write.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
-      val fields = schema.map(f => Column(f.name))
-      checkAnswer(spark.read.load(dir.toString).select(fields: _*), row)
-    }
+        withTempPath { dir =>
+          df.write
+            .format("parquet")
+            .partitionBy(partitionColumns.map(_.name): _*)
+            .save(dir.toString)
+          val fields = schema.map(f => Column(f.name))
+          checkAnswer(spark.read.load(dir.toString).select(fields: _*), row)
+        }

-    withTempPath { dir =>
-      df.write.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
-        .format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
-      val fields = schema.map(f => Column(f.name))
-      checkAnswer(spark.read.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
-        .load(dir.toString).select(fields: _*), row)
-    }
+        withTempPath { dir =>
+          df.write.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
+            .format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
+          val fields = schema.map(f => Column(f.name))
+          checkAnswer(spark.read.option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
+            .load(dir.toString).select(fields: _*), row)
+        }
+      }
+    }
  }

@@ -1346,4 +1385,4 @@ class ParquetV2PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite {
      }
    }
  }
}