Skip to content

Commit

Permalink
[BugFix] In the 3.1 separated storage and computation version, the us…
Browse files Browse the repository at this point in the history
…e of the "insert into" statement in Flink SQL will lose the data with Delete semantics.
  • Loading branch information
hexiufeng committed May 24, 2024
1 parent cc8689d commit 18032cb
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -424,21 +424,8 @@ private void validateTableStructure(TableSchema flinkSchema) {
}
// validate primary keys
List<String> primayKeys = new ArrayList<>();
for (int i = 0; i < rows.size(); i++) {
String keysType = rows.get(i).get("COLUMN_KEY").toString();
if (!"PRI".equals(keysType)) {
continue;
}
primayKeys.add(rows.get(i).get("COLUMN_NAME").toString().toLowerCase());
}
flinkSchema.getPrimaryKey().ifPresent(c -> c.getColumns().forEach(colName -> primayKeys.add(colName.toLowerCase())));
if (!primayKeys.isEmpty()) {
if (!constraint.isPresent()) {
throw new IllegalArgumentException("Primary keys not defined in the sink `TableSchema`.");
}
if (constraint.get().getColumns().size() != primayKeys.size() ||
!constraint.get().getColumns().stream().allMatch(col -> primayKeys.contains(col.toLowerCase()))) {
throw new IllegalArgumentException("Primary keys of the flink `TableSchema` do not match with the ones from starrocks table.");
}
sinkOptions.enableUpsertDelete();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,21 +102,8 @@ public void validateTableStructure(StarRocksSinkOptions sinkOptions, TableSchema
}
// validate primary keys
List<String> primaryKeys = new ArrayList<>();
for (Map<String, Object> row : rows) {
String keysType = row.get("COLUMN_KEY").toString();
if (!"PRI".equals(keysType)) {
continue;
}
primaryKeys.add(row.get("COLUMN_NAME").toString().toLowerCase());
}
flinkSchema.getPrimaryKey().ifPresent(c -> c.getColumns().forEach(colName -> primaryKeys.add(colName.toLowerCase())));
if (!primaryKeys.isEmpty()) {
if (!constraint.isPresent()) {
throw new IllegalArgumentException("Primary keys not defined in the sink `TableSchema`.");
}
if (constraint.get().getColumns().size() != primaryKeys.size() ||
!constraint.get().getColumns().stream().allMatch(col -> primaryKeys.contains(col.toLowerCase()))) {
throw new IllegalArgumentException("Primary keys of the flink `TableSchema` do not match with the ones from starrocks table.");
}
sinkOptions.enableUpsertDelete();
}

Expand Down

0 comments on commit 18032cb

Please sign in to comment.