diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java index b25b7c16d140..360acb54585d 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java @@ -46,7 +46,7 @@ public class IoTDBConstant { public static final String OVERFLOW_LOG_NODE_SUFFIX = "-overflow"; public static final String PATH_ROOT = "root"; - public static final char PATH_SEPARATER = '.'; + public static final char PATH_SEPARATOR = '.'; public static final String ADMIN_NAME = "root"; public static final String ADMIN_PW = "root"; public static final String PROFILE_SUFFIX = ".profile"; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java index 86d39725ec7d..16a5c0d71364 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java @@ -27,6 +27,7 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; + import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -40,6 +41,7 @@ import org.apache.iotdb.db.engine.pool.FlushManager; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.utils.FlushStatus; +import org.apache.iotdb.db.engine.version.VersionController; import org.apache.iotdb.db.exception.BufferWriteProcessorException; import org.apache.iotdb.db.utils.MemUtils; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; @@ -79,6 +81,7 @@ public class BufferWriteProcessor extends Processor { private String bufferWriteRelativePath; private WriteLogNode logNode; + private VersionController versionController; /** * constructor of BufferWriteProcessor. @@ -91,8 +94,8 @@ public class BufferWriteProcessor extends Processor { * @throws BufferWriteProcessorException BufferWriteProcessorException */ public BufferWriteProcessor(String baseDir, String processorName, String fileName, - Map parameters, - FileSchema fileSchema) throws BufferWriteProcessorException { + Map parameters, VersionController versionController, + FileSchema fileSchema) throws BufferWriteProcessorException { super(processorName); this.fileSchema = fileSchema; this.baseDir = baseDir; @@ -131,6 +134,7 @@ public BufferWriteProcessor(String baseDir, String processorName, String fileNam throw new BufferWriteProcessorException(e); } } + this.versionController = versionController; } /** @@ -216,8 +220,8 @@ private void checkMemThreshold4Flush(long addedMemory) throws BufferWriteProcess * @return corresponding chunk data and chunk metadata in memory */ public Pair> queryBufferWriteData(String deviceId, - String measurementId, - TSDataType dataType) { + String measurementId, + TSDataType dataType) { flushQueryLock.lock(); try { MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger(); @@ -258,14 +262,15 @@ private void switchFlushToWork() { } } - private void flushOperation(String flushFunction) { + private void flushOperation(String flushFunction, long version) { long flushStartTime = System.currentTimeMillis(); LOGGER.info("The bufferwrite processor {} starts flushing {}.", getProcessorName(), flushFunction); try { if (flushMemTable != null && !flushMemTable.isEmpty()) { // flush data - MemTableFlushUtil.flushMemTable(fileSchema, writer, flushMemTable); + MemTableFlushUtil.flushMemTable(fileSchema, writer, flushMemTable, + version); // write restore information writer.flush(); } @@ -346,13 +351,14 @@ private Future flush(boolean synchronization) throws IOException { valueCount = 0; flushStatus.setFlushing(); switchWorkToFlush(); + long version = versionController.nextVersion(); BasicMemController.getInstance().reportFree(this, memSize.get()); memSize.set(0); // switch if (synchronization) { - flushOperation("synchronously"); + flushOperation("synchronously", version); } else { - FlushManager.getInstance().submit(() -> flushOperation("asynchronously")); + FlushManager.getInstance().submit(() -> flushOperation("asynchronously", version)); } } // TODO return a meaningful Future @@ -500,4 +506,20 @@ public void setNewProcessor(boolean isNewProcessor) { public WriteLogNode getLogNode() { return logNode; } + + /** + * Delete data whose timestamp <= 'timestamp' and belonging to timeseries deviceId.measurementId. + * Delete data in both working MemTable and flushing MemTable. + * @param deviceId the deviceId of the timeseries to be deleted. + * @param measurementId the measurementId of the timeseries to be deleted. + * @param timestamp the upper-bound of deletion time. + */ + public void delete(String deviceId, String measurementId, long timestamp) { + workMemTable.delete(deviceId, measurementId, timestamp); + if (isFlush) { + // flushing MemTable cannot be directly modified since another thread is reading it + flushMemTable = flushMemTable.copy(); + flushMemTable.delete(deviceId, measurementId, timestamp); + } + } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java index dd00d874b69b..dcff4b8269a4 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java @@ -52,7 +52,6 @@ import org.apache.iotdb.db.monitor.IStatistic; import org.apache.iotdb.db.monitor.MonitorConstants; import org.apache.iotdb.db.monitor.StatMonitor; -import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.qp.physical.crud.UpdatePlan; import org.apache.iotdb.db.query.control.FileReaderManager; @@ -496,66 +495,66 @@ public void update(String deviceId, String measurementId, long startTime, long e /** * delete data. */ - public void delete(String deviceId, String measurementId, long timestamp, TSDataType type) - throws FileNodeManagerException { + public void delete(String deviceId, String measurementId, long timestamp) + throws FileNodeManagerException { FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true); try { long lastUpdateTime = fileNodeProcessor.getLastUpdateTime(deviceId); // no tsfile data, the delete operation is invalid if (lastUpdateTime == -1) { - LOGGER.warn( - "The last update time is -1, delete overflow is invalid, the filenode processor is {}", - fileNodeProcessor.getProcessorName()); + LOGGER.warn("The last update time is -1, delete overflow is invalid, " + + "the filenode processor is {}", + fileNodeProcessor.getProcessorName()); } else { - if (timestamp > lastUpdateTime) { - timestamp = lastUpdateTime; - } - String filenodeName = fileNodeProcessor.getProcessorName(); - // get overflow processor - OverflowProcessor overflowProcessor; try { - overflowProcessor = fileNodeProcessor.getOverflowProcessor(filenodeName); + fileNodeProcessor.delete(deviceId, measurementId, timestamp); } catch (IOException e) { - LOGGER.error("Get the overflow processor failed, the filenode is {}, delete time is {}.", - filenodeName, timestamp); throw new FileNodeManagerException(e); } - overflowProcessor.delete(deviceId, measurementId, timestamp, type); // change the type of tsfile to overflowed fileNodeProcessor.changeTypeToChangedForDelete(deviceId, timestamp); fileNodeProcessor.setOverflowed(true); - // if (shouldMerge) { - // LOGGER.info( - // "The overflow file or metadata reaches the threshold, - // merge the filenode processor {}", - // filenodeName); - // fileNodeProcessor.submitToMerge(); - // } - fileNodeProcessor.changeTypeToChangedForDelete(deviceId, timestamp); - fileNodeProcessor.setOverflowed(true); - // write wal + // TODO: support atomic deletion + /*// write wal + // get processors for wal + String filenodeName = fileNodeProcessor.getProcessorName(); + OverflowProcessor overflowProcessor; + BufferWriteProcessor bufferWriteProcessor; + try { + overflowProcessor = fileNodeProcessor.getOverflowProcessor(filenodeName); + bufferWriteProcessor = fileNodeProcessor.getBufferWriteProcessor(); + } catch (IOException | FileNodeProcessorException e) { + LOGGER.error("Getting the processor failed, the filenode is {}, delete time is {}.", + filenodeName, timestamp); + throw new FileNodeManagerException(e); + } try { if (IoTDBDescriptor.getInstance().getConfig().enableWal) { overflowProcessor.getLogNode() - .write(new DeletePlan(timestamp, new Path(deviceId + "." + measurementId))); + .write(new DeletePlan(timestamp, + new Path(deviceId + "." + measurementId))); + bufferWriteProcessor.getLogNode() + .write(new DeletePlan(timestamp, + new Path(deviceId + "." + measurementId))); } } catch (IOException e) { throw new FileNodeManagerException(e); - } + }*/ } } finally { fileNodeProcessor.writeUnlock(); } + } /** * try to delete the filenode processor. */ private void delete(String processorName, - Iterator> processorIterator) - throws FileNodeManagerException { + Iterator> processorIterator) + throws FileNodeManagerException { if (processorMap.containsKey(processorName)) { LOGGER.info("Try to delete the filenode processor {}.", processorName); FileNodeProcessor processor = processorMap.get(processorName); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java index a5415b8d6c3e..63eaa9fa18a5 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java @@ -37,7 +37,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.Directories; import org.apache.iotdb.db.engine.Processor; @@ -45,6 +47,7 @@ import org.apache.iotdb.db.engine.bufferwrite.ActionException; import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor; import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants; +import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.overflow.ioV2.OverflowProcessor; import org.apache.iotdb.db.engine.pool.MergeManager; import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource; @@ -52,6 +55,8 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile; +import org.apache.iotdb.db.engine.version.SimpleFileVersionController; +import org.apache.iotdb.db.engine.version.VersionController; import org.apache.iotdb.db.exception.BufferWriteProcessorException; import org.apache.iotdb.db.exception.ErrorDebugException; import org.apache.iotdb.db.exception.FileNodeProcessorException; @@ -197,6 +202,7 @@ public void act() throws ActionException { }; // Token for query which used to private int multiPassLockToken = 0; + private VersionController versionController; /** * constructor of FileNodeProcessor. @@ -263,6 +269,11 @@ public FileNodeProcessor(String fileNodeDirPath, String processorName) registStatMetadata(); statMonitor.registStatistics(statStorageDeltaName, this); } + try { + versionController = new SimpleFileVersionController(fileNodeDirPath); + } catch (IOException e) { + throw new FileNodeProcessorException(e); + } } public HashMap getStatParamsHashMap() { @@ -302,7 +313,7 @@ public HashMap getAllStatisticsValue() { HashMap hashMap = getStatParamsHashMap(); tsRecord.dataPointList = new ArrayList() { { - for (Map.Entry entry : hashMap.entrySet()) { + for (Entry entry : hashMap.entrySet()) { add(new LongDataPoint(entry.getKey(), entry.getValue().get())); } } @@ -435,7 +446,7 @@ public void fileNodeRecovery() throws FileNodeProcessorException { getProcessorName(), fileNames[fileNames.length - 1]); try { bufferWriteProcessor = new BufferWriteProcessor(baseDir, getProcessorName(), - fileNames[fileNames.length - 1], parameters, fileSchema); + fileNames[fileNames.length - 1], parameters, versionController, fileSchema); } catch (BufferWriteProcessorException e) { // unlock writeUnlock(); @@ -452,7 +463,8 @@ public void fileNodeRecovery() throws FileNodeProcessorException { parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction); parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction); try { - overflowProcessor = new OverflowProcessor(getProcessorName(), parameters, fileSchema); + overflowProcessor = new OverflowProcessor(getProcessorName(), parameters, fileSchema, + versionController); } catch (IOException e) { writeUnlock(); LOGGER.error("The filenode processor {} failed to recovery the overflow processor.", @@ -500,7 +512,7 @@ public BufferWriteProcessor getBufferWriteProcessor(String processorName, long i try { bufferWriteProcessor = new BufferWriteProcessor(baseDir, processorName, insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis(), - parameters, fileSchema); + parameters, versionController, fileSchema); } catch (BufferWriteProcessorException e) { LOGGER.error("The filenode processor {} failed to get the bufferwrite processor.", processorName, e); @@ -531,7 +543,8 @@ public OverflowProcessor getOverflowProcessor(String processorName) throws IOExc parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction); parameters .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction); - overflowProcessor = new OverflowProcessor(processorName, parameters, fileSchema); + overflowProcessor = new OverflowProcessor(processorName, parameters, fileSchema, + versionController); } return overflowProcessor; } @@ -1444,7 +1457,7 @@ private void switchWaitingToWorkingv2(List backupIntervalFiles } private TSRecord constructTsRecord(TimeValuePair timeValuePair, String deviceId, - String measurementId) { + String measurementId) { TSRecord record = new TSRecord(timeValuePair.getTimestamp(), deviceId); record.addTuple(DataPoint.getDataPoint(timeValuePair.getValue().getDataType(), measurementId, timeValuePair.getValue().getValue().toString())); @@ -1538,7 +1551,7 @@ private String queryAndWriteDataForMerge(IntervalFileNode backupIntervalFile) // end the new rowGroupMetadata long size = fileIoWriter.getPos() - startPos; footer = new ChunkGroupFooter(deviceId, size, numOfChunk); - fileIoWriter.endChunkGroup(footer); + fileIoWriter.endChunkGroup(footer, versionController.nextVersion()); } } if (fileIoWriter != null) { @@ -1553,9 +1566,9 @@ private String queryAndWriteDataForMerge(IntervalFileNode backupIntervalFile) } private int writeOneSeries(String deviceId, String measurement, ChunkWriterImpl seriesWriterImpl, - TSDataType dataType, IReader seriesReader, Map startTimeMap, - Map endTimeMap, - TimeValuePair timeValuePair) throws IOException { + TSDataType dataType, IReader seriesReader, Map startTimeMap, + Map endTimeMap, + TimeValuePair timeValuePair) throws IOException { int count = 0; long startTime = -1; long endTime = -1; @@ -1849,6 +1862,15 @@ public void closeOverflow() throws FileNodeProcessorException { public void close() throws FileNodeProcessorException { closeBufferWrite(); closeOverflow(); + for (IntervalFileNode fileNode : newFileNodes) { + if (fileNode.getModFile() != null) { + try { + fileNode.getModFile().close(); + } catch (IOException e) { + throw new FileNodeProcessorException(e); + } + } + } } /** @@ -1862,6 +1884,15 @@ public void delete() throws ProcessorException { } closeBufferWrite(); closeOverflow(); + for (IntervalFileNode fileNode : newFileNodes) { + if (fileNode.getModFile() != null) { + try { + fileNode.getModFile().close(); + } catch (IOException e) { + throw new FileNodeProcessorException(e); + } + } + } } @Override @@ -1902,14 +1933,44 @@ private FileNodeProcessorStore readStoreFromDisk() throws FileNodeProcessorExcep new IntervalFileNode(OverflowChangeType.NO_CHANGE, null), new ArrayList(), FileNodeProcessorStatus.NONE, 0)); } catch (IOException e) { - e.printStackTrace(); throw new FileNodeProcessorException(e); } return processorStore; } } - public String getFileNodeRestoreFilePath() { - return fileNodeRestoreFilePath; - } + public String getFileNodeRestoreFilePath() { + return fileNodeRestoreFilePath; + } + + /** + * Delete data whose timestamp <= 'timestamp' and belong to timeseries deviceId.measurementId. + * @param deviceId the deviceId of the timeseries to be deleted. + * @param measurementId the measurementId of the timeseries to be deleted. + * @param timestamp the delete range is (0, timestamp]. + */ + public void delete(String deviceId, String measurementId, long timestamp) throws IOException { + // TODO: how to avoid partial deletion? + long version = versionController.nextVersion(); + + String fullPath = deviceId + + IoTDBConstant.PATH_SEPARATOR + measurementId; + Deletion deletion = new Deletion(fullPath, version, timestamp); + if (currentIntervalFileNode != null && currentIntervalFileNode.containsDevice(deviceId)) { + currentIntervalFileNode.getModFile().write(deletion); + } + for (IntervalFileNode fileNode : newFileNodes) { + if(fileNode != currentIntervalFileNode && fileNode.containsDevice(deviceId) + && fileNode.getStartTime(deviceId) <= timestamp) { + fileNode.getModFile().write(deletion); + } + } + // delete data in memory + OverflowProcessor overflowProcessor = getOverflowProcessor(getProcessorName()); + overflowProcessor.delete(deviceId, measurementId, timestamp, version); + if (bufferWriteProcessor != null) { + bufferWriteProcessor.delete(deviceId, measurementId, timestamp); + } + } + } \ No newline at end of file diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/IntervalFileNode.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/IntervalFileNode.java index 7565bdd2b874..a6f523cddc5c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/IntervalFileNode.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/IntervalFileNode.java @@ -24,7 +24,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; + import org.apache.iotdb.db.conf.directories.Directories; +import org.apache.iotdb.db.engine.modification.ModificationFile; /** * This class is used to store one bufferwrite file status.
@@ -38,6 +40,7 @@ public class IntervalFileNode implements Serializable { private Map startTimeMap; private Map endTimeMap; private Set mergeChanged = new HashSet<>(); + private transient ModificationFile modFile; public IntervalFileNode(Map startTimeMap, Map endTimeMap, OverflowChangeType type, int baseDirIndex, String relativePath) { @@ -48,7 +51,9 @@ public IntervalFileNode(Map startTimeMap, Map endTim this.startTimeMap = startTimeMap; this.endTimeMap = endTimeMap; - + this.modFile = new ModificationFile( + Directories.getInstance().getTsFileFolder(baseDirIndex) + File.separator + + relativePath + ModificationFile.FILE_SUFFIX); } /** @@ -65,6 +70,9 @@ public IntervalFileNode(OverflowChangeType type, int baseDirIndex, String relati startTimeMap = new HashMap<>(); endTimeMap = new HashMap<>(); + this.modFile = new ModificationFile( + Directories.getInstance().getTsFileFolder(baseDirIndex) + File.separator + + relativePath + ModificationFile.FILE_SUFFIX); } public IntervalFileNode(OverflowChangeType type, String baseDir, String relativePath) { @@ -75,6 +83,9 @@ public IntervalFileNode(OverflowChangeType type, String baseDir, String relative startTimeMap = new HashMap<>(); endTimeMap = new HashMap<>(); + this.modFile = new ModificationFile( + Directories.getInstance().getTsFileFolder(baseDirIndex) + File.separator + + relativePath + ModificationFile.FILE_SUFFIX); } public IntervalFileNode(OverflowChangeType type, String relativePath) { @@ -274,4 +285,18 @@ public String toString() { + " endTimeMap=%s, mergeChanged=%s]", relativePath, overflowChangeType, startTimeMap, endTimeMap, mergeChanged); } + + public synchronized ModificationFile getModFile() { + if (modFile == null) { + modFile = new ModificationFile( + Directories.getInstance().getTsFileFolder(baseDirIndex) + File.separator + + relativePath + ModificationFile.FILE_SUFFIX); + } + return modFile; + } + + public boolean containsDevice(String deviceId) { + return endTimeMap.containsKey(deviceId); + } + } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index 0596e61527b0..bb95a25feca5 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -19,7 +19,10 @@ package org.apache.iotdb.db.engine.memtable; import java.util.HashMap; +import java.util.List; import java.util.Map; + +import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; public abstract class AbstractMemTable implements IMemTable { @@ -30,6 +33,10 @@ public AbstractMemTable() { this.memTableMap = new HashMap<>(); } + public AbstractMemTable(Map> memTableMap) { + this.memTableMap = memTableMap; + } + @Override public Map> getMemTableMap() { return memTableMap; @@ -45,7 +52,7 @@ private boolean checkPath(String deviceId, String measurement) { } private IWritableMemChunk createIfNotExistAndGet(String deviceId, String measurement, - TSDataType dataType) { + TSDataType dataType) { if (!memTableMap.containsKey(deviceId)) { memTableMap.put(deviceId, new HashMap<>()); } @@ -60,7 +67,7 @@ private IWritableMemChunk createIfNotExistAndGet(String deviceId, String measure @Override public void write(String deviceId, String measurement, TSDataType dataType, long insertTime, - String insertValue) { + String insertValue) { IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, measurement, dataType); memSeries.write(insertTime, insertValue); } @@ -94,4 +101,58 @@ public TimeValuePairSorter query(String deviceId, String measurement, TSDataType return memTableMap.get(deviceId).get(measurement); } + @Override + public void delete(String deviceId, String measurementId, long timestamp) { + Map deviceMap = memTableMap.get(deviceId); + if (deviceMap != null) { + IWritableMemChunk chunk = deviceMap.get(measurementId); + IWritableMemChunk newChunk = filterChunk(chunk, timestamp); + if (newChunk != null) { + deviceMap.put(measurementId, newChunk); + } + } + } + + /** + * If chunk contains data with timestamp less than 'timestamp', create a copy and delete all + * those data. Otherwise return null. + * @param chunk the source chunk. + * @param timestamp the upper-bound of deletion time. + * @return A reduced copy of chunk if chunk contains data with timestamp less than 'timestamp', + * of null. + */ + private IWritableMemChunk filterChunk(IWritableMemChunk chunk, long timestamp) { + List timeValuePairs = chunk.getSortedTimeValuePairList(); + if (timeValuePairs.size() > 0 && timeValuePairs.get(0).getTimestamp() <= timestamp) { + TSDataType dataType = chunk.getType(); + IWritableMemChunk newChunk = genMemSeries(dataType); + for (TimeValuePair pair : timeValuePairs) { + if (pair.getTimestamp() > timestamp) { + switch (dataType) { + case BOOLEAN: + newChunk.putBoolean(pair.getTimestamp(), pair.getValue().getBoolean()); + break; + case DOUBLE: + newChunk.putDouble(pair.getTimestamp(), pair.getValue().getDouble()); + break; + case INT64: + newChunk.putLong(pair.getTimestamp(), pair.getValue().getLong()); + break; + case INT32: + newChunk.putInt(pair.getTimestamp(), pair.getValue().getInt()); + break; + case FLOAT: + newChunk.putFloat(pair.getTimestamp(), pair.getValue().getFloat()); + break; + case TEXT: + newChunk.putBinary(pair.getTimestamp(), pair.getValue().getBinary()); + break; + } + } + } + return newChunk; + } + return null; + } + } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java index f1e75e3a56a2..4833ffac09e2 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.engine.memtable; import java.util.Map; + import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; /** @@ -33,7 +34,7 @@ public interface IMemTable { Map> getMemTableMap(); void write(String deviceId, String measurement, TSDataType dataType, - long insertTime, String insertValue); + long insertTime, String insertValue); int size(); @@ -46,4 +47,18 @@ void write(String deviceId, String measurement, TSDataType dataType, boolean isEmpty(); + /** + * Delete data in it whose timestamp <= 'timestamp' and belonging to timeseries + * deviceId.measurementId. + * @param deviceId the deviceId of the timeseries to be deleted. + * @param measurementId the measurementId of the timeseries to be deleted. + * @param timestamp the upper-bound of deletion time. + */ + void delete(String deviceId, String measurementId, long timestamp); + + /** + * Make a copy of this MemTable. + * @return a MemTable with the same data as this one. + */ + IMemTable copy(); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java index aba946bc4efd..05b05223d349 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.engine.memtable; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; public interface IWritableMemChunk extends TimeValuePairSorter { @@ -39,4 +40,6 @@ public interface IWritableMemChunk extends TimeValuePairSorter { void reset(); int count(); + + TSDataType getType(); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java index f3d8a1314670..32d1cff3076c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; + import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter; @@ -44,7 +45,7 @@ private MemTableFlushUtil(){ } private static int writeOneSeries(List tvPairs, IChunkWriter seriesWriterImpl, - TSDataType dataType) + TSDataType dataType) throws IOException { int count = 0; switch (dataType) { @@ -98,7 +99,7 @@ private static int writeOneSeries(List tvPairs, IChunkWriter seri * the function for flushing memtable. */ public static void flushMemTable(FileSchema fileSchema, TsFileIOWriter tsFileIoWriter, - IMemTable imemTable) + IMemTable imemTable, long version) throws IOException { for (String deviceId : imemTable.getMemTableMap().keySet()) { long startPos = tsFileIoWriter.getPos(); @@ -117,7 +118,7 @@ public static void flushMemTable(FileSchema fileSchema, TsFileIOWriter tsFileIoW } long memSize = tsFileIoWriter.getPos() - startPos; ChunkGroupFooter footer = new ChunkGroupFooter(deviceId, memSize, seriesNumber); - tsFileIoWriter.endChunkGroup(footer); + tsFileIoWriter.endChunkGroup(footer, version); } } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java index 123e4d2d8879..455196a37848 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java @@ -16,14 +16,32 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.engine.memtable; +import java.util.HashMap; +import java.util.Map; + import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; public class PrimitiveMemTable extends AbstractMemTable { + public PrimitiveMemTable() { + } + + public PrimitiveMemTable(Map> memTableMap) { + super(memTableMap); + } + @Override protected IWritableMemChunk genMemSeries(TSDataType dataType) { return new WritableMemChunk(dataType); } + + @Override + public IMemTable copy() { + Map> newMap = new HashMap<>(getMemTableMap()); + + return new PrimitiveMemTable(newMap); + } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java index 4d2122d39297..a7c01e4ede97 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.TreeMap; + import org.apache.iotdb.db.utils.PrimitiveArrayList; import org.apache.iotdb.db.utils.PrimitiveArrayListFactory; import org.apache.iotdb.db.utils.TimeValuePair; @@ -120,4 +121,9 @@ public int count() { return list.size(); } + @Override + public TSDataType getType() { + return dataType; + } + } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java new file mode 100644 index 000000000000..bf59f04cc107 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.modification; + +import java.util.Objects; + +/** + * Deletion is a delete operation on a timeseries. + */ +public class Deletion extends Modification { + private long timestamp; + + public Deletion(String path, long versionNum, long timestamp) { + super(Type.DELETION, path, versionNum); + this.timestamp = timestamp; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof Modification)) + return false; + Deletion del = (Deletion) obj; + return super.equals(obj) && del.timestamp == this.timestamp; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), timestamp); + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java new file mode 100644 index 000000000000..81186a1053fd --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.modification; + +import java.util.Objects; + +/** + * Modification represents an UPDATE or DELETE operation on a certain timeseries. + */ +public abstract class Modification { + + protected Type type; + protected String path; + protected long versionNum; + + Modification(Type type, String path, long versionNum) { + this.type = type; + this.path = path; + this.versionNum = versionNum; + } + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + + public long getVersionNum() { + return versionNum; + } + + public void setVersionNum(long versionNum) { + this.versionNum = versionNum; + } + + public Type getType() { + return type; + } + + public void setType(Type type) { + this.type = type; + } + + public enum Type { + DELETION + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof Modification)) + return false; + Modification mod = (Modification) obj; + return mod.type.equals(this.type) && mod.path.equals(this.path) + && mod.versionNum == this.versionNum; + } + + @Override + public int hashCode() { + return Objects.hash(type, path, versionNum); + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java new file mode 100644 index 000000000000..c46226b99426 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.modification; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor; +import org.apache.iotdb.db.engine.modification.io.ModificationReader; +import org.apache.iotdb.db.engine.modification.io.ModificationWriter; + +/** + * ModificationFile stores the Modifications of a TsFile or unseq file in another file in the same + * directory. + * Methods in this class are highly synchronized for concurrency safety. + */ +public class ModificationFile { + public static final String FILE_SUFFIX = ".mods"; + + private Collection modifications; + private ModificationWriter writer; + private ModificationReader reader; + + /** + * Construct a ModificationFile using a file as its storage. + * @param filePath the path of the storage file. + */ + public ModificationFile(String filePath) { + LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(filePath); + this.writer = accessor; + this.reader = accessor; + } + + private void init() throws IOException { + synchronized (this) { + Collection mods = reader.read(); + if (mods == null) { + mods = new ArrayList<>(); + } + modifications = mods; + } + } + + private void checkInit() throws IOException { + if (modifications == null) { + init(); + } + } + + /** + * Release resources such as streams and caches. + */ + public void close() throws IOException { + synchronized (this) { + writer.close(); + modifications = null; + } + } + + /** + * Write a modification in this file. The modification will first be written to the persistent + * store then the memory cache. + * @param mod the modification to be written. + * @throws IOException if IOException is thrown when writing the modification to the store. + */ + public void write(Modification mod) throws IOException { + synchronized (this) { + checkInit(); + writer.write(mod); + modifications.add(mod); + } + } + + /** + * Get all modifications stored in this file. + * @return an ArrayList of modifications. + */ + public Collection getModifications() throws IOException { + synchronized (this) { + checkInit(); + return new ArrayList<>(modifications); + } + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java new file mode 100644 index 000000000000..ee9abaff74de --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.modification.io; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.iotdb.db.engine.modification.Deletion; +import org.apache.iotdb.db.engine.modification.Modification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * LocalTextModificationAccessor uses a file on local file system to store the modifications + * in text format, and writes modifications by appending to the tail of the file. + */ +public class LocalTextModificationAccessor implements ModificationReader, ModificationWriter { + + private static final Logger logger = LoggerFactory.getLogger(LocalTextModificationAccessor.class); + private static final String SEPARATOR = ","; + + private String filePath; + private BufferedWriter writer; + + /** + * Construct a LocalTextModificationAccessor using a file specified by filePath. + * + * @param filePath the path of the file that is used for storing modifications. + */ + public LocalTextModificationAccessor(String filePath) { + this.filePath = filePath; + } + @Override + public Collection read() throws IOException { + BufferedReader reader; + try { + reader = new BufferedReader(new FileReader(filePath)); + } catch (FileNotFoundException e) { + return null; + } + String line; + + List modificationList = new ArrayList<>(); + try { + while ((line = reader.readLine()) != null) { + modificationList.add(decodeModification(line)); + } + } catch (IOException e) { + logger.error("An error occurred when reading modifications, and the remaining modifications " + + "were ignored.", e); + } finally { + reader.close(); + } + return modificationList; + } + + @Override + public void close() throws IOException { + if (writer != null) { + writer.close(); + } + } + + @Override + public void write(Modification mod) throws IOException { + if (writer == null) { + writer = new BufferedWriter(new FileWriter(filePath, true)); + } + writer.write(encodeModification(mod)); + writer.newLine(); + writer.flush(); + } + + private static String encodeModification(Modification mod) { + if (mod instanceof Deletion) + return encodeDeletion((Deletion) mod); + return null; + } + + private static Modification decodeModification(String src) throws IOException { + String[] fields = src.split(SEPARATOR); + if (Modification.Type.DELETION.name().equals(fields[0])) { + return decodeDeletion(fields); + } + throw new IOException("Unknown modification type: " + fields[0]); + } + + private static String encodeDeletion(Deletion del) { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(del.getType().toString()).append(SEPARATOR).append(del.getPath()) + .append(SEPARATOR).append(del.getVersionNum()).append(SEPARATOR) + .append(del.getTimestamp()); + return stringBuilder.toString(); + } + + private static Deletion decodeDeletion(String[] fields) throws IOException { + if (fields.length != 4) { + throw new IOException("Incorrect deletion fields number: " + fields.length); + } + + String path = fields[1]; + long versionNum, timestamp; + try { + versionNum = Long.parseLong(fields[2]); + } catch (NumberFormatException e) { + throw new IOException("Invalide version number: " + fields[2]); + } + try { + timestamp = Long.parseLong(fields[3]); + } catch (NumberFormatException e) { + throw new IOException("Invalide timestamp: " + fields[3]); + } + + return new Deletion(path, versionNum, timestamp); + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java new file mode 100644 index 000000000000..1abfaddf779d --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.modification.io; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.iotdb.db.engine.modification.Modification; + +/** + * ModificationReader reads all modifications from a persistent medium like file system. + */ +public interface ModificationReader { + + /** + * Read all modifications from a persistent medium. + * + * @return a list of modifications contained the medium. + */ + Collection read() throws IOException; + + /** + * Release resources like streams. + */ + void close() throws IOException; +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java new file mode 100644 index 000000000000..a817ca46f44c --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.modification.io; + +import java.io.IOException; + +import org.apache.iotdb.db.engine.modification.Modification; + +/** + * ModificationWriter provides methods for writing a modification to a persistent medium like file + * system. + */ +public interface ModificationWriter { + + /** + * Write a new modification to the persistent medium. + * @param mod the modification to be written. + */ + void write(Modification mod) throws IOException; + + /** + * Release resources like streams. + */ + void close() throws IOException; +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/package-info.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/package-info.java new file mode 100644 index 000000000000..e2fdfa80a833 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * modification is the functional module responsible for processing UPDATE and DELETE. + */ +package org.apache.iotdb.db.engine.modification; \ No newline at end of file diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessor.java index 67b740d183ec..537d3dcd3b03 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessor.java @@ -29,6 +29,7 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; + import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -45,6 +46,7 @@ import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.utils.FlushStatus; +import org.apache.iotdb.db.engine.version.VersionController; import org.apache.iotdb.db.exception.OverflowProcessorException; import org.apache.iotdb.db.utils.MemUtils; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; @@ -89,12 +91,14 @@ public class OverflowProcessor extends Processor { private AtomicLong memSize = new AtomicLong(); private WriteLogNode logNode; + private VersionController versionController; public OverflowProcessor(String processorName, Map parameters, - FileSchema fileSchema) + FileSchema fileSchema, VersionController versionController) throws IOException { super(processorName); this.fileSchema = fileSchema; + this.versionController = versionController; String overflowDirPath = TsFileDBConf.overflowDataDir; if (overflowDirPath.length() > 0 && overflowDirPath.charAt(overflowDirPath.length() - 1) != File.separatorChar) { @@ -124,12 +128,12 @@ private void recovery(File parentFile) throws IOException { String[] subFilePaths = clearFile(parentFile.list()); if (subFilePaths.length == 0) { workResource = new OverflowResource(parentPath, - String.valueOf(dataPahtCount.getAndIncrement())); + String.valueOf(dataPahtCount.getAndIncrement()), versionController); return; } else if (subFilePaths.length == 1) { long count = Long.valueOf(subFilePaths[0]); dataPahtCount.addAndGet(count + 1); - workResource = new OverflowResource(parentPath, String.valueOf(count)); + workResource = new OverflowResource(parentPath, String.valueOf(count), versionController); LOGGER.info("The overflow processor {} recover from work status.", getProcessorName()); } else { long count1 = Long.valueOf(subFilePaths[0]); @@ -141,8 +145,8 @@ private void recovery(File parentFile) throws IOException { } dataPahtCount.addAndGet(count2 + 1); // work dir > merge dir - workResource = new OverflowResource(parentPath, String.valueOf(count2)); - mergeResource = new OverflowResource(parentPath, String.valueOf(count1)); + workResource = new OverflowResource(parentPath, String.valueOf(count2), versionController); + mergeResource = new OverflowResource(parentPath, String.valueOf(count1), versionController); LOGGER.info("The overflow processor {} recover from merge status.", getProcessorName()); } } @@ -226,17 +230,20 @@ private byte[] convertStringToBytes(TSDataType type, String o) { } /** - * delete one time-series data which time range is from 0 to time-stamp. + * Delete data of a timeseries whose time ranges from 0 to timestamp. * - * @param deviceId - * @param measurementId - * @param timestamp - * @param type + * @param deviceId the deviceId of the timeseries. + * @param measurementId the measurementId of the timeseries. + * @param timestamp the upper-bound of deletion time. + * @param version the version number of this deletion. */ - @Deprecated - public void delete(String deviceId, String measurementId, long timestamp, TSDataType type) { - workSupport.delete(deviceId, measurementId, timestamp, type); - valueCount++; + public void delete(String deviceId, String measurementId, long timestamp, long version) throws IOException { + workResource.delete(deviceId, measurementId, timestamp, version); + workSupport.delete(deviceId, measurementId, timestamp, false); + if (flushStatus.isFlushing()) { + mergeResource.delete(deviceId, measurementId, timestamp, version); + flushSupport.delete(deviceId, measurementId, timestamp, true); + } } /** @@ -251,7 +258,7 @@ public void delete(String deviceId, String measurementId, long timestamp, TSData * @throws IOException */ public OverflowSeriesDataSource query(String deviceId, String measurementId, Filter filter, - TSDataType dataType) + TSDataType dataType) throws IOException { queryFlushLock.lock(); try { @@ -292,7 +299,7 @@ public OverflowSeriesDataSource query(String deviceId, String measurementId, Fil * @return insert data in SeriesChunkInMemTable */ private TimeValuePairSorter queryOverflowInsertInMemory(String deviceId, String measurementId, - TSDataType dataType) { + TSDataType dataType) { MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger(); if (flushStatus.isFlushing()) { @@ -314,8 +321,8 @@ private TimeValuePairSorter queryOverflowInsertInMemory(String deviceId, String * @return the seriesPath of unseqTsFile, List of TimeSeriesChunkMetaData for the special time-series. */ private Pair> queryWorkDataInOverflowInsert(String deviceId, - String measurementId, - TSDataType dataType) { + String measurementId, + TSDataType dataType) { Pair> pair = new Pair>( workResource.getInsertFilePath(), workResource.getInsertMetadatas(deviceId, measurementId, dataType)); @@ -331,7 +338,7 @@ private Pair> queryWorkDataInOverflowInsert(String d * @return MergeSeriesDataSource */ public MergeSeriesDataSource queryMerge(String deviceId, String measurementId, - TSDataType dataType) { + TSDataType dataType) { Pair> mergeInsert = queryMergeDataInOverflowInsert(deviceId, measurementId, dataType); @@ -339,8 +346,8 @@ public MergeSeriesDataSource queryMerge(String deviceId, String measurementId, } public OverflowSeriesDataSource queryMerge(String deviceId, String measurementId, - TSDataType dataType, - boolean isMerge) { + TSDataType dataType, + boolean isMerge) { Pair> mergeInsert = queryMergeDataInOverflowInsert(deviceId, measurementId, dataType); @@ -362,8 +369,8 @@ public OverflowSeriesDataSource queryMerge(String deviceId, String measurementId * @return the seriesPath of unseqTsFile, List of TimeSeriesChunkMetaData for the special time-series. */ private Pair> queryMergeDataInOverflowInsert(String deviceId, - String measurementId, - TSDataType dataType) { + String measurementId, + TSDataType dataType) { if (!isMerge) { return new Pair>(null, null); } @@ -399,7 +406,7 @@ public void switchWorkToMerge() throws IOException { mergeResource = workResource; // TODO: NEW ONE workResource workResource = new OverflowResource(parentPath, - String.valueOf(dataPahtCount.getAndIncrement())); + String.valueOf(dataPahtCount.getAndIncrement()), versionController); } isMerge = true; LOGGER.info("The overflow processor {} switch from WORK to MERGE", getProcessorName()); @@ -590,8 +597,7 @@ public long getMetaSize() { * @return The size of overflow file corresponding to this processor. */ public long getFileSize() { - return workResource.getInsertFile().length() + workResource.getUpdateDeleteFile().length() - + memoryUsage(); + return workResource.getInsertFile().length() + memoryUsage(); } /** diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResource.java index ee83f47cc7a4..773f9ccee4b6 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResource.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResource.java @@ -27,9 +27,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; + import org.apache.commons.io.FileUtils; +import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.engine.memtable.IMemTable; import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil; +import org.apache.iotdb.db.engine.modification.Deletion; +import org.apache.iotdb.db.engine.modification.ModificationFile; +import org.apache.iotdb.db.engine.version.VersionController; import org.apache.iotdb.db.utils.MemUtils; import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData; import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; @@ -44,9 +49,10 @@ public class OverflowResource { private static final Logger LOGGER = LoggerFactory.getLogger(OverflowResource.class); + private static final String INSERT_FILE_NAME = "unseqTsFile"; - private static final String UPDATE_DELETE_FILE_NAME = "overflowFile"; private static final String POSITION_FILE_NAME = "positionFile"; + private static final int FOOTER_LENGTH = 4; private static final int POS_LENGTH = 8; private String parentPath; @@ -54,12 +60,14 @@ public class OverflowResource { private String insertFilePath; private String positionFilePath; private File insertFile; - private File updateFile; private OverflowIO insertIO; private Map>> insertMetadatas; private List appendInsertMetadatas; + private VersionController versionController; + private ModificationFile modificationFile; - public OverflowResource(String parentPath, String dataPath) throws IOException { + public OverflowResource(String parentPath, String dataPath, VersionController versionController) + throws IOException { this.insertMetadatas = new HashMap<>(); this.appendInsertMetadatas = new ArrayList<>(); this.parentPath = parentPath; @@ -70,8 +78,8 @@ public OverflowResource(String parentPath, String dataPath) throws IOException { } insertFile = new File(dataFile, INSERT_FILE_NAME); insertFilePath = insertFile.getPath(); - updateFile = new File(dataFile, UPDATE_DELETE_FILE_NAME); positionFilePath = new File(dataFile, POSITION_FILE_NAME).getPath(); + Pair position = readPositionInfo(); try { // insert stream @@ -89,6 +97,8 @@ public OverflowResource(String parentPath, String dataPath) throws IOException { LOGGER.error("Failed to construct the OverflowIO.", e); throw e; } + this.versionController = versionController; + modificationFile = new ModificationFile(insertFilePath + ModificationFile.FILE_SUFFIX); } private Pair readPositionInfo() { @@ -109,9 +119,6 @@ private Pair readPositionInfo() { if (insertTempFile.exists()) { left = insertTempFile.length(); } - if (updateFile.exists()) { - right = updateFile.length(); - } return new Pair(left, right); } } @@ -159,7 +166,7 @@ private void readMetadata() throws IOException { } public List getInsertMetadatas(String deviceId, String measurementId, - TSDataType dataType) { + TSDataType dataType) { List chunkMetaDatas = new ArrayList<>(); if (insertMetadatas.containsKey(deviceId) && insertMetadatas.get(deviceId) .containsKey(measurementId)) { @@ -174,7 +181,7 @@ public List getInsertMetadatas(String deviceId, String measuremen } public void flush(FileSchema fileSchema, IMemTable memTable, - Map> overflowTrees, String processorName) + Map> overflowTrees, String processorName) throws IOException { // insert data long startPos = insertIO.getPos(); @@ -194,7 +201,8 @@ public void flush(FileSchema fileSchema, IMemTable memTable) throws IOException if (memTable != null && !memTable.isEmpty()) { insertIO.toTail(); long lastPosition = insertIO.getPos(); - MemTableFlushUtil.flushMemTable(fileSchema, insertIO, memTable); + MemTableFlushUtil.flushMemTable(fileSchema, insertIO, memTable, + versionController.nextVersion()); List rowGroupMetaDatas = insertIO.getChunkGroupMetaDatas(); appendInsertMetadatas.addAll(rowGroupMetaDatas); if (!rowGroupMetaDatas.isEmpty()) { @@ -235,15 +243,12 @@ public String getPositionFilePath() { return positionFilePath; } - public File getUpdateDeleteFile() { - return updateFile; - } - public void close() throws IOException { insertMetadatas.clear(); // updateDeleteMetadatas.clear(); insertIO.close(); // updateDeleteIO.close(); + modificationFile.close(); } public void deleteResource() throws IOException { @@ -275,4 +280,17 @@ private void addInsertMetadata(String deviceId, String measurementId, } insertMetadatas.get(deviceId).get(measurementId).add(chunkMetaData); } + + /** + * Delete data of a timeseries whose time ranges from 0 to timestamp. + * + * @param deviceId the deviceId of the timeseries. + * @param measurementId the measurementId of the timeseries. + * @param timestamp the upper-bound of deletion time. + */ + public void delete(String deviceId, String measurementId, long timestamp, long version) + throws IOException { + modificationFile.write(new Deletion(deviceId + IoTDBConstant.PATH_SEPARATOR + + measurementId, version, timestamp)); + } } \ No newline at end of file diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowSupport.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowSupport.java index f53a0e738ed3..f4f1652d4e99 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowSupport.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowSupport.java @@ -20,11 +20,11 @@ import java.util.HashMap; import java.util.Map; + import org.apache.iotdb.db.engine.memtable.IMemTable; import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable; import org.apache.iotdb.db.engine.memtable.TimeValuePairSorter; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; @@ -70,15 +70,13 @@ public void update(String deviceId, String measurementId, long startTime, long e indexTrees.get(deviceId).get(measurementId).update(startTime, endTime, value); } - @Deprecated - public void delete(String deviceId, String measurementId, long timestamp, TSDataType dataType) { - if (!indexTrees.containsKey(deviceId)) { - indexTrees.put(deviceId, new HashMap<>()); + public void delete(String deviceId, String measurementId, long timestamp, boolean isFlushing) { + if (isFlushing) { + memTable = memTable.copy(); + memTable.delete(deviceId, measurementId, timestamp); + } else { + memTable.delete(deviceId, measurementId, timestamp); } - if (!indexTrees.get(deviceId).containsKey(measurementId)) { - indexTrees.get(deviceId).put(measurementId, new OverflowSeriesImpl(measurementId, dataType)); - } - indexTrees.get(deviceId).get(measurementId).delete(timestamp); } public TimeValuePairSorter queryOverflowInsertInMemory(String deviceId, String measurementId, @@ -86,16 +84,6 @@ public TimeValuePairSorter queryOverflowInsertInMemory(String deviceId, String m return memTable.query(deviceId, measurementId, dataType); } - public BatchData queryOverflowUpdateInMemory(String deviceId, String measurementId, - TSDataType dataType, - BatchData data) { - if (indexTrees.containsKey(deviceId) && indexTrees.get(deviceId).containsKey(measurementId) - && indexTrees.get(deviceId).get(measurementId).getDataType().equals(dataType)) { - return indexTrees.get(deviceId).get(measurementId).query(data); - } - return null; - } - public boolean isEmptyOfOverflowSeriesMap() { return indexTrees.isEmpty(); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java new file mode 100644 index 000000000000..a38f4beee0b6 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java @@ -0,0 +1,115 @@ +/** + * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.engine.version; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * SimpleFileVersionController uses a local file and its file name to store the version. + */ +public class SimpleFileVersionController implements VersionController { + private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFileVersionController.class); + /** + * Every time currVersion - prevVersion >= SAVE_INTERVAL, currVersion is persisted and prevVersion + * is set to currVersion. When recovering from file, the version number is automatically increased + * by SAVE_INTERVAL to avoid conflicts. + */ + public static final long SAVE_INTERVAL = 100; + private static final String FILE_PREFIX = "Version-"; + private long prevVersion; + private long currVersion; + private String directoryPath; + + public SimpleFileVersionController(String directoryPath) throws IOException { + this.directoryPath = directoryPath; + restore(); + } + + @Override + public synchronized long nextVersion() { + currVersion ++; + try { + checkPersist(); + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } + return currVersion; + } + + /** + * Test only method, no need for concurrency. + * @return the current version. + */ + @Override + public long currVersion() { + return currVersion; + } + + private void checkPersist() throws IOException { + if ((currVersion - prevVersion) >= SAVE_INTERVAL) { + persist(); + } + } + + private void persist() throws IOException { + File oldFile = new File(directoryPath, FILE_PREFIX + prevVersion); + File newFile = new File(directoryPath, FILE_PREFIX + currVersion); + if (!oldFile.renameTo(newFile)) { + throw new IOException(String + .format("can not rename file %s to file %s", oldFile.getAbsolutePath(), + newFile.getAbsolutePath())); + } + prevVersion = currVersion; + } + + private void restore() throws IOException { + File directory = new File(directoryPath); + File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(FILE_PREFIX)); + File versionFile = null; + if (versionFiles != null && versionFiles.length > 0) { + long maxVersion = 0; + int maxVersionIndex = 0; + for (int i = 0; i < versionFiles.length; i ++) { + // extract version from "Version-123456" + long fileVersion = Long.parseLong(versionFiles[i].getName().split("-")[1]); + if (fileVersion > maxVersion) { + maxVersion = fileVersion; + maxVersionIndex = i; + } + } + prevVersion = maxVersion; + for(int i = 0; i < versionFiles.length; i ++) { + if (i != maxVersionIndex) { + versionFiles[i].delete(); + } + } + } else { + versionFile = new File(directory, FILE_PREFIX + "0"); + prevVersion = 0; + new FileOutputStream(versionFile).close(); + } + // prevent overlapping in case of failure + currVersion = prevVersion + SAVE_INTERVAL; + persist(); + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java new file mode 100644 index 000000000000..21e9787d4747 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.engine.version; + +/** + * SysTimeVersionController uses system timestamp as the version number. + */ +public class SysTimeVersionController implements VersionController { + + public static final SysTimeVersionController INSTANCE = new SysTimeVersionController(); + + private SysTimeVersionController() { + + } + + @Override + public long nextVersion() { + return System.currentTimeMillis(); + } + + @Override + public long currVersion() { + return System.currentTimeMillis(); + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java new file mode 100644 index 000000000000..68ca4b9e46a4 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.engine.version; + +/** + * VersionController controls the version(a monotonic increasing long) of a FileNode. + */ +public interface VersionController { + /** + * Get the next version number. + * @return the next version number. + */ + long nextVersion(); + + /** + * Get the current version number. + * @return the current version number. + */ + long currVersion(); +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/IStatistic.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/IStatistic.java index 37198b298c5b..a616ad89d65e 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/monitor/IStatistic.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/IStatistic.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.concurrent.atomic.AtomicLong; + import org.apache.iotdb.tsfile.write.record.TSRecord; public interface IStatistic { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java index b00d18c33a88..849af4b7112d 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java @@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; + import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.concurrent.ThreadName; import org.apache.iotdb.db.conf.IoTDBConfig; @@ -37,7 +38,6 @@ import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.service.IService; import org.apache.iotdb.db.service.ServiceType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint; @@ -98,7 +98,7 @@ public static StatMonitor getInstance() { * @return TSRecord contains the DataPoints of a statGroupDeltaName */ public static TSRecord convertToTSRecord(HashMap hashMap, - String statGroupDeltaName, long curTime) { + String statGroupDeltaName, long curTime) { TSRecord tsRecord = new TSRecord(curTime, statGroupDeltaName); tsRecord.dataPointList = new ArrayList() { { @@ -359,7 +359,7 @@ public void run() { for (Map.Entry entry : statisticMap.entrySet()) { for (String statParamName : entry.getValue().getStatParamsHashMap().keySet()) { fManager.delete(entry.getKey(), statParamName, - currentTimeMillis - statMonitorRetainIntervalSec * 1000, TSDataType.INT64); + currentTimeMillis - statMonitorRetainIntervalSec * 1000); } } } catch (FileNodeManagerException e) { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java index 8c44cd9ff04c..e28f3254b6be 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java @@ -226,7 +226,7 @@ protected boolean delete(Path path, long timestamp) throws ProcessorException { } mManager.getFileNameByPath(path.getFullPath()); TSDataType type = mManager.getSeriesType(path.getFullPath()); - fileNodeManager.delete(deviceId, measurementId, timestamp, type); + fileNodeManager.delete(deviceId, measurementId, timestamp); return true; } catch (PathErrorException e) { throw new ProcessorException(e.getMessage()); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/AuthUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/AuthUtils.java index 28c52002c010..5d81fad04997 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/utils/AuthUtils.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/AuthUtils.java @@ -167,7 +167,7 @@ public static String encryptPassword(String password) { public static boolean pathBelongsTo(String pathA, String pathB) { return pathA.equals(pathB) || (pathA.startsWith(pathB) - && pathA.charAt(pathB.length()) == IoTDBConstant.PATH_SEPARATER); + && pathA.charAt(pathB.length()) == IoTDBConstant.PATH_SEPARATOR); } /** diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java index 10ba6853ddbf..b9d3aa327cb1 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.writelog.replay; import java.util.List; + import org.apache.iotdb.db.engine.filenode.FileNodeManager; import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; @@ -87,12 +88,10 @@ private void update(UpdatePlan updatePlan) throws FileNodeManagerException, Path } } - private void delete(DeletePlan deletePlan) throws FileNodeManagerException, PathErrorException { - MManager memManager = MManager.getInstance(); + private void delete(DeletePlan deletePlan) throws FileNodeManagerException { for (Path path : deletePlan.getPaths()) { FileNodeManager.getInstance() - .delete(path.getDevice(), path.getMeasurement(), deletePlan.getDeleteTime(), - memManager.getSeriesType(path.getFullPath())); + .delete(path.getDevice(), path.getMeasurement(), deletePlan.getDeleteTime()); } } } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java index 4634e18827c7..73f20ec31e22 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.commons.io.FileUtils; +import org.apache.iotdb.db.engine.version.SysTimeVersionController; import org.apache.iotdb.db.exception.BufferWriteProcessorException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; @@ -92,9 +93,9 @@ public void act() throws ActionException { } }); - BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor("BufferBenchmark", "bench", - "benchFile", - parameters, fileSchema); + BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor("BufferBenchmark", + "bench", "benchFile", + parameters, SysTimeVersionController.INSTANCE, fileSchema); long startTime = System.currentTimeMillis(); for (int i = 0; i < numOfPoint; i++) { diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java index 0efe087ff1e4..8299d7508dba 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.engine.bufferwrite; import static org.junit.Assert.assertEquals; @@ -27,9 +28,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; + import org.apache.iotdb.db.conf.directories.Directories; import org.apache.iotdb.db.engine.MetadataManagerHelper; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; +import org.apache.iotdb.db.engine.version.SysTimeVersionController; import org.apache.iotdb.db.exception.BufferWriteProcessorException; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.utils.FileSchemaUtils; @@ -90,18 +93,19 @@ public void tearDown() throws Exception { @Test public void testWriteAndFlush() - throws BufferWriteProcessorException, WriteProcessException, IOException, InterruptedException { + throws BufferWriteProcessorException, WriteProcessException, IOException, InterruptedException { bufferwrite = new BufferWriteProcessor(Directories.getInstance().getFolderForTest(), - processorName, filename, - parameters, FileSchemaUtils.constructFileSchema(processorName)); + processorName, filename, + parameters, SysTimeVersionController.INSTANCE, + FileSchemaUtils.constructFileSchema(processorName)); assertEquals(filename, bufferwrite.getFileName()); assertEquals(processorName + File.separator + filename, bufferwrite.getFileRelativePath()); assertEquals(true, bufferwrite.isNewProcessor()); bufferwrite.setNewProcessor(false); assertEquals(false, bufferwrite.isNewProcessor()); Pair> pair = bufferwrite - .queryBufferWriteData(processorName, - measurementId, dataType); + .queryBufferWriteData(processorName, + measurementId, dataType); ReadOnlyMemChunk left = pair.left; List right = pair.right; assertEquals(true, left.isEmpty()); @@ -141,8 +145,9 @@ public void testWriteAndFlush() // test recovery BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor( - Directories.getInstance().getFolderForTest(), processorName, filename, parameters, - FileSchemaUtils.constructFileSchema(processorName)); + Directories.getInstance().getFolderForTest(), processorName, filename, parameters, + SysTimeVersionController.INSTANCE, + FileSchemaUtils.constructFileSchema(processorName)); pair = bufferWriteProcessor.queryBufferWriteData(processorName, measurementId, dataType); left = pair.left; right = pair.right; diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java index 95e0d557cb1c..6b3ccbec8a24 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.engine.bufferwrite; import static org.junit.Assert.assertEquals; @@ -28,10 +29,12 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; + import org.apache.iotdb.db.conf.directories.Directories; import org.apache.iotdb.db.engine.MetadataManagerHelper; import org.apache.iotdb.db.engine.PathUtils; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; +import org.apache.iotdb.db.engine.version.SysTimeVersionController; import org.apache.iotdb.db.exception.ProcessorException; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.utils.FileSchemaUtils; @@ -109,10 +112,10 @@ public void tearDown() throws Exception { @Test public void testWriteAndAbnormalRecover() - throws WriteProcessException, InterruptedException, IOException, ProcessorException { + throws WriteProcessException, InterruptedException, IOException, ProcessorException { bufferwrite = new BufferWriteProcessor(directories.getFolderForTest(), deviceId, insertPath, - parameters, - FileSchemaUtils.constructFileSchema(deviceId)); + parameters, SysTimeVersionController.INSTANCE, + FileSchemaUtils.constructFileSchema(deviceId)); for (int i = 1; i < 100; i++) { bufferwrite.write(deviceId, measurementId, i, dataType, String.valueOf(i)); } @@ -135,13 +138,14 @@ public void testWriteAndAbnormalRecover() bufferwrite.close(); file.renameTo(restoreFile); BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor( - directories.getFolderForTest(), deviceId, - insertPath, parameters, FileSchemaUtils.constructFileSchema(deviceId)); + directories.getFolderForTest(), deviceId, + insertPath, parameters, SysTimeVersionController.INSTANCE, + FileSchemaUtils.constructFileSchema(deviceId)); assertEquals(true, insertFile.exists()); assertEquals(insertFileLength, insertFile.length()); Pair> pair = bufferWriteProcessor - .queryBufferWriteData(deviceId, - measurementId, dataType); + .queryBufferWriteData(deviceId, + measurementId, dataType); assertEquals(true, pair.left.isEmpty()); assertEquals(1, pair.right.size()); ChunkMetaData chunkMetaData = pair.right.get(0); @@ -153,10 +157,10 @@ public void testWriteAndAbnormalRecover() @Test public void testWriteAndNormalRecover() - throws WriteProcessException, ProcessorException, InterruptedException { + throws WriteProcessException, ProcessorException, InterruptedException { bufferwrite = new BufferWriteProcessor(directories.getFolderForTest(), deviceId, insertPath, - parameters, - FileSchemaUtils.constructFileSchema(deviceId)); + parameters, SysTimeVersionController.INSTANCE, + FileSchemaUtils.constructFileSchema(deviceId)); for (int i = 1; i < 100; i++) { bufferwrite.write(deviceId, measurementId, i, dataType, String.valueOf(i)); } @@ -168,11 +172,12 @@ public void testWriteAndNormalRecover() File restoreFile = new File(dataFile, restoreFilePath); assertEquals(true, restoreFile.exists()); BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor( - directories.getFolderForTest(), deviceId, - insertPath, parameters, FileSchemaUtils.constructFileSchema(deviceId)); + directories.getFolderForTest(), deviceId, + insertPath, parameters, SysTimeVersionController.INSTANCE, + FileSchemaUtils.constructFileSchema(deviceId)); Pair> pair = bufferWriteProcessor - .queryBufferWriteData(deviceId, - measurementId, dataType); + .queryBufferWriteData(deviceId, + measurementId, dataType); assertEquals(true, pair.left.isEmpty()); assertEquals(1, pair.right.size()); ChunkMetaData chunkMetaData = pair.right.get(0); @@ -185,10 +190,10 @@ public void testWriteAndNormalRecover() @Test public void testWriteAndQuery() - throws WriteProcessException, InterruptedException, ProcessorException { + throws WriteProcessException, InterruptedException, ProcessorException { bufferwrite = new BufferWriteProcessor(directories.getFolderForTest(), deviceId, insertPath, - parameters, - FileSchemaUtils.constructFileSchema(deviceId)); + parameters, SysTimeVersionController.INSTANCE, + FileSchemaUtils.constructFileSchema(deviceId)); assertEquals(false, bufferwrite.isFlush()); assertEquals(true, bufferwrite.canBeClosed()); assertEquals(0, bufferwrite.memoryUsage()); @@ -206,8 +211,8 @@ public void testWriteAndQuery() assertEquals(0, bufferwrite.memoryUsage()); // query result Pair> pair = bufferwrite - .queryBufferWriteData(deviceId, measurementId, - dataType); + .queryBufferWriteData(deviceId, measurementId, + dataType); assertEquals(true, pair.left.isEmpty()); assertEquals(1, pair.right.size()); ChunkMetaData chunkMetaData = pair.right.get(0); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java index 5762a451868c..e19925f2e0f5 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java @@ -164,7 +164,7 @@ public void testWriteAndRecover() throws IOException { memTable.write("d1", "s2", TSDataType.INT32, 3, "1"); memTable.write("d2", "s2", TSDataType.INT32, 2, "1"); memTable.write("d2", "s2", TSDataType.INT32, 4, "1"); - MemTableFlushUtil.flushMemTable(schema, writer, memTable); + MemTableFlushUtil.flushMemTable(schema, writer, memTable, 0); writer.flush(); writer.appendMetadata(); writer.getOutput().close(); @@ -217,7 +217,7 @@ public void testFlushAndGetMetadata() throws IOException { MemTableTestUtils.measurementId0, MemTableTestUtils.dataType0); - MemTableFlushUtil.flushMemTable(MemTableTestUtils.getFileSchema(), writer, memTable); + MemTableFlushUtil.flushMemTable(MemTableTestUtils.getFileSchema(), writer, memTable, 0); writer.flush(); assertEquals(0, diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java index 7fa3f0392ae1..4a62b144dc97 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.engine.bufferwrite.ActionException; import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor; import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants; +import org.apache.iotdb.db.engine.version.SysTimeVersionController; import org.apache.iotdb.db.exception.BufferWriteProcessorException; import org.apache.iotdb.db.metadata.ColumnSchema; import org.apache.iotdb.db.metadata.MManager; @@ -143,7 +144,7 @@ public void test() throws BufferWriteProcessorException, WriteProcessException { try { processor = new BufferWriteProcessor(Directories.getInstance().getFolderForTest(), nsp, filename, - parameters, constructFileSchema(nsp)); + parameters, SysTimeVersionController.INSTANCE, constructFileSchema(nsp)); } catch (BufferWriteProcessorException e) { e.printStackTrace(); fail(e.getMessage()); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java index e58d7238d819..78d90c15b57f 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.engine.bufferwrite.ActionException; import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor; import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants; +import org.apache.iotdb.db.engine.version.SysTimeVersionController; import org.apache.iotdb.db.exception.BufferWriteProcessorException; import org.apache.iotdb.db.metadata.ColumnSchema; import org.apache.iotdb.db.metadata.MManager; @@ -142,8 +143,8 @@ public void test() throws BufferWriteProcessorException, WriteProcessException { try { processor = new BufferWriteProcessor(Directories.getInstance().getFolderForTest(), nsp, - filename, - parameters, constructFileSchema(nsp)); + filename, + parameters, SysTimeVersionController.INSTANCE, constructFileSchema(nsp)); } catch (BufferWriteProcessorException e) { e.printStackTrace(); fail(e.getMessage()); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java index 34b6f8b7fa9c..8e98fe77c95c 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.engine.bufferwrite.ActionException; import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants; import org.apache.iotdb.db.engine.overflow.ioV2.OverflowProcessor; +import org.apache.iotdb.db.engine.version.SysTimeVersionController; import org.apache.iotdb.db.exception.OverflowProcessorException; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.utils.FileSchemaUtils; @@ -120,7 +121,7 @@ public void testInsert() throws InterruptedException, IOException, WriteProcessE // insert one point: int try { ofprocessor = new OverflowProcessor(nameSpacePath, parameters, - FileSchemaUtils.constructFileSchema(deviceId)); + FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE); for (int i = 1; i < 1000000; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataTypes[0], measurementIds[0], String.valueOf(i))); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java index e2c378e8957d..7de6a8c756f1 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.engine.bufferwrite.ActionException; import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants; import org.apache.iotdb.db.engine.overflow.ioV2.OverflowProcessor; +import org.apache.iotdb.db.engine.version.SysTimeVersionController; import org.apache.iotdb.db.exception.OverflowProcessorException; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.utils.FileSchemaUtils; @@ -120,7 +121,7 @@ public void testInsert() throws InterruptedException, IOException, WriteProcessE // insert one point: int try { ofprocessor = new OverflowProcessor(nameSpacePath, parameters, - FileSchemaUtils.constructFileSchema(deviceId)); + FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE); for (int i = 1; i < 1000000; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataTypes[0], measurementIds[0], String.valueOf(i))); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java new file mode 100644 index 000000000000..70281fba9f59 --- /dev/null +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.modification; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.conf.directories.Directories; +import org.apache.iotdb.db.engine.filenode.FileNodeManager; +import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.exception.FileNodeManagerException; +import org.apache.iotdb.db.exception.MetadataArgsErrorException; +import org.apache.iotdb.db.exception.PathErrorException; +import org.apache.iotdb.db.metadata.MManager; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; +import org.apache.iotdb.tsfile.write.record.TSRecord; +import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; + +public class DeletionFileNodeTest { + + private String processorName = "root.test"; + + private static String[] measurements = new String[10]; + private String dataType = TSDataType.DOUBLE.toString(); + private String encoding = TSEncoding.PLAIN.toString(); + private String[] args = new String[0]; + + static { + for (int i = 0; i < 10; i++) { + measurements[i] = "m" + i; + } + } + + @Before + public void setup() throws MetadataArgsErrorException, + PathErrorException, IOException, FileNodeManagerException { + MManager.getInstance().setStorageLevelToMTree(processorName); + for (int i = 0; i < 10; i++) { + MManager.getInstance().addPathToMTree(processorName + "." + measurements[i], dataType, + encoding, args); + FileNodeManager.getInstance().addTimeSeries(new Path(processorName, measurements[i]), dataType, + encoding); + } + } + + @After + public void teardown() throws IOException, FileNodeManagerException { + EnvironmentUtils.cleanEnv(); + } + + @Test + public void testDeleteInBufferWriteCache() throws + FileNodeManagerException { + + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, processorName); + for (int j = 0; j < 10; j++) { + record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); + } + FileNodeManager.getInstance().insert(record, false); + } + + FileNodeManager.getInstance().delete(processorName, measurements[3], 50); + FileNodeManager.getInstance().delete(processorName, measurements[4], 50); + FileNodeManager.getInstance().delete(processorName, measurements[5], 30); + FileNodeManager.getInstance().delete(processorName, measurements[5], 50); + + SingleSeriesExpression expression = new SingleSeriesExpression(new Path(processorName, + measurements[5]), null); + QueryDataSource dataSource = FileNodeManager.getInstance().query(expression); + Iterator timeValuePairs = + dataSource.getSeqDataSource().getReadableChunk().getIterator(); + int count = 0; + while (timeValuePairs.hasNext()) { + timeValuePairs.next(); + count++; + } + assertEquals(count, 50); + } + + @Test + public void testDeleteInBufferWriteFile() throws FileNodeManagerException, IOException { + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, processorName); + for (int j = 0; j < 10; j++) { + record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); + } + FileNodeManager.getInstance().insert(record, false); + } + FileNodeManager.getInstance().closeAll(); + + FileNodeManager.getInstance().delete(processorName, measurements[5], 50); + FileNodeManager.getInstance().delete(processorName, measurements[4], 40); + FileNodeManager.getInstance().delete(processorName, measurements[3], 30); + + Modification[] realModifications = new Modification[]{ + new Deletion(processorName + "." + measurements[5], 102, 50), + new Deletion(processorName + "." + measurements[4], 103, 40), + new Deletion(processorName + "." + measurements[3], 104, 30), + }; + + String fileNodePath = Directories.getInstance().getTsFileFolder(0) + File.separator + + processorName; + File fileNodeDir = new File(fileNodePath); + File[] modFiles = fileNodeDir.listFiles((dir, name) + -> name.endsWith(ModificationFile.FILE_SUFFIX)); + assertEquals(modFiles.length, 1); + + LocalTextModificationAccessor accessor = + new LocalTextModificationAccessor(modFiles[0].getPath()); + Collection modifications = accessor.read(); + assertEquals(modifications.size(), 3); + int i = 0; + for (Modification modification : modifications) { + assertTrue(modification.equals(realModifications[i++])); + } + } + + @Test + public void testDeleteInOverflowCache() throws FileNodeManagerException { + // insert into BufferWrite + for (int i = 101; i <= 200; i++) { + TSRecord record = new TSRecord(i, processorName); + for (int j = 0; j < 10; j++) { + record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); + } + FileNodeManager.getInstance().insert(record, false); + } + FileNodeManager.getInstance().closeAll(); + + // insert into Overflow + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, processorName); + for (int j = 0; j < 10; j++) { + record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); + } + FileNodeManager.getInstance().insert(record, false); + } + + FileNodeManager.getInstance().delete(processorName, measurements[3], 50); + FileNodeManager.getInstance().delete(processorName, measurements[4], 50); + FileNodeManager.getInstance().delete(processorName, measurements[5], 30); + FileNodeManager.getInstance().delete(processorName, measurements[5], 50); + + SingleSeriesExpression expression = new SingleSeriesExpression(new Path(processorName, + measurements[5]), null); + QueryDataSource dataSource = FileNodeManager.getInstance().query(expression); + Iterator timeValuePairs = + dataSource.getOverflowSeriesDataSource().getReadableMemChunk().getIterator(); + int count = 0; + while (timeValuePairs.hasNext()) { + timeValuePairs.next(); + count++; + } + assertEquals(count, 50); + } + + @Test + public void testDeleteInOverflowFile() throws FileNodeManagerException, IOException { + // insert into BufferWrite + for (int i = 101; i <= 200; i++) { + TSRecord record = new TSRecord(i, processorName); + for (int j = 0; j < 10; j++) { + record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); + } + FileNodeManager.getInstance().insert(record, false); + } + FileNodeManager.getInstance().closeAll(); + + // insert into Overflow + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, processorName); + for (int j = 0; j < 10; j++) { + record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0)); + } + FileNodeManager.getInstance().insert(record, false); + } + FileNodeManager.getInstance().closeAll(); + + FileNodeManager.getInstance().delete(processorName, measurements[5], 50); + FileNodeManager.getInstance().delete(processorName, measurements[4], 40); + FileNodeManager.getInstance().delete(processorName, measurements[3], 30); + + Modification[] realModifications = new Modification[]{ + new Deletion(processorName + "." + measurements[5], 103, 50), + new Deletion(processorName + "." + measurements[4], 104, 40), + new Deletion(processorName + "." + measurements[3], 105, 30), + }; + + String fileNodePath = IoTDBDescriptor.getInstance().getConfig().overflowDataDir + File.separator + + processorName + File.separator + "0" + File.separator; + File fileNodeDir = new File(fileNodePath); + File[] modFiles = fileNodeDir.listFiles((dir, name) + -> name.endsWith(ModificationFile.FILE_SUFFIX)); + assertEquals(modFiles.length, 1); + + LocalTextModificationAccessor accessor = + new LocalTextModificationAccessor(modFiles[0].getPath()); + Collection modifications = accessor.read(); + assertEquals(modifications.size(), 3); + int i = 0; + for (Modification modification : modifications) { + assertTrue(modification.equals(realModifications[i++])); + } + } +} diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java new file mode 100644 index 000000000000..35d5dfd29989 --- /dev/null +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.modification; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class ModificationFileTest { + @Test + public void readMyWrite() { + String tempFileName = "mod.temp"; + Modification[] modifications = new Modification[]{ + new Deletion("p1", 1, 1), + new Deletion("p2", 2, 2), + new Deletion("p3", 3, 3), + new Deletion("p4", 4, 4), + }; + try { + ModificationFile mFile = new ModificationFile(tempFileName); + for (int i = 0; i < 2; i++) { + mFile.write(modifications[i]); + } + List modificationList = (List) mFile.getModifications(); + for (int i = 0; i < 2; i++) { + assertEquals(modifications[i], modificationList.get(i)); + } + + for (int i = 2; i < 4; i++) { + mFile.write(modifications[i]); + } + modificationList = (List) mFile.getModifications(); + for (int i = 0; i < 4; i++) { + assertEquals(modifications[i], modificationList.get(i)); + } + mFile.close(); + } catch (IOException e) { + fail(e.getMessage()); + } finally { + new File(tempFileName).delete(); + } + } +} \ No newline at end of file diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java new file mode 100644 index 000000000000..91f92cc0185b --- /dev/null +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.modification.io; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import org.apache.iotdb.db.engine.modification.Deletion; +import org.apache.iotdb.db.engine.modification.Modification; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class LocalTextModificationAccessorTest { + + @Test + public void readMyWrite() { + String tempFileName = "mod.temp"; + Modification[] modifications = new Modification[]{ + new Deletion("p1", 1, 1), + new Deletion("p2", 2, 2), + new Deletion("p3", 3, 3), + new Deletion("p4", 4, 4), + }; + try { + LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(tempFileName); + for (int i = 0; i < 2; i++) { + accessor.write(modifications[i]); + } + List modificationList = (List) accessor.read(); + for (int i = 0; i < 2; i++) { + assertEquals(modifications[i], modificationList.get(i)); + } + + for (int i = 2; i < 4; i++) { + accessor.write(modifications[i]); + } + modificationList = (List) accessor.read(); + for (int i = 0; i < 4; i++) { + assertEquals(modifications[i], modificationList.get(i)); + } + accessor.close(); + } catch (IOException e) { + fail(e.getMessage()); + } finally { + new File(tempFileName).delete(); + } + } + + @Test + public void readNull() throws IOException { + String tempFileName = "mod.temp"; + LocalTextModificationAccessor accessor = null; + accessor = new LocalTextModificationAccessor(tempFileName); + new File(tempFileName).delete(); + Collection modifications = accessor.read(); + assertNull(modifications); + } +} \ No newline at end of file diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessorBenchmark.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessorBenchmark.java index 5a7cb1d643bc..1e797c0b8e52 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessorBenchmark.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessorBenchmark.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.engine.bufferwrite.Action; import org.apache.iotdb.db.engine.bufferwrite.ActionException; import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants; +import org.apache.iotdb.db.engine.version.SysTimeVersionController; import org.apache.iotdb.db.exception.OverflowProcessorException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; @@ -92,7 +93,7 @@ public void act() throws ActionException { } }); OverflowProcessor overflowProcessor = new OverflowProcessor("Overflow_bench", parameters, - fileSchema); + fileSchema, SysTimeVersionController.INSTANCE); long startTime = System.currentTimeMillis(); for (int i = 0; i < numOfPoint; i++) { for (int j = 0; j < numOfDevice; j++) { diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessorTest.java index aa24bd6b3078..b76db836fa60 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessorTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessorTest.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants; import org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource; import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource; +import org.apache.iotdb.db.engine.version.SysTimeVersionController; import org.apache.iotdb.db.exception.OverflowProcessorException; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.utils.TimeValuePair; @@ -76,8 +77,10 @@ public void tearDown() throws Exception { @Test public void testInsertUpdate() throws IOException, OverflowProcessorException, InterruptedException { - processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema()); - assertEquals(true, new File(PathUtils.getOverflowWriteDir(processorName), "0").exists()); + processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema(), + SysTimeVersionController.INSTANCE); + assertEquals(true, new File(PathUtils.getOverflowWriteDir(processorName), + "0").exists()); assertEquals(false, processor.isFlush()); assertEquals(false, processor.isMerge()); // write update data @@ -141,7 +144,8 @@ public void testInsertUpdate() @Test public void testWriteMemoryAndQuery() throws IOException, OverflowProcessorException { - processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema()); + processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema(), + SysTimeVersionController.INSTANCE); OverflowTestUtils.produceInsertData(processor); processor.close(); // test query @@ -155,7 +159,8 @@ public void testWriteMemoryAndQuery() throws IOException, OverflowProcessorExcep @Test public void testFlushAndQuery() throws IOException, OverflowProcessorException { - processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema()); + processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema(), + SysTimeVersionController.INSTANCE); processor.flush(); // waiting for the end of flush. try { @@ -173,13 +178,14 @@ public void testFlushAndQuery() throws IOException, OverflowProcessorException { @Test public void testRecovery() throws OverflowProcessorException, IOException { - processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema()); + processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema(), + SysTimeVersionController.INSTANCE); processor.close(); processor.switchWorkToMerge(); assertEquals(true, processor.isMerge()); processor.clear(); OverflowProcessor overflowProcessor = new OverflowProcessor(processorName, parameters, - OverflowTestUtils.getFileSchema()); + OverflowTestUtils.getFileSchema(), SysTimeVersionController.INSTANCE); // recovery query assertEquals(false, overflowProcessor.isMerge()); overflowProcessor.switchWorkToMerge(); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResourceTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResourceTest.java index eb5de0fdefae..6a7d26db5fcc 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResourceTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResourceTest.java @@ -24,6 +24,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.List; + +import org.apache.iotdb.db.engine.version.SysTimeVersionController; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; import org.junit.After; @@ -45,7 +47,7 @@ public class OverflowResourceTest { @Before public void setUp() throws Exception { - work = new OverflowResource(filePath, dataPath); + work = new OverflowResource(filePath, dataPath, SysTimeVersionController.INSTANCE); insertFile = new File(new File(filePath, dataPath), insertFileName); updateFile = new File(new File(filePath, dataPath), updateDeleteFileName); positionFile = new File(new File(filePath, dataPath), positionFileName); @@ -81,7 +83,7 @@ public void testOverflowInsert() throws IOException { fileOutputStream.write(new byte[20]); fileOutputStream.close(); assertEquals(originlength + 20, insertFile.length()); - work = new OverflowResource(filePath, dataPath); + work = new OverflowResource(filePath, dataPath, SysTimeVersionController.INSTANCE); chunkMetaDatas = work .getInsertMetadatas(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowSupportTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowSupportTest.java index 616696fd07c7..cc50e0a7bd74 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowSupportTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowSupportTest.java @@ -48,7 +48,7 @@ public void setUp() throws Exception { support.update(deviceId1, measurementId1, 20, 30, dataType1, BytesUtils.intToBytes(20)); // time :[2,10] [20,30] value: int [10,10] int[20,20] // d1 s2 - support.delete(deviceId1, measurementId2, 10, dataType1); + support.delete(deviceId1, measurementId2, 10, false); support.update(deviceId1, measurementId2, 20, 30, dataType1, BytesUtils.intToBytes(20)); // time: [0,-10] [20,30] value[20,20] // d2 s1 @@ -57,7 +57,7 @@ public void setUp() throws Exception { // time: [5,9] [10,40] value [10.5,10.5] [20.5,20.5] // d2 s2 support.update(deviceId2, measurementId2, 2, 10, dataType2, BytesUtils.floatToBytes(5.5f)); - support.delete(deviceId2, measurementId2, 20, dataType2); + support.delete(deviceId2, measurementId2, 20, false); // time : [0,-20] } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SimpleFileVersionControllerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SimpleFileVersionControllerTest.java new file mode 100644 index 000000000000..fe40ee991bdd --- /dev/null +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SimpleFileVersionControllerTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.version; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.io.FileUtils; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.iotdb.db.engine.version.SimpleFileVersionController.SAVE_INTERVAL; +import static org.junit.Assert.assertEquals; + +public class SimpleFileVersionControllerTest { + @Test + public void test() throws IOException { + String tempFilePath = "version.tmp"; + + try { + if (!new File(tempFilePath).mkdir()) { + Assert.fail("can not create version.tmp folder"); + } + VersionController versionController = new SimpleFileVersionController(tempFilePath); + assertEquals(versionController.currVersion(), SAVE_INTERVAL); + for (int i = 0; i < 150; i++) { + versionController.nextVersion(); + } + assertEquals(versionController.currVersion(), SAVE_INTERVAL + 150); + versionController = new SimpleFileVersionController(tempFilePath); + assertEquals(versionController.currVersion(), SAVE_INTERVAL + 200); + } finally { + FileUtils.deleteDirectory(new File(tempFilePath)); + } + } +} \ No newline at end of file diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SysTimeVersionControllerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SysTimeVersionControllerTest.java new file mode 100644 index 000000000000..4e063adb106a --- /dev/null +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SysTimeVersionControllerTest.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.version; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +public class SysTimeVersionControllerTest { + + @Test + public void test() { + VersionController versionController = SysTimeVersionController.INSTANCE; + long diff = versionController.currVersion() - System.currentTimeMillis(); + assertTrue(diff >= -1 && diff <= 1); + diff = versionController.nextVersion() - System.currentTimeMillis(); + assertTrue(diff >= -1 && diff <= 1); + } +} \ No newline at end of file diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java index 163a083d773f..0bb646b7a427 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java @@ -104,11 +104,11 @@ public void SimpleTest() throws ClassNotFoundException, SQLException { + " StorageGroup: root.vehicle \n" + " }\n" + " },\n" + " d5:{\n" + " s9:{\n" + " DataType: FLOAT,\n" + " Encoding: PLAIN,\n" - + " args: {compressor=SNAPPY, MAX_POINT_NUMBER=10},\n" + + " args: {MAX_POINT_NUMBER=10, compressor=SNAPPY},\n" + " StorageGroup: root.vehicle \n" + " }\n" + " },\n" + " d6:{\n" + " s10:{\n" + " DataType: DOUBLE,\n" + " Encoding: RLE,\n" - + " args: {compressor=UNCOMPRESSOR, MAX_POINT_NUMBER=10},\n" + + " args: {MAX_POINT_NUMBER=10, compressor=UNCOMPRESSOR},\n" + " StorageGroup: root.vehicle \n" + " }\n" + " }\n" + " }\n" + "}", "DELETE TIMESERIES root.vehicle.*", "SHOW TIMESERIES", diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java index a3fa58436c7b..94cc02c3550b 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java @@ -366,7 +366,7 @@ private void ShowTimeseriesInJson() { + " StorageGroup: root.ln.wf01.wt01 \n" + " },\n" + " temperature:{\n" + " DataType: FLOAT,\n" + " Encoding: RLE,\n" - + " args: {compressor=SNAPPY, MAX_POINT_NUMBER=3},\n" + + " args: {MAX_POINT_NUMBER=3, compressor=SNAPPY},\n" + " StorageGroup: root.ln.wf01.wt01 \n" + " }\n" + " }\n" + " }\n" + " }\n" + "}"; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java index ceac1b8c767f..9bfa058b7b2a 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; + import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; /** @@ -51,6 +52,8 @@ private ChunkGroupMetaData() { chunkMetaDataList = new ArrayList<>(); } + private long version; + /** * constructor of ChunkGroupMetaData. * @@ -80,10 +83,11 @@ public static ChunkGroupMetaData deserializeFrom(InputStream inputStream) throws ChunkGroupMetaData chunkGroupMetaData = new ChunkGroupMetaData(); chunkGroupMetaData.deviceID = ReadWriteIOUtils.readString(inputStream); + chunkGroupMetaData.version = ReadWriteIOUtils.readLong(inputStream); int size = ReadWriteIOUtils.readInt(inputStream); chunkGroupMetaData.serializedSize = - Integer.BYTES + chunkGroupMetaData.deviceID.length() + Integer.BYTES; + Integer.BYTES + Long.BYTES + chunkGroupMetaData.deviceID.length() + Integer.BYTES; List chunkMetaDataList = new ArrayList<>(); @@ -107,11 +111,12 @@ public static ChunkGroupMetaData deserializeFrom(ByteBuffer buffer) { ChunkGroupMetaData chunkGroupMetaData = new ChunkGroupMetaData(); chunkGroupMetaData.deviceID = (ReadWriteIOUtils.readString(buffer)); + chunkGroupMetaData.version = ReadWriteIOUtils.readLong(buffer); int size = ReadWriteIOUtils.readInt(buffer); chunkGroupMetaData.serializedSize = - Integer.BYTES + chunkGroupMetaData.deviceID.length() + Integer.BYTES; + Integer.BYTES + Long.BYTES + chunkGroupMetaData.deviceID.length() + Integer.BYTES; List chunkMetaDataList = new ArrayList<>(); for (int i = 0; i < size; i++) { @@ -129,7 +134,8 @@ public int getSerializedSize() { } void reCalculateSerializedSize() { - serializedSize = Integer.BYTES + deviceID.length() + Integer.BYTES; // size of chunkMetaDataList + serializedSize = Integer.BYTES + Long.BYTES + + deviceID.length() + Integer.BYTES; // size of chunkMetaDataList for (ChunkMetaData chunk : chunkMetaDataList) { serializedSize += chunk.getSerializedSize(); } @@ -171,6 +177,7 @@ public String getDeviceID() { public int serializeTo(OutputStream outputStream) throws IOException { int byteLen = 0; byteLen += ReadWriteIOUtils.write(deviceID, outputStream); + byteLen += ReadWriteIOUtils.write(version, outputStream); byteLen += ReadWriteIOUtils.write(chunkMetaDataList.size(), outputStream); for (ChunkMetaData chunkMetaData : chunkMetaDataList) { @@ -191,6 +198,7 @@ public int serializeTo(ByteBuffer buffer) throws IOException { int byteLen = 0; byteLen += ReadWriteIOUtils.write(deviceID, buffer); + byteLen += ReadWriteIOUtils.write(version, buffer); byteLen += ReadWriteIOUtils.write(chunkMetaDataList.size(), buffer); for (ChunkMetaData chunkMetaData : chunkMetaDataList) { @@ -201,4 +209,13 @@ public int serializeTo(ByteBuffer buffer) throws IOException { return byteLen; } + + public long getVersion() { + return version; + } + + public void setVersion(long version) { + this.version = version; + } + } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java index 9619601382d6..6f64d98fb5c4 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; + import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.exception.write.NoMeasurementException; @@ -72,6 +73,7 @@ public class TsFileWriter { **/ private long recordCountForNextMemCheck = 100; private long chunkGroupSizeThreshold; + private long version = 0; /** * init this TsFileWriter. @@ -258,7 +260,7 @@ private boolean flushAllChunkGroups() throws IOException { chunkGroupFooter.getDataSize(), fileWriter.getPos() - pos)); } - fileWriter.endChunkGroup(chunkGroupFooter); + fileWriter.endChunkGroup(chunkGroupFooter, version++); } long actualTotalChunkGroupSize = fileWriter.getPos() - totalMemStart; LOG.info("total chunk group size:{}", actualTotalChunkGroupSize); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java index 3cffa30fc6f6..38db7a74e84a 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; + import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.common.constant.StatisticConstant; import org.apache.iotdb.tsfile.file.MetaMarker; @@ -142,9 +143,9 @@ public void startFlushChunkGroup(String deviceId) throws IOException { * @throws IOException if I/O error occurs */ public int startFlushChunk(MeasurementSchema descriptor, CompressionType compressionCodecName, - TSDataType tsDataType, TSEncoding encodingType, Statistics statistics, long maxTime, - long minTime, - int datasize, int numOfPages) throws IOException { + TSDataType tsDataType, TSEncoding encodingType, Statistics statistics, long maxTime, + long minTime, + int datasize, int numOfPages) throws IOException { LOG.debug("start series chunk:{}, file position {}", descriptor, out.getPosition()); currentChunkMetaData = new ChunkMetaData(descriptor.getMeasurementId(), tsDataType, @@ -191,8 +192,9 @@ public void endChunk(long totalValueCount) { * * @param chunkGroupFooter -use to serialize */ - public void endChunkGroup(ChunkGroupFooter chunkGroupFooter) throws IOException { + public void endChunkGroup(ChunkGroupFooter chunkGroupFooter, long version) throws IOException { chunkGroupFooter.serializeTo(out.wrapAsStream()); + currentChunkGroupMetaData.setVersion(version); chunkGroupMetaDataList.add(currentChunkGroupMetaData); LOG.debug("end chunk group:{}", currentChunkGroupMetaData); currentChunkGroupMetaData = null; diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java index e73e232c3b38..6894702f9def 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java @@ -61,7 +61,7 @@ public void before() throws IOException { measurementSchema.getType(), measurementSchema.getEncodingType(), statistics, 0, 0, 0, 0); writer.endChunk(0); ChunkGroupFooter footer = new ChunkGroupFooter(deviceId, 0, 1); - writer.endChunkGroup(footer); + writer.endChunkGroup(footer, 0); // end file writer.endFile(fileSchema);