Merge branch 'master' into SPARK-29783
yaooqinn committed Nov 7, 2019
2 parents 429ee49 + 9562b26 commit da119c5
Showing 32 changed files with 141 additions and 115 deletions.

DateTimeConstants.java (new file)
@@ -0,0 +1,57 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.util;

public class DateTimeConstants {

  public static final int YEARS_PER_DECADE = 10;
  public static final int YEARS_PER_CENTURY = 100;
  public static final int YEARS_PER_MILLENNIUM = 1000;

  public static final byte MONTHS_PER_QUARTER = 3;
  public static final int MONTHS_PER_YEAR = 12;

  public static final byte DAYS_PER_WEEK = 7;
  public static final long DAYS_PER_MONTH = 30L;

  public static final long HOURS_PER_DAY = 24L;

  public static final long MINUTES_PER_HOUR = 60L;

  public static final long SECONDS_PER_MINUTE = 60L;
  public static final long SECONDS_PER_HOUR = MINUTES_PER_HOUR * SECONDS_PER_MINUTE;
  public static final long SECONDS_PER_DAY = HOURS_PER_DAY * SECONDS_PER_HOUR;

  public static final long MILLIS_PER_SECOND = 1000L;
  public static final long MILLIS_PER_MINUTE = SECONDS_PER_MINUTE * MILLIS_PER_SECOND;
  public static final long MILLIS_PER_HOUR = MINUTES_PER_HOUR * MILLIS_PER_MINUTE;
  public static final long MILLIS_PER_DAY = HOURS_PER_DAY * MILLIS_PER_HOUR;

  public static final long MICROS_PER_MILLIS = 1000L;
  public static final long MICROS_PER_SECOND = MILLIS_PER_SECOND * MICROS_PER_MILLIS;
  public static final long MICROS_PER_MINUTE = SECONDS_PER_MINUTE * MICROS_PER_SECOND;
  public static final long MICROS_PER_HOUR = MINUTES_PER_HOUR * MICROS_PER_MINUTE;
  public static final long MICROS_PER_DAY = HOURS_PER_DAY * MICROS_PER_HOUR;
  public static final long MICROS_PER_MONTH = DAYS_PER_MONTH * MICROS_PER_DAY;
  /* 365.25 days per year assumes leap year every four years */
  public static final long MICROS_PER_YEAR = (36525L * MICROS_PER_DAY) / 100;

  public static final long NANOS_PER_MICROS = 1000L;
  public static final long NANOS_PER_MILLIS = MICROS_PER_MILLIS * NANOS_PER_MICROS;
  public static final long NANOS_PER_SECOND = MILLIS_PER_SECOND * NANOS_PER_MILLIS;
}
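
For reference, the derived constants bottom out in fixed microsecond counts. A minimal sketch of the arithmetic (illustrative only, not part of this commit; runnable in the Scala REPL with the class above on the classpath):

    import org.apache.spark.sql.catalyst.util.DateTimeConstants._

    // 24 h * 60 min * 60 s * 1000 ms * 1000 us = 86,400,000,000 microseconds per day
    assert(MICROS_PER_DAY == 86400000000L)
    // 365.25 days * 86,400,000,000 us = 31,557,600,000,000 microseconds per Julian year
    assert(MICROS_PER_YEAR == 31557600000000L)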

CalendarInterval.java
@@ -24,25 +24,16 @@
 import java.time.temporal.ChronoUnit;
 import java.util.Objects;
 
+import static org.apache.spark.sql.catalyst.util.DateTimeConstants.*;
+
 /**
  * The internal representation of interval type.
  */
 public final class CalendarInterval implements Serializable {
-  public static final long MICROS_PER_MILLI = 1000L;
-  public static final long MICROS_PER_SECOND = MICROS_PER_MILLI * 1000;
-  public static final long MICROS_PER_MINUTE = MICROS_PER_SECOND * 60;
-  public static final long MICROS_PER_HOUR = MICROS_PER_MINUTE * 60;
-  public static final long MICROS_PER_DAY = MICROS_PER_HOUR * 24;
-  public static final long MICROS_PER_WEEK = MICROS_PER_DAY * 7;
-
   public final int months;
   public final int days;
   public final long microseconds;
 
-  public long milliseconds() {
-    return this.microseconds / MICROS_PER_MILLI;
-  }
-
   public CalendarInterval(int months, int days, long microseconds) {
     this.months = months;
     this.days = days;
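
The removed milliseconds() helper has no replacement in this file; callers can derive the same value from the public microseconds field. A hedged sketch of the equivalent computation (illustrative, not from the commit):

    import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS
    import org.apache.spark.unsafe.types.CalendarInterval

    val interval = new CalendarInterval(0, 2, 5000000L) // 0 months, 2 days, 5 s
    val millis = interval.microseconds / MICROS_PER_MILLIS // 5000, same as the old milliseconds()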

CalendarIntervalSuite.java
@@ -23,7 +23,7 @@
 import java.time.Period;
 
 import static org.junit.Assert.*;
-import static org.apache.spark.unsafe.types.CalendarInterval.*;
+import static org.apache.spark.sql.catalyst.util.DateTimeConstants.*;
 
 public class CalendarIntervalSuite {
 

AvroDeserializer.scala
@@ -32,10 +32,10 @@ import org.apache.avro.util.Utf8
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * A deserializer to deserialize data in avro format to data in catalyst format.
  */
@@ -110,7 +110,7 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
       // Before we upgrade Avro to 1.8 for logical type support, spark-avro converts Long to Date.
       // For backward compatibility, we still keep this conversion.
       case (LONG, DateType) => (updater, ordinal, value) =>
-        updater.setInt(ordinal, (value.asInstanceOf[Long] / DateTimeUtils.MILLIS_PER_DAY).toInt)
+        updater.setInt(ordinal, (value.asInstanceOf[Long] / MILLIS_PER_DAY).toInt)
 
       case (FLOAT, FloatType) => (updater, ordinal, value) =>
         updater.setFloat(ordinal, value.asInstanceOf[Float])
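
The LONG-to-DateType branch divides epoch milliseconds by MILLIS_PER_DAY to get a Catalyst date, i.e. days since 1970-01-01. A standalone re-trace of that conversion (values illustrative):

    import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY

    val epochMillis = 1573084800000L                        // 2019-11-07 00:00:00 UTC
    val catalystDate = (epochMillis / MILLIS_PER_DAY).toInt // 18207 days since the epoch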

StreamingJoinHelper.scala
@@ -24,10 +24,10 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
 
-
 /**
  * Helper object for stream joins. See [[StreamingSymmetricHashJoinExec]] in SQL for more details.
  */
@@ -264,7 +264,7 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
               s"watermark calculation. Use interval in terms of day instead.")
             Literal(0.0)
           } else {
-            Literal(calendarInterval.days * CalendarInterval.MICROS_PER_DAY.toDouble +
+            Literal(calendarInterval.days * MICROS_PER_DAY.toDouble +
              calendarInterval.microseconds.toDouble)
           }
         case DoubleType =>
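
As the surrounding code suggests, month components are rejected for watermark math, so the delay collapses to days plus microseconds, widened to Double. A hedged re-trace of the value produced (not from the commit):

    import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
    import org.apache.spark.unsafe.types.CalendarInterval

    val delay = new CalendarInterval(0, 1, 30000000L) // 1 day 30 s
    val delayMicros = delay.days * MICROS_PER_DAY.toDouble + delay.microseconds.toDouble
    // 86400000000.0 + 30000000.0 = 86430000000.0 microseconds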

@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

TimeWindow.scala
@@ -22,9 +22,9 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
 import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.CalendarInterval
 
 case class TimeWindow(
     timeColumn: Expression,
@@ -108,7 +108,7 @@ object TimeWindow {
       throw new IllegalArgumentException(
         s"Intervals greater than a month is not supported ($interval).")
     }
-    cal.days * CalendarInterval.MICROS_PER_DAY + cal.microseconds
+    cal.days * MICROS_PER_DAY + cal.microseconds
   }
 
   /**
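
The conversion in the hunk above applies the same days-plus-microseconds flattening to window sizes, throwing on any month component. A small illustration (constructing the interval directly rather than parsing it, to keep the sketch self-contained):

    import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
    import org.apache.spark.unsafe.types.CalendarInterval

    val cal = new CalendarInterval(0, 2, 3600000000L) // 2 days 1 h; months must be 0
    val windowMicros = cal.days * MICROS_PER_DAY + cal.microseconds // 176400000000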

@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, MapData}
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

@@ -28,14 +28,14 @@ import org.apache.spark.sql.catalyst.expressions.ArraySortLike.NullOrder
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.UTF8StringBuilder
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.array.ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
-import org.apache.spark.unsafe.types.{ByteArray, UTF8String}
-import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.unsafe.types.{ByteArray, CalendarInterval, UTF8String}
 import org.apache.spark.util.collection.OpenHashSet
 
 /**
@@ -2613,7 +2613,7 @@ object Sequence {
       new CalendarInterval(0, 1, 0))
 
   private val backedSequenceImpl = new IntegralSequenceImpl[T](dt)
-  private val microsPerDay = 24 * CalendarInterval.MICROS_PER_HOUR
+  private val microsPerDay = HOURS_PER_DAY * MICROS_PER_HOUR
   // We choose a minimum days(28) in one month to calculate the `intervalStepInMicros`
   // in order to make sure the estimated array length is long enough
   private val microsPerMonth = 28 * microsPerDay
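
Using 28 days as the per-month floor keeps the per-step microsecond estimate small, so the derived array length errs on the long side. A hedged sketch of the bound for a hypothetical one-month-five-day step (illustrative values, not from the commit):

    import org.apache.spark.sql.catalyst.util.DateTimeConstants.{HOURS_PER_DAY, MICROS_PER_HOUR}

    val microsPerDay = HOURS_PER_DAY * MICROS_PER_HOUR      // 86400000000
    val microsPerMonth = 28 * microsPerDay                  // conservative lower bound per month
    val stepMicros = 1 * microsPerMonth + 5 * microsPerDay  // 2851200000000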

@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils._
+import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.hash.Murmur3_x86_32
@@ -902,14 +902,11 @@ object HiveHashFunction extends InterpretedHashFunction {
    * with nanosecond values will lead to wrong output hashes (ie. non adherent with Hive output)
    */
   def hashCalendarInterval(calendarInterval: CalendarInterval): Long = {
-    val totalMicroSeconds =
-      calendarInterval.days * CalendarInterval.MICROS_PER_DAY + calendarInterval.microseconds
-    val totalSeconds = totalMicroSeconds / CalendarInterval.MICROS_PER_SECOND.toInt
+    val totalMicroSeconds = calendarInterval.days * MICROS_PER_DAY + calendarInterval.microseconds
+    val totalSeconds = totalMicroSeconds / MICROS_PER_SECOND.toInt
     val result: Int = (17 * 37) + (totalSeconds ^ totalSeconds >> 32).toInt
 
-    val nanoSeconds =
-      (totalMicroSeconds -
-        (totalSeconds * CalendarInterval.MICROS_PER_SECOND.toInt)).toInt * 1000
+    val nanoSeconds = (totalMicroSeconds - (totalSeconds * MICROS_PER_SECOND.toInt)).toInt * 1000
     (result * 37) + nanoSeconds
   }
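
A re-trace of the hash arithmetic for one concrete interval (illustrative; it mirrors the body above but is not part of the commit):

    import org.apache.spark.sql.catalyst.util.DateTimeConstants.{MICROS_PER_DAY, MICROS_PER_SECOND}
    import org.apache.spark.unsafe.types.CalendarInterval

    val iv = new CalendarInterval(0, 1, 1500000L)                // 1 day, 1.5 s
    val totalMicros = iv.days * MICROS_PER_DAY + iv.microseconds // 86401500000
    val totalSeconds = totalMicros / MICROS_PER_SECOND.toInt     // 86401
    val result = (17 * 37) + (totalSeconds ^ totalSeconds >> 32).toInt // 629 + 86401 = 87030
    val nanos = (totalMicros - totalSeconds * MICROS_PER_SECOND.toInt).toInt * 1000 // 500000000
    val hash = (result * 37) + nanos // 503220110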

QueryExecutionMetering.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 
 import com.google.common.util.concurrent.AtomicLongMap
 
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.NANOS_PER_SECOND
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
 
 case class QueryExecutionMetering() {
   private val timeMap = AtomicLongMap.create[String]()

DateTimeUtils.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit._
 
 import scala.util.control.NonFatal
 
+import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.types.Decimal
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
@@ -46,20 +47,6 @@ object DateTimeUtils {
   // it's 2440587.5, rounding up to compatible with Hive
   final val JULIAN_DAY_OF_EPOCH = 2440588
 
-  // Pre-calculated values can provide an opportunity of additional optimizations
-  // to the compiler like constants propagation and folding.
-  final val NANOS_PER_MICROS: Long = 1000
-  final val MICROS_PER_MILLIS: Long = 1000
-  final val MILLIS_PER_SECOND: Long = 1000
-  final val SECONDS_PER_DAY: Long = 24 * 60 * 60
-  final val MICROS_PER_SECOND: Long = MILLIS_PER_SECOND * MICROS_PER_MILLIS
-  final val NANOS_PER_MILLIS: Long = NANOS_PER_MICROS * MICROS_PER_MILLIS
-  final val NANOS_PER_SECOND: Long = NANOS_PER_MICROS * MICROS_PER_SECOND
-  final val MICROS_PER_DAY: Long = SECONDS_PER_DAY * MICROS_PER_SECOND
-  final val MILLIS_PER_MINUTE: Long = 60 * MILLIS_PER_SECOND
-  final val MILLIS_PER_HOUR: Long = 60 * MILLIS_PER_MINUTE
-  final val MILLIS_PER_DAY: Long = SECONDS_PER_DAY * MILLIS_PER_SECOND
-
   // number of days between 1.1.1970 and 1.1.2001
   final val to2001 = -11323
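
Code that previously imported these values from DateTimeUtils switches to the new holder; the pre-computed longs remain compile-time constants, so propagation and folding still apply. A hedged before/after sketch (illustrative only):

    // Before (removed above):
    //   import org.apache.spark.sql.catalyst.util.DateTimeUtils.MILLIS_PER_DAY
    // After:
    import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY

    val wholeDays = 1573084800000L / MILLIS_PER_DAY // 18207, unchanged by the move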
