Skip to content

Commit

Permalink
[Fix][StarRocks] Fix NPE when upstream catalogtable table path only h…
Browse files Browse the repository at this point in the history
…ave table name part (#6540)
  • Loading branch information
liunaijie committed Mar 29, 2024
1 parent 505c125 commit 5795b26
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,15 @@ public TableSink createSink(TableSinkFactoryContext context) {
String sinkDatabaseName = sinkConfig.getDatabase();
String sinkTableName = sinkConfig.getTable();
// to replace
String finalDatabaseName =
sinkDatabaseName.replace(REPLACE_DATABASE_NAME_KEY, sourceDatabaseName);
sinkDatabaseName =
sinkDatabaseName.replace(
REPLACE_DATABASE_NAME_KEY,
sourceDatabaseName != null ? sourceDatabaseName : "");
String finalTableName = this.replaceFullTableName(sinkTableName, tableId);
// rebuild TableIdentifier and catalogTable
TableIdentifier newTableId =
TableIdentifier.of(
tableId.getCatalogName(), finalDatabaseName, null, finalTableName);
tableId.getCatalogName(), sinkDatabaseName, null, finalTableName);
catalogTable =
CatalogTable.of(
newTableId,
Expand All @@ -107,7 +109,7 @@ public TableSink createSink(TableSinkFactoryContext context) {
CatalogTable finalCatalogTable = catalogTable;
// reset
sinkConfig.setTable(finalTableName);
sinkConfig.setDatabase(finalDatabaseName);
sinkConfig.setDatabase(sinkDatabaseName);
return () -> new StarRocksSink(sinkConfig, finalCatalogTable, context.getOptions());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,31 @@ public class StarRocksIT extends TestSuiteBase implements TestResource {
+ "\"storage_format\" = \"DEFAULT\""
+ ")";

private static final String DDL_FAKE_SINK_TABLE =
"create table "
+ DATABASE
+ "."
+ "fake_table_sink"
+ " (\n"
+ " id BIGINT,\n"
+ " c_string STRING,\n"
+ " c_boolean BOOLEAN,\n"
+ " c_tinyint TINYINT,\n"
+ " c_int INT,\n"
+ " c_bigint BIGINT,\n"
+ " c_float FLOAT,\n"
+ " c_double DOUBLE,\n"
+ " c_decimal Decimal(2, 1),\n"
+ " c_date DATE\n"
+ ")ENGINE=OLAP\n"
+ "DUPLICATE KEY(`id`)\n"
+ "DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
+ "PROPERTIES (\n"
+ "\"replication_num\" = \"1\",\n"
+ "\"in_memory\" = \"false\","
+ "\"storage_format\" = \"DEFAULT\""
+ ")";

private static final String INIT_DATA_SQL =
"insert into "
+ DATABASE
Expand Down Expand Up @@ -253,6 +278,13 @@ public void testStarRocksSink(TestContainer container)
}
}

@TestTemplate
public void testSinkWithCatalogTableNameOnly(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/fake-to-starrocks.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}

private void initializeJdbcConnection()
throws SQLException, ClassNotFoundException, MalformedURLException,
InstantiationException, IllegalAccessException {
Expand All @@ -274,7 +306,7 @@ private void initializeJdbcTable() {
// create source table
statement.execute(DDL_SOURCE);
// create sink table
// statement.execute(DDL_SINK);
statement.execute(DDL_FAKE_SINK_TABLE);
} catch (SQLException e) {
throw new RuntimeException("Initializing table failed!", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
parallelism = 1
result_table_name = "fake"
row.num = 100
schema {
table = "FakeTable"
columns = [
{
name = id
type = bigint
nullable = false
defaultValue = 0
},
{
name = c_string
type = string
nullable = true
},
{
name = c_boolean
type = boolean
nullable = true
},
{
name = c_tinyint
type = tinyint
nullable = true
},
{
name = c_int
type = int
nullable = true
},
{
name = c_bigint
type = bigint
nullable = true
},
{
name = c_float
type = float
nullable = true
},
{
name = c_double
type = double
nullable = true
},
{
name = c_decimal
type = "decimal(2, 1)"
nullable = true
},
{
name = c_date
type = date
nullable = true
}
]
}
}
}

transform {
}

sink {
StarRocks {
source_table_name = "fake"
nodeUrls = ["starrocks_e2e:8030"]
username = root
password = ""
database = "test"
table = "fake_table_sink"
batch_max_rows = 100
max_retries = 3
base-url="jdbc:mysql://starrocks_e2e:9030/test"
starrocks.config = {
format = "JSON"
strip_outer_array = true
}
}
}

0 comments on commit 5795b26

Please sign in to comment.