
Commit 9bdff9b

[KYUUBI #3227] SparkConfParser supports parsing bytes and time
### _Why are the changes needed?_

SparkConfParser now supports parsing byte and time strings, for a better user experience.

### _How was this patch tested?_

- [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #3227 from pan3793/tpc.

Closes #3227

a2d8847 [Cheng Pan] style
87e54f5 [Cheng Pan] review
381316e [Cheng Pan] nit
5a23f0e [Cheng Pan] fix
48dfe33 [Cheng Pan] nit
c7c85f7 [Cheng Pan] SparkConfParser support parse bytes and time

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 6c4a8b0 commit 9bdff9b

File tree: 7 files changed, +256 -55 lines changed


docs/connector/spark/tpch.rst

Lines changed: 1 addition & 1 deletion
```diff
@@ -75,7 +75,7 @@ To add TPC-H tables as a catalog, we can set the following configurations in ``$
 spark.sql.catalog.tpch.useAnsiStringType=false
 
 # (optional) Maximum bytes per task, consider reducing it if you want higher parallelism.
-spark.sql.catalog.tpch.read.maxPartitionBytes=134217728
+spark.sql.catalog.tpch.read.maxPartitionBytes=128m
 
 TPC-H Operations
 ----------------
```
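As a quick sanity check (not part of the commit), the new suffix form in the hunk above should resolve to the same number of bytes as the old raw literal once it goes through the byte-string parsing added by this change; a minimal sketch, assuming the JavaUtils copy shown later in this diff is on the classpath:

```scala
import org.apache.kyuubi.spark.connector.common.JavaUtils

object MaxPartitionBytesCheck extends App {
  // "128m" is shorthand for 128 MiB, i.e. the old literal 134217728 bytes.
  val fromSuffix = JavaUtils.byteStringAsBytes("128m")
  // A bare number with no suffix is interpreted directly in the requested unit (bytes here).
  val fromLiteral = JavaUtils.byteStringAsBytes("134217728")
  assert(fromSuffix == 134217728L && fromSuffix == fromLiteral)
}
```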

extensions/spark/kyuubi-spark-connector-common/pom.xml

Lines changed: 17 additions & 6 deletions
```diff
@@ -66,16 +66,27 @@
     </dependency>
 
     <dependency>
-      <groupId>org.scalatestplus</groupId>
-      <artifactId>scalacheck-1-15_${scala.binary.version}</artifactId>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
       <scope>test</scope>
     </dependency>
 
     <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_${scala.binary.version}</artifactId>
-      <version>${spark.version}</version>
-      <type>test-jar</type>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-runtime</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scalatestplus</groupId>
+      <artifactId>scalacheck-1-15_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
   </dependencies>
```

kyuubi-spark-connector-common: new file org/apache/kyuubi/spark/connector/common/JavaUtils.java

Lines changed: 184 additions & 0 deletions
@@ -0,0 +1,184 @@

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kyuubi.spark.connector.common;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.spark.network.util.ByteUnit;

/** Copied from Apache Spark org.apache.spark.network.util.JavaUtils */
public class JavaUtils {

  private static final Map<String, TimeUnit> timeSuffixes;

  static {
    timeSuffixes = new HashMap<>();
    timeSuffixes.put("us", TimeUnit.MICROSECONDS);
    timeSuffixes.put("ms", TimeUnit.MILLISECONDS);
    timeSuffixes.put("s", TimeUnit.SECONDS);
    timeSuffixes.put("m", TimeUnit.MINUTES);
    timeSuffixes.put("min", TimeUnit.MINUTES);
    timeSuffixes.put("h", TimeUnit.HOURS);
    timeSuffixes.put("d", TimeUnit.DAYS);
  }

  private static final Map<String, ByteUnit> byteSuffixes;

  static {
    byteSuffixes = new HashMap<>();
    byteSuffixes.put("b", ByteUnit.BYTE);
    byteSuffixes.put("k", ByteUnit.KiB);
    byteSuffixes.put("kb", ByteUnit.KiB);
    byteSuffixes.put("m", ByteUnit.MiB);
    byteSuffixes.put("mb", ByteUnit.MiB);
    byteSuffixes.put("g", ByteUnit.GiB);
    byteSuffixes.put("gb", ByteUnit.GiB);
    byteSuffixes.put("t", ByteUnit.TiB);
    byteSuffixes.put("tb", ByteUnit.TiB);
    byteSuffixes.put("p", ByteUnit.PiB);
    byteSuffixes.put("pb", ByteUnit.PiB);
  }

  /**
   * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count in the given unit.
   * The unit is also considered the default if the given string does not specify a unit.
   */
  public static long timeStringAs(String str, TimeUnit unit) {
    String lower = str.toLowerCase(Locale.ROOT).trim();

    try {
      Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(lower);
      if (!m.matches()) {
        throw new NumberFormatException("Failed to parse time string: " + str);
      }

      long val = Long.parseLong(m.group(1));
      String suffix = m.group(2);

      // Check for invalid suffixes
      if (suffix != null && !timeSuffixes.containsKey(suffix)) {
        throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
      }

      // If suffix is valid use that, otherwise none was provided and use the default passed
      return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : unit);
    } catch (NumberFormatException e) {
      String timeError =
          "Time must be specified as seconds (s), "
              + "milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). "
              + "E.g. 50s, 100ms, or 250us.";

      throw new NumberFormatException(timeError + "\n" + e.getMessage());
    }
  }

  /**
   * Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use. If
   * no suffix is provided, the passed number is assumed to be in ms.
   */
  public static long timeStringAsMs(String str) {
    return timeStringAs(str, TimeUnit.MILLISECONDS);
  }

  /**
   * Convert a time parameter such as (50s, 100ms, or 250us) to seconds for internal use. If no
   * suffix is provided, the passed number is assumed to be in seconds.
   */
  public static long timeStringAsSec(String str) {
    return timeStringAs(str, TimeUnit.SECONDS);
  }

  /**
   * Convert a passed byte string (e.g. 50b, 100kb, or 250mb) to the given unit. If no suffix is
   * provided, a direct conversion to the provided unit is attempted.
   */
  public static long byteStringAs(String str, ByteUnit unit) {
    String lower = str.toLowerCase(Locale.ROOT).trim();

    try {
      Matcher m = Pattern.compile("([0-9]+)([a-z]+)?").matcher(lower);
      Matcher fractionMatcher = Pattern.compile("([0-9]+\\.[0-9]+)([a-z]+)?").matcher(lower);

      if (m.matches()) {
        long val = Long.parseLong(m.group(1));
        String suffix = m.group(2);

        // Check for invalid suffixes
        if (suffix != null && !byteSuffixes.containsKey(suffix)) {
          throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
        }

        // If suffix is valid use that, otherwise none was provided and use the default passed
        return unit.convertFrom(val, suffix != null ? byteSuffixes.get(suffix) : unit);
      } else if (fractionMatcher.matches()) {
        throw new NumberFormatException(
            "Fractional values are not supported. Input was: " + fractionMatcher.group(1));
      } else {
        throw new NumberFormatException("Failed to parse byte string: " + str);
      }

    } catch (NumberFormatException e) {
      String byteError =
          "Size must be specified as bytes (b), "
              + "kibibytes (k), mebibytes (m), gibibytes (g), tebibytes (t), or pebibytes(p). "
              + "E.g. 50b, 100k, or 250m.";

      throw new NumberFormatException(byteError + "\n" + e.getMessage());
    }
  }

  /**
   * Convert a passed byte string (e.g. 50b, 100k, or 250m) to bytes for internal use.
   *
   * <p>If no suffix is provided, the passed number is assumed to be in bytes.
   */
  public static long byteStringAsBytes(String str) {
    return byteStringAs(str, ByteUnit.BYTE);
  }

  /**
   * Convert a passed byte string (e.g. 50b, 100k, or 250m) to kibibytes for internal use.
   *
   * <p>If no suffix is provided, the passed number is assumed to be in kibibytes.
   */
  public static long byteStringAsKb(String str) {
    return byteStringAs(str, ByteUnit.KiB);
  }

  /**
   * Convert a passed byte string (e.g. 50b, 100k, or 250m) to mebibytes for internal use.
   *
   * <p>If no suffix is provided, the passed number is assumed to be in mebibytes.
   */
  public static long byteStringAsMb(String str) {
    return byteStringAs(str, ByteUnit.MiB);
  }

  /**
   * Convert a passed byte string (e.g. 50b, 100k, or 250m) to gibibytes for internal use.
   *
   * <p>If no suffix is provided, the passed number is assumed to be in gibibytes.
   */
  public static long byteStringAsGb(String str) {
    return byteStringAs(str, ByteUnit.GiB);
  }
}
```
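A minimal usage sketch (not part of the commit) illustrating the suffix semantics above: byte suffixes are binary (powers of 1024) and, when no suffix is given, the number is read in the unit passed to the method. It assumes this class and Spark's ByteUnit are on the classpath.

```scala
import java.util.concurrent.TimeUnit

import org.apache.kyuubi.spark.connector.common.JavaUtils

object JavaUtilsDemo extends App {
  // Byte suffixes are binary units, matching Spark's conventions.
  assert(JavaUtils.byteStringAsBytes("1k") == 1024L)
  assert(JavaUtils.byteStringAsMb("2g") == 2048L)
  // Without a suffix, the number is interpreted in the requested unit.
  assert(JavaUtils.byteStringAsBytes("4096") == 4096L)

  // Time strings behave the same way; the unit argument is the default.
  assert(JavaUtils.timeStringAsMs("2s") == 2000L)
  assert(JavaUtils.timeStringAs("500", TimeUnit.MILLISECONDS) == 500L)
}
```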

extensions/spark/kyuubi-spark-connector-common/src/main/scala/org/apache/kyuubi/spark/connector/common/SparkConfParser.scala

Lines changed: 15 additions & 10 deletions
```diff
@@ -38,6 +38,8 @@ case class SparkConfParser(
   def longConf(): LongConfParser = new LongConfParser()
   def doubleConf(): DoubleConfParser = new DoubleConfParser()
   def stringConf(): StringConfParser = new StringConfParser()
+  def bytesConf(): BytesConfParser = new BytesConfParser()
+  def timeConf(): TimeConfParser = new TimeConfParser()
 
   class BooleanConfParser extends ConfParser[Boolean] {
     override protected def conversion(value: String): Boolean = value.toBoolean
@@ -59,11 +61,19 @@ case class SparkConfParser(
     override protected def conversion(value: String): String = value
   }
 
+  class BytesConfParser extends ConfParser[Long] {
+    override protected def conversion(value: String): Long = JavaUtils.byteStringAsBytes(value)
+  }
+
+  class TimeConfParser extends ConfParser[Long] {
+    override protected def conversion(value: String): Long = JavaUtils.timeStringAsMs(value)
+  }
+
   abstract class ConfParser[T]() {
     private var optionName: Option[String] = None
     private var sessionConfName: Option[String] = None
     private var tablePropertyName: Option[String] = None
-    private var defaultValue: Option[T] = None
+    private var defaultStringValue: Option[String] = None
 
     def option(name: String): ConfParser[T] = {
       this.optionName = Some(name)
@@ -80,8 +90,8 @@ case class SparkConfParser(
       this
     }
 
-    def defaultValue(value: T): ConfParser[T] = {
-      this.defaultValue = Some(value)
+    def defaultStringValue(value: String): ConfParser[T] = {
+      this.defaultStringValue = Some(value)
       this
     }
 
@@ -96,18 +106,13 @@ case class SparkConfParser(
       if (valueOpt.isEmpty && properties != null) {
         valueOpt = tablePropertyName.flatMap(name => Option(properties.get(name)))
       }
-      valueOpt = valueOpt.filter(_ != null)
-      if (valueOpt.isDefined) {
-        valueOpt.map(conversion(_))
-      } else {
-        defaultValue
-      }
+      valueOpt.orElse(defaultStringValue).map(conversion)
     }
 
     protected def conversion(value: String): T
 
     def parse(): T = {
-      assert(defaultValue.isDefined, "Default value cannot be empty.")
+      assert(defaultStringValue.isDefined, "Default value cannot be empty.")
       parseOptional().get
     }
 
```
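The new builders keep the fluent style of the existing parsers, but defaults are now passed as strings and converted with the same code path as configured values. A sketch of how a connector might use them (option names, default values, and the null arguments here are illustrative only; the constructor call mirrors the test suite below):

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.util.CaseInsensitiveStringMap

import org.apache.kyuubi.spark.connector.common.SparkConfParser

object BytesAndTimeConfDemo extends App {
  val options = new CaseInsensitiveStringMap(
    Map("read.maxPartitionBytes" -> "128m", "read.pollTimeout" -> "30s").asJava)
  // Session conf and table properties are not needed for this sketch.
  val parser = SparkConfParser(options, null, null)

  // "128m" -> 134217728 bytes, "30s" -> 30000 ms; defaults are plain strings too.
  val maxBytes: Long =
    parser.bytesConf().option("read.maxPartitionBytes").defaultStringValue("64m").parse()
  val pollTimeoutMs: Long =
    parser.timeConf().option("read.pollTimeout").defaultStringValue("10s").parse()

  assert(maxBytes == 128L * 1024 * 1024 && pollTimeoutMs == 30000L)
}
```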

extensions/spark/kyuubi-spark-connector-common/src/test/scala/org/apache/kyuubi/spark/connector/common/SparkConfParserSuite.scala

Lines changed: 27 additions & 26 deletions
```diff
@@ -17,49 +17,50 @@
 
 package org.apache.kyuubi.spark.connector.common
 
-// scalastyle:off anyfunsuite
 import scala.collection.JavaConverters._
 
+import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.funsuite.AnyFunSuite
 
-class SparkConfParserSuite extends AnyFunSuite with BeforeAndAfterAll {
-  // scalastyle:on anyfunsuite
+class SparkConfParserSuite extends SparkFunSuite {
 
-  test("parse options config") {
-    assert(confParser.stringConf().option("optKey1").defaultValue("test").parse() === "optValue1")
-    assert(confParser.booleanConf().option("booleanKey").defaultValue(true).parse() === false)
-    assert(confParser.intConf().option("intKey").defaultValue(0).parse() === 10)
-    assert(confParser.longConf().option("longKey").defaultValue(0).parse() === Long.MaxValue)
-    assert(confParser.doubleConf().option("doubleKey").defaultValue(0.0).parse() === 1.1)
-  }
-
-  test("parse properties config") {
-    assert(confParser.intConf().option("key1")
-      .tableProperty("key1")
-      .defaultValue(0).parse() === 111)
-    assert(confParser.stringConf()
-      .option("propertyKey1")
-      .tableProperty("propertyKey1")
-      .defaultValue("test").parse() === "propertyValue1")
-  }
-
-  private var confParser: SparkConfParser = null
+  private var confParser: SparkConfParser = _
 
   override protected def beforeAll(): Unit = {
     super.beforeAll()
     val options = new CaseInsensitiveStringMap(Map(
       "key1" -> "111",
-      "optKey1" -> "optValue1",
       "booleanKey" -> "false",
       "intKey" -> "10",
       "longKey" -> String.valueOf(Long.MaxValue),
-      "doubleKey" -> "1.1").asJava)
+      "doubleKey" -> "1.1",
+      "bytesKey" -> "1k",
+      "timeKey" -> "1s").asJava)
     val properties = Map(
       "key1" -> "333",
       "propertyKey1" -> "propertyValue1")
     confParser = SparkConfParser(options, null, properties.asJava)
   }
 
+  test("parse options config") {
+    assert(confParser.stringConf().option("optKey1").defaultStringValue("test").parse() === "test")
+    assert(
+      confParser.booleanConf().option("booleanKey").defaultStringValue("true").parse() === false)
+    assert(confParser.intConf().option("intKey").defaultStringValue("0").parse() === 10)
+    assert(
+      confParser.longConf().option("longKey").defaultStringValue("0").parse() === Long.MaxValue)
+    assert(confParser.doubleConf().option("doubleKey").defaultStringValue("0.0").parse() === 1.1)
+    assert(confParser.bytesConf().option("bytesKey").defaultStringValue("0k").parse() === 1024L)
+    assert(confParser.timeConf().option("timeKey").defaultStringValue("0s").parse() === 1000L)
+  }
+
+  test("parse properties config") {
+    assert(confParser.intConf().option("key1")
+      .tableProperty("key1")
+      .defaultStringValue("0").parse() === 111)
+    assert(confParser.stringConf()
+      .option("propertyKey1")
+      .tableProperty("propertyKey1")
+      .defaultStringValue("test").parse() === "propertyValue1")
+  }
 }
```
