From 565766ce165fc49a435e1862c3e450b9f15cd77f Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 30 Apr 2026 11:31:47 +0800 Subject: [PATCH] mb --- .../PipeConfigNodeRemainingTimeOperator.java | 24 +++---- ...DataNodeRemainingEventAndTimeOperator.java | 48 +++++-------- .../fetcher/cache/TableDeviceCacheEntry.java | 70 ++++++++----------- 3 files changed, 56 insertions(+), 86 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java index c27e4cd898b06..ff9a7ea2648d5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java @@ -66,14 +66,11 @@ class PipeConfigNodeRemainingTimeOperator extends PipeRemainingOperator { .reduce(Long::sum) .orElse(0L); - configRegionCommitMeter.updateAndGet( - meter -> { - if (Objects.nonNull(meter)) { - lastConfigRegionCommitSmoothingValue = - pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter); - } - return meter; - }); + final Meter configRegionMeter = configRegionCommitMeter.get(); + if (Objects.nonNull(configRegionMeter)) { + lastConfigRegionCommitSmoothingValue = + pipeRemainingTimeCommitRateAverageTime.getMeterRate(configRegionMeter); + } final double configRegionRemainingTime; if (totalConfigRegionWriteEventCount <= 0) { @@ -101,13 +98,10 @@ void register(final IoTDBConfigRegionSource extractor) { //////////////////////////// Rate //////////////////////////// void markConfigRegionCommit() { - configRegionCommitMeter.updateAndGet( - meter -> { - if (Objects.nonNull(meter)) { - meter.mark(); - } - return meter; - }); + final Meter meter = configRegionCommitMeter.get(); + if (Objects.nonNull(meter)) { + meter.mark(); + } } //////////////////////////// Switch //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java index 38cfb579622a5..6ff3de4d9495c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java @@ -138,14 +138,11 @@ public double getRemainingTime() { + rawTabletEventCount.get() + insertNodeEventCount.get(); - dataRegionCommitMeter.updateAndGet( - meter -> { - if (Objects.nonNull(meter)) { - lastDataRegionCommitSmoothingValue = - pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter); - } - return meter; - }); + final Meter dataRegionMeter = dataRegionCommitMeter.get(); + if (Objects.nonNull(dataRegionMeter)) { + lastDataRegionCommitSmoothingValue = + pipeRemainingTimeCommitRateAverageTime.getMeterRate(dataRegionMeter); + } final double dataRegionRemainingTime; if (totalDataRegionWriteEventCount <= 0) { dataRegionRemainingTime = 0; @@ -162,14 +159,11 @@ public double getRemainingTime() { .reduce(Long::sum) .orElse(0L); - schemaRegionCommitMeter.updateAndGet( - meter -> { - if (Objects.nonNull(meter)) { - lastSchemaRegionCommitSmoothingValue = - pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter); - } - return meter; - }); + final Meter schemaRegionMeter = schemaRegionCommitMeter.get(); + if (Objects.nonNull(schemaRegionMeter)) { + lastSchemaRegionCommitSmoothingValue = + pipeRemainingTimeCommitRateAverageTime.getMeterRate(schemaRegionMeter); + } final double schemaRegionRemainingTime; if (totalSchemaRegionWriteEventCount <= 0) { schemaRegionRemainingTime = 0; @@ -199,23 +193,17 @@ void register(final IoTDBSchemaRegionSource source) { //////////////////////////// Rate //////////////////////////// void markDataRegionCommit() { - dataRegionCommitMeter.updateAndGet( - meter -> { - if (Objects.nonNull(meter)) { - meter.mark(); - } - return meter; - }); + final Meter meter = dataRegionCommitMeter.get(); + if (Objects.nonNull(meter)) { + meter.mark(); + } } void markSchemaRegionCommit() { - schemaRegionCommitMeter.updateAndGet( - meter -> { - if (Objects.nonNull(meter)) { - meter.mark(); - } - return meter; - }); + final Meter meter = schemaRegionCommitMeter.get(); + if (Objects.nonNull(meter)) { + meter.mark(); + } } void markTsFileCollectInvocationCount(final long collectInvocationCount) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java index 7af9c25af6d0d..4f151d15eaeb6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java @@ -38,7 +38,6 @@ import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE; @@ -80,29 +79,21 @@ int updateAttribute( } int invalidateAttribute() { - final AtomicInteger size = new AtomicInteger(0); - deviceSchema.updateAndGet( - schema -> { - if (schema instanceof TableAttributeSchema) { - size.set(schema.estimateSize()); - return null; - } - return schema; - }); - return size.get(); + IDeviceSchema schema; + do { + schema = deviceSchema.get(); + if (!(schema instanceof TableAttributeSchema)) { + return 0; + } + } while (!deviceSchema.compareAndSet(schema, null)); + return schema.estimateSize(); } int invalidateAttributeColumn(final String attribute) { - final AtomicInteger size = new AtomicInteger(0); - deviceSchema.updateAndGet( - schema -> { - if (schema instanceof TableAttributeSchema) { - size.set(((TableAttributeSchema) schema).removeAttribute(attribute)); - return schema; - } - return schema; - }); - return size.get(); + final IDeviceSchema schema = deviceSchema.get(); + return schema instanceof TableAttributeSchema + ? ((TableAttributeSchema) schema).removeAttribute(attribute) + : 0; } Map getAttributeMap() { @@ -162,17 +153,15 @@ IDeviceSchema getDeviceSchema() { } int invalidateTreeSchema() { - final AtomicInteger size = new AtomicInteger(0); - deviceSchema.updateAndGet( - schema -> { - if (schema instanceof TreeDeviceNormalSchema - || schema instanceof TreeDeviceTemplateSchema) { - size.set(schema.estimateSize()); - return null; - } - return schema; - }); - return size.get(); + IDeviceSchema schema; + do { + schema = deviceSchema.get(); + if (!(schema instanceof TreeDeviceNormalSchema) + && !(schema instanceof TreeDeviceTemplateSchema)) { + return 0; + } + } while (!deviceSchema.compareAndSet(schema, null)); + return schema.estimateSize(); } /////////////////////////////// Last Cache /////////////////////////////// @@ -240,15 +229,14 @@ Optional> getLastRow( } int invalidateLastCache() { - final AtomicInteger size = new AtomicInteger(0); - lastCache.updateAndGet( - cacheEntry -> { - if (Objects.nonNull(cacheEntry)) { - size.set(cacheEntry.estimateSize()); - } - return null; - }); - return size.get(); + TableDeviceLastCache cacheEntry; + do { + cacheEntry = lastCache.get(); + if (Objects.isNull(cacheEntry)) { + return 0; + } + } while (!lastCache.compareAndSet(cacheEntry, null)); + return cacheEntry.estimateSize(); } /////////////////////////////// Management ///////////////////////////////