[SPARK-31183][SQL][FOLLOWUP] Move rebase tests to AvroSuite and check the rebase flag out of function bodies

### What changes were proposed in this pull request?
1. The tests added by #27953 are moved from `AvroLogicalTypeSuite` to `AvroSuite`.
2. The check of the `rebaseDateTime` flag is moved out of function bodies.

### Why are the changes needed?
1. The tests are moved because they are not directly related to logical types.
2. Moving the flag check out of function bodies should improve performance: the flag is now evaluated once, when a converter is created, instead of once per processed value (see the sketch below).
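
A minimal sketch of the pattern, with placeholder names rather than the actual Spark internals (`rebase` stands in for `DateTimeUtils.rebaseJulianToGregorianMicros`, which is only stubbed here):

```scala
object RebaseHoisting {
  // Placeholder for DateTimeUtils.rebaseJulianToGregorianMicros.
  def rebase(micros: Long): Long = micros + 1L // illustrative shift only

  // Before: the flag is re-tested for every value the closure sees.
  def converterBefore(rebaseDateTime: Boolean): Long => Long =
    micros => if (rebaseDateTime) rebase(micros) else micros

  // After: the flag is tested once, when the converter is built,
  // and each branch yields a straight-line closure.
  def converterAfter(rebaseDateTime: Boolean): Long => Long =
    if (rebaseDateTime) { micros => rebase(micros) }
    else { micros => micros }
}
```

The converter factory runs once per column, while the function it returns runs once per value, so the branch moves out of the hot path.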

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
By running Avro tests via the command `build/sbt avro/test`

Closes #27964 from MaxGekk/rebase-avro-datetime-followup.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
MaxGekk authored and HyukjinKwon committed Mar 20, 2020
1 parent 6a66876 commit b402bc9
Showing 4 changed files with 130 additions and 129 deletions.
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -106,21 +106,22 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
     case (LONG, TimestampType) => avroType.getLogicalType match {
       // For backward compatibility, if the Avro type is Long and it is not logical type
       // (the `null` case), the value is processed as timestamp type with millisecond precision.
-      case null | _: TimestampMillis => (updater, ordinal, value) =>
+      case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) =>
         val millis = value.asInstanceOf[Long]
         val micros = DateTimeUtils.millisToMicros(millis)
-        if (rebaseDateTime) {
-          updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
-        } else {
-          updater.setLong(ordinal, micros)
-        }
+        val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
+        updater.setLong(ordinal, rebasedMicros)
+      case null | _: TimestampMillis => (updater, ordinal, value) =>
+        val millis = value.asInstanceOf[Long]
+        val micros = DateTimeUtils.millisToMicros(millis)
+        updater.setLong(ordinal, micros)
+      case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) =>
+        val micros = value.asInstanceOf[Long]
+        val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
+        updater.setLong(ordinal, rebasedMicros)
       case _: TimestampMicros => (updater, ordinal, value) =>
         val micros = value.asInstanceOf[Long]
-        if (rebaseDateTime) {
-          updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
-        } else {
-          updater.setLong(ordinal, micros)
-        }
+        updater.setLong(ordinal, micros)
       case other => throw new IncompatibleSchemaException(
         s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
     }
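
One subtlety the new shape relies on: Scala takes the first case whose pattern and guard both succeed, so each `if rebaseDateTime` alternative must precede its unguarded twin; when the flag is false the guard fails and matching falls through. A tiny self-contained illustration (hypothetical values, not Spark code):

```scala
// Illustrative only: first-match-wins with pattern guards.
def convert(rebaseDateTime: Boolean, micros: Long): String = micros match {
  case v if rebaseDateTime => s"rebased($v)" // selected only when the flag is set
  case v                   => s"as-is($v)"   // reached when the guard fails
}

assert(convert(rebaseDateTime = true, micros = 42L) == "rebased(42)")
assert(convert(rebaseDateTime = false, micros = 42L) == "as-is(42)")
```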
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -149,17 +149,15 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
     case (TimestampType, LONG) => avroType.getLogicalType match {
       // For backward compatibility, if the Avro type is Long and it is not logical type
       // (the `null` case), output the timestamp value as with millisecond precision.
-      case null | _: TimestampMillis => (getter, ordinal) =>
+      case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) =>
         val micros = getter.getLong(ordinal)
-        val rebasedMicros = if (rebaseDateTime) {
-          DateTimeUtils.rebaseGregorianToJulianMicros(micros)
-        } else micros
+        val rebasedMicros = DateTimeUtils.rebaseGregorianToJulianMicros(micros)
         DateTimeUtils.microsToMillis(rebasedMicros)
-      case _: TimestampMicros => (getter, ordinal) =>
-        val micros = getter.getLong(ordinal)
-        if (rebaseDateTime) {
-          DateTimeUtils.rebaseGregorianToJulianMicros(micros)
-        } else micros
+      case null | _: TimestampMillis => (getter, ordinal) =>
+        DateTimeUtils.microsToMillis(getter.getLong(ordinal))
+      case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
+        DateTimeUtils.rebaseGregorianToJulianMicros(getter.getLong(ordinal))
+      case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
       case other => throw new IncompatibleSchemaException(
         s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
     }
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
@@ -17,15 +17,15 @@
 package org.apache.spark.sql.avro
 
 import java.io.File
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
 
 import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Conversions.DecimalConversion
 import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -348,100 +348,6 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
       assert(msg.contains("Unscaled value too large for precision"))
     }
   }
-
-  private def readResourceAvroFile(name: String): DataFrame = {
-    val url = Thread.currentThread().getContextClassLoader.getResource(name)
-    spark.read.format("avro").load(url.toString)
-  }
-
-  test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
-    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-      checkAnswer(
-        readResourceAvroFile("before_1582_date_v2_4.avro"),
-        Row(java.sql.Date.valueOf("1001-01-01")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
-    }
-  }
-
-  test("SPARK-31183: rebasing microseconds timestamps in write") {
-    val tsStr = "1001-01-01 01:02:03.123456"
-    val nonRebased = "1001-01-07 01:09:05.123456"
-    withTempPath { dir =>
-      val path = dir.getAbsolutePath
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-        Seq(tsStr).toDF("tsS")
-          .select($"tsS".cast("timestamp").as("ts"))
-          .write.format("avro")
-          .save(path)
-
-        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
-      }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
-      }
-    }
-  }
-
-  test("SPARK-31183: rebasing milliseconds timestamps in write") {
-    val tsStr = "1001-01-01 01:02:03.123456"
-    val rebased = "1001-01-01 01:02:03.123"
-    val nonRebased = "1001-01-07 01:09:05.123"
-    Seq(
-      """{"type": "long","logicalType": "timestamp-millis"}""",
-      """"long"""").foreach { tsType =>
-      val timestampSchema = s"""
-        |{
-        |  "namespace": "logical",
-        |  "type": "record",
-        |  "name": "test",
-        |  "fields": [
-        |    {"name": "ts", "type": $tsType}
-        |  ]
-        |}""".stripMargin
-      withTempPath { dir =>
-        val path = dir.getAbsolutePath
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-          Seq(tsStr).toDF("tsS")
-            .select($"tsS".cast("timestamp").as("ts"))
-            .write
-            .option("avroSchema", timestampSchema)
-            .format("avro")
-            .save(path)
-
-          checkAnswer(
-            spark.read.schema("ts timestamp").format("avro").load(path),
-            Row(Timestamp.valueOf(rebased)))
-        }
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-          checkAnswer(
-            spark.read.schema("ts timestamp").format("avro").load(path),
-            Row(Timestamp.valueOf(nonRebased)))
-        }
-      }
-    }
-  }
-
-  test("SPARK-31183: rebasing dates in write") {
-    withTempPath { dir =>
-      val path = dir.getAbsolutePath
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-        Seq("1001-01-01").toDF("dateS")
-          .select($"dateS".cast("date").as("date"))
-          .write.format("avro")
-          .save(path)
-
-        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
-      }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
-      }
-    }
-  }
 }
 
 class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite
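
For context on the expected values in the tests above (and in their new home in `AvroSuite` below): Spark 2.4 wrote dates and timestamps in the hybrid Julian/Gregorian calendar, while Spark 3.0 interprets them in the Proleptic Gregorian calendar. In the year 1001 the two calendars differ by six days: ten days at the 1582 cutover, minus one for each of 1100, 1300, 1400, and 1500, which are leap years only in the Julian calendar. A quick JVM check of that shift, offered as a sketch rather than part of the patch:

```scala
import java.time.{Instant, ZoneOffset}
import java.util.{GregorianCalendar, TimeZone}

object JulianGregorianGap extends App {
  // 1001-01-01 in the hybrid Julian/Gregorian calendar that java.sql types
  // (and hence Spark 2.4) use for old dates.
  val hybrid = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
  hybrid.clear()
  hybrid.set(1001, 0, 1) // January is month 0 in java.util.Calendar

  // The same instant reinterpreted in java.time's Proleptic Gregorian
  // calendar, which Spark 3.0 uses internally.
  val gregorian = Instant.ofEpochMilli(hybrid.getTimeInMillis)
    .atZone(ZoneOffset.UTC)
    .toLocalDate
  println(gregorian) // 1001-01-07: the `nonRebased` value in the tests
}
```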
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala (110 additions, 14 deletions)
@@ -21,7 +21,7 @@ import java.io._
 import java.net.URL
 import java.nio.file.{Files, Paths}
 import java.sql.{Date, Timestamp}
-import java.util.{Locale, TimeZone, UUID}
+import java.util.{Locale, UUID}
 
 import scala.collection.JavaConverters._

@@ -35,9 +35,10 @@ import org.apache.commons.io.FileUtils
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql._
-import org.apache.spark.sql.TestingUDT.{IntervalData, NullData, NullUDT}
+import org.apache.spark.sql.TestingUDT.IntervalData
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -83,6 +84,11 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
     }, new GenericDatumReader[Any]()).getSchema.toString(false)
   }
 
+  private def readResourceAvroFile(name: String): DataFrame = {
+    val url = Thread.currentThread().getContextClassLoader.getResource(name)
+    spark.read.format("avro").load(url.toString)
+  }
+
   test("resolve avro data source") {
     val databricksAvro = "com.databricks.spark.avro"
     // By default the backward compatibility for com.databricks.spark.avro is enabled.
@@ -402,18 +408,19 @@
       StructField("float", FloatType, true),
       StructField("date", DateType, true)
     ))
-    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
-    val rdd = spark.sparkContext.parallelize(Seq(
-      Row(1f, null),
-      Row(2f, new Date(1451948400000L)),
-      Row(3f, new Date(1460066400500L))
-    ))
-    val df = spark.createDataFrame(rdd, schema)
-    df.write.format("avro").save(dir.toString)
-    assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
-    checkAnswer(
-      spark.read.format("avro").load(dir.toString).select("date"),
-      Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L))))
+    DateTimeTestUtils.withDefaultTimeZone(DateTimeUtils.TimeZoneUTC) {
+      val rdd = spark.sparkContext.parallelize(Seq(
+        Row(1f, null),
+        Row(2f, new Date(1451948400000L)),
+        Row(3f, new Date(1460066400500L))
+      ))
+      val df = spark.createDataFrame(rdd, schema)
+      df.write.format("avro").save(dir.toString)
+      assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
+      checkAnswer(
+        spark.read.format("avro").load(dir.toString).select("date"),
+        Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L))))
+    }
   }
 }
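
The rewritten test no longer mutates the JVM-wide default time zone for everything that runs afterwards. `DateTimeTestUtils.withDefaultTimeZone` is assumed to follow the usual save/restore loan pattern, roughly:

```scala
import java.util.TimeZone

// Hedged sketch of the helper's assumed shape; the real one lives in
// org.apache.spark.sql.catalyst.util.DateTimeTestUtils.
def withDefaultTimeZone[T](tz: TimeZone)(body: => T): T = {
  val saved = TimeZone.getDefault
  TimeZone.setDefault(tz)
  try body
  finally TimeZone.setDefault(saved) // restore even if the body throws
}
```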

@@ -1521,6 +1528,95 @@
       assert(deprecatedEvents.size === 1)
     }
   }
+
+  test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
+    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+      checkAnswer(
+        readResourceAvroFile("before_1582_date_v2_4.avro"),
+        Row(java.sql.Date.valueOf("1001-01-01")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+    }
+  }
+
+  test("SPARK-31183: rebasing microseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val nonRebased = "1001-01-07 01:09:05.123456"
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+        Seq(tsStr).toDF("tsS")
+          .select($"tsS".cast("timestamp").as("ts"))
+          .write.format("avro")
+          .save(path)
+
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
+      }
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
+      }
+    }
+  }
+
+  test("SPARK-31183: rebasing milliseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val rebased = "1001-01-01 01:02:03.123"
+    val nonRebased = "1001-01-07 01:09:05.123"
+    Seq(
+      """{"type": "long","logicalType": "timestamp-millis"}""",
+      """"long"""").foreach { tsType =>
+      val timestampSchema = s"""
+        |{
+        |  "namespace": "logical",
+        |  "type": "record",
+        |  "name": "test",
+        |  "fields": [
+        |    {"name": "ts", "type": $tsType}
+        |  ]
+        |}""".stripMargin
+      withTempPath { dir =>
+        val path = dir.getAbsolutePath
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+          Seq(tsStr).toDF("tsS")
+            .select($"tsS".cast("timestamp").as("ts"))
+            .write
+            .option("avroSchema", timestampSchema)
+            .format("avro")
+            .save(path)
+
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(rebased)))
+        }
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(nonRebased)))
+        }
+      }
+    }
+  }
+
+  test("SPARK-31183: rebasing dates in write") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+        Seq("1001-01-01").toDF("dateS")
+          .select($"dateS".cast("date").as("date"))
+          .write.format("avro")
+          .save(path)
+
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
+      }
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
+      }
+    }
+  }
 }
 
 class AvroV1Suite extends AvroSuite
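A side note on the expected values in the milliseconds test: `rebased` is `1001-01-01 01:02:03.123`, not `.123456`, because a `timestamp-millis` (or plain `long`) field holds only millisecond precision, so the microsecond fraction is truncated on write. A hedged sketch of that conversion, assuming `DateTimeUtils.microsToMillis` floor-divides (which keeps pre-1970 values truncating toward earlier instants):

```scala
// Assumed truncation behavior; floorDiv matters for negative
// (pre-1970) microsecond values.
def microsToMillis(micros: Long): Long = Math.floorDiv(micros, 1000L)

assert(microsToMillis(123456L) == 123L)   // .123456 s -> .123 s
assert(microsToMillis(-123456L) == -124L) // floors toward negative infinity
```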
