[SPARK-29783][SQL] Support SQL Standard/ISO_8601 output style for interval type #26418

Closed · wants to merge 22 commits
common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
@@ -18,7 +18,6 @@
package org.apache.spark.unsafe.types;

 import java.io.Serializable;
-import java.math.BigDecimal;
 import java.time.Duration;
 import java.time.Period;
 import java.time.temporal.ChronoUnit;
@@ -80,39 +79,8 @@ public int compareTo(CalendarInterval that) {

   @Override
   public String toString() {
-    if (months == 0 && days == 0 && microseconds == 0) {
-      return "0 seconds";
-    }
-
-    StringBuilder sb = new StringBuilder();
-
-    if (months != 0) {
-      appendUnit(sb, months / 12, "years");
-      appendUnit(sb, months % 12, "months");
-    }
-
-    appendUnit(sb, days, "days");
-
-    if (microseconds != 0) {
-      long rest = microseconds;
-      appendUnit(sb, rest / MICROS_PER_HOUR, "hours");
-      rest %= MICROS_PER_HOUR;
-      appendUnit(sb, rest / MICROS_PER_MINUTE, "minutes");
-      rest %= MICROS_PER_MINUTE;
-      if (rest != 0) {
-        String s = BigDecimal.valueOf(rest, 6).stripTrailingZeros().toPlainString();
-        sb.append(s).append(" seconds ");
-      }
-    }
-
-    sb.setLength(sb.length() - 1);
-    return sb.toString();
-  }
-
-  private void appendUnit(StringBuilder sb, long value, String unit) {
-    if (value != 0) {
-      sb.append(value).append(' ').append(unit).append(' ');
-    }
+    return "CalendarInterval(months= " + months + ", days = " + days + ", microsecond = " +
+      microseconds + ")";
   }

Member: Why do we use this string representation now? Was it in order to move the same logic into IntervalUtils? If that's the case, we didn't have to move it; we could have kept using this class's toString until this case is completely exposed.
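For context, a minimal sketch of what the new representation looks like (Scala REPL; the exact spacing, including "months= ", follows the string concatenation above, so treat the output as illustrative):

    import org.apache.spark.unsafe.types.CalendarInterval

    // toString is now a debug-style representation; the human-readable
    // rendering moves to IntervalUtils.toMultiUnitsString later in this diff.
    new CalendarInterval(34, 31, 123).toString
    // "CalendarInterval(months= 34, days = 31, microsecond = 123)"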

common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java
@@ -46,36 +46,6 @@ public void equalsTest() {
assertEquals(i1, i6);
}

-  @Test
-  public void toStringTest() {
-    CalendarInterval i;
-
-    i = new CalendarInterval(0, 0, 0);
-    assertEquals("0 seconds", i.toString());
-
-    i = new CalendarInterval(34, 0, 0);
-    assertEquals("2 years 10 months", i.toString());
-
-    i = new CalendarInterval(-34, 0, 0);
-    assertEquals("-2 years -10 months", i.toString());
-
-    i = new CalendarInterval(0, 31, 0);
-    assertEquals("31 days", i.toString());
-
-    i = new CalendarInterval(0, -31, 0);
-    assertEquals("-31 days", i.toString());
-
-    i = new CalendarInterval(0, 0, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123);
-    assertEquals("3 hours 13 minutes 0.000123 seconds", i.toString());
-
-    i = new CalendarInterval(0, 0, -3 * MICROS_PER_HOUR - 13 * MICROS_PER_MINUTE - 123);
-    assertEquals("-3 hours -13 minutes -0.000123 seconds", i.toString());
-
-    i = new CalendarInterval(34, 31, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123);
-    assertEquals("2 years 10 months 31 days 3 hours 13 minutes 0.000123 seconds",
-        i.toString());
-  }

@Test
public void periodAndDurationTest() {
CalendarInterval interval = new CalendarInterval(120, -40, 123456);
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -30,7 +30,9 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.catalyst.util.IntervalUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.IntervalStyle._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.UTF8StringBuilder
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -281,6 +283,14 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit

// UDFToString
private[this] def castToString(from: DataType): Any => Any = from match {
case CalendarIntervalType => SQLConf.get.intervalOutputStyle match {
case SQL_STANDARD =>
buildCast[CalendarInterval](_, i => UTF8String.fromString(toSqlStandardString(i)))
case ISO_8601 =>
buildCast[CalendarInterval](_, i => UTF8String.fromString(toIso8601String(i)))
case MULTI_UNITS =>
buildCast[CalendarInterval](_, i => UTF8String.fromString(toMultiUnitsString(i)))
}
case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes)
case DateType => buildCast[Int](_, d => UTF8String.fromString(dateFormatter.format(d)))
case TimestampType => buildCast[Long](_,
@@ -985,6 +995,14 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
timestampFormatter.getClass)
(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(
org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($tf, $c));"""
case CalendarIntervalType =>
val iu = IntervalUtils.getClass.getCanonicalName.stripSuffix("$")
val funcName = SQLConf.get.intervalOutputStyle match {
case SQL_STANDARD => "toSqlStandardString"
case ISO_8601 => "toIso8601String"
case MULTI_UNITS => "toMultiUnitsString"
}
(c, evPrim, _) => code"""$evPrim = UTF8String.fromString($iu.$funcName($c));"""
case ArrayType(et, _) =>
(c, evPrim, evNull) => {
val buffer = ctx.freshVariable("buffer", classOf[UTF8StringBuilder])
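To illustrate the two paths above (the interpreted castToString and its codegen variant), here is a hypothetical spark-shell session; the expected strings are traced from the IntervalUtils formatters added later in this diff, so treat them as illustrative:

    // Hypothetical session: cast the same interval to string under each style.
    spark.conf.set("spark.sql.intervalOutputStyle", "MULTI_UNITS")
    spark.sql("SELECT CAST(interval 3 hours 13 minutes AS STRING)").collect()
    // Array([3 hours 13 minutes])

    spark.conf.set("spark.sql.intervalOutputStyle", "SQL_STANDARD")
    spark.sql("SELECT CAST(interval 3 hours 13 minutes AS STRING)").collect()
    // Array([+3:13:00])

    spark.conf.set("spark.sql.intervalOutputStyle", "ISO_8601")
    spark.sql("SELECT CAST(interval 3 hours 13 minutes AS STRING)").collect()
    // Array([PT3H13M])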
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -409,6 +409,7 @@ case class Literal (value: Any, dataType: DataType) extends LeafExpression {
DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
s"TIMESTAMP('${formatter.format(v)}')"
case (v: Array[Byte], BinaryType) => s"X'${DatatypeConverter.printHexBinary(v)}'"
+      case (v: CalendarInterval, CalendarIntervalType) => IntervalUtils.toMultiUnitsString(v)
Member: Sorry if this was already asked above, but why didn't we change this one?
Member Author: We don't support parsing intervals from the ISO/SQL standard formats yet.
Member: Why didn't we support the ISO/SQL standard formats here together?
case _ => value.toString
}
}
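A small sketch of what the new branch produces (Scala REPL; Literal.sql is the method patched above, and toMultiUnitsString is assumed from later in this diff):

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.unsafe.types.CalendarInterval

    // One day, as (months, days, microseconds):
    Literal(new CalendarInterval(0, 1, 0)).sql
    // "1 days"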
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -26,6 +26,8 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.IntervalStyle
+import org.apache.spark.sql.internal.SQLConf.IntervalStyle.IntervalStyle
Contributor: unused import


/**
* Options for parsing JSON data into Spark SQL rows.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -119,6 +119,10 @@ private[sql] class JacksonGenerator(
(row: SpecializedGetters, ordinal: Int) =>
gen.writeNumber(row.getDouble(ordinal))

case CalendarIntervalType =>
  (row: SpecializedGetters, ordinal: Int) =>
    gen.writeString(IntervalUtils.toMultiUnitsString(row.getInterval(ordinal)))

Contributor: Let's deal with JSON in another PR.
Member: I guess this comment #26102 (comment) is valid for the JSON datasource as well.
Member: Why wasn't this comment addressed?
case StringType =>
(row: SpecializedGetters, ordinal: Int) =>
gen.writeString(row.getUTF8String(ordinal).toString)
@@ -214,10 +218,15 @@ private[sql] class JacksonGenerator(
private def writeMapData(
map: MapData, mapType: MapType, fieldWriter: ValueWriter): Unit = {
val keyArray = map.keyArray()
val keyString = mapType.keyType match {
  case CalendarIntervalType =>
    (i: Int) => IntervalUtils.toMultiUnitsString(keyArray.getInterval(i))
  case _ => (i: Int) => keyArray.get(i, mapType.keyType).toString
}

Contributor (on the CalendarIntervalType case): This can't happen, actually; we don't allow writing out interval values. Do you have an example that can hit this code path?
Contributor: Ah, to_json does not have the interval type check. Makes sense.
Member: @yaooqinn, how about to_csv?

Contributor (on the toString fallback): It's fragile to rely on toString; e.g. UnsafeRow.toString is not human-readable. Shall we recursively write the map key as a JSON object? cc @HyukjinKwon
Member: Ah, I am sorry I missed this cc. In JSON the key should be a string. We should either make it a string always or explicitly disallow it.
Member: cc @viirya I think we talked about this before.
Member: Yeah, I think currently the map key is not very useful for some types. To make map keys human-readable, we need type-specific serialization for some key types. Maybe I should create a JIRA ticket to follow up?
Member: Yeah .. +1!
Member: BTW, this code path shouldn't be evaluated per map here.
val valueArray = map.valueArray()
var i = 0
while (i < map.numElements()) {
-      gen.writeFieldName(keyArray.get(i, mapType.keyType).toString)
+      gen.writeFieldName(keyString(i))
if (!valueArray.isNullAt(i)) {
fieldWriter.apply(valueArray, i)
} else {
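A quick sketch of the behavior this enables (hypothetical spark-shell session; per the thread above, to_json has no interval type check, which is how intervals reach this generator):

    // Interval struct fields are written through toMultiUnitsString:
    spark.sql("SELECT to_json(named_struct('i', interval 3 hours 13 minutes))").collect()
    // Array([{"i":"3 hours 13 minutes"}])

    // Interval map keys now go through toMultiUnitsString instead of toString:
    spark.sql("SELECT to_json(map(interval 1 day, 'v'))").collect()
    // Array([{"1 days":"v"}])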
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.util

+import java.math.BigDecimal
import java.util.concurrent.TimeUnit

import scala.util.control.NonFatal
@@ -424,6 +425,111 @@ object IntervalUtils {
fromDoubles(interval.months / num, interval.days / num, interval.microseconds / num)
}

def toMultiUnitsString(interval: CalendarInterval): String = {
if (interval.months == 0 && interval.days == 0 && interval.microseconds == 0) {
return "0 seconds"
}
val sb = new StringBuilder
if (interval.months != 0) {
appendUnit(sb, interval.months / 12, "years")
appendUnit(sb, interval.months % 12, "months")
}
appendUnit(sb, interval.days, "days")
if (interval.microseconds != 0) {
var rest = interval.microseconds
appendUnit(sb, rest / MICROS_PER_HOUR, "hours")
rest %= MICROS_PER_HOUR
appendUnit(sb, rest / MICROS_PER_MINUTE, "minutes")
rest %= MICROS_PER_MINUTE
if (rest != 0) {
val s = BigDecimal.valueOf(rest, 6).stripTrailingZeros.toPlainString
sb.append(s).append(" seconds ")
}
}
sb.setLength(sb.length - 1)
sb.toString
}

private def appendUnit(sb: StringBuilder, value: Long, unit: String): Unit = {
if (value != 0) sb.append(value).append(' ').append(unit).append(' ')
}

def toSqlStandardString(interval: CalendarInterval): String = {
val yearMonthPart = if (interval.months < 0) {
val ma = math.abs(interval.months)
"-" + ma / 12 + "-" + ma % 12
} else if (interval.months > 0) {
"+" + interval.months / 12 + "-" + interval.months % 12
} else {
""
}

val dayPart = if (interval.days < 0) {
interval.days.toString
} else if (interval.days > 0) {
  "+" + interval.days
} else {
  ""
}

Contributor: Shouldn't we add a -?
Member Author: Yes, it is e.g. (-1).toString here, so the minus sign is already included.

val timePart = if (interval.microseconds != 0) {
val sign = if (interval.microseconds > 0) "+" else "-"
val sb = new StringBuilder(sign)
var rest = math.abs(interval.microseconds)
sb.append(rest / MICROS_PER_HOUR)
sb.append(':')
rest %= MICROS_PER_HOUR
val minutes = rest / MICROS_PER_MINUTE
if (minutes < 10) {
sb.append(0)
}
sb.append(minutes)
sb.append(':')
rest %= MICROS_PER_MINUTE
val bd = BigDecimal.valueOf(rest, 6)
if (bd.compareTo(new BigDecimal(10)) < 0) {
sb.append(0)
}
val s = bd.stripTrailingZeros().toPlainString
sb.append(s)
sb.toString()
} else {
""
}

val intervalList = Seq(yearMonthPart, dayPart, timePart).filter(_.nonEmpty)
if (intervalList.nonEmpty) intervalList.mkString(" ") else "0"
}

Contributor: Wow, a single 0 is also SQL standard?
Member Author: Yes, it matches PostgreSQL's sql_standard IntervalStyle:

postgres=# set IntervalStyle=sql_standard;
SET
postgres=# select interval '0';
 interval
----------
 0
(1 row)

postgres=# set IntervalStyle=postgres;
SET
postgres=# select interval '0';
 interval
----------
 00:00:00
(1 row)

def toIso8601String(interval: CalendarInterval): String = {
val sb = new StringBuilder("P")

val year = interval.months / 12
if (year != 0) sb.append(year + "Y")
val month = interval.months % 12
if (month != 0) sb.append(month + "M")

if (interval.days != 0) sb.append(interval.days + "D")

if (interval.microseconds != 0) {
sb.append('T')
var rest = interval.microseconds
val hour = rest / MICROS_PER_HOUR
if (hour != 0) sb.append(hour + "H")
rest %= MICROS_PER_HOUR
val minute = rest / MICROS_PER_MINUTE
if (minute != 0) sb.append(minute + "M")
rest %= MICROS_PER_MINUTE
if (rest != 0) {
val bd = BigDecimal.valueOf(rest, 6)
sb.append(bd.stripTrailingZeros().toPlainString + "S")
}
} else if (interval.days == 0 && interval.months == 0) {
sb.append("T0S")
}
sb.toString()
}

private object ParseState extends Enumeration {
type ParseState = Value

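As a sanity check on the three formatters above, a small sketch (Scala REPL; the expected strings are traced from the code and from the toStringTest cases removed earlier, so treat them as illustrative):

    import org.apache.spark.sql.catalyst.util.DateTimeConstants._
    import org.apache.spark.sql.catalyst.util.IntervalUtils._
    import org.apache.spark.unsafe.types.CalendarInterval

    // 2 years 10 months, 31 days, 3 hours 13 minutes 0.000123 seconds:
    val i = new CalendarInterval(34, 31, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123)

    toMultiUnitsString(i)  // "2 years 10 months 31 days 3 hours 13 minutes 0.000123 seconds"
    toSqlStandardString(i) // "+2-10 +31 +3:13:00.000123"
    toIso8601String(i)     // "P2Y10M31DT3H13M0.000123S"

    // Signs stay per-field in the SQL standard style; a zero interval renders
    // as "0", matching PostgreSQL's sql_standard IntervalStyle shown above:
    toSqlStandardString(new CalendarInterval(-34, 0, 0)) // "-2-10"
    toSqlStandardString(new CalendarInterval(0, 0, 0))   // "0"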
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -37,7 +37,6 @@ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
import org.apache.spark.sql.catalyst.plans.logical.HintErrorHandler
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
-import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.util.Utils

@@ -1784,6 +1783,23 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

object IntervalStyle extends Enumeration {
type IntervalStyle = Value
val SQL_STANDARD, ISO_8601, MULTI_UNITS = Value
}

val INTERVAL_STYLE = buildConf("spark.sql.intervalOutputStyle")
.doc("When converting interval values to strings (i.e. for display), this config decides the" +
  " interval string format. The value SQL_STANDARD will produce output matching SQL standard" +
  " interval literals (e.g. '+3-2 +10 -00:00:01'). The value ISO_8601 will produce output" +
  " matching the ISO 8601 standard (e.g. 'P3Y2M10DT-1S'). The value MULTI_UNITS (which is the" +
  " default) will produce output in the form of value-unit pairs (e.g. '3 years 2 months" +
  " 10 days -1 seconds').")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(IntervalStyle.values.map(_.toString))
.createWithDefault(IntervalStyle.MULTI_UNITS.toString)
Member: FYI, we might need to move this config under spark.sql.dialect.xxx along with #26444.
Member Author (@yaooqinn, Nov 13, 2019): I don't know; the behavior of this config goes beyond the meaning of a single dialect.
Member: I personally think ansiEnabled is enough for this feature. Any concern?
Member Author: Yes, I guess some users may already rely on the current output strings.


val SORT_BEFORE_REPARTITION =
buildConf("spark.sql.execution.sortBeforeRepartition")
.internal()
@@ -2512,6 +2528,8 @@ class SQLConf extends Serializable with Logging {
def storeAssignmentPolicy: StoreAssignmentPolicy.Value =
StoreAssignmentPolicy.withName(getConf(STORE_ASSIGNMENT_POLICY))

def intervalOutputStyle: IntervalStyle.Value = IntervalStyle.withName(getConf(INTERVAL_STYLE))

def ansiEnabled: Boolean = getConf(ANSI_ENABLED)

def usePostgreSQLDialect: Boolean = getConf(DIALECT) == Dialect.POSTGRESQL.toString()
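End to end, the config is read per cast via SQLConf.get.intervalOutputStyle. A hypothetical spark-shell check of the conf plumbing itself (the exact exception message is Spark's generic checkValues error, abbreviated here):

    // Values are case-insensitive thanks to .transform(_.toUpperCase(Locale.ROOT)):
    spark.conf.set("spark.sql.intervalOutputStyle", "iso_8601")
    spark.sql("SELECT CAST(interval 3 years 2 months 10 days AS STRING)").collect()
    // Array([P3Y2M10D])

    // A value outside IntervalStyle.values is rejected at set time by .checkValues:
    spark.conf.set("spark.sql.intervalOutputStyle", "POSTGRES")
    // java.lang.IllegalArgumentException: The value of spark.sql.intervalOutputStyle should be one of ...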