Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
8a6afad
add some impl
Dec 30, 2019
f4fd809
fix bugs
Dec 30, 2019
254c976
modify overlapped to Statistics
Dec 30, 2019
e8bf53c
delete useless
Dec 30, 2019
a9a7c0d
add asf header
Dec 30, 2019
b76cf66
Merge branch 'new_series_reader' of https://github.com/apache/incubat…
Jan 2, 2020
3a2c30d
fix GroupByWithoutValueFilterDataSet
Jan 3, 2020
b05f90f
fix all GroupBy data
Jan 5, 2020
fdfa758
fix ifill bug
Jan 6, 2020
268fad7
rewrite reader for merge muti unseq
Jan 6, 2020
2a29285
fix many bug
Jan 7, 2020
d9e66ff
revert tsfile changes
Jan 7, 2020
56692e4
add comments
Jan 7, 2020
71244c2
fix groupby [0-100)
Jan 7, 2020
33bb57c
all done
Jan 7, 2020
5dee0bf
fix merge test
Jan 8, 2020
80b44ae
Merge branch 'new_series_reader' of https://github.com/apache/incubat…
Jan 10, 2020
7a8c4af
fix MergeOverLapTest
Jan 10, 2020
d78f923
Merge branch 'new_series_reader' of https://github.com/apache/incubat…
Jan 11, 2020
dcd47ee
fix MergeTaskTest
Jan 11, 2020
9deee19
fix MergeTaskTest
Jan 11, 2020
99f9dbe
close chunkloader
Jan 11, 2020
8dfdb9d
close chunkloader
Jan 11, 2020
590ece1
change with review
Jan 13, 2020
ec8e944
change with review
Jan 13, 2020
a8d7ae1
Merge branch 'new_series_reader' of https://github.com/apache/incubat…
Jan 13, 2020
e7b2dad
fix group by in last max
Jan 13, 2020
a86aef7
add javadoc for reader
Jan 13, 2020
2fe8dcc
rename AggregateFunction to AggregateResult
Jan 13, 2020
6f6a1b8
Merge branch 'new_series_reader' of https://github.com/apache/incubat…
Jan 13, 2020
3a34bcc
delete IPageSkipReader and modify AbstractDataReader
Jan 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ private static Object parseValue(TSDataType dataType, String value) throws Query
}

