Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-13739][table-blink] JDK String to bytes should specify UTF-8 encoding #9455

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -23,6 +23,8 @@ import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedEx
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isBinaryString
import org.apache.flink.table.types.logical.LogicalType

import java.nio.charset.StandardCharsets

/**
* Generates PRINT function call.
*/
Expand All @@ -40,8 +42,9 @@ class PrintCallGen extends CallGenerator {
val logTerm = "logger$"
ctx.addReusableLogger(logTerm, "_Print$_")

val charsets = classOf[StandardCharsets].getCanonicalName
val outputCode = if (isBinaryString(returnType)) {
s"new String($resultTerm, java.nio.charset.Charset.defaultCharset())"
s"new String($resultTerm, $charsets.UTF_8)"
} else {
s"String.valueOf(${operands(1).resultTerm})"
}
Expand Down
Expand Up @@ -24,6 +24,7 @@

import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.Random;

/**
Expand Down Expand Up @@ -108,7 +109,7 @@ public void eval(String str, String separatorChars, int startIndex) {

/**
 * Decodes the given binary value as UTF-8 text and forwards it to the
 * {@code String} overload of {@code eval}. A {@code null} input is ignored.
 *
 * @param varbinary bytes to decode as UTF-8; may be {@code null}
 */
public void eval(byte[] varbinary) {
    if (varbinary != null) {
        // Decode with an explicit charset so the result does not depend on the
        // JVM's platform default encoding, and evaluate exactly once.
        this.eval(new String(varbinary, StandardCharsets.UTF_8));
    }
}

Expand Down
Expand Up @@ -26,6 +26,8 @@ import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.types.Row

import java.nio.charset.StandardCharsets

abstract class ScalarTypesTestBase extends ExpressionTestBase {

override def testData: Row = {
Expand Down Expand Up @@ -83,8 +85,8 @@ abstract class ScalarTypesTestBase extends ExpressionTestBase {
testData.setField(50, localDate("1997-11-11"))
testData.setField(51, localTime("09:44:55"))
testData.setField(52, localDateTime("1997-11-11 09:44:55.333"))
testData.setField(53, "hello world".getBytes)
testData.setField(54, "This is a testing string.".getBytes)
testData.setField(53, "hello world".getBytes(StandardCharsets.UTF_8))
testData.setField(54, "This is a testing string.".getBytes(StandardCharsets.UTF_8))
testData
}

Expand Down
Expand Up @@ -45,6 +45,7 @@ import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
import org.junit._

import java.nio.charset.StandardCharsets
import java.sql.{Date, Time, Timestamp}
import java.time.{LocalDate, LocalDateTime}
import java.util
Expand Down Expand Up @@ -361,7 +362,7 @@ class CalcITCase extends BatchTestBase {

@Test
def testBinary(): Unit = {
val data = Seq(row(1, 2, "hehe".getBytes))
val data = Seq(row(1, 2, "hehe".getBytes(StandardCharsets.UTF_8)))
registerCollection(
"MyTable",
data,
Expand Down Expand Up @@ -1170,21 +1171,24 @@ class CalcITCase extends BatchTestBase {
/**
 * Registers `nullData3` as table `BinaryT` with its third column converted to
 * UTF-8 bytes, then checks a filtered projection against the same converted rows.
 */
def testCalcBinary(): Unit = {
  // A `def` (not a `val`) so the registered input and the expected result are
  // built independently, each with fresh byte arrays. The explicit UTF-8
  // encoding keeps the fixture independent of the JVM default charset.
  def binaryRows = nullData3.map { r =>
    row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes(StandardCharsets.UTF_8))
  }

  registerCollection(
    "BinaryT",
    binaryRows,
    new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO),
    "a, b, c",
    nullablesOfNullData3)
  checkResult(
    "select a, b, c from BinaryT where b < 1000",
    binaryRows
  )
}

@Test(expected = classOf[UnsupportedOperationException])
def testOrderByBinary(): Unit = {
registerCollection(
"BinaryT",
nullData3.map((r) => row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes)),
nullData3.map((r) => row(r.getField(0), r.getField(1),
r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO),
"a, b, c",
nullablesOfNullData3)
Expand All @@ -1196,7 +1200,8 @@ class CalcITCase extends BatchTestBase {
"select * from BinaryT order by c",
nullData3.sortBy((x : Row) =>
x.getField(2).asInstanceOf[String]).map((r) =>
row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes)),
row(r.getField(0), r.getField(1),
r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
isSorted = true
)
}
Expand All @@ -1205,7 +1210,8 @@ class CalcITCase extends BatchTestBase {
def testGroupByBinary(): Unit = {
registerCollection(
"BinaryT2",
nullData3.map((r) => row(r.getField(0), r.getField(1).toString.getBytes, r.getField(2))),
nullData3.map((r) => row(r.getField(0),
r.getField(1).toString.getBytes(StandardCharsets.UTF_8), r.getField(2))),
new RowTypeInfo(INT_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO, STRING_TYPE_INFO),
"a, b, c",
nullablesOfNullData3)
Expand Down
Expand Up @@ -113,21 +113,6 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[
}
}

/**
 * Table function that emits one `(text, length)` tuple per non-null input.
 *
 * For binary input the text is the UTF-8 decoding of the bytes and the length
 * is the number of bytes; for string input the length is the string's length.
 * Null inputs produce no output.
 */
@SerialVersionUID(1L)
class TableFunc5 extends TableFunction[Tuple2[String, Int]] {

  /** Emits the UTF-8 decoded text of `bytes` together with the byte count. */
  def eval(bytes: Array[Byte]): Unit = {
    if (bytes != null) {
      // Explicit charset: `new String(bytes)` would use the JVM default encoding.
      val text = new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
      collect(new Tuple2(text, bytes.length))
    }
  }

  /** Emits `str` together with its length. */
  def eval(str: String): Unit = {
    if (str != null) {
      collect(new Tuple2(str, str.length))
    }
  }
}

//TODO support dynamic type
//class UDTFWithDynamicType extends TableFunction[Row] {
//
Expand Down
Expand Up @@ -27,6 +27,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import static org.apache.flink.table.dataformat.BinaryFormat.MAX_FIX_PART_DATA_SIZE;
Expand Down Expand Up @@ -75,7 +76,7 @@ public abstract class AbstractBinaryWriter implements BinaryWriter {
public void writeString(int pos, BinaryString input) {
if (input.getSegments() == null) {
String javaObject = input.getJavaObject();
writeBytes(pos, javaObject.getBytes());
writeBytes(pos, javaObject.getBytes(StandardCharsets.UTF_8));
} else {
int len = input.getSizeInBytes();
if (len <= 7) {
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.table.dataformat.vector.BytesColumnVector.Bytes;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;

/**
* A VectorizedColumnBatch is a set of rows, organized with each column as a vector. It is the
Expand Down Expand Up @@ -114,7 +115,7 @@ private byte[] getBytes(int rowId, int colId) {

/**
 * Returns the value at the given row and column as a string, decoding the
 * byte range obtained from {@code getByteArray} as UTF-8.
 *
 * @param rowId row index passed through to {@code getByteArray}
 * @param colId column index passed through to {@code getByteArray}
 * @return the UTF-8 decoded string for that cell
 */
public String getString(int rowId, int colId) {
    Bytes byteArray = getByteArray(rowId, colId);
    // Single return with an explicit charset: decoding must not depend on the
    // JVM's platform default encoding.
    return new String(byteArray.data, byteArray.offset, byteArray.len, StandardCharsets.UTF_8);
}

public Decimal getDecimal(int rowId, int colId, int precision, int scale) {
Expand Down
Expand Up @@ -486,7 +486,7 @@ private static byte[] strToBytesWithCharset(String str, String charsetName) {
}
}
if (byteArray == null) {
byteArray = str.getBytes();
byteArray = str.getBytes(StandardCharsets.UTF_8);
}
return byteArray;
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.core.memory.MemorySegment;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import static org.apache.flink.table.runtime.util.SegmentsUtil.allocateReuseBytes;
Expand Down Expand Up @@ -297,10 +298,6 @@ public static int decodeUTF8Strict(MemorySegment segment, int sp, int len, char[
}

/**
 * Decodes {@code len} bytes of {@code bytes}, starting at {@code offset}, as UTF-8
 * using the JDK's built-in decoder.
 *
 * @param bytes  source byte array
 * @param offset index of the first byte to decode
 * @param len    number of bytes to decode
 * @return the decoded string
 */
public static String defaultDecodeUTF8(byte[] bytes, int offset, int len) {
    // The Charset overload cannot throw UnsupportedEncodingException, so no
    // try/catch wrapper is needed (unlike the charset-name overload).
    return new String(bytes, offset, len, StandardCharsets.UTF_8);
}
}
Expand Up @@ -48,6 +48,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
Expand Down Expand Up @@ -182,7 +183,7 @@ public void testWriteString() {
BinaryRow row = new BinaryRow(2);
BinaryRowWriter writer = new BinaryRowWriter(row);
writer.writeString(0, fromString(str));
writer.writeString(1, fromBytes(str.getBytes()));
writer.writeString(1, fromBytes(str.getBytes(StandardCharsets.UTF_8)));
writer.complete();

assertEquals(str, row.getString(0).toString());
Expand Down
Expand Up @@ -724,7 +724,7 @@ public void testDecodeWithIllegalUtf8Bytes() throws UnsupportedEncodingException
byte[] bytes = new byte[] {(byte) 20122, (byte) 40635, 124, (byte) 38271, (byte) 34966,
124, (byte) 36830, (byte) 34915, (byte) 35033, 124, (byte) 55357, 124, (byte) 56407 };

String str = new String(bytes);
String str = new String(bytes, StandardCharsets.UTF_8);
assertEquals(str, StringUtf8Utils.decodeUTF8(bytes, 0, bytes.length));
assertEquals(str, StringUtf8Utils.decodeUTF8(MemorySegmentFactory.wrap(bytes), 0, bytes.length));

Expand Down
Expand Up @@ -30,6 +30,8 @@

import org.junit.Test;

import java.nio.charset.StandardCharsets;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

Expand All @@ -49,7 +51,7 @@ public void testTyped() {

HeapBytesVector col1 = new HeapBytesVector(VECTOR_SIZE);
for (int i = 0; i < VECTOR_SIZE; i++) {
byte[] bytes = String.valueOf(i).getBytes();
byte[] bytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
col1.setVal(i, bytes, 0, bytes.length);
}

Expand Down