From f35a087bcf3c1bb5f4f9fbca4eea7d6541d3247e Mon Sep 17 00:00:00 2001 From: "J.J. Liu" Date: Wed, 7 Jul 2021 09:09:16 +0800 Subject: [PATCH 1/7] [IOTDB-1463] Implement builder pattern for Session and SessionPool (#3502) * Implement builder pattern for Session and SessionPool * Modify tests * Fix * Change ut name * Change default params for SessionPol * Make more params optional * Fix test bug * Add docs * Fix spotless * Remove zoneId * Remove zoneId * Remove zoneId in docs --- .../API/Programming-Java-Native-API.md | 20 ++- .../API/Programming-Java-Native-API.md | 18 ++- .../java/org/apache/iotdb/SessionExample.java | 3 +- .../org/apache/iotdb/SessionPoolExample.java | 9 +- .../java/org/apache/iotdb/session/Config.java | 4 + .../org/apache/iotdb/session/Session.java | 65 ++++++++ .../iotdb/session/pool/SessionPool.java | 142 +++++++++++++++--- .../{SessionUT.java => SessionTest.java} | 25 ++- .../iotdb/session/pool/SessionPoolTest.java | 26 ++++ 9 files changed, 284 insertions(+), 28 deletions(-) rename session/src/test/java/org/apache/iotdb/session/{SessionUT.java => SessionTest.java} (92%) diff --git a/docs/UserGuide/API/Programming-Java-Native-API.md b/docs/UserGuide/API/Programming-Java-Native-API.md index a09a067f2ac1..bfa20798673c 100644 --- a/docs/UserGuide/API/Programming-Java-Native-API.md +++ b/docs/UserGuide/API/Programming-Java-Native-API.md @@ -56,11 +56,21 @@ Here we show the commonly used interfaces and their parameters in the Native API * Initialize a Session ```java -Session(String host, int rpcPort) - -Session(String host, String rpcPort, String username, String password) - -Session(String host, int rpcPort, String username, String password) + // use default configuration + session = new Session.Builder.build(); + + // configure all fields + session = + new Session.Builder() + .host(String host) + .port(int port) + .fetchSize(int fetchSize) + .username(String username) + .password(String password) + .thriftDefaultBufferSize(int thriftDefaultBufferSize) + .thriftMaxFrameSize(int thriftMaxFrameSize) + .enableCacheLeader(boolean enableCacheLeader) + .build(); ``` * Open a Session diff --git a/docs/zh/UserGuide/API/Programming-Java-Native-API.md b/docs/zh/UserGuide/API/Programming-Java-Native-API.md index db91d1328378..7bc6cabba6f2 100644 --- a/docs/zh/UserGuide/API/Programming-Java-Native-API.md +++ b/docs/zh/UserGuide/API/Programming-Java-Native-API.md @@ -59,9 +59,21 @@ mvn clean install -pl session -am -Dmaven.test.skip=true * 初始化Session ```java -Session(String host, int rpcPort) -Session(String host, String rpcPort, String username, String password) -Session(String host, int rpcPort, String username, String password) + // 全部使用默认配置 + session = new Session.Builder.build(); + + // 自行配置参数 + session = + new Session.Builder() + .host(String host) + .port(int port) + .fetchSize(int fetchSize) + .username(String username) + .password(String password) + .thriftDefaultBufferSize(int thriftDefaultBufferSize) + .thriftMaxFrameSize(int thriftMaxFrameSize) + .enableCacheLeader(boolean enableCacheLeader) + .build(); ``` * 开启Session diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index a705c24ee322..a8294f13de45 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -53,7 +53,8 @@ public class SessionExample { public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { - session = new Session(LOCAL_HOST, 6667, "root", "root"); + session = + new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").build(); session.open(false); // set session fetchSize diff --git a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java index d2e4977cb24f..230849d25d1e 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java @@ -37,7 +37,14 @@ public class SessionPoolExample { public static void main(String[] args) throws StatementExecutionException, IoTDBConnectionException, InterruptedException { - pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3); + pool = + new SessionPool.Builder() + .host("127.0.0.1") + .port(6667) + .user("root") + .password("root") + .maxSize(3) + .build(); service = Executors.newFixedThreadPool(10); insertRecord(); diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/session/src/main/java/org/apache/iotdb/session/Config.java index 02e7e708658e..7b1a6364aa67 100644 --- a/session/src/main/java/org/apache/iotdb/session/Config.java +++ b/session/src/main/java/org/apache/iotdb/session/Config.java @@ -20,6 +20,8 @@ public class Config { + public static final String DEFAULT_HOST = "localhost"; + public static final int DEFAULT_PORT = 6667; public static final String DEFAULT_USER = "root"; public static final String DEFAULT_PASSWORD = "root"; public static final int DEFAULT_FETCH_SIZE = 5000; @@ -34,4 +36,6 @@ public class Config { /** thrift max frame size (16384000 bytes by default), we change it to 64MB */ public static final int DEFAULT_MAX_FRAME_SIZE = 67108864; + + public static final int DEFAULT_SESSION_POOL_MAX_SIZE = 5; } diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index 1b2f8125eb61..cfe11a76be7a 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -1839,4 +1839,69 @@ public boolean isEnableQueryRedirection() { public void setEnableQueryRedirection(boolean enableQueryRedirection) { this.enableQueryRedirection = enableQueryRedirection; } + + public static class Builder { + private String host = Config.DEFAULT_HOST; + private int rpcPort = Config.DEFAULT_PORT; + private String username = Config.DEFAULT_USER; + private String password = Config.DEFAULT_PASSWORD; + private int fetchSize = Config.DEFAULT_FETCH_SIZE; + private ZoneId zoneId = null; + private int thriftDefaultBufferSize = Config.DEFAULT_INITIAL_BUFFER_CAPACITY; + private int thriftMaxFrameSize = Config.DEFAULT_MAX_FRAME_SIZE; + private boolean enableCacheLeader = Config.DEFAULT_CACHE_LEADER_MODE; + + public Builder host(String host) { + this.host = host; + return this; + } + + public Builder port(int port) { + this.rpcPort = port; + return this; + } + + public Builder username(String username) { + this.username = username; + return this; + } + + public Builder password(String password) { + this.password = password; + return this; + } + + public Builder fetchSize(int fetchSize) { + this.fetchSize = fetchSize; + return this; + } + + public Builder thriftDefaultBufferSize(int thriftDefaultBufferSize) { + this.thriftDefaultBufferSize = thriftDefaultBufferSize; + return this; + } + + public Builder thriftMaxFrameSize(int thriftMaxFrameSize) { + this.thriftMaxFrameSize = thriftMaxFrameSize; + return this; + } + + public Builder enableCacheLeader(boolean enableCacheLeader) { + this.enableCacheLeader = enableCacheLeader; + return this; + } + + public Session build() { + return new Session( + host, + rpcPort, + username, + password, + fetchSize, + zoneId, + thriftDefaultBufferSize, + thriftMaxFrameSize, + enableCacheLeader); + } + } } diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java index 79a539df5d22..fe8cb25d770d 100644 --- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java +++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java @@ -70,7 +70,7 @@ public class SessionPool { private ConcurrentMap occupied = new ConcurrentHashMap<>(); private int size = 0; private int maxSize = 0; - private String ip; + private String host; private int port; private String user; private String password; @@ -83,9 +83,9 @@ public class SessionPool { private boolean closed; // whether the queue is closed. - public SessionPool(String ip, int port, String user, String password, int maxSize) { + public SessionPool(String host, int port, String user, String password, int maxSize) { this( - ip, + host, port, user, password, @@ -98,9 +98,9 @@ public SessionPool(String ip, int port, String user, String password, int maxSiz } public SessionPool( - String ip, int port, String user, String password, int maxSize, boolean enableCompression) { + String host, int port, String user, String password, int maxSize, boolean enableCompression) { this( - ip, + host, port, user, password, @@ -113,7 +113,7 @@ public SessionPool( } public SessionPool( - String ip, + String host, int port, String user, String password, @@ -121,7 +121,7 @@ public SessionPool( boolean enableCompression, boolean enableCacheLeader) { this( - ip, + host, port, user, password, @@ -134,9 +134,9 @@ public SessionPool( } public SessionPool( - String ip, int port, String user, String password, int maxSize, ZoneId zoneId) { + String host, int port, String user, String password, int maxSize, ZoneId zoneId) { this( - ip, + host, port, user, password, @@ -150,7 +150,7 @@ public SessionPool( @SuppressWarnings("squid:S107") public SessionPool( - String ip, + String host, int port, String user, String password, @@ -161,7 +161,7 @@ public SessionPool( ZoneId zoneId, boolean enableCacheLeader) { this.maxSize = maxSize; - this.ip = ip; + this.host = host; this.port = port; this.user = user; this.password = password; @@ -198,9 +198,9 @@ private Session getSession() throws IoTDBConnectionException { if (canCreate) { // create a new one. if (logger.isDebugEnabled()) { - logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password); + logger.debug("Create a new Session {}, {}, {}, {}", host, port, user, password); } - session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader); + session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader); try { session.open(enableCompression); // avoid someone has called close() the session pool @@ -244,7 +244,7 @@ private Session getSession() throws IoTDBConnectionException { logger.warn( "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}", (System.currentTimeMillis() - start) / 1000, - ip, + host, port, user, password); @@ -255,7 +255,7 @@ private Session getSession() throws IoTDBConnectionException { size); if (System.currentTimeMillis() - start > timeout) { throw new IoTDBConnectionException( - String.format("timeout to get a connection from %s:%s", ip, port)); + String.format("timeout to get a connection from %s:%s", host, port)); } } } catch (InterruptedException e) { @@ -337,7 +337,7 @@ public void closeResultSet(SessionDataSetWrapper wrapper) { @SuppressWarnings({"squid:S2446"}) private synchronized void removeSession() { - logger.warn("Remove a broken Session {}, {}, {}", ip, port, user); + logger.warn("Remove a broken Session {}, {}, {}", host, port, user); size--; // we do not need to notifyAll as any waited thread can continue to work after waked up. this.notify(); @@ -365,7 +365,7 @@ private void cleanSessionAndMayThrowConnectionException( throw new IoTDBConnectionException( String.format( "retry to execute statement on %s:%s failed %d times: %s", - ip, port, RETRY, e.getMessage()), + host, port, RETRY, e.getMessage()), e); } } @@ -1161,4 +1161,112 @@ public SessionDataSetWrapper executeRawDataQuery(List paths, long startT // never go here return null; } + + public int getMaxSize() { + return maxSize; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public String getUser() { + return user; + } + + public String getPassword() { + return password; + } + + public int getFetchSize() { + return fetchSize; + } + + public long getTimeout() { + return timeout; + } + + public boolean isEnableCompression() { + return enableCompression; + } + + public boolean isEnableCacheLeader() { + return enableCacheLeader; + } + + public static class Builder { + private String host = Config.DEFAULT_HOST; + private int port = Config.DEFAULT_PORT; + private int maxSize = Config.DEFAULT_SESSION_POOL_MAX_SIZE; + private String user = Config.DEFAULT_USER; + private String password = Config.DEFAULT_PASSWORD; + private int fetchSize = Config.DEFAULT_FETCH_SIZE; + private long timeout = 60_000; + private boolean enableCompression = false; + private ZoneId zoneId = null; + private boolean enableCacheLeader = Config.DEFAULT_CACHE_LEADER_MODE; + + public Builder host(String host) { + this.host = host; + return this; + } + + public Builder port(int port) { + this.port = port; + return this; + } + + public Builder maxSize(int maxSize) { + this.maxSize = maxSize; + return this; + } + + public Builder user(String user) { + this.user = user; + return this; + } + + public Builder password(String password) { + this.password = password; + return this; + } + + public Builder fetchSize(int fetchSize) { + this.fetchSize = fetchSize; + return this; + } + + public Builder timeout(long timeout) { + this.timeout = timeout; + return this; + } + + public Builder enableCompression(boolean enableCompression) { + this.enableCompression = enableCompression; + return this; + } + + public Builder enableCacheLeader(boolean enableCacheLeader) { + this.enableCacheLeader = enableCacheLeader; + return this; + } + + public SessionPool build() { + return new SessionPool( + host, + port, + user, + password, + maxSize, + fetchSize, + timeout, + enableCompression, + zoneId, + enableCacheLeader); + } + } } diff --git a/session/src/test/java/org/apache/iotdb/session/SessionUT.java b/session/src/test/java/org/apache/iotdb/session/SessionTest.java similarity index 92% rename from session/src/test/java/org/apache/iotdb/session/SessionUT.java rename to session/src/test/java/org/apache/iotdb/session/SessionTest.java index 8ec1e2ef3953..290c6be6fec9 100644 --- a/session/src/test/java/org/apache/iotdb/session/SessionUT.java +++ b/session/src/test/java/org/apache/iotdb/session/SessionTest.java @@ -43,9 +43,10 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class SessionUT { +public class SessionTest { private Session session; @@ -244,4 +245,26 @@ public void createDeviceTemplate() throws IoTDBConnectionException, StatementExe "template1", schemaNames, measurementList, dataTypeList, encodingList, compressionTypes); session.setSchemaTemplate("template1", "root.sg.1"); } + + @Test + public void testBuilder() { + session = + new Session.Builder() + .host("localhost") + .port(1234) + .fetchSize(1) + .username("abc") + .password("123456") + .thriftDefaultBufferSize(2) + .thriftMaxFrameSize(3) + .enableCacheLeader(true) + .build(); + + assertEquals(session.fetchSize, 1); + assertEquals(session.username, "abc"); + assertEquals(session.password, "123456"); + assertEquals(session.thriftDefaultBufferSize, 2); + assertEquals(session.thriftMaxFrameSize, 3); + assertTrue(session.enableCacheLeader); + } } diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java index 6b7d68fe4acc..9a5c4a331eac 100644 --- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java +++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java @@ -361,4 +361,30 @@ public void testClose() { // e.g., thread A created a new session, but not returned; thread B close the pool; A get the // session. } + + @Test + public void testBuilder() { + SessionPool pool = + new SessionPool.Builder() + .host("localhost") + .port(1234) + .maxSize(10) + .user("abc") + .password("123") + .fetchSize(1) + .timeout(2) + .enableCacheLeader(true) + .enableCompression(true) + .build(); + + assertEquals(pool.getHost(), "localhost"); + assertEquals(pool.getPort(), 1234); + assertEquals(pool.getUser(), "abc"); + assertEquals(pool.getPassword(), "123"); + assertEquals(pool.getMaxSize(), 10); + assertEquals(pool.getFetchSize(), 1); + assertEquals(pool.getTimeout(), 2); + assertTrue(pool.isEnableCacheLeader()); + assertTrue(pool.isEnableCompression()); + } } From 6f67a81cf418f18f87084fb869db1816ce6e92ec Mon Sep 17 00:00:00 2001 From: Hang Ji <55303647+ijihang@users.noreply.github.com> Date: Wed, 7 Jul 2021 14:56:20 +0800 Subject: [PATCH 2/7] [IOTDB-1453]Fix result set when the server query time filtered is And (#3452) --- .../query/fill/ClusterPreviousFill.java | 2 +- .../server/member/MetaGroupMember.java | 5 +- .../iotdb/cluster/utils/PartitionUtils.java | 237 ------------------ .../server/member/MetaGroupMemberTest.java | 6 +- .../iotdb/db/query/executor/QueryRouter.java | 11 + .../iotdb/db/utils/TimeValuePairUtils.java | 232 +++++++++++++++++ .../integration/IoTDBSequenceDataQueryIT.java | 40 +++ 7 files changed, 290 insertions(+), 243 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java index 08689e7efef5..e0f6053b6bae 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java @@ -32,12 +32,12 @@ import org.apache.iotdb.cluster.server.handlers.caller.PreviousFillHandler; import org.apache.iotdb.cluster.server.member.DataGroupMember; import org.apache.iotdb.cluster.server.member.MetaGroupMember; -import org.apache.iotdb.cluster.utils.PartitionUtils.Intervals; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.executor.fill.PreviousFill; +import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TimeValuePair; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index be1fc7d860a3..b78d388608d3 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -83,7 +83,6 @@ import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.iotdb.cluster.utils.ClusterUtils; import org.apache.iotdb.cluster.utils.PartitionUtils; -import org.apache.iotdb.cluster.utils.PartitionUtils.Intervals; import org.apache.iotdb.cluster.utils.StatusUtils; import org.apache.iotdb.cluster.utils.nodetool.function.Status; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -96,6 +95,8 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.TestOnly; +import org.apache.iotdb.db.utils.TimeValuePairUtils; +import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals; import org.apache.iotdb.service.rpc.thrift.EndPoint; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.read.filter.basic.Filter; @@ -1474,7 +1475,7 @@ private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, RaftNode */ public List routeFilter(Filter filter, PartialPath path) throws StorageEngineException, EmptyIntervalException { - Intervals intervals = PartitionUtils.extractTimeInterval(filter); + Intervals intervals = TimeValuePairUtils.extractTimeInterval(filter); if (intervals.isEmpty()) { throw new EmptyIntervalException(filter); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java index 85cf424bf3eb..6c4cf9d1e86c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java @@ -41,21 +41,8 @@ import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan; import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan; import org.apache.iotdb.service.rpc.thrift.TSStatus; -import org.apache.iotdb.tsfile.read.filter.GroupByFilter; -import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeEq; -import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGt; -import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGtEq; -import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeIn; -import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeLt; -import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeLtEq; -import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeNotEq; -import org.apache.iotdb.tsfile.read.filter.basic.Filter; -import org.apache.iotdb.tsfile.read.filter.operator.AndFilter; -import org.apache.iotdb.tsfile.read.filter.operator.NotFilter; -import org.apache.iotdb.tsfile.read.filter.operator.OrFilter; import org.apache.iotdb.tsfile.utils.Murmur128Hash; -import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -154,230 +141,6 @@ public static void reordering(InsertTabletPlan plan, TSStatus[] status, TSStatus } } - public static Intervals extractTimeInterval(Filter filter) { - if (filter == null) { - return Intervals.ALL_INTERVAL; - } - // and, or, not, value, time, group by - // eq, neq, gt, gteq, lt, lteq, in - if (filter instanceof AndFilter) { - AndFilter andFilter = ((AndFilter) filter); - Intervals leftIntervals = extractTimeInterval(andFilter.getLeft()); - Intervals rightIntervals = extractTimeInterval(andFilter.getRight()); - return leftIntervals.intersection(rightIntervals); - } else if (filter instanceof OrFilter) { - OrFilter orFilter = ((OrFilter) filter); - Intervals leftIntervals = extractTimeInterval(orFilter.getLeft()); - Intervals rightIntervals = extractTimeInterval(orFilter.getRight()); - return leftIntervals.union(rightIntervals); - } else if (filter instanceof NotFilter) { - NotFilter notFilter = ((NotFilter) filter); - return extractTimeInterval(notFilter.getFilter()).not(); - } else if (filter instanceof TimeGt) { - TimeGt timeGt = ((TimeGt) filter); - return new Intervals(((long) timeGt.getValue()) + 1, Long.MAX_VALUE); - } else if (filter instanceof TimeGtEq) { - TimeGtEq timeGtEq = ((TimeGtEq) filter); - return new Intervals(((long) timeGtEq.getValue()), Long.MAX_VALUE); - } else if (filter instanceof TimeEq) { - TimeEq timeEq = ((TimeEq) filter); - return new Intervals(((long) timeEq.getValue()), ((long) timeEq.getValue())); - } else if (filter instanceof TimeNotEq) { - TimeNotEq timeNotEq = ((TimeNotEq) filter); - Intervals intervals = new Intervals(); - intervals.addInterval(Long.MIN_VALUE, (long) timeNotEq.getValue() - 1); - intervals.addInterval((long) timeNotEq.getValue() + 1, Long.MAX_VALUE); - return intervals; - } else if (filter instanceof TimeLt) { - TimeLt timeLt = ((TimeLt) filter); - return new Intervals(Long.MIN_VALUE, (long) timeLt.getValue() - 1); - } else if (filter instanceof TimeLtEq) { - TimeLtEq timeLtEq = ((TimeLtEq) filter); - return new Intervals(Long.MIN_VALUE, (long) timeLtEq.getValue()); - } else if (filter instanceof TimeIn) { - TimeIn timeIn = ((TimeIn) filter); - Intervals intervals = new Intervals(); - for (Object value : timeIn.getValues()) { - long time = ((long) value); - intervals.addInterval(time, time); - } - return intervals; - } else if (filter instanceof GroupByFilter) { - GroupByFilter groupByFilter = ((GroupByFilter) filter); - return new Intervals(groupByFilter.getStartTime(), groupByFilter.getEndTime() + 1); - } - // value filter - return Intervals.ALL_INTERVAL; - } - - /** All intervals are closed. */ - public static class Intervals extends ArrayList { - - static final Intervals ALL_INTERVAL = new Intervals(Long.MIN_VALUE, Long.MAX_VALUE); - - public Intervals() { - super(); - } - - Intervals(long lowerBound, long upperBound) { - super(); - addInterval(lowerBound, upperBound); - } - - public int getIntervalSize() { - return size() / 2; - } - - public long getLowerBound(int index) { - return get(index * 2); - } - - public long getUpperBound(int index) { - return get(index * 2 + 1); - } - - void setLowerBound(int index, long lb) { - set(index * 2, lb); - } - - void setUpperBound(int index, long ub) { - set(index * 2 + 1, ub); - } - - public void addInterval(long lowerBound, long upperBound) { - add(lowerBound); - add(upperBound); - } - - Intervals intersection(Intervals that) { - Intervals result = new Intervals(); - int thisSize = this.getIntervalSize(); - int thatSize = that.getIntervalSize(); - for (int i = 0; i < thisSize; i++) { - for (int j = 0; j < thatSize; j++) { - long thisLB = this.getLowerBound(i); - long thisUB = this.getUpperBound(i); - long thatLB = that.getLowerBound(i); - long thatUB = that.getUpperBound(i); - if (thisUB >= thatLB) { - if (thisUB <= thatUB) { - result.addInterval(Math.max(thisLB, thatLB), thisUB); - } else if (thisLB <= thatUB) { - result.addInterval(Math.max(thisLB, thatLB), thatUB); - } - } - } - } - return result; - } - - /** - * The union is implemented by merge, so the two intervals must be ordered. - * - * @param that - * @return - */ - Intervals union(Intervals that) { - if (this.isEmpty()) { - return that; - } else if (that.isEmpty()) { - return this; - } - Intervals result = new Intervals(); - - int thisSize = this.getIntervalSize(); - int thatSize = that.getIntervalSize(); - int thisIndex = 0; - int thatIndex = 0; - // merge the heads of the two intervals - while (thisIndex < thisSize && thatIndex < thatSize) { - long thisLB = this.getLowerBound(thisIndex); - long thisUB = this.getUpperBound(thisIndex); - long thatLB = that.getLowerBound(thatIndex); - long thatUB = that.getUpperBound(thatIndex); - if (thisLB <= thatLB) { - result.mergeLast(thisLB, thisUB); - thisIndex++; - } else { - result.mergeLast(thatLB, thatUB); - thatIndex++; - } - } - // merge the remaining intervals - Intervals remainingIntervals = thisIndex < thisSize ? this : that; - int remainingIndex = thisIndex < thisSize ? thisIndex : thatIndex; - mergeRemainingIntervals(remainingIndex, remainingIntervals, result); - - return result; - } - - private void mergeRemainingIntervals( - int remainingIndex, Intervals remainingIntervals, Intervals result) { - for (int i = remainingIndex; i < remainingIntervals.getIntervalSize(); i++) { - long lb = remainingIntervals.getLowerBound(i); - long ub = remainingIntervals.getUpperBound(i); - result.mergeLast(lb, ub); - } - } - - /** - * Merge an interval of [lowerBound, upperBound] with the last interval if they can be merged, - * or just add it as the last interval if its lowerBound is larger than the upperBound of the - * last interval. If the upperBound of the new interval is less than the lowerBound of the last - * interval, nothing will be done. - * - * @param lowerBound - * @param upperBound - */ - private void mergeLast(long lowerBound, long upperBound) { - if (getIntervalSize() == 0) { - addInterval(lowerBound, upperBound); - return; - } - int lastIndex = getIntervalSize() - 1; - long lastLB = getLowerBound(lastIndex); - long lastUB = getUpperBound(lastIndex); - if (lowerBound > lastUB + 1) { - // e.g., last [3, 5], new [7, 10], just add the new interval - addInterval(lowerBound, upperBound); - return; - } - if (upperBound < lastLB - 1) { - // e.g., last [7, 10], new [3, 5], do nothing - return; - } - // merge the new interval into the last one - setLowerBound(lastIndex, Math.min(lastLB, lowerBound)); - setUpperBound(lastIndex, Math.max(lastUB, upperBound)); - } - - public Intervals not() { - if (isEmpty()) { - return ALL_INTERVAL; - } - Intervals result = new Intervals(); - long firstLB = getLowerBound(0); - if (firstLB != Long.MIN_VALUE) { - result.addInterval(Long.MIN_VALUE, firstLB - 1); - } - - int intervalSize = getIntervalSize(); - for (int i = 0; i < intervalSize - 1; i++) { - long currentUB = getUpperBound(i); - long nextLB = getLowerBound(i + 1); - if (currentUB + 1 <= nextLB - 1) { - result.addInterval(currentUB + 1, nextLB - 1); - } - } - - long lastUB = getUpperBound(result.getIntervalSize() - 1); - if (lastUB != Long.MAX_VALUE) { - result.addInterval(lastUB + 1, Long.MAX_VALUE); - } - return result; - } - } - /** * Calculate the headers of the groups that possibly store the data of a timeseries over the given * time range. diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java index e0a28fe20fd0..88ada819560f 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java @@ -74,7 +74,6 @@ import org.apache.iotdb.cluster.server.service.MetaAsyncService; import org.apache.iotdb.cluster.utils.ClusterUtils; import org.apache.iotdb.cluster.utils.Constants; -import org.apache.iotdb.cluster.utils.PartitionUtils; import org.apache.iotdb.cluster.utils.StatusUtils; import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.auth.authorizer.IAuthorizer; @@ -100,6 +99,7 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader; import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.utils.TimeValuePairUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -1379,7 +1379,7 @@ public void testRouteIntervalsDisablePartition() StorageEngine.setEnablePartition(false); testMetaMember.setCharacter(LEADER); testMetaMember.setLeader(testMetaMember.getThisNode()); - PartitionUtils.Intervals intervals = new PartitionUtils.Intervals(); + TimeValuePairUtils.Intervals intervals = new TimeValuePairUtils.Intervals(); intervals.addInterval(Long.MIN_VALUE, Long.MAX_VALUE); List partitionGroups = @@ -1395,7 +1395,7 @@ public void testRouteIntervalsEnablePartition() StorageEngine.setEnablePartition(true); testMetaMember.setCharacter(LEADER); testMetaMember.setLeader(testMetaMember.getThisNode()); - PartitionUtils.Intervals intervals = new PartitionUtils.Intervals(); + TimeValuePairUtils.Intervals intervals = new TimeValuePairUtils.Intervals(); intervals.addInterval(Long.MIN_VALUE, Long.MAX_VALUE); List partitionGroups = diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java index 29254e72c339..6c5310255e3c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java @@ -38,6 +38,7 @@ import org.apache.iotdb.db.query.dataset.groupby.GroupByWithoutValueFilterDataSet; import org.apache.iotdb.db.query.executor.fill.IFill; import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.utils.TimeValuePairUtils; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.expression.ExpressionType; @@ -47,6 +48,8 @@ import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer; import org.apache.iotdb.tsfile.read.filter.GroupByFilter; import org.apache.iotdb.tsfile.read.filter.GroupByMonthFilter; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.query.dataset.EmptyDataSet; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.slf4j.Logger; @@ -97,6 +100,14 @@ public QueryDataSet rawDataQuery(RawDataQueryPlan queryPlan, QueryContext contex throw new QueryProcessException(e); } return rawDataQueryExecutor.executeWithValueFilter(context); + } else if (optimizedExpression != null + && optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) { + Filter timeFilter = ((GlobalTimeExpression) queryPlan.getExpression()).getFilter(); + TimeValuePairUtils.Intervals intervals = TimeValuePairUtils.extractTimeInterval(timeFilter); + if (intervals.isEmpty()) { + logger.warn("The interval of the filter {} is empty.", timeFilter); + return new EmptyDataSet(); + } } // Currently, we only group the vector partial paths for raw query without value filter diff --git a/server/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java index ba560a6c6565..1df6be093b7b 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java @@ -22,6 +22,12 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.filter.GroupByFilter; +import org.apache.iotdb.tsfile.read.filter.TimeFilter; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.filter.operator.AndFilter; +import org.apache.iotdb.tsfile.read.filter.operator.NotFilter; +import org.apache.iotdb.tsfile.read.filter.operator.OrFilter; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsBinary; @@ -31,6 +37,8 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsInt; import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsLong; +import java.util.ArrayList; + public class TimeValuePairUtils { private TimeValuePairUtils() {} @@ -107,4 +115,228 @@ public static TimeValuePair getEmptyTimeValuePair(TSDataType dataType) { throw new UnsupportedOperationException("Unrecognized datatype: " + dataType); } } + + public static Intervals extractTimeInterval(Filter filter) { + if (filter == null) { + return Intervals.ALL_INTERVAL; + } + // and, or, not, value, time, group by + // eq, neq, gt, gteq, lt, lteq, in + if (filter instanceof AndFilter) { + AndFilter andFilter = ((AndFilter) filter); + Intervals leftIntervals = extractTimeInterval(andFilter.getLeft()); + Intervals rightIntervals = extractTimeInterval(andFilter.getRight()); + return leftIntervals.intersection(rightIntervals); + } else if (filter instanceof OrFilter) { + OrFilter orFilter = ((OrFilter) filter); + Intervals leftIntervals = extractTimeInterval(orFilter.getLeft()); + Intervals rightIntervals = extractTimeInterval(orFilter.getRight()); + return leftIntervals.union(rightIntervals); + } else if (filter instanceof NotFilter) { + NotFilter notFilter = ((NotFilter) filter); + return extractTimeInterval(notFilter.getFilter()).not(); + } else if (filter instanceof TimeFilter.TimeGt) { + TimeFilter.TimeGt timeGt = ((TimeFilter.TimeGt) filter); + return new Intervals(((long) timeGt.getValue()) + 1, Long.MAX_VALUE); + } else if (filter instanceof TimeFilter.TimeGtEq) { + TimeFilter.TimeGtEq timeGtEq = ((TimeFilter.TimeGtEq) filter); + return new Intervals(((long) timeGtEq.getValue()), Long.MAX_VALUE); + } else if (filter instanceof TimeFilter.TimeEq) { + TimeFilter.TimeEq timeEq = ((TimeFilter.TimeEq) filter); + return new Intervals(((long) timeEq.getValue()), ((long) timeEq.getValue())); + } else if (filter instanceof TimeFilter.TimeNotEq) { + TimeFilter.TimeNotEq timeNotEq = ((TimeFilter.TimeNotEq) filter); + Intervals intervals = new Intervals(); + intervals.addInterval(Long.MIN_VALUE, (long) timeNotEq.getValue() - 1); + intervals.addInterval((long) timeNotEq.getValue() + 1, Long.MAX_VALUE); + return intervals; + } else if (filter instanceof TimeFilter.TimeLt) { + TimeFilter.TimeLt timeLt = ((TimeFilter.TimeLt) filter); + return new Intervals(Long.MIN_VALUE, (long) timeLt.getValue() - 1); + } else if (filter instanceof TimeFilter.TimeLtEq) { + TimeFilter.TimeLtEq timeLtEq = ((TimeFilter.TimeLtEq) filter); + return new Intervals(Long.MIN_VALUE, (long) timeLtEq.getValue()); + } else if (filter instanceof TimeFilter.TimeIn) { + TimeFilter.TimeIn timeIn = ((TimeFilter.TimeIn) filter); + Intervals intervals = new Intervals(); + for (Object value : timeIn.getValues()) { + long time = ((long) value); + intervals.addInterval(time, time); + } + return intervals; + } else if (filter instanceof GroupByFilter) { + GroupByFilter groupByFilter = ((GroupByFilter) filter); + return new Intervals(groupByFilter.getStartTime(), groupByFilter.getEndTime() + 1); + } + // value filter + return Intervals.ALL_INTERVAL; + } + + /** All intervals are closed. */ + public static class Intervals extends ArrayList { + + static final Intervals ALL_INTERVAL = new Intervals(Long.MIN_VALUE, Long.MAX_VALUE); + + public Intervals() { + super(); + } + + Intervals(long lowerBound, long upperBound) { + super(); + addInterval(lowerBound, upperBound); + } + + public int getIntervalSize() { + return size() / 2; + } + + public long getLowerBound(int index) { + return get(index * 2); + } + + public long getUpperBound(int index) { + return get(index * 2 + 1); + } + + void setLowerBound(int index, long lb) { + set(index * 2, lb); + } + + void setUpperBound(int index, long ub) { + set(index * 2 + 1, ub); + } + + public void addInterval(long lowerBound, long upperBound) { + add(lowerBound); + add(upperBound); + } + + Intervals intersection(Intervals that) { + Intervals result = new Intervals(); + int thisSize = this.getIntervalSize(); + int thatSize = that.getIntervalSize(); + for (int i = 0; i < thisSize; i++) { + for (int j = 0; j < thatSize; j++) { + long thisLB = this.getLowerBound(i); + long thisUB = this.getUpperBound(i); + long thatLB = that.getLowerBound(i); + long thatUB = that.getUpperBound(i); + if (thisUB >= thatLB) { + if (thisUB <= thatUB) { + result.addInterval(Math.max(thisLB, thatLB), thisUB); + } else if (thisLB <= thatUB) { + result.addInterval(Math.max(thisLB, thatLB), thatUB); + } + } + } + } + return result; + } + + /** + * The union is implemented by merge, so the two intervals must be ordered. + * + * @param that + * @return + */ + Intervals union(Intervals that) { + if (this.isEmpty()) { + return that; + } else if (that.isEmpty()) { + return this; + } + Intervals result = new Intervals(); + + int thisSize = this.getIntervalSize(); + int thatSize = that.getIntervalSize(); + int thisIndex = 0; + int thatIndex = 0; + // merge the heads of the two intervals + while (thisIndex < thisSize && thatIndex < thatSize) { + long thisLB = this.getLowerBound(thisIndex); + long thisUB = this.getUpperBound(thisIndex); + long thatLB = that.getLowerBound(thatIndex); + long thatUB = that.getUpperBound(thatIndex); + if (thisLB <= thatLB) { + result.mergeLast(thisLB, thisUB); + thisIndex++; + } else { + result.mergeLast(thatLB, thatUB); + thatIndex++; + } + } + // merge the remaining intervals + Intervals remainingIntervals = thisIndex < thisSize ? this : that; + int remainingIndex = thisIndex < thisSize ? thisIndex : thatIndex; + mergeRemainingIntervals(remainingIndex, remainingIntervals, result); + + return result; + } + + private void mergeRemainingIntervals( + int remainingIndex, Intervals remainingIntervals, Intervals result) { + for (int i = remainingIndex; i < remainingIntervals.getIntervalSize(); i++) { + long lb = remainingIntervals.getLowerBound(i); + long ub = remainingIntervals.getUpperBound(i); + result.mergeLast(lb, ub); + } + } + + /** + * Merge an interval of [lowerBound, upperBound] with the last interval if they can be merged, + * or just add it as the last interval if its lowerBound is larger than the upperBound of the + * last interval. If the upperBound of the new interval is less than the lowerBound of the last + * interval, nothing will be done. + * + * @param lowerBound + * @param upperBound + */ + private void mergeLast(long lowerBound, long upperBound) { + if (getIntervalSize() == 0) { + addInterval(lowerBound, upperBound); + return; + } + int lastIndex = getIntervalSize() - 1; + long lastLB = getLowerBound(lastIndex); + long lastUB = getUpperBound(lastIndex); + if (lowerBound > lastUB + 1) { + // e.g., last [3, 5], new [7, 10], just add the new interval + addInterval(lowerBound, upperBound); + return; + } + if (upperBound < lastLB - 1) { + // e.g., last [7, 10], new [3, 5], do nothing + return; + } + // merge the new interval into the last one + setLowerBound(lastIndex, Math.min(lastLB, lowerBound)); + setUpperBound(lastIndex, Math.max(lastUB, upperBound)); + } + + public Intervals not() { + if (isEmpty()) { + return ALL_INTERVAL; + } + Intervals result = new Intervals(); + long firstLB = getLowerBound(0); + if (firstLB != Long.MIN_VALUE) { + result.addInterval(Long.MIN_VALUE, firstLB - 1); + } + + int intervalSize = getIntervalSize(); + for (int i = 0; i < intervalSize - 1; i++) { + long currentUB = getUpperBound(i); + long nextLB = getLowerBound(i + 1); + if (currentUB + 1 <= nextLB - 1) { + result.addInterval(currentUB + 1, nextLB - 1); + } + } + + long lastUB = getUpperBound(result.getIntervalSize() - 1); + if (lastUB != Long.MAX_VALUE) { + result.addInterval(lastUB + 1, Long.MAX_VALUE); + } + return result; + } + } } diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java index 2c69a5293505..2fc49778875e 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java @@ -40,6 +40,7 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.apache.iotdb.tsfile.read.filter.ValueFilter; +import org.apache.iotdb.tsfile.read.filter.operator.AndFilter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.junit.AfterClass; @@ -316,4 +317,43 @@ public void readWithValueFilterTest() QueryResourceManager.getInstance().endQuery(TEST_QUERY_JOB_ID); } + + @Test + public void readIncorrectTimeFilterTest() + throws IllegalPathException, QueryProcessException, StorageEngineException, IOException { + + QueryRouter queryRouter = new QueryRouter(); + List pathList = new ArrayList<>(); + List dataTypes = new ArrayList<>(); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); + dataTypes.add(TSDataType.INT32); + pathList.add( + new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); + dataTypes.add(TSDataType.INT32); + + TimeFilter.TimeGt gtRight = TimeFilter.gt(10L); + TimeFilter.TimeLt ltLeft = TimeFilter.lt(5L); + AndFilter andFilter = new AndFilter(ltLeft, gtRight); + + GlobalTimeExpression globalTimeExpression = new GlobalTimeExpression(andFilter); + TEST_QUERY_JOB_ID = + QueryResourceManager.getInstance().assignQueryId(true, 1024, pathList.size()); + TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); + + RawDataQueryPlan queryPlan = new RawDataQueryPlan(); + queryPlan.setDeduplicatedDataTypes(dataTypes); + queryPlan.setDeduplicatedPathsAndUpdate(pathList); + queryPlan.setExpression(globalTimeExpression); + QueryDataSet queryDataSet = queryRouter.rawDataQuery(queryPlan, TEST_QUERY_CONTEXT); + + int cnt = 0; + while (queryDataSet.hasNext()) { + queryDataSet.next(); + cnt++; + } + assertEquals(0, cnt); + + QueryResourceManager.getInstance().endQuery(TEST_QUERY_JOB_ID); + } } From f1bc4e32ec11bc36dbae153e8b6a36e3a5505413 Mon Sep 17 00:00:00 2001 From: Hang Ji <55303647+ijihang@users.noreply.github.com> Date: Wed, 7 Jul 2021 15:38:11 +0800 Subject: [PATCH 3/7] [IOTDB-1399]Add a session interface to connect multiple nodes (#3434) --- .../java/org/apache/iotdb/SessionExample.java | 10 +++ .../org/apache/iotdb/session/Session.java | 64 ++++++++++++++ .../iotdb/session/SessionConnection.java | 68 ++++++++++++--- .../apache/iotdb/session/SessionUtils.java | 35 ++++++++ .../iotdb/session/IoTDBSessionComplexIT.java | 35 ++++++++ .../iotdb/db/sql/ClusterSessionSimpleIT.java | 85 +++++++++++++++++++ 6 files changed, 283 insertions(+), 14 deletions(-) create mode 100644 testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterSessionSimpleIT.java diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index a8294f13de45..e3f124314d3d 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -667,4 +667,14 @@ private static void setTimeout() throws StatementExecutionException { Session tempSession = new Session(LOCAL_HOST, 6667, "root", "root", 10000, 20000); tempSession.setTimeout(60000); } + + private static void createClusterSession() throws IoTDBConnectionException { + ArrayList nodeList = new ArrayList<>(); + nodeList.add("127.0.0.1:6669"); + nodeList.add("127.0.0.1:6667"); + nodeList.add("127.0.0.1:6668"); + Session clusterSession = new Session(nodeList, "root", "root"); + clusterSession.open(); + clusterSession.close(); + } } diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index cfe11a76be7a..ca69157ecccf 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -75,6 +75,7 @@ public class Session { public static final String MSG_UNSUPPORTED_DATA_TYPE = "Unsupported data type:"; public static final String MSG_DONOT_ENABLE_REDIRECT = "Query do not enable redirect," + " please confirm the session and server conf."; + protected List nodeUrls; protected String username; protected String password; protected int fetchSize; @@ -240,6 +241,66 @@ public Session( this.enableCacheLeader = enableCacheLeader; } + public Session(List nodeUrls, String username, String password) { + this( + nodeUrls, + username, + password, + Config.DEFAULT_FETCH_SIZE, + null, + Config.DEFAULT_INITIAL_BUFFER_CAPACITY, + Config.DEFAULT_MAX_FRAME_SIZE, + Config.DEFAULT_CACHE_LEADER_MODE); + } + + /** + * Multiple nodeUrl,If one node down, connect to the next one + * + * @param nodeUrls List Multiple ip:rpcPort eg.127.0.0.1:9001 + */ + public Session(List nodeUrls, String username, String password, int fetchSize) { + this( + nodeUrls, + username, + password, + fetchSize, + null, + Config.DEFAULT_INITIAL_BUFFER_CAPACITY, + Config.DEFAULT_MAX_FRAME_SIZE, + Config.DEFAULT_CACHE_LEADER_MODE); + } + + public Session(List nodeUrls, String username, String password, ZoneId zoneId) { + this( + nodeUrls, + username, + password, + Config.DEFAULT_FETCH_SIZE, + zoneId, + Config.DEFAULT_INITIAL_BUFFER_CAPACITY, + Config.DEFAULT_MAX_FRAME_SIZE, + Config.DEFAULT_CACHE_LEADER_MODE); + } + + public Session( + List nodeUrls, + String username, + String password, + int fetchSize, + ZoneId zoneId, + int thriftDefaultBufferSize, + int thriftMaxFrameSize, + boolean enableCacheLeader) { + this.nodeUrls = nodeUrls; + this.username = username; + this.password = password; + this.fetchSize = fetchSize; + this.zoneId = zoneId; + this.thriftDefaultBufferSize = thriftDefaultBufferSize; + this.thriftMaxFrameSize = thriftMaxFrameSize; + this.enableCacheLeader = enableCacheLeader; + } + public void setFetchSize(int fetchSize) { this.fetchSize = fetchSize; } @@ -294,6 +355,9 @@ public synchronized void close() throws IoTDBConnectionException { public SessionConnection constructSessionConnection( Session session, EndPoint endpoint, ZoneId zoneId) throws IoTDBConnectionException { + if (endpoint == null) { + return new SessionConnection(session, zoneId); + } return new SessionConnection(session, endpoint, zoneId); } diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index 387902b38dab..79b99677c98b 100644 --- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -60,7 +60,9 @@ import org.slf4j.LoggerFactory; import java.time.ZoneId; +import java.util.ArrayList; import java.util.List; +import java.util.Random; public class SessionConnection { @@ -74,6 +76,7 @@ public class SessionConnection { private long statementId; private ZoneId zoneId; private EndPoint endPoint; + private List endPointList = new ArrayList<>(); private boolean enableRedirect = false; // TestOnly @@ -83,10 +86,18 @@ public SessionConnection(Session session, EndPoint endPoint, ZoneId zoneId) throws IoTDBConnectionException { this.session = session; this.endPoint = endPoint; + endPointList.add(endPoint); this.zoneId = zoneId == null ? ZoneId.systemDefault() : zoneId; init(endPoint); } + public SessionConnection(Session session, ZoneId zoneId) throws IoTDBConnectionException { + this.session = session; + this.zoneId = zoneId == null ? ZoneId.systemDefault() : zoneId; + this.endPointList = SessionUtils.parseSeedNodeUrls(session.nodeUrls); + initClusterConn(); + } + private void init(EndPoint endPoint) throws IoTDBConnectionException { RpcTransportFactory.setDefaultBufferCapacity(session.thriftDefaultBufferSize); RpcTransportFactory.setThriftMaxFrameSize(session.thriftMaxFrameSize); @@ -145,6 +156,21 @@ private void init(EndPoint endPoint) throws IoTDBConnectionException { } } + private void initClusterConn() throws IoTDBConnectionException { + for (EndPoint endPoint : endPointList) { + try { + session.defaultEndPoint = endPoint; + init(endPoint); + } catch (IoTDBConnectionException e) { + if (!reconnect()) { + logger.error("Cluster has no nodes to connect"); + throw new IoTDBConnectionException(e); + } + } + break; + } + } + public void close() throws IoTDBConnectionException { TSCloseSessionReq req = new TSCloseSessionReq(sessionId); try { @@ -720,24 +746,38 @@ protected void testInsertTablets(TSInsertTabletsReq request) } private boolean reconnect() { - boolean flag = false; + boolean connectedSuccess = false; + Random random = new Random(); for (int i = 1; i <= Config.RETRY_NUM; i++) { - try { - if (transport != null) { - close(); - init(endPoint); - flag = true; - } - } catch (Exception e) { - try { - Thread.sleep(Config.RETRY_INTERVAL_MS); - } catch (InterruptedException e1) { - logger.error("reconnect is interrupted.", e1); - Thread.currentThread().interrupt(); + if (transport != null) { + transport.close(); + int currHostIndex = random.nextInt(endPointList.size()); + int tryHostNum = 0; + for (int j = currHostIndex; j < endPointList.size(); j++) { + if (tryHostNum == endPointList.size()) { + break; + } + session.defaultEndPoint = endPointList.get(j); + this.endPoint = endPointList.get(j); + if (j == endPointList.size() - 1) { + j = -1; + } + tryHostNum++; + try { + init(endPoint); + connectedSuccess = true; + } catch (IoTDBConnectionException e) { + logger.error("The current node may have been down {},try next node", endPoint); + continue; + } + break; } } + if (connectedSuccess) { + break; + } } - return flag; + return connectedSuccess; } protected void createSchemaTemplate(TSCreateSchemaTemplateReq request) diff --git a/session/src/main/java/org/apache/iotdb/session/SessionUtils.java b/session/src/main/java/org/apache/iotdb/session/SessionUtils.java index f0289903ba1e..f50ad52e465d 100644 --- a/session/src/main/java/org/apache/iotdb/session/SessionUtils.java +++ b/session/src/main/java/org/apache/iotdb/session/SessionUtils.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.session; +import org.apache.iotdb.service.rpc.thrift.EndPoint; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; @@ -27,10 +28,17 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; public class SessionUtils { + private static final Logger logger = LoggerFactory.getLogger(SessionUtils.class); + public static ByteBuffer getTimeBuffer(Tablet tablet) { ByteBuffer timeBuffer = ByteBuffer.allocate(tablet.getTimeBytesSize()); for (int i = 0; i < tablet.rowSize; i++) { @@ -149,4 +157,31 @@ private static void getValueBufferOfDataType( String.format("Data type %s is not supported.", dataType)); } } + + public static List parseSeedNodeUrls(List nodeUrls) { + if (nodeUrls == null) { + throw new NumberFormatException("nodeUrls is null"); + } + List endPointsList = new ArrayList<>(); + for (String nodeUrl : nodeUrls) { + EndPoint endPoint = parseNodeUrl(nodeUrl); + endPointsList.add(endPoint); + } + return endPointsList; + } + + private static EndPoint parseNodeUrl(String nodeUrl) { + EndPoint endPoint = new EndPoint(); + String[] split = nodeUrl.split(":"); + if (split.length != 2) { + throw new NumberFormatException("NodeUrl Incorrect format"); + } + String ip = split[0]; + try { + int rpcPort = Integer.parseInt(split[1]); + return endPoint.setIp(ip).setPort(rpcPort); + } catch (Exception e) { + throw new NumberFormatException("NodeUrl Incorrect format"); + } + } } diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java index 6b1234c98eba..760392176bcc 100644 --- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java +++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java @@ -889,4 +889,39 @@ public void testInsertTabletWithTriggers() session.close(); } + + @Test + public void testSessionCluster() throws IoTDBConnectionException, StatementExecutionException { + ArrayList nodeList = new ArrayList<>(); + nodeList.add("127.0.0.1:6669"); + nodeList.add("127.0.0.1:6667"); + nodeList.add("127.0.0.1:6668"); + session = new Session(nodeList, "root", "root"); + session.open(); + + session.setStorageGroup("root.sg1"); + + createTimeseries(); + insertByStr(); + + insertViaSQL(); + queryByDevice("root.sg1.d1"); + + session.close(); + } + + @Test + public void testErrorSessionCluster() throws IoTDBConnectionException { + ArrayList nodeList = new ArrayList<>(); + // test Format error + nodeList.add("127.0.0.16669"); + nodeList.add("127.0.0.1:6667"); + session = new Session(nodeList, "root", "root"); + try { + session.open(); + } catch (Exception e) { + Assert.assertEquals("NodeUrl Incorrect format", e.getMessage()); + } + session.close(); + } } diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterSessionSimpleIT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterSessionSimpleIT.java new file mode 100644 index 000000000000..6f9189fd5edf --- /dev/null +++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterSessionSimpleIT.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.sql; + +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; + +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.NoProjectNameDockerComposeContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +public class ClusterSessionSimpleIT { + + private static Logger node1Logger = LoggerFactory.getLogger("iotdb-server_1"); + private static Logger node2Logger = LoggerFactory.getLogger("iotdb-server_2"); + private static Logger node3Logger = LoggerFactory.getLogger("iotdb-server_3"); + + private Session session; + + @Rule + public DockerComposeContainer environment = + new NoProjectNameDockerComposeContainer( + "3nodes", new File("src/test/resources/3nodes/docker-compose.yaml")) + .withExposedService("iotdb-server_1", 6667, Wait.forListeningPort()) + .withLogConsumer("iotdb-server_1", new Slf4jLogConsumer(node1Logger)) + .withExposedService("iotdb-server_2", 6667, Wait.forListeningPort()) + .withLogConsumer("iotdb-server_2", new Slf4jLogConsumer(node2Logger)) + .withExposedService("iotdb-server_3", 6667, Wait.forListeningPort()) + .withLogConsumer("iotdb-server_3", new Slf4jLogConsumer(node3Logger)) + .withLocalCompose(true); + + protected DockerComposeContainer getContainer() { + return environment; + } + + @Test + public void testSessionCluster() throws IoTDBConnectionException, StatementExecutionException { + List stringList = new ArrayList<>(); + Integer service1Port = getContainer().getServicePort("iotdb-server_1", 6667); + Integer service2Port = getContainer().getServicePort("iotdb-server_2", 6667); + Integer service3Port = getContainer().getServicePort("iotdb-server_3", 6667); + stringList.add("localhost:" + service1Port); + stringList.add("localhost:" + service2Port); + stringList.add("localhost:" + service3Port); + session = new Session(stringList, "root", "root"); + session.open(); + session.setStorageGroup("root.sg1"); + session.createTimeseries( + "root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); + + session.createTimeseries( + "root.sg1.d2.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); + session.close(); + } +} From 118d6cb98265495a4a1650bd1baf6601624ff05a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Steve=20Yurong=20Su=20=28=E5=AE=87=E8=8D=A3=29?= Date: Wed, 7 Jul 2021 04:26:16 -0500 Subject: [PATCH 4/7] Primitive Array Manager v3 (#3513) --- .../apache/iotdb/db/metadata/MManager.java | 35 +- .../db/rescon/PrimitiveArrayManager.java | 360 +++++++++--------- .../iotdb/db/utils/datastructure/TVList.java | 2 +- 3 files changed, 176 insertions(+), 221 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index cea3f0d9285c..e0aa34d0b569 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -69,7 +69,6 @@ import org.apache.iotdb.db.query.dataset.ShowDevicesResult; import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult; import org.apache.iotdb.db.rescon.MemTableManager; -import org.apache.iotdb.db.rescon.PrimitiveArrayManager; import org.apache.iotdb.db.utils.RandomDeleteCache; import org.apache.iotdb.db.utils.SchemaUtils; import org.apache.iotdb.db.utils.TestOnly; @@ -154,8 +153,6 @@ public class MManager { // tag key -> tag value -> LeafMNode private Map>> tagIndex = new ConcurrentHashMap<>(); - // data type -> number - private Map schemaDataTypeNumMap = new ConcurrentHashMap<>(); private AtomicLong totalSeriesNumber = new AtomicLong(); private boolean initialized; protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); @@ -332,7 +329,6 @@ public void clear() { tagLogFile.close(); tagLogFile = null; } - this.schemaDataTypeNumMap.clear(); initialized = false; if (config.isEnableMTreeSnapshot() && timedCreateMTreeSnapshotThread != null) { timedCreateMTreeSnapshotThread.shutdownNow(); @@ -473,7 +469,6 @@ public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws Meta logger.warn("Current series number {} is too large...", totalSeriesNumber); allowToCreateNewSeries = false; } - updateSchemaDataTypeNumMap(type, 1); // write log if (!isRecovering) { @@ -565,9 +560,6 @@ public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws Met logger.warn("Current series number {} is too large...", totalSeriesNumber); allowToCreateNewSeries = false; } - for (TSDataType type : dataTypes) { - updateSchemaDataTypeNumMap(type, 1); - } // write log if (!isRecovering) { logWriter.createAlignedTimeseries(plan); @@ -695,19 +687,12 @@ private PartialPath deleteOneTimeseriesUpdateStatisticsAndDropTrigger(PartialPat int timeseriesNum = 0; if (schema instanceof MeasurementSchema) { removeFromTagInvertedIndex(pair.right); - updateSchemaDataTypeNumMap(schema.getType(), -1); timeseriesNum = 1; } else if (schema instanceof VectorMeasurementSchema) { - for (TSDataType dataType : schema.getValueTSDataTypeList()) { - updateSchemaDataTypeNumMap(dataType, -1); - timeseriesNum++; - } + timeseriesNum += schema.getValueTSDataTypeList().size(); } PartialPath storageGroupPath = pair.left; - // update statistics in schemaDataTypeNumMap - updateSchemaDataTypeNumMap(pair.right.getSchema().getType(), -1); - // drop trigger with no exceptions TriggerEngine.drop(pair.right); @@ -762,8 +747,6 @@ public void deleteStorageGroups(List storageGroups) throws Metadata List leafMNodes = mtree.deleteStorageGroup(storageGroup); for (MeasurementMNode leafMNode : leafMNodes) { removeFromTagInvertedIndex(leafMNode); - // update statistics in schemaDataTypeNumMap - updateSchemaDataTypeNumMap(leafMNode.getSchema().getType(), -1); } // drop triggers with no exceptions @@ -783,22 +766,6 @@ public void deleteStorageGroups(List storageGroups) throws Metadata } } - /** - * update statistics in schemaDataTypeNumMap - * - * @param type data type - * @param num 1 for creating timeseries and -1 for deleting timeseries - */ - private synchronized void updateSchemaDataTypeNumMap(TSDataType type, int num) { - // add an array of the series type - schemaDataTypeNumMap.put(type, schemaDataTypeNumMap.getOrDefault(type, 0) + num); - // add an array of time - schemaDataTypeNumMap.put( - TSDataType.INT64, schemaDataTypeNumMap.getOrDefault(TSDataType.INT64, 0) + num); - - PrimitiveArrayManager.updateSchemaDataTypeNum(schemaDataTypeNumMap, totalSeriesNumber.get()); - } - /** * Check if the given path is storage group or not. * diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java index 10c822859ce7..0fd05245c33c 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.iotdb.db.rescon; import org.apache.iotdb.db.conf.IoTDBConfig; @@ -30,79 +29,168 @@ import java.util.ArrayDeque; import java.util.Arrays; -import java.util.EnumMap; -import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicLong; -/** Manage all primitive data list in memory, including get and release operation. */ +/** Manage all primitive data lists in memory, including get and release operations. */ public class PrimitiveArrayManager { - /** data type -> ArrayDeque of primitive arrays. */ - private static final Map> bufferedArraysMap = - new EnumMap<>(TSDataType.class); + private static final Logger LOGGER = LoggerFactory.getLogger(PrimitiveArrayManager.class); - /** data type -> ratio of data type in schema, which could be seen as recommended ratio */ - private static final Map bufferedArraysNumRatio = - new EnumMap<>(TSDataType.class); + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); - private static final Logger logger = LoggerFactory.getLogger(PrimitiveArrayManager.class); + public static final int ARRAY_SIZE = CONFIG.getPrimitiveArraySize(); - private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + /** threshold total size of arrays for all data types */ + private static final double POOLED_ARRAYS_MEMORY_THRESHOLD = + CONFIG.getAllocateMemoryForWrite() * CONFIG.getBufferedArraysMemoryProportion(); - public static final int ARRAY_SIZE = config.getPrimitiveArraySize(); + /** TSDataType#serialize() -> ArrayDeque, VECTOR is ignored */ + private static final ArrayDeque[] POOLED_ARRAYS = new ArrayDeque[TSDataType.values().length - 1]; - /** threshold total size of arrays for all data types */ - private static final double BUFFERED_ARRAY_SIZE_THRESHOLD = - config.getAllocateMemoryForWrite() * config.getBufferedArraysMemoryProportion(); + /** TSDataType#serialize() -> max size of ArrayDeque, VECTOR is ignored */ + private static final int[] LIMITS = new int[TSDataType.values().length - 1]; - /** total size of buffered arrays */ - private static final AtomicLong bufferedArraysRamSize = new AtomicLong(); + /** LIMITS should be updated if (TOTAL_ALLOCATION_REQUEST_COUNT.get() > limitUpdateThreshold) */ + private static long limitUpdateThreshold; - /** total size of out of buffer arrays */ - private static final AtomicLong outOfBufferArraysRamSize = new AtomicLong(); + /** TSDataType#serialize() -> count of allocation requests, VECTOR is ignored */ + private static final AtomicLong[] ALLOCATION_REQUEST_COUNTS = + new AtomicLong[] { + new AtomicLong(0), + new AtomicLong(0), + new AtomicLong(0), + new AtomicLong(0), + new AtomicLong(0), + new AtomicLong(0) + }; + + private static final AtomicLong TOTAL_ALLOCATION_REQUEST_COUNT = new AtomicLong(0); static { - bufferedArraysMap.put(TSDataType.BOOLEAN, new ArrayDeque<>()); - bufferedArraysMap.put(TSDataType.INT32, new ArrayDeque<>()); - bufferedArraysMap.put(TSDataType.INT64, new ArrayDeque<>()); - bufferedArraysMap.put(TSDataType.FLOAT, new ArrayDeque<>()); - bufferedArraysMap.put(TSDataType.DOUBLE, new ArrayDeque<>()); - bufferedArraysMap.put(TSDataType.TEXT, new ArrayDeque<>()); - bufferedArraysMap.put(TSDataType.VECTOR, new ArrayDeque<>()); + init(); } - private PrimitiveArrayManager() { - logger.info("BufferedArraySizeThreshold is {}", BUFFERED_ARRAY_SIZE_THRESHOLD); + private static void init() { + LOGGER.info("BufferedArraySizeThreshold is {}", POOLED_ARRAYS_MEMORY_THRESHOLD); + + // POOLED_ARRAYS_MEMORY_THRESHOLD = ∑(datatype[i].getDataTypeSize() * ARRAY_SIZE * LIMITS[i]) + // we init all LIMITS[i] with the same value, so we have + // => LIMITS[i] = POOLED_ARRAYS_MEMORY_THRESHOLD / ARRAY_SIZE / ∑(datatype[i].getDataTypeSize()) + int totalDataTypeSize = 0; + for (TSDataType dataType : TSDataType.values()) { + // VECTOR is ignored + if (dataType.equals(TSDataType.VECTOR)) { + continue; + } + totalDataTypeSize += dataType.getDataTypeSize(); + } + @SuppressWarnings("squid:S3518") // totalDataTypeSize can not be zero + double limit = POOLED_ARRAYS_MEMORY_THRESHOLD / ARRAY_SIZE / totalDataTypeSize; + Arrays.fill(LIMITS, (int) limit); + + // limitUpdateThreshold = ∑(LIMITS[i]) + limitUpdateThreshold = (long) ((TSDataType.values().length - 1) * limit); + + for (int i = 0; i < POOLED_ARRAYS.length; ++i) { + POOLED_ARRAYS[i] = new ArrayDeque<>((int) limit); + } + + for (AtomicLong allocationRequestCount : ALLOCATION_REQUEST_COUNTS) { + allocationRequestCount.set(0); + } + + TOTAL_ALLOCATION_REQUEST_COUNT.set(0); } + private PrimitiveArrayManager() {} + /** - * Get primitive data lists according to type + * Get or allocate primitive data lists according to type. * - * @param dataType data type * @return an array */ - public static Object getPrimitiveArraysByType(TSDataType dataType) { - long delta = (long) ARRAY_SIZE * dataType.getDataTypeSize(); - - // check memory of buffered array, if already full, generate OOB - if (bufferedArraysRamSize.get() + delta > BUFFERED_ARRAY_SIZE_THRESHOLD) { - // return an out of buffer array - outOfBufferArraysRamSize.addAndGet(delta); - return createPrimitiveArray(dataType); + public static Object allocate(TSDataType dataType) { + if (dataType.equals(TSDataType.VECTOR)) { + throw new UnSupportedDataTypeException(TSDataType.VECTOR.name()); + } + + if (TOTAL_ALLOCATION_REQUEST_COUNT.get() > limitUpdateThreshold) { + synchronized (TOTAL_ALLOCATION_REQUEST_COUNT) { + if (TOTAL_ALLOCATION_REQUEST_COUNT.get() > limitUpdateThreshold) { + updateLimits(); + } + } + } + + int order = dataType.serialize(); + + ALLOCATION_REQUEST_COUNTS[order].incrementAndGet(); + TOTAL_ALLOCATION_REQUEST_COUNT.incrementAndGet(); + + Object array; + synchronized (POOLED_ARRAYS[order]) { + array = POOLED_ARRAYS[order].poll(); + } + if (array == null) { + array = createPrimitiveArray(dataType); + } + return array; + } + + private static void updateLimits() { + // we want to update LIMITS[i] according to ratios[i] + double[] ratios = new double[ALLOCATION_REQUEST_COUNTS.length]; + for (int i = 0; i < ALLOCATION_REQUEST_COUNTS.length; ++i) { + ratios[i] = + ALLOCATION_REQUEST_COUNTS[i].get() / (double) TOTAL_ALLOCATION_REQUEST_COUNT.get(); } - synchronized (bufferedArraysMap.get(dataType)) { - // try to get a buffered array - Object dataArray = bufferedArraysMap.get(dataType).poll(); - if (dataArray != null) { - return dataArray; + // initially we have: + // POOLED_ARRAYS_MEMORY_THRESHOLD = ∑(datatype[i].getDataTypeSize() * LIMITS[i]) * ARRAY_SIZE + // we can find a number called limitBase which satisfies: + // LIMITS[i] = limitBase * ratios[i] + + // => POOLED_ARRAYS_MEMORY_THRESHOLD = + // limitBase * ∑(datatype[i].getDataTypeSize() * ratios[i]) * ARRAY_SIZE + // => limitBase = POOLED_ARRAYS_MEMORY_THRESHOLD / ARRAY_SIZE + // / ∑(datatype[i].getDataTypeSize() * ratios[i]) + double weightedSumOfRatios = 0; + for (TSDataType dataType : TSDataType.values()) { + // VECTOR is ignored + if (dataType.equals(TSDataType.VECTOR)) { + continue; + } + weightedSumOfRatios += dataType.getDataTypeSize() * ratios[dataType.serialize()]; + } + @SuppressWarnings("squid:S3518") // weightedSumOfRatios can not be zero + double limitBase = POOLED_ARRAYS_MEMORY_THRESHOLD / ARRAY_SIZE / weightedSumOfRatios; + + // LIMITS[i] = limitBase * ratios[i] + for (int i = 0; i < LIMITS.length; ++i) { + int oldLimit = LIMITS[i]; + int newLimit = (int) (limitBase * ratios[i]); + LIMITS[i] = newLimit; + + if (LOGGER.isInfoEnabled()) { + LOGGER.info( + "limit of {} array deque size updated: {} -> {}", + TSDataType.deserialize((byte) i).name(), + oldLimit, + newLimit); } } - // no buffered array, create one - bufferedArraysRamSize.addAndGet(delta); - return createPrimitiveArray(dataType); + // limitUpdateThreshold = ∑(LIMITS[i]) + limitUpdateThreshold = 0; + for (int limit : LIMITS) { + limitUpdateThreshold += limit; + } + + for (AtomicLong allocationRequestCount : ALLOCATION_REQUEST_COUNTS) { + allocationRequestCount.set(0); + } + + TOTAL_ALLOCATION_REQUEST_COUNT.set(0); } private static Object createPrimitiveArray(TSDataType dataType) { @@ -126,16 +214,49 @@ private static Object createPrimitiveArray(TSDataType dataType) { case TEXT: dataArray = new Binary[ARRAY_SIZE]; break; - case VECTOR: - dataArray = new byte[ARRAY_SIZE][]; - break; default: - throw new UnSupportedDataTypeException(dataType.toString()); + throw new UnSupportedDataTypeException(dataType.name()); } return dataArray; } + /** + * This method is called when bringing back data array + * + * @param array data array to be released + */ + public static void release(Object array) { + int order; + if (array instanceof boolean[]) { + order = TSDataType.BOOLEAN.serialize(); + } else if (array instanceof int[]) { + order = TSDataType.INT32.serialize(); + } else if (array instanceof long[]) { + order = TSDataType.INT64.serialize(); + } else if (array instanceof float[]) { + order = TSDataType.FLOAT.serialize(); + } else if (array instanceof double[]) { + order = TSDataType.DOUBLE.serialize(); + } else if (array instanceof Binary[]) { + Arrays.fill((Binary[]) array, null); + order = TSDataType.TEXT.serialize(); + } else { + throw new UnSupportedDataTypeException(array.getClass().toString()); + } + + synchronized (POOLED_ARRAYS[order]) { + ArrayDeque arrays = POOLED_ARRAYS[order]; + if (arrays.size() < LIMITS[order]) { + arrays.add(array); + } + } + } + + public static void close() { + init(); + } + /** * Get primitive data lists according to data type and size, only for TVList's sorting * @@ -183,140 +304,7 @@ public static Object createDataListsByType(TSDataType dataType, int size) { } return binaries; default: - return null; + throw new UnSupportedDataTypeException(dataType.name()); } } - - /** - * This method is called when bringing back data array - * - * @param releasingArray data array to be released - */ - public static void release(Object releasingArray) { - TSDataType releasingType; - if (releasingArray instanceof boolean[]) { - releasingType = TSDataType.BOOLEAN; - } else if (releasingArray instanceof int[]) { - releasingType = TSDataType.INT32; - } else if (releasingArray instanceof long[]) { - releasingType = TSDataType.INT64; - } else if (releasingArray instanceof float[]) { - releasingType = TSDataType.FLOAT; - } else if (releasingArray instanceof double[]) { - releasingType = TSDataType.DOUBLE; - } else if (releasingArray instanceof Binary[]) { - Arrays.fill((Binary[]) releasingArray, null); - releasingType = TSDataType.TEXT; - } else { - throw new UnSupportedDataTypeException("Unknown data array type"); - } - - if (outOfBufferArraysRamSize.get() <= 0) { - // if there is no out of buffer array, bring back as buffered array directly - putBackBufferedArray(releasingType, releasingArray); - } else { - // if the system has out of buffer array, we need to release some memory - if (!isCurrentDataTypeExceeded(releasingType)) { - // if the buffered array of the releasingType is less than expected - // choose an array of redundantDataType to release and try to buffer the array of - // releasingType - for (Entry> entry : bufferedArraysMap.entrySet()) { - TSDataType dataType = entry.getKey(); - if (isCurrentDataTypeExceeded(dataType)) { - // if we find a replaced array, bring back the original array as a buffered array - if (logger.isDebugEnabled()) { - logger.debug( - "The ratio of {} in buffered array has not reached the schema ratio. discard a redundant array of {}", - releasingType, - dataType); - } - // bring back the replaced array as OOB array - replaceBufferedArray(releasingType, releasingArray, dataType); - break; - } - } - } - - releaseOutOfBuffer(releasingType); - } - } - - /** - * Bring back a buffered array - * - * @param dataType data type - * @param dataArray data array - */ - private static void putBackBufferedArray(TSDataType dataType, Object dataArray) { - synchronized (bufferedArraysMap.get(dataType)) { - bufferedArraysMap.get(dataType).add(dataArray); - } - } - - private static void replaceBufferedArray( - TSDataType releasingType, Object releasingArray, TSDataType redundantType) { - synchronized (bufferedArraysMap.get(redundantType)) { - if (bufferedArraysMap.get(redundantType).poll() != null) { - bufferedArraysRamSize.addAndGet((long) -ARRAY_SIZE * redundantType.getDataTypeSize()); - } - } - - if (bufferedArraysRamSize.get() + (long) ARRAY_SIZE * releasingType.getDataTypeSize() - < BUFFERED_ARRAY_SIZE_THRESHOLD) { - ArrayDeque releasingArrays = bufferedArraysMap.get(releasingType); - synchronized (releasingArrays) { - releasingArrays.add(releasingArray); - } - bufferedArraysRamSize.addAndGet((long) ARRAY_SIZE * releasingType.getDataTypeSize()); - } - } - - private static void releaseOutOfBuffer(TSDataType dataType) { - outOfBufferArraysRamSize.getAndUpdate( - l -> Math.max(0, l - (long) ARRAY_SIZE * dataType.getDataTypeSize())); - } - - /** - * @param schemaDataTypeNumMap schema DataType Num Map (for each series, increase a long and a - * specific type) - * @param totalSeries total time series number - */ - public static void updateSchemaDataTypeNum( - Map schemaDataTypeNumMap, long totalSeries) { - for (Map.Entry entry : schemaDataTypeNumMap.entrySet()) { - TSDataType dataType = entry.getKey(); - // one time series has 2 columns (time column + value column) - bufferedArraysNumRatio.put( - dataType, (double) schemaDataTypeNumMap.get(dataType) / (totalSeries * 2)); - } - } - - /** - * check whether the ratio of buffered array of specific data type reaches the ratio in schema (as - * recommended ratio) - * - * @param dataType data type - * @return true if the buffered array ratio exceeds the recommend ratio - */ - private static boolean isCurrentDataTypeExceeded(TSDataType dataType) { - long total = 0; - for (ArrayDeque value : bufferedArraysMap.values()) { - total += value.size(); - } - long arrayNumInBuffer = - bufferedArraysMap.containsKey(dataType) ? bufferedArraysMap.get(dataType).size() : 0; - return total != 0 - && ((double) arrayNumInBuffer / total > bufferedArraysNumRatio.getOrDefault(dataType, 0.0)); - } - - public static void close() { - for (ArrayDeque dataListQueue : bufferedArraysMap.values()) { - dataListQueue.clear(); - } - - bufferedArraysNumRatio.clear(); - - bufferedArraysRamSize.set(0); - outOfBufferArraysRamSize.set(0); - } } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index 988995b9ac97..5b960d0c401b 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -346,7 +346,7 @@ protected void checkExpansion() { } protected Object getPrimitiveArraysByType(TSDataType dataType) { - return PrimitiveArrayManager.getPrimitiveArraysByType(dataType); + return PrimitiveArrayManager.allocate(dataType); } protected long[] cloneTime(long[] array) { From 2784603be1cc95db488c84c9265f06db3c13674f Mon Sep 17 00:00:00 2001 From: Haonan Date: Wed, 7 Jul 2021 18:56:47 +0800 Subject: [PATCH 5/7] Use StringCachedPool in TsFileResource to reduce the memory size (#3514) (#3523) --- .../iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java | 1 + .../iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java | 1 + 2 files changed, 2 insertions(+) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java index 381d4b36a4a7..86d7dc6a4020 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java @@ -90,6 +90,7 @@ public void serialize(OutputStream outputStream) throws IOException { for (Entry stringIntegerEntry : deviceToIndex.entrySet()) { String deviceName = stringIntegerEntry.getKey(); + deviceName = cachedDevicePool.computeIfAbsent(deviceName, k -> k); int index = stringIntegerEntry.getValue(); ReadWriteIOUtils.write(deviceName, outputStream); ReadWriteIOUtils.write(index, outputStream); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java index e1e5251cc1f2..7c3f6ec880f0 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java @@ -67,6 +67,7 @@ public FileTimeIndex(Set devices, long startTime, long endTime) { public void serialize(OutputStream outputStream) throws IOException { ReadWriteIOUtils.write(devices.size(), outputStream); for (String device : devices) { + device = cachedDevicePool.computeIfAbsent(device, k -> k); ReadWriteIOUtils.write(device, outputStream); } ReadWriteIOUtils.write(startTime, outputStream); From d3acb1883353e2b74264691c40c640fb197695d2 Mon Sep 17 00:00:00 2001 From: wangchao316 <66939405+wangchao316@users.noreply.github.com> Date: Thu, 8 Jul 2021 09:44:19 +0800 Subject: [PATCH 6/7] =?UTF-8?q?[IOTDB-1407]=20Filtering=20time=20series=20?= =?UTF-8?q?based=20on=20tags=20query=20fails=20Occasion=E2=80=A6=20(#3292)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iotdb/cluster/metadata/CMManager.java | 11 ++- .../cluster/common/TestAsyncDataClient.java | 11 +++ .../query/ClusterPlanExecutorTest.java | 21 +++++ .../java/org/apache/iotdb/db/sql/Cases.java | 83 +++++++++++++++++++ 4 files changed, 125 insertions(+), 1 deletion(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java index e097ac690e6e..f01870ca1525 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java @@ -1529,7 +1529,16 @@ public List showTimeseries(ShowTimeSeriesPlan plan, QueryC ExecutorService pool = new ThreadPoolExecutor( THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); - List globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups(); + + List globalGroups = new ArrayList<>(); + try { + PartitionGroup partitionGroup = + metaGroupMember.getPartitionTable().partitionByPathTime(plan.getPath(), 0); + globalGroups.add(partitionGroup); + } catch (MetadataException e) { + // if the path location is not find, obtain the path location from all groups. + globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups(); + } int limit = plan.getLimit() == 0 ? Integer.MAX_VALUE : plan.getLimit(); int offset = plan.getOffset(); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java index db34e54e991d..c7b473d58149 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java @@ -277,4 +277,15 @@ public void previousFill( .previousFill(request, resultHandler)) .start(); } + + @Override + public void getAllMeasurementSchema( + RaftNode header, ByteBuffer planBinary, AsyncMethodCallback resultHandler) { + new Thread( + () -> { + new DataAsyncService(dataGroupMemberMap.get(header)) + .getAllMeasurementSchema(header, planBinary, resultHandler); + }) + .start(); + } } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPlanExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPlanExecutorTest.java index 139280b0d7d9..a0264bbcdbce 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPlanExecutorTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPlanExecutorTest.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; +import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.service.IoTDB; @@ -88,4 +89,24 @@ public void testGetAllStorageGroupNodes() { allStorageGroupNodes.get(i).getFullPath()); } } + + @Test + public void testShowTimeseries() + throws StorageEngineException, QueryFilterOptimizationException, MetadataException, + IOException, InterruptedException, QueryProcessException { + ShowTimeSeriesPlan showTimeSeriesPlan = new ShowTimeSeriesPlan(pathList.get(0)); + QueryContext context = + new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1)); + try { + QueryDataSet dataSet = queryExecutor.processQuery(showTimeSeriesPlan, context); + int count = 0; + while (dataSet.hasNext()) { + dataSet.next(); + count++; + } + assertEquals(count, 1); + } finally { + QueryResourceManager.getInstance().endQuery(context.getQueryId()); + } + } } diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java index e78bebdd7308..dc8f487f9646 100644 --- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java +++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java @@ -323,4 +323,87 @@ private void insertRecords() throws IoTDBConnectionException, StatementExecution session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); } + + // test https://issues.apache.org/jira/browse/IOTDB-1407 + @Test + public void showTimeseriesTagsTest() throws SQLException { + String createTimeSeries1 = + "create timeseries root.ln.wf01.wt1 WITH DATATYPE=DOUBLE, ENCODING=RLE, compression=SNAPPY tags(tag1=v1, tag2=v2)"; + String createTimeSeries2 = + "create timeseries root.ln.wf01.wt2 WITH DATATYPE=DOUBLE, ENCODING=RLE, compression=SNAPPY tags(tag1=v1, tag2=v2)"; + writeStatement.execute(createTimeSeries1); + writeStatement.execute(createTimeSeries2); + // try to read data on each node. select .* + for (Statement readStatement : readStatements) { + ResultSet resultSet = + readStatement.executeQuery("SHOW TIMESERIES root.ln.wf01.* where tag1=v1"); + int cnt = 0; + while (resultSet.next()) { + cnt++; + } + Assert.assertEquals(2, cnt); + resultSet.close(); + } + + // try to read data on each node. select from parent series + for (Statement readStatement : readStatements) { + ResultSet resultSet = + readStatement.executeQuery("SHOW TIMESERIES root.ln.wf01 where tag1=v1"); + int cnt = 0; + while (resultSet.next()) { + cnt++; + } + Assert.assertEquals(2, cnt); + resultSet.close(); + } + + // try to read data on each node. select from one series + for (Statement readStatement : readStatements) { + ResultSet resultSet = + readStatement.executeQuery("SHOW TIMESERIES root.ln.wf01.wt1 where tag1=v1"); + int cnt = 0; + while (resultSet.next()) { + cnt++; + } + Assert.assertEquals(1, cnt); + resultSet.close(); + } + + // try to read data on each node. select from root + for (Statement readStatement : readStatements) { + ResultSet resultSet = readStatement.executeQuery("SHOW TIMESERIES root where tag1=v1"); + int cnt = 0; + while (resultSet.next()) { + cnt++; + } + Assert.assertEquals(2, cnt); + resultSet.close(); + } + + // try to read data on each node. SHOW TIMESERIES root.ln.wf01.* where tag1=v3" + for (Statement readStatement : readStatements) { + ResultSet resultSet = + readStatement.executeQuery("SHOW TIMESERIES root.ln.wf01.* where tag1=v3"); + int cnt = 0; + while (resultSet.next()) { + cnt++; + } + Assert.assertEquals(0, cnt); + resultSet.close(); + } + + // try to read data on each node. SHOW TIMESERIES root.ln.wf01.* where tag3=v1" + for (Statement readStatement : readStatements) { + ResultSet resultSet = null; + try { + resultSet = readStatement.executeQuery("SHOW TIMESERIES root.ln.wf01.* where tag3=v1"); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("The key tag3 is not a tag")); + } finally { + if (resultSet != null) { + resultSet.close(); + } + } + } + } } From 18ae729e2792c709b7054cb0de25f4d331d7558e Mon Sep 17 00:00:00 2001 From: Jialin Qiao Date: Thu, 8 Jul 2021 00:40:07 -0500 Subject: [PATCH 7/7] [IOTDB-1471] Fix path not right in "sg may not ready" log (#3524) --- .../main/java/org/apache/iotdb/db/engine/StorageEngine.java | 3 +-- .../storagegroup/virtualSg/VirtualStorageGroupManager.java | 4 +++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index 711256c7c5af..a33906ce28d6 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -445,8 +445,7 @@ private StorageGroupProcessor getStorageGroupProcessorByPath( if (virtualStorageGroupManager == null) { // if finish recover if (isAllSgReady.get()) { - waitAllSgReady(devicePath); - synchronized (storageGroupMNode) { + synchronized (this) { virtualStorageGroupManager = processorMap.get(storageGroupMNode.getPartialPath()); if (virtualStorageGroupManager == null) { virtualStorageGroupManager = new VirtualStorageGroupManager(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java index e4faadb7ac7e..460342b2c91f 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java @@ -116,7 +116,9 @@ public StorageGroupProcessor getProcessor( } else { // not finished recover, refuse the request throw new StorageEngineException( - "the sg " + partialPath + " may not ready now, please wait and retry later", + "the sg " + + storageGroupMNode.getFullPath() + + " may not ready now, please wait and retry later", TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode()); } }