Skip to content

Commit

Permalink
[Flink] fix STRING Type converted to varchar(1) (#1930)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZBHdd committed Aug 23, 2023
1 parent 4c5fa1f commit 23826a3
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 20 deletions.
Expand Up @@ -99,7 +99,7 @@ public static LogicalType toFlinkDataType(DataType deltaType, boolean nullable)
case TIMESTAMP:
return new TimestampType(nullable, TimestampType.DEFAULT_PRECISION);
case STRING:
return new VarCharType(nullable, VarCharType.DEFAULT_LENGTH);
return new VarCharType(nullable, VarCharType.MAX_LENGTH);
case STRUCT:
return toRowType((StructType) deltaType, nullable);
default:
Expand Down
Expand Up @@ -320,7 +320,7 @@ private void assertRowsFromSnapshot(Snapshot snapshot) throws IOException {
);
assertThat(
row.getString(ALL_DATA_TABLE_COLUMN_NAMES[8]),
equalTo(String.valueOf(i))
equalTo("test-" + i)
);

// same value for all columns
Expand Down
Expand Up @@ -324,7 +324,7 @@ private void assertRowValues(int i, RowData row) {
equalTo(Timestamp.valueOf("2022-06-14 18:54:24.547557")
.toLocalDateTime().toInstant(ZoneOffset.UTC))
);
assertThat(row.getString(8).toString(), equalTo(String.valueOf(i)));
assertThat(row.getString(8).toString(), equalTo("test-" + i));

// same value for all columns
assertThat(row.getBoolean(9), equalTo(true));
Expand Down
Expand Up @@ -41,15 +41,15 @@ private static Stream<Arguments> dataTypes() {
return Stream.of(
Arguments.of(new io.delta.standalone.types.FloatType(), new FloatType()),
Arguments.of(new io.delta.standalone.types.IntegerType(), new IntType()),
Arguments.of(new io.delta.standalone.types.StringType(), new VarCharType()),
Arguments.of(new io.delta.standalone.types.StringType(), new VarCharType(VarCharType.MAX_LENGTH)),
Arguments.of(new io.delta.standalone.types.DoubleType(), new DoubleType()),
Arguments.of(
new io.delta.standalone.types.MapType(
new io.delta.standalone.types.StringType(),
new io.delta.standalone.types.IntegerType(),
true // valueContainsNull
),
new MapType(new VarCharType(), new IntType())),
new MapType(new VarCharType(VarCharType.MAX_LENGTH), new IntType())),
Arguments.of(
new io.delta.standalone.types.ArrayType(
new io.delta.standalone.types.ByteType(),
Expand All @@ -61,16 +61,16 @@ private static Stream<Arguments> dataTypes() {
new io.delta.standalone.types.StringType(),
true // containsNull
),
new ArrayType(new VarCharType())),
Arguments.of(new io.delta.standalone.types.StringType(), new VarCharType()),
new ArrayType(new VarCharType(VarCharType.MAX_LENGTH))),
Arguments.of(new io.delta.standalone.types.StringType(), new VarCharType(VarCharType.MAX_LENGTH)),
Arguments.of(new io.delta.standalone.types.BooleanType(), new BooleanType()),
Arguments.of(new io.delta.standalone.types.ByteType(), new TinyIntType()),
Arguments.of(new io.delta.standalone.types.ShortType(), new SmallIntType()),
Arguments.of(new io.delta.standalone.types.LongType(), new BigIntType()),
Arguments.of(new io.delta.standalone.types.BinaryType(), new BinaryType()),
Arguments.of(new io.delta.standalone.types.TimestampType(), new TimestampType()),
Arguments.of(new io.delta.standalone.types.DateType(), new DateType()),
Arguments.of(new io.delta.standalone.types.StringType(), new VarCharType()),
Arguments.of(new io.delta.standalone.types.StringType(), new VarCharType(VarCharType.MAX_LENGTH)),
Arguments.of(new io.delta.standalone.types.DecimalType(10, 0), new DecimalType(10, 0)),
Arguments.of(new io.delta.standalone.types.DecimalType(2, 0), new DecimalType(2)),
Arguments.of(new io.delta.standalone.types.DecimalType(2, 2), new DecimalType(2, 2)),
Expand All @@ -82,7 +82,7 @@ private static Stream<Arguments> dataTypes() {
new StructField("f02", new io.delta.standalone.types.IntegerType()),
}),
new RowType(Arrays.asList(
new RowType.RowField("f01", new VarCharType()),
new RowType.RowField("f01", new VarCharType(VarCharType.MAX_LENGTH)),
new RowType.RowField("f02", new IntType()))
))
);
Expand Down Expand Up @@ -116,7 +116,7 @@ private static Stream<Arguments> mapTypes() {
new io.delta.standalone.types.IntegerType(),
true
),
new MapType(new VarCharType(), new IntType())),
new MapType(new VarCharType(VarCharType.MAX_LENGTH), new IntType())),
Arguments.of(
new io.delta.standalone.types.MapType(
new io.delta.standalone.types.IntegerType(),
Expand All @@ -138,7 +138,7 @@ private static Stream<Arguments> mapTypes() {
),
new MapType(new BigIntType(),
new RowType(Arrays.asList(
new RowType.RowField("f01", new VarCharType()),
new RowType.RowField("f01", new VarCharType(VarCharType.MAX_LENGTH)),
new RowType.RowField("f02", new IntType())
)))),
Arguments.of(
Expand All @@ -154,7 +154,7 @@ private static Stream<Arguments> mapTypes() {
new io.delta.standalone.types.IntegerType(),
true
),
new MapType(new VarCharType(), new IntType()))
new MapType(new VarCharType(VarCharType.MAX_LENGTH), new IntType()))
);
}

