Skip to content

Commit 8d5712c

Browse files
authored
[Improve] Improve StarRocks Sink Config (#4212)
1 parent 6ef28e5 commit 8d5712c

File tree

4 files changed

+6
-12
lines changed

4 files changed

+6
-12
lines changed

seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,7 @@ public static SinkConfig of(ReadonlyConfig config) {
6464
SinkConfig sinkConfig = new SinkConfig();
6565
sinkConfig.setNodeUrls(config.get(StarRocksSinkOptions.NODE_URLS));
6666
sinkConfig.setDatabase(config.get(StarRocksSinkOptions.DATABASE));
67-
sinkConfig.setJdbcUrl(String.format("jdbc:mysql://%s:%d",
68-
sinkConfig.getNodeUrls().get(0).split(":")[0],
69-
config.get(StarRocksSinkOptions.QUERY_PORT)));
67+
sinkConfig.setJdbcUrl(config.get(StarRocksOptions.BASE_URL));
7068
config.getOptional(StarRocksOptions.USERNAME).ifPresent(sinkConfig::setUsername);
7169
config.getOptional(StarRocksOptions.PASSWORD).ifPresent(sinkConfig::setPassword);
7270
config.getOptional(StarRocksSinkOptions.TABLE).ifPresent(sinkConfig::setTable);

seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public interface StarRocksSinkOptions {
2929
Option<List<String>> NODE_URLS = Options.key("nodeUrls")
3030
.listType()
3131
.noDefaultValue()
32-
.withDescription("StarRocks cluster address, the format is [\"fe_ip:fe_http_port\", ...]");
32+
.withDescription("StarRocks cluster http address, the format is [\"fe_ip:fe_http_port\", ...]");
3333

3434
Option<String> LABEL_PREFIX = Options.key("labelPrefix")
3535
.stringType()
@@ -58,11 +58,6 @@ public interface StarRocksSinkOptions {
5858
" \"replication_num\" = \"1\" \n" +
5959
")").withDescription("Create table statement template, used to create StarRocks table");
6060

61-
Option<Integer> QUERY_PORT = Options.key("query_port")
62-
.intType()
63-
.defaultValue(9030)
64-
.withDescription("FE MySQL server port");
65-
6661
Option<Integer> BATCH_MAX_SIZE = Options.key("batch_max_rows")
6762
.intType()
6863
.defaultValue(1024)

seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public String factoryIdentifier() {
4242
public OptionRule optionRule() {
4343
return OptionRule.builder()
4444
.required(StarRocksOptions.USERNAME, StarRocksOptions.PASSWORD)
45-
.required(StarRocksSinkOptions.DATABASE, StarRocksSinkOptions.QUERY_PORT)
45+
.required(StarRocksSinkOptions.DATABASE, StarRocksOptions.BASE_URL)
4646
.required(StarRocksSinkOptions.NODE_URLS)
4747
.optional(StarRocksSinkOptions.TABLE, StarRocksSinkOptions.LABEL_PREFIX, StarRocksSinkOptions.BATCH_MAX_SIZE, StarRocksSinkOptions.BATCH_MAX_BYTES,
4848
StarRocksSinkOptions.BATCH_INTERVAL_MS, StarRocksSinkOptions.MAX_RETRIES, StarRocksSinkOptions.MAX_RETRY_BACKOFF_MS,

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,15 +132,16 @@ public void triggerBarrier(Barrier barrier) throws Exception {
132132
this.prepareCloseBarrierId.set(barrier.getId());
133133
}
134134
final long barrierId = barrier.getId();
135-
Serializable snapshotState = null;
135+
Serializable snapshotState;
136+
byte[] serialize = null;
136137
synchronized (enumeratorContext) {
137138
if (barrier.snapshot()) {
138139
snapshotState = enumerator.snapshotState(barrierId);
140+
serialize = enumeratorStateSerializer.serialize(snapshotState);
139141
}
140142
sendToAllReader(location -> new BarrierFlowOperation(barrier, location));
141143
}
142144
if (barrier.snapshot()) {
143-
byte[] serialize = enumeratorStateSerializer.serialize(snapshotState);
144145
this.getExecutionContext().sendToMaster(new TaskAcknowledgeOperation(this.taskLocation, (CheckpointBarrier) barrier,
145146
Collections.singletonList(new ActionSubtaskState(source.getId(), -1, Collections.singletonList(serialize)))));
146147
}

0 commit comments

Comments
 (0)