Skip to content

Commit

Permalink
[FLINK-35243][base] Support more schema change events & PreSchema bac…
Browse files Browse the repository at this point in the history
…kfill
  • Loading branch information
yuxiqian committed May 9, 2024
1 parent fa6e7ea commit f22c3a4
Show file tree
Hide file tree
Showing 50 changed files with 2,857 additions and 246 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.event;

import org.apache.flink.cdc.common.schema.Schema;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A {@link SchemaChangeEvent} that represents an {@code ALTER COLUMN} DDL, which may contain the
* comment changes.
*/
public class AlterColumnCommentEvent implements SchemaChangeEventWithPreSchema {

private static final long serialVersionUID = 1L;

private final TableId tableId;

/** key => column name, value => column type after changing. */
private final Map<String, String> commentMapping;

private final Map<String, String> oldCommentMapping;

public AlterColumnCommentEvent(TableId tableId, Map<String, String> commentMapping) {
this.tableId = tableId;
this.commentMapping = commentMapping;
this.oldCommentMapping = new HashMap<>();
}

public AlterColumnCommentEvent(
TableId tableId,
Map<String, String> commentMapping,
Map<String, String> oldCommentMapping) {
this.tableId = tableId;
this.commentMapping = commentMapping;
this.oldCommentMapping = oldCommentMapping;
}

/** Returns the type mapping. */
public Map<String, String> getCommentMapping() {
return commentMapping;
}

public Map<String, String> getOldCommentMapping() {
return oldCommentMapping;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof AlterColumnCommentEvent)) {
return false;
}
AlterColumnCommentEvent that = (AlterColumnCommentEvent) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(commentMapping, that.commentMapping)
&& Objects.equals(oldCommentMapping, that.oldCommentMapping);
}

@Override
public int hashCode() {
return Objects.hash(tableId, commentMapping, oldCommentMapping);
}

@Override
public String toString() {
if (hasPreSchema()) {
return "AlterColumnCommentEvent{"
+ "tableId="
+ tableId
+ ", commentMapping="
+ commentMapping
+ ", oldCommentMapping="
+ oldCommentMapping
+ '}';
} else {
return "AlterColumnCommentEvent{"
+ "tableId="
+ tableId
+ ", commentMapping="
+ commentMapping
+ '}';
}
}

@Override
public TableId tableId() {
return tableId;
}

@Override
public boolean hasPreSchema() {
return !oldCommentMapping.isEmpty();
}

@Override
public void fillPreSchema(Schema oldTypeSchema) {
oldCommentMapping.clear();
oldTypeSchema.getColumns().stream()
.filter(e -> commentMapping.containsKey(e.getName()))
.forEach(e -> oldCommentMapping.put(e.getName(), e.getComment()));
}

@Override
public boolean trimRedundantChanges() {
if (hasPreSchema()) {
Set<String> redundantlyChangedColumns =
commentMapping.keySet().stream()
.filter(
e ->
Objects.equals(
commentMapping.get(e),
oldCommentMapping.get(e)))
.collect(Collectors.toSet());

// Remove redundant alter column type records that doesn't really change the type
commentMapping.keySet().removeAll(redundantlyChangedColumns);
oldCommentMapping.keySet().removeAll(redundantlyChangedColumns);
}
return !commentMapping.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@

package org.apache.flink.cdc.common.event;

import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A {@link SchemaChangeEvent} that represents an {@code ALTER COLUMN} DDL, which may contain the
* lenient column type changes.
*/
public class AlterColumnTypeEvent implements SchemaChangeEvent {
public class AlterColumnTypeEvent implements SchemaChangeEventWithPreSchema {

private static final long serialVersionUID = 1L;

Expand All @@ -35,9 +40,21 @@ public class AlterColumnTypeEvent implements SchemaChangeEvent {
/** key => column name, value => column type after changing. */
private final Map<String, DataType> typeMapping;

private final Map<String, DataType> oldTypeMapping;

public AlterColumnTypeEvent(TableId tableId, Map<String, DataType> typeMapping) {
this.tableId = tableId;
this.typeMapping = typeMapping;
this.oldTypeMapping = new HashMap<>();
}

public AlterColumnTypeEvent(
TableId tableId,
Map<String, DataType> typeMapping,
Map<String, DataType> oldTypeMapping) {
this.tableId = tableId;
this.typeMapping = typeMapping;
this.oldTypeMapping = oldTypeMapping;
}

/** Returns the type mapping. */
Expand All @@ -55,26 +72,71 @@ public boolean equals(Object o) {
}
AlterColumnTypeEvent that = (AlterColumnTypeEvent) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(typeMapping, that.typeMapping);
&& Objects.equals(typeMapping, that.typeMapping)
&& Objects.equals(oldTypeMapping, that.oldTypeMapping);
}

@Override
public int hashCode() {
return Objects.hash(tableId, typeMapping);
return Objects.hash(tableId, typeMapping, oldTypeMapping);
}

@Override
public String toString() {
return "AlterColumnTypeEvent{"
+ "tableId="
+ tableId
+ ", nameMapping="
+ typeMapping
+ '}';
if (hasPreSchema()) {
return "AlterColumnTypeEvent{"
+ "tableId="
+ tableId
+ ", typeMapping="
+ typeMapping
+ ", oldTypeMapping="
+ oldTypeMapping
+ '}';
} else {
return "AlterColumnTypeEvent{"
+ "tableId="
+ tableId
+ ", typeMapping="
+ typeMapping
+ '}';
}
}

@Override
public TableId tableId() {
return tableId;
}

public Map<String, DataType> getOldTypeMapping() {
return oldTypeMapping;
}

@Override
public boolean hasPreSchema() {
return !oldTypeMapping.isEmpty();
}

@Override
public void fillPreSchema(Schema oldTypeSchema) {
oldTypeMapping.clear();
oldTypeMapping.putAll(
oldTypeSchema.getColumns().stream()
.filter(e -> typeMapping.containsKey(e.getName()) && e.getType() != null)
.collect(Collectors.toMap(Column::getName, Column::getType)));
}

@Override
public boolean trimRedundantChanges() {
if (hasPreSchema()) {
Set<String> redundantlyChangedColumns =
typeMapping.keySet().stream()
.filter(e -> Objects.equals(typeMapping.get(e), oldTypeMapping.get(e)))
.collect(Collectors.toSet());

// Remove redundant alter column type records that doesn't really change the type
typeMapping.keySet().removeAll(redundantlyChangedColumns);
oldTypeMapping.keySet().removeAll(redundantlyChangedColumns);
}
return !typeMapping.isEmpty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.event;

import java.util.Objects;

/** A {@link SchemaChangeEvent} that represents an {@code ALTER TABLE COMMENT = ...} DDL. */
public class AlterTableCommentEvent implements SchemaChangeEvent {
private static final long serialVersionUID = 1L;

private final TableId tableId;

/** key => column name, value => column type after changing. */
private final String tableComment;

public AlterTableCommentEvent(TableId tableId, String tableComment) {
this.tableId = tableId;
this.tableComment = tableComment;
}

public String getTableComment() {
return tableComment;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof AlterTableCommentEvent)) {
return false;
}
AlterTableCommentEvent that = (AlterTableCommentEvent) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(tableComment, that.tableComment);
}

@Override
public int hashCode() {
return Objects.hash(tableId, tableComment);
}

@Override
public String toString() {
return "AlterTableCommentEvent{"
+ "tableId="
+ tableId
+ ", tableComment="
+ tableComment
+ '}';
}

@Override
public TableId tableId() {
return tableId;
}
}

0 comments on commit f22c3a4

Please sign in to comment.