Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added SharedReplacingMergeTree support #582

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sink-connector-lightweight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<maven.compiler.target>17</maven.compiler.target>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<version.debezium>2.5.0.Beta1</version.debezium>
<version.debezium>2.7.0.Alpha1</version.debezium>
<version.junit>5.9.1</version.junit>
<version.testcontainers>1.19.1</version.testcontainers>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1288,6 +1288,36 @@ public void exitPartitionFunctionList(MySqlParser.PartitionFunctionListContext p

}

@Override
public void enterPartitionSystemVersion(MySqlParser.PartitionSystemVersionContext partitionSystemVersionContext) {

}

@Override
public void exitPartitionSystemVersion(MySqlParser.PartitionSystemVersionContext partitionSystemVersionContext) {

}

@Override
public void enterPartitionSystemVersionDefinitions(MySqlParser.PartitionSystemVersionDefinitionsContext partitionSystemVersionDefinitionsContext) {

}

@Override
public void exitPartitionSystemVersionDefinitions(MySqlParser.PartitionSystemVersionDefinitionsContext partitionSystemVersionDefinitionsContext) {

}

@Override
public void enterPartitionSystemVersionDefinition(MySqlParser.PartitionSystemVersionDefinitionContext partitionSystemVersionDefinitionContext) {

}

@Override
public void exitPartitionSystemVersionDefinition(MySqlParser.PartitionSystemVersionDefinitionContext partitionSystemVersionDefinitionContext) {

}

@Override
public void enterSubPartitionFunctionHash(MySqlParser.SubPartitionFunctionHashContext subPartitionFunctionHashContext) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import com.clickhouse.data.ClickHouseDataType;
import io.debezium.antlr.DataTypeResolver;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.mysql.MySqlValueConverters;
import io.debezium.connector.mysql.jdbc.MySqlValueConverters;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
Expand Down Expand Up @@ -36,7 +36,6 @@ public static ClickHouseDataType convert(String columnName, MySqlParser.DataType
JdbcValueConverters.BigIntUnsignedMode.LONG,
CommonConnectorConfig.BinaryHandlingMode.BYTES);


DataType dataType = initializeDataTypeResolver().resolveDataType(columnDefChild);
//DataType dataType = MySqlParser.dataTypeResolver.resolveDataType(dataTypeContext);
Column column = Column.editor().name(columnName).type(dataType.name()).jdbcType(dataType.jdbcType()).length((int) dataType.length()).scale(dataType.scale()).create();
Expand All @@ -50,7 +49,8 @@ public static String convertToString(String columnName, int scale, int precision
JdbcValueConverters.DecimalMode.PRECISE,
TemporalPrecisionMode.ADAPTIVE,
JdbcValueConverters.BigIntUnsignedMode.LONG,
CommonConnectorConfig.BinaryHandlingMode.BYTES
CommonConnectorConfig.BinaryHandlingMode.BYTES,
x ->x, CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN
);


Expand Down
2 changes: 1 addition & 1 deletion sink-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>2.5.0.Beta1</version>
<version>2.7.0.Alpha1</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public static ClickHouseConnection createConnection(String url, String clientNam
try {
Properties properties = new Properties();
properties.setProperty("client_name", clientName);
properties.setProperty("custom_settings", "allow_experimental_object_type=1,insert_allow_materialized_columns=1");
properties.setProperty("custom_settings", "insert_allow_materialized_columns=1");

if(!jdbcParams.isEmpty()) {
log.info("**** JDBC PARAMS from configuration:" + jdbcParams);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public enum TABLE_ENGINE {
REPLACING_MERGE_TREE("ReplacingMergeTree"),

REPLICATED_REPLACING_MERGE_TREE("ReplicatedReplacingMergeTree"),

SHARED_REPLACING_MERGE_TREE("SharedReplacingMergeTree"),
MERGE_TREE("MergeTree"),

DEFAULT("default");
Expand Down Expand Up @@ -139,6 +139,7 @@ public MutablePair<TABLE_ENGINE, String> getTableEngineUsingShowTable(ClickHouse

public static final String REPLACING_MERGE_TREE_VERSION_WITH_IS_DELETED = "23.2";
public static final String REPLICATED_REPLACING_MERGE_TREE_VER_PREFIX = "ReplicatedReplacingMergeTree(";
public static final String SHARED_REPLACING_MERGE_TREE_VER_PREFIX = "SharedReplacingMergeTree(";
/**
* Function to extract the sign column for CollapsingMergeTree
* @param createDML
Expand Down Expand Up @@ -177,6 +178,17 @@ public String getVersionColumnForReplacingMergeTree(String createDML) {
}
}
}
else if (createDML.contains(TABLE_ENGINE.SHARED_REPLACING_MERGE_TREE.getEngine())) {
String parameters = StringUtils.substringBetween(createDML, SHARED_REPLACING_MERGE_TREE_VER_PREFIX, ")");
if(parameters != null) {
String[] parameterArray = parameters.split(",");
if(parameterArray != null && parameterArray.length == 3) {
versionColumn = parameterArray[2].trim();
} else if(parameterArray != null && parameterArray.length == 4) {
versionColumn = parameterArray[2].trim() + "," + parameterArray[3].trim();
}
}
}
else if(createDML.contains(TABLE_ENGINE.REPLACING_MERGE_TREE.getEngine())) {
if(createDML != null && createDML.indexOf("(") != -1 && createDML.indexOf(")") != -1) {
String subString = StringUtils.substringBetween(createDML, REPLACING_MERGE_TREE_VER_PREFIX, ")");
Expand Down
Loading