diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java index c4624b5272..3256b3f521 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java @@ -390,11 +390,7 @@ private static void convertFlussTablePropertiesToFlinkOptions( public static List toFlussTableChanges( org.apache.flink.table.catalog.TableChange tableChange) { - if (tableChange - instanceof org.apache.flink.table.catalog.TableChange.MaterializedTableChange) { - // MaterializedTableChange may produce multiple fluss TableChange, - return convertMaterializedTableChange(tableChange); - } else if (tableChange instanceof org.apache.flink.table.catalog.TableChange.SetOption) { + if (tableChange instanceof org.apache.flink.table.catalog.TableChange.SetOption) { return Collections.singletonList( convertSetOption( (org.apache.flink.table.catalog.TableChange.SetOption) tableChange)); @@ -402,6 +398,10 @@ public static List toFlussTableChanges( return Collections.singletonList( convertResetOption( (org.apache.flink.table.catalog.TableChange.ResetOption) tableChange)); + } else if (tableChange instanceof ModifyRefreshStatus + || tableChange instanceof ModifyRefreshHandler) { + // MaterializedTableChange may produce multiple fluss TableChange. + return convertMaterializedTableChange(tableChange); } else { throw new UnsupportedOperationException( String.format("Unsupported flink table change: %s.", tableChange));