Changes from all commits (26 commits)
d072e9e  move from old repo (jt2594838, Jan 20, 2019)
f1cabf8  move tests from old repo (jt2594838, Jan 20, 2019)
4a38abe  move srcs from the old repo (jt2594838, Jan 20, 2019)
6559289  move tests from the old repo (jt2594838, Jan 20, 2019)
ba0bed9  fix BufferWriteProcessor (jt2594838, Jan 20, 2019)
9cd5d11  fix MemTableFlushUtils (jt2594838, Jan 20, 2019)
1bf0ddf  fix tsfile (jt2594838, Jan 20, 2019)
a6dbdbc  fix tsfile writer (jt2594838, Jan 20, 2019)
8af13f1  multiple fixes (jt2594838, Jan 20, 2019)
fb5802d  revert the delete order: disk first then memory (jt2594838, Jan 20, 2019)
dbcf2ed  fix the header error in package-info of modification (jt2594838, Jan 20, 2019)
e6ead42  Merge branch 'delete_dev1' into delete_dev2 (jt2594838, Jan 20, 2019)
ebbf9f8  Merge branch 'delete_main' into delete_dev1 (jt2594838, Jan 21, 2019)
df9fd23  Merge branch 'delete_dev1' into delete_dev2 (jt2594838, Jan 21, 2019)
4740d95  Merge branch 'delete_main' into delete_dev1 (jt2594838, Jan 22, 2019)
a2f3ec8  Merge branch 'delete_dev1' into delete_dev2 (jt2594838, Jan 22, 2019)
0cb7a98  Merge branch 'delete_main' into delete_dev1 (jt2594838, Jan 22, 2019)
12adb38  Merge branch 'delete_dev1' into delete_dev2 (jt2594838, Jan 22, 2019)
2ffc23a  Merge branch 'delete_main' into delete_dev1 (jt2594838, Jan 24, 2019)
e86f93a  Merge branch 'delete_dev1' into delete_dev2 (jt2594838, Jan 24, 2019)
228f614  replace the file header with standard Apache Header (Feb 14, 2019)
014aa86  add hashCode() method (Feb 15, 2019)
5c971b4  Merge remote-tracking branch 'origin/delete_dev1' into delete_dev2 (Feb 15, 2019)
ebcdccf  fix sonar tips (Feb 15, 2019)
0986aff  fix Apache Header in some files (Feb 16, 2019)
ba51118  fix by comments (jt2594838, Feb 17, 2019)
IoTDBConstant.java

@@ -46,7 +46,7 @@ public class IoTDBConstant {
   public static final String OVERFLOW_LOG_NODE_SUFFIX = "-overflow";

   public static final String PATH_ROOT = "root";
-  public static final char PATH_SEPARATER = '.';
+  public static final char PATH_SEPARATOR = '.';
   public static final String ADMIN_NAME = "root";
   public static final String ADMIN_PW = "root";
   public static final String PROFILE_SUFFIX = ".profile";
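This hunk fixes the misspelled constant (PATH_SEPARATER becomes PATH_SEPARATOR), so every call site that joins or splits timeseries paths must pick up the new name. As a minimal illustration of how the constant is used (the helper class below is hypothetical and not part of this PR; it only assumes paths like root.vehicle.d0.s0 are joined with the separator, as the FileNodeManager code in this diff does with deviceId + "." + measurementId):

```java
import org.apache.iotdb.db.conf.IoTDBConstant;

// Hypothetical helper illustrating how PATH_SEPARATOR is used; not part of this PR.
public final class PathUtilExample {

  // e.g. concat("root.vehicle.d0", "s0") -> "root.vehicle.d0.s0"
  public static String concat(String deviceId, String measurementId) {
    return deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId;
  }

  // split on the literal '.'; it must be escaped because '.' is a regex metacharacter
  public static String[] split(String fullPath) {
    return fullPath.split("\\" + IoTDBConstant.PATH_SEPARATOR);
  }
}
```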
BufferWriteProcessor.java

@@ -27,6 +27,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -40,6 +41,7 @@
 import org.apache.iotdb.db.engine.pool.FlushManager;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.utils.FlushStatus;
+import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -79,6 +81,7 @@ public class BufferWriteProcessor extends Processor {
   private String bufferWriteRelativePath;

   private WriteLogNode logNode;
+  private VersionController versionController;

   /**
    * constructor of BufferWriteProcessor.
@@ -91,8 +94,8 @@ public class BufferWriteProcessor extends Processor {
    * @throws BufferWriteProcessorException BufferWriteProcessorException
    */
   public BufferWriteProcessor(String baseDir, String processorName, String fileName,
-                              Map<String, Action> parameters,
-                              FileSchema fileSchema) throws BufferWriteProcessorException {
+      Map<String, Action> parameters, VersionController versionController,
+      FileSchema fileSchema) throws BufferWriteProcessorException {
     super(processorName);
     this.fileSchema = fileSchema;
     this.baseDir = baseDir;
@@ -131,6 +134,7 @@ public BufferWriteProcessor(String baseDir, String processorName, String fileName,
         throw new BufferWriteProcessorException(e);
       }
     }
+    this.versionController = versionController;
   }

   /**
@@ -216,8 +220,8 @@ private void checkMemThreshold4Flush(long addedMemory) throws BufferWriteProcessorException {
    * @return corresponding chunk data and chunk metadata in memory
    */
   public Pair<ReadOnlyMemChunk, List<ChunkMetaData>> queryBufferWriteData(String deviceId,
-                                                                          String measurementId,
-                                                                          TSDataType dataType) {
+      String measurementId,
+      TSDataType dataType) {
     flushQueryLock.lock();
     try {
       MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
@@ -258,14 +262,15 @@ private void switchFlushToWork() {
     }
   }

-  private void flushOperation(String flushFunction) {
+  private void flushOperation(String flushFunction, long version) {
     long flushStartTime = System.currentTimeMillis();
     LOGGER.info("The bufferwrite processor {} starts flushing {}.", getProcessorName(),
         flushFunction);
     try {
       if (flushMemTable != null && !flushMemTable.isEmpty()) {
         // flush data
-        MemTableFlushUtil.flushMemTable(fileSchema, writer, flushMemTable);
+        MemTableFlushUtil.flushMemTable(fileSchema, writer, flushMemTable,
+            version);
         // write restore information
         writer.flush();
       }
@@ -346,13 +351,14 @@ private Future<?> flush(boolean synchronization) throws IOException {
     valueCount = 0;
     flushStatus.setFlushing();
     switchWorkToFlush();
+    long version = versionController.nextVersion();
     BasicMemController.getInstance().reportFree(this, memSize.get());
     memSize.set(0);
     // switch
     if (synchronization) {
-      flushOperation("synchronously");
+      flushOperation("synchronously", version);
     } else {
-      FlushManager.getInstance().submit(() -> flushOperation("asynchronously"));
+      FlushManager.getInstance().submit(() -> flushOperation("asynchronously", version));
     }
   }
   // TODO return a meaningful Future
@@ -500,4 +506,20 @@ public void setNewProcessor(boolean isNewProcessor) {
   public WriteLogNode getLogNode() {
     return logNode;
   }
+
+  /**
+   * Delete data whose timestamp <= 'timestamp' and belonging to timeseries deviceId.measurementId.
+   * Delete data in both working MemTable and flushing MemTable.
+   * @param deviceId the deviceId of the timeseries to be deleted.
+   * @param measurementId the measurementId of the timeseries to be deleted.
+   * @param timestamp the upper-bound of deletion time.
+   */
+  public void delete(String deviceId, String measurementId, long timestamp) {
+    workMemTable.delete(deviceId, measurementId, timestamp);
+    if (isFlush) {
+      // flushing MemTable cannot be directly modified since another thread is reading it
+      flushMemTable = flushMemTable.copy();
+      flushMemTable.delete(deviceId, measurementId, timestamp);
+    }
+  }
 }
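The new BufferWriteProcessor.delete() applies the deletion directly to the working MemTable, but copies the flushing MemTable first, since another thread may still be reading it. A minimal sketch of that copy-on-write pattern follows; SketchMemTable and its internals are simplified stand-ins for the real MemTable API, assumed for illustration only:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of copy-on-write deletion; the real IoTDB MemTable API differs.
class SketchMemTable {
  // "deviceId.measurementId" -> (timestamp -> value)
  private final Map<String, TreeMap<Long, Object>> series = new ConcurrentHashMap<>();

  void write(String deviceId, String measurementId, long timestamp, Object value) {
    series.computeIfAbsent(deviceId + "." + measurementId, k -> new TreeMap<>())
        .put(timestamp, value);
  }

  // remove all points with time <= timestamp for one series
  void delete(String deviceId, String measurementId, long timestamp) {
    TreeMap<Long, Object> s = series.get(deviceId + "." + measurementId);
    if (s != null) {
      // headMap(timestamp, true) is a view of entries <= timestamp; clearing it
      // removes them from the backing map
      s.headMap(timestamp, true).clear();
    }
  }

  // deep-copy so the original instance can keep serving an in-progress reader untouched
  SketchMemTable copy() {
    SketchMemTable that = new SketchMemTable();
    series.forEach((k, v) -> that.series.put(k, new TreeMap<>(v)));
    return that;
  }
}
```

Reassigning flushMemTable to the copy lets a thread already iterating the old instance finish undisturbed, while the deletion is applied to the new copy that subsequent readers observe.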
FileNodeManager.java

@@ -52,7 +52,6 @@
 import org.apache.iotdb.db.monitor.IStatistic;
 import org.apache.iotdb.db.monitor.MonitorConstants;
 import org.apache.iotdb.db.monitor.StatMonitor;
-import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
 import org.apache.iotdb.db.query.control.FileReaderManager;
@@ -496,66 +495,66 @@ public void update(String deviceId, String measurementId, long startTime, long endTime,
   /**
    * delete data.
    */
-  public void delete(String deviceId, String measurementId, long timestamp, TSDataType type)
-      throws FileNodeManagerException {
+  public void delete(String deviceId, String measurementId, long timestamp)
+      throws FileNodeManagerException {

     FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
     try {
       long lastUpdateTime = fileNodeProcessor.getLastUpdateTime(deviceId);
       // no tsfile data, the delete operation is invalid
       if (lastUpdateTime == -1) {
-        LOGGER.warn(
-            "The last update time is -1, delete overflow is invalid, the filenode processor is {}",
-            fileNodeProcessor.getProcessorName());
+        LOGGER.warn("The last update time is -1, delete overflow is invalid, "
+                + "the filenode processor is {}",
+            fileNodeProcessor.getProcessorName());
       } else {
         if (timestamp > lastUpdateTime) {
           timestamp = lastUpdateTime;
         }
-        String filenodeName = fileNodeProcessor.getProcessorName();
-        // get overflow processor
-        OverflowProcessor overflowProcessor;
-        try {
-          overflowProcessor = fileNodeProcessor.getOverflowProcessor(filenodeName);
+        fileNodeProcessor.delete(deviceId, measurementId, timestamp);
-        } catch (IOException e) {
-          LOGGER.error("Get the overflow processor failed, the filenode is {}, delete time is {}.",
-              filenodeName, timestamp);
-          throw new FileNodeManagerException(e);
-        }
-        overflowProcessor.delete(deviceId, measurementId, timestamp, type);
         // change the type of tsfile to overflowed
-        fileNodeProcessor.changeTypeToChangedForDelete(deviceId, timestamp);
-        fileNodeProcessor.setOverflowed(true);
-        // if (shouldMerge) {
-        // LOGGER.info(
-        // "The overflow file or metadata reaches the threshold,
-        // merge the filenode processor {}",
-        // filenodeName);
-        // fileNodeProcessor.submitToMerge();
-        // }
+        fileNodeProcessor.changeTypeToChangedForDelete(deviceId, timestamp);
+        fileNodeProcessor.setOverflowed(true);
+
-        // write wal
+        // TODO: support atomic deletion
+        /*// write wal
+        // get processors for wal
+        String filenodeName = fileNodeProcessor.getProcessorName();
+        OverflowProcessor overflowProcessor;
+        BufferWriteProcessor bufferWriteProcessor;
+        try {
+          overflowProcessor = fileNodeProcessor.getOverflowProcessor(filenodeName);
+          bufferWriteProcessor = fileNodeProcessor.getBufferWriteProcessor();
+        } catch (IOException | FileNodeProcessorException e) {
+          LOGGER.error("Getting the processor failed, the filenode is {}, delete time is {}.",
+              filenodeName, timestamp);
+          throw new FileNodeManagerException(e);
+        }
         try {
           if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
             overflowProcessor.getLogNode()
-                .write(new DeletePlan(timestamp, new Path(deviceId + "." + measurementId)));
+                .write(new DeletePlan(timestamp,
+                    new Path(deviceId + "." + measurementId)));
+            bufferWriteProcessor.getLogNode()
+                .write(new DeletePlan(timestamp,
+                    new Path(deviceId + "." + measurementId)));
           }
         } catch (IOException e) {
           throw new FileNodeManagerException(e);
-        }
+        }*/
       }
     } finally {
       fileNodeProcessor.writeUnlock();
     }

   }

   /**
    * try to delete the filenode processor.
    */
   private void delete(String processorName,
-                      Iterator<Map.Entry<String, FileNodeProcessor>> processorIterator)
-      throws FileNodeManagerException {
+      Iterator<Map.Entry<String, FileNodeProcessor>> processorIterator)
+      throws FileNodeManagerException {
     if (processorMap.containsKey(processorName)) {
       LOGGER.info("Try to delete the filenode processor {}.", processorName);
       FileNodeProcessor processor = processorMap.get(processorName);
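Across the PR, each flush is stamped with versionController.nextVersion() before the MemTable is written out. The VersionController implementation itself is not shown in this diff; the sketch below is a minimal illustration under the assumption that versions only need to be monotonically increasing per processor (only the nextVersion() call is confirmed by the diff, everything else is illustrative):

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch of a monotonic version generator; the real
// org.apache.iotdb.db.engine.version.VersionController is not shown in this diff.
interface VersionControllerSketch {
  long nextVersion();
  long currVersion();
}

class SimpleVersionController implements VersionControllerSketch {
  private final AtomicLong version;

  SimpleVersionController(long startVersion) {
    this.version = new AtomicLong(startVersion);
  }

  @Override
  public long nextVersion() {
    // each flush gets a strictly larger version number
    return version.incrementAndGet();
  }

  @Override
  public long currVersion() {
    return version.get();
  }
}
```

Presumably, stamping every flushed chunk with a version is what lets a deletion recorded at version v apply to exactly the chunks flushed at or before v, so queries can merge data and deletions deterministically.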