diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 1286c9129e3d..bf96fcabe7ae 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -7,6 +7,20 @@ * Add `ServerStatusService` in the core module to provide a new way to expose booting status to other modules. * Adds Micrometer as a new component.(ID=141) * Refactor session cache in MetricsPersistentWorker. +* Cache enhancement - don't read new metrics from database in minute dimensionality. +``` + // When + // (1) the time bucket of the server's latest stability status is provided + // 1.1 the OAP has booted successfully + // 1.2 the current dimensionality is minute. + // (2) the metrics are from the time after the timeOfLatestStabilitySts + // (3) the metrics don't exist in the cache + // the kernel should NOT try to load them from the database. + // + // Notice, about condition (2), + // for the specific minute in which the OAP booted successfully, the metrics are still expected to be loaded + // from the database when they don't exist in the cache. 
+``` #### UI diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index 51e9dfd937ad..169332a168b1 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -25,11 +25,15 @@ import java.util.Properties; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.analysis.DownSampling; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData; import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.exporter.ExportEvent; +import org.apache.skywalking.oap.server.core.status.ServerStatusService; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback; import org.apache.skywalking.oap.server.core.storage.model.Model; @@ -89,6 +93,16 @@ public class MetricsPersistentWorker extends PersistenceWorker { * @since 8.7.0 TTL settings from {@link org.apache.skywalking.oap.server.core.CoreModuleConfig#getMetricsDataTTL()} */ private int metricsDataTTL; + /** + * @since 9.4.0 + */ + private final ServerStatusService serverStatusService; + /** + * The minute-dimensionality time bucket of the server's latest stability status, or 0 if it has not been set. 
+ * + * @since 9.4.0 + */ + private long timeOfLatestStabilitySts = 0; MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, AbstractWorker nextAlarmWorker, AbstractWorker nextExportWorker, @@ -145,6 +159,7 @@ public class MetricsPersistentWorker extends PersistenceWorker { new MetricsTag.Keys("status"), new MetricsTag.Values("cached") ); SESSION_TIMEOUT_OFFSITE_COUNTER++; + serverStatusService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ServerStatusService.class); } /** @@ -191,6 +206,11 @@ public List buildBatchRequests() { return Collections.emptyList(); } + if (model.getDownsampling().equals(DownSampling.Minute)) { + timeOfLatestStabilitySts = TimeBucket.getMinuteTimeBucket( + serverStatusService.getBootingStatus().getUptime()); + } + /* * Hard coded the max size. This only affect the multiIDRead if the data doesn't hit the cache. */ @@ -299,7 +319,7 @@ private void loadFromStorage(List metrics) { List notInCacheMetrics = metrics.stream() .filter(m -> { - final Metrics cachedValue = sessionCache.get(m); + final Metrics cachedValue = requireInitialization(m); // the metric is tagged `not in cache`. if (cachedValue == null) { return true; @@ -340,6 +360,41 @@ public void endOfRound() { sessionCache.removeExpired(); } + /** + * Check whether the metrics are in the cache and, if not, whether the worker should load them from the database. + * + * @param metrics the metrics in the streaming process. + * @return cached metrics, the input itself to skip the database read, or null to read from the database. + */ + private Metrics requireInitialization(Metrics metrics) { + final Metrics cached = sessionCache.get(metrics); + + // Every cached metrics entry has been written at least once. + if (cached != null) { + return cached; + } + + // When + // (1) the time bucket of the server's latest stability status is provided + // 1.1 the OAP has booted successfully + // 1.2 the current dimensionality is minute. 
+ // (2) the metrics are from the time after the timeOfLatestStabilitySts + // (3) the metrics don't exist in the cache + // the kernel should NOT try to load them from the database. + // + // Notice, about condition (2), + // for the specific minute in which the OAP booted successfully, the metrics are still expected to be loaded + // from the database when they don't exist in the cache. + // Condition (3) already holds here: a cache hit has returned above. + if (timeOfLatestStabilitySts > 0 + && metrics.getTimeBucket() > timeOfLatestStabilitySts) { + // Return metrics as input to avoid reading from database. + return metrics; + } + + return null; + } + /** * Metrics queue processor, merge the received metrics if existing one with same ID(s) and time bucket. * diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/status/ServerStatusService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/status/ServerStatusService.java index 27a3b328dfa5..fac17a1b5d9a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/status/ServerStatusService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/status/ServerStatusService.java @@ -18,6 +18,7 @@ package org.apache.skywalking.oap.server.core.status; +import lombok.Getter; import lombok.RequiredArgsConstructor; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.Service; @@ -35,6 +36,7 @@ @RequiredArgsConstructor public class ServerStatusService implements Service { private final ModuleManager manager; + @Getter private BootingStatus bootingStatus = new BootingStatus(); public void bootedNow(long uptime) {