Skip to content

Commit

Permalink
TIMESTAMP_SECONDS supports fractional input
Browse files Browse the repository at this point in the history
  • Loading branch information
cloud-fan committed Jun 30, 2020
1 parent 5472170 commit f0256db
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 39 deletions.
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.expressions
import java.text.ParseException
import java.time.{DateTimeException, LocalDate, LocalDateTime, ZoneId}
import java.time.format.DateTimeParseException
import java.time.temporal.IsoFields
import java.util.Locale

import org.apache.commons.text.StringEscapeUtils
Expand Down Expand Up @@ -386,7 +385,7 @@ case class DayOfYear(child: Expression) extends GetDateField {
override val funcName = "getDayInYear"
}

abstract class NumberToTimestampBase extends UnaryExpression
abstract class IntegralToTimestampBase extends UnaryExpression
with ExpectsInputTypes with NullIntolerant {

protected def upScaleFactor: Long
Expand All @@ -408,19 +407,66 @@ abstract class NumberToTimestampBase extends UnaryExpression
}
}

// scalastyle:off line.size.limit
@ExpressionDescription(
usage = "_FUNC_(seconds) - Creates timestamp from the number of seconds since UTC epoch.",
usage = "_FUNC_(seconds) - Creates timestamp from the number of seconds (can be fractional) since UTC epoch.",
examples = """
Examples:
> SELECT _FUNC_(1230219000);
2008-12-25 07:30:00
> SELECT _FUNC_(1230219000.123);
2008-12-25 07:30:00.123
""",
group = "datetime_funcs",
since = "3.1.0")
case class SecondsToTimestamp(child: Expression)
extends NumberToTimestampBase {
// scalastyle:on line.size.limit
// Converts a number of seconds since the UTC epoch into a timestamp by scaling the input
// with MICROS_PER_SECOND. Any numeric child type is accepted:
//  - integral input is scaled with `Math.multiplyExact`, which throws ArithmeticException
//    on long overflow;
//  - decimal input is scaled as a `java.math.BigDecimal` and narrowed with `longValueExact`,
//    which throws ArithmeticException if the product has a non-zero fractional part or does
//    not fit in a long;
//  - float/double input yields null for NaN and infinite values; any other value is scaled
//    in double precision and cast to long, which discards the remaining fraction.
case class SecondsToTimestamp(child: Expression) extends UnaryExpression
  with ExpectsInputTypes with NullIntolerant {

  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)

  override def dataType: DataType = TimestampType

  // NaN and infinite floating point inputs evaluate to null, so the result can be null even
  // when the child itself never is.
  override def nullable: Boolean = child.dataType match {
    case _: FloatType | _: DoubleType => true
    case _ => child.nullable
  }

  // Interpreted evaluation, selected once per expression instance from the child's type.
  @transient
  private lazy val evalFunc: Any => Any = child.dataType match {
    case _: IntegralType => input =>
      Math.multiplyExact(input.asInstanceOf[Number].longValue(), MICROS_PER_SECOND)
    case _: DecimalType => input =>
      val operand = new java.math.BigDecimal(MICROS_PER_SECOND)
      input.asInstanceOf[Decimal].toJavaBigDecimal.multiply(operand).longValueExact()
    case _: FloatType => input =>
      val f = input.asInstanceOf[Float]
      // Widen to double before scaling: Float * Long is evaluated in single precision, which
      // cannot hold a microsecond count for inputs of realistic magnitude.
      if (f.isNaN || f.isInfinite) null else (f.toDouble * MICROS_PER_SECOND).toLong
    case _: DoubleType => input =>
      val d = input.asInstanceOf[Double]
      if (d.isNaN || d.isInfinite) null else (d * MICROS_PER_SECOND).toLong
  }

  override def nullSafeEval(input: Any): Any = evalFunc(input)

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = child.dataType match {
    case _: IntegralType =>
      defineCodeGen(ctx, ev, c => s"java.lang.Math.multiplyExact($c, ${MICROS_PER_SECOND}L)")
    case _: DecimalType =>
      val operand = s"new java.math.BigDecimal($MICROS_PER_SECOND)"
      defineCodeGen(ctx, ev, c => s"$c.toJavaBigDecimal().multiply($operand).longValueExact()")
    case other =>
      nullSafeCodeGen(ctx, ev, c => {
        // "Float" or "Double": the boxed class providing the static isNaN/isInfinite checks.
        val typeStr = CodeGenerator.boxedType(other)
        // The (double) cast is a no-op for double input and keeps float input from being
        // multiplied in single precision, matching the interpreted path above.
        s"""
           |if ($typeStr.isNaN($c) || $typeStr.isInfinite($c)) {
           |  ${ev.isNull} = true;
           |} else {
           |  ${ev.value} = (long)(((double) $c) * $MICROS_PER_SECOND);
           |}
           |""".stripMargin
      })
  }

  override def prettyName: String = "timestamp_seconds"
}
Expand All @@ -437,7 +483,7 @@ case class SecondsToTimestamp(child: Expression)
since = "3.1.0")
// scalastyle:on line.size.limit
case class MillisToTimestamp(child: Expression)
extends NumberToTimestampBase {
extends IntegralToTimestampBase {

override def upScaleFactor: Long = MICROS_PER_MILLIS

Expand All @@ -456,7 +502,7 @@ case class MillisToTimestamp(child: Expression)
since = "3.1.0")
// scalastyle:on line.size.limit
case class MicrosToTimestamp(child: Expression)
extends NumberToTimestampBase {
extends IntegralToTimestampBase {

override def upScaleFactor: Long = 1L

Expand Down
Expand Up @@ -1142,28 +1142,6 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}

test("SPARK-31710:Adds TIMESTAMP_SECONDS, " +
  "TIMESTAMP_MILLIS and TIMESTAMP_MICROS functions") {
  // Each function is checked with a positive value, its negation and a null literal. The
  // expected result is the input scaled to microseconds.
  val seconds = 1230219000
  Seq(seconds, -seconds).foreach { s =>
    checkEvaluation(SecondsToTimestamp(Literal(s)), s.toLong * MICROS_PER_SECOND)
  }
  checkEvaluation(SecondsToTimestamp(Literal(null, IntegerType)), null)

  val millis = 1230219000123L
  Seq(millis, -millis).foreach { ms =>
    checkEvaluation(MillisToTimestamp(Literal(ms)), ms * MICROS_PER_MILLIS)
  }
  checkEvaluation(MillisToTimestamp(Literal(null, IntegerType)), null)

  val micros = 1230219000123123L
  Seq(micros, -micros).foreach { us =>
    checkEvaluation(MicrosToTimestamp(Literal(us)), us)
  }
  checkEvaluation(MicrosToTimestamp(Literal(null, IntegerType)), null)

  // Inputs whose microsecond count does not fit in a long must fail instead of wrapping.
  val overflowing = Seq(
    SecondsToTimestamp(Literal(micros)),
    SecondsToTimestamp(Literal(-micros)),
    MillisToTimestamp(Literal(92233720368547758L)),
    MillisToTimestamp(Literal(-92233720368547758L)))
  overflowing.foreach { expr =>
    checkExceptionInExpression[ArithmeticException](expr, "long overflow")
  }
}

test("Consistent error handling for datetime formatting and parsing functions") {

def checkException[T <: Exception : ClassTag](c: String): Unit = {
Expand Down Expand Up @@ -1194,4 +1172,118 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
new ParseToTimestamp(Literal("11:11 PM"), Literal("mm:ss a")).child,
Timestamp.valueOf("1970-01-01 12:11:11.0"))
}

// Runs `testFunc` over the boundary values of every integral width. Each value is passed
// boxed as the narrowest of Byte, Short, Int or Long that can represent it.
def testIntegralInput(testFunc: Number => Unit): Unit = {
  def runNarrowest(value: Long): Unit = value match {
    case b if b.isValidByte => testFunc(b.toByte)
    case s if s.isValidShort => testFunc(s.toShort)
    case i if i.isValidInt => testFunc(i.toInt)
    case wide => testFunc(wide)
  }

  // Zero, the extremes of each narrower type, and two values just outside the Int range so
  // that the Long branch is exercised as well.
  val boundaries = Seq[Long](
    0,
    Byte.MaxValue,
    Byte.MinValue,
    Short.MaxValue,
    Short.MinValue,
    Int.MaxValue,
    Int.MinValue,
    Int.MaxValue.toLong + 100,
    Int.MinValue.toLong - 100)
  boundaries.foreach(runNarrowest)
}

test("TIMESTAMP_SECONDS") {
  // An integral number of seconds must evaluate to the instant that many seconds after the
  // epoch.
  def testIntegralFunc(value: Number): Unit = {
    checkEvaluation(
      SecondsToTimestamp(Literal(value)),
      Instant.ofEpochSecond(value.longValue()))
  }

  // test null input
  checkEvaluation(
    SecondsToTimestamp(Literal(null, IntegerType)),
    null)

  // test integral input
  testIntegralInput(testIntegralFunc)
  // test overflow
  checkExceptionInExpression[ArithmeticException](
    SecondsToTimestamp(Literal(Long.MaxValue, LongType)), EmptyRow, "long overflow")

  // The same fractional value given as float, double and decimal must produce the same
  // microsecond count.
  def testFractionalInput(input: String): Unit = {
    Seq(input.toFloat, input.toDouble, Decimal(input)).foreach { value =>
      checkEvaluation(
        SecondsToTimestamp(Literal(value)),
        (input.toDouble * MICROS_PER_SECOND).toLong)
    }
  }

  testFractionalInput("1.0")
  testFractionalInput("-1.0")
  testFractionalInput("1.234567")
  testFractionalInput("-1.234567")

  // test overflow for decimal input
  checkExceptionInExpression[ArithmeticException](
    SecondsToTimestamp(Literal(Decimal("9" * 38))), "Overflow"
  )
  // test truncation error for decimal input
  checkExceptionInExpression[ArithmeticException](
    SecondsToTimestamp(Literal(Decimal("0.1234567"))), "Rounding necessary"
  )

  // test NaN and infinity: non-finite float/double input evaluates to null
  Seq[Any](
    Double.NaN,
    Float.NaN,
    Double.PositiveInfinity,
    Double.NegativeInfinity,
    Float.PositiveInfinity,
    Float.NegativeInfinity).foreach { nonFinite =>
    checkEvaluation(
      SecondsToTimestamp(Literal(nonFinite)),
      null)
  }
  // double input can truncate: digits beyond microsecond precision are dropped
  checkEvaluation(
    SecondsToTimestamp(Literal(123.456789123)),
    Instant.ofEpochSecond(123, 456789000))
}

test("TIMESTAMP_MILLIS") {
  // A null literal evaluates to null.
  checkEvaluation(MillisToTimestamp(Literal(null, IntegerType)), null)

  // For every integral width, the result must be the instant that many milliseconds after
  // the epoch.
  testIntegralInput { millis =>
    checkEvaluation(
      MillisToTimestamp(Literal(millis)),
      Instant.ofEpochMilli(millis.longValue()))
  }

  // Scaling Long.MaxValue milliseconds to microseconds cannot fit in a long and must fail.
  val tooLarge = MillisToTimestamp(Literal(Long.MaxValue, LongType))
  checkExceptionInExpression[ArithmeticException](tooLarge, EmptyRow, "long overflow")
}

test("TIMESTAMP_MICROS") {
  // The expected result is the input value itself, read as a long.
  def assertPassThrough(micros: Number): Unit =
    checkEvaluation(MicrosToTimestamp(Literal(micros)), micros.longValue())

  // A null literal evaluates to null.
  checkEvaluation(MicrosToTimestamp(Literal(null, IntegerType)), null)

  // Boundary values of every integral width.
  testIntegralInput(assertPassThrough)

  // The extremes of the long range are expected to evaluate without error.
  Seq(Long.MaxValue, Long.MinValue).foreach(extreme => assertPassThrough(extreme))
}
}
5 changes: 4 additions & 1 deletion sql/core/src/test/resources/sql-tests/inputs/datetime.sql
Expand Up @@ -2,13 +2,16 @@

-- [SPARK-31710] TIMESTAMP_SECONDS, TIMESTAMP_MILLIS and TIMESTAMP_MICROS convert a number to a timestamp
select TIMESTAMP_SECONDS(1230219000),TIMESTAMP_SECONDS(-1230219000),TIMESTAMP_SECONDS(null);
select TIMESTAMP_SECONDS(1.23), TIMESTAMP_SECONDS(1.23d);
select TIMESTAMP_MILLIS(1230219000123),TIMESTAMP_MILLIS(-1230219000123),TIMESTAMP_MILLIS(null);
select TIMESTAMP_MICROS(1230219000123123),TIMESTAMP_MICROS(-1230219000123123),TIMESTAMP_MICROS(null);
-- overflow exception:
-- overflow exception
select TIMESTAMP_SECONDS(1230219000123123);
select TIMESTAMP_SECONDS(-1230219000123123);
select TIMESTAMP_MILLIS(92233720368547758);
select TIMESTAMP_MILLIS(-92233720368547758);
-- truncate exception
select TIMESTAMP_SECONDS(0.1234567);

-- [SPARK-16836] current_date and current_timestamp literals
select current_date = current_date(), current_timestamp = current_timestamp();
Expand Down
@@ -1,15 +1,23 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 103
-- Number of queries: 105


-- !query
select TIMESTAMP_SECONDS(1230219000),TIMESTAMP_SECONDS(-1230219000),TIMESTAMP_SECONDS(null)
-- !query schema
struct<timestamp_seconds(1230219000):timestamp,timestamp_seconds(-1230219000):timestamp,timestamp_seconds(CAST(NULL AS INT)):timestamp>
struct<timestamp_seconds(1230219000):timestamp,timestamp_seconds(-1230219000):timestamp,timestamp_seconds(CAST(NULL AS DOUBLE)):timestamp>
-- !query output
2008-12-25 07:30:00 1931-01-07 00:30:00 NULL


-- !query
select TIMESTAMP_SECONDS(1.23), TIMESTAMP_SECONDS(1.23d)
-- !query schema
struct<timestamp_seconds(1.23):timestamp,timestamp_seconds(1.23):timestamp>
-- !query output
1969-12-31 16:00:01.23 1969-12-31 16:00:01.23


-- !query
select TIMESTAMP_MILLIS(1230219000123),TIMESTAMP_MILLIS(-1230219000123),TIMESTAMP_MILLIS(null)
-- !query schema
Expand Down Expand Up @@ -62,6 +70,15 @@ java.lang.ArithmeticException
long overflow


-- !query
select TIMESTAMP_SECONDS(0.1234567)
-- !query schema
struct<>
-- !query output
java.lang.ArithmeticException
Rounding necessary


-- !query
select current_date = current_date(), current_timestamp = current_timestamp()
-- !query schema
Expand Down
@@ -1,15 +1,23 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 103
-- Number of queries: 105


-- !query
select TIMESTAMP_SECONDS(1230219000),TIMESTAMP_SECONDS(-1230219000),TIMESTAMP_SECONDS(null)
-- !query schema
struct<timestamp_seconds(1230219000):timestamp,timestamp_seconds(-1230219000):timestamp,timestamp_seconds(CAST(NULL AS INT)):timestamp>
struct<timestamp_seconds(1230219000):timestamp,timestamp_seconds(-1230219000):timestamp,timestamp_seconds(CAST(NULL AS DOUBLE)):timestamp>
-- !query output
2008-12-25 07:30:00 1931-01-07 00:30:00 NULL


-- !query
select TIMESTAMP_SECONDS(1.23), TIMESTAMP_SECONDS(1.23d)
-- !query schema
struct<timestamp_seconds(1.23):timestamp,timestamp_seconds(1.23):timestamp>
-- !query output
1969-12-31 16:00:01.23 1969-12-31 16:00:01.23


-- !query
select TIMESTAMP_MILLIS(1230219000123),TIMESTAMP_MILLIS(-1230219000123),TIMESTAMP_MILLIS(null)
-- !query schema
Expand Down Expand Up @@ -62,6 +70,15 @@ java.lang.ArithmeticException
long overflow


-- !query
select TIMESTAMP_SECONDS(0.1234567)
-- !query schema
struct<>
-- !query output
java.lang.ArithmeticException
Rounding necessary


-- !query
select current_date = current_date(), current_timestamp = current_timestamp()
-- !query schema
Expand Down
21 changes: 19 additions & 2 deletions sql/core/src/test/resources/sql-tests/results/datetime.sql.out
@@ -1,15 +1,23 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 103
-- Number of queries: 105


-- !query
select TIMESTAMP_SECONDS(1230219000),TIMESTAMP_SECONDS(-1230219000),TIMESTAMP_SECONDS(null)
-- !query schema
struct<timestamp_seconds(1230219000):timestamp,timestamp_seconds(-1230219000):timestamp,timestamp_seconds(CAST(NULL AS INT)):timestamp>
struct<timestamp_seconds(1230219000):timestamp,timestamp_seconds(-1230219000):timestamp,timestamp_seconds(CAST(NULL AS DOUBLE)):timestamp>
-- !query output
2008-12-25 07:30:00 1931-01-07 00:30:00 NULL


-- !query
select TIMESTAMP_SECONDS(1.23), TIMESTAMP_SECONDS(1.23d)
-- !query schema
struct<timestamp_seconds(1.23):timestamp,timestamp_seconds(1.23):timestamp>
-- !query output
1969-12-31 16:00:01.23 1969-12-31 16:00:01.23


-- !query
select TIMESTAMP_MILLIS(1230219000123),TIMESTAMP_MILLIS(-1230219000123),TIMESTAMP_MILLIS(null)
-- !query schema
Expand Down Expand Up @@ -62,6 +70,15 @@ java.lang.ArithmeticException
long overflow


-- !query
select TIMESTAMP_SECONDS(0.1234567)
-- !query schema
struct<>
-- !query output
java.lang.ArithmeticException
Rounding necessary


-- !query
select current_date = current_date(), current_timestamp = current_timestamp()
-- !query schema
Expand Down

0 comments on commit f0256db

Please sign in to comment.