Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,15 @@ public long getMeasurementSize(IDeviceID deviceId, String measurement) {
return memChunkGroup.getMeasurementSize(measurement);
}

@Override
public IWritableMemChunk getWritableMemChunk(IDeviceID deviceId, String measurement) {
IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId);
if (null == memChunkGroup) {
return null;
}
return memChunkGroup.getWritableMemChunk(measurement);
}

@Override
public int getSeriesNumber() {
return seriesNumber;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ public long getMeasurementSize(String measurement) {
return memChunk.rowCount();
}

@Override
public IWritableMemChunk getWritableMemChunk(String measurement) {
return memChunk;
}

@Override
public long getMaxTime() {
return memChunk.getMaxTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ void queryForDeviceRegionScan(
/** only used when mem control enabled */
long getMeasurementSize(IDeviceID deviceId, String measurement);

IWritableMemChunk getWritableMemChunk(IDeviceID deviceId, String measurement);

/** only used when mem control enabled */
void addTextDataSize(long textDataIncrement);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,7 @@ void writeTablet(

long getMeasurementSize(String measurement);

IWritableMemChunk getWritableMemChunk(String measurement);

long getMaxTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -659,10 +659,11 @@ private long[] checkMemCostAndAddToTspInfoForRow(
} else {
// here currentChunkPointNum >= 1
long currentChunkPointNum = workMemTable.getMeasurementSize(deviceId, measurements[i]);
memTableIncrement +=
(currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0
? TVList.tvListArrayMemCost(dataTypes[i])
: 0;
IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, measurements[i]);
if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
memTableIncrement +=
memChunk != null ? memChunk.getWorkingTVList().tvListArrayMemCost() : 0;
}
}
// TEXT data mem size
if (dataTypes[i].isBinary() && values[i] != null) {
Expand Down Expand Up @@ -704,14 +705,17 @@ private long[] checkMemCostAndAddToTspInfoForRows(List<InsertRowNode> insertRowN
} else {
// here currentChunkPointNum >= 1
long currentChunkPointNum = workMemTable.getMeasurementSize(deviceId, measurements[i]);
IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, measurements[i]);
int addingPointNum =
increasingMemTableInfo
.computeIfAbsent(deviceId, k -> new HashMap<>())
.computeIfAbsent(measurements[i], k -> 0);
memTableIncrement +=
((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE) == 0
? TVList.tvListArrayMemCost(dataTypes[i])
: 0;
if ((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE == 0) {
memTableIncrement +=
memChunk != null
? memChunk.getWorkingTVList().tvListArrayMemCost()
: TVList.tvListArrayMemCost(dataTypes[i]);
}
increasingMemTableInfo.get(deviceId).computeIfPresent(measurements[i], (k, v) -> v + 1);
}
// TEXT data mem size
Expand Down Expand Up @@ -773,7 +777,7 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRow(
if ((alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE) == 0) {
dataTypesInTVList.addAll(
((AlignedTVList) alignedMemChunk.getWorkingTVList()).getTsDataTypes());
memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
memTableIncrement += alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost();
}
}

Expand Down Expand Up @@ -867,7 +871,11 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRows(List<InsertRowNode> ins
((AlignedTVList) alignedMemChunk.getWorkingTVList()).getTsDataTypes());
}
dataTypesInTVList.addAll(addingPointNumInfo.left.values());
memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
memTableIncrement +=
alignedMemChunk != null
? alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost()
: AlignedTVList.alignedTvListArrayMemCost(
dataTypesInTVList.toArray(new TSDataType[0]), null);
}
addingPointNumInfo.setRight(addingPointNum + 1);
}
Expand Down Expand Up @@ -969,16 +977,23 @@ private void updateMemCost(
* TVList.tvListArrayMemCost(dataType);
} else {
long currentChunkPointNum = workMemTable.getMeasurementSize(deviceId, measurement);
IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, measurement);
if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
memIncrements[0] +=
((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
* TVList.tvListArrayMemCost(dataType);
* (memChunk != null
? memChunk.getWorkingTVList().tvListArrayMemCost()
: TVList.tvListArrayMemCost(dataType));
} else {
long acquireArray =
(end - start - 1 + (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE))
/ PrimitiveArrayManager.ARRAY_SIZE;
if (acquireArray != 0) {
memIncrements[0] += acquireArray * TVList.tvListArrayMemCost(dataType);
memIncrements[0] +=
acquireArray
* (memChunk != null
? memChunk.getWorkingTVList().tvListArrayMemCost()
: TVList.tvListArrayMemCost(dataType));
}
}
}
Expand Down Expand Up @@ -1078,7 +1093,7 @@ private void updateAlignedMemCost(
dataTypesInTVList.addAll(
((AlignedTVList) alignedMemChunk.getWorkingTVList()).getTsDataTypes());
memIncrements[0] +=
acquireArray * AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
acquireArray * alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ public long getMeasurementSize(String measurement) {
return memChunkMap.get(measurement).rowCount();
}

@Override
public IWritableMemChunk getWritableMemChunk(String measurement) {
if (!memChunkMap.containsKey(measurement)) {
return null;
}
return memChunkMap.get(measurement);
}

@Override
public long getMaxTime() {
long maxTime = Long.MIN_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,9 @@ public synchronized void putAlignedValue(long timestamp, Object[] value) {
break;
}
}
indices.get(arrayIndex)[elementIndex] = rowCount;
if (indices != null) {
indices.get(arrayIndex)[elementIndex] = rowCount;
}
rowCount++;
if (sorted) {
if (rowCount > 1 && timestamp < getTime(rowCount - 2)) {
Expand Down Expand Up @@ -261,9 +263,8 @@ private Object getAlignedValueForQuery(
if (index >= rowCount) {
throw new ArrayIndexOutOfBoundsException(index);
}
int arrayIndex = index / ARRAY_SIZE;
int elementIndex = index % ARRAY_SIZE;
int valueIndex = indices.get(arrayIndex)[elementIndex];
int valueIndex =
(indices != null) ? indices.get(index / ARRAY_SIZE)[index % ARRAY_SIZE] : index;
return getAlignedValueByValueIndex(valueIndex, null, floatPrecision, encodingList);
}

Expand Down Expand Up @@ -692,7 +693,9 @@ protected void clearBitMap() {

@Override
protected void expandValues() {
indices.add((int[]) getPrimitiveArraysByType(TSDataType.INT32));
if (indices != null) {
indices.add((int[]) getPrimitiveArraysByType(TSDataType.INT32));
}
for (int i = 0; i < dataTypes.size(); i++) {
values.get(i).add(getPrimitiveArraysByType(dataTypes.get(i)));
if (bitMaps != null && bitMaps.get(i) != null) {
Expand Down Expand Up @@ -750,14 +753,6 @@ protected TimeValuePair getTimeValuePair(
time, (TsPrimitiveType) getAlignedValueForQuery(index, floatPrecision, encodingList));
}

@Override
protected void releaseLastValueArray() {
PrimitiveArrayManager.release(indices.remove(indices.size() - 1));
for (List<Object> valueList : values) {
PrimitiveArrayManager.release(valueList.remove(valueList.size() - 1));
}
}

@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
@Override
public synchronized void putAlignedValues(
Expand All @@ -777,7 +772,9 @@ public synchronized void putAlignedValues(
System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, inputRemaining);
arrayCopy(value, idx, arrayIdx, elementIdx, inputRemaining);
for (int i = 0; i < inputRemaining; i++) {
indices.get(arrayIdx)[elementIdx + i] = rowCount;
if (indices != null) {
indices.get(arrayIdx)[elementIdx + i] = rowCount;
}
for (int j = 0; j < values.size(); j++) {
if (value[j] == null
|| bitMaps != null && bitMaps[j] != null && bitMaps[j].isMarked(idx + i)
Expand All @@ -796,7 +793,9 @@ public synchronized void putAlignedValues(
System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, internalRemaining);
arrayCopy(value, idx, arrayIdx, elementIdx, internalRemaining);
for (int i = 0; i < internalRemaining; i++) {
indices.get(arrayIdx)[elementIdx + i] = rowCount;
if (indices != null) {
indices.get(arrayIdx)[elementIdx + i] = rowCount;
}
for (int j = 0; j < values.size(); j++) {
if (value[j] == null
|| bitMaps != null && bitMaps[j] != null && bitMaps[j].isMarked(idx + i)
Expand Down Expand Up @@ -895,7 +894,7 @@ public TSDataType getDataType() {

@Override
public long calculateRamSize() {
return timestamps.size() * alignedTvListArrayMemCost(dataTypes);
return timestamps.size() * alignedTvListArrayMemCost();
}

/**
Expand Down Expand Up @@ -925,8 +924,6 @@ public static long alignedTvListArrayMemCost(
}
// time array mem size
size += PrimitiveArrayManager.ARRAY_SIZE * 8L;
// index array mem size
size += PrimitiveArrayManager.ARRAY_SIZE * 4L;
// array headers mem size
size += (long) NUM_BYTES_ARRAY_HEADER * (2 + measurementColumnNum);
// Object references size in ArrayList
Expand All @@ -937,13 +934,12 @@ public static long alignedTvListArrayMemCost(
/**
* Get the single alignedTVList array mem cost by give types.
*
* @param types the types in the vector
* @return AlignedTvListArrayMemSize
*/
public static long alignedTvListArrayMemCost(List<TSDataType> types) {
public long alignedTvListArrayMemCost() {
long size = 0;
// value & bitmap array mem size
for (TSDataType type : types) {
for (TSDataType type : dataTypes) {
if (type != null) {
size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize();
size += (long) PrimitiveArrayManager.ARRAY_SIZE / 8 + 1;
Expand All @@ -956,11 +952,11 @@ public static long alignedTvListArrayMemCost(List<TSDataType> types) {
// time array mem size
size += PrimitiveArrayManager.ARRAY_SIZE * 8L;
// index array mem size
size += PrimitiveArrayManager.ARRAY_SIZE * 4L;
size += (indices != null) ? PrimitiveArrayManager.ARRAY_SIZE * 4L : 0;
// array headers mem size
size += (long) NUM_BYTES_ARRAY_HEADER * (2 + types.size());
size += (long) NUM_BYTES_ARRAY_HEADER * (2 + dataTypes.size());
// Object references size in ArrayList
size += (long) NUM_BYTES_OBJECT_REF * (2 + types.size());
size += (long) NUM_BYTES_OBJECT_REF * (2 + dataTypes.size());
return size;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ public synchronized void putBinary(long timestamp, Binary value) {
minTime = Math.min(minTime, timestamp);
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
indices.get(arrayIndex)[elementIndex] = rowCount;
if (indices != null) {
indices.get(arrayIndex)[elementIndex] = rowCount;
}
rowCount++;
if (sorted) {
if (rowCount > 1 && timestamp < getTime(rowCount - 2)) {
Expand Down Expand Up @@ -123,7 +125,9 @@ protected void clearValue() {

@Override
protected void expandValues() {
indices.add((int[]) getPrimitiveArraysByType(TSDataType.INT32));
if (indices != null) {
indices.add((int[]) getPrimitiveArraysByType(TSDataType.INT32));
}
values.add((Binary[]) getPrimitiveArraysByType(TSDataType.TEXT));
if (bitMap != null) {
bitMap.add(null);
Expand Down Expand Up @@ -160,11 +164,6 @@ protected void writeValidValuesIntoTsBlock(
}
}

@Override
protected void releaseLastValueArray() {
PrimitiveArrayManager.release(values.remove(values.size() - 1));
}

@Override
public synchronized void putBinaries(
long[] time, Binary[] value, BitMap bitMap, int start, int end) {
Expand Down Expand Up @@ -201,8 +200,10 @@ public synchronized void putBinaries(
System.arraycopy(
time, idx - timeIdxOffset, timestamps.get(arrayIdx), elementIdx, inputRemaining);
System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, inputRemaining);
int[] indexes = IntStream.range(rowCount, rowCount + inputRemaining).toArray();
System.arraycopy(indexes, 0, indices.get(arrayIdx), elementIdx, inputRemaining);
if (indices != null) {
int[] indexes = IntStream.range(rowCount, rowCount + inputRemaining).toArray();
System.arraycopy(indexes, 0, indices.get(arrayIdx), elementIdx, inputRemaining);
}
rowCount += inputRemaining;
break;
} else {
Expand All @@ -211,8 +212,10 @@ public synchronized void putBinaries(
System.arraycopy(
time, idx - timeIdxOffset, timestamps.get(arrayIdx), elementIdx, internalRemaining);
System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, internalRemaining);
int[] indexes = IntStream.range(rowCount, rowCount + internalRemaining).toArray();
System.arraycopy(indexes, 0, indices.get(arrayIdx), elementIdx, internalRemaining);
if (indices != null) {
int[] indexes = IntStream.range(rowCount, rowCount + internalRemaining).toArray();
System.arraycopy(indexes, 0, indices.get(arrayIdx), elementIdx, internalRemaining);
}
idx += internalRemaining;
rowCount += internalRemaining;
checkExpansion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ public synchronized void putBoolean(long timestamp, boolean value) {
minTime = Math.min(minTime, timestamp);
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
indices.get(arrayIndex)[elementIndex] = rowCount;
if (indices != null) {
indices.get(arrayIndex)[elementIndex] = rowCount;
}
rowCount++;
if (sorted) {
if (rowCount > 1 && timestamp < getTime(rowCount - 2)) {
Expand Down Expand Up @@ -122,7 +124,9 @@ protected void clearValue() {

@Override
protected void expandValues() {
indices.add((int[]) getPrimitiveArraysByType(TSDataType.INT32));
if (indices != null) {
indices.add((int[]) getPrimitiveArraysByType(TSDataType.INT32));
}
values.add((boolean[]) getPrimitiveArraysByType(TSDataType.BOOLEAN));
if (bitMap != null) {
bitMap.add(null);
Expand Down Expand Up @@ -160,11 +164,6 @@ protected void writeValidValuesIntoTsBlock(
}
}

@Override
protected void releaseLastValueArray() {
PrimitiveArrayManager.release(values.remove(values.size() - 1));
}

@Override
public synchronized void putBooleans(
long[] time, boolean[] value, BitMap bitMap, int start, int end) {
Expand Down Expand Up @@ -201,8 +200,10 @@ public synchronized void putBooleans(
System.arraycopy(
time, idx - timeIdxOffset, timestamps.get(arrayIdx), elementIdx, inputRemaining);
System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, inputRemaining);
int[] indexes = IntStream.range(rowCount, rowCount + inputRemaining).toArray();
System.arraycopy(indexes, 0, indices.get(arrayIdx), elementIdx, inputRemaining);
if (indices != null) {
int[] indexes = IntStream.range(rowCount, rowCount + inputRemaining).toArray();
System.arraycopy(indexes, 0, indices.get(arrayIdx), elementIdx, inputRemaining);
}
rowCount += inputRemaining;
break;
} else {
Expand All @@ -211,8 +212,10 @@ public synchronized void putBooleans(
System.arraycopy(
time, idx - timeIdxOffset, timestamps.get(arrayIdx), elementIdx, internalRemaining);
System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, internalRemaining);
int[] indexes = IntStream.range(rowCount, rowCount + internalRemaining).toArray();
System.arraycopy(indexes, 0, indices.get(arrayIdx), elementIdx, internalRemaining);
if (indices != null) {
int[] indexes = IntStream.range(rowCount, rowCount + internalRemaining).toArray();
System.arraycopy(indexes, 0, indices.get(arrayIdx), elementIdx, internalRemaining);
}
idx += internalRemaining;
rowCount += internalRemaining;
checkExpansion();
Expand Down
Loading
Loading