Skip to content

Commit

Permalink
modify all
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Aug 15, 2019
1 parent e28534c commit e628a7a
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 31 deletions.
Expand Up @@ -23,6 +23,8 @@ import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedEx
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isBinaryString
import org.apache.flink.table.types.logical.LogicalType

import java.nio.charset.StandardCharsets

/**
* Generates PRINT function call.
*/
Expand All @@ -40,8 +42,9 @@ class PrintCallGen extends CallGenerator {
val logTerm = "logger$"
ctx.addReusableLogger(logTerm, "_Print$_")

val charsets = classOf[StandardCharsets].getCanonicalName
val outputCode = if (isBinaryString(returnType)) {
s"new String($resultTerm, java.nio.charset.Charset.defaultCharset())"
s"new String($resultTerm, $charsets.UTF_8)"
} else {
s"String.valueOf(${operands(1).resultTerm})"
}
Expand Down
Expand Up @@ -24,6 +24,7 @@

import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.Random;

/**
Expand Down Expand Up @@ -108,7 +109,7 @@ public void eval(String str, String separatorChars, int startIndex) {

/**
 * Evaluates the function for a binary argument by decoding the bytes as UTF-8 text and
 * delegating to the {@code eval} overload that accepts the decoded string.
 *
 * <p>A {@code null} array is ignored: no delegation happens.
 *
 * @param varbinary raw bytes to decode; may be {@code null}
 */
public void eval(byte[] varbinary) {
    if (varbinary == null) {
        return;
    }
    // Decode with an explicit charset so the result does not depend on the JVM's platform
    // default, and delegate exactly once per input.
    this.eval(new String(varbinary, StandardCharsets.UTF_8));
}

Expand Down
Expand Up @@ -45,6 +45,7 @@ import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
import org.junit._

import java.nio.charset.StandardCharsets
import java.sql.{Date, Time, Timestamp}
import java.time.{LocalDate, LocalDateTime}
import java.util
Expand Down Expand Up @@ -361,7 +362,7 @@ class CalcITCase extends BatchTestBase {

@Test
def testBinary(): Unit = {
val data = Seq(row(1, 2, "hehe".getBytes))
val data = Seq(row(1, 2, "hehe".getBytes(StandardCharsets.UTF_8)))
registerCollection(
"MyTable",
data,
Expand Down Expand Up @@ -1170,21 +1171,24 @@ class CalcITCase extends BatchTestBase {
def testCalcBinary(): Unit = {
  // Converts an (a, b, c) row of nullData3 into (a, b, bytes of c.toString). The charset is
  // explicit so the fixture does not depend on the JVM's platform default encoding.
  val toBinaryRow = (r: Row) =>
    row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes(StandardCharsets.UTF_8))

  // Register the table with the third column typed as a primitive byte array.
  registerCollection(
    "BinaryT",
    nullData3.map(toBinaryRow),
    new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO),
    "a, b, c",
    nullablesOfNullData3)

  // The expected rows are built independently with the same conversion, so the registered
  // and expected byte arrays are equal in content but are distinct instances.
  checkResult(
    "select a, b, c from BinaryT where b < 1000",
    nullData3.map(toBinaryRow)
  )
}

@Test(expected = classOf[UnsupportedOperationException])
def testOrderByBinary(): Unit = {
registerCollection(
"BinaryT",
nullData3.map((r) => row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes)),
nullData3.map((r) => row(r.getField(0), r.getField(1),
r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO),
"a, b, c",
nullablesOfNullData3)
Expand All @@ -1196,7 +1200,8 @@ class CalcITCase extends BatchTestBase {
"select * from BinaryT order by c",
nullData3.sortBy((x : Row) =>
x.getField(2).asInstanceOf[String]).map((r) =>
row(r.getField(0), r.getField(1), r.getField(2).toString.getBytes)),
row(r.getField(0), r.getField(1),
r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
isSorted = true
)
}
Expand All @@ -1205,7 +1210,8 @@ class CalcITCase extends BatchTestBase {
def testGroupByBinary(): Unit = {
registerCollection(
"BinaryT2",
nullData3.map((r) => row(r.getField(0), r.getField(1).toString.getBytes, r.getField(2))),
nullData3.map((r) => row(r.getField(0),
r.getField(1).toString.getBytes(StandardCharsets.UTF_8), r.getField(2))),
new RowTypeInfo(INT_TYPE_INFO, BYTE_PRIMITIVE_ARRAY_TYPE_INFO, STRING_TYPE_INFO),
"a, b, c",
nullablesOfNullData3)
Expand Down
Expand Up @@ -113,21 +113,6 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[
}
}

/**
  * Table function that emits one (text, length) pair per non-null input.
  *
  * Two overloads are provided: one for binary input, one for string input.
  * A null argument produces no output row.
  */
@SerialVersionUID(1L)
class TableFunc5 extends TableFunction[Tuple2[String, Int]] {

  /**
    * Decodes `bytes` as UTF-8 and emits the decoded text together with the number of
    * input bytes (not the number of decoded characters).
    */
  def eval(bytes: Array[Byte]): Unit = {
    if (null != bytes) {
      // Explicit charset: decoding must not depend on the JVM's platform default.
      val text = new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
      collect(new Tuple2(text, bytes.length))
    }
  }

  /** Emits `str` together with its length in UTF-16 code units. */
  def eval(str: String): Unit = {
    if (null != str) {
      collect(new Tuple2(str, str.length))
    }
  }
}

//TODO support dynamic type
//class UDTFWithDynamicType extends TableFunction[Row] {
//
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.table.dataformat.vector.BytesColumnVector.Bytes;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;

/**
* A VectorizedColumnBatch is a set of rows, organized with each column as a vector. It is the
Expand Down Expand Up @@ -114,7 +115,7 @@ private byte[] getBytes(int rowId, int colId) {

/**
 * Reads the value at the given row and column as a {@link String}.
 *
 * <p>The byte range is obtained from {@code getByteArray(rowId, colId)} and decoded as UTF-8.
 *
 * @param rowId row index passed through to {@code getByteArray}
 * @param colId column index passed through to {@code getByteArray}
 * @return the decoded string
 */
public String getString(int rowId, int colId) {
    Bytes byteArray = getByteArray(rowId, colId);
    // Explicit charset: the decoded value must not vary with the JVM's platform default.
    return new String(byteArray.data, byteArray.offset, byteArray.len, StandardCharsets.UTF_8);
}

public Decimal getDecimal(int rowId, int colId, int precision, int scale) {
Expand Down
Expand Up @@ -486,7 +486,7 @@ private static byte[] strToBytesWithCharset(String str, String charsetName) {
}
}
if (byteArray == null) {
byteArray = str.getBytes();
byteArray = str.getBytes(StandardCharsets.UTF_8);
}
return byteArray;
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.core.memory.MemorySegment;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import static org.apache.flink.table.runtime.util.SegmentsUtil.allocateReuseBytes;
Expand Down Expand Up @@ -297,10 +298,6 @@ public static int decodeUTF8Strict(MemorySegment segment, int sp, int len, char[
}

/**
 * Decodes a slice of a byte array as UTF-8 using the JDK's built-in decoder.
 *
 * <p>Passing the {@link StandardCharsets#UTF_8} constant instead of the charset name
 * {@code "UTF-8"} means no checked {@code UnsupportedEncodingException} can occur, so no
 * try/catch wrapper is needed.
 *
 * @param bytes source array
 * @param offset index of the first byte to decode
 * @param len number of bytes to decode
 * @return the decoded string
 */
public static String defaultDecodeUTF8(byte[] bytes, int offset, int len) {
    return new String(bytes, offset, len, StandardCharsets.UTF_8);
}
}
Expand Up @@ -724,7 +724,7 @@ public void testDecodeWithIllegalUtf8Bytes() throws UnsupportedEncodingException
byte[] bytes = new byte[] {(byte) 20122, (byte) 40635, 124, (byte) 38271, (byte) 34966,
124, (byte) 36830, (byte) 34915, (byte) 35033, 124, (byte) 55357, 124, (byte) 56407 };

String str = new String(bytes);
String str = new String(bytes, StandardCharsets.UTF_8);
assertEquals(str, StringUtf8Utils.decodeUTF8(bytes, 0, bytes.length));
assertEquals(str, StringUtf8Utils.decodeUTF8(MemorySegmentFactory.wrap(bytes), 0, bytes.length));

Expand Down

0 comments on commit e628a7a

Please sign in to comment.