From 7dc32cf12041725572ad7c79c9bdd5437e0e298f Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Sun, 24 Apr 2022 19:45:38 +0800 Subject: [PATCH] [FLINK-27304][hive] Calcite's varbinary type should be converted to Hive's binary type. --- .../delegation/hive/copy/HiveParserTypeConverter.java | 3 ++- .../apache/flink/connectors/hive/HiveDialectQueryITCase.java | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserTypeConverter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserTypeConverter.java index 56ca2b52ffd13..8a26b3cfc6f34 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserTypeConverter.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserTypeConverter.java @@ -142,7 +142,7 @@ public static RelDataType convert(PrimitiveTypeInfo type, RelDataTypeFactory dtF convertedType = dtFactory.createSqlType(SqlTypeName.TIMESTAMP, 9); break; case BINARY: - convertedType = dtFactory.createSqlType(SqlTypeName.BINARY); + convertedType = dtFactory.createSqlType(SqlTypeName.VARBINARY); break; case DECIMAL: DecimalTypeInfo dtInf = (DecimalTypeInfo) type; @@ -289,6 +289,7 @@ private static TypeInfo convertPrimitiveType(RelDataType rType) { case INTERVAL_SECOND: return hiveShim.getIntervalDayTimeTypeInfo(); case BINARY: + case VARBINARY: return TypeInfoFactory.binaryTypeInfo; case DECIMAL: return TypeInfoFactory.getDecimalTypeInfo(rType.getPrecision(), rType.getScale()); diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java index a41686bdd4734..9365b6a25a6db 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java @@ -82,6 +82,8 @@ public static void setup() throws Exception { tableEnv.executeSql("CREATE TABLE src (key STRING, value STRING)"); tableEnv.executeSql( "CREATE TABLE srcpart (key STRING, `value` STRING) PARTITIONED BY (ds STRING, hr STRING)"); + tableEnv.executeSql("create table binary_t (a int, ab array)"); + tableEnv.executeSql( "CREATE TABLE nested (\n" + " a int,\n" @@ -156,7 +158,8 @@ public void testAdditionalQueries() throws Exception { + "(partition by dep order by salary desc) as rnk from employee) a where rnk=1", "select salary,sum(cnt) over (order by salary)/sum(cnt) over " + "(order by salary ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) from" - + " (select salary,count(*) as cnt from employee group by salary) a")); + + " (select salary,count(*) as cnt from employee group by salary) a", + "select a, one from binary_t lateral view explode(ab) abs as one where a > 0")); if (HiveVersionTestUtil.HIVE_230_OR_LATER) { toRun.add( "select weekofyear(current_timestamp()), dayofweek(current_timestamp()) from src limit 1");