@Override
public void insertBatch(BatchInsertPlan batchInsertPlan, List<Integer> indexes) throws QueryProcessException {
public void insertBatch(BatchInsertPlan batchInsertPlan, List<Integer> indexes)
throws QueryProcessException {
try {
write(batchInsertPlan, indexes);
long recordSizeInByte = MemUtils.getRecordSize(batchInsertPlan);
Expand Down Expand Up @@ -207,7 +208,7 @@ public ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType da
chunkCopy.setTimeOffset(undeletedTime);
sorter = chunkCopy;
}
return new ReadOnlyMemChunk(dataType, sorter, props);
return new ReadOnlyMemChunk(measurement, dataType, sorter, props);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,22 @@
import java.util.Map;
import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
import org.apache.iotdb.db.engine.memtable.TimeValuePairSorter;
import org.apache.iotdb.db.query.reader.MemChunkLoader;
import org.apache.iotdb.db.utils.MathUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsDouble;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsFloat;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsDouble;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsFloat;

//TODO: merge ReadOnlyMemChunk and WritableMemChunk and IWritableMemChunk
public class ReadOnlyMemChunk implements TimeValuePairSorter {

private boolean initialized;

private String measurementUid;
private TSDataType dataType;
private TimeValuePairSorter memSeries;
private List<TimeValuePair> sortedTimeValuePairList;
Expand All @@ -47,7 +50,9 @@ public class ReadOnlyMemChunk implements TimeValuePairSorter {
/**
* init by TSDataType and TimeValuePairSorter.
*/
public ReadOnlyMemChunk(TSDataType dataType, TimeValuePairSorter memSeries, Map<String, String> props) {
public ReadOnlyMemChunk(String measurementUid, TSDataType dataType, TimeValuePairSorter memSeries,
Map<String, String> props) {
this.measurementUid = measurementUid;
this.dataType = dataType;
this.memSeries = memSeries;
this.initialized = false;
Expand Down Expand Up @@ -107,4 +112,40 @@ public boolean isEmpty() {
checkInitialized();
return sortedTimeValuePairList.isEmpty();
}

public ChunkMetaData getChunkMetaData() {
Statistics statsByType = Statistics.getStatsByType(dataType);
ChunkMetaData metaData = new ChunkMetaData(measurementUid, dataType, 0, statsByType);
if (!isEmpty()) {
List<TimeValuePair> sortedTimeValuePairList = getSortedTimeValuePairList();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could TimeValuePairSorter be replaced by TVList?

for (TimeValuePair timeValuePair : sortedTimeValuePairList) {
switch (dataType) {
case BOOLEAN:
statsByType.update(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
break;
case TEXT:
statsByType.update(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
break;
case FLOAT:
statsByType.update(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat());
break;
case INT32:
statsByType.update(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt());
break;
case INT64:
statsByType.update(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong());
break;
case DOUBLE:
statsByType.update(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
break;
default:
throw new RuntimeException("Unsupported data types");
}
}
}
statsByType.setEmpty(isEmpty());
metaData.setChunkLoader(new MemChunkLoader(this));
metaData.setVersion(Long.MAX_VALUE);
return metaData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,20 @@
/**
* For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
* TsFileProcessor in the working status. <br/>
*
* <p>
* There are two situations to set the working TsFileProcessor to closing status:<br/>
*
* <p>
* (1) when inserting data into the TsFileProcessor, and the TsFileProcessor shouldFlush() (or
* shouldClose())<br/>
*
* <p>
* (2) someone calls waitForAllCurrentTsFileProcessorsClosed(). (up to now, only flush command from
* cli will call this method)<br/>
*
* <p>
* UnSequence data has the similar process as above.
*
* <p>
* When a sequence TsFileProcessor is submitted to be flushed, the updateLatestFlushTimeCallback()
* method will be called as a callback.<br/>
*
* <p>
* When a TsFileProcessor is closed, the closeUnsealedTsFileProcessor() method will be called as a
* callback.
*/
Expand Down Expand Up @@ -192,7 +192,8 @@ public class StorageGroupProcessor {
// including the files generated by merge
private Set<Long> allDirectFileVersions = new HashSet<>();

public StorageGroupProcessor(String systemInfoDir, String storageGroupName, TsFileFlushPolicy fileFlushPolicy)
public StorageGroupProcessor(String systemInfoDir, String storageGroupName,
TsFileFlushPolicy fileFlushPolicy)
throws StorageGroupProcessorException {
this.storageGroupName = storageGroupName;
this.fileFlushPolicy = fileFlushPolicy;
Expand Down Expand Up @@ -572,7 +573,7 @@ public void moveOneWorkProcessorToClosingList(boolean sequence) {
workSequenceTsFileProcessor.asyncClose();
workSequenceTsFileProcessor = null;
logger.info("close a sequence tsfile processor {}", storageGroupName);
} else if (workUnSequenceTsFileProcessor != null){
} else if (workUnSequenceTsFileProcessor != null) {
closingUnSequenceTsFileProcessor.add(workUnSequenceTsFileProcessor);
workUnSequenceTsFileProcessor.asyncClose();
workUnSequenceTsFileProcessor = null;
Expand Down Expand Up @@ -842,11 +843,13 @@ private List<TsFileResource> getFileReSourceListForQuery(List<TsFileResource> ts
tsfileResourcesForQuery.add(tsFileResource);
} else {
// left: in-memory data, right: meta of disk data
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = tsFileResource.getUnsealedFileProcessor()
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = tsFileResource
.getUnsealedFileProcessor()
.query(deviceId, measurementId, dataType, mSchema.getProps(), context);

tsfileResourcesForQuery.add(new TsFileResource(tsFileResource.getFile(),
tsFileResource.getStartTimeMap(), tsFileResource.getEndTimeMap(), pair.left, pair.right));
tsFileResource.getStartTimeMap(), tsFileResource.getEndTimeMap(), pair.left,
pair.right));
}
} finally {
closeQueryLock.readLock().unlock();
Expand Down Expand Up @@ -874,9 +877,9 @@ private boolean testResourceDevice(TsFileResource tsFileResource, String deviceI
* Delete data whose timestamp <= 'timestamp' and belongs to the timeseries
* deviceId.measurementId.
*
* @param deviceId the deviceId of the timeseries to be deleted.
* @param deviceId the deviceId of the timeseries to be deleted.
* @param measurementId the measurementId of the timeseries to be deleted.
* @param timestamp the delete range is (0, timestamp].
* @param timestamp the delete range is (0, timestamp].
*/
public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
// TODO: how to avoid partial deletion?
Expand Down Expand Up @@ -943,7 +946,7 @@ private void deleteDataInFiles(List<TsFileResource> tsFileResourceList, Deletion

// write deletion into modification file
tsFileResource.getModFile().write(deletion);

tsFileResource.getModFile().close();
// delete data in memory of unsealed file
if (!tsFileResource.isClosed()) {
TsFileProcessor tsfileProcessor = tsFileResource.getUnsealedFileProcessor();
Expand Down Expand Up @@ -1136,7 +1139,8 @@ private void updateMergeModification(TsFileResource seqFile) {
try {
seqFile.getModFile().close();
} catch (IOException e) {
logger.error("Cannot close the ModificationFile {}", seqFile.getModFile().getFilePath(), e);
logger
.error("Cannot close the ModificationFile {}", seqFile.getModFile().getFilePath(), e);
}
}
} catch (IOException e) {
Expand Down Expand Up @@ -1190,14 +1194,14 @@ protected void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource
}

/**
* Load a new tsfile to storage group processor. The mechanism of the sync module will make sure that
* there has no file which is overlapping with the new file.
*
* Load a new tsfile to storage group processor. The mechanism of the sync module will make sure
* that there has no file which is overlapping with the new file.
* <p>
* Firstly, determine the loading type of the file, whether it needs to be loaded in sequence list
* or unsequence list.
*
* <p>
* Secondly, execute the loading process by the type.
*
* <p>
* Finally, update the latestTimeForEachDevice and latestFlushedTimeForEachDevice.
*
* @param newTsFileResource tsfile resource
Expand Down Expand Up @@ -1226,12 +1230,12 @@ public void loadNewTsFileForSync(TsFileResource newTsFileResource)

/**
* Load a new tsfile to storage group processor. Tne file may have overlap with other files.
*
* <p>
* Firstly, determine the loading type of the file, whether it needs to be loaded in sequence list
* or unsequence list.
*
* <p>
* Secondly, execute the loading process by the type.
*
* <p>
* Finally, update the latestTimeForEachDevice and latestFlushedTimeForEachDevice.
*
* @param newTsFileResource tsfile resource
Expand Down Expand Up @@ -1323,21 +1327,21 @@ public void loadNewTsFile(TsFileResource newTsFileResource)
/**
* Get an appropriate filename to ensure the order between files. The tsfile is named after
* ({systemTime}-{versionNum}-{mergeNum}.tsfile).
*
* <p>
* The sorting rules for tsfile names @see {@link this#compareFileName}, we can restore the list
* based on the file name and ensure the correctness of the order, so there are three cases.
*
* <p>
* 1. The tsfile is to be inserted in the first place of the list. If the timestamp in the file
* name is less than the timestamp in the file name of the first tsfile in the list, then the
* file name is legal and the file name is returned directly. Otherwise, its timestamp can be set
* to half of the timestamp value in the file name of the first tsfile in the list , and the
* version number is the version number in the file name of the first tsfile in the list.
*
* <p>
* 2. The tsfile is to be inserted in the last place of the list. If the timestamp in the file
* name is lager than the timestamp in the file name of the last tsfile in the list, then the
* file name is legal and the file name is returned directly. Otherwise, the file name is
* generated by the system according to the naming rules and returned.
*
* <p>
* 3. This file is inserted between two files. If the timestamp in the name of the file satisfies
* the timestamp between the timestamps in the name of the two files, then it is a legal name and
* returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
Expand Down Expand Up @@ -1424,9 +1428,9 @@ private void updateLatestTimeMap(TsFileResource newTsFileResource) {
/**
* Execute the loading process by the type.
*
* @param type load type
* @param type load type
* @param tsFileResource tsfile resource to be loaded
* @param index the index in sequenceFileList/unSequenceFileList
* @param index the index in sequenceFileList/unSequenceFileList
* @UsedBy sync module, load external tsfile module.
*/
private void loadTsFileByType(LoadTsFileType type, File syncedTsFile,
Expand Down Expand Up @@ -1492,9 +1496,9 @@ private void loadTsFileByType(LoadTsFileType type, File syncedTsFile,

/**
* Delete tsfile if it exists.
*
* <p>
* Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList.
*
* <p>
* Secondly, delete the tsfile and .resource file.
*
* @param tsfieToBeDeleted tsfile to be deleted
Expand Down Expand Up @@ -1546,9 +1550,9 @@ public boolean deleteTsfile(File tsfieToBeDeleted) {

/**
* Move tsfile to the target directory if it exists.
*
* <p>
* Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList.
*
* <p>
* Secondly, move the tsfile and .resource file to the target directory.
*
* @param fileToBeMoved tsfile to be moved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,14 @@ public class TsFileProcessor {
logger.info("create a new tsfile processor {}", tsfile.getAbsolutePath());

// a file generated by flush has only one historical version, which is itself
this.tsFileResource.setHistoricalVersions(Collections.singleton(versionController.currVersion()));
this.tsFileResource
.setHistoricalVersions(Collections.singleton(versionController.currVersion()));
}

public TsFileProcessor(String storageGroupName, TsFileResource tsFileResource, Schema schema,
VersionController versionController, CloseTsFileCallBack closeUnsealedTsFileProcessor,
Supplier updateLatestFlushTimeCallback, boolean sequence, RestorableTsFileIOWriter writer) {
this.storageGroupName =storageGroupName;
this.storageGroupName = storageGroupName;
this.tsFileResource = tsFileResource;
this.schema = schema;
this.versionController = versionController;
Expand Down Expand Up @@ -200,7 +201,7 @@ public boolean insertBatch(BatchInsertPlan batchInsertPlan, List<Integer> indexe
getLogNode().write(batchInsertPlan);
} catch (IOException e) {
logger.error("write WAL failed", e);
for (int index: indexes) {
for (int index : indexes) {
results[index] = TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode();
}
return false;
Expand All @@ -221,7 +222,7 @@ public boolean insertBatch(BatchInsertPlan batchInsertPlan, List<Integer> indexe
/**
* Delete data which belongs to the timeseries `deviceId.measurementId` and the timestamp of which
* <= 'timestamp' in the deletion. <br/>
*
* <p>
* Delete data in both working MemTable and flushing MemTables.
*/
public void deleteDataInMemory(Deletion deletion) {
Expand Down Expand Up @@ -256,12 +257,12 @@ boolean shouldFlush() {
* However, considering that the number of timeseries between storage groups may vary greatly,
* it's inappropriate to judge whether to flush the memtable according to the average memtable
* size. We need to adjust it according to the number of timeseries in a specific storage group.
*
*/
private long getMemtableSizeThresholdBasedOnSeriesNum() {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
long memTableSize = (long) (config.getMemtableSizeThreshold() * config.getMaxMemtableNumber()
/ IoTDBConstant.MEMTABLE_NUM_IN_EACH_STORAGE_GROUP * ActiveTimeSeriesCounter.getInstance().getActiveRatio(storageGroupName));
/ IoTDBConstant.MEMTABLE_NUM_IN_EACH_STORAGE_GROUP * ActiveTimeSeriesCounter.getInstance()
.getActiveRatio(storageGroupName));
return Math.max(memTableSize, config.getMemtableSizeThreshold());
}

Expand Down Expand Up @@ -577,9 +578,9 @@ public String getStorageGroupName() {
* memtables and then compact them into one TimeValuePairSorter). Then get the related
* ChunkMetadata of data on disk.
*
* @param deviceId device id
* @param deviceId device id
* @param measurementId sensor id
* @param dataType data type
* @param dataType data type
* @return left: the chunk data in memory; right: the chunkMetadatas of data on disk
*/
public Pair<ReadOnlyMemChunk, List<ChunkMetaData>> query(String deviceId,
Expand All @@ -606,8 +607,8 @@ public Pair<ReadOnlyMemChunk, List<ChunkMetaData>> query(String deviceId,
}
// memSeriesLazyMerger has handled the props,
// so we do not need to handle it again in the following readOnlyMemChunk
ReadOnlyMemChunk timeValuePairSorter = new ReadOnlyMemChunk(dataType, memSeriesLazyMerger,
Collections.emptyMap());
ReadOnlyMemChunk timeValuePairSorter = new ReadOnlyMemChunk(measurementId, dataType,
memSeriesLazyMerger, Collections.emptyMap());

ModificationFile modificationFile = tsFileResource.getModFile();
List<Modification> modifications = context.getPathModifications(modificationFile,
Expand Down
Loading