[FLINK-13739][table-blink] JDK String to bytes should specify UTF-8 encoding

This closes #9455
JingsongLi authored and wuchong committed Aug 18, 2019
1 parent b61ea35 commit 130f4e8
Showing 12 changed files with 35 additions and 36 deletions.
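
A note on the motivation (not part of the commit itself): String.getBytes() and new String(byte[]) without an explicit charset use the JVM's platform default charset, so the bytes they produce can differ between environments (for example under a GBK locale versus a UTF-8 one), while the blink planner's BinaryString always stores UTF-8 bytes. A minimal standalone Java sketch of the pitfall, with a made-up class name:

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;

    public class DefaultCharsetPitfall {
        public static void main(String[] args) {
            String s = "\u4e2d\u6587";  // two CJK characters

            // Platform-dependent: getBytes() delegates to Charset.defaultCharset(),
            // e.g. 4 bytes under GBK but 6 bytes under UTF-8.
            byte[] implicitBytes = s.getBytes();

            // Deterministic: always UTF-8, independent of -Dfile.encoding or the OS locale.
            byte[] utf8Bytes = s.getBytes(StandardCharsets.UTF_8);

            System.out.println("default charset: " + Charset.defaultCharset());
            System.out.println(implicitBytes.length + " vs " + utf8Bytes.length + " bytes");
        }
    }
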
@@ -23,6 +23,8 @@ import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedEx
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isBinaryString
import org.apache.flink.table.types.logical.LogicalType

+import java.nio.charset.StandardCharsets

/**
* Generates PRINT function call.
*/
@@ -40,8 +42,9 @@ class PrintCallGen extends CallGenerator {
val logTerm = "logger$"
ctx.addReusableLogger(logTerm, "_Print$_")

+val charsets = classOf[StandardCharsets].getCanonicalName
val outputCode = if (isBinaryString(returnType)) {
s"new String($resultTerm, java.nio.charset.Charset.defaultCharset())"
s"new String($resultTerm, $charsets.UTF_8)"
} else {
s"String.valueOf(${operands(1).resultTerm})"
}
@@ -24,6 +24,7 @@

import org.apache.commons.lang3.StringUtils;

+import java.nio.charset.StandardCharsets;
import java.util.Random;

/**
@@ -108,7 +109,7 @@ public void eval(String str, String separatorChars, int startIndex) {

public void eval(byte[] varbinary) {
if (varbinary != null) {
-this.eval(new String(varbinary));
+this.eval(new String(varbinary, StandardCharsets.UTF_8));
}
}

@@ -26,6 +26,8 @@ import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.types.Row

+import java.nio.charset.StandardCharsets

abstract class ScalarTypesTestBase extends ExpressionTestBase {

override def testData: Row = {
@@ -83,8 +85,8 @@ abstract class ScalarTypesTestBase extends ExpressionTestBase {
testData.setField(50, localDate("1997-11-11"))
testData.setField(51, localTime("09:44:55"))
testData.setField(52, localDateTime("1997-11-11 09:44:55.333"))
-testData.setField(53, "hello world".getBytes)
-testData.setField(54, "This is a testing string.".getBytes)
+testData.setField(53, "hello world".getBytes(StandardCharsets.UTF_8))
+testData.setField(54, "This is a testing string.".getBytes(StandardCharsets.UTF_8))
testData
}

@@ -45,6 +45,7 @@ import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
import org.junit._

+import java.nio.charset.StandardCharsets
import java.sql.{Date, Time, Timestamp}
import java.time.{LocalDate, LocalDateTime}
import java.util
@@ -361,7 +362,7 @@ class CalcITCase extends BatchTestBase {

@Test
def testBinary(): Unit = {
-val data = Seq(row(1, 2, "hehe".getBytes))
+val data = Seq(row(1, 2, "hehe".getBytes(StandardCharsets.UTF_8)))
registerCollection(
"MyTable",
data,
@@ -1170,21 +1171,24 @@
def testCalcBinary(): Unit = {
registerCollection(
"BinaryT",
-nullData3.map((r) => row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes)),
+nullData3.map((r) => row(r.getField(0), r.getField(1),
+r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO),
"a, b, c",
nullablesOfNullData3)
checkResult(
"select a, b, c from BinaryT where b < 1000",
-nullData3.map((r) => row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes))
+nullData3.map((r) => row(r.getField(0), r.getField(1),
+r.getField(2).toString.getBytes(StandardCharsets.UTF_8)))
)
}

@Test(expected = classOf[UnsupportedOperationException])
def testOrderByBinary(): Unit = {
registerCollection(
"BinaryT",
-nullData3.map((r) => row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes)),
+nullData3.map((r) => row(r.getField(0), r.getField(1),
+r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO),
"a, b, c",
nullablesOfNullData3)
@@ -1196,7 +1200,8 @@
"select * from BinaryT order by c",
nullData3.sortBy((x : Row) =>
x.getField(2).asInstanceOf[String]).map((r) =>
-row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes)),
+row(r.getField(0), r.getField(1),
+r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
isSorted = true
)
}
@@ -1205,7 +1210,8 @@
def testGroupByBinary(): Unit = {
registerCollection(
"BinaryT2",
-nullData3.map((r) => row(r.getField(0), r.getField(1).toString.getBytes, r.getField(2))),
+nullData3.map((r) => row(r.getField(0),
+r.getField(1).toString.getBytes(StandardCharsets.UTF_8), r.getField(2))),
new RowTypeInfo(INT_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO, STRING_TYPE_INFO),
"a, b, c",
nullablesOfNullData3)
@@ -113,21 +113,6 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[
}
}

-@SerialVersionUID(1L)
-class TableFunc5 extends TableFunction[Tuple2[String, Int]] {
-def eval(bytes: Array[Byte]) {
-if (null != bytes) {
-collect(new Tuple2(new String(bytes), bytes.length))
-}
-}
-
-def eval(str: String) {
-if (null != str) {
-collect(new Tuple2(str, str.length))
-}
-}
-}

//TODO support dynamic type
//class UDTFWithDynamicType extends TableFunction[Row] {
//
@@ -27,6 +27,7 @@

import java.io.IOException;
import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import static org.apache.flink.table.dataformat.BinaryFormat.MAX_FIX_PART_DATA_SIZE;
@@ -75,7 +76,7 @@ public abstract class AbstractBinaryWriter implements BinaryWriter {
public void writeString(int pos, BinaryString input) {
if (input.getSegments() == null) {
String javaObject = input.getJavaObject();
-writeBytes(pos, javaObject.getBytes());
+writeBytes(pos, javaObject.getBytes(StandardCharsets.UTF_8));
} else {
int len = input.getSizeInBytes();
if (len <= 7) {
@@ -22,6 +22,7 @@
import org.apache.flink.table.dataformat.vector.BytesColumnVector.Bytes;

import java.io.Serializable;
+import java.nio.charset.StandardCharsets;

/**
* A VectorizedColumnBatch is a set of rows, organized with each column as a vector. It is the
@@ -114,7 +115,7 @@ private byte[] getBytes(int rowId, int colId) {

public String getString(int rowId, int colId) {
Bytes byteArray = getByteArray(rowId, colId);
-return new String(byteArray.data, byteArray.offset, byteArray.len);
+return new String(byteArray.data, byteArray.offset, byteArray.len, StandardCharsets.UTF_8);
}

public Decimal getDecimal(int rowId, int colId, int precision, int scale) {
@@ -486,7 +486,7 @@ private static byte[] strToBytesWithCharset(String str, String charsetName) {
}
}
if (byteArray == null) {
-byteArray = str.getBytes();
+byteArray = str.getBytes(StandardCharsets.UTF_8);
}
return byteArray;
}
@@ -20,6 +20,7 @@
import org.apache.flink.core.memory.MemorySegment;

import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import static org.apache.flink.table.runtime.util.SegmentsUtil.allocateReuseBytes;
Expand Down Expand Up @@ -297,10 +298,6 @@ public static int decodeUTF8Strict(MemorySegment segment, int sp, int len, char[
}

public static String defaultDecodeUTF8(byte[] bytes, int offset, int len) {
-try {
-return new String(bytes, offset, len, "UTF-8");
-} catch (UnsupportedEncodingException e) {
-throw new RuntimeException("encodeUTF8 error", e);
-}
+return new String(bytes, offset, len, StandardCharsets.UTF_8);
}
}
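
Side note on the defaultDecodeUTF8 rewrite above (not part of the diff): the String(byte[], int, int, String) constructor resolves the charset by name and declares the checked UnsupportedEncodingException, which is why the removed code needed a try/catch, whereas the String(byte[], int, int, Charset) overload takes the StandardCharsets.UTF_8 constant and declares no checked exception. A minimal comparison with made-up method names:

    import java.io.UnsupportedEncodingException;
    import java.nio.charset.StandardCharsets;

    final class Utf8DecodeSketch {
        // Old style: charset looked up by name; the checked exception can never fire for "UTF-8"
        // but still has to be caught or declared.
        static String decodeByName(byte[] bytes, int offset, int len) {
            try {
                return new String(bytes, offset, len, "UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new RuntimeException("UTF-8 is guaranteed to be supported", e);
            }
        }

        // New style: the Charset overload throws no checked exception, so the body is one return.
        static String decodeByConstant(byte[] bytes, int offset, int len) {
            return new String(bytes, offset, len, StandardCharsets.UTF_8);
        }
    }
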
@@ -48,6 +48,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
@@ -182,7 +183,7 @@ public void testWriteString() {
BinaryRow row = new BinaryRow(2);
BinaryRowWriter writer = new BinaryRowWriter(row);
writer.writeString(0, fromString(str));
-writer.writeString(1, fromBytes(str.getBytes()));
+writer.writeString(1, fromBytes(str.getBytes(StandardCharsets.UTF_8)));
writer.complete();

assertEquals(str, row.getString(0).toString());
@@ -724,7 +724,7 @@ public void testDecodeWithIllegalUtf8Bytes() throws UnsupportedEncodingException
byte[] bytes = new byte[] {(byte) 20122, (byte) 40635, 124, (byte) 38271, (byte) 34966,
124, (byte) 36830, (byte) 34915, (byte) 35033, 124, (byte) 55357, 124, (byte) 56407 };

-String str = new String(bytes);
+String str = new String(bytes, StandardCharsets.UTF_8);
assertEquals(str, StringUtf8Utils.decodeUTF8(bytes, 0, bytes.length));
assertEquals(str, StringUtf8Utils.decodeUTF8(MemorySegmentFactory.wrap(bytes), 0, bytes.length));

@@ -30,6 +30,8 @@

import org.junit.Test;

+import java.nio.charset.StandardCharsets;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@@ -49,7 +51,7 @@ public void testTyped() {

HeapBytesVector col1 = new HeapBytesVector(VECTOR_SIZE);
for (int i = 0; i < VECTOR_SIZE; i++) {
-byte[] bytes = String.valueOf(i).getBytes();
+byte[] bytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
col1.setVal(i, bytes, 0, bytes.length);
}

