Skip to content
Permalink
Browse files
[IOTDB-576] Use SessionPool for Flink Connector instead of Session (#982
)

* [IOTDB-576] Use SessionPool for Flink Connector instead of Session
  • Loading branch information
vesense committed Apr 4, 2020
1 parent 385bdcb commit 4e288af514f2b6d802fab685538c06384a338d2a
Showing 4 changed files with 43 additions and 37 deletions.
@@ -21,7 +21,7 @@
import com.google.common.base.Preconditions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,12 +44,13 @@
private IoTDBOptions options;
private IoTSerializationSchema<IN> serializationSchema;
private Map<String, IoTDBOptions.TimeseriesOption> timeseriesOptionMap;
private transient Session session;
private transient SessionPool pool;
private transient ScheduledExecutorService scheduledExecutor;

private int batchSize = 0;
private int flushIntervalMs = 3000;
private List<Event> batchList;
private int sessionPoolSize = 2;

public IoTDBSink(IoTDBOptions options, IoTSerializationSchema<IN> schema) {
this.options = options;
@@ -68,13 +69,12 @@ public void open(Configuration parameters) throws Exception {
}

void initSession() throws Exception {
session = new Session(options.getHost(), options.getPort(), options.getUser(), options.getPassword());
session.open();
pool = new SessionPool(options.getHost(), options.getPort(), options.getUser(), options.getPassword(), sessionPoolSize);

session.setStorageGroup(options.getStorageGroup());
pool.setStorageGroup(options.getStorageGroup());
for (IoTDBOptions.TimeseriesOption option : options.getTimeseriesOptionList()) {
if (!session.checkTimeseriesExists(option.getPath())) {
session.createTimeseries(option.getPath(), option.getDataType(), option.getEncoding(), option.getCompressor());
if (!pool.checkTimeseriesExists(option.getPath())) {
pool.createTimeseries(option.getPath(), option.getDataType(), option.getEncoding(), option.getCompressor());
}
}
}
@@ -93,8 +93,8 @@ void initScheduler() {
}

// for testing
void setSession(Session session) {
this.session = session;
void setSessionPool(SessionPool pool) {
this.pool = pool;
}

@Override
@@ -115,7 +115,7 @@ public void invoke(IN input, Context context) throws Exception {
}

convertText(event.getDevice(), event.getMeasurements(), event.getValues());
session.insert(event.getDevice(), event.getTimestamp(), event.getMeasurements(),
pool.insert(event.getDevice(), event.getTimestamp(), event.getMeasurements(),
event.getValues());
LOG.debug("send event successfully");
}
@@ -132,15 +132,21 @@ public IoTDBSink<IN> withFlushIntervalMs(int flushIntervalMs) {
return this;
}

public IoTDBSink<IN> withSessionPoolSize(int sessionPoolSize) {
Preconditions.checkArgument(sessionPoolSize > 0);
this.sessionPoolSize = sessionPoolSize;
return this;
}

@Override
public void close() throws Exception {
if (session != null) {
if (pool != null) {
try {
flush();
} catch (Exception e) {
LOG.error("flush error", e);
}
session.close();
pool.close();
}
if (scheduledExecutor != null) {
scheduledExecutor.shutdown();
@@ -176,7 +182,7 @@ private void flush() throws Exception {
measurementsList.add(event.getMeasurements());
valuesList.add(event.getValues());
}
session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
pool.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
LOG.debug("send event successfully");
batchList.clear();
}
@@ -19,7 +19,7 @@
package org.apache.iotdb.flink;

import com.google.common.collect.Lists;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.pool.SessionPool;
import org.junit.Before;
import org.junit.Test;

@@ -33,7 +33,7 @@
public class IoTDBSinkBatchInsertTest {

private IoTDBSink ioTDBSink;
private Session session;
private SessionPool pool;

@Before
public void setUp() throws Exception {
@@ -42,8 +42,8 @@ public void setUp() throws Exception {
ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());
ioTDBSink.withBatchSize(3);

session = mock(Session.class);
ioTDBSink.setSession(session);
pool = mock(SessionPool.class);
ioTDBSink.setSessionPool(pool);
}

@Test
@@ -55,7 +55,7 @@ public void testBatchInsert() throws Exception {
tuple.put("values", "36.5");
ioTDBSink.invoke(tuple, null);

verifyZeroInteractions(session);
verifyZeroInteractions(pool);

tuple = new HashMap();
tuple.put("device", "root.sg.D01");
@@ -64,7 +64,7 @@ public void testBatchInsert() throws Exception {
tuple.put("values", "37.2");
ioTDBSink.invoke(tuple, null);

verifyZeroInteractions(session);
verifyZeroInteractions(pool);

tuple = new HashMap();
tuple.put("device", "root.sg.D01");
@@ -73,7 +73,7 @@ public void testBatchInsert() throws Exception {
tuple.put("values", "37.1");
ioTDBSink.invoke(tuple, null);

verify(session).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));
verify(pool).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));

tuple = new HashMap();
tuple.put("device", "root.sg.D01");
@@ -82,7 +82,7 @@ public void testBatchInsert() throws Exception {
tuple.put("values", "36.5");
ioTDBSink.invoke(tuple, null);

verifyZeroInteractions(session);
verifyZeroInteractions(pool);
}

@Test
@@ -93,10 +93,10 @@ public void close() throws Exception {
tuple.put("measurements", "temperature");
tuple.put("values", "36.5");
ioTDBSink.invoke(tuple, null);
verifyZeroInteractions(session);
verifyZeroInteractions(pool);

ioTDBSink.close();
verify(session).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));
verify(session).close();
verify(pool).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));
verify(pool).close();
}
}
@@ -19,7 +19,7 @@
package org.apache.iotdb.flink;

