Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IOTDB-719] add avg_series_point_number_threshold in config #1278

Merged
merged 8 commits into from
May 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions docs/UserGuide/Server/Config Manual.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,42 @@ The permission definitions are in ${IOTDB\_CONF}/conf/jmx.access.
|Default| true |
|Effective|Trigger|

* enable\_parameter\_adapter

|Name| enable\_parameter\_adapter |
|:---:|:---|
|Description| enable dynamically adjusting system to avoid OOM|
|Type|Bool|
|Default| true |
|Effective|After restart system|

* memtable\_size\_threshold

|Name| memtable\_size\_threshold |
|:---:|:---|
|Description| max memtable size|
|Type|Long|
|Default| 1073741824 |
|Effective| when enable\_parameter\_adapter is false & After restart system|

* avg\_series\_point\_number\_threshold

|Name| avg\_series\_point\_number\_threshold |
|:---:|:---|
|Description| max average number of point of each series in memtable|
|Type|Int32|
|Default| 5000 |
|Effective|After restart system|

* tsfile\_size\_threshold

|Name| tsfile\_size\_threshold |
|:---:|:---|
|Description| max tsfile size|
|Type|Long|
|Default| 536870912 |
|Effective| when enable\_parameter\_adapter is false & After restart system|

* enable\_partition

|Name| enable\_partition |
Expand Down
36 changes: 36 additions & 0 deletions docs/zh/UserGuide/Server/Config Manual.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,42 @@
|默认值| 0 |
|改后生效方式|重启服务器生效|

* enable\_parameter\_adapter

|Name| enable\_parameter\_adapter |
|:---:|:---|
|Description| 开启自动调整系统参数,避免爆内存|
|Type|Bool|
|Default| true |
|Effective|重启服务器生效|

* memtable\_size\_threshold

|Name| memtable\_size\_threshold |
|:---:|:---|
|Description| 内存缓冲区 memtable 阈值|
|Type|Long|
|Default| 1073741824 |
|Effective|enable\_parameter\_adapter为false时生效、重启服务器生效|

* avg\_series\_point\_number\_threshold

|Name| avg\_series\_point\_number\_threshold |
|:---:|:---|
|Description| 内存中平均每个时间序列点数最大值,达到触发flush|
|Type|Int32|
|Default| 5000 |
|Effective|重启服务器生效|

* tsfile\_size\_threshold

|Name| tsfile\_size\_threshold |
|:---:|:---|
|Description| 每个 tsfile 大小|
|Type|Long|
|Default| 536870912 |
|Effective|enable\_parameter\_adapter为false时生效、重启服务器生效|

* enable\_partition

|Name| enable\_partition |
Expand Down
2 changes: 2 additions & 0 deletions server/src/assembly/resources/conf/iotdb-engine.properties
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ tsfile_size_threshold=536870912
# When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 1 GB.
memtable_size_threshold=1073741824

avg_series_point_number_threshold=5000

