From 75620dd4d681fd6c6133d8da9e03e669442496d3 Mon Sep 17 00:00:00 2001 From: Zhong Wang Date: Thu, 21 Apr 2022 17:28:37 +0800 Subject: [PATCH 1/2] [IOTDB-2971] Fix sink handle memory leak (#5626) --- .../iotdb/db/mpp/buffer/DataBlockManager.java | 12 +-- .../iotdb/db/mpp/buffer/ISinkHandle.java | 13 ++- .../iotdb/db/mpp/buffer/ISourceHandle.java | 10 +- .../iotdb/db/mpp/buffer/SinkHandle.java | 16 +-- .../iotdb/db/mpp/buffer/SourceHandle.java | 13 +-- .../iotdb/db/mpp/buffer/SinkHandleTest.java | 101 ++++++++++++++++-- .../iotdb/db/mpp/buffer/SourceHandleTest.java | 93 ++++++++++++++-- 7 files changed, 216 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java index 6a8f34ccf550..e09acf28cb75 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java @@ -258,7 +258,7 @@ public DataBlockServiceImpl getOrCreateDataBlockServiceImpl() { @Override public ISinkHandle createSinkHandle( TFragmentInstanceId localFragmentInstanceId, - TEndPoint endpoint, + TEndPoint remoteEndpoint, TFragmentInstanceId remoteFragmentInstanceId, String remotePlanNodeId, FragmentInstanceContext instanceContext) { @@ -274,13 +274,13 @@ public ISinkHandle createSinkHandle( SinkHandle sinkHandle = new SinkHandle( - endpoint.toString(), + remoteEndpoint, remoteFragmentInstanceId, remotePlanNodeId, localFragmentInstanceId, localMemoryManager, executorService, - clientFactory.getDataBlockServiceClient(endpoint), + clientFactory.getDataBlockServiceClient(remoteEndpoint), tsBlockSerdeFactory.get(), new SinkHandleListenerImpl(instanceContext)); sinkHandles.put(localFragmentInstanceId, sinkHandle); @@ -291,7 +291,7 @@ public ISinkHandle createSinkHandle( public ISourceHandle createSourceHandle( TFragmentInstanceId localFragmentInstanceId, String localPlanNodeId, - TEndPoint endpoint, + TEndPoint remoteEndpoint, TFragmentInstanceId remoteFragmentInstanceId) { if (sourceHandles.containsKey(localFragmentInstanceId) && sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) { @@ -311,13 +311,13 @@ public ISourceHandle createSourceHandle( SourceHandle sourceHandle = new SourceHandle( - endpoint.getIp(), + remoteEndpoint, remoteFragmentInstanceId, localFragmentInstanceId, localPlanNodeId, localMemoryManager, executorService, - clientFactory.getDataBlockServiceClient(endpoint), + clientFactory.getDataBlockServiceClient(remoteEndpoint), tsBlockSerdeFactory.get(), new SourceHandleListenerImpl()); sourceHandles diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java index 6300c5beef0d..b27e86186b55 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java @@ -38,8 +38,8 @@ public interface ISinkHandle extends AutoCloseable { /** * Send a list of tsblocks to an unpartitioned output buffer. If no-more-tsblocks has been set, - * the send tsblock call is ignored. This can happen with limit queries. A {@link - * RuntimeException} will be thrown if any exception happened * during the data transmission. + * the invocation will be ignored. This can happen with limit queries. A {@link RuntimeException} + * will be thrown if any exception happened during the data transmission. */ void send(List tsBlocks) throws IOException; @@ -57,13 +57,13 @@ public interface ISinkHandle extends AutoCloseable { void setNoMoreTsBlocks(); /** If the handle is closed. */ - public boolean isClosed(); + boolean isClosed(); /** * If no more tsblocks will be sent and all the tsblocks have been fetched by downstream fragment * instances. */ - public boolean isFinished(); + boolean isFinished(); /** * Close the handle. The output buffer will not be cleared until all tsblocks are fetched by @@ -73,6 +73,9 @@ public interface ISinkHandle extends AutoCloseable { @Override void close() throws IOException; - /** Abort the sink handle, discarding all tsblocks which may still be in memory buffer. */ + /** + * Abort the sink handle. Discard all tsblocks which may still be in the memory buffer and cancel + * the future returned by {@link #isFull()}. + */ void abort(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java index dfb9257b97a8..9e3191862712 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java @@ -39,16 +39,16 @@ public interface ISourceHandle extends Closeable { /** If there are more tsblocks. */ boolean isFinished(); - /** - * Get a future that will be completed when the input buffer is not empty. The future will not - * complete even when the handle is finished or closed. - */ + /** Get a future that will be completed when the input buffer is not empty. */ ListenableFuture isBlocked(); /** If this handle is closed. */ boolean isClosed(); - /** Close the handle. Discarding all tsblocks which may still be in memory buffer. */ + /** + * Close the handle. Discard all tsblocks which may still be in the memory buffer and complete the + * future returned by {@link #isBlocked()}. + */ @Override void close(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java index a72dcb9cb6fc..f66d8b49f355 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.mpp.buffer; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener; import org.apache.iotdb.db.mpp.memory.LocalMemoryManager; import org.apache.iotdb.mpp.rpc.thrift.DataBlockService; @@ -53,7 +54,7 @@ public class SinkHandle implements ISinkHandle { public static final int MAX_ATTEMPT_TIMES = 3; - private final String remoteHostname; + private final TEndPoint remoteEndpoint; private final TFragmentInstanceId remoteFragmentInstanceId; private final String remotePlanNodeId; private final TFragmentInstanceId localFragmentInstanceId; @@ -76,7 +77,7 @@ public class SinkHandle implements ISinkHandle { private Throwable throwable; public SinkHandle( - String remoteHostname, + TEndPoint remoteEndpoint, TFragmentInstanceId remoteFragmentInstanceId, String remotePlanNodeId, TFragmentInstanceId localFragmentInstanceId, @@ -85,7 +86,7 @@ public SinkHandle( DataBlockService.Iface client, TsBlockSerde serde, SinkHandleListener sinkHandleListener) { - this.remoteHostname = Validate.notNull(remoteHostname); + this.remoteEndpoint = Validate.notNull(remoteEndpoint); this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId); this.remotePlanNodeId = Validate.notNull(remotePlanNodeId); this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId); @@ -220,6 +221,9 @@ public void abort() { synchronized (this) { sequenceIdToTsBlock.clear(); closed = true; + if (blocked != null && !blocked.isDone()) { + blocked.cancel(true); + } if (bufferRetainedSizeInBytes > 0) { localMemoryManager .getQueryPool() @@ -292,8 +296,8 @@ void acknowledgeTsBlock(int startSequenceId, int endSequenceId) { localMemoryManager.getQueryPool().free(localFragmentInstanceId.getQueryId(), freedBytes); } - String getRemoteHostname() { - return remoteHostname; + TEndPoint getRemoteEndpoint() { + return remoteEndpoint; } TFragmentInstanceId getRemoteFragmentInstanceId() { @@ -311,7 +315,7 @@ TFragmentInstanceId getLocalFragmentInstanceId() { @Override public String toString() { return new StringJoiner(", ", SinkHandle.class.getSimpleName() + "[", "]") - .add("remoteHostname='" + remoteHostname + "'") + .add("remoteEndpoint='" + remoteEndpoint + "'") .add("remoteFragmentInstanceId=" + remoteFragmentInstanceId) .add("remotePlanNodeId='" + remotePlanNodeId + "'") .add("localFragmentInstanceId=" + localFragmentInstanceId) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java index 37a5bce1b0ae..1d6b92d25e16 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.mpp.buffer; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener; import org.apache.iotdb.db.mpp.memory.LocalMemoryManager; import org.apache.iotdb.mpp.rpc.thrift.DataBlockService; @@ -53,7 +54,7 @@ public class SourceHandle implements ISourceHandle { public static final int MAX_ATTEMPT_TIMES = 3; - private final String remoteHostname; + private final TEndPoint remoteEndpoint; private final TFragmentInstanceId remoteFragmentInstanceId; private final TFragmentInstanceId localFragmentInstanceId; private final String localPlanNodeId; @@ -75,7 +76,7 @@ public class SourceHandle implements ISourceHandle { private Throwable throwable; public SourceHandle( - String remoteHostname, + TEndPoint remoteEndpoint, TFragmentInstanceId remoteFragmentInstanceId, TFragmentInstanceId localFragmentInstanceId, String localPlanNodeId, @@ -84,7 +85,7 @@ public SourceHandle( DataBlockService.Iface client, TsBlockSerde serde, SourceHandleListener sourceHandleListener) { - this.remoteHostname = Validate.notNull(remoteHostname); + this.remoteEndpoint = Validate.notNull(remoteEndpoint); this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId); this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId); this.localPlanNodeId = Validate.notNull(localPlanNodeId); @@ -249,8 +250,8 @@ private boolean remoteTsBlockedConsumedUp() { return currSequenceId - 1 == lastSequenceId; } - String getRemoteHostname() { - return remoteHostname; + TEndPoint getRemoteEndpoint() { + return remoteEndpoint; } TFragmentInstanceId getRemoteFragmentInstanceId() { @@ -278,7 +279,7 @@ public boolean isClosed() { @Override public String toString() { return new StringJoiner(", ", SourceHandle.class.getSimpleName() + "[", "]") - .add("remoteHostname='" + remoteHostname + "'") + .add("remoteEndpoint='" + remoteEndpoint + "'") .add("remoteFragmentInstanceId=" + remoteFragmentInstanceId) .add("localFragmentInstanceId=" + localFragmentInstanceId) .add("localPlanNodeId='" + localPlanNodeId + "'") diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java index b47b3c57c8f1..e8a7767100c0 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.mpp.buffer; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener; import org.apache.iotdb.db.mpp.memory.LocalMemoryManager; import org.apache.iotdb.db.mpp.memory.MemoryPool; @@ -37,6 +39,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.Executors; +import java.util.concurrent.Future; public class SinkHandleTest { @@ -45,7 +48,9 @@ public void testOneTimeNotBlockedSend() { final String queryId = "q0"; final long mockTsBlockSize = 1024L * 1024L; final int numOfMockTsBlock = 10; - final String remoteHostname = "remote"; + final TEndPoint remoteEndpoint = + new TEndPoint( + "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()); final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0"); final String remotePlanNodeId = "exchange_0"; final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0"); @@ -75,7 +80,7 @@ public void testOneTimeNotBlockedSend() { // Construct SinkHandle. SinkHandle sinkHandle = new SinkHandle( - remoteHostname, + remoteEndpoint, remoteFragmentInstanceId, remotePlanNodeId, localFragmentInstanceId, @@ -174,7 +179,9 @@ public void testMultiTimesBlockedSend() { final String queryId = "q0"; final long mockTsBlockSize = 1024L * 1024L; final int numOfMockTsBlock = 10; - final String remoteHostname = "remote"; + final TEndPoint remoteEndpoint = + new TEndPoint( + "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()); final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0"); final String remotePlanNodeId = "exchange_0"; final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0"); @@ -206,7 +213,7 @@ public void testMultiTimesBlockedSend() { // Construct SinkHandle. SinkHandle sinkHandle = new SinkHandle( - remoteHostname, + remoteEndpoint, remoteFragmentInstanceId, remotePlanNodeId, localFragmentInstanceId, @@ -356,7 +363,9 @@ public void testFailedSend() { final String queryId = "q0"; final long mockTsBlockSize = 1024L * 1024L; final int numOfMockTsBlock = 10; - final String remoteHostname = "remote"; + final TEndPoint remoteEndpoint = + new TEndPoint( + "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()); final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0"); final String remotePlanNodeId = "exchange_0"; final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0"); @@ -387,7 +396,7 @@ public void testFailedSend() { // Construct SinkHandle. SinkHandle sinkHandle = new SinkHandle( - remoteHostname, + remoteEndpoint, remoteFragmentInstanceId, remotePlanNodeId, localFragmentInstanceId, @@ -457,4 +466,84 @@ public void testFailedSend() { Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onAborted(sinkHandle); Mockito.verify(mockSinkHandleListener, Mockito.times(0)).onFinish(sinkHandle); } + + @Test + public void testAbort() { + final String queryId = "q0"; + final long mockTsBlockSize = 1024L * 1024L; + final int numOfMockTsBlock = 10; + final TEndPoint remoteEndpoint = + new TEndPoint( + "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()); + final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0"); + final String remotePlanNodeId = "exchange_0"; + final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0"); + + // Construct a mock LocalMemoryManager that returns blocked futures. + LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class); + MemoryPool mockMemoryPool = + Utils.createMockBlockedMemoryPool(queryId, numOfMockTsBlock, mockTsBlockSize); + Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool); + + // Construct a mock SinkHandleListener. + SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class); + // Construct several mock TsBlock(s). + List mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize); + // Construct a mock client. + Client mockClient = Mockito.mock(Client.class); + try { + Mockito.doNothing() + .when(mockClient) + .onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class)); + Mockito.doNothing() + .when(mockClient) + .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class)); + } catch (TException e) { + e.printStackTrace(); + Assert.fail(); + } + + // Construct SinkHandle. + SinkHandle sinkHandle = + new SinkHandle( + remoteEndpoint, + remoteFragmentInstanceId, + remotePlanNodeId, + localFragmentInstanceId, + mockLocalMemoryManager, + Executors.newSingleThreadExecutor(), + mockClient, + Utils.createMockTsBlockSerde(mockTsBlockSize), + mockSinkHandleListener); + Assert.assertTrue(sinkHandle.isFull().isDone()); + Assert.assertFalse(sinkHandle.isFinished()); + Assert.assertFalse(sinkHandle.isClosed()); + Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes()); + Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks()); + + // Send tsblocks. + try { + sinkHandle.send(mockTsBlocks); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + Future blocked = sinkHandle.isFull(); + Assert.assertFalse(blocked.isDone()); + Assert.assertFalse(blocked.isCancelled()); + Assert.assertFalse(sinkHandle.isFinished()); + Assert.assertFalse(sinkHandle.isClosed()); + Assert.assertEquals( + mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes()); + Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks()); + + sinkHandle.abort(); + Assert.assertTrue(blocked.isDone()); + Assert.assertTrue(blocked.isCancelled()); + Assert.assertFalse(sinkHandle.isFinished()); + Assert.assertTrue(sinkHandle.isClosed()); + Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes()); + Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks()); + Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onAborted(sinkHandle); + } } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java index 461fac1e13f3..dc84cff50143 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.mpp.buffer; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener; import org.apache.iotdb.db.mpp.memory.LocalMemoryManager; import org.apache.iotdb.db.mpp.memory.MemoryPool; @@ -39,6 +41,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -49,7 +52,9 @@ public void testNonBlockedOneTimeReceive() { final String queryId = "q0"; final long mockTsBlockSize = 1024L * 1024L; final int numOfMockTsBlock = 10; - final String remoteHostname = "remote"; + final TEndPoint remoteEndpoint = + new TEndPoint( + "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()); final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0"); final String localPlanNodeId = "exchange_0"; final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0"); @@ -84,7 +89,7 @@ public void testNonBlockedOneTimeReceive() { SourceHandle sourceHandle = new SourceHandle( - remoteHostname, + remoteEndpoint, remoteFragmentInstanceId, localFragmentInstanceId, localPlanNodeId, @@ -168,7 +173,9 @@ public void testBlockedOneTimeReceive() { final String queryId = "q0"; final long mockTsBlockSize = 1024L * 1024L; final int numOfMockTsBlock = 10; - final String remoteHostname = "remote"; + final TEndPoint remoteEndpoint = + new TEndPoint( + "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()); final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0"); final String localPlanNodeId = "exchange_0"; final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0"); @@ -204,7 +211,7 @@ public void testBlockedOneTimeReceive() { SourceHandle sourceHandle = new SourceHandle( - remoteHostname, + remoteEndpoint, remoteFragmentInstanceId, localFragmentInstanceId, localPlanNodeId, @@ -313,7 +320,9 @@ public void testMultiTimesReceive() { final String queryId = "q0"; final long mockTsBlockSize = 1024L * 1024L; final int numOfMockTsBlock = 10; - final String remoteHostname = "remote"; + final TEndPoint remoteEndpoint = + new TEndPoint( + "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()); final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0"); final String localPlanNodeId = "exchange_0"; final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0"); @@ -348,7 +357,7 @@ public void testMultiTimesReceive() { SourceHandle sourceHandle = new SourceHandle( - remoteHostname, + remoteEndpoint, remoteFragmentInstanceId, localFragmentInstanceId, localPlanNodeId, @@ -504,7 +513,9 @@ public void testFailedReceive() { final String queryId = "q0"; final long mockTsBlockSize = 1024L * 1024L; final int numOfMockTsBlock = 10; - final String remoteHostname = "remote"; + final TEndPoint remoteEndpoint = + new TEndPoint( + "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()); final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0"); final String localPlanNodeId = "exchange_0"; final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0"); @@ -530,7 +541,7 @@ public void testFailedReceive() { SourceHandle sourceHandle = new SourceHandle( - remoteHostname, + remoteEndpoint, remoteFragmentInstanceId, localFragmentInstanceId, localPlanNodeId, @@ -585,4 +596,70 @@ public void testFailedReceive() { Assert.assertFalse(sourceHandle.isFinished()); Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes()); } + + @Test + public void testForceClose() { + final String queryId = "q0"; + final long mockTsBlockSize = 1024L * 1024L; + final TEndPoint remoteEndpoint = + new TEndPoint( + "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()); + final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0"); + final String localPlanNodeId = "exchange_0"; + final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0"); + + // Construct a mock LocalMemoryManager that do not block any reservation. + LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class); + MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool(); + Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool); + // Construct a mock client. + Client mockClient = Mockito.mock(Client.class); + try { + Mockito.doAnswer( + invocation -> { + TGetDataBlockRequest req = invocation.getArgument(0); + List byteBuffers = + new ArrayList<>(req.getEndSequenceId() - req.getStartSequenceId()); + for (int i = 0; i < req.getEndSequenceId() - req.getStartSequenceId(); i++) { + byteBuffers.add(ByteBuffer.allocate(0)); + } + return new TGetDataBlockResponse(byteBuffers); + }) + .when(mockClient) + .getDataBlock(Mockito.any(TGetDataBlockRequest.class)); + } catch (TException e) { + e.printStackTrace(); + Assert.fail(); + } + // Construct a mock SourceHandleListener. + SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class); + // Construct a mock TsBlockSerde that deserializes any bytebuffer into a mock TsBlock. + TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(mockTsBlockSize); + + SourceHandle sourceHandle = + new SourceHandle( + remoteEndpoint, + remoteFragmentInstanceId, + localFragmentInstanceId, + localPlanNodeId, + mockLocalMemoryManager, + Executors.newSingleThreadExecutor(), + mockClient, + mockTsBlockSerde, + mockSourceHandleListener); + Future blocked = sourceHandle.isBlocked(); + Assert.assertFalse(blocked.isDone()); + Assert.assertFalse(blocked.isCancelled()); + Assert.assertFalse(sourceHandle.isClosed()); + Assert.assertFalse(sourceHandle.isFinished()); + Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes()); + + sourceHandle.close(); + Assert.assertTrue(blocked.isDone()); + Assert.assertTrue(blocked.isCancelled()); + Assert.assertTrue(sourceHandle.isClosed()); + Assert.assertFalse(sourceHandle.isFinished()); + Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes()); + Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onClosed(sourceHandle); + } } From b2520b31b02902921dad83926c9bba6eaafffabe Mon Sep 17 00:00:00 2001 From: cmlmakahts <82880298+cmlmakahts@users.noreply.github.com> Date: Thu, 21 Apr 2022 20:54:10 +0800 Subject: [PATCH 2/2] [IOTDB-2880] Fix import check style (#5629) --- .../procedure/CompletedProcedureCleaner.java | 20 +------------------ .../org/apache/iotdb/procedure/Procedure.java | 6 +++++- .../iotdb/procedure/ProcedureExecutor.java | 12 +++++++++-- .../iotdb/procedure/store/ProcedureWAL.java | 5 ++++- 4 files changed, 20 insertions(+), 23 deletions(-) diff --git a/procedure/src/main/java/org/apache/iotdb/procedure/CompletedProcedureCleaner.java b/procedure/src/main/java/org/apache/iotdb/procedure/CompletedProcedureCleaner.java index 60eca313dc7a..4b319290300a 100644 --- a/procedure/src/main/java/org/apache/iotdb/procedure/CompletedProcedureCleaner.java +++ b/procedure/src/main/java/org/apache/iotdb/procedure/CompletedProcedureCleaner.java @@ -29,25 +29,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -/** - * Internal cleaner that removes the completed procedure results after a TTL. - * - *

NOTE: This is a special case handled in timeoutLoop(). - * - *

Since the client code looks more or less like: - * - *

- *   procId = master.doOperation()
- *   while (master.getProcResult(procId) == ProcInProgress);
- * 
- * - * The master should not throw away the proc result as soon as the procedure is done but should wait - * a result request from the client (see executor.removeResult(procId)) The client will call - * something like master.isProcDone() or master.getProcResult() which will return the result/state - * to the client, and it will mark the completed proc as ready to delete. note that the client may - * not receive the response from the master (e.g. master failover) so, if we delay a bit the real - * deletion of the proc result the client will be able to get the result the next try. - */ +/** Internal cleaner that removes the completed procedure results after a TTL. */ public class CompletedProcedureCleaner extends InternalProcedure { private static final Logger LOG = LoggerFactory.getLogger(CompletedProcedureCleaner.class); diff --git a/procedure/src/main/java/org/apache/iotdb/procedure/Procedure.java b/procedure/src/main/java/org/apache/iotdb/procedure/Procedure.java index 6cbb3ec69af1..9122bf610ab3 100644 --- a/procedure/src/main/java/org/apache/iotdb/procedure/Procedure.java +++ b/procedure/src/main/java/org/apache/iotdb/procedure/Procedure.java @@ -19,7 +19,11 @@ package org.apache.iotdb.procedure; -import org.apache.iotdb.procedure.exception.*; +import org.apache.iotdb.procedure.exception.ProcedureAbortedException; +import org.apache.iotdb.procedure.exception.ProcedureException; +import org.apache.iotdb.procedure.exception.ProcedureSuspendedException; +import org.apache.iotdb.procedure.exception.ProcedureTimeoutException; +import org.apache.iotdb.procedure.exception.ProcedureYieldException; import org.apache.iotdb.procedure.store.IProcedureStore; import org.apache.iotdb.service.rpc.thrift.ProcedureState; diff --git a/procedure/src/main/java/org/apache/iotdb/procedure/ProcedureExecutor.java b/procedure/src/main/java/org/apache/iotdb/procedure/ProcedureExecutor.java index c8a8c5b2687a..cddc7796dfe6 100644 --- a/procedure/src/main/java/org/apache/iotdb/procedure/ProcedureExecutor.java +++ b/procedure/src/main/java/org/apache/iotdb/procedure/ProcedureExecutor.java @@ -33,8 +33,16 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Deque; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; diff --git a/procedure/src/main/java/org/apache/iotdb/procedure/store/ProcedureWAL.java b/procedure/src/main/java/org/apache/iotdb/procedure/store/ProcedureWAL.java index 2be9831008a5..9400d49777dd 100644 --- a/procedure/src/main/java/org/apache/iotdb/procedure/store/ProcedureWAL.java +++ b/procedure/src/main/java/org/apache/iotdb/procedure/store/ProcedureWAL.java @@ -24,7 +24,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Files;