import com.google.common.collect.Lists;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.pool.SessionPool;
import org.junit.Before;
import org.junit.Test;

@@ -33,7 +33,7 @@
public class IoTDBSinkBatchTimerTest {

private IoTDBSink ioTDBSink;
private Session session;
private SessionPool pool;

@Before
public void setUp() throws Exception {
@@ -44,8 +44,8 @@ public void setUp() throws Exception {
ioTDBSink.withFlushIntervalMs(1000);
ioTDBSink.initScheduler();

session = mock(Session.class);
ioTDBSink.setSession(session);
pool = mock(SessionPool.class);
ioTDBSink.setSessionPool(pool);
}

@Test
@@ -59,16 +59,16 @@ public void testBatchInsert() throws Exception {

Thread.sleep(2500);

verify(session).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));
verify(pool).insertInBatch(any(List.class), any(List.class), any(List.class), any(List.class));

Thread.sleep(1000);

verifyZeroInteractions(session);
verifyZeroInteractions(pool);
}

@Test
public void close() throws Exception {
ioTDBSink.close();
verify(session).close();
verify(pool).close();
}
}
@@ -19,7 +19,7 @@
package org.apache.iotdb.flink;

import com.google.common.collect.Lists;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.pool.SessionPool;
import org.junit.Before;
import org.junit.Test;

@@ -34,16 +34,16 @@
public class IoTDBSinkInsertTest {

private IoTDBSink ioTDBSink;
private Session session;
private SessionPool pool;

@Before
public void setUp() throws Exception {
IoTDBOptions options = new IoTDBOptions();
options.setTimeseriesOptionList(Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.D01.temperature")));
ioTDBSink = new IoTDBSink(options, new DefaultIoTSerializationSchema());

session = mock(Session.class);
ioTDBSink.setSession(session);
pool = mock(SessionPool.class);
ioTDBSink.setSessionPool(pool);
}

@Test
@@ -55,12 +55,12 @@ public void testInsert() throws Exception {
tuple.put("values", "36.5");

ioTDBSink.invoke(tuple, null);
verify(session).insert(any(String.class), any(Long.class), any(List.class), any(List.class));
verify(pool).insert(any(String.class), any(Long.class), any(List.class), any(List.class));
}

@Test
public void close() throws Exception {
ioTDBSink.close();
verify(session).close();
verify(pool).close();
}
}

0 comments on commit 4e288af

Please sign in to comment.