Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ object FunctionRegistry {
expression[Year]("year"),
expression[TimeWindow]("window"),
expression[MakeDate]("make_date"),
expression[MakeTimestamp]("make_timestamp"),

// collection functions
expression[CreateArray]("array"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,157 @@ abstract class QuaternaryExpression extends Expression {
}
}

/**
 * Base class for expressions with six mandatory children, an optional seventh child and a
 * single output.
 * By default the output is null as soon as any child evaluates to null.
 */
abstract class SeptenaryExpression extends Expression {

  override def foldable: Boolean = children.forall(_.foldable)

  override def nullable: Boolean = children.exists(_.nullable)

  /**
   * Default interpreted evaluation, matching the default nullability of SeptenaryExpression:
   * children are evaluated left to right and evaluation stops at the first null child, in which
   * case the result is null. Subclasses that override `nullable` should most likely override
   * this method as well.
   */
  override def eval(input: InternalRow): Any = {
    val operands = children
    val mandatory = new Array[Any](6)
    var idx = 0
    var sawNull = false
    // Evaluate the six mandatory children in order, stopping right after the first null.
    while (idx < 6 && !sawNull) {
      val value = operands(idx).eval(input)
      mandatory(idx) = value
      sawNull = value == null
      idx += 1
    }

    if (sawNull) {
      null
    } else if (operands.length > 6) {
      // The optional seventh child is only evaluated once all mandatory ones are non-null.
      val optional = operands(6).eval(input)
      if (optional == null) {
        null
      } else {
        nullSafeEval(
          mandatory(0), mandatory(1), mandatory(2), mandatory(3), mandatory(4), mandatory(5),
          Some(optional))
      }
    } else {
      nullSafeEval(
        mandatory(0), mandatory(1), mandatory(2), mandatory(3), mandatory(4), mandatory(5),
        None)
    }
  }

  /**
   * Invoked by the default [[eval]] once every evaluated child is known to be non-null.
   * Subclasses keeping the default nullability can override this to avoid writing null checks;
   * subclasses needing full control over evaluation should override [[eval]] instead.
   *
   * @param input7 value of the optional seventh child, or `None` when there is no such child.
   */
  protected def nullSafeEval(
      input1: Any,
      input2: Any,
      input3: Any,
      input4: Any,
      input5: Any,
      input6: Any,
      input7: Option[Any]): Any = {
    sys.error("SeptenaryExpression must override either eval or nullSafeEval")
  }

  /**
   * Shorthand for generating septenary evaluation code.
   * If any of the sub-expressions is null, the result of this computation
   * is assumed to be null.
   *
   * @param f accepts the six mandatory variable names plus the optional seventh one and returns
   *          a Java expression that is assigned to the output value.
   */
  protected def defineCodeGen(
      ctx: CodegenContext,
      ev: ExprCode,
      f: (String, String, String, String, String, String, Option[String]) => String
    ): ExprCode = {
    nullSafeCodeGen(ctx, ev, (in1, in2, in3, in4, in5, in6, in7) => {
      s"${ev.value} = ${f(in1, in2, in3, in4, in5, in6, in7)};"
    })
  }

  /**
   * Shorthand for generating septenary evaluation code.
   * If any of the sub-expressions is null, the result of this computation
   * is assumed to be null.
   *
   * @param f function that accepts the non-null evaluation result names of the children (the
   *          seventh one being optional) and returns Java code to compute the output.
   */
  protected def nullSafeCodeGen(
      ctx: CodegenContext,
      ev: ExprCode,
      f: (String, String, String, String, String, String, Option[String]) => String
    ): ExprCode = {
    val gen1 = children(0).genCode(ctx)
    val gen2 = children(1).genCode(ctx)
    val gen3 = children(2).genCode(ctx)
    val gen4 = children(3).genCode(ctx)
    val gen5 = children(4).genCode(ctx)
    val gen6 = children(5).genCode(ctx)
    val gen7 = if (children.length > 6) Some(children(6).genCode(ctx)) else None
    val resultCode = f(
      gen1.value,
      gen2.value,
      gen3.value,
      gen4.value,
      gen5.value,
      gen6.value,
      gen7.map(_.value))

    // NOTE: the multi-line templates below are kept verbatim; their text is the emitted code.
    if (!nullable) {
      ev.copy(code = code"""
${gen1.code}
${gen2.code}
${gen3.code}
${gen4.code}
${gen5.code}
${gen6.code}
${gen7.map(_.code).getOrElse("")}
${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
$resultCode""", isNull = FalseLiteral)
    } else {
      // Emits the child's code followed by `body` guarded by the child's null check.
      def guarded(gen: ExprCode, child: Expression)(body: String): String =
        gen.code + ctx.nullSafeExec(child.nullable, gen.isNull)(body)

      val nullSafeResultCode =
        s"""
${ev.isNull} = false; // resultCode could change nullability.
$resultCode
"""
      // Innermost layer: the optional seventh child, when present, guards the result code.
      val innermost = gen7.fold(nullSafeResultCode) { gen =>
        guarded(gen, children(6))(nullSafeResultCode)
      }
      // Wrap from the sixth child outwards so the first child ends up as the outermost guard.
      val guardedCode = Seq(gen1, gen2, gen3, gen4, gen5, gen6)
        .zip(children)
        .foldRight(innermost) { case ((gen, child), body) => guarded(gen, child)(body) }

      ev.copy(code = code"""
boolean ${ev.isNull} = true;
${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
$guardedCode""")
    }
  }
}

/**
* A trait used for resolving nullable flags, including `nullable`, `containsNull` of [[ArrayType]]
* and `valueContainsNull` of [[MapType]], containsNull, valueContainsNull flags of the output date
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.expressions

import java.sql.Timestamp
import java.time.{Instant, LocalDate, ZoneId}
import java.time.{DateTimeException, Instant, LocalDate, LocalDateTime, ZoneId}
import java.time.temporal.IsoFields
import java.util.{Locale, TimeZone}

Expand Down Expand Up @@ -1657,3 +1657,156 @@ case class MakeDate(year: Expression, month: Expression, day: Expression)

override def prettyName: String = "make_date"
}

// scalastyle:off line.size.limit
/**
 * Creates a timestamp from year, month, day, hour, minute and second fields, with an optional
 * time zone identifier as the seventh child.
 *
 * The result is null when any child is null, when the fields do not form a valid local
 * date-time (a `DateTimeException` is raised while building it), or when the supplied time zone
 * identifier is rejected with a `DateTimeException`. The last rule holds for both the
 * interpreted and the generated code paths.
 *
 * @param timezone   optional expression producing the time zone identifier; when absent the
 *                   expression's own `zoneId` is used.
 * @param timeZoneId time zone id set through [[withTimeZone]].
 */
@ExpressionDescription(
  usage = "_FUNC_(year, month, day, hour, min, sec[, timezone]) - Create timestamp from year, month, day, hour, min, sec and timezone fields.",
  arguments = """
Arguments:
* year - the year to represent, from 1 to 9999
* month - the month-of-year to represent, from 1 (January) to 12 (December)
* day - the day-of-month to represent, from 1 to 31
* hour - the hour-of-day to represent, from 0 to 23
* min - the minute-of-hour to represent, from 0 to 59
* sec - the second-of-minute and its micro-fraction to represent, from
0 to 60. If the sec argument equals to 60, the seconds field is set
to 0 and 1 minute is added to the final timestamp.
* timezone - the time zone identifier. For example, CET, UTC and etc.
""",
  examples = """
Examples:
> SELECT _FUNC_(2014, 12, 28, 6, 30, 45.887);
2014-12-28 06:30:45.887
> SELECT _FUNC_(2014, 12, 28, 6, 30, 45.887, 'CET');
2014-12-28 10:30:45.887
> SELECT _FUNC_(2019, 6, 30, 23, 59, 60);
2019-07-01 00:00:00
> SELECT _FUNC_(2019, 13, 1, 10, 11, 12, 13);
NULL
> SELECT _FUNC_(null, 7, 22, 15, 30, 0);
NULL
""",
  since = "3.0.0")
// scalastyle:on line.size.limit
case class MakeTimestamp(
    year: Expression,
    month: Expression,
    day: Expression,
    hour: Expression,
    min: Expression,
    sec: Expression,
    timezone: Option[Expression] = None,
    timeZoneId: Option[String] = None)
  extends SeptenaryExpression with TimeZoneAwareExpression with ImplicitCastInputTypes {

  /** Six-argument form: no explicit time zone argument. */
  def this(
      year: Expression,
      month: Expression,
      day: Expression,
      hour: Expression,
      min: Expression,
      sec: Expression) = {
    this(year, month, day, hour, min, sec, None, None)
  }

  /** Seven-argument form: the last argument is the time zone identifier expression. */
  def this(
      year: Expression,
      month: Expression,
      day: Expression,
      hour: Expression,
      min: Expression,
      sec: Expression,
      timezone: Expression) = {
    this(year, month, day, hour, min, sec, Some(timezone), None)
  }

  // The optional time zone expression, when present, is the seventh child.
  override def children: Seq[Expression] = Seq(year, month, day, hour, min, sec) ++ timezone
  override def inputTypes: Seq[AbstractDataType] =
    Seq(IntegerType, IntegerType, IntegerType, IntegerType, IntegerType, DoubleType) ++
    timezone.map(_ => StringType)
  override def dataType: DataType = TimestampType
  // Always nullable: invalid field values produce null even when no child is nullable.
  override def nullable: Boolean = true

  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
    copy(timeZoneId = Option(timeZoneId))

  /**
   * Converts the given fields, interpreted in `zoneId`, to the value produced by
   * `instantToMicros`.
   *
   * @param secAndNanos seconds with their fraction; 60 with a zero fraction is accepted and
   *                    rolls over to the next minute, 60 with a non-zero fraction is rejected.
   * @return the converted value, or null when a `DateTimeException` is raised.
   */
  private def toMicros(
      year: Int,
      month: Int,
      day: Int,
      hour: Int,
      min: Int,
      secAndNanos: Double,
      zoneId: ZoneId): Any = {
    try {
      val seconds = secAndNanos.toInt
      val nanos = ((secAndNanos - seconds) * NANOS_PER_SECOND).toInt
      val ldt = if (seconds == 60) {
        if (nanos == 0) {
          // This case of sec = 60 and nanos = 0 is supported for compatibility with PostgreSQL
          LocalDateTime.of(year, month, day, hour, min, 0, 0).plusMinutes(1)
        } else {
          throw new DateTimeException("The fraction of sec must be zero. Valid range is [0, 60].")
        }
      } else {
        LocalDateTime.of(year, month, day, hour, min, seconds, nanos)
      }
      instantToMicros(ldt.atZone(zoneId).toInstant)
    } catch {
      case _: DateTimeException => null
    }
  }

  /**
   * Resolves the zone to use for the conversion.
   *
   * A user-supplied identifier that `DateTimeUtils.getZoneId` rejects with a
   * `DateTimeException` yields `None`, so that the interpreted path returns null exactly like
   * the generated code, which performs the same lookup inside its try/catch block. Failures
   * while resolving the expression's own `zoneId` are deliberately not swallowed here.
   */
  private def resolveZoneId(timezone: Option[Any]): Option[ZoneId] = timezone match {
    case Some(tz) =>
      try {
        Some(DateTimeUtils.getZoneId(tz.asInstanceOf[UTF8String].toString))
      } catch {
        case _: DateTimeException => None
      }
    case None => Some(zoneId)
  }

  override def nullSafeEval(
      year: Any,
      month: Any,
      day: Any,
      hour: Any,
      min: Any,
      sec: Any,
      timezone: Option[Any]): Any = {
    resolveZoneId(timezone) match {
      case Some(zid) =>
        toMicros(
          year.asInstanceOf[Int],
          month.asInstanceOf[Int],
          day.asInstanceOf[Int],
          hour.asInstanceOf[Int],
          min.asInstanceOf[Int],
          sec.asInstanceOf[Double],
          zid)
      case None => null
    }
  }

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
    val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
    nullSafeCodeGen(ctx, ev, (year, month, day, hour, min, secAndNanos, timezone) => {
      // With an explicit time zone child the lookup happens in the generated code, inside the
      // try block below; otherwise the pre-resolved reference object is used.
      val zoneId = timezone.map(tz => s"$dtu.getZoneId(${tz}.toString())").getOrElse(zid)
      s"""
try {
int seconds = (int)$secAndNanos;
int nanos = (int)(($secAndNanos - seconds) * 1000000000L);
java.time.LocalDateTime ldt;
if (seconds == 60) {
if (nanos == 0) {
ldt = java.time.LocalDateTime.of(
$year, $month, $day, $hour, $min, 0, 0).plusMinutes(1);
} else {
throw new java.time.DateTimeException(
"The fraction of sec must be zero. Valid range is [0, 60].");
}
} else {
ldt = java.time.LocalDateTime.of(
$year, $month, $day, $hour, $min, seconds, nanos);
}
java.time.Instant instant = ldt.atZone($zoneId).toInstant();
${ev.value} = $dtu.instantToMicros(instant);
} catch (java.time.DateTimeException e) {
${ev.isNull} = true;
}"""
    })
  }

  override def prettyName: String = "make_timestamp"
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.time.ZoneOffset
import java.time.{ZoneId, ZoneOffset}
import java.util.{Calendar, Locale, TimeZone}
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit._
Expand Down Expand Up @@ -928,4 +928,36 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(MakeDate(Literal(2019), Literal(13), Literal(19)), null)
checkEvaluation(MakeDate(Literal(2019), Literal(7), Literal(32)), null)
}

