Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1335,6 +1335,8 @@ protected Pair<IMetadataIndexEntry, Long> getMetadataAndEndOffsetOfDeviceNode(
} else {
return metadataIndex.getChildIndexEntry(deviceID, exactSearch);
}
} catch (StopReadTsFileByInterruptException e) {
throw e;
} catch (Exception e) {
logger.error("Something error happened while deserializing MetadataIndex of file {}", file);
throw e;
Expand Down Expand Up @@ -2177,15 +2179,26 @@ public List<AlignedChunkMetadata> getAlignedChunkMetadata(IDeviceID device) thro
}
ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
MetadataIndexNode metadataIndexNode;
TimeseriesMetadata firstTimeseriesMetadata;
try {
// next layer MeasurementNode of the specific DeviceNode
metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer, false);
} catch (Exception e) {
logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
throw e;
}
firstTimeseriesMetadata = tryToGetFirstTimeseriesMetadata(metadataIndexNode);
return getAlignedChunkMetadataByMetadataIndexNode(device, metadataIndexNode);
}

/**
* Get AlignedChunkMetadata of sensors under one device. Notice: if all the value chunks is empty
* chunk, then return empty list.
*
* @param device device name
* @param metadataIndexNode the first measurement metadata index node of the device
*/
public List<AlignedChunkMetadata> getAlignedChunkMetadataByMetadataIndexNode(
IDeviceID device, MetadataIndexNode metadataIndexNode) throws IOException {
TimeseriesMetadata firstTimeseriesMetadata = tryToGetFirstTimeseriesMetadata(metadataIndexNode);
if (firstTimeseriesMetadata == null) {
throw new IOException("Timeseries of device {" + device + "} are not aligned");
}
Expand All @@ -2199,7 +2212,7 @@ public List<AlignedChunkMetadata> getAlignedChunkMetadata(IDeviceID device) thro
if (i != metadataIndexEntryList.size() - 1) {
endOffset = metadataIndexEntryList.get(i + 1).getOffset();
}
buffer = readData(metadataIndexEntry.getOffset(), endOffset);
ByteBuffer buffer = readData(metadataIndexEntry.getOffset(), endOffset);
if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
while (buffer.hasRemaining()) {
Expand Down Expand Up @@ -2419,9 +2432,22 @@ public LinkedHashMap<String, List<ChunkMetadata>> next() {
};
}

Queue<Pair<Long, Long>> queue = new LinkedList<>();
ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
collectEachLeafMeasurementNodeOffsetRange(buffer, queue);
MetadataIndexNode firstMeasurementNode = MetadataIndexNode.deserializeFrom(buffer, false);
return getMeasurementChunkMetadataListMapIterator(firstMeasurementNode);
}

/**
* @return An iterator of linked hashmaps ( measurement -> chunk metadata list ). When traversing
* the linked hashmap, you will get chunk metadata lists according to the lexicographic order
* of the measurements. The first measurement of the linked hashmap of each iteration is
* always larger than the last measurement of the linked hashmap of the previous iteration in
* lexicographic order.
*/
public Iterator<Map<String, List<ChunkMetadata>>> getMeasurementChunkMetadataListMapIterator(
MetadataIndexNode firstMeasurementMetadataIndexNodeOfDevice) throws IOException {
Queue<Pair<Long, Long>> queue = new LinkedList<>();
collectEachLeafMeasurementNodeOffsetRange(firstMeasurementMetadataIndexNodeOfDevice, queue);

return new Iterator<Map<String, List<ChunkMetadata>>>() {

Expand Down Expand Up @@ -2462,9 +2488,8 @@ public LinkedHashMap<String, List<ChunkMetadata>> next() {
}

private void collectEachLeafMeasurementNodeOffsetRange(
ByteBuffer buffer, Queue<Pair<Long, Long>> queue) throws IOException {
MetadataIndexNode metadataIndexNode, Queue<Pair<Long, Long>> queue) throws IOException {
try {
final MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer, false);
final MetadataIndexNodeType metadataIndexNodeType = metadataIndexNode.getNodeType();
final int metadataIndexListSize = metadataIndexNode.getChildren().size();
for (int i = 0; i < metadataIndexListSize; ++i) {
Expand All @@ -2477,7 +2502,8 @@ private void collectEachLeafMeasurementNodeOffsetRange(
queue.add(new Pair<>(startOffset, endOffset));
continue;
}
collectEachLeafMeasurementNodeOffsetRange(readData(startOffset, endOffset), queue);
collectEachLeafMeasurementNodeOffsetRange(
MetadataIndexNode.deserializeFrom(readData(startOffset, endOffset), false), queue);
}
} catch (StopReadTsFileByInterruptException e) {
throw e;
Expand Down