From 8444591ddaba87799ff51898593d238eca698287 Mon Sep 17 00:00:00 2001 From: chaow Date: Wed, 18 Nov 2020 11:14:12 +0800 Subject: [PATCH 1/3] fix transport not close --- .../assembly/resources/conf/iotdb-engine.properties | 2 +- cluster/src/assembly/resources/conf/logback.xml | 5 ++++- .../iotdb/cluster/client/async/AsyncClientPool.java | 12 ++++++++++-- .../iotdb/cluster/client/async/AsyncDataClient.java | 9 +++++++-- .../iotdb/cluster/client/async/AsyncMetaClient.java | 6 ++++++ .../apache/iotdb/cluster/config/ClusterConfig.java | 2 +- .../iotdb/cluster/log/catchup/CatchUpTask.java | 11 +++++++---- .../cluster/log/catchup/SnapshotCatchUpTask.java | 12 +++++++++++- .../org/apache/iotdb/cluster/server/RaftServer.java | 4 ++++ .../handlers/caller/SnapshotCatchUpHandler.java | 2 +- .../iotdb/cluster/server/member/DataGroupMember.java | 2 +- .../iotdb/cluster/common/TestAsyncMetaClient.java | 1 + .../apache/iotdb/cluster/common/TestSnapshot.java | 7 ++++++- .../iotdb/cluster/utils/SerializeUtilTest.java | 3 +++ server/src/assembly/resources/conf/logback.xml | 6 +++--- .../db/engine/compaction/utils/CompactionUtils.java | 3 +++ 16 files changed, 69 insertions(+), 18 deletions(-) diff --git a/cluster/src/assembly/resources/conf/iotdb-engine.properties b/cluster/src/assembly/resources/conf/iotdb-engine.properties index 8e2b91b94daf6..50fd41307a7e9 100644 --- a/cluster/src/assembly/resources/conf/iotdb-engine.properties +++ b/cluster/src/assembly/resources/conf/iotdb-engine.properties @@ -159,7 +159,7 @@ wal_buffer_size=16777216 time_zone=+08:00 # When a TsFile's file size (in byte) exceeds this, the TsFile is forced closed. The default threshold is 512 MB. -tsfile_size_threshold=536870912 +tsfile_size_threshold=0 # When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 128 MB. memtable_size_threshold=134217728 diff --git a/cluster/src/assembly/resources/conf/logback.xml b/cluster/src/assembly/resources/conf/logback.xml index de8d852a573c9..2a3da9819e315 100644 --- a/cluster/src/assembly/resources/conf/logback.xml +++ b/cluster/src/assembly/resources/conf/logback.xml @@ -231,7 +231,10 @@ + + + - + diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java index b0ac1ef07210c..d32d195f7dc60 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java @@ -128,8 +128,8 @@ private AsyncClient waitForClient(Deque clientStack, ClusterNode no this.wait(WAIT_CLIENT_TIMEOUT_MS); if (clientStack.isEmpty() && System.currentTimeMillis() - waitStart >= WAIT_CLIENT_TIMEOUT_MS) { - logger.warn("Cannot get an available client after {}ms, create a new one", - WAIT_CLIENT_TIMEOUT_MS); + logger.warn("Cannot get an available client after {}ms, create a new one, factory {} now is {}", + WAIT_CLIENT_TIMEOUT_MS, asyncClientFactory, nodeClientNum); nodeClientNumMap.put(node, nodeClientNum + 1); return asyncClientFactory.getAsyncClient(node, this); } @@ -174,6 +174,14 @@ void onError(Node node) { synchronized (this) { Deque clientStack = clientCaches .computeIfAbsent(clusterNode, n -> new ArrayDeque<>()); + while (!clientStack.isEmpty()) { + AsyncClient client = clientStack.pop(); + if (client instanceof AsyncDataClient) { + ((AsyncDataClient) client).close(); + } else if (client instanceof AsyncMetaClient) { + ((AsyncMetaClient) client).close(); + } + } clientStack.clear(); nodeClientNumMap.put(clusterNode, 0); this.notifyAll(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java index 84e49f06844cc..00f9300b1191e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java @@ -82,6 +82,11 @@ public void onError(Exception e) { } } + public void close() { + ___transport.close(); + ___currentMethod = null; + } + public static class FactoryAsync extends AsyncClientFactory { public FactoryAsync(org.apache.thrift.protocol.TProtocolFactory protocolFactory) { @@ -129,11 +134,11 @@ public Node getNode() { } public boolean isReady() { - if (___currentMethod != null) { + if (___currentMethod != null || hasError()) { logger.warn("Client {} is running {} and will timeout at {}", hashCode(), ___currentMethod, new Date(___currentMethod.getTimeoutTimestamp())); } - return ___currentMethod == null; + return ___currentMethod == null && !hasError(); } TAsyncMethodCall getCurrMethod() { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java index 47f5dc01b4cee..4bbe4125fd9f1 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java @@ -100,6 +100,12 @@ public String toString() { '}'; } + + public void close() { + ___transport.close(); + ___currentMethod = null; + } + public Node getNode() { return node; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java index bfd5b9b5185d2..9fdc32fcca84f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java @@ -87,7 +87,7 @@ public class ClusterConfig { * ClientPool will have so many selector threads (TAsyncClientManager) to distribute to its * clients. */ - private int selectorNumOfClientPool = Runtime.getRuntime().availableProcessors() * 2; + private int selectorNumOfClientPool = Runtime.getRuntime().availableProcessors() / 3; /** * Whether creating schema automatically is enabled, this will replace the one in diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java index dc30c6560b4f1..61b97006ea5f7 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java @@ -74,10 +74,13 @@ private boolean checkMatchIndex() logger.debug("Checking the match index of {}", node); long localFirstIndex = 0; try { - localFirstIndex = raftMember.getLogManager().getFirstIndex(); - lo = Math.max(localFirstIndex, peer.getMatchIndex() + 1); - hi = raftMember.getLogManager().getLastLogIndex() + 1; - logs = raftMember.getLogManager().getEntries(lo, hi); + // to avoid snapshot catch up when index is volatile + synchronized (raftMember.getLogManager()) { + localFirstIndex = raftMember.getLogManager().getFirstIndex(); + lo = Math.max(localFirstIndex, peer.getMatchIndex() + 1); + hi = raftMember.getLogManager().getLastLogIndex() + 1; + logs = raftMember.getLogManager().getEntries(lo, hi); + } // this may result from peer's match index being changed concurrently, making the peer // actually catch up now if (logs.isEmpty()) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java index 5c1c4538cb5b0..e8dff8aab17c8 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java @@ -19,9 +19,12 @@ package org.apache.iotdb.cluster.log.catchup; +import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.iotdb.cluster.client.async.AsyncDataClient; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.exception.LeaderUnknownException; import org.apache.iotdb.cluster.log.Log; @@ -61,7 +64,11 @@ private void doSnapshotCatchUp() if (raftMember.getHeader() != null) { request.setHeader(raftMember.getHeader()); } - request.setSnapshotBytes(snapshot.serialize()); + ByteBuffer data = snapshot.serialize(); + if (logger.isDebugEnabled()) { + logger.debug("do snapshot catch up with size {}", data.array().length); + } + request.setSnapshotBytes(data); synchronized (raftMember.getTerm()) { // make sure this node is still a leader @@ -96,6 +103,9 @@ private boolean sendSnapshotAsync(SendSnapshotRequest request) raftMember.getLastCatchUpResponseTime().put(node, System.currentTimeMillis()); succeed.wait(SEND_SNAPSHOT_WAIT_MS); } + if (logger.isDebugEnabled()) { + logger.debug("send snapshot to node {} success {}", raftMember.getThisNode(), succeed.get()); + } return succeed.get(); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java index 92303cea4f589..0ee845f03be8f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java @@ -19,6 +19,8 @@ package org.apache.iotdb.cluster.server; +import java.io.IOException; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; @@ -41,6 +43,8 @@ import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadedSelectorServer; +import org.apache.thrift.transport.TFastFramedTransport; +import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerTransport; import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransportException; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/SnapshotCatchUpHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/SnapshotCatchUpHandler.java index cfcdd1b4be3f3..7b1a6f582de70 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/SnapshotCatchUpHandler.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/SnapshotCatchUpHandler.java @@ -54,7 +54,7 @@ public void onComplete(Void resp) { @Override public void onError(Exception exception) { - logger.error("Cannot send snapshot {} to {}", snapshot, receiver); + logger.error("Cannot send snapshot {} to {}", snapshot, receiver, exception); synchronized (succeed) { succeed.notifyAll(); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index e02bcceb668f0..b4c7adf93f848 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@ -383,7 +383,7 @@ long checkElectorLogProgress(ElectionRequest electionRequest) { * @param request */ public void receiveSnapshot(SendSnapshotRequest request) throws SnapshotInstallationException { - logger.debug("{}: received a snapshot", name); + logger.debug("{}: received a snapshot from {} with size {}", name, request.getHeader(), request.getSnapshotBytes().length); PartitionedSnapshot snapshot = new PartitionedSnapshot<>(FileSnapshot.Factory.INSTANCE); snapshot.deserialize(ByteBuffer.wrap(request.getSnapshotBytes())); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncMetaClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncMetaClient.java index dedea8c734ce8..294adfb44f3d0 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncMetaClient.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncMetaClient.java @@ -38,6 +38,7 @@ public TestAsyncMetaClient(TProtocolFactory protocolFactory, this.node = node; } + @Override public Node getNode() { return node; } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestSnapshot.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestSnapshot.java index 903b915d35e6c..39f1946ae3f7f 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestSnapshot.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestSnapshot.java @@ -30,18 +30,22 @@ public class TestSnapshot extends Snapshot { private int id; + private ByteBuffer data; public TestSnapshot() { + data = ByteBuffer.wrap(new byte[8192*2048]); } public TestSnapshot(int id) { this.id = id; + data = ByteBuffer.wrap(new byte[8192*2048]); } @Override public ByteBuffer serialize() { - ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES); + ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES + 8192 * 2048); byteBuffer.putInt(id); + byteBuffer.put(data); byteBuffer.flip(); return byteBuffer; } @@ -49,6 +53,7 @@ public ByteBuffer serialize() { @Override public void deserialize(ByteBuffer buffer) { id = buffer.getInt(); + data.put(buffer); } @Override diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java index 953a8787c5f00..134b2fe0a5cda 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java @@ -29,6 +29,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; + import org.apache.iotdb.cluster.common.TestUtils; import org.apache.iotdb.cluster.exception.UnknownLogTypeException; import org.apache.iotdb.cluster.log.Log; @@ -37,6 +39,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; diff --git a/server/src/assembly/resources/conf/logback.xml b/server/src/assembly/resources/conf/logback.xml index 22a80d07a05c3..7bf7f33585fb7 100644 --- a/server/src/assembly/resources/conf/logback.xml +++ b/server/src/assembly/resources/conf/logback.xml @@ -52,7 +52,7 @@ 10 - 1MB + 10MB true @@ -73,7 +73,7 @@ 10 - 50MB + 100MB true @@ -126,7 +126,7 @@ 10 - 50MB + 100MB true diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java index b98750d3ad0eb..eaf046002d8b5 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java @@ -198,6 +198,9 @@ public static void merge(TsFileResource targetResource, for (TsFileResource levelResource : tsFileResources) { TsFileSequenceReader reader = buildReaderFromTsFileResource(levelResource, tsFileSequenceReaderMap, storageGroup); + if (reader == null) { + continue; + } Map> chunkMetadataMap = reader .readChunkMetadataInDevice(device); for (Entry> entry : chunkMetadataMap.entrySet()) { From 40c75b660870c96ee3edaa38e08184127a82d0fd Mon Sep 17 00:00:00 2001 From: chaow Date: Wed, 18 Nov 2020 11:41:51 +0800 Subject: [PATCH 2/3] remove invalid import --- cluster/src/assembly/resources/conf/logback.xml | 3 --- .../main/java/org/apache/iotdb/cluster/server/RaftServer.java | 4 ---- 2 files changed, 7 deletions(-) diff --git a/cluster/src/assembly/resources/conf/logback.xml b/cluster/src/assembly/resources/conf/logback.xml index 2a3da9819e315..2bf9b1ff5c985 100644 --- a/cluster/src/assembly/resources/conf/logback.xml +++ b/cluster/src/assembly/resources/conf/logback.xml @@ -231,9 +231,6 @@ - - - diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java index 0ee845f03be8f..92303cea4f589 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java @@ -19,8 +19,6 @@ package org.apache.iotdb.cluster.server; -import java.io.IOException; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; @@ -43,8 +41,6 @@ import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadedSelectorServer; -import org.apache.thrift.transport.TFastFramedTransport; -import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerTransport; import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransportException; From ab26779cd717787e9842f6207df7e5e9d367c970 Mon Sep 17 00:00:00 2001 From: chaow Date: Wed, 18 Nov 2020 14:41:53 +0800 Subject: [PATCH 3/3] fix ut --- .../org/apache/iotdb/cluster/client/async/AsyncDataClient.java | 2 +- .../apache/iotdb/cluster/client/async/AsyncDataClientTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java index 00f9300b1191e..7e55f9ad2f75f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java @@ -134,7 +134,7 @@ public Node getNode() { } public boolean isReady() { - if (___currentMethod != null || hasError()) { + if (___currentMethod != null) { logger.warn("Client {} is running {} and will timeout at {}", hashCode(), ___currentMethod, new Date(___currentMethod.getTimeoutTimestamp())); } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java index 9c84b20d582b7..7030fc1e68de9 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java @@ -52,7 +52,7 @@ public void onError(Exception e) { client.onError(new Exception()); assertNull(client.getCurrMethod()); - assertTrue(client.isReady()); + assertFalse(client.isReady()); assertEquals( "DataClient{node=ClusterNode{ ip='192.168.0.0', metaPort=9003, nodeIdentifier=0, dataPort=40010, clientPort=0}}",