-
Notifications
You must be signed in to change notification settings - Fork 28k
/
DateTimeFormatterHelper.scala
267 lines (244 loc) · 11.7 KB
/
DateTimeFormatterHelper.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.util
import java.time._
import java.time.chrono.IsoChronology
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverStyle}
import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
import java.util.Locale
import com.google.common.cache.CacheBuilder
import org.apache.spark.SparkUpgradeException
import org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
/**
 * Helper trait for building `java.time.format.DateTimeFormatter`s used by Spark's datetime
 * formatters/parsers, and for converting parsed [[TemporalAccessor]]s into concrete date-time
 * values, filling in Spark 2.4-compatible defaults for fields absent from the input.
 */
trait DateTimeFormatterHelper {
  // Returns the value of `field` from the parsed `accessor`, or `default` when the input string
  // did not contain that field.
  private def getOrDefault(accessor: TemporalAccessor, field: ChronoField, default: Int): Int = {
    if (accessor.isSupported(field)) {
      accessor.get(field)
    } else {
      default
    }
  }

  // Converts the parsed temporal object to a LocalDate, defaulting missing date fields
  // (year -> 1970, month -> 1, day -> 1) for compatibility with Spark 2.4.
  protected def toLocalDate(accessor: TemporalAccessor): LocalDate = {
    val localDate = accessor.query(TemporalQueries.localDate())
    // If all the date fields are specified, return the local date directly.
    if (localDate != null) return localDate
    // Users may want to parse only a few datetime fields from a string and extract these fields
    // later, and we should provide default values for missing fields.
    // To be compatible with Spark 2.4, we pick 1970 as the default value of year.
    val year = getOrDefault(accessor, ChronoField.YEAR, 1970)
    val month = getOrDefault(accessor, ChronoField.MONTH_OF_YEAR, 1)
    val day = getOrDefault(accessor, ChronoField.DAY_OF_MONTH, 1)
    LocalDate.of(year, month, day)
  }

  // Converts the parsed temporal object to a LocalTime, defaulting any missing time field to 0.
  private def toLocalTime(accessor: TemporalAccessor): LocalTime = {
    val localTime = accessor.query(TemporalQueries.localTime())
    // If all the time fields are specified, return the local time directly.
    if (localTime != null) return localTime
    val hour = if (accessor.isSupported(ChronoField.HOUR_OF_DAY)) {
      accessor.get(ChronoField.HOUR_OF_DAY)
    } else if (accessor.isSupported(ChronoField.HOUR_OF_AMPM)) {
      // When we reach here, it means am/pm is not specified. Here we assume it's am.
      accessor.get(ChronoField.HOUR_OF_AMPM)
    } else {
      0
    }
    val minute = getOrDefault(accessor, ChronoField.MINUTE_OF_HOUR, 0)
    val second = getOrDefault(accessor, ChronoField.SECOND_OF_MINUTE, 0)
    val nanoSecond = getOrDefault(accessor, ChronoField.NANO_OF_SECOND, 0)
    LocalTime.of(hour, minute, second, nanoSecond)
  }

  // Converts the parsed temporal object to ZonedDateTime. It sets time components to zeros
  // if they do not exist in the parsed object.
  protected def toZonedDateTime(accessor: TemporalAccessor, zoneId: ZoneId): ZonedDateTime = {
    val localDate = toLocalDate(accessor)
    val localTime = toLocalTime(accessor)
    ZonedDateTime.of(localDate, localTime, zoneId)
  }

  // Gets a formatter from the cache or creates a new one. The buildFormatter method can be called
  // a few times with the same parameters in parallel if the cache does not contain values
  // associated to those parameters. Since the formatter is immutable, it does not matter.
  // In this way, synchronization is intentionally omitted in this method to make parallel calls
  // less blocking.
  // The Cache.get method is not used here to avoid creation of additional instances of Callable.
  protected def getOrCreateFormatter(
      pattern: String,
      locale: Locale,
      needVarLengthSecondFraction: Boolean = false): DateTimeFormatter = {
    val newPattern = convertIncompatiblePattern(pattern)
    // Variable-length second-fraction parsing is only relevant when the pattern contains 'S'.
    val useVarLen = needVarLengthSecondFraction && newPattern.contains('S')
    val key = (newPattern, locale, useVarLen)
    var formatter = cache.getIfPresent(key)
    if (formatter == null) {
      formatter = buildFormatter(newPattern, locale, useVarLen)
      cache.put(key, formatter)
    }
    formatter
  }

