Skip to content

Commit c1a0ce8

Browse files
hddongyaooqinn
authored andcommitted
[KYUUBI #1515] Add RowSet for trino engine
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Add RowSet for trino engine ### _How was this patch tested?_ - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [X] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1549 from hddong/add-rowset-for-trino. Closes #1515 cc88513 [hongdongdong] remove timeZone 2fb1093 [hongdongdong] fix toHiveString 9066caf [hongdongdong] rm optionalStart 491b768 [hongdongdong] Reset 27895eb [hongdongdong] fix RowSetUtils 7ce4ede [hongdongdong] fix 34dd01b [hongdongdong] move functions to Utils 79e0621 [hongdongdong] fix 2dcd91e [hongdongdong] fix 71bdcfd [hongdongdong] [KYUUBI #1515] Add RowSet for trino engine Authored-by: hongdongdong <hongdongdong@cmss.chinamobile.com> Signed-off-by: Kent Yao <yao@apache.org>
1 parent 53ff56b commit c1a0ce8

File tree

5 files changed

+669
-0
lines changed

5 files changed

+669
-0
lines changed

externals/kyuubi-trino-engine/pom.xml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,25 @@
4545
<version>${project.version}</version>
4646
</dependency>
4747

48+
<dependency>
49+
<groupId>io.trino</groupId>
50+
<artifactId>trino-client</artifactId>
51+
</dependency>
52+
53+
<dependency>
54+
<groupId>org.apache.kyuubi</groupId>
55+
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
56+
<version>${project.version}</version>
57+
<type>test-jar</type>
58+
<scope>test</scope>
59+
</dependency>
60+
61+
<dependency>
62+
<groupId>org.slf4j</groupId>
63+
<artifactId>jul-to-slf4j</artifactId>
64+
<scope>test</scope>
65+
</dependency>
66+
4867
</dependencies>
4968

5069
</project>
Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.trino.schema
19+
20+
import java.nio.ByteBuffer
21+
import java.nio.charset.StandardCharsets
22+
23+
import scala.collection.JavaConverters._
24+
25+
import io.trino.client.ClientStandardTypes._
26+
import io.trino.client.Column
27+
import io.trino.client.Row
28+
import org.apache.hive.service.rpc.thrift.TBinaryColumn
29+
import org.apache.hive.service.rpc.thrift.TBoolColumn
30+
import org.apache.hive.service.rpc.thrift.TBoolValue
31+
import org.apache.hive.service.rpc.thrift.TByteColumn
32+
import org.apache.hive.service.rpc.thrift.TByteValue
33+
import org.apache.hive.service.rpc.thrift.TColumn
34+
import org.apache.hive.service.rpc.thrift.TColumnValue
35+
import org.apache.hive.service.rpc.thrift.TDoubleColumn
36+
import org.apache.hive.service.rpc.thrift.TDoubleValue
37+
import org.apache.hive.service.rpc.thrift.TI16Column
38+
import org.apache.hive.service.rpc.thrift.TI16Value
39+
import org.apache.hive.service.rpc.thrift.TI32Column
40+
import org.apache.hive.service.rpc.thrift.TI32Value
41+
import org.apache.hive.service.rpc.thrift.TI64Column
42+
import org.apache.hive.service.rpc.thrift.TI64Value
43+
import org.apache.hive.service.rpc.thrift.TProtocolVersion
44+
import org.apache.hive.service.rpc.thrift.TRow
45+
import org.apache.hive.service.rpc.thrift.TRowSet
46+
import org.apache.hive.service.rpc.thrift.TStringColumn
47+
import org.apache.hive.service.rpc.thrift.TStringValue
48+
49+
import org.apache.kyuubi.util.RowSetUtils.bitSetToBuffer
50+
51+
object RowSet {
52+
53+
def toTRowSet(
54+
rows: Seq[List[_]],
55+
schema: List[Column],
56+
protocolVersion: TProtocolVersion): TRowSet = {
57+
if (protocolVersion.getValue < TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) {
58+
toRowBasedSet(rows, schema)
59+
} else {
60+
toColumnBasedSet(rows, schema)
61+
}
62+
}
63+
64+
def toRowBasedSet(rows: Seq[List[_]], schema: List[Column]): TRowSet = {
65+
val tRows = rows.map { row =>
66+
val tRow = new TRow()
67+
(0 until row.size).map(i => toTColumnValue(i, row, schema))
68+
.foreach(tRow.addToColVals)
69+
tRow
70+
}.asJava
71+
new TRowSet(0, tRows)
72+
}
73+
74+
def toColumnBasedSet(rows: Seq[List[_]], schema: List[Column]): TRowSet = {
75+
val size = rows.size
76+
val tRowSet = new TRowSet(0, new java.util.ArrayList[TRow](size))
77+
schema.zipWithIndex.foreach { case (filed, i) =>
78+
val tColumn = toTColumn(
79+
rows,
80+
i,
81+
filed.getType)
82+
tRowSet.addToColumns(tColumn)
83+
}
84+
tRowSet
85+
}
86+
87+
private def toTColumn(
88+
rows: Seq[Seq[Any]],
89+
ordinal: Int,
90+
typ: String): TColumn = {
91+
val nulls = new java.util.BitSet()
92+
typ match {
93+
case BOOLEAN =>
94+
val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true)
95+
TColumn.boolVal(new TBoolColumn(values, nulls))
96+
97+
case TINYINT =>
98+
val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls, 0.toByte)
99+
TColumn.byteVal(new TByteColumn(values, nulls))
100+
101+
case SMALLINT =>
102+
val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls, 0.toShort)
103+
TColumn.i16Val(new TI16Column(values, nulls))
104+
105+
case INTEGER =>
106+
val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0)
107+
TColumn.i32Val(new TI32Column(values, nulls))
108+
109+
case BIGINT =>
110+
val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L)
111+
TColumn.i64Val(new TI64Column(values, nulls))
112+
113+
case REAL =>
114+
val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls, 0.toFloat)
115+
.asScala.map(n => java.lang.Double.valueOf(n.toDouble)).asJava
116+
TColumn.doubleVal(new TDoubleColumn(values, nulls))
117+
118+
case DOUBLE =>
119+
val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble)
120+
TColumn.doubleVal(new TDoubleColumn(values, nulls))
121+
122+
case VARCHAR =>
123+
val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
124+
TColumn.stringVal(new TStringColumn(values, nulls))
125+
126+
case VARBINARY =>
127+
val values = getOrSetAsNull[Array[Byte]](rows, ordinal, nulls, Array())
128+
.asScala
129+
.map(ByteBuffer.wrap)
130+
.asJava
131+
TColumn.binaryVal(new TBinaryColumn(values, nulls))
132+
133+
case _ =>
134+
val values = rows.zipWithIndex.map { case (row, i) =>
135+
nulls.set(i, row(ordinal) == null)
136+
if (row(ordinal) == null) {
137+
""
138+
} else {
139+
toHiveString((row(ordinal), typ))
140+
}
141+
}.asJava
142+
TColumn.stringVal(new TStringColumn(values, nulls))
143+
}
144+
}
145+
146+
private def getOrSetAsNull[T](
147+
rows: Seq[Seq[Any]],
148+
ordinal: Int,
149+
nulls: java.util.BitSet,
150+
defaultVal: T): java.util.List[T] = {
151+
val size = rows.length
152+
val ret = new java.util.ArrayList[T](size)
153+
var idx = 0
154+
while (idx < size) {
155+
val row = rows(idx)
156+
val isNull = row(ordinal) == null
157+
if (isNull) {
158+
nulls.set(idx, true)
159+
ret.add(idx, defaultVal)
160+
} else {
161+
ret.add(idx, row(ordinal).asInstanceOf[T])
162+
}
163+
idx += 1
164+
}
165+
ret
166+
}
167+
168+
private def toTColumnValue(
169+
ordinal: Int,
170+
row: List[Any],
171+
types: List[Column]): TColumnValue = {
172+
173+
types(ordinal).getType match {
174+
case BOOLEAN =>
175+
val boolValue = new TBoolValue
176+
if (row(ordinal) != null) boolValue.setValue(row(ordinal).asInstanceOf[Boolean])
177+
TColumnValue.boolVal(boolValue)
178+
179+
case TINYINT =>
180+
val byteValue = new TByteValue
181+
if (row(ordinal) != null) byteValue.setValue(row(ordinal).asInstanceOf[Byte])
182+
TColumnValue.byteVal(byteValue)
183+
184+
case SMALLINT =>
185+
val tI16Value = new TI16Value
186+
if (row(ordinal) != null) tI16Value.setValue(row(ordinal).asInstanceOf[Short])
187+
TColumnValue.i16Val(tI16Value)
188+
189+
case INTEGER =>
190+
val tI32Value = new TI32Value
191+
if (row(ordinal) != null) tI32Value.setValue(row(ordinal).asInstanceOf[Int])
192+
TColumnValue.i32Val(tI32Value)
193+
194+
case BIGINT =>
195+
val tI64Value = new TI64Value
196+
if (row(ordinal) != null) tI64Value.setValue(row(ordinal).asInstanceOf[Long])
197+
TColumnValue.i64Val(tI64Value)
198+
199+
case REAL =>
200+
val tDoubleValue = new TDoubleValue
201+
if (row(ordinal) != null) tDoubleValue.setValue(row(ordinal).asInstanceOf[Float])
202+
TColumnValue.doubleVal(tDoubleValue)
203+
204+
case DOUBLE =>
205+
val tDoubleValue = new TDoubleValue
206+
if (row(ordinal) != null) tDoubleValue.setValue(row(ordinal).asInstanceOf[Double])
207+
TColumnValue.doubleVal(tDoubleValue)
208+
209+
case VARCHAR =>
210+
val tStringValue = new TStringValue
211+
if (row(ordinal) != null) tStringValue.setValue(row(ordinal).asInstanceOf[String])
212+
TColumnValue.stringVal(tStringValue)
213+
214+
case _ =>
215+
val tStrValue = new TStringValue
216+
if (row(ordinal) != null) {
217+
tStrValue.setValue(
218+
toHiveString((row(ordinal), types(ordinal).getType)))
219+
}
220+
TColumnValue.stringVal(tStrValue)
221+
}
222+
}
223+
224+
/**
225+
* A simpler impl of Trino's toHiveString
226+
*/
227+
def toHiveString(dataWithType: (Any, String)): String = {
228+
dataWithType match {
229+
case (null, _) =>
230+
// Only match nulls in nested type values
231+
"null"
232+
233+
case (bin: Array[Byte], VARBINARY) =>
234+
new String(bin, StandardCharsets.UTF_8)
235+
236+
case (s: String, VARCHAR) =>
237+
// Only match string in nested type values
238+
"\"" + s + "\""
239+
240+
// for Array Map and Row, temporarily convert to string
241+
// TODO further analysis of type
242+
case (list: java.util.List[_], _) =>
243+
formatValue(list)
244+
245+
case (m: java.util.Map[_, _], _) =>
246+
formatValue(m)
247+
248+
case (row: Row, _) =>
249+
formatValue(row)
250+
251+
case (other, _) =>
252+
other.toString
253+
}
254+
}
255+
256+
def formatValue(o: Any): String = {
257+
o match {
258+
case null =>
259+
"null"
260+
261+
case m: java.util.Map[_, _] =>
262+
m.asScala.map { case (key, value) =>
263+
formatValue(key) + ":" + formatValue(value)
264+
}.toSeq.sorted.mkString("{", ",", "}")
265+
266+
case l: java.util.List[_] =>
267+
l.asScala.map(formatValue).mkString("[", ",", "]")
268+
269+
case row: Row =>
270+
row.getFields.asScala.map { r =>
271+
val formattedValue = formatValue(r.getValue())
272+
if (r.getName.isPresent) {
273+
r.getName.get() + "=" + formattedValue
274+
} else {
275+
formattedValue
276+
}
277+
}.mkString("{", ",", "}")
278+
279+
case _ => o.toString
280+
}
281+
}
282+
}

0 commit comments

Comments
 (0)