From 7d5d206a63f5461d2a808fda94c97ba533a35cd1 Mon Sep 17 00:00:00 2001 From: Haonan Date: Mon, 6 Mar 2023 17:27:16 +0800 Subject: [PATCH] [IOTDB-5620] Fix flush stuck when there is a lot of time partitions in each DataRegion (#9218) --- .../iotdb/it/env/cluster/MppCommonConfig.java | 6 ++ .../it/env/cluster/MppSharedCommonConfig.java | 7 ++ .../it/env/remote/RemoteCommonConfig.java | 5 ++ .../apache/iotdb/itbase/env/CommonConfig.java | 2 + .../db/it/IoTDBInsertMultiPartitionIT.java | 71 +++++++++++++++++++ .../org/apache/iotdb/db/conf/IoTDBConfig.java | 1 - .../apache/iotdb/db/conf/IoTDBDescriptor.java | 2 +- .../storagegroup/TimePartitionManager.java | 9 ++- 8 files changed, 100 insertions(+), 3 deletions(-) create mode 100644 integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBInsertMultiPartitionIT.java diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java index c1bcbde4ec2f2..fe9f3a6767f02 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java @@ -334,4 +334,10 @@ public CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate) { setProperty("schema_memory_allocate_proportion", String.valueOf(schemaMemoryAllocate)); return this; } + + @Override + public CommonConfig setWriteMemoryProportion(String writeMemoryProportion) { + setProperty("write_memory_proportion", writeMemoryProportion); + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java index ddaba5a331668..275ed1f8f8db5 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java @@ -343,4 +343,11 @@ public CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate) { cnConfig.setSchemaMemoryAllocate(schemaMemoryAllocate); return this; } + + @Override + public CommonConfig setWriteMemoryProportion(String writeMemoryProportion) { + dnConfig.setWriteMemoryProportion(writeMemoryProportion); + cnConfig.setWriteMemoryProportion(writeMemoryProportion); + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java index 440b38253e106..526df5dbab8e6 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java @@ -246,4 +246,9 @@ public CommonConfig setSeriesSlotNum(int seriesSlotNum) { public CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate) { return this; } + + @Override + public CommonConfig setWriteMemoryProportion(String writeMemoryProportion) { + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index 59dcd778ab91e..4792b8b3e7b44 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -110,4 +110,6 @@ CommonConfig setEnableAutoLeaderBalanceForIoTConsensus( CommonConfig setSeriesSlotNum(int seriesSlotNum); CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate); + + CommonConfig setWriteMemoryProportion(String writeMemoryProportion); } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBInsertMultiPartitionIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBInsertMultiPartitionIT.java new file mode 100644 index 0000000000000..fac1175f74e45 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBInsertMultiPartitionIT.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.itbase.category.RemoteIT; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.Statement; + +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class, ClusterIT.class, RemoteIT.class}) +public class IoTDBInsertMultiPartitionIT { + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getDataNodeCommonConfig() + .setWriteMemoryProportion("10000000:1"); + EnvFactory.getEnv().initClusterEnvironment(); + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testInsertMultiPartition() { + + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg.d1(time,s1) values(1,2)"); + statement.execute("flush"); + statement.execute("insert into root.sg.d1(time,s1) values(2,2)"); + statement.execute("insert into root.sg.d1(time,s1) values(604800001,2)"); + statement.execute("flush"); + } catch (Exception e) { + fail(e.getMessage()); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 75a461b006fa9..70a0b0f3da568 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1785,7 +1785,6 @@ public long getAllocateMemoryForStorageEngine() { public void setAllocateMemoryForStorageEngine(long allocateMemoryForStorageEngine) { this.allocateMemoryForStorageEngine = allocateMemoryForStorageEngine; - this.allocateMemoryForTimePartitionInfo = allocateMemoryForStorageEngine * 50 / 1001; } public long getAllocateMemoryForSchema() { diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index a4c40333abe1a..0432754ab73ff 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -1612,7 +1612,7 @@ private void initStorageEngineAllocate(Properties properties) { double memtableProportionForWrite = ((double) (proportionForMemTable) / (double) (proportionForMemTable + proportionForTimePartitionInfo)); - Double.parseDouble(properties.getProperty("flush_time_memory_proportion", "0.05")); + double timePartitionInfoForWrite = ((double) (proportionForTimePartitionInfo) / (double) (proportionForMemTable + proportionForTimePartitionInfo)); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java index 1f77b36b8283c..640f8dfb959d2 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java @@ -24,6 +24,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.HashMap; import java.util.Map; import java.util.TreeMap; @@ -31,6 +34,7 @@ /** Manage all the time partitions for all data regions and control the total memory of them */ public class TimePartitionManager { + private static final Logger logger = LoggerFactory.getLogger(TimePartitionManager.class); final Map> timePartitionInfoMap; long memCost = 0; @@ -102,7 +106,10 @@ private void evictOldPartition() { } while (memCost > timePartitionInfoMemoryThreshold) { - TimePartitionInfo timePartitionInfo = treeSet.first(); + TimePartitionInfo timePartitionInfo = treeSet.pollFirst(); + if (timePartitionInfo == null) { + return; + } memCost -= timePartitionInfo.memSize; DataRegion dataRegion = StorageEngine.getInstance().getDataRegion(timePartitionInfo.dataRegionId);