Skip to content

Commit

Permalink
[KYUUBI #1705] Support decimal type for Flink SQL Engine
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1706 from yanghua/KYUUBI-1705.

Closes #1705

fad35d7 [yanghua] [KYUUBI #1705] Support decimal type for Flink SQL Engine

Authored-by: yanghua <yanghua1127@gmail.com>
Signed-off-by: yanghua <yanghua1127@gmail.com>
  • Loading branch information
yanghua committed Jan 9, 2022
1 parent c43d3a7 commit 6f4427b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 4 deletions.
Expand Up @@ -24,7 +24,7 @@ import java.util.Collections
import scala.collection.JavaConverters._
import scala.language.implicitConversions

import org.apache.flink.table.types.logical._
import org.apache.flink.table.types.logical.{DecimalType, _}
import org.apache.flink.types.Row
import org.apache.hive.service.rpc.thrift._

Expand Down Expand Up @@ -148,8 +148,17 @@ object RowSet {
case _: CharType =>
val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
TColumn.stringVal(new TStringColumn(values, nulls))

case _ =>
null
val values = rows.zipWithIndex.toList.map { case (row, i) =>
nulls.set(i, row.getField(ordinal) == null)
if (row.getField(ordinal) == null) {
""
} else {
toHiveString((row.getField(ordinal), logicalType))
}
}.asJava
TColumn.stringVal(new TStringColumn(values, nulls))
}
}

Expand All @@ -168,7 +177,7 @@ object RowSet {
nulls.set(idx, true)
ret.add(idx, defaultVal)
} else {
ret.add(idx, row.getField(ordinal).asInstanceOf[T])
ret.add(idx, row.getFieldAs[T](ordinal))
}
idx += 1
}
Expand Down Expand Up @@ -212,6 +221,25 @@ object RowSet {
case _: DoubleType => TTypeId.DOUBLE_TYPE
case _: VarCharType => TTypeId.STRING_TYPE
case _: CharType => TTypeId.STRING_TYPE
case _ => null
case _: DecimalType => TTypeId.DECIMAL_TYPE
case other =>
throw new IllegalArgumentException(s"Unrecognized type name: ${other.asSummaryString()}")
}

/**
 * Renders a raw field value as a string for Thrift transport — a reduced
 * form of Flink's own `toHiveString`.
 *
 * @param dataWithType the field value paired with its Flink [[LogicalType]]
 * @return a textual representation; a null value renders as the literal "null"
 */
def toHiveString(dataWithType: (Any, LogicalType)): String = {
  val (data, logicalType) = dataWithType
  (data, logicalType) match {
    // Only nulls nested inside complex values reach here; top-level nulls
    // are handled by the caller's null bitmask.
    case (null, _) => "null"
    // Plain (non-scientific) notation for decimals, matching Hive output.
    case (d: java.math.BigDecimal, _: DecimalType) => d.toPlainString
    case (v, _) => v.toString
  }
}
}
Expand Up @@ -74,4 +74,20 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
assert(resultSet.getString(1) === "tmp.hello")
}
}

test("execute statement - select decimal") {
  withJdbcStatement() { statement =>
    val rs = statement.executeQuery("SELECT 1.2BD, 1.23BD ")
    assert(rs.next())
    // Values come back as exact decimals rather than lossy doubles.
    assert(rs.getBigDecimal(1) === java.math.BigDecimal.valueOf(1.2))
    assert(rs.getBigDecimal(2) === java.math.BigDecimal.valueOf(1.23))
    val md = rs.getMetaData
    // Each column reports DECIMAL with the precision/scale of its literal.
    assert(md.getColumnType(1) === java.sql.Types.DECIMAL)
    assert(md.getPrecision(1) == 2)
    assert(md.getScale(1) == 1)
    assert(md.getColumnType(2) === java.sql.Types.DECIMAL)
    assert(md.getPrecision(2) == 3)
    assert(md.getScale(2) == 2)
  }
}
}

0 comments on commit 6f4427b

Please sign in to comment.