Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,11 @@ class PipeConfigNodeRemainingTimeOperator extends PipeRemainingOperator {
.reduce(Long::sum)
.orElse(0L);

configRegionCommitMeter.updateAndGet(
meter -> {
if (Objects.nonNull(meter)) {
lastConfigRegionCommitSmoothingValue =
pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
}
return meter;
});
final Meter configRegionMeter = configRegionCommitMeter.get();
if (Objects.nonNull(configRegionMeter)) {
lastConfigRegionCommitSmoothingValue =
pipeRemainingTimeCommitRateAverageTime.getMeterRate(configRegionMeter);
}

final double configRegionRemainingTime;
if (totalConfigRegionWriteEventCount <= 0) {
Expand Down Expand Up @@ -101,13 +98,10 @@ void register(final IoTDBConfigRegionSource extractor) {
//////////////////////////// Rate ////////////////////////////

void markConfigRegionCommit() {
configRegionCommitMeter.updateAndGet(
meter -> {
if (Objects.nonNull(meter)) {
meter.mark();
}
return meter;
});
final Meter meter = configRegionCommitMeter.get();
if (Objects.nonNull(meter)) {
meter.mark();
}
}

//////////////////////////// Switch ////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,11 @@ public double getRemainingTime() {
+ rawTabletEventCount.get()
+ insertNodeEventCount.get();

dataRegionCommitMeter.updateAndGet(
meter -> {
if (Objects.nonNull(meter)) {
lastDataRegionCommitSmoothingValue =
pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
}
return meter;
});
final Meter dataRegionMeter = dataRegionCommitMeter.get();
if (Objects.nonNull(dataRegionMeter)) {
lastDataRegionCommitSmoothingValue =
pipeRemainingTimeCommitRateAverageTime.getMeterRate(dataRegionMeter);
}
final double dataRegionRemainingTime;
if (totalDataRegionWriteEventCount <= 0) {
dataRegionRemainingTime = 0;
Expand All @@ -162,14 +159,11 @@ public double getRemainingTime() {
.reduce(Long::sum)
.orElse(0L);

schemaRegionCommitMeter.updateAndGet(
meter -> {
if (Objects.nonNull(meter)) {
lastSchemaRegionCommitSmoothingValue =
pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
}
return meter;
});
final Meter schemaRegionMeter = schemaRegionCommitMeter.get();
if (Objects.nonNull(schemaRegionMeter)) {
lastSchemaRegionCommitSmoothingValue =
pipeRemainingTimeCommitRateAverageTime.getMeterRate(schemaRegionMeter);
}
final double schemaRegionRemainingTime;
if (totalSchemaRegionWriteEventCount <= 0) {
schemaRegionRemainingTime = 0;
Expand Down Expand Up @@ -199,23 +193,17 @@ void register(final IoTDBSchemaRegionSource source) {
//////////////////////////// Rate ////////////////////////////

void markDataRegionCommit() {
dataRegionCommitMeter.updateAndGet(
meter -> {
if (Objects.nonNull(meter)) {
meter.mark();
}
return meter;
});
final Meter meter = dataRegionCommitMeter.get();
if (Objects.nonNull(meter)) {
meter.mark();
}
}

void markSchemaRegionCommit() {
schemaRegionCommitMeter.updateAndGet(
meter -> {
if (Objects.nonNull(meter)) {
meter.mark();
}
return meter;
});
final Meter meter = schemaRegionCommitMeter.get();
if (Objects.nonNull(meter)) {
meter.mark();
}
}

void markTsFileCollectInvocationCount(final long collectInvocationCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE;
Expand Down Expand Up @@ -80,29 +79,21 @@ int updateAttribute(
}

int invalidateAttribute() {
final AtomicInteger size = new AtomicInteger(0);
deviceSchema.updateAndGet(
schema -> {
if (schema instanceof TableAttributeSchema) {
size.set(schema.estimateSize());
return null;
}
return schema;
});
return size.get();
IDeviceSchema schema;
do {
schema = deviceSchema.get();
if (!(schema instanceof TableAttributeSchema)) {
return 0;
}
} while (!deviceSchema.compareAndSet(schema, null));
return schema.estimateSize();
}

int invalidateAttributeColumn(final String attribute) {
final AtomicInteger size = new AtomicInteger(0);
deviceSchema.updateAndGet(
schema -> {
if (schema instanceof TableAttributeSchema) {
size.set(((TableAttributeSchema) schema).removeAttribute(attribute));
return schema;
}
return schema;
});
return size.get();
final IDeviceSchema schema = deviceSchema.get();
return schema instanceof TableAttributeSchema
? ((TableAttributeSchema) schema).removeAttribute(attribute)
: 0;
}

Map<String, Binary> getAttributeMap() {
Expand Down Expand Up @@ -162,17 +153,15 @@ IDeviceSchema getDeviceSchema() {
}

int invalidateTreeSchema() {
final AtomicInteger size = new AtomicInteger(0);
deviceSchema.updateAndGet(
schema -> {
if (schema instanceof TreeDeviceNormalSchema
|| schema instanceof TreeDeviceTemplateSchema) {
size.set(schema.estimateSize());
return null;
}
return schema;
});
return size.get();
IDeviceSchema schema;
do {
schema = deviceSchema.get();
if (!(schema instanceof TreeDeviceNormalSchema)
&& !(schema instanceof TreeDeviceTemplateSchema)) {
return 0;
}
} while (!deviceSchema.compareAndSet(schema, null));
return schema.estimateSize();
}

/////////////////////////////// Last Cache ///////////////////////////////
Expand Down Expand Up @@ -240,15 +229,14 @@ Optional<Pair<OptionalLong, TsPrimitiveType[]>> getLastRow(
}

int invalidateLastCache() {
final AtomicInteger size = new AtomicInteger(0);
lastCache.updateAndGet(
cacheEntry -> {
if (Objects.nonNull(cacheEntry)) {
size.set(cacheEntry.estimateSize());
}
return null;
});
return size.get();
TableDeviceLastCache cacheEntry;
do {
cacheEntry = lastCache.get();
if (Objects.isNull(cacheEntry)) {
return 0;
}
} while (!lastCache.compareAndSet(cacheEntry, null));
return cacheEntry.estimateSize();
}

/////////////////////////////// Management ///////////////////////////////
Expand Down
Loading