Skip to content
Permalink
Browse files

[SPARK-27735][SS] Parsing interval string should be case-insensitive …

…in SS

Some APIs in Structured Streaming require the user to specify an interval. Right now these APIs don't accept upper-case strings.

This PR adds a new method `fromCaseInsensitiveString` to `CalendarInterval` to support parsing upper-case strings, and fixes all APIs that need to parse an interval string.

The new unit test.

Closes #24619 from zsxwing/SPARK-27735.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 6a317c8)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information...
zsxwing authored and dongjoon-hyun committed May 16, 2019
1 parent fbd2eac commit 41b05296e158a4dc89bd50e4e5f3cc190d10b4ec
@@ -18,6 +18,7 @@
package org.apache.spark.unsafe.types;

import java.io.Serializable;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -66,6 +67,10 @@ private static long toLong(String s) {
}
}

/**
* Convert a string to CalendarInterval. Return null if the input string is not a valid interval.
* This method is case-sensitive and all characters in the input string should be in lower case.
*/
public static CalendarInterval fromString(String s) {
if (s == null) {
return null;
@@ -87,6 +92,26 @@ public static CalendarInterval fromString(String s) {
}
}

/**
 * Parses an interval string without regard to letter case.
 *
 * The input is trimmed and lower-cased (using Locale.ROOT), the "interval " prefix is
 * prepended when it is missing, and the result is handed to fromString. In contrast to
 * fromString, an unparsable input results in an exception rather than a null return value.
 *
 * @param s the interval text, with or without the leading "interval " keyword
 * @return the parsed CalendarInterval; never null
 * @throws IllegalArgumentException if the string is null, blank, or not a valid interval.
 */
public static CalendarInterval fromCaseInsensitiveString(String s) {
  // Treat null like a blank string so both are rejected by the same guard.
  String trimmed = (s == null) ? "" : s.trim();
  if (trimmed.isEmpty()) {
    throw new IllegalArgumentException("Interval cannot be null or blank.");
  }

  // fromString is case-sensitive, so normalize the case before delegating.
  String normalized = trimmed.toLowerCase(Locale.ROOT);
  if (!normalized.startsWith("interval ")) {
    normalized = "interval " + normalized;
  }

  CalendarInterval parsed = fromString(normalized);
  if (parsed != null) {
    return parsed;
  }
  // Report the caller's original text, not the normalized form.
  throw new IllegalArgumentException("Invalid interval: " + s);
}

public static long toLongWithRange(String fieldName,
String s, long minValue, long maxValue) throws IllegalArgumentException {
long result = 0;
@@ -104,6 +104,31 @@ public void fromStringTest() {
assertNull(fromString(input));
}

@Test
public void fromCaseInsensitiveStringTest() {
  // Every casing of the same text must parse to the same interval.
  CalendarInterval fiveMinutes = new CalendarInterval(0, 5L * 60 * 1_000_000);
  for (String input : new String[]{"5 MINUTES", "5 minutes", "5 Minutes"}) {
    // JUnit's assertEquals takes (expected, actual); keeping that order makes a
    // failure message label the two values correctly.
    assertEquals(fiveMinutes, fromCaseInsensitiveString(input));
  }

  // Null and blank inputs are rejected with the "null or blank" message.
  for (String input : new String[]{null, "", " "}) {
    try {
      fromCaseInsensitiveString(input);
      // fail() raises an AssertionError, which the catch below does not intercept.
      fail("Expected to throw an exception for the invalid input");
    } catch (IllegalArgumentException e) {
      assertTrue(e.getMessage().contains("cannot be null or blank"));
    }
  }

  // Non-blank inputs that are not intervals are rejected with the "Invalid interval" message.
  for (String input : new String[]{"interval", "interval1 day", "foo", "foo 1 day"}) {
    try {
      fromCaseInsensitiveString(input);
      fail("Expected to throw an exception for the invalid input");
    } catch (IllegalArgumentException e) {
      assertTrue(e.getMessage().contains("Invalid interval"));
    }
  }
}

@Test
public void fromYearMonthStringTest() {
String input;
@@ -17,8 +17,6 @@

package org.apache.spark.sql.catalyst.expressions

import org.apache.commons.lang3.StringUtils

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
@@ -104,20 +102,7 @@ object TimeWindow {
* precision.
*/
private def getIntervalInMicroSeconds(interval: String): Long = {
if (StringUtils.isBlank(interval)) {
throw new IllegalArgumentException(
"The window duration, slide duration and start time cannot be null or blank.")
}
val intervalString = if (interval.startsWith("interval")) {
interval
} else {
"interval " + interval
}
val cal = CalendarInterval.fromString(intervalString)
if (cal == null) {
throw new IllegalArgumentException(
s"The provided interval ($interval) did not correspond to a valid interval string.")
}
val cal = CalendarInterval.fromCaseInsensitiveString(interval)
if (cal.months > 0) {
throw new IllegalArgumentException(
s"Intervals greater than a month is not supported ($interval).")
@@ -676,8 +676,14 @@ class Dataset[T] private[sql](
// defined on a derived column cannot referenced elsewhere in the plan.
def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan {
val parsedDelay =
Option(CalendarInterval.fromString("interval " + delayThreshold))
.getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
try {
CalendarInterval.fromCaseInsensitiveString(delayThreshold)
} catch {
case e: IllegalArgumentException =>
throw new AnalysisException(
s"Unable to parse time delay '$delayThreshold'",
cause = Some(e))
}
require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
s"delay threshold ($delayThreshold) should not be negative.")
EliminateEventTimeWatermark(
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.streaming

import java.sql.Date

import org.apache.commons.lang3.StringUtils

import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout}
import org.apache.spark.sql.execution.streaming.GroupStateImpl._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
@@ -160,20 +158,7 @@ private[sql] class GroupStateImpl[S] private(
def getTimeoutTimestamp: Long = timeoutTimestamp

private def parseDuration(duration: String): Long = {
if (StringUtils.isBlank(duration)) {
throw new IllegalArgumentException(
"Provided duration is null or blank.")
}
val intervalString = if (duration.startsWith("interval")) {
duration
} else {
"interval " + duration
}
val cal = CalendarInterval.fromString(intervalString)
if (cal == null) {
throw new IllegalArgumentException(
s"Provided duration ($duration) is not valid.")
}
val cal = CalendarInterval.fromCaseInsensitiveString(duration)
if (cal.milliseconds < 0 || cal.months < 0) {
throw new IllegalArgumentException(s"Provided duration ($duration) is not positive")
}
@@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration

import org.apache.commons.lang3.StringUtils

import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
import org.apache.spark.unsafe.types.CalendarInterval
@@ -38,18 +36,7 @@ case class ContinuousTrigger(intervalMs: Long) extends Trigger {

private[sql] object ContinuousTrigger {
def apply(interval: String): ContinuousTrigger = {
if (StringUtils.isBlank(interval)) {
throw new IllegalArgumentException(
"interval cannot be null or blank.")
}
val cal = if (interval.startsWith("interval")) {
CalendarInterval.fromString(interval)
} else {
CalendarInterval.fromString("interval " + interval)
}
if (cal == null) {
throw new IllegalArgumentException(s"Invalid interval: $interval")
}
val cal = CalendarInterval.fromCaseInsensitiveString(interval)
if (cal.months > 0) {
throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
}
@@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration

import org.apache.commons.lang3.StringUtils

import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.unsafe.types.CalendarInterval

@@ -76,18 +74,7 @@ object ProcessingTime {
*/
@deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
def apply(interval: String): ProcessingTime = {
if (StringUtils.isBlank(interval)) {
throw new IllegalArgumentException(
"interval cannot be null or blank.")
}
val cal = if (interval.startsWith("interval")) {
CalendarInterval.fromString(interval)
} else {
CalendarInterval.fromString("interval " + interval)
}
if (cal == null) {
throw new IllegalArgumentException(s"Invalid interval: $interval")
}
val cal = CalendarInterval.fromCaseInsensitiveString(interval)
if (cal.months > 0) {
throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
}

0 comments on commit 41b0529

Please sign in to comment.
You can’t perform that action at this time.