Expand Down
Expand Up @@ -119,7 +119,7 @@ public void shouldGetTableSchema() {
assertThat(sourceSchema.getSnapshotVersion(), equalTo(SNAPSHOT_VERSION));
assertArrayEquals(new String[]{"col1", "col2"}, sourceSchema.getColumnNames());
assertArrayEquals(
new LogicalType[]{new VarCharType(), new IntType()},
new LogicalType[]{new VarCharType(VarCharType.MAX_LENGTH), new IntType()},
sourceSchema.getColumnTypes()
);
}
Expand All @@ -140,7 +140,7 @@ public void shouldGetTableSchemaForUserColumns() {
assertThat(sourceSchema.getSnapshotVersion(), equalTo(SNAPSHOT_VERSION));
assertArrayEquals(new String[]{"col1"}, sourceSchema.getColumnNames());
assertArrayEquals(
new LogicalType[]{new VarCharType()},
new LogicalType[]{new VarCharType(VarCharType.MAX_LENGTH)},
sourceSchema.getColumnTypes()
);
}
Expand Down
Expand Up @@ -671,11 +671,11 @@ public void shouldDescribeTable() throws Exception {

// column name; column type; is nullable; primary key; comments; watermark
assertThat(describeRows).containsExactly(
"name;VARCHAR(1);true;null;null;null",
"surname;VARCHAR(1);true;null;null;null",
"name;STRING;true;null;null;null",
"surname;STRING;true;null;null;null",
"age;INT;true;null;null;null",
"col1;VARCHAR(1);true;null;null;null",
"col2;VARCHAR(1);true;null;null;null"
"col1;STRING;true;null;null;null",
"col2;STRING;true;null;null;null"
);
}

Expand Down
Expand Up @@ -21,7 +21,7 @@ This table was generated using scala/spark code:
```
spark.range(0, 5)
.map(x => (
x.toByte, x.toShort, x.toInt, x.toDouble, x.toFloat, BigInt(x), BigDecimal(x), Timestamp.valueOf(java.time.LocalDateTime.now), x.toString, true)
x.toByte, x.toShort, x.toInt, x.toDouble, x.toFloat, BigInt(x), BigDecimal(x), Timestamp.valueOf(java.time.LocalDateTime.now), s"test-${x}", true)
)
.toDF("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10")
.write
Expand Down
@@ -1,4 +1,4 @@
{"commitInfo":{"timestamp":1655232870674,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"2573","numOutputRows":"5"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"447f54c2-6f7c-4d7e-8ddf-0b6a51fe03b6","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"col1\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col2\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col4\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col5\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col6\",\"type\":\"decimal(38,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col7\",\"type\":\"decimal(38,18)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col8\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col9\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col10\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1655232868484}}
{"add":{"path":"part-00000-64ff527a-c93c-448d-ba15-088a95699f01-c000.snappy.parquet","partitionValues":{},"size":2573,"modificationTime":1655232870594,"dataChange":true}}
{"add":{"path":"part-00000-64ff527a-c93c-448d-ba15-088a95699f01-c000.snappy.parquet","partitionValues":{},"size":2831,"modificationTime":1655232870594,"dataChange":true}}
Binary file not shown.

0 comments on commit 23826a3

Please sign in to comment.