From a4a0e2948a86534dbdca6a255a5063e8366ddcde Mon Sep 17 00:00:00 2001 From: spricoder Date: Mon, 22 Jul 2024 09:41:17 +0800 Subject: [PATCH 1/2] fix memory concurrency problem --- .../schemaregion/impl/SchemaRegionMemoryImpl.java | 3 +-- .../schemaregion/impl/SchemaRegionPBTreeImpl.java | 3 +-- .../storageengine/rescon/memory/SystemInfo.java | 15 ++++++++------- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java index e9173cc5888a1..6a0b9d197dfb3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java @@ -193,8 +193,7 @@ public synchronized void init() throws MetadataException { if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) { long memCost = config.getSchemaRatisConsensusLogAppenderBufferSizeMax(); - if (!SystemInfo.getInstance() - .addDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax())) { + if (!SystemInfo.getInstance().addDirectBufferMemoryCost(memCost)) { throw new MetadataException( "Total allocated memory for direct buffer will be " + (SystemInfo.getInstance().getDirectBufferMemoryCost() + memCost) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java index 4dbfaf5c3bf67..9c7a1b5ace821 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java @@ -190,8 +190,7 @@ public synchronized void init() throws MetadataException { if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) { long memCost = config.getSchemaRatisConsensusLogAppenderBufferSizeMax(); - if (!SystemInfo.getInstance() - .addDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax())) { + if (!SystemInfo.getInstance().addDirectBufferMemoryCost(memCost)) { throw new MetadataException( "Total allocated memory for direct buffer will be " + (SystemInfo.getInstance().getDirectBufferMemoryCost() + memCost) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java index 4b0f56f858d62..a64a4fe24bda2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java @@ -40,6 +40,7 @@ import java.util.PriorityQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -196,15 +197,15 @@ public synchronized void resetFlushingMemTableCost(long flushingMemTableCost) { } public boolean addDirectBufferMemoryCost(long size) { - while (true) { - long memCost = directBufferMemoryCost.get(); + AtomicBoolean result = new AtomicBoolean(false); + directBufferMemoryCost.updateAndGet(memCost -> { if (memCost + size > totalDirectBufferMemorySizeLimit) { - return false; + return memCost; } - if (directBufferMemoryCost.compareAndSet(memCost, memCost + size)) { - return true; - } - } + result.set(true); + return memCost + size; + }); + return result.get(); } public void decreaseDirectBufferMemoryCost(long size) { From 64e93d361040e017a699d6c65431c7b317311e8c Mon Sep 17 00:00:00 2001 From: OneSizeFitQuorum Date: Mon, 22 Jul 2024 10:00:20 +0800 Subject: [PATCH 2/2] spotless Signed-off-by: OneSizeFitQuorum --- .../storageengine/rescon/memory/SystemInfo.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java index a64a4fe24bda2..0458320af052a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java @@ -198,13 +198,14 @@ public synchronized void resetFlushingMemTableCost(long flushingMemTableCost) { public boolean addDirectBufferMemoryCost(long size) { AtomicBoolean result = new AtomicBoolean(false); - directBufferMemoryCost.updateAndGet(memCost -> { - if (memCost + size > totalDirectBufferMemorySizeLimit) { - return memCost; - } - result.set(true); - return memCost + size; - }); + directBufferMemoryCost.updateAndGet( + memCost -> { + if (memCost + size > totalDirectBufferMemorySizeLimit) { + return memCost; + } + result.set(true); + return memCost + size; + }); return result.get(); }