Conversation
…educe memory (apache#61318) Issue Number: close #xxx Related PR: #xxx Problem Summary: Reduce FE memory by 1. moving top-N table stats filtering from PrometheusMetricVisitor into CloudTabletStatMgr so it's computed once per stat cycle instead of per Prometheus scrape, 2. removing the unused beToTablets field from InfightTask to avoid retaining a large map reference 3. changing InfightTablet.tabletId from Long to long to avoid boxing overhead. None - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 --> - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
run buildall |
There was a problem hiding this comment.
Pull request overview
Cherry-pick set for branch-4.0 focused on reducing FE memory usage in cloud tablet statistics collection/Prometheus output, and improving cloud metadata persistence via checkpoints.
Changes:
- Refactors cloud tablet stats collection to support “active tablet + interval ladder” fetching and to compute Prometheus top-N table stats once per stats cycle (instead of per scrape), with master-to-follower stats push via a new FE thrift RPC.
- Threads committed tablet IDs through BE→FE report and FE transaction commit post-processing to trigger targeted stats refresh; adds a session variable–gated proc path to force-sync tablet stats.
- Enhances cloud checkpoints to optionally trigger even without new journals (stale-image threshold) and to post-process checkpoint metadata with serving-env tablet stats; adds latest image create-time tracking.
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| gensrc/thrift/FrontendService.thrift | Adds tabletIds to commit-txn report request and introduces syncCloudTabletStats RPC. |
| fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java | Extends afterCommitTxnResp to accept tablet IDs for follow-up processing. |
| fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java | Updates interface implementation signature (no-op for non-cloud). |
| fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java | Uses tablet IDs for targeted stats refresh; implements syncCloudTabletStats. |
| fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java | Adds cloud_force_sync_tablet_stats session variable. |
| fe/fe-core/src/main/java/org/apache/doris/persist/Storage.java | Tracks latest image file last-modified time for stale-image checkpoint decisions. |
| fe/fe-core/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java | Switches to pre-filtered cloud table stats list + cached total size. |
| fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java | Adds stale-image checkpoint trigger and cloud metadata post-processing into image. |
| fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java | Optional force-sync of tablet stats for a table via session variable. |
| fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java | Adds dedicated FE client pool for stats sync RPCs. |
| fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java | Collects tablet IDs on commit and marks them active for stats refresh. |
| fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java | Minor memory optimizations (avoid boxing, remove unused map retention). |
| fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java | Persists interval-ladder state and last-stats timestamp for tablets. |
| fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java | Reduces boxing overhead by using primitive counters. |
| fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java | Adds serialized names + changes defaults for rowset/segment counts (persistence-sensitive). |
| fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java | Changes stats fields to primitives to reduce memory/boxing overhead. |
| fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java | Major rework: active tablets, interval ladder, top-N filtering, master push to followers. |
| fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java | Marks tablets active after schema change commit to refresh stats. |
| fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java | Marks tablets active after rollup creation to refresh stats. |
| fe/fe-common/src/main/java/org/apache/doris/common/Config.java | Adds cloud checkpoint stale threshold + cloud tablet stats versioning/config knobs. |
| be/src/cloud/cloud_meta_mgr.cpp | Sends tablet IDs to FE with commit/compaction notifications to drive targeted refresh. |
Comments suppressed due to low confidence (1)
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:4574
- Catching
InvalidProtocolBufferExceptionand callinge.printStackTrace()will bypass the FE logger and still returnsOKto the caller. Please log viaLOG.warn/errorand return an appropriate non-OKTStatus(e.g.,INVALID_ARGUMENT) so callers can detect malformed payloads and logs are centralized.
} catch (InvalidProtocolBufferException e) {
// Handle the exception, log it, or take appropriate action
e.printStackTrace();
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } catch (InterruptedException | ExecutionException e) { | ||
| LOG.error("Error waiting for get tablet stats tasks to complete", e); |
There was a problem hiding this comment.
Same as above: this block catches InterruptedException but doesn't restore the thread interrupt status. Please call Thread.currentThread().interrupt() when interrupted to avoid losing cancellation signals.
| } catch (InterruptedException | ExecutionException e) { | |
| LOG.error("Error waiting for get tablet stats tasks to complete", e); | |
| } catch (InterruptedException e) { | |
| Thread.currentThread().interrupt(); | |
| LOG.error("Error waiting for get tablet stats tasks to complete", e); | |
| } catch (ExecutionException e) { | |
| LOG.error("Error waiting for get tablet stats tasks to complete", e); |
| + "Version 1: get all tablets; Version 2: get active and interval expired tablets"}) | ||
| public static int cloud_get_tablet_stats_version = 2; | ||
|
|
||
| @ConfField(description = {"Maximum concurrent number of get tablet stat jobs."}) |
There was a problem hiding this comment.
The cloud_sync_tablet_stats_task_threads_num description currently says "Maximum concurrent number of get tablet stat jobs", which is misleading (this config controls FE->FE sync/push tasks). Please update the description to match the actual purpose to avoid operator confusion.
| @ConfField(description = {"Maximum concurrent number of get tablet stat jobs."}) | |
| @ConfField(description = { | |
| "存算分离模式下 Master FE 发送给其它 FE 同步 tablet stats 任务的最大并发数", | |
| "Maximal concurrent number of tablet stats sync/push tasks between Master FE and other FEs in cloud mode."}) |
| @SerializedName(value = "sc") | ||
| private long segmentCount = 0L; | ||
| private long rowsetCount = 0L; | ||
| @SerializedName(value = "rsc") | ||
| private long rowsetCount = 1L; // [0-1] rowset |
There was a problem hiding this comment.
segmentCount/rowsetCount were previously serialized with their field names; adding @SerializedName("sc")/@SerializedName("rsc") without alternate values will make Gson ignore older images/journals and reset these fields to defaults on upgrade. Add alternate={"segmentCount"} and alternate={"rowsetCount"} (and keep defaults consistent) to preserve backward compatibility when loading existing metadata.
| // But in cloud mode, we may still want to force a checkpoint if the latest image file is expired. | ||
| // This helps that image can keep the newer table version, partition version, tablet stats. | ||
| LOG.info("Trigger checkpoint in cloud mode because latest image is expired. " | ||
| + "latestImageSeq: {}, latestImageCreateTime: {}", imageVersion, latestImageCreateTime); |
There was a problem hiding this comment.
When forcing a checkpoint due to cloud_checkpoint_image_stale_threshold_seconds, imageVersion can equal checkPointVersion (no new finalized journals). In that case Env.saveImage() will try to rename image.ckpt to image.<replayedJournalId> where the target file already exists, and File.renameTo may fail on some platforms/filesystems. Consider explicitly handling the "same version" case (e.g., delete/replace the existing image file safely before rename, or ensure a new image sequence is used) to avoid repeated checkpoint failures.
| + "latestImageSeq: {}, latestImageCreateTime: {}", imageVersion, latestImageCreateTime); | |
| + "latestImageSeq: {}, latestImageCreateTime: {}", imageVersion, latestImageCreateTime); | |
| // In this cloud-mode stale-image case, imageVersion can equal checkPointVersion (no new finalized | |
| // journals). Env.saveImage() will later try to rename image.ckpt to image.<checkPointVersion>. | |
| // If the target image.<checkPointVersion> already exists, File.renameTo may fail on some platforms. | |
| // To avoid this, ensure we use a strictly newer image sequence when forcing the checkpoint. | |
| if (imageVersion == checkPointVersion) { | |
| long newCheckPointVersion = checkPointVersion + 1; | |
| LOG.info("Adjust checkpoint version from {} to {} to avoid duplicate image sequence when " | |
| + "forcing checkpoint in cloud mode (stale image).", | |
| checkPointVersion, newCheckPointVersion); | |
| checkPointVersion = newCheckPointVersion; | |
| } |
| for (Table table : db.getTables()) { | ||
| Table servingTable = servingDb.getTableNullable(table.getId()); | ||
| if (servingTable == null) { | ||
| if (LOG.isDebugEnabled()) { | ||
| LOG.debug("serving table is null. dbId: {}, table: {}", db.getId(), table); | ||
| } | ||
| continue; | ||
| } | ||
| if (!(table instanceof OlapTable) || !(servingTable instanceof OlapTable)) { | ||
| continue; | ||
| } | ||
| OlapTable olapTable = (OlapTable) table; | ||
| OlapTable servingOlapTable = (OlapTable) servingTable; | ||
|
|
||
| List<Partition> partitions = olapTable.getAllPartitions(); | ||
| for (Partition partition : partitions) { | ||
| Partition servingPartition = servingOlapTable.getPartition(partition.getId()); | ||
| if (servingPartition == null) { | ||
| if (LOG.isDebugEnabled()) { | ||
| LOG.debug("serving partition is null. tableId: {}, partitionId: {}", table.getId(), | ||
| partition.getId()); | ||
| } | ||
| continue; | ||
| } | ||
| // set tablet stats | ||
| setTabletStats(table.getId(), partition, servingPartition); | ||
| } |
There was a problem hiding this comment.
postProcessCloudMetadata() traverses servingDb/servingTable/... without acquiring any catalog/table read locks. Since DDL, load, or stats updates can concurrently mutate partitions/tablets/replicas, this risks inconsistent reads or ConcurrentModificationException (e.g., Tablet#getReplicas() exposes the mutable internal list). Consider locking the serving OlapTable (and/or using defensive copies of collections) while reading from servingOlapTable to make the checkpoint post-processing robust under concurrent metadata changes.
| for (Table table : db.getTables()) { | |
| Table servingTable = servingDb.getTableNullable(table.getId()); | |
| if (servingTable == null) { | |
| if (LOG.isDebugEnabled()) { | |
| LOG.debug("serving table is null. dbId: {}, table: {}", db.getId(), table); | |
| } | |
| continue; | |
| } | |
| if (!(table instanceof OlapTable) || !(servingTable instanceof OlapTable)) { | |
| continue; | |
| } | |
| OlapTable olapTable = (OlapTable) table; | |
| OlapTable servingOlapTable = (OlapTable) servingTable; | |
| List<Partition> partitions = olapTable.getAllPartitions(); | |
| for (Partition partition : partitions) { | |
| Partition servingPartition = servingOlapTable.getPartition(partition.getId()); | |
| if (servingPartition == null) { | |
| if (LOG.isDebugEnabled()) { | |
| LOG.debug("serving partition is null. tableId: {}, partitionId: {}", table.getId(), | |
| partition.getId()); | |
| } | |
| continue; | |
| } | |
| // set tablet stats | |
| setTabletStats(table.getId(), partition, servingPartition); | |
| } | |
| servingDb.readLock(); | |
| try { | |
| for (Table table : db.getTables()) { | |
| Table servingTable = servingDb.getTableNullable(table.getId()); | |
| if (servingTable == null) { | |
| if (LOG.isDebugEnabled()) { | |
| LOG.debug("serving table is null. dbId: {}, table: {}", db.getId(), table); | |
| } | |
| continue; | |
| } | |
| if (!(table instanceof OlapTable) || !(servingTable instanceof OlapTable)) { | |
| continue; | |
| } | |
| OlapTable olapTable = (OlapTable) table; | |
| OlapTable servingOlapTable = (OlapTable) servingTable; | |
| servingOlapTable.readLock(); | |
| try { | |
| List<Partition> partitions = olapTable.getAllPartitions(); | |
| for (Partition partition : partitions) { | |
| Partition servingPartition = servingOlapTable.getPartition(partition.getId()); | |
| if (servingPartition == null) { | |
| if (LOG.isDebugEnabled()) { | |
| LOG.debug("serving partition is null. tableId: {}, partitionId: {}", table.getId(), | |
| partition.getId()); | |
| } | |
| continue; | |
| } | |
| // set tablet stats | |
| setTabletStats(table.getId(), partition, servingPartition); | |
| } | |
| } finally { | |
| servingOlapTable.readUnlock(); | |
| } | |
| } | |
| } finally { | |
| servingDb.readUnlock(); |
| try { | ||
| for (Future<Void> future : futures) { | ||
| future.get(); | ||
| } | ||
| } catch (InterruptedException | ExecutionException e) { | ||
| LOG.error("Error waiting for get tablet stats tasks to complete", e); | ||
| } |
There was a problem hiding this comment.
These catch (InterruptedException | ExecutionException ...) blocks swallow InterruptedException without restoring the interrupt flag. Please Thread.currentThread().interrupt() when catching InterruptedException (and consider aborting the wait) so higher-level shutdown/interrupt logic works correctly.
Cloud UT Coverage ReportIncrement line coverage Increment coverage report
|
FE UT Coverage ReportIncrement line coverage |
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
|
run cloud_p0 |
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
|
run buildall |
Cloud UT Coverage ReportIncrement line coverage Increment coverage report
|
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
1 similar comment
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
pick: