Skip to content
Permalink
Browse files
Remove path registration in IoTDBSink (#3620)
  • Loading branch information
yuyuankang committed Jul 23, 2021
1 parent 72799f3 commit 469f78b9d43dca18edbe42188128893e27d41d91
Showing 1 changed file with 1 addition and 25 deletions.
@@ -19,8 +19,6 @@
package org.apache.iotdb.flink;

import org.apache.iotdb.flink.options.IoTDBSinkOptions;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -78,36 +76,14 @@ public void open(Configuration parameters) throws Exception {
initScheduler();
}

void initSession() throws Exception {
void initSession() {
pool =
new SessionPool(
options.getHost(),
options.getPort(),
options.getUser(),
options.getPassword(),
sessionPoolSize);

try {
pool.setStorageGroup(options.getStorageGroup());
} catch (StatementExecutionException e) {
if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
throw e;
}
}

for (IoTDBSinkOptions.TimeseriesOption option : options.getTimeseriesOptionList()) {
if (!pool.checkTimeseriesExists(option.getPath())) {
try {
pool.createTimeseries(
option.getPath(), option.getDataType(), option.getEncoding(), option.getCompressor());
} catch (StatementExecutionException e) {
// path could have been created by the other process here
if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
throw e;
}
}
}
}
}

void initScheduler() {

0 comments on commit 469f78b

Please sign in to comment.