From 792cd172379de8520fec7f31367ec39fb3f791c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=90=A8=E7=91=9C?= Date: Thu, 21 Dec 2017 14:53:28 +0800 Subject: [PATCH] Set utf-16 as SQL DSL default charset feat: support national charset --- .../sql/impl/planner/BeamQueryPlanner.java | 10 ++++ .../sdk/extensions/sql/BeamSqlDslBase.java | 2 +- .../extensions/sql/BeamSqlNonAsciiTest.java | 57 +++++++++++++++++++ 3 files changed, 68 insertions(+), 1 deletion(-) create mode 100644 sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlNonAsciiTest.java diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java index ce46e2d4399b..a32759d40e43 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java @@ -56,6 +56,7 @@ import org.apache.calcite.tools.Planner; import org.apache.calcite.tools.RelConversionException; import org.apache.calcite.tools.ValidationException; +import org.apache.calcite.util.ConversionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +75,15 @@ public class BeamQueryPlanner { RelDataTypeSystem.DEFAULT); public BeamQueryPlanner(SchemaPlus schema) { + String defaultCharsetKey = "saffron.default.charset"; + if (System.getProperty(defaultCharsetKey) == null) { + System.setProperty(defaultCharsetKey, ConversionUtil.NATIVE_UTF16_CHARSET_NAME); + System.setProperty("saffron.default.nationalcharset", + ConversionUtil.NATIVE_UTF16_CHARSET_NAME); + System.setProperty("saffron.default.collation.name", + String.format("%s$%s", ConversionUtil.NATIVE_UTF16_CHARSET_NAME, "en_US")); + } + final List traitDefs = new ArrayList<>(); traitDefs.add(ConventionTraitDef.INSTANCE); 
traitDefs.add(RelCollationTraitDef.INSTANCE); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java index b27435c9d2c5..52987b8bc5c0 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java @@ -127,7 +127,7 @@ private static List prepareInputRowsInTableA() throws ParseException rows.add(row3); BeamRecord row4 = new BeamRecord(rowTypeInTableA - , 4, 4000L, Short.valueOf("4"), Byte.valueOf("4"), 4.0f, 4.0, "string_row4" + , 4, 4000L, Short.valueOf("4"), Byte.valueOf("4"), 4.0f, 4.0, "第四行" , FORMAT.parse("2017-01-01 02:04:03"), 0, new BigDecimal(4)); rows.add(row4); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlNonAsciiTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlNonAsciiTest.java new file mode 100644 index 000000000000..e4debe0b6b33 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlNonAsciiTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Test;
+
+/**
+ * Tests that plain and national (N'...') SQL string literals may contain non-ASCII characters.
+ */
+public class BeamSqlNonAsciiTest extends BeamSqlDslBase {
+
+  @Test
+  public void testDefaultCharsetLiteral() {
+    String sql = "SELECT * FROM TABLE_A WHERE f_string = '第四行'";
+
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), boundedInput1)
+            .apply("testDefaultCharsetLiteral", BeamSql.queryMulti(sql));
+    // Index 3 is expected to be the row whose f_string is '第四行' (row4 in BeamSqlDslBase).
+    PAssert.that(result).containsInAnyOrder(recordsInTableA.get(3));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testNationalCharsetLiteral() {
+    String sql = "SELECT * FROM TABLE_A WHERE f_string = N'第四行'";
+
+    PCollection<BeamRecord> result =
+        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), boundedInput1)
+            .apply("testNationalCharsetLiteral", BeamSql.queryMulti(sql));
+    // Index 3 is expected to be the row whose f_string is '第四行' (row4 in BeamSqlDslBase).
+    PAssert.that(result).containsInAnyOrder(recordsInTableA.get(3));
+
+    pipeline.run().waitUntilFinish();
+  }
+}