test("creating values of TimestampType via make_timestamp") {
  // Baseline expression with an explicit time zone equal to the JVM default zone.
  val withZone = MakeTimestamp(
    Literal(2013), Literal(7), Literal(15), Literal(8), Literal(15), Literal(23.5),
    Some(Literal(ZoneId.systemDefault().getId)))
  val expected = Timestamp.valueOf("2013-7-15 8:15:23.5")
  checkEvaluation(withZone, expected)
  checkEvaluation(withZone.copy(timezone = None), expected)

  // For every field: first a null input, then an out-of-range input. All must evaluate to null.
  val expectNull: Seq[MakeTimestamp] = Seq(
    withZone.copy(year = Literal.create(null, IntegerType)),
    withZone.copy(year = Literal(Int.MaxValue)),
    withZone.copy(month = Literal.create(null, IntegerType)),
    withZone.copy(month = Literal(13)),
    withZone.copy(day = Literal.create(null, IntegerType)),
    withZone.copy(day = Literal(32)),
    withZone.copy(hour = Literal.create(null, IntegerType)),
    withZone.copy(hour = Literal(25)),
    withZone.copy(min = Literal.create(null, IntegerType)),
    withZone.copy(min = Literal(65)),
    withZone.copy(sec = Literal.create(null, DoubleType)),
    withZone.copy(sec = Literal(70.0)))
  expectNull.foreach { expr =>
    checkEvaluation(expr, null)
  }

  // sec = 60 with a zero fraction rolls over to the next minute; a non-zero fraction is invalid.
  val sixtySeconds = MakeTimestamp(Literal(2019), Literal(6), Literal(30),
    Literal(23), Literal(59), Literal(60.0))
  checkEvaluation(sixtySeconds, Timestamp.valueOf("2019-07-01 00:00:00"))
  checkEvaluation(sixtySeconds.copy(sec = Literal(60.5)), null)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,7 @@ SELECT '' AS date_trunc_week, date_trunc( 'week', timestamp '2004-02-29 15:44:17
-- FROM TIMESTAMP_TBL;


--[SPARK-28432] Missing Date/Time Functions: make_timestamp
-- timestamp numeric fields constructor
-- SELECT make_timestamp(2014,12,28,6,30,45.887);
SELECT make_timestamp(2014,12,28,6,30,45.887);

DROP TABLE TIMESTAMP_TBL;
Loading