Skip to content

Commit c06e042

Browse files
cfmcgradypan3793
authored andcommitted
[KYUUBI #1784][FOLLOWUP][FLINK] Support float type
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> ```sql SELECT cast(0.1 as float) ``` Before this PR, this query will throw an exception when we ran it with the Flink engine. <details> <summary>Exception message</summary> ``` org.apache.kyuubi.shade.org.apache.thrift.transport.TTransportException at org.apache.kyuubi.shade.org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) at org.apache.kyuubi.shade.org.apache.thrift.transport.TTransport.readAll(TTransport.java:86) at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:376) at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:453) at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:435) at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslClientTransport.read(TSaslClientTransport.java:37) at org.apache.kyuubi.shade.org.apache.thrift.transport.TTransport.readAll(TTransport.java:86) at org.apache.kyuubi.shade.org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429) at org.apache.kyuubi.shade.org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318) at org.apache.kyuubi.shade.org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219) at org.apache.kyuubi.shade.org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77) at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Client.recv_FetchResults(TCLIService.java:559) at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Client.FetchResults(TCLIService.java:546) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.kyuubi.jdbc.hive.KyuubiConnection$SynchronizedHandler.invoke(KyuubiConnection.java:1611) at com.sun.proxy.$Proxy20.FetchResults(Unknown Source) at org.apache.kyuubi.jdbc.hive.KyuubiQueryResultSet.next(KyuubiQueryResultSet.java:367) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.$anonfun$new$5(FlinkOperationSuite.scala:53) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.$anonfun$new$5$adapted(FlinkOperationSuite.scala:50) at org.apache.kyuubi.operation.JDBCTestHelper.$anonfun$withMultipleConnectionJdbcStatement$3(JDBCTestHelper.scala:60) at org.apache.kyuubi.operation.JDBCTestHelper.$anonfun$withMultipleConnectionJdbcStatement$3$adapted(JDBCTestHelper.scala:60) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.kyuubi.operation.JDBCTestHelper.withMultipleConnectionJdbcStatement(JDBCTestHelper.scala:60) at org.apache.kyuubi.operation.JDBCTestHelper.withMultipleConnectionJdbcStatement$(JDBCTestHelper.scala:54) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.withMultipleConnectionJdbcStatement(FlinkOperationSuite.scala:34) at org.apache.kyuubi.operation.JDBCTestHelper.withJdbcStatement(JDBCTestHelper.scala:98) at org.apache.kyuubi.operation.JDBCTestHelper.withJdbcStatement$(JDBCTestHelper.scala:97) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.withJdbcStatement(FlinkOperationSuite.scala:34) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.$anonfun$new$4(FlinkOperationSuite.scala:50) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226) at org.apache.kyuubi.KyuubiFunSuite.withFixture(KyuubiFunSuite.scala:61) at org.apache.kyuubi.KyuubiFunSuite.withFixture$(KyuubiFunSuite.scala:55) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.withFixture(FlinkOperationSuite.scala:34) at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236) at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(FlinkOperationSuite.scala:34) at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234) at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.runTest(FlinkOperationSuite.scala:34) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269) at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413) at scala.collection.immutable.List.foreach(List.scala:431) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475) at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269) at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268) at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563) at org.scalatest.Suite.run(Suite.scala:1112) at org.scalatest.Suite.run$(Suite.scala:1094) at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273) at org.scalatest.SuperEngine.runImpl(Engine.scala:535) at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273) at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.org$scalatest$BeforeAndAfterAll$$super$run(FlinkOperationSuite.scala:34) at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213) at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210) at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.run(FlinkOperationSuite.scala:34) at org.scalatest.Suite.callExecuteOnSuite$1(Suite.scala:1175) at org.scalatest.Suite.$anonfun$runNestedSuites$1(Suite.scala:1222) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at org.scalatest.Suite.runNestedSuites(Suite.scala:1220) at org.scalatest.Suite.runNestedSuites$(Suite.scala:1154) at org.scalatest.tools.DiscoverySuite.runNestedSuites(DiscoverySuite.scala:30) at org.scalatest.Suite.run(Suite.scala:1109) at org.scalatest.Suite.run$(Suite.scala:1094) at org.scalatest.tools.DiscoverySuite.run(DiscoverySuite.scala:30) at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45) at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1322) at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1316) at scala.collection.immutable.List.foreach(List.scala:431) at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1316) at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993) at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971) at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1482) at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971) at org.scalatest.tools.Runner$.main(Runner.scala:775) ``` </details> ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1801 from cfmcgrady/kyuubi-1784-flink. Closes #1784 17623e8 [Fu Chen] fix style 6385c41 [Fu Chen] fix Authored-by: Fu Chen <cfmcgrady@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent c65df0f commit c06e042

File tree

2 files changed

+13
-2
lines changed

2 files changed

+13
-2
lines changed

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ object RowSet {
112112
case _: FloatType =>
113113
val tDoubleValue = new TDoubleValue
114114
if (row.getField(ordinal) != null) {
115-
tDoubleValue.setValue(row.getField(ordinal).asInstanceOf[Float])
115+
val doubleValue = lang.Double.valueOf(row.getField(ordinal).asInstanceOf[Float].toString)
116+
tDoubleValue.setValue(doubleValue)
116117
}
117118
TColumnValue.doubleVal(tDoubleValue)
118119
case _: DoubleType =>
@@ -167,7 +168,8 @@ object RowSet {
167168
val values = getOrSetAsNull[lang.Long](rows, ordinal, nulls, 0L)
168169
TColumn.i64Val(new TI64Column(values, nulls))
169170
case _: FloatType =>
170-
val values = getOrSetAsNull[lang.Double](rows, ordinal, nulls, 0.0)
171+
val values = getOrSetAsNull[lang.Float](rows, ordinal, nulls, 0.0f)
172+
.asScala.map(n => lang.Double.valueOf(n.toString)).asJava
171173
TColumn.doubleVal(new TDoubleColumn(values, nulls))
172174
case _: DoubleType =>
173175
val values = getOrSetAsNull[lang.Double](rows, ordinal, nulls, 0.0)

externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,15 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
593593
}
594594
}
595595

596+
test("execute statement - select float") {
597+
withJdbcStatement()({ statement =>
598+
val resultSet = statement.executeQuery("SELECT cast(0.1 as float)")
599+
assert(resultSet.next())
600+
assert(resultSet.getString(1) == "0.1")
601+
assert(resultSet.getFloat(1) == 0.1f)
602+
})
603+
}
604+
596605
test("execute statement - show functions") {
597606
withJdbcStatement() { statement =>
598607
val resultSet = statement.executeQuery("show functions")

0 commit comments

Comments
 (0)