# How many threads can concurrently flush. When <= 0, use CPU core number.
concurrent_flush_thread=0

Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ public class IoTDBConfig {
*/
private long memtableSizeThreshold = 128 * 1024 * 1024L;

/**
* When average series point number reaches this, flush the memtable to disk
*/
private int avgSeriesPointNumberThreshold = 5000;

/**
* whether to cache meta data(ChunkMetaData and TsFileMetaData) or not.
*/
Expand Down Expand Up @@ -1106,6 +1111,14 @@ public void setMemtableSizeThreshold(long memtableSizeThreshold) {
this.memtableSizeThreshold = memtableSizeThreshold;
}

public int getAvgSeriesPointNumberThreshold() {
return avgSeriesPointNumberThreshold;
}

public void setAvgSeriesPointNumberThreshold(int avgSeriesPointNumberThreshold) {
this.avgSeriesPointNumberThreshold = avgSeriesPointNumberThreshold;
}

public MergeFileStrategy getMergeFileStrategy() {
return mergeFileStrategy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ private void loadProps() {
long tsfileSizeThreshold = Long.parseLong(properties
.getProperty("tsfile_size_threshold",
Long.toString(conf.getTsFileSizeThreshold())).trim());
if (tsfileSizeThreshold > 0) {
if (tsfileSizeThreshold >= 0) {
conf.setTsFileSizeThreshold(tsfileSizeThreshold);
}

Expand All @@ -237,6 +237,10 @@ private void loadProps() {
conf.setMemtableSizeThreshold(memTableSizeThreshold);
}

conf.setAvgSeriesPointNumberThreshold(Integer.parseInt(properties
.getProperty("avg_series_point_number_threshold",
Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));

conf.setSyncEnable(Boolean
.parseBoolean(properties.getProperty("is_sync_enable",
Boolean.toString(conf.isSyncEnable()))));
Expand Down Expand Up @@ -583,7 +587,7 @@ public void loadHotModifiedProps() throws QueryProcessException {
long tsfileSizeThreshold = Long.parseLong(properties
.getProperty("tsfile_size_threshold",
Long.toString(conf.getTsFileSizeThreshold())).trim());
if (tsfileSizeThreshold > 0 && !conf.isEnableParameterAdapter()) {
if (tsfileSizeThreshold >= 0 && !conf.isEnableParameterAdapter()) {
conf.setTsFileSizeThreshold(tsfileSizeThreshold);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ class DirectFlushPolicy implements TsFileFlushPolicy{
@Override
public void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor tsFileProcessor,
boolean isSeq) {
logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
tsFileProcessor.getWorkMemTableMemory(),
tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());

if (tsFileProcessor.shouldClose()) {
storageGroupProcessor.asyncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
logger.info("Async close tsfile: {}",
tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
} else {
tsFileProcessor.asyncFlush();
logger.info("Async flush a memtable to tsfile: {}",
tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
Expand All @@ -41,10 +42,22 @@
public abstract class AbstractMemTable implements IMemTable {

private final Map<String, Map<String, IWritableMemChunk>> memTableMap;

private long version = Long.MAX_VALUE;

private List<Modification> modifications = new ArrayList<>();

private int avgSeriesPointNumThreshold = IoTDBDescriptor.getInstance().getConfig()
.getAvgSeriesPointNumberThreshold();

private long memSize = 0;

private int seriesNumber = 0;

private long totalPointsNum = 0;

private long totalPointsNumThreshold = 0;

public AbstractMemTable() {
this.memTableMap = new HashMap<>();
}
Expand Down Expand Up @@ -75,6 +88,8 @@ private IWritableMemChunk createIfNotExistAndGet(String deviceId, String measure
Map<String, IWritableMemChunk> memSeries = memTableMap.get(deviceId);
if (!memSeries.containsKey(measurement)) {
memSeries.put(measurement, genMemSeries(schema));
seriesNumber++;
totalPointsNumThreshold += avgSeriesPointNumThreshold;
}
return memSeries.get(measurement);
}
Expand All @@ -91,15 +106,17 @@ public void insert(InsertPlan insertPlan) {
write(insertPlan.getDeviceId(), insertPlan.getMeasurements()[i],
insertPlan.getSchemas()[i], insertPlan.getTime(), value);
}

totalPointsNum += insertPlan.getValues().length;
}

@Override
public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end)
throws WriteProcessException {
try {
write(insertTabletPlan, start, end);
long recordSizeInByte = MemUtils.getRecordSize(insertTabletPlan, start, end);
memSize += recordSizeInByte;
memSize += MemUtils.getRecordSize(insertTabletPlan, start, end);
totalPointsNum += insertTabletPlan.getMeasurements().length * (end - start);
} catch (RuntimeException e) {
throw new WriteProcessException(e.getMessage());
}
Expand All @@ -124,6 +141,14 @@ public void write(InsertTabletPlan insertTabletPlan, int start, int end) {
}


public int getSeriesNumber() {
return seriesNumber;
}

public long getTotalPointsNum() {
return totalPointsNum;
}

@Override
public long size() {
long sum = 0;
Expand All @@ -140,11 +165,19 @@ public long memSize() {
return memSize;
}

@Override
public boolean reachTotalPointNumThreshold() {
return totalPointsNum >= totalPointsNumThreshold;
}

@Override
public void clear() {
memTableMap.clear();
modifications.clear();
memSize = 0;
seriesNumber = 0;
totalPointsNum = 0;
totalPointsNumThreshold = 0;
}

@Override
Expand Down Expand Up @@ -190,7 +223,8 @@ public void delete(String deviceId, String measurementId, long timestamp) {
if (chunk == null) {
return;
}
chunk.delete(timestamp);
int deletedPointsNumber = chunk.delete(timestamp);
totalPointsNum -= deletedPointsNumber;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ void write(String deviceId, String measurement, MeasurementSchema schema,
*/
long memSize();

/**
* @return whether the average number of points in each WritableChunk reaches the threshold
*/
boolean reachTotalPointNumThreshold();

int getSeriesNumber();

long getTotalPointsNum();


void insert(InsertPlan insertPlan) throws WriteProcessException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,8 @@ default long getMinTime() {
return Long.MIN_VALUE;
}

void delete(long upperBound);
/**
* @return how many points are deleted
*/
int delete(long upperBound);
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ public long getMinTime() {
}

@Override
public void delete(long upperBound) {
list.delete(upperBound);
public int delete(long upperBound) {
return list.delete(upperBound);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public class TsFileProcessor {

private static final Logger logger = LoggerFactory.getLogger(TsFileProcessor.class);
private final String storageGroupName;

private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

/**
* sync this object in query() and asyncTryToFlush()
*/
Expand Down Expand Up @@ -260,8 +263,24 @@ public TsFileResource getTsFileResource() {


boolean shouldFlush() {
return workMemTable != null
&& workMemTable.memSize() > getMemtableSizeThresholdBasedOnSeriesNum();
if (workMemTable == null) {
return false;
}

if (workMemTable.memSize() >= getMemtableSizeThresholdBasedOnSeriesNum()) {
logger.info("The memtable size {} of tsfile {} reaches the threshold",
workMemTable.memSize(), tsFileResource.getFile().getAbsolutePath());
return true;
}

if (workMemTable.reachTotalPointNumThreshold()) {
logger.info("The avg series points num {} of tsfile {} reaches the threshold",
workMemTable.getTotalPointsNum() / workMemTable.getSeriesNumber(),
tsFileResource.getFile().getAbsolutePath());
return true;
}

return false;
}

/**
Expand All @@ -272,7 +291,6 @@ boolean shouldFlush() {
* size. We need to adjust it according to the number of timeseries in a specific storage group.
*/
private long getMemtableSizeThresholdBasedOnSeriesNum() {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
if(!config.isEnableParameterAdapter()){
return config.getMemtableSizeThreshold();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,13 @@ public void insert(InsertPlan insertPlan) throws QueryProcessException {
if (!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurement);
}
TSDataType dataType = TypeInferenceUtils.getPredictedDataType(insertPlan.getValues()[i]);
TSDataType dataType;
if (insertPlan.getStrValues() != null) {
// infer type for insert sql
dataType = TypeInferenceUtils.getPredictedDataType(insertPlan.getStrValues()[i]);
} else {
dataType = TypeInferenceUtils.getPredictedDataType(insertPlan.getValues()[i]);
}
Path path = new Path(deviceId, measurement);
internalCreateTimeseries(path.toString(), dataType);
}
Expand All @@ -886,7 +892,7 @@ public void insert(InsertPlan insertPlan) throws QueryProcessException {
// reset measurement to common name instead of alias
measurementList[i] = measurementNode.getName();

if(insertPlan.getStrValueList() == null) {
if(insertPlan.getStrValues() == null) {
checkType(insertPlan, i, measurementNode.getSchema().getType());
}
}
Expand Down
Loading