Skip to content

Commit

Permalink
[FLINK-15801] fix Timestamp extractor created from properties does no…
Browse files Browse the repository at this point in the history
…t work for some physical fields
  • Loading branch information
Matrix42 committed Apr 16, 2020
1 parent 577abe7 commit 3ba0866
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
Expand Up @@ -293,8 +293,14 @@ public static Map<String, String> deriveFieldMapping(
.containsKey(SCHEMA + "." + i + "." + ROWTIME_TIMESTAMPS_TYPE);
boolean isGeneratedColumn = properties
.containsKey(SCHEMA + "." + i + "." + TABLE_SCHEMA_EXPR);
// remove proctime/rowtime from mapping
if (isProctime || isRowtime || isGeneratedColumn) {
if (isRowtime) {
Optional<String> timestampSource = properties.getOptionalString(SCHEMA + "." + i + "." + ROWTIME_TIMESTAMPS_FROM);
if (!timestampSource.isPresent()) {
mapping.remove(name);
}
}
// remove proctime and generatedColumn from mapping
if (isProctime || isGeneratedColumn) {
mapping.remove(name);
}
// check for invalid fields
Expand Down
Expand Up @@ -201,4 +201,25 @@ class SchemaValidatorTest {
val schema = SchemaValidator.deriveTableSinkSchema(properties)
assertEquals(expectd, schema)
}

@Test
def testDeriveFieldMappingWithRowtimeFromField(): Unit = {
val descriptor = new Schema()
.field("f1", Types.STRING)
.field("f2", Types.STRING)
.field("f3", Types.SQL_TIMESTAMP)
.field("rt", Types.SQL_TIMESTAMP).rowtime(
new Rowtime().timestampsFromField("rt")
.watermarksPeriodicBounded(1000L))
val properties = new DescriptorProperties()
properties.putProperties(descriptor.toProperties)

val rowtime = SchemaValidator.deriveRowtimeAttributes(properties).get(0)
assertEquals("rt", rowtime.getAttributeName)
val extractor = rowtime.getTimestampExtractor
assertTrue(extractor.equals(new ExistingField("rt")))
assertTrue(rowtime.getWatermarkStrategy.isInstanceOf[BoundedOutOfOrderTimestamps])
val fieldMapping = SchemaValidator.deriveFieldMapping(properties, Optional.of(properties.getTableSchema(Schema.SCHEMA).toRowType))
assertTrue(fieldMapping.containsKey("rt"))
}
}

0 comments on commit 3ba0866

Please sign in to comment.