From 9b968de7ad9a4e47902edbdb39ed4c69256cded2 Mon Sep 17 00:00:00 2001 From: Jialin Qiao Date: Thu, 28 May 2020 16:30:20 +0800 Subject: [PATCH] [IOTDB-719] add avg_series_point_number_threshold in config (#1278) * add avg_series_point_number_threshold in config --- docs/UserGuide/Server/Config Manual.md | 36 +++++++++++++++++ docs/zh/UserGuide/Server/Config Manual.md | 36 +++++++++++++++++ .../resources/conf/iotdb-engine.properties | 2 + .../org/apache/iotdb/db/conf/IoTDBConfig.java | 13 ++++++ .../apache/iotdb/db/conf/IoTDBDescriptor.java | 8 +++- .../db/engine/flush/TsFileFlushPolicy.java | 8 ++-- .../db/engine/memtable/AbstractMemTable.java | 40 +++++++++++++++++-- .../iotdb/db/engine/memtable/IMemTable.java | 10 +++++ .../db/engine/memtable/IWritableMemChunk.java | 5 ++- .../db/engine/memtable/WritableMemChunk.java | 4 +- .../engine/storagegroup/TsFileProcessor.java | 24 +++++++++-- .../iotdb/db/qp/executor/PlanExecutor.java | 10 ++++- .../iotdb/db/qp/physical/crud/InsertPlan.java | 24 +++++------ .../iotdb/db/utils/datastructure/TVList.java | 4 +- .../iotdb/db/engine/storagegroup/TTLTest.java | 9 +++-- .../iotdb/db/utils/EnvironmentUtils.java | 1 + 16 files changed, 198 insertions(+), 36 deletions(-) diff --git a/docs/UserGuide/Server/Config Manual.md b/docs/UserGuide/Server/Config Manual.md index 86f816527371..dbc40fb5d25c 100644 --- a/docs/UserGuide/Server/Config Manual.md +++ b/docs/UserGuide/Server/Config Manual.md @@ -272,6 +272,42 @@ The permission definitions are in ${IOTDB\_CONF}/conf/jmx.access. |Default| true | |Effective|Trigger| +* enable\_parameter\_adapter + +|Name| enable\_parameter\_adapter | +|:---:|:---| +|Description| enable dynamically adjusting system to avoid OOM| +|Type|Bool| +|Default| true | +|Effective|After restart system| + +* memtable\_size\_threshold + +|Name| memtable\_size\_threshold | +|:---:|:---| +|Description| max memtable size| +|Type|Long| +|Default| 1073741824 | +|Effective| when enable\_parameter\_adapter is false & After restart system| + +* avg\_series\_point\_number\_threshold + +|Name| avg\_series\_point\_number\_threshold | +|:---:|:---| +|Description| max average number of point of each series in memtable| +|Type|Int32| +|Default| 5000 | +|Effective|After restart system| + +* tsfile\_size\_threshold + +|Name| tsfile\_size\_threshold | +|:---:|:---| +|Description| max tsfile size| +|Type|Long| +|Default| 536870912 | +|Effective| when enable\_parameter\_adapter is false & After restart system| + * enable\_partition |Name| enable\_partition | diff --git a/docs/zh/UserGuide/Server/Config Manual.md b/docs/zh/UserGuide/Server/Config Manual.md index d75c9789f6bb..7589fed08670 100644 --- a/docs/zh/UserGuide/Server/Config Manual.md +++ b/docs/zh/UserGuide/Server/Config Manual.md @@ -253,6 +253,42 @@ |默认值| 0 | |改后生效方式|重启服务器生效| +* enable\_parameter\_adapter + +|Name| enable\_parameter\_adapter | +|:---:|:---| +|Description| 开启自动调整系统参数,避免爆内存| +|Type|Bool| +|Default| true | +|Effective|重启服务器生效| + +* memtable\_size\_threshold + +|Name| memtable\_size\_threshold | +|:---:|:---| +|Description| 内存缓冲区 memtable 阈值| +|Type|Long| +|Default| 1073741824 | +|Effective|enable\_parameter\_adapter为false时生效、重启服务器生效| + +* avg\_series\_point\_number\_threshold + +|Name| avg\_series\_point\_number\_threshold | +|:---:|:---| +|Description| 内存中平均每个时间序列点数最大值,达到触发flush| +|Type|Int32| +|Default| 5000 | +|Effective|重启服务器生效| + +* tsfile\_size\_threshold + +|Name| tsfile\_size\_threshold | +|:---:|:---| +|Description| 每个 tsfile 大小| +|Type|Long| +|Default| 536870912 | +|Effective|enable\_parameter\_adapter为false时生效、重启服务器生效| + * enable\_partition |Name| enable\_partition | diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index e892740d8601..19f1f4038145 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@ -181,6 +181,8 @@ tsfile_size_threshold=536870912 # When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 1 GB. memtable_size_threshold=1073741824 +avg_series_point_number_threshold=5000 + # How many threads can concurrently flush. When <= 0, use CPU core number. concurrent_flush_thread=0 diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 64d87fa08432..1aecad9eadc3 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -229,6 +229,11 @@ public class IoTDBConfig { */ private long memtableSizeThreshold = 128 * 1024 * 1024L; + /** + * When average series point number reaches this, flush the memtable to disk + */ + private int avgSeriesPointNumberThreshold = 5000; + /** * whether to cache meta data(ChunkMetaData and TsFileMetaData) or not. */ @@ -1106,6 +1111,14 @@ public void setMemtableSizeThreshold(long memtableSizeThreshold) { this.memtableSizeThreshold = memtableSizeThreshold; } + public int getAvgSeriesPointNumberThreshold() { + return avgSeriesPointNumberThreshold; + } + + public void setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold) { + this.avgSeriesPointNumberThreshold = avgSeriesPointNumberThreshold; + } + public MergeFileStrategy getMergeFileStrategy() { return mergeFileStrategy; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 40599ffa0836..fe033ee8abdd 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -226,7 +226,7 @@ private void loadProps() { long tsfileSizeThreshold = Long.parseLong(properties .getProperty("tsfile_size_threshold", Long.toString(conf.getTsFileSizeThreshold())).trim()); - if (tsfileSizeThreshold > 0) { + if (tsfileSizeThreshold >= 0) { conf.setTsFileSizeThreshold(tsfileSizeThreshold); } @@ -237,6 +237,10 @@ private void loadProps() { conf.setMemtableSizeThreshold(memTableSizeThreshold); } + conf.setAvgSeriesPointNumberThreshold(Integer.parseInt(properties + .getProperty("avg_series_point_number_threshold", + Integer.toString(conf.getAvgSeriesPointNumberThreshold())))); + conf.setSyncEnable(Boolean .parseBoolean(properties.getProperty("is_sync_enable", Boolean.toString(conf.isSyncEnable())))); @@ -583,7 +587,7 @@ public void loadHotModifiedProps() throws QueryProcessException { long tsfileSizeThreshold = Long.parseLong(properties .getProperty("tsfile_size_threshold", Long.toString(conf.getTsFileSizeThreshold())).trim()); - if (tsfileSizeThreshold > 0 && !conf.isEnableParameterAdapter()) { + if (tsfileSizeThreshold >= 0 && !conf.isEnableParameterAdapter()) { conf.setTsFileSizeThreshold(tsfileSizeThreshold); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java index 7df2508334ff..400a1e177315 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java @@ -40,14 +40,14 @@ class DirectFlushPolicy implements TsFileFlushPolicy{ @Override public void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor tsFileProcessor, boolean isSeq) { - logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}", - tsFileProcessor.getWorkMemTableMemory(), - tsFileProcessor.getTsFileResource().getFile().getAbsolutePath()); - if (tsFileProcessor.shouldClose()) { storageGroupProcessor.asyncCloseOneTsFileProcessor(isSeq, tsFileProcessor); + logger.info("Async close tsfile: {}", + tsFileProcessor.getTsFileResource().getFile().getAbsolutePath()); } else { tsFileProcessor.asyncFlush(); + logger.info("Async flush a memtable to tsfile: {}", + tsFileProcessor.getTsFileResource().getFile().getAbsolutePath()); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index dde8bee883ed..5661dda65b4e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; @@ -41,10 +42,22 @@ public abstract class AbstractMemTable implements IMemTable { private final Map> memTableMap; + private long version = Long.MAX_VALUE; + private List modifications = new ArrayList<>(); + + private int avgSeriesPointNumThreshold = IoTDBDescriptor.getInstance().getConfig() + .getAvgSeriesPointNumberThreshold(); + private long memSize = 0; + private int seriesNumber = 0; + + private long totalPointsNum = 0; + + private long totalPointsNumThreshold = 0; + public AbstractMemTable() { this.memTableMap = new HashMap<>(); } @@ -75,6 +88,8 @@ private IWritableMemChunk createIfNotExistAndGet(String deviceId, String measure Map memSeries = memTableMap.get(deviceId); if (!memSeries.containsKey(measurement)) { memSeries.put(measurement, genMemSeries(schema)); + seriesNumber++; + totalPointsNumThreshold += avgSeriesPointNumThreshold; } return memSeries.get(measurement); } @@ -91,6 +106,8 @@ public void insert(InsertPlan insertPlan) { write(insertPlan.getDeviceId(), insertPlan.getMeasurements()[i], insertPlan.getSchemas()[i], insertPlan.getTime(), value); } + + totalPointsNum += insertPlan.getValues().length; } @Override @@ -98,8 +115,8 @@ public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end) throws WriteProcessException { try { write(insertTabletPlan, start, end); - long recordSizeInByte = MemUtils.getRecordSize(insertTabletPlan, start, end); - memSize += recordSizeInByte; + memSize += MemUtils.getRecordSize(insertTabletPlan, start, end); + totalPointsNum += insertTabletPlan.getMeasurements().length * (end - start); } catch (RuntimeException e) { throw new WriteProcessException(e.getMessage()); } @@ -124,6 +141,14 @@ public void write(InsertTabletPlan insertTabletPlan, int start, int end) { } + public int getSeriesNumber() { + return seriesNumber; + } + + public long getTotalPointsNum() { + return totalPointsNum; + } + @Override public long size() { long sum = 0; @@ -140,11 +165,19 @@ public long memSize() { return memSize; } + @Override + public boolean reachTotalPointNumThreshold() { + return totalPointsNum >= totalPointsNumThreshold; + } + @Override public void clear() { memTableMap.clear(); modifications.clear(); memSize = 0; + seriesNumber = 0; + totalPointsNum = 0; + totalPointsNumThreshold = 0; } @Override @@ -190,7 +223,8 @@ public void delete(String deviceId, String measurementId, long timestamp) { if (chunk == null) { return; } - chunk.delete(timestamp); + int deletedPointsNumber = chunk.delete(timestamp); + totalPointsNum -= deletedPointsNumber; } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java index c84d497e8d93..203b1b0e016f 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java @@ -56,6 +56,16 @@ void write(String deviceId, String measurement, MeasurementSchema schema, */ long memSize(); + /** + * @return whether the average number of points in each WritableChunk reaches the threshold + */ + boolean reachTotalPointNumThreshold(); + + int getSeriesNumber(); + + long getTotalPointsNum(); + + void insert(InsertPlan insertPlan) throws WriteProcessException; /** diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java index abd8887f86c1..02bb35f023a5 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java @@ -98,5 +98,8 @@ default long getMinTime() { return Long.MIN_VALUE; } - void delete(long upperBound); + /** + * @return how many points are deleted + */ + int delete(long upperBound); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java index 68547a23f4a1..4e115b63ea57 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java @@ -215,8 +215,8 @@ public long getMinTime() { } @Override - public void delete(long upperBound) { - list.delete(upperBound); + public int delete(long upperBound) { + return list.delete(upperBound); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 9433bcb342d5..31c76f28b6b1 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -70,6 +70,9 @@ public class TsFileProcessor { private static final Logger logger = LoggerFactory.getLogger(TsFileProcessor.class); private final String storageGroupName; + + private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + /** * sync this object in query() and asyncTryToFlush() */ @@ -260,8 +263,24 @@ public TsFileResource getTsFileResource() { boolean shouldFlush() { - return workMemTable != null - && workMemTable.memSize() > getMemtableSizeThresholdBasedOnSeriesNum(); + if (workMemTable == null) { + return false; + } + + if (workMemTable.memSize() >= getMemtableSizeThresholdBasedOnSeriesNum()) { + logger.info("The memtable size {} of tsfile {} reaches the threshold", + workMemTable.memSize(), tsFileResource.getFile().getAbsolutePath()); + return true; + } + + if (workMemTable.reachTotalPointNumThreshold()) { + logger.info("The avg series points num {} of tsfile {} reaches the threshold", + workMemTable.getTotalPointsNum() / workMemTable.getSeriesNumber(), + tsFileResource.getFile().getAbsolutePath()); + return true; + } + + return false; } /** @@ -272,7 +291,6 @@ boolean shouldFlush() { * size. We need to adjust it according to the number of timeseries in a specific storage group. */ private long getMemtableSizeThresholdBasedOnSeriesNum() { - IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); if(!config.isEnableParameterAdapter()){ return config.getMemtableSizeThreshold(); } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index 38ab58ac5b5b..ca9c13b3c26c 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -877,7 +877,13 @@ public void insert(InsertPlan insertPlan) throws QueryProcessException { if (!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) { throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurement); } - TSDataType dataType = TypeInferenceUtils.getPredictedDataType(insertPlan.getValues()[i]); + TSDataType dataType; + if (insertPlan.getStrValues() != null) { + // infer type for insert sql + dataType = TypeInferenceUtils.getPredictedDataType(insertPlan.getStrValues()[i]); + } else { + dataType = TypeInferenceUtils.getPredictedDataType(insertPlan.getValues()[i]); + } Path path = new Path(deviceId, measurement); internalCreateTimeseries(path.toString(), dataType); } @@ -886,7 +892,7 @@ public void insert(InsertPlan insertPlan) throws QueryProcessException { // reset measurement to common name instead of alias measurementList[i] = measurementNode.getName(); - if(insertPlan.getStrValueList() == null) { + if(insertPlan.getStrValues() == null) { checkType(insertPlan, i, measurementNode.getSchema().getType()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java index 8174fc21964a..b2c2c2a55ec2 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java @@ -50,16 +50,16 @@ public class InsertPlan extends PhysicalPlan { private TSDataType[] types; private MeasurementSchema[] schemas; - public String[] getStrValueList() { - return strValueList; + public String[] getStrValues() { + return strValues; } - public void setStrValueList(String[] strValueList) { - this.strValueList = strValueList; + public void setStrValues(String[] strValues) { + this.strValues = strValues; } // only for sql - private String[] strValueList; + private String[] strValues; public InsertPlan() { super(false, OperatorType.INSERT); @@ -140,7 +140,7 @@ public InsertPlan(String deviceId, long insertTime, String[] measurementList, // build types and values this.types = new TSDataType[measurements.length]; this.values = new Object[measurements.length]; - this.strValueList = insertValues; + this.strValues = insertValues; canbeSplit = false; } @@ -157,18 +157,14 @@ public MeasurementSchema[] getSchemas() { return schemas; } - public void setSchemas(MeasurementSchema[] schemas) { + public void setSchemas(MeasurementSchema[] schemas) throws QueryProcessException { this.schemas = schemas; - if (strValueList != null) { + if (strValues != null) { for (int i = 0; i < schemas.length; i++) { types[i] = schemas[i].getType(); - try { - values[i] = CommonUtils.parseValue(types[i], strValueList[i]); - } catch (QueryProcessException e) { - e.printStackTrace(); - } + values[i] = CommonUtils.parseValue(types[i], strValues[i]); } - strValueList = null; + strValues = null; } } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index 72568a8cf260..63ac17fc70b2 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -202,7 +202,7 @@ protected void releaseLastTimeArray() { PrimitiveArrayPool.getInstance().release(timestamps.remove(timestamps.size() - 1)); } - public void delete(long upperBound) { + public int delete(long upperBound) { int newSize = 0; minTime = Long.MAX_VALUE; for (int i = 0; i < size; i++) { @@ -212,6 +212,7 @@ public void delete(long upperBound) { minTime = time < minTime ? time : minTime; } } + int deletedNumber = size - newSize; size = newSize; // release primitive arrays that are empty int newArrayNum = newSize / ARRAY_SIZE; @@ -222,6 +223,7 @@ public void delete(long upperBound) { releaseLastTimeArray(); releaseLastValueArray(); } + return deletedNumber; } protected void cloneAs(TVList cloneList) { diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java index 18598a4867f7..302336782dea 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java @@ -126,7 +126,7 @@ public void testSetMetaTTL() throws IOException, MetadataException { } @Test - public void testTTLWrite() throws WriteProcessException { + public void testTTLWrite() throws WriteProcessException, QueryProcessException { InsertPlan insertPlan = new InsertPlan(); insertPlan.setDeviceId(sg1); insertPlan.setTime(System.currentTimeMillis()); @@ -153,7 +153,7 @@ public void testTTLWrite() throws WriteProcessException { storageGroupProcessor.insert(insertPlan); } - private void prepareData() throws WriteProcessException { + private void prepareData() throws WriteProcessException, QueryProcessException { InsertPlan insertPlan = new InsertPlan(); insertPlan.setDeviceId(sg1); insertPlan.setTime(System.currentTimeMillis()); @@ -234,7 +234,8 @@ public void testTTLRead() } @Test - public void testTTLRemoval() throws StorageEngineException, WriteProcessException { + public void testTTLRemoval() + throws StorageEngineException, WriteProcessException, QueryProcessException { prepareData(); storageGroupProcessor.syncCloseAllWorkingTsFileProcessors(); @@ -345,7 +346,7 @@ public void testShowTTL() } @Test - public void testTTLCleanFile() throws WriteProcessException { + public void testTTLCleanFile() throws WriteProcessException, QueryProcessException { prepareData(); storageGroupProcessor.syncCloseAllWorkingTsFileProcessors(); diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java index d64a8b36d775..302ddaaebea7 100644 --- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java +++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java @@ -187,6 +187,7 @@ public static void envSetUp() { IoTDBDescriptor.getInstance().getConfig().setThriftServerAwaitTimeForStopService(0); //we do not start 8181 port in test. IoTDBDescriptor.getInstance().getConfig().setEnableMetricService(false); + IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(Integer.MAX_VALUE); if (daemon == null) { daemon = new IoTDB(); }