Skip to content

Commit 4561a8a

Browse files
lvyanquanleonardBang
authored andcommitted
[FLINK-34638][cdc-common] Support column with default value
1 parent f06cc1f commit 4561a8a

16 files changed

Lines changed: 174 additions & 30 deletions

File tree

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Column.java

Lines changed: 71 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,20 +38,46 @@ public abstract class Column implements Serializable {
3838

3939
private static final long serialVersionUID = 1L;
4040

41-
protected static final String FIELD_FORMAT_WITH_DESCRIPTION = "%s %s '%s'";
41+
protected static final String FIELD_FORMAT_WITH_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION =
42+
"%s %s '%s'";
4243

43-
protected static final String FIELD_FORMAT_NO_DESCRIPTION = "%s %s";
44+
protected static final String FIELD_FORMAT_NO_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION =
45+
"%s %s '%s'";
46+
47+
protected static final String FIELD_FORMAT_WITH_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION =
48+
"%s %s '%s' '%s'";
49+
50+
protected static final String FIELD_FORMAT_NO_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION = "%s %s";
4451

4552
protected final String name;
4653

4754
protected final DataType type;
4855

4956
protected final @Nullable String comment;
5057

58+
/**
59+
* Save the literal value of the column's default value, For uncertain functions such as UUID(),
60+
* the value is null, For the current time function such as CURRENT_TIMESTAMP(), the value is
61+
* Unix Epoch time(1970-01-01 00:00:00).
62+
*/
63+
protected final @Nullable String defaultValueExpression;
64+
5165
protected Column(String name, DataType type, @Nullable String comment) {
5266
this.name = name;
5367
this.type = type;
5468
this.comment = comment;
69+
this.defaultValueExpression = null;
70+
}
71+
72+
protected Column(
73+
String name,
74+
DataType type,
75+
@Nullable String comment,
76+
@Nullable String defaultValueExpression) {
77+
this.name = name;
78+
this.type = type;
79+
this.comment = comment;
80+
this.defaultValueExpression = defaultValueExpression;
5581
}
5682

5783
/** Returns the name of this column. */
@@ -69,17 +95,41 @@ public String getComment() {
6995
return comment;
7096
}
7197

98+
@Nullable
99+
public String getDefaultValueExpression() {
100+
return defaultValueExpression;
101+
}
102+
72103
/** Returns a string that summarizes this column for printing to a console. */
73104
public String asSummaryString() {
74105
if (comment == null) {
75-
return String.format(
76-
FIELD_FORMAT_NO_DESCRIPTION, escapeIdentifier(name), type.asSummaryString());
106+
if (defaultValueExpression == null) {
107+
return String.format(
108+
FIELD_FORMAT_NO_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION,
109+
escapeIdentifier(name),
110+
type.asSummaryString());
111+
} else {
112+
return String.format(
113+
FIELD_FORMAT_NO_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION,
114+
escapeIdentifier(name),
115+
type.asSummaryString(),
116+
defaultValueExpression);
117+
}
77118
} else {
78-
return String.format(
79-
FIELD_FORMAT_WITH_DESCRIPTION,
80-
escapeIdentifier(name),
81-
type.asSummaryString(),
82-
escapeSingleQuotes(comment));
119+
if (defaultValueExpression == null) {
120+
return String.format(
121+
FIELD_FORMAT_WITH_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION,
122+
escapeIdentifier(name),
123+
type.asSummaryString(),
124+
escapeSingleQuotes(comment));
125+
} else {
126+
return String.format(
127+
FIELD_FORMAT_WITH_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION,
128+
escapeIdentifier(name),
129+
type.asSummaryString(),
130+
escapeSingleQuotes(comment),
131+
defaultValueExpression);
132+
}
83133
}
84134
}
85135

@@ -103,19 +153,29 @@ public boolean equals(Object o) {
103153
Column column = (Column) o;
104154
return name.equals(column.name)
105155
&& type.equals(column.type)
106-
&& Objects.equals(comment, column.comment);
156+
&& Objects.equals(comment, column.comment)
157+
&& Objects.equals(defaultValueExpression, column.defaultValueExpression);
107158
}
108159

109160
@Override
110161
public int hashCode() {
111-
return Objects.hash(name, type, comment);
162+
return Objects.hash(name, type, comment, defaultValueExpression);
112163
}
113164

114165
@Override
115166
public String toString() {
116167
return asSummaryString();
117168
}
118169

170+
/** Creates a physical column. */
171+
public static PhysicalColumn physicalColumn(
172+
String name,
173+
DataType type,
174+
@Nullable String comment,
175+
@Nullable String defaultValueExpression) {
176+
return new PhysicalColumn(name, type, comment, defaultValueExpression);
177+
}
178+
119179
/** Creates a physical column. */
120180
public static PhysicalColumn physicalColumn(
121181
String name, DataType type, @Nullable String comment) {

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/PhysicalColumn.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,23 @@ public PhysicalColumn(String name, DataType type, @Nullable String comment) {
3232
super(name, type, comment);
3333
}
3434

35+
public PhysicalColumn(
36+
String name, DataType type, @Nullable String comment, @Nullable String defaultValue) {
37+
super(name, type, comment, defaultValue);
38+
}
39+
3540
@Override
3641
public boolean isPhysical() {
3742
return true;
3843
}
3944

4045
@Override
4146
public Column copy(DataType newType) {
42-
return new PhysicalColumn(name, newType, comment);
47+
return new PhysicalColumn(name, newType, comment, defaultValueExpression);
4348
}
4449

4550
@Override
4651
public Column copy(String newName) {
47-
return new PhysicalColumn(newName, type, comment);
52+
return new PhysicalColumn(newName, type, comment, defaultValueExpression);
4853
}
4954
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,21 @@ public Builder physicalColumn(String columnName, DataType type, String comment)
316316
return this;
317317
}
318318

319+
/**
320+
* Declares a physical column that is appended to this schema.
321+
*
322+
* @param columnName column name
323+
* @param type data type of the column
324+
* @param comment description of the column
325+
* @param defaultValue default value of the column
326+
*/
327+
public Builder physicalColumn(
328+
String columnName, DataType type, String comment, String defaultValue) {
329+
checkColumn(columnName, type);
330+
columns.add(Column.physicalColumn(columnName, type, comment, defaultValue));
331+
return this;
332+
}
333+
319334
/**
320335
* Declares a metadata column that is appended to this schema.
321336
*

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,11 @@ private Map<String, FieldSchema> buildFields(Schema schema) {
134134
}
135135
fieldSchemaMap.put(
136136
column.getName(),
137-
new FieldSchema(column.getName(), typeString, column.getComment()));
137+
new FieldSchema(
138+
column.getName(),
139+
typeString,
140+
column.getDefaultValueExpression(),
141+
column.getComment()));
138142
}
139143
return fieldSchemaMap;
140144
}
@@ -170,6 +174,7 @@ private void applyAddColumnEvent(AddColumnEvent event)
170174
new FieldSchema(
171175
column.getName(),
172176
buildTypeString(column.getType()),
177+
column.getDefaultValueExpression(),
173178
column.getComment());
174179
schemaChangeManager.addColumn(
175180
tableId.getSchemaName(), tableId.getTableName(), addFieldSchema);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,10 @@ public void exitAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx)
253253

254254
private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) {
255255
return org.apache.flink.cdc.common.schema.Column.physicalColumn(
256-
dbzColumn.name(), fromDbzColumn(dbzColumn), dbzColumn.comment());
256+
dbzColumn.name(),
257+
fromDbzColumn(dbzColumn),
258+
dbzColumn.comment(),
259+
dbzColumn.defaultValueExpression().orElse(null));
257260
}
258261

259262
private org.apache.flink.cdc.common.event.TableId toCdcTableId(TableId dbzTableId) {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,11 @@ private Schema parseDDL(String ddlStatement, TableId tableId) {
205205
if (!column.isOptional()) {
206206
dataType = dataType.notNull();
207207
}
208-
tableBuilder.physicalColumn(colName, dataType, column.comment());
208+
tableBuilder.physicalColumn(
209+
colName,
210+
dataType,
211+
column.comment(),
212+
column.defaultValueExpression().orElse(null));
209213
}
210214

211215
List<String> primaryKey = table.primaryKeyColumnNames();

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
355355
tableId,
356356
Schema.newBuilder()
357357
.physicalColumn("id", DataTypes.INT().notNull())
358-
.physicalColumn("name", DataTypes.VARCHAR(255).notNull())
358+
.physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink")
359359
.physicalColumn("description", DataTypes.VARCHAR(512))
360360
.physicalColumn("weight", DataTypes.FLOAT())
361361
.primaryKey(Collections.singletonList("id"))

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import java.util.ArrayList;
3838
import java.util.List;
3939

40+
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksUtils.toStarRocksDataType;
41+
4042
/** A {@code MetadataApplier} that applies metadata changes to StarRocks. */
4143
public class StarRocksMetadataApplier implements MetadataApplier {
4244

@@ -117,8 +119,9 @@ private void applyAddColumn(AddColumnEvent addColumnEvent) {
117119
new StarRocksColumn.Builder()
118120
.setColumnName(column.getName())
119121
.setOrdinalPosition(-1)
120-
.setColumnComment(column.getComment());
121-
StarRocksUtils.toStarRocksDataType(column, false, builder);
122+
.setColumnComment(column.getComment())
123+
.setDefaultValue(column.getDefaultValueExpression());
124+
toStarRocksDataType(column, false, builder);
122125
addColumns.add(builder.build());
123126
}
124127

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ public static StarRocksTable toStarRocksTable(
8484
new StarRocksColumn.Builder()
8585
.setColumnName(column.getName())
8686
.setOrdinalPosition(i)
87-
.setColumnComment(column.getComment());
87+
.setColumnComment(column.getComment())
88+
.setDefaultValue(column.getDefaultValueExpression());
8889
toStarRocksDataType(column, i < primaryKeyCount, builder);
8990
starRocksColumns.add(builder.build());
9091
}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,11 @@ private void registerNewSchema(TableId tableId, Schema newSchema) {
172172
/** Serializer for {@link SchemaManager}. */
173173
public static class Serializer implements SimpleVersionedSerializer<SchemaManager> {
174174

175-
public static final int CURRENT_VERSION = 1;
175+
/**
176+
* Update history: from Version 3.0.0, set to 0, from version 3.1.1, updated to 1, from
177+
* version 3.2.0, updated to 2.
178+
*/
179+
public static final int CURRENT_VERSION = 2;
176180

177181
@Override
178182
public int getVersion() {
@@ -214,6 +218,7 @@ public SchemaManager deserialize(int version, byte[] serialized) throws IOExcept
214218
switch (version) {
215219
case 0:
216220
case 1:
221+
case 2:
217222
TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
218223
SchemaSerializer schemaSerializer = SchemaSerializer.INSTANCE;
219224
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);

0 commit comments

Comments
 (0)