Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35172] DDL statement is added to the Schema Change Event #3245

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,35 @@ public final class AddColumnEvent implements SchemaChangeEvent {

private final List<ColumnWithPosition> addedColumns;

private String ddlContent;

/** Creates an event for the given added columns without an originating DDL statement. */
public AddColumnEvent(TableId tableId, List<ColumnWithPosition> addedColumns) {
    // Delegate to the full constructor; ddlContent stays null.
    this(tableId, addedColumns, null);
}

/**
 * Creates an event for the given added columns.
 *
 * @param tableId identifier of the table the columns were added to
 * @param addedColumns the added columns together with their positions
 * @param ddlContent raw DDL statement that produced this change, may be {@code null}
 */
public AddColumnEvent(
        TableId tableId, List<ColumnWithPosition> addedColumns, String ddlContent) {
    this.ddlContent = ddlContent;
    this.addedColumns = addedColumns;
    this.tableId = tableId;
}

/** Returns the list of columns (with positions) added by this event. */
public List<ColumnWithPosition> getAddedColumns() {
    return this.addedColumns;
}

/** Returns the original DDL statement carried by this event, or {@code null} if none was set. */
@Override
public String getDdlContent() {
    return ddlContent;
}

/** Attaches the original DDL statement to this event. */
@Override
public void setDdlContent(String ddlContent) {
    this.ddlContent = ddlContent;
}

/** relative Position of column. */
public enum ColumnPosition implements Serializable {
BEFORE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,35 @@ public class AlterColumnTypeEvent implements SchemaChangeEvent {
/** key => column name, value => column type after changing. */
private final Map<String, DataType> typeMapping;

private String ddlContent;

/** Creates an event for the given column-type changes without an originating DDL statement. */
public AlterColumnTypeEvent(TableId tableId, Map<String, DataType> typeMapping) {
    // Delegate to the full constructor; ddlContent stays null.
    this(tableId, typeMapping, null);
}

/**
 * Creates an event for the given column-type changes.
 *
 * @param tableId identifier of the altered table
 * @param typeMapping column name mapped to its type after the change
 * @param ddlContent raw DDL statement that produced this change, may be {@code null}
 */
public AlterColumnTypeEvent(
        TableId tableId, Map<String, DataType> typeMapping, String ddlContent) {
    this.ddlContent = ddlContent;
    this.typeMapping = typeMapping;
    this.tableId = tableId;
}

/** Returns the mapping from column name to its type after the change. */
public Map<String, DataType> getTypeMapping() {
    return this.typeMapping;
}

/** Returns the original DDL statement carried by this event, or {@code null} if none was set. */
@Override
public String getDdlContent() {
    return this.ddlContent;
}

/** Attaches the original DDL statement to this event. */
@Override
public void setDdlContent(String ddlContent) {
    this.ddlContent = ddlContent;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,34 @@ public class CreateTableEvent implements SchemaChangeEvent {

private final Schema schema;

private String ddlContent;

/** Creates a table-creation event without an originating DDL statement. */
public CreateTableEvent(TableId tableId, Schema schema) {
    // Delegate to the full constructor; ddlContent stays null.
    this(tableId, schema, null);
}

/**
 * Creates a table-creation event.
 *
 * @param tableId identifier of the created table
 * @param schema full schema of the created table
 * @param ddlContent raw DDL statement that produced this change, may be {@code null}
 */
public CreateTableEvent(TableId tableId, Schema schema, String ddlContent) {
    this.ddlContent = ddlContent;
    this.schema = schema;
    this.tableId = tableId;
}

/** Returns the schema of the newly created table. */
public Schema getSchema() {
    return this.schema;
}

/** Returns the original DDL statement carried by this event, or {@code null} if none was set. */
@Override
public String getDdlContent() {
    return this.ddlContent;
}

/** Attaches the original DDL statement to this event. */
@Override
public void setDdlContent(String ddlContent) {
    this.ddlContent = ddlContent;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,33 @@ public class DropColumnEvent implements SchemaChangeEvent {

private final List<String> droppedColumnNames;

private String ddlContent;

/** Creates an event for the given dropped columns without an originating DDL statement. */
public DropColumnEvent(TableId tableId, List<String> droppedColumnNames) {
    // Delegate to the full constructor; ddlContent stays null.
    this(tableId, droppedColumnNames, null);
}

/**
 * Creates an event for the given dropped columns.
 *
 * @param tableId identifier of the altered table
 * @param droppedColumnNames names of the columns that were dropped
 * @param ddlContent raw DDL statement that produced this change, may be {@code null}
 */
public DropColumnEvent(TableId tableId, List<String> droppedColumnNames, String ddlContent) {
    this.ddlContent = ddlContent;
    this.droppedColumnNames = droppedColumnNames;
    this.tableId = tableId;
}

/** Returns the names of the columns dropped by this event. */
public List<String> getDroppedColumnNames() {
    return this.droppedColumnNames;
}

/** Returns the original DDL statement carried by this event, or {@code null} if none was set. */
@Override
public String getDdlContent() {
    return this.ddlContent;
}

/** Attaches the original DDL statement to this event. */
@Override
public void setDdlContent(String ddlContent) {
    this.ddlContent = ddlContent;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,34 @@ public class RenameColumnEvent implements SchemaChangeEvent {
/** key => column name before changing, value => column name after changing. */
private final Map<String, String> nameMapping;

private String ddlContent;

/** Creates an event for the given column renames without an originating DDL statement. */
public RenameColumnEvent(TableId tableId, Map<String, String> nameMapping) {
    // Delegate to the full constructor; ddlContent stays null.
    this(tableId, nameMapping, null);
}

/**
 * Creates an event for the given column renames.
 *
 * @param tableId identifier of the altered table
 * @param nameMapping old column name mapped to the new column name
 * @param ddlContent raw DDL statement that produced this change, may be {@code null}
 */
public RenameColumnEvent(TableId tableId, Map<String, String> nameMapping, String ddlContent) {
    this.ddlContent = ddlContent;
    this.nameMapping = nameMapping;
    this.tableId = tableId;
}

/** Returns the mapping from old column name to new column name. */
public Map<String, String> getNameMapping() {
    return this.nameMapping;
}

/** Returns the original DDL statement carried by this event, or {@code null} if none was set. */
@Override
public String getDdlContent() {
    return this.ddlContent;
}

/** Attaches the original DDL statement to this event. */
@Override
public void setDdlContent(String ddlContent) {
    this.ddlContent = ddlContent;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,12 @@
* system, such as CREATE, DROP, RENAME and so on.
*/
@PublicEvolving
public interface SchemaChangeEvent extends ChangeEvent, Serializable {}
public interface SchemaChangeEvent extends ChangeEvent, Serializable {

    /**
     * Returns the original DDL statement that produced this schema change, or {@code null} when no
     * DDL is available (the default implementation always returns {@code null}).
     */
    default String getDdlContent() {
        return null;
    }

    /**
     * Attaches the original DDL statement to this event. The default implementation is a silent
     * no-op; implementations that carry DDL content are expected to override it.
     */
    default void setDdlContent(String ddlContent) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,32 @@ public static SchemaChangeEvent recreateSchemaChangeEvent(
SchemaChangeEvent schemaChangeEvent, TableId tableId) {
if (schemaChangeEvent instanceof CreateTableEvent) {
CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent;
return new CreateTableEvent(tableId, createTableEvent.getSchema());
return new CreateTableEvent(
tableId, createTableEvent.getSchema(), createTableEvent.getDdlContent());
}
if (schemaChangeEvent instanceof AlterColumnTypeEvent) {
AlterColumnTypeEvent alterColumnTypeEvent = (AlterColumnTypeEvent) schemaChangeEvent;
return new AlterColumnTypeEvent(tableId, alterColumnTypeEvent.getTypeMapping());
return new AlterColumnTypeEvent(
tableId,
alterColumnTypeEvent.getTypeMapping(),
alterColumnTypeEvent.getDdlContent());
}
if (schemaChangeEvent instanceof RenameColumnEvent) {
RenameColumnEvent renameColumnEvent = (RenameColumnEvent) schemaChangeEvent;
return new RenameColumnEvent(tableId, renameColumnEvent.getNameMapping());
return new RenameColumnEvent(
tableId, renameColumnEvent.getNameMapping(), renameColumnEvent.getDdlContent());
}
if (schemaChangeEvent instanceof DropColumnEvent) {
DropColumnEvent dropColumnEvent = (DropColumnEvent) schemaChangeEvent;
return new DropColumnEvent(tableId, dropColumnEvent.getDroppedColumnNames());
return new DropColumnEvent(
tableId,
dropColumnEvent.getDroppedColumnNames(),
dropColumnEvent.getDdlContent());
}
if (schemaChangeEvent instanceof AddColumnEvent) {
AddColumnEvent addColumnEvent = (AddColumnEvent) schemaChangeEvent;
return new AddColumnEvent(tableId, addColumnEvent.getAddedColumns());
return new AddColumnEvent(
tableId, addColumnEvent.getAddedColumns(), addColumnEvent.getDdlContent());
}
throw new UnsupportedOperationException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -126,7 +127,13 @@ public byte[] serialize(Event event) {
jsonSerializers.put(
schemaChangeEvent.tableId(),
new TableSchemaInfo(schema, jsonSerializer, zoneId));
return null;

StringBuilder builder = new StringBuilder();
TableId tableId = schemaChangeEvent.tableId();
builder.append("{\"databaseName\": \"").append(tableId.getSchemaName()).append("\", ");
builder.append("\"schemaName\": null,");
builder.append("\"ddl\": \"").append(schemaChangeEvent.getDdlContent()).append("\"}");
return builder.toString().getBytes(StandardCharsets.UTF_8);
}

DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
Expand Down Expand Up @@ -94,10 +93,7 @@ public ProducerRecord<byte[], byte[]> serialize(
Event event, KafkaSinkContext context, Long timestamp) {
ChangeEvent changeEvent = (ChangeEvent) event;
final byte[] valueSerialized = valueSerialization.serialize(event);
if (event instanceof SchemaChangeEvent) {
// skip sending SchemaChangeEvent.
return null;
}

String topic = unifiedTopic == null ? changeEvent.tableId().toString() : unifiedTopic;
RecordHeaders recordHeaders = new RecordHeaders();
if (addTableToHeaderEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ public void testSerialize() throws Exception {
.primaryKey("col1")
.build();
CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
createTableEvent.setDdlContent("create table demos {}");
JsonNode expected =
mapper.readTree(
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\"}");
JsonNode actual = mapper.readTree(serializationSchema.serialize(createTableEvent));
Assertions.assertEquals(expected, actual);

Assertions.assertNull(serializationSchema.serialize(createTableEvent));
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
Expand All @@ -75,10 +82,10 @@ public void testSerialize() throws Exception {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
}));
JsonNode expected =
expected =
mapper.readTree(
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\"}");
JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
Assertions.assertEquals(expected, actual);
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord rec
historyRecord.document().getString(HistoryRecord.Fields.DDL_STATEMENTS);
customParser.setCurrentDatabase(databaseName);
customParser.parse(ddl, tables);
return customParser.getAndClearParsedEvents();
List<SchemaChangeEvent> changeEvents = customParser.getAndClearParsedEvents();
changeEvents.forEach(event -> event.setDdlContent(ddl));
return changeEvents;
} catch (IOException e) {
throw new IllegalStateException("Failed to parse the schema change : " + record, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,8 @@ public MySqlPipelineRecordEmitter(
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
List<TableId> capturedTableIds = listTables(jdbc, sourceConfig.getTableFilters());
for (TableId tableId : capturedTableIds) {
Schema schema = getSchema(jdbc, tableId);
createTableEventCache.add(
new CreateTableEvent(
org.apache.flink.cdc.common.event.TableId.tableId(
tableId.catalog(), tableId.table()),
schema));
CreateTableEvent event = buildCreateTableEvent(jdbc, tableId);
createTableEventCache.add(event);
}
} catch (SQLException e) {
throw new RuntimeException("Cannot start emitter to fetch table schema.", e);
Expand Down Expand Up @@ -122,26 +118,34 @@ protected void processElement(

/**
 * Emits a {@link CreateTableEvent} (schema plus originating DDL) for the given table.
 *
 * <p>Note: the diff render of this span interleaved the removed and added statements,
 * producing duplicate {@code output.collect} calls; only the post-change implementation,
 * which routes through {@code buildCreateTableEvent} so the DDL content is attached, is kept.
 *
 * @param jdbc open JDBC connection used to read the table schema
 * @param tableId identifier of the captured table
 * @param output collector that receives the event
 */
private void sendCreateTableEvent(
        JdbcConnection jdbc, TableId tableId, SourceOutput<Event> output) {
    CreateTableEvent event = buildCreateTableEvent(jdbc, tableId);
    output.collect(event);
}

private Schema getSchema(JdbcConnection jdbc, TableId tableId) {
private CreateTableEvent buildCreateTableEvent(JdbcConnection jdbc, TableId tableId) {
String ddlStatement = showCreateTable(jdbc, tableId);

Schema schema;
try {
return parseDDL(ddlStatement, tableId);
schema = parseDDL(ddlStatement, tableId);
} catch (ParsingException pe) {
LOG.warn(
"Failed to parse DDL: \n{}\nWill try parsing by describing table.",
ddlStatement,
pe);

ddlStatement = describeTable(jdbc, tableId);
schema = parseDDL(ddlStatement, tableId);
}
ddlStatement = describeTable(jdbc, tableId);
return parseDDL(ddlStatement, tableId);

CreateTableEvent event =
new CreateTableEvent(
org.apache.flink.cdc.common.event.TableId.tableId(
tableId.catalog(), tableId.table()),
schema);
event.setDdlContent(ddlStatement);

return event;
}

private String showCreateTable(JdbcConnection jdbc, TableId tableId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public CreateTableEvent processCreateTableEvent(CreateTableEvent createTableEven
List<Column> allColumnList = transformProjection.getAllColumnList();
// add the column of projection into Schema
Schema schema = createTableEvent.getSchema().copy(allColumnList);
return new CreateTableEvent(createTableEvent.tableId(), schema);
return new CreateTableEvent(
createTableEvent.tableId(), schema, createTableEvent.getDdlContent());
}

public void processSchemaChangeEvent(Schema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableE
createTableEvent =
new CreateTableEvent(
tableId,
transformSchemaMetaData(
createTableEvent.getSchema(), transform.f1));
transformSchemaMetaData(createTableEvent.getSchema(), transform.f1),
createTableEvent.getDdlContent());
}
}

Expand Down
Loading
Loading