  // When the legacy time parser policy is set to EXCEPTION, check whether we will get different
  // results between the legacy parser and the new parser. If the new parser fails but the legacy
  // parser works, throw a SparkUpgradeException. On the contrary, if the legacy policy is set to
  // CORRECTED, the DateTimeParseException will be addressed by the caller side.
  protected def checkDiffResult[T](
      s: String, legacyParseFunc: String => T): PartialFunction[Throwable, T] = {
    case e: DateTimeException if SQLConf.get.legacyTimeParserPolicy == EXCEPTION =>
      try {
        legacyParseFunc(s)
      } catch {
        // The legacy parser fails too: the input is plainly invalid, rethrow the original error.
        case _: Throwable => throw e
      }
      // The legacy parser succeeded where the new one failed: tell the user how to choose.
      throw new SparkUpgradeException("3.0", s"Fail to parse '$s' in the new parser. You can " +
        s"set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior " +
        s"before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.", e)
  }

  /**
   * When the new DateTimeFormatter failed to initialize because of invalid datetime pattern, it
   * will throw IllegalArgumentException. If the pattern can be recognized by the legacy formatter
   * it will raise SparkUpgradeException to tell users to restore the previous behavior via LEGACY
   * policy or follow our guide to correct their pattern. Otherwise, the original
   * IllegalArgumentException will be thrown.
   *
   * @param pattern the date time pattern
   * @param tryLegacyFormatter a func to capture exception, identically which forces a legacy
   *                           datetime formatter to be initialized
   */
  protected def checkLegacyFormatter(
      pattern: String,
      tryLegacyFormatter: => Unit): PartialFunction[Throwable, DateTimeFormatter] = {
    case e: IllegalArgumentException =>
      try {
        // Force the by-name argument, which attempts to build the legacy formatter.
        tryLegacyFormatter
      } catch {
        // The legacy formatter rejects the pattern too: rethrow the original error.
        case _: Throwable => throw e
      }
      throw new SparkUpgradeException("3.0", s"Fail to recognize '$pattern' pattern in the" +
        s" DateTimeFormatter. 1) You can set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY" +
        s" to restore the behavior before Spark 3.0. 2) You can form a valid datetime pattern" +
        s" with the guide from https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html",
        e)
  }
}
/**
 * Companion helpers: the shared formatter cache, formatter construction utilities, and the
 * conversion of legacy `SimpleDateFormat`-style pattern strings to `DateTimeFormatter` patterns.
 */
private object DateTimeFormatterHelper {
  // Shared cache of built formatters keyed by (pattern, locale, variable-length-fraction flag).
  // Formatters are immutable, so racy duplicate creation by callers is harmless.
  val cache = CacheBuilder.newBuilder()
    .maximumSize(128)
    .build[(String, Locale, Boolean), DateTimeFormatter]()

  // Splits a pattern chunk into (prefix containing no 'S', the run of 'S' letters, the rest).
  final val extractor = "^([^S]*)(S*)(.*)$".r

  // All Spark formatters parse case-insensitively.
  def createBuilder(): DateTimeFormatterBuilder = {
    new DateTimeFormatterBuilder().parseCaseInsensitive()
  }

  // Finalizes a builder with the ISO chronology and STRICT resolution, matching the Proleptic
  // Gregorian calendar Spark 3.0 uses.
  def toFormatter(builder: DateTimeFormatterBuilder, locale: Locale): DateTimeFormatter = {
    builder
      .toFormatter(locale)
      .withChronology(IsoChronology.INSTANCE)
      .withResolverStyle(ResolverStyle.STRICT)
  }

  /**
   * Builds a formatter for parsing the seconds fraction with variable length: each unquoted run
   * of 'S' letters accepts between 1 and `run.length` fraction digits.
   *
   * @param pattern the datetime pattern, possibly containing single-quoted literal sections
   */
  def createBuilderWithVarLengthSecondFraction(
      pattern: String): DateTimeFormatterBuilder = {
    val builder = createBuilder()
    pattern.split("'").zipWithIndex.foreach {
      // Splitting a string that starts with the separator `'` produces an extra empty string at
      // res(0), so an empty FIRST element needs no `'` literal appended. Any other empty element
      // comes from two adjacent quotes, i.e. an escaped single-quote literal.
      case ("", idx) if idx != 0 => builder.appendLiteral("'")
      case (patternPart, idx) if idx % 2 == 0 =>
        // Even-indexed parts are unquoted pattern text: expand every run of 'S' into a
        // variable-length fraction and append the surrounding text as a normal pattern.
        var rest = patternPart
        while (rest.nonEmpty) {
          rest match {
            case extractor(prefix, secondFraction, suffix) =>
              builder.appendPattern(prefix)
              if (secondFraction.nonEmpty) {
                builder.appendFraction(ChronoField.NANO_OF_SECOND, 1, secondFraction.length, false)
              }
              rest = suffix
            case _ => throw new IllegalArgumentException(s"Unrecognized datetime pattern: $pattern")
          }
        }
      // Odd-indexed parts are quoted literals and are appended verbatim.
      case (patternPart, _) => builder.appendLiteral(patternPart)
    }
    builder
  }

  // Builds a formatter for `pattern`, optionally with variable-length second-fraction parsing.
  def buildFormatter(
      pattern: String,
      locale: Locale,
      varLenEnabled: Boolean): DateTimeFormatter = {
    val builder = if (varLenEnabled) {
      createBuilderWithVarLengthSecondFraction(pattern)
    } else {
      createBuilder().appendPattern(pattern)
    }
    toFormatter(builder, locale)
  }

  // Formatter for `yyyy-MM-dd HH:mm:ss` followed by an optional fraction of up to 9 digits.
  lazy val fractionFormatter: DateTimeFormatter = {
    val builder = createBuilder()
      .append(DateTimeFormatter.ISO_LOCAL_DATE)
      .appendLiteral(' ')
      .appendValue(ChronoField.HOUR_OF_DAY, 2).appendLiteral(':')
      .appendValue(ChronoField.MINUTE_OF_HOUR, 2).appendLiteral(':')
      .appendValue(ChronoField.SECOND_OF_MINUTE, 2)
      .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
    toFormatter(builder, TimestampFormatter.defaultLocale)
  }

  // Pattern letters rejected to keep behavior consistent with the legacy parser.
  final val unsupportedLetters = Set('A', 'c', 'e', 'n', 'N', 'p')
  // Five-letter (narrow text style) forms that are rejected. NOTE: the previous declaration
  // listed "uuuuu" twice; a Set deduplicates, so dropping the duplicate changes nothing.
  final val unsupportedNarrowTextStyle =
    Set("GGGGG", "MMMMM", "LLLLL", "EEEEE", "uuuuu", "QQQQQ", "qqqqq")

  /**
   * In Spark 3.0, we switch to the Proleptic Gregorian calendar and use DateTimeFormatter for
   * parsing/formatting datetime values. The pattern string is incompatible with the one defined
   * by SimpleDateFormat in Spark 2.4 and earlier. This function converts all incompatible patterns
   * for the new parser in Spark 3.0. See more details in SPARK-31030.
   *
   * @param pattern The input pattern.
   * @return The pattern for the new parser.
   */
  def convertIncompatiblePattern(pattern: String): String = {
    val eraDesignatorContained = pattern.split("'").zipWithIndex.exists {
      case (patternPart, index) =>
        // Text can be quoted using single quotes, we only check the non-quote parts.
        index % 2 == 0 && patternPart.contains("G")
    }
    // The appended space (stripped back off below) prevents `split` from dropping a trailing
    // empty part when the pattern ends with a quote, preserving quote/non-quote alternation.
    (pattern + " ").split("'").zipWithIndex.map {
      case (patternPart, index) =>
        if (index % 2 == 0) {
          for (c <- patternPart if unsupportedLetters.contains(c)) {
            throw new IllegalArgumentException(s"Illegal pattern character: $c")
          }
          for (style <- unsupportedNarrowTextStyle if patternPart.contains(style)) {
            throw new IllegalArgumentException(s"Too many pattern letters: ${style.head}")
          }
          // The meaning of 'u' was day number of week in SimpleDateFormat, it was changed to year
          // in DateTimeFormatter. Substitute 'u' to 'e' and use DateTimeFormatter to parse the
          // string. If parsable, return the result; otherwise, fall back to 'u', and then use the
          // legacy SimpleDateFormat parser to parse. When it is successfully parsed, throw an
          // exception and ask users to change the pattern strings or turn on the legacy mode;
          // otherwise, return NULL as what Spark 2.4 does.
          val res = patternPart.replace("u", "e")
          // In DateTimeFormatter, 'u' supports negative years. We substitute 'y' to 'u' here for
          // keeping the support in Spark 3.0. If parse failed in Spark 3.0, fall back to 'y'.
          // We only do this substitution when there is no era designator found in the pattern.
          if (!eraDesignatorContained) {
            res.replace("y", "u")
          } else {
            res
          }
        } else {
          patternPart
        }
    }.mkString("'").stripSuffix(" ")
  }
}