Skip to content

Commit

Permalink
[IOTDB-719] add avg_series_point_number_threshold in config (#1278)
Browse files Browse the repository at this point in the history
* add avg_series_point_number_threshold in config
  • Loading branch information
Jialin Qiao committed May 28, 2020
1 parent 4e7be1c commit 9b968de
Show file tree
Hide file tree
Showing 16 changed files with 198 additions and 36 deletions.
36 changes: 36 additions & 0 deletions docs/UserGuide/Server/Config Manual.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,42 @@ The permission definitions are in ${IOTDB\_CONF}/conf/jmx.access.
|Default| true |
|Effective|Trigger|

* enable\_parameter\_adapter

|Name| enable\_parameter\_adapter |
|:---:|:---|
|Description| enable dynamically adjusting system to avoid OOM|
|Type|Bool|
|Default| true |
|Effective|After restart system|

* memtable\_size\_threshold

|Name| memtable\_size\_threshold |
|:---:|:---|
|Description| max memtable size|
|Type|Long|
|Default| 1073741824 |
|Effective| when enable\_parameter\_adapter is false & After restart system|

* avg\_series\_point\_number\_threshold

|Name| avg\_series\_point\_number\_threshold |
|:---:|:---|
|Description| max average number of point of each series in memtable|
|Type|Int32|
|Default| 5000 |
|Effective|After restart system|

* tsfile\_size\_threshold

|Name| tsfile\_size\_threshold |
|:---:|:---|
|Description| max tsfile size|
|Type|Long|
|Default| 536870912 |
|Effective| when enable\_parameter\_adapter is false & After restart system|

* enable\_partition

|Name| enable\_partition |
Expand Down
36 changes: 36 additions & 0 deletions docs/zh/UserGuide/Server/Config Manual.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,42 @@
|默认值| 0 |
|改后生效方式|重启服务器生效|

* enable\_parameter\_adapter

|Name| enable\_parameter\_adapter |
|:---:|:---|
|Description| 开启自动调整系统参数,避免爆内存|
|Type|Bool|
|Default| true |
|Effective|重启服务器生效|

* memtable\_size\_threshold

|Name| memtable\_size\_threshold |
|:---:|:---|
|Description| 内存缓冲区 memtable 阈值|
|Type|Long|
|Default| 1073741824 |
|Effective|enable\_parameter\_adapter为false时生效、重启服务器生效|

* avg\_series\_point\_number\_threshold

|Name| avg\_series\_point\_number\_threshold |
|:---:|:---|
|Description| 内存中平均每个时间序列点数最大值,达到触发flush|
|Type|Int32|
|Default| 5000 |
|Effective|重启服务器生效|

* tsfile\_size\_threshold

|Name| tsfile\_size\_threshold |
|:---:|:---|
|Description| 每个 tsfile 大小|
|Type|Long|
|Default| 536870912 |
|Effective|enable\_parameter\_adapter为false时生效、重启服务器生效|

* enable\_partition

|Name| enable\_partition |
Expand Down
2 changes: 2 additions & 0 deletions server/src/assembly/resources/conf/iotdb-engine.properties
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ tsfile_size_threshold=536870912
# When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 1 GB.
memtable_size_threshold=1073741824

avg_series_point_number_threshold=5000

# How many threads can concurrently flush. When <= 0, use CPU core number.
concurrent_flush_thread=0

Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ public class IoTDBConfig {
*/
private long memtableSizeThreshold = 128 * 1024 * 1024L;

/**
* When average series point number reaches this, flush the memtable to disk
*/
private int avgSeriesPointNumberThreshold = 5000;

/**
* whether to cache meta data(ChunkMetaData and TsFileMetaData) or not.
*/
Expand Down Expand Up @@ -1106,6 +1111,14 @@ public void setMemtableSizeThreshold(long memtableSizeThreshold) {
this.memtableSizeThreshold = memtableSizeThreshold;
}

public int getAvgSeriesPointNumberThreshold() {
return avgSeriesPointNumberThreshold;
}

public void setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold) {
this.avgSeriesPointNumberThreshold = avgSeriesPointNumberThreshold;
}

public MergeFileStrategy getMergeFileStrategy() {
return mergeFileStrategy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ private void loadProps() {
long tsfileSizeThreshold = Long.parseLong(properties
.getProperty("tsfile_size_threshold",
Long.toString(conf.getTsFileSizeThreshold())).trim());
if (tsfileSizeThreshold > 0) {
if (tsfileSizeThreshold >= 0) {
conf.setTsFileSizeThreshold(tsfileSizeThreshold);
}

Expand All @@ -237,6 +237,10 @@ private void loadProps() {
conf.setMemtableSizeThreshold(memTableSizeThreshold);
}

conf.setAvgSeriesPointNumberThreshold(Integer.parseInt(properties
.getProperty("avg_series_point_number_threshold",
Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));

conf.setSyncEnable(Boolean
.parseBoolean(properties.getProperty("is_sync_enable",
Boolean.toString(conf.isSyncEnable()))));
Expand Down Expand Up @@ -583,7 +587,7 @@ public void loadHotModifiedProps() throws QueryProcessException {
long tsfileSizeThreshold = Long.parseLong(properties
.getProperty("tsfile_size_threshold",
Long.toString(conf.getTsFileSizeThreshold())).trim());
if (tsfileSizeThreshold > 0 && !conf.isEnableParameterAdapter()) {
if (tsfileSizeThreshold >= 0 && !conf.isEnableParameterAdapter()) {
conf.setTsFileSizeThreshold(tsfileSizeThreshold);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ class DirectFlushPolicy implements TsFileFlushPolicy{
@Override
public void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor tsFileProcessor,
boolean isSeq) {
logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
tsFileProcessor.getWorkMemTableMemory(),
tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());

if (tsFileProcessor.shouldClose()) {
storageGroupProcessor.asyncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
logger.info("Async close tsfile: {}",
tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
} else {
tsFileProcessor.asyncFlush();
logger.info("Async flush a memtable to tsfile: {}",
tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
Expand All @@ -41,10 +42,22 @@
public abstract class AbstractMemTable implements IMemTable {

private final Map<String, Map<String, IWritableMemChunk>> memTableMap;

private long version = Long.MAX_VALUE;

private List<Modification> modifications = new ArrayList<>();

private int avgSeriesPointNumThreshold = IoTDBDescriptor.getInstance().getConfig()
.getAvgSeriesPointNumberThreshold();

private long memSize = 0;

private int seriesNumber = 0;

private long totalPointsNum = 0;

private long totalPointsNumThreshold = 0;

public AbstractMemTable() {
this.memTableMap = new HashMap<>();
}
Expand Down Expand Up @@ -75,6 +88,8 @@ private IWritableMemChunk createIfNotExistAndGet(String deviceId, String measure
Map<String, IWritableMemChunk> memSeries = memTableMap.get(deviceId);
if (!memSeries.containsKey(measurement)) {
memSeries.put(measurement, genMemSeries(schema));
seriesNumber++;
totalPointsNumThreshold += avgSeriesPointNumThreshold;
}
return memSeries.get(measurement);
}
Expand All @@ -91,15 +106,17 @@ public void insert(InsertPlan insertPlan) {
write(insertPlan.getDeviceId(), insertPlan.getMeasurements()[i],
insertPlan.getSchemas()[i], insertPlan.getTime(), value);
}

totalPointsNum += insertPlan.getValues().length;
}

@Override
public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end)
throws WriteProcessException {
try {
write(insertTabletPlan, start, end);
long recordSizeInByte = MemUtils.getRecordSize(insertTabletPlan, start, end);
memSize += recordSizeInByte;
memSize += MemUtils.getRecordSize(insertTabletPlan, start, end);
totalPointsNum += insertTabletPlan.getMeasurements().length * (end - start);
} catch (RuntimeException e) {
throw new WriteProcessException(e.getMessage());
}
Expand All @@ -124,6 +141,14 @@ public void write(InsertTabletPlan insertTabletPlan, int start, int end) {
}


public int getSeriesNumber() {
return seriesNumber;
}

public long getTotalPointsNum() {
return totalPointsNum;
}

@Override
public long size() {
long sum = 0;
Expand All @@ -140,11 +165,19 @@ public long memSize() {
return memSize;
}

@Override
public boolean reachTotalPointNumThreshold() {
return totalPointsNum >= totalPointsNumThreshold;
}

@Override
public void clear() {
memTableMap.clear();
modifications.clear();
memSize = 0;
seriesNumber = 0;
totalPointsNum = 0;
totalPointsNumThreshold = 0;
}

@Override
Expand Down Expand Up @@ -190,7 +223,8 @@ public void delete(String deviceId, String measurementId, long timestamp) {
if (chunk == null) {
return;
}
chunk.delete(timestamp);
int deletedPointsNumber = chunk.delete(timestamp);
totalPointsNum -= deletedPointsNumber;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ void write(String deviceId, String measurement, MeasurementSchema schema,
*/
long memSize();

/**
* @return whether the average number of points in each WritableChunk reaches the threshold
*/
boolean reachTotalPointNumThreshold();

int getSeriesNumber();

long getTotalPointsNum();


void insert(InsertPlan insertPlan) throws WriteProcessException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,8 @@ default long getMinTime() {
return Long.MIN_VALUE;
}

void delete(long upperBound);
/**
* @return how many points are deleted
*/
int delete(long upperBound);
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ public long getMinTime() {
}

@Override
public void delete(long upperBound) {
list.delete(upperBound);
public int delete(long upperBound) {
return list.delete(upperBound);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public class TsFileProcessor {

private static final Logger logger = LoggerFactory.getLogger(TsFileProcessor.class);
private final String storageGroupName;

private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

/**
* sync this object in query() and asyncTryToFlush()
*/
Expand Down Expand Up @@ -260,8 +263,24 @@ public TsFileResource getTsFileResource() {


boolean shouldFlush() {
return workMemTable != null
&& workMemTable.memSize() > getMemtableSizeThresholdBasedOnSeriesNum();
if (workMemTable == null) {
return false;
}

if (workMemTable.memSize() >= getMemtableSizeThresholdBasedOnSeriesNum()) {
logger.info("The memtable size {} of tsfile {} reaches the threshold",
workMemTable.memSize(), tsFileResource.getFile().getAbsolutePath());
return true;
}

if (workMemTable.reachTotalPointNumThreshold()) {
logger.info("The avg series points num {} of tsfile {} reaches the threshold",
workMemTable.getTotalPointsNum() / workMemTable.getSeriesNumber(),
tsFileResource.getFile().getAbsolutePath());
return true;
}

return false;
}

/**
Expand All @@ -272,7 +291,6 @@ boolean shouldFlush() {
* size. We need to adjust it according to the number of timeseries in a specific storage group.
*/
private long getMemtableSizeThresholdBasedOnSeriesNum() {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
if(!config.isEnableParameterAdapter()){
return config.getMemtableSizeThreshold();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,13 @@ public void insert(InsertPlan insertPlan) throws QueryProcessException {
if (!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurement);
}
TSDataType dataType = TypeInferenceUtils.getPredictedDataType(insertPlan.getValues()[i]);
TSDataType dataType;
if (insertPlan.getStrValues() != null) {
// infer type for insert sql
dataType = TypeInferenceUtils.getPredictedDataType(insertPlan.getStrValues()[i]);
} else {
dataType = TypeInferenceUtils.getPredictedDataType(insertPlan.getValues()[i]);
}
Path path = new Path(deviceId, measurement);
internalCreateTimeseries(path.toString(), dataType);
}
Expand All @@ -886,7 +892,7 @@ public void insert(InsertPlan insertPlan) throws QueryProcessException {
// reset measurement to common name instead of alias
measurementList[i] = measurementNode.getName();

if(insertPlan.getStrValueList() == null) {
if(insertPlan.getStrValues() == null) {
checkType(insertPlan, i, measurementNode.getSchema().getType());
}
}
Expand Down
Loading

0 comments on commit 9b968de

Please sign in to comment.