Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public void setQueryStatistics(QueryStatistics queryStatistics) {
this.queryStatistics = queryStatistics;
}

public void addTVListToSet(Map<TVList, Integer> tvListMap) {
tvListSet.addAll(tvListMap.keySet());
/**
 * Adds every TVList in {@code set} to this context's {@code tvListSet}.
 *
 * @param set the TVLists to add
 */
public void addTVListToSet(Set<TVList> set) {
tvListSet.addAll(set);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,46 @@
}

if (!isWorkMemTable) {
/*
* 1. Q1 queries this TVList while it is still in the working memtable and records a smaller
* visible row count.
* 2. Later writes append out-of-order rows to the same TVList, then FLUSH moves the
* memtable to the flushing list.
* 3. Q2 queries the flushing memtable. If Q2 directly reuses the original mutable TVList,
* Q2's query-side sort may reorder the indices in place.
* 4. Q1 continues to read with its old row count and the reordered indices. The converted
* value index can exceed Q1's bitmap range and cause out-of-bound access.
*
* Therefore, this flushing branch can reuse the original list only when it is already
* sorted or no active query is using it. Otherwise, Q2 should read from
* workingListForFlush.
*/
boolean canUseListDirectly = list.isSorted() || list.getQueryContextSet().isEmpty();
LOGGER.debug(
"Flushing MemTable - add current query context to mutable TVList's query list");
list.getQueryContextSet().add(context);
tvListQueryMap.put(list, list.rowCount());
if (canUseListDirectly) {
list.getQueryContextSet().add(context);
tvListQueryMap.put(list, list.rowCount());
} else {
TVList workingListForFlushSort = memChunk.initWorkingListForFlushIfNecessary(list, true);
/*
* The query will read from workingListForFlushSort, but cloneForFlushSort() only clones
* times and indices. The value arrays and bitmaps are still shared with the original
* list.
*
* Therefore, this query must also hold the original list until it finishes. Adding
* context to list.getQueryContextSet() lets flush/query cleanup see that the original
* list is still in use. Adding list to context.tvListSet makes
* releaseTVListOwnedByQuery() remove this context from the original list later.
*
* Do not put the original list into tvListQueryMap here. The actual read path must use
* workingListForFlushSort to avoid sorting the original list in place.
*/
list.getQueryContextSet().add(context);
context.addTVListToSet(Collections.singleton(list));
workingListForFlushSort.getQueryContextSet().add(context);
tvListQueryMap.put(workingListForFlushSort, workingListForFlushSort.rowCount());
}
} else {
if (list.isSorted() || list.getQueryContextSet().isEmpty()) {
LOGGER.debug(
Expand Down Expand Up @@ -233,7 +269,7 @@
* have chunkMetadata, but query will use these, so we need to generate it for them.
*/
@Override
public AlignedTimeSeriesMetadata generateTimeSeriesMetadata(

Check warning on line 272 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 91 to 64, Complexity from 20 to 14, Nesting Level from 5 to 2, Number of Variables from 21 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ46wrKW1NKbCjkFQneK&open=AZ46wrKW1NKbCjkFQneK&pullRequest=17709
List<ReadOnlyMemChunk> readOnlyMemChunk,
List<IChunkMetadata> chunkMetadataList,
Filter globalTimeFilter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ public void run() {
times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
}
writableMemChunk.encode(ioTaskQueue, encodeInfo, times);
writableMemChunk.releaseTemporaryTvListForFlush();
long subTaskTime = System.currentTimeMillis() - starTime;
WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime);
memSerializeTime += subTaskTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,9 @@
/*
* Concurrency background:
*
* A query may start earlier and record the current row count (rows) of the TVList as its visible range.
* After that, new unseq writes may arrive and immediately trigger a flush, which will sort the TVList.
* A query may start earlier and record the current row count (rows) of the TVList as its
* visible range. After that, new unseq writes may arrive and immediately trigger a flush, which
* will sort the TVList.
*
* During sorting, the underlying indices array of the TVList may be reordered.
* If the query continues to use the previously recorded rows as its upper bound,
Expand All @@ -219,6 +220,9 @@
* To avoid this issue, when there are active queries on the working TVList, we must
* clone the times and indices before sorting, so that the flush sort does not mutate
* the data structures that concurrent queries rely on.
*
* Flushing-memtable queries may also reuse workingListForFlush instead of the original working
* TVList for the same reason.
*/
boolean needCloneTimesAndIndicesInWorkingTVList;
workingList.lockQueryList();
Expand All @@ -228,7 +232,7 @@
workingList.unlockQueryList();
}
workingListForFlush =
needCloneTimesAndIndicesInWorkingTVList ? workingList.cloneForFlushSort() : workingList;
initWorkingListForFlushIfNecessary(workingList, needCloneTimesAndIndicesInWorkingTVList);
workingListForFlush.sort();
}

Expand Down Expand Up @@ -267,4 +271,14 @@

@Override
public abstract int serializedSize();

/**
 * Returns {@code workingListForFlush}, assigning it first if it is still {@code null}.
 *
 * <p>When the field is {@code null} it is set to {@code workingList.cloneForFlushSort()} if
 * {@code needCloneTimesAndIndicesInWorkingTVList} is {@code true}, otherwise to {@code
 * workingList} itself. When the field is already non-null, both arguments are ignored and the
 * existing value is returned unchanged.
 *
 * <p>The method is declared {@code synchronized}, so calls on the same instance do not
 * interleave.
 *
 * @param workingList the list to use, or to clone, when the field has not been assigned yet
 * @param needCloneTimesAndIndicesInWorkingTVList whether to assign a clone instead of {@code
 *     workingList} itself
 * @return the current (possibly just-assigned) {@code workingListForFlush}
 */
@Override
public synchronized TVList initWorkingListForFlushIfNecessary(
TVList workingList, boolean needCloneTimesAndIndicesInWorkingTVList) {

Check warning on line 277 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'needCloneTimesAndIndicesInWorkingTVList' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ46wrJV1NKbCjkFQneI&open=AZ46wrJV1NKbCjkFQneI&pullRequest=17709
// Only the first caller decides between clone and original; later calls reuse that choice.
if (workingListForFlush == null) {
workingListForFlush =
needCloneTimesAndIndicesInWorkingTVList ? workingList.cloneForFlushSort() : workingList;
}
return workingListForFlush;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
this.valueStatisticsList = new ArrayList<>();
this.alignedTvListQueryMap = alignedTvListQueryMap;
this.columnIndexList = columnIndexList;
this.context.addTVListToSet(alignedTvListQueryMap);
this.context.addTVListToSet(alignedTvListQueryMap.keySet());
}

@Override
Expand Down Expand Up @@ -142,7 +142,7 @@
}

@Override
public void initChunkMetaFromTvLists(Filter globalTimeFilter) {

Check warning on line 145 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 126 to 64, Complexity from 31 to 14, Nesting Level from 5 to 2, Number of Variables from 22 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ49_PyTntx3TBG5wwOQ&open=AZ49_PyTntx3TBG5wwOQ&pullRequest=17709
// init chunk meta
Statistics<? extends Serializable> chunkTimeStatistics =
Statistics.getStatsByType(TSDataType.VECTOR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,7 @@
TVList getWorkingTVList();

void setWorkingTVList(TVList list);

TVList initWorkingListForFlushIfNecessary(
TVList workingList, boolean needCloneTimesAndIndicesInWorkingTVList);

Check warning on line 131 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'needCloneTimesAndIndicesInWorkingTVList' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ46wrJq1NKbCjkFQneJ&open=AZ46wrJq1NKbCjkFQneJ&pullRequest=17709
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
this.deletionList = deletionList;
this.tvListQueryMap = tvListQueryMap;
this.pageStatisticsList = new ArrayList<>();
this.context.addTVListToSet(tvListQueryMap);
this.context.addTVListToSet(tvListQueryMap.keySet());
}

public void sortTvLists() {
Expand All @@ -154,7 +154,7 @@
}
}

public void initChunkMetaFromTvLists(Filter globalTimeFilter) {

Check warning on line 157 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 67 to 64, Complexity from 19 to 14, Nesting Level from 4 to 2, Number of Variables from 17 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ49_Pxdntx3TBG5wwOP&open=AZ49_Pxdntx3TBG5wwOP&pullRequest=17709
// create chunk statistics
Statistics<? extends Serializable> chunkStatistics = Statistics.getStatsByType(dataType);
timeValuePairIterator = createMemPointIterator(Ordering.ASC, globalTimeFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
Expand Down Expand Up @@ -133,13 +133,13 @@ public void testTVListOwnerTransfer() throws InterruptedException {
FragmentInstanceExecution execution1 =
createFragmentInstanceExecution(1, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext1 = execution1.getFragmentInstanceContext();
fragmentInstanceContext1.addTVListToSet(ImmutableMap.of(tvList, 0));
fragmentInstanceContext1.addTVListToSet(ImmutableSet.of(tvList));
tvList.getQueryContextSet().add(fragmentInstanceContext1);

FragmentInstanceExecution execution2 =
createFragmentInstanceExecution(2, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext2 = execution2.getFragmentInstanceContext();
fragmentInstanceContext2.addTVListToSet(ImmutableMap.of(tvList, 0));
fragmentInstanceContext2.addTVListToSet(ImmutableSet.of(tvList));
tvList.getQueryContextSet().add(fragmentInstanceContext2);

// mock flush's behavior
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,63 @@
}
}

/**
 * Checks that a second query does not sort, in place, the working TVList that an earlier query
 * obtained: the second query must receive a different TVList instance, and sorting it must leave
 * the original list unsorted.
 */
@Test
public void testFlushingQueryDoesNotSortWorkingTVListUsedByPreviousQuery()

Check warning on line 226 in iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'testFlushingQueryDoesNotSortWorkingTVListUsedByPreviousQuery' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ46wrIT1NKbCjkFQneH&open=AZ46wrIT1NKbCjkFQneH&pullRequest=17709
throws QueryProcessException, IOException, IllegalPathException {

// Write 1000 aligned rows with ascending timestamps 1000..1999.
PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0");
List<IMeasurementSchema> measurementSchemas =
Arrays.asList(
new MeasurementSchema("s1", TSDataType.INT32),
new MeasurementSchema("s2", TSDataType.INT32),
new MeasurementSchema("s3", TSDataType.INT32));
PlainDeviceID deviceID = new PlainDeviceID("root.test.d1");
for (int i = 1000; i < 2000; i++) {
memTable.writeAlignedRow(deviceID, measurementSchemas, i, new Object[] {i, i, i});
}

// First query (context 1): it must be handed the memtable's working TVList itself.
ResourceByPathUtils resourcesByPathUtils =
ResourceByPathUtils.getResourceInstance(
new AlignedPath("root.test.d1", Arrays.asList("s1", "s2", "s3"), measurementSchemas));
AlignedReadOnlyMemChunk firstQueryMemChunk =
(AlignedReadOnlyMemChunk)
resourcesByPathUtils.getReadOnlyMemChunkFromMemTable(
new QueryContext(1), memTable, null, Long.MAX_VALUE, null);
TVList originalWorkingList = memTable.getWritableMemChunk(deviceID, "").getWorkingTVList();
Assert.assertSame(
originalWorkingList,
firstQueryMemChunk.getAligendTvListQueryMap().keySet().iterator().next());

// Append rows with timestamps 1..50 (earlier than the existing ones) and delete [1, 10] on
// each measurement; the working list is expected to be unsorted afterwards.
for (int i = 1; i <= 50; i++) {
memTable.writeAlignedRow(deviceID, measurementSchemas, i, new Object[] {i, i, i});
}
MeasurementPath path = new MeasurementPath("root.test.d1.s1", TSDataType.INT32);
memTable.delete(path, path.getDevicePath(), 1, 10);
path = new MeasurementPath("root.test.d1.s2", TSDataType.INT32);
memTable.delete(path, path.getDevicePath(), 1, 10);
path = new MeasurementPath("root.test.d1.s3", TSDataType.INT32);
memTable.delete(path, path.getDevicePath(), 1, 10);
Assert.assertFalse(originalWorkingList.isSorted());

// Second query (context 2). NOTE(review): presumably the non-null third argument is what makes
// this call take the flushing-memtable path - confirm against getReadOnlyMemChunkFromMemTable.
AlignedReadOnlyMemChunk flushingQueryMemChunk =
(AlignedReadOnlyMemChunk)
resourcesByPathUtils.getReadOnlyMemChunkFromMemTable(
new QueryContext(2), memTable, new ArrayList<>(), Long.MAX_VALUE, null);
TVList flushingQueryList =
flushingQueryMemChunk.getAligendTvListQueryMap().keySet().iterator().next();
Assert.assertNotSame(originalWorkingList, flushingQueryList);

// Sorting on behalf of the second query must not touch the original list.
flushingQueryMemChunk.sortTvLists();
Assert.assertFalse(originalWorkingList.isSorted());

// Drain the first query's iterator. There is no explicit assertion here; presumably the test
// passes as long as iterating does not throw.
firstQueryMemChunk.sortTvLists();
MemPointIterator memPointIterator =
firstQueryMemChunk.createMemPointIterator(Ordering.ASC, null);
while (memPointIterator.hasNextBatch()) {
memPointIterator.nextBatch();
}
}

@Test
public void memSeriesToStringTest() throws IOException {
TSDataType dataType = TSDataType.INT32;
Expand Down Expand Up @@ -743,7 +800,7 @@
list.getQueryContextSet().add(queryContext);
Map<TVList, Integer> tvlistMap = new HashMap<>();
tvlistMap.put(list, 100);
queryContext.addTVListToSet(tvlistMap);
queryContext.addTVListToSet(tvlistMap.keySet());

// fragment instance execution
IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
Expand Down
Loading