[SPARK-17495][SQL] Support date, timestamp and interval types in Hive hash

## What changes were proposed in this pull request?

- Timestamp hashing is done as per [TimestampWritable.hashCode()](https://github.com/apache/hive/blob/ff67cdda1c538dc65087878eeba3e165cf3230f4/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java#L406) in Hive
- Interval hashing is done as per [HiveIntervalDayTime.hashCode()](https://github.com/apache/hive/blob/ff67cdda1c538dc65087878eeba3e165cf3230f4/storage-api/src/java/org/apache/hadoop/hive/common/type/HiveIntervalDayTime.java#L178). Note that there are inherent differences in how Hive and Spark store intervals under the hood, which limits the ability to be completely in sync with Hive's hashing function. I have explained this in the method doc (see also the sketch after this list).
- Date type was already supported. This PR adds a test for that.
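
For context on the interval bullet, here is a minimal sketch of the representation gap (illustrative stand-ins, not classes from this PR): Spark keeps a single unified (months, microseconds) pair for every interval, while Hive's HiveIntervalDayTime keeps (seconds, nanoseconds) and has no month component at all, so month-bearing intervals have no lossless day-time equivalent to hash.

```scala
// Illustrative stand-ins; not the actual Spark/Hive classes.
case class SparkInterval(months: Int, microseconds: Long) // Spark: one unified pair
case class HiveDayTime(totalSeconds: Long, nanos: Int)    // Hive: day-time category only

// Hypothetical (lossy) mapping: months simply has no slot on the Hive side,
// which is why hashes of month-bearing intervals cannot match Hive's.
def toHiveDayTime(i: SparkInterval): HiveDayTime = {
  val seconds = i.microseconds / 1000000L
  val nanos = ((i.microseconds - seconds * 1000000L) * 1000L).toInt
  HiveDayTime(seconds, nanos)
}
```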

## How was this patch tested?

Added unit tests

Author: Tejas Patil <tejasp@fb.com>

Closes #17062 from tejasapatil/SPARK-17495_time_related_types.
tejasapatil authored and gatorsmile committed Mar 13, 2017
1 parent 0a4d06a commit 9456688
Showing 2 changed files with 268 additions and 11 deletions.
@@ -335,6 +335,8 @@ abstract class HashExpression[E] extends Expression {
}
}

protected def genHashTimestamp(t: String, result: String): String = genHashLong(t, result)

protected def genHashCalendarInterval(input: String, result: String): String = {
val microsecondsHash = s"$hasherClassName.hashLong($input.microseconds, $result)"
s"$result = $hasherClassName.hashInt($input.months, $microsecondsHash);"
@@ -400,7 +402,8 @@ abstract class HashExpression[E] extends Expression {
case NullType => ""
case BooleanType => genHashBoolean(input, result)
case ByteType | ShortType | IntegerType | DateType => genHashInt(input, result)
-case LongType | TimestampType => genHashLong(input, result)
+case LongType => genHashLong(input, result)
+case TimestampType => genHashTimestamp(input, result)
case FloatType => genHashFloat(input, result)
case DoubleType => genHashDouble(input, result)
case d: DecimalType => genHashDecimal(ctx, d, input, result)
@@ -433,6 +436,10 @@ abstract class InterpretedHashFunction {

protected def hashUnsafeBytes(base: AnyRef, offset: Long, length: Int, seed: Long): Long

/**
* Computes hash of a given `value` of type `dataType`. The caller needs to check the validity
* of input `value`.
*/
def hash(value: Any, dataType: DataType, seed: Long): Long = {
value match {
case null => seed
@@ -580,8 +587,6 @@ object XxHash64Function extends InterpretedHashFunction {
*
* We should use this hash function for both shuffle and bucket of Hive tables, so that
* we can guarantee shuffle and bucketing have same data distribution
- *
- * TODO: Support date related types
*/
@ExpressionDescription(
usage = "_FUNC_(expr1, expr2, ...) - Returns a hash value of the arguments.")
@@ -648,11 +653,16 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {

override protected def genHashCalendarInterval(input: String, result: String): String = {
s"""
-  $result = (31 * $hasherClassName.hashInt($input.months)) +
-    $hasherClassName.hashLong($input.microseconds);"
+  $result = (int)
+    ${HiveHashFunction.getClass.getName.stripSuffix("$")}.hashCalendarInterval($input);
"""
}

override protected def genHashTimestamp(input: String, result: String): String =
s"""
$result = (int) ${HiveHashFunction.getClass.getName.stripSuffix("$")}.hashTimestamp($input);
"""

override protected def genHashString(input: String, result: String): String = {
val baseObject = s"$input.getBaseObject()"
val baseOffset = s"$input.getBaseOffset()"
@@ -781,6 +791,49 @@ object HiveHashFunction extends InterpretedHashFunction {
result
}

/**
* Mimics TimestampWritable.hashCode() in Hive
*/
def hashTimestamp(timestamp: Long): Long = {
val timestampInSeconds = timestamp / 1000000
val nanoSecondsPortion = (timestamp % 1000000) * 1000

var result = timestampInSeconds
result <<= 30 // the nanosecond part fits in 30 bits
result |= nanoSecondsPortion
((result >>> 32) ^ result).toInt
}
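
The fold above packs the seconds into the upper bits and the nanosecond part into the lower 30 bits, then XORs the two 32-bit halves. A standalone replica for illustration (assuming microsecond input, as in Spark's internal timestamp representation); the epoch with zero nanoseconds hashes to 0, consistent with the `1970-01-01 00:00:00` expectation in the new tests:

```scala
// Standalone replica of the fold in hashTimestamp, for illustration only.
def foldTimestamp(micros: Long): Int = {
  val seconds = micros / 1000000L
  val nanos = (micros % 1000000L) * 1000L
  val packed = (seconds << 30) | nanos // seconds in high bits, nanos in low 30 bits
  ((packed >>> 32) ^ packed).toInt     // XOR-fold the two 32-bit halves
}

assert(foldTimestamp(0L) == 0) // epoch, zero nanos
```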

/**
* Hive allows input intervals to be defined using the units below, but the intervals
* have to be from the same category:
* - year, month (stored as HiveIntervalYearMonth)
* - day, hour, minute, second, nanosecond (stored as HiveIntervalDayTime)
*
* e.g. (INTERVAL '30' YEAR + INTERVAL '-23' DAY) fails in Hive
*
* This method mimics HiveIntervalDayTime.hashCode() in Hive.
*
* Two differences w.r.t. Hive arise from how intervals are stored in Spark vs. Hive:
*
* - If the `INTERVAL` is backed by HiveIntervalYearMonth in Hive, this method will not
* produce a Hive-compatible result, because Spark's calendar representation is unified
* and does not have such interval categories.
*
* - Spark's [[CalendarInterval]] has microsecond precision, but Hive's
* HiveIntervalDayTime can store values with nanosecond precision, so any input interval
* with a nanosecond component will produce a wrong hash (i.e. one not matching Hive's output).
*/
def hashCalendarInterval(calendarInterval: CalendarInterval): Long = {
val totalSeconds = calendarInterval.microseconds / CalendarInterval.MICROS_PER_SECOND.toInt
val result: Int = (17 * 37) + (totalSeconds ^ totalSeconds >> 32).toInt

val nanoSeconds =
(calendarInterval.microseconds -
(totalSeconds * CalendarInterval.MICROS_PER_SECOND.toInt)).toInt * 1000
(result * 37) + nanoSeconds
}
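
The constants here line up directly with the expectations in the new tests: the zero interval hashes to (17 * 37) * 37 = 23273, and a whole second adds one to the term before the final multiply. A hand-check against those expected values:

```scala
// Zero interval: totalSeconds = 0, nanoSeconds = 0
//   result = 17 * 37 = 629; hash = 629 * 37 + 0 = 23273
assert((17 * 37) * 37 == 23273)

// "interval 1 second": totalSeconds = 1, so result = 629 + 1 = 630
//   hash = 630 * 37 + 0 = 23310
assert(((17 * 37) + 1) * 37 == 23310)
```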

override def hash(value: Any, dataType: DataType, seed: Long): Long = {
value match {
case null => 0
@@ -834,10 +887,10 @@ object HiveHashFunction extends InterpretedHashFunction {
}
result

-case d: Decimal =>
-  normalizeDecimal(d.toJavaBigDecimal).hashCode()
-
-case _ => super.hash(value, dataType, seed)
+case d: Decimal => normalizeDecimal(d.toJavaBigDecimal).hashCode()
+case timestamp: Long if dataType.isInstanceOf[TimestampType] => hashTimestamp(timestamp)
+case calendarInterval: CalendarInterval => hashCalendarInterval(calendarInterval)
+case _ => super.hash(value, dataType, 0)
}
}
}
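
With the two new cases wired in, the interpreted path can be exercised directly. A usage sketch (assuming the post-commit `HiveHashFunction`; timestamps are passed as microseconds since the epoch, and HiveHash ignores the seed):

```scala
import org.apache.spark.sql.catalyst.expressions.HiveHashFunction
import org.apache.spark.sql.types.{CalendarIntervalType, TimestampType}
import org.apache.spark.unsafe.types.CalendarInterval

// Epoch timestamp hashes to 0; "interval 1 second" hashes to 23310 (per the tests below).
val tsHash = HiveHashFunction.hash(0L, TimestampType, 0)
val ivHash = HiveHashFunction.hash(new CalendarInterval(0, 1000000L), CalendarIntervalType, 0)
```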
@@ -18,18 +18,20 @@
package org.apache.spark.sql.catalyst.expressions

import java.nio.charset.StandardCharsets
import java.util.TimeZone

import scala.collection.mutable.ArrayBuffer

import org.apache.commons.codec.digest.DigestUtils
import org.scalatest.exceptions.TestFailedException

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.encoders.{ExamplePointUDT, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.types.{ArrayType, StructType, _}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val random = new scala.util.Random
@@ -168,6 +170,208 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
// scalastyle:on nonascii
}

test("hive-hash for date type") {
def checkHiveHashForDateType(dateString: String, expected: Long): Unit = {
checkHiveHash(
DateTimeUtils.stringToDate(UTF8String.fromString(dateString)).get,
DateType,
expected)
}

// basic case
checkHiveHashForDateType("2017-01-01", 17167)

// boundary cases
checkHiveHashForDateType("0000-01-01", -719530)
checkHiveHashForDateType("9999-12-31", 2932896)

// epoch
checkHiveHashForDateType("1970-01-01", 0)

// before epoch
checkHiveHashForDateType("1800-01-01", -62091)

// Invalid input: bad date string. Hive returns 0 for such cases
intercept[NoSuchElementException](checkHiveHashForDateType("0-0-0", 0))
intercept[NoSuchElementException](checkHiveHashForDateType("-1212-01-01", 0))
intercept[NoSuchElementException](checkHiveHashForDateType("2016-99-99", 0))

// Invalid input: Empty string. Hive returns 0 for this case
intercept[NoSuchElementException](checkHiveHashForDateType("", 0))

// Invalid input: February 30th in a leap year. Hive supports this but Spark doesn't
intercept[NoSuchElementException](checkHiveHashForDateType("2016-02-30", 16861))
}
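
As the expectations above suggest, the hive-hash of a date is the days-since-epoch ordinal itself (DateType was already hashed via `genHashInt`, which for HiveHash returns the value unchanged). A hand-check of the basic case:

```scala
// 2017-01-01 is 47 years after 1970-01-01, including 12 leap days
// (1972, 1976, ..., 2016), so its day ordinal is 47 * 365 + 12 = 17167.
val leapDays = (1972 to 2016 by 4).size // 12
assert(47 * 365 + leapDays == 17167)    // matches the expected hive-hash above
```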

test("hive-hash for timestamp type") {
def checkHiveHashForTimestampType(
timestamp: String,
expected: Long,
timeZone: TimeZone = TimeZone.getTimeZone("UTC")): Unit = {
checkHiveHash(
DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp), timeZone).get,
TimestampType,
expected)
}

// basic case
checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445725271)

// with higher precision
checkHiveHashForTimestampType("2017-02-24 10:56:29.111111", 1353936655)

// with different timezone
checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445732471,
TimeZone.getTimeZone("US/Pacific"))

// boundary cases
checkHiveHashForTimestampType("0001-01-01 00:00:00", 1645926784)
checkHiveHashForTimestampType("9999-01-01 00:00:00", -1081818240)

// epoch
checkHiveHashForTimestampType("1970-01-01 00:00:00", 0)

// before epoch
checkHiveHashForTimestampType("1800-01-01 03:12:45", -267420885)

// Invalid input: bad timestamp string. Hive returns 0 for such cases
intercept[NoSuchElementException](checkHiveHashForTimestampType("0-0-0 0:0:0", 0))
intercept[NoSuchElementException](checkHiveHashForTimestampType("-99-99-99 99:99:45", 0))
intercept[NoSuchElementException](checkHiveHashForTimestampType("555555-55555-5555", 0))

// Invalid input: Empty string. Hive returns 0 for this case
intercept[NoSuchElementException](checkHiveHashForTimestampType("", 0))

// Invalid input: February 30th in a leap year. Hive supports this but Spark doesn't
intercept[NoSuchElementException](checkHiveHashForTimestampType("2016-02-30 00:00:00", 0))

// Invalid input: Hive accepts up to 9 decimal places of precision but Spark uses up to 6
intercept[TestFailedException](checkHiveHashForTimestampType("2017-02-24 10:56:29.11111111", 0))
}
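
To see where an expected value such as 1445725271 comes from, one can replay `hashTimestamp` by hand (a hand-check; 2017-02-24 10:56:29 UTC is 1487933789 epoch seconds with zero nanoseconds):

```scala
val s = 1487933789L                    // epoch seconds for 2017-02-24 10:56:29 UTC
val packed = s << 30                   // nanoseconds are zero, so nothing is OR-ed in
val hash = ((packed >>> 32) ^ packed).toInt
// high half: s >> 2 = 371983447; low half: (s & 3) << 30 = 1073741824
// 371983447 ^ 1073741824 = 1445725271, the expected value above
assert(hash == 1445725271)
```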

test("hive-hash for CalendarInterval type") {
def checkHiveHashForIntervalType(interval: String, expected: Long): Unit = {
checkHiveHash(CalendarInterval.fromString(interval), CalendarIntervalType, expected)
}

// ----- MICROSEC -----

// basic case
checkHiveHashForIntervalType("interval 1 microsecond", 24273)

// negative
checkHiveHashForIntervalType("interval -1 microsecond", 22273)

// edge / boundary cases
checkHiveHashForIntervalType("interval 0 microsecond", 23273)
checkHiveHashForIntervalType("interval 999 microsecond", 1022273)
checkHiveHashForIntervalType("interval -999 microsecond", -975727)

// ----- MILLISEC -----

// basic case
checkHiveHashForIntervalType("interval 1 millisecond", 1023273)

// negative
checkHiveHashForIntervalType("interval -1 millisecond", -976727)

// edge / boundary cases
checkHiveHashForIntervalType("interval 0 millisecond", 23273)
checkHiveHashForIntervalType("interval 999 millisecond", 999023273)
checkHiveHashForIntervalType("interval -999 millisecond", -998976727)

// ----- SECOND -----

// basic case
checkHiveHashForIntervalType("interval 1 second", 23310)

// negative
checkHiveHashForIntervalType("interval -1 second", 23273)

// edge / boundary cases
checkHiveHashForIntervalType("interval 0 second", 23273)
checkHiveHashForIntervalType("interval 2147483647 second", -2147460412)
checkHiveHashForIntervalType("interval -2147483648 second", -2147460412)

// Out of range for both Hive and Spark
// Hive throws an exception. Spark overflows and returns wrong output
// checkHiveHashForIntervalType("interval 9999999999 second", 0)

// ----- MINUTE -----

// basic cases
checkHiveHashForIntervalType("interval 1 minute", 25493)

// negative
checkHiveHashForIntervalType("interval -1 minute", 25456)

// edge / boundary cases
checkHiveHashForIntervalType("interval 0 minute", 23273)
checkHiveHashForIntervalType("interval 2147483647 minute", 21830)
checkHiveHashForIntervalType("interval -2147483648 minute", 22163)

// Out of range for both Hive and Spark
// Hive throws an exception. Spark overflows and returns wrong output
// checkHiveHashForIntervalType("interval 9999999999 minute", 0)

// ----- HOUR -----

// basic case
checkHiveHashForIntervalType("interval 1 hour", 156473)

// negative
checkHiveHashForIntervalType("interval -1 hour", 156436)

// edge / boundary cases
checkHiveHashForIntervalType("interval 0 hour", 23273)
checkHiveHashForIntervalType("interval 2147483647 hour", -62308)
checkHiveHashForIntervalType("interval -2147483648 hour", -43327)

// Out of range for both Hive and Spark
// Hive throws an exception. Spark overflows and returns wrong output
// checkHiveHashForIntervalType("interval 9999999999 hour", 0)

// ----- DAY -----

// basic cases
checkHiveHashForIntervalType("interval 1 day", 3220073)

// negative
checkHiveHashForIntervalType("interval -1 day", 3220036)

// edge / boundary cases
checkHiveHashForIntervalType("interval 0 day", 23273)
checkHiveHashForIntervalType("interval 106751991 day", -451506760)
checkHiveHashForIntervalType("interval -106751991 day", -451514123)

// Hive supports `day` for a longer range but Spark's range is smaller
// The check for range is done at the parser level so this does not fail in Spark
// checkHiveHashForIntervalType("interval -2147483648 day", -1575127)
// checkHiveHashForIntervalType("interval 2147483647 day", -4767228)

// Out of range for both Hive and Spark
// Hive throws an exception. Spark overflows and returns wrong output
// checkHiveHashForIntervalType("interval 9999999999 day", 0)

// ----- MIX -----

checkHiveHashForIntervalType("interval 0 day 0 hour", 23273)
checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute", 23273)
checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute 0 second", 23273)
checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute 0 second 0 millisecond", 23273)
checkHiveHashForIntervalType(
"interval 0 day 0 hour 0 minute 0 second 0 millisecond 0 microsecond", 23273)

checkHiveHashForIntervalType("interval 6 day 15 hour", 21202073)
checkHiveHashForIntervalType("interval 5 day 4 hour 8 minute", 16557833)
checkHiveHashForIntervalType("interval -23 day 56 hour -1111113 minute 9898989 second",
-2128468593)
checkHiveHashForIntervalType("interval 66 day 12 hour 39 minute 23 second 987 millisecond",
1199697904)
checkHiveHashForIntervalType(
"interval 66 day 12 hour 39 minute 23 second 987 millisecond 123 microsecond", 1199820904)
}
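
The negative-microsecond expectations fall out of truncating integer division in `hashCalendarInterval`. A hand-check of the "interval -1 microsecond" case:

```scala
val micros = -1L
val totalSeconds = micros / 1000000L // 0 (truncates toward zero)
val result = (17 * 37) + (totalSeconds ^ (totalSeconds >> 32)).toInt // 629
val nanos = ((micros - totalSeconds * 1000000L) * 1000L).toInt       // -1000
assert(result * 37 + nanos == 22273) // matches the expectation above
```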

test("hive-hash for array") {
// empty array
checkHiveHash(
