Skip to content

Commit

Permalink
avoid flushing empty memtable (#926)
Browse files Browse the repository at this point in the history
* avoid flushing empty nomal memtable
  • Loading branch information
Jialin Qiao committed Mar 20, 2020
1 parent 7dcef6f commit a5cfb37
Show file tree
Hide file tree
Showing 18 changed files with 113 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
Expand Down Expand Up @@ -253,23 +252,21 @@ public synchronized void reset() {
*
* @param insertPlan physical plan of insertion
*/
public void insert(InsertPlan insertPlan)
throws StorageEngineException, QueryProcessException {
public void insert(InsertPlan insertPlan) throws StorageEngineException {

StorageGroupProcessor storageGroupProcessor;
try {
storageGroupProcessor = getProcessor(insertPlan.getDeviceId());
} catch (StorageEngineException e) {
logger.warn("get StorageGroupProcessor of device {} failed, because {}",
insertPlan.getDeviceId(), e.getMessage(), e);
throw new StorageEngineException(e);
} catch (Exception e) {
throw new StorageEngineException(
"get StorageGroupProcessor of device failed: " + insertPlan.getDeviceId(), e);
}

// TODO monitor: update statistics
try {
storageGroupProcessor.insert(insertPlan);
} catch (QueryProcessException e) {
throw new QueryProcessException(e);
} catch (WriteProcessException e) {
throw new StorageEngineException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.rescon.TVListAllocator;
Expand All @@ -37,7 +37,6 @@
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Binary;

public abstract class AbstractMemTable implements IMemTable {

Expand Down Expand Up @@ -86,7 +85,7 @@ private IWritableMemChunk createIfNotExistAndGet(String deviceId, String measure
protected abstract IWritableMemChunk genMemSeries(TSDataType dataType);

@Override
public void insert(InsertPlan insertPlan) throws QueryProcessException {
public void insert(InsertPlan insertPlan) throws WriteProcessException {
try {
for (int i = 0; i < insertPlan.getValues().length; i++) {

Expand All @@ -96,20 +95,20 @@ public void insert(InsertPlan insertPlan) throws QueryProcessException {
}
long recordSizeInByte = MemUtils.getRecordSize(insertPlan);
memSize += recordSizeInByte;
} catch (RuntimeException e) {
throw new QueryProcessException(e.getMessage());
} catch (Exception e) {
throw new WriteProcessException(e.getMessage());
}
}

@Override
public void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end)
throws QueryProcessException {
throws WriteProcessException {
try {
write(batchInsertPlan, start, end);
long recordSizeInByte = MemUtils.getRecordSize(batchInsertPlan, start, end);
memSize += recordSizeInByte;
} catch (RuntimeException e) {
throw new QueryProcessException(e.getMessage());
throw new WriteProcessException(e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
Expand Down Expand Up @@ -54,13 +55,13 @@ void write(String deviceId, String measurement, TSDataType dataType,
*/
long memSize();

void insert(InsertPlan insertPlan) throws QueryProcessException;
void insert(InsertPlan insertPlan) throws WriteProcessException;

/**
* [start, end)
*/
void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end)
throws QueryProcessException;
throws WriteProcessException;

ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType,
TSEncoding encoding, Map<String, String> props, long timeLowerBound)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,7 @@ private void recover() throws StorageGroupProcessorException {
.putAll(resource.getEndTimeMap());
partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>())
.putAll(resource.getEndTimeMap());

for (Map.Entry<String, Long> mapEntry : resource.getEndTimeMap().entrySet()) {
if (!globalLatestFlushedTimeForEachDevice.containsKey(mapEntry.getKey())
|| globalLatestFlushedTimeForEachDevice.get(mapEntry.getKey())
< mapEntry.getValue()) {
globalLatestFlushedTimeForEachDevice.put(mapEntry.getKey(), mapEntry.getValue());
}
}
globalLatestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
}
}
}
Expand Down Expand Up @@ -466,7 +459,7 @@ public void addMeasurement(String measurementId, TSDataType dataType, TSEncoding
}
}

public void insert(InsertPlan insertPlan) throws QueryProcessException {
public void insert(InsertPlan insertPlan) throws WriteProcessException {
// reject insertions that are out of ttl
if (!checkTTL(insertPlan.getTime())) {
throw new OutOfTTLException(insertPlan.getTime(), (System.currentTimeMillis() - dataTTL));
Expand All @@ -475,15 +468,15 @@ public void insert(InsertPlan insertPlan) throws QueryProcessException {
try {
// init map
long timePartitionId = StorageEngine.fromTimeToTimePartition(insertPlan.getTime());
latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
.putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE);
partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>())
.putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE);

latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>());
partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>());

// insert to sequence or unSequence file
insertToTsFileProcessor(insertPlan,
insertPlan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId)
.get(insertPlan.getDeviceId()));
.getOrDefault(insertPlan.getDeviceId(), Long.MIN_VALUE));

} finally {
writeUnlock();
}
Expand Down Expand Up @@ -584,7 +577,7 @@ private boolean checkTTL(long time) {
private void insertBatchToTsFileProcessor(BatchInsertPlan batchInsertPlan,
int start, int end, boolean sequence, TSStatus[] results, long timePartitionId)
throws WriteProcessException {
// return when start <= end
// return when start >= end
if (start >= end) {
return;
}
Expand All @@ -598,23 +591,24 @@ private void insertBatchToTsFileProcessor(BatchInsertPlan batchInsertPlan,
return;
}

tsFileProcessor.insertBatch(batchInsertPlan, start, end, results);
try {
tsFileProcessor.insertBatch(batchInsertPlan, start, end, results);
} catch (WriteProcessException e) {
logger.error("insert to TsFileProcessor error ", e);
return;
}

latestTimeForEachDevice.computeIfAbsent(timePartitionId, t -> new HashMap<>())
.putIfAbsent(batchInsertPlan.getDeviceId(), Long.MIN_VALUE);
latestTimeForEachDevice.computeIfAbsent(timePartitionId, t -> new HashMap<>());
// try to update the latest time of the device of this tsRecord
if (sequence && latestTimeForEachDevice.get(timePartitionId).get(batchInsertPlan.getDeviceId())
if (sequence && latestTimeForEachDevice.get(timePartitionId)
.getOrDefault(batchInsertPlan.getDeviceId(), Long.MIN_VALUE)
< batchInsertPlan.getTimes()[end - 1]) {
latestTimeForEachDevice.get(timePartitionId)
.put(batchInsertPlan.getDeviceId(), batchInsertPlan.getTimes()[end - 1]);
}
long globalLatestFlushedTime =
globalLatestFlushedTimeForEachDevice.computeIfAbsent(
batchInsertPlan.getDeviceId(), k -> Long.MIN_VALUE);
long globalLatestFlushedTime = globalLatestFlushedTimeForEachDevice.getOrDefault(
batchInsertPlan.getDeviceId(), Long.MIN_VALUE);
tryToUpdateBatchInsertLastCache(batchInsertPlan, globalLatestFlushedTime);
if (globalLatestFlushedTime < batchInsertPlan.getMaxTime())
globalLatestFlushedTimeForEachDevice.put(
batchInsertPlan.getDeviceId(), batchInsertPlan.getMaxTime());

// check memtable size and may async try to flush the work memtable
if (tsFileProcessor.shouldFlush()) {
Expand All @@ -640,9 +634,8 @@ public void tryToUpdateBatchInsertLastCache(BatchInsertPlan plan, Long latestFlu
}

private void insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence)
throws QueryProcessException {
throws WriteProcessException {
TsFileProcessor tsFileProcessor;
boolean result;
long timePartitionId = StorageEngine.fromTimeToTimePartition(insertPlan.getTime());

tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
Expand All @@ -652,22 +645,19 @@ private void insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence)
}

// insert TsFileProcessor
result = tsFileProcessor.insert(insertPlan);
tsFileProcessor.insert(insertPlan);

// try to update the latest time of the device of this tsRecord
if (result
&& latestTimeForEachDevice.get(timePartitionId).get(insertPlan.getDeviceId()) < insertPlan
.getTime()) {
if (latestTimeForEachDevice.get(timePartitionId)
.getOrDefault(insertPlan.getDeviceId(), Long.MIN_VALUE) < insertPlan.getTime()) {
latestTimeForEachDevice.get(timePartitionId)
.put(insertPlan.getDeviceId(), insertPlan.getTime());
}
long globalLatestFlushTime =
globalLatestFlushedTimeForEachDevice.computeIfAbsent(
insertPlan.getDeviceId(), k -> Long.MIN_VALUE);

long globalLatestFlushTime = globalLatestFlushedTimeForEachDevice.getOrDefault(
insertPlan.getDeviceId(), Long.MIN_VALUE);

tryToUpdateInsertLastCache(insertPlan, globalLatestFlushTime);
if (result && globalLatestFlushTime < insertPlan.getTime()) {
globalLatestFlushedTimeForEachDevice.put(insertPlan.getDeviceId(), insertPlan.getTime());
}

// check memtable size and may asyncTryToFlush the work memtable
if (tsFileProcessor.shouldFlush()) {
Expand All @@ -676,7 +666,7 @@ private void insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence)
}

public void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime)
throws QueryProcessException {
throws WriteProcessException {
try {
MNode node =
MManager.getInstance().getDeviceNodeWithAutoCreateStorageGroup(plan.getDeviceId());
Expand All @@ -687,8 +677,8 @@ public void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime)
((LeafMNode) measurementNode)
.updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
}
} catch (MetadataException e) {
throw new QueryProcessException(e);
} catch (MetadataException | QueryProcessException e) {
throw new WriteProcessException(e);
}
}

Expand Down Expand Up @@ -1286,18 +1276,18 @@ private boolean updateLatestFlushTimeCallback(TsFileProcessor processor) {
.get(processor.getTimeRangeId());

if (curPartitionDeviceLatestTime == null) {
logger.error("Partition: " + processor.getTimeRangeId() +
" does't have latest time for each device record. Flushing tsfile is: "
+ processor.getTsFileResource().getFile());
logger.warn("Partition: {} does't have latest time for each device. "
+ "No valid record is written into memtable. Flushing tsfile is: {}",
processor.getTimeRangeId(), processor.getTsFileResource().getFile());
return false;
}

for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
partitionLatestFlushedTimeForEachDevice
.computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
.put(entry.getKey(), entry.getValue());
if (!globalLatestFlushedTimeForEachDevice.containsKey(entry.getKey())
|| globalLatestFlushedTimeForEachDevice.get(entry.getKey()) < entry.getValue()) {
if (globalLatestFlushedTimeForEachDevice
.getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
}
}
Expand Down Expand Up @@ -1762,14 +1752,11 @@ private void updateLatestTimeMap(TsFileResource newTsFileResource) {
Map<String, Long> latestFlushTimeForPartition = partitionLatestFlushedTimeForEachDevice
.getOrDefault(timePartitionId, new HashMap<>());

if (!latestFlushTimeForPartition.containsKey(device)
|| latestFlushTimeForPartition.get(device) < endTime) {
if (latestFlushTimeForPartition.getOrDefault(device, Long.MIN_VALUE) < endTime) {
partitionLatestFlushedTimeForEachDevice
.computeIfAbsent(timePartitionId, id -> new HashMap<>())
.put(device, endTime);
.computeIfAbsent(timePartitionId, id -> new HashMap<>()).put(device, endTime);
}
if (!globalLatestFlushedTimeForEachDevice.containsKey(device)
|| globalLatestFlushedTimeForEachDevice.get(device) < endTime) {
if (globalLatestFlushedTimeForEachDevice.getOrDefault(device, Long.MIN_VALUE) < endTime) {
globalLatestFlushedTimeForEachDevice.put(device, endTime);
}
}
Expand Down
Loading

0 comments on commit a5cfb37

Please sign in to comment.