From 44ee9a8ad8ad9031b418d3a2a3ceff87bc236526 Mon Sep 17 00:00:00 2001 From: Bibo <33744252+531651225@users.noreply.github.com> Date: Tue, 18 Oct 2022 17:34:03 +0800 Subject: [PATCH] [Feature][Connector-V2] StarRocks Source & Sink connector (#3060) * [Feature][Connector-V2] StarRocks connector * improve starrocks e2e --- docs/en/connector-v2/sink/Jdbc.md | 1 + docs/en/connector-v2/source/Jdbc.md | 1 + .../seatunnel/jdbc/JdbcStarRocksdbIT.java | 162 ++++++++++++++++++ .../jdbc_starrocks_source_to_sink.conf | 45 +++++ 4 files changed, 209 insertions(+) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_starrocks_source_to_sink.conf diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index 85811b1c457..989e69b2c0d 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -116,6 +116,7 @@ there are some reference value for params above. | phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client | | sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | | oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | oracle.jdbc.xa.OracleXADataSource | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | +| starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | ## Example diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index cd57462019f..0d3ca398188 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -98,6 +98,7 @@ there are some reference value for params above. | phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client | | sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | | oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | +| starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | ## Example diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java new file mode 100644 index 00000000000..90b0629855f --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.jdbc.util.JdbcCompareUtil; + +import org.junit.jupiter.api.Assertions; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; + +public class JdbcStarRocksdbIT extends AbstractJdbcIT { + + private static final String DOCKER_IMAGE = "d87904488/starrocks-starter:2.2.1"; + private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + private static final String NETWORK_ALIASES = "e2e_starRocksdb"; + private static final int SR_PORT = 9030; + private static final String USERNAME = "root"; + private static final String PASSWORD = ""; + private static final String DATABASE = "test"; + private static final String URL = "jdbc:mysql://" + HOST + ":" + SR_PORT + "/" + DATABASE + "?createDatabaseIfNotExist=true"; + + private static final String SOURCE_TABLE = "e2e_table_source"; + private static final String SINK_TABLE = "e2e_table_sink"; + private static final String SR_DRIVER_JAR = "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar"; + private static final String COLUMN_STRING = "BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL"; + private static final String CONFIG_FILE = "/jdbc_starrocks_source_to_sink.conf"; + + private static final String DDL_SOURCE = "create table " + DATABASE + "." + SOURCE_TABLE + " (\n" + + " BIGINT_COL BIGINT,\n" + + " LARGEINT_COL LARGEINT,\n" + + " SMALLINT_COL SMALLINT,\n" + + " TINYINT_COL TINYINT,\n" + + " BOOLEAN_COL BOOLEAN,\n" + + " DECIMAL_COL DECIMAL,\n" + + " DOUBLE_COL DOUBLE,\n" + + " FLOAT_COL FLOAT,\n" + + " INT_COL INT,\n" + + " CHAR_COL CHAR,\n" + + " VARCHAR_11_COL VARCHAR(11),\n" + + " STRING_COL STRING,\n" + + " DATETIME_COL DATETIME,\n" + + " DATE_COL DATE\n" + + ")ENGINE=OLAP\n" + + "DUPLICATE KEY(`BIGINT_COL`)\n" + + "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\",\n" + + "\"in_memory\" = \"false\"," + + "\"in_memory\" = \"false\"," + + "\"storage_format\" = \"DEFAULT\"" + + ")"; + + + private static final String DDL_SINK = "create table " + DATABASE + "." + SINK_TABLE + " (\n" + + " BIGINT_COL BIGINT,\n" + + " LARGEINT_COL LARGEINT,\n" + + " SMALLINT_COL SMALLINT,\n" + + " TINYINT_COL TINYINT,\n" + + " BOOLEAN_COL BOOLEAN,\n" + + " DECIMAL_COL DECIMAL,\n" + + " DOUBLE_COL DOUBLE,\n" + + " FLOAT_COL FLOAT,\n" + + " INT_COL INT,\n" + + " CHAR_COL CHAR,\n" + + " VARCHAR_11_COL VARCHAR(11),\n" + + " STRING_COL STRING,\n" + + " DATETIME_COL DATETIME,\n" + + " DATE_COL DATE\n" + + ")ENGINE=OLAP\n" + + "DUPLICATE KEY(`BIGINT_COL`)\n" + + "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\",\n" + + "\"in_memory\" = \"false\"," + + "\"in_memory\" = \"false\"," + + "\"storage_format\" = \"DEFAULT\"" + + ")"; + + private static final String INIT_DATA_SQL = "insert into " + DATABASE + "." + SOURCE_TABLE + " (\n" + + " BIGINT_COL,\n" + + " LARGEINT_COL,\n" + + " SMALLINT_COL,\n" + + " TINYINT_COL,\n" + + " BOOLEAN_COL,\n" + + " DECIMAL_COL,\n" + + " DOUBLE_COL,\n" + + " FLOAT_COL,\n" + + " INT_COL,\n" + + " CHAR_COL,\n" + + " VARCHAR_11_COL,\n" + + " STRING_COL,\n" + + " DATETIME_COL,\n" + + " DATE_COL\n" + + ")values(\n" + + "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" + + ")"; + + @Override + JdbcCase getJdbcCase() { + Map containerEnv = new HashMap<>(); + String jdbcUrl = String.format(URL, SR_PORT); + return JdbcCase.builder().dockerImage(DOCKER_IMAGE).networkAliases(NETWORK_ALIASES).containerEnv(containerEnv).driverClass(DRIVER_CLASS) + .host(HOST).port(SR_PORT).jdbcUrl(jdbcUrl).userName(USERNAME).password(PASSWORD).dataBase(DATABASE) + .sourceTable(SOURCE_TABLE).sinkTable(SINK_TABLE).driverJar(SR_DRIVER_JAR) + .ddlSource(DDL_SOURCE).ddlSink(DDL_SINK).initDataSql(INIT_DATA_SQL).configFile(CONFIG_FILE).seaTunnelRow(initTestData()).build(); + } + + @Override + void compareResult() throws SQLException, IOException { + assertHasData(SOURCE_TABLE); + assertHasData(SINK_TABLE); + JdbcCompareUtil.compare(jdbcConnection, String.format("select * from %s.%s limit 1", DATABASE, SOURCE_TABLE), + String.format("select * from %s.%s limit 1", DATABASE, SINK_TABLE), COLUMN_STRING); + clearSinkTable(); + } + + @Override + void clearSinkTable() { + try (Statement statement = jdbcConnection.createStatement()) { + statement.execute(String.format("TRUNCATE TABLE %s", DATABASE + "." + SINK_TABLE)); + } catch (SQLException e) { + throw new RuntimeException("test starrocks server image error", e); + } + } + + @Override + SeaTunnelRow initTestData() { + return new SeaTunnelRow( + new Object[]{1234, 1123456, 12, 1, 0, 2222243.2222243, 2222243.22222, 1.22222, 12, "a", "VARCHAR_COL", "STRING_COL", "2022-08-13 17:35:59", "2022-08-13"}); + } + + private void assertHasData(String table) { + try (Statement statement = jdbcConnection.createStatement()) { + String sql = String.format("select * from %s.%s limit 1", DATABASE, table); + ResultSet source = statement.executeQuery(sql); + Assertions.assertTrue(source.next()); + } catch (SQLException e) { + throw new RuntimeException("test starrocks server image error", e); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_starrocks_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_starrocks_source_to_sink.conf new file mode 100644 index 00000000000..056cf47db6b --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_starrocks_source_to_sink.conf @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + Jdbc { + driver = com.mysql.cj.jdbc.Driver + url = "jdbc:mysql://e2e_starRocksdb:9030" + user = root + password = "" + query = "select BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL from `test`.`e2e_table_source`" + } +} + +transform { + +} + +sink { + Jdbc { + driver = com.mysql.cj.jdbc.Driver + url = "jdbc:mysql://e2e_starRocksdb:9030" + user = root + password = "" + query = "INSERT INTO `test`.`e2e_table_sink` (BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)" + } +}