[fix](cloud) avoid NPE and clear stale cache on warmup job cancel/expire (#62805) #63019
…ire (apache#62805)

Fix two related bugs on the event-driven warm-up path. Together they stall BE's heavy work pool when a warm-up job is cancelled or expired while BE still has its id in `_tablet_replica_cache`.

Bug 1 (BE): `be/src/cloud/cloud_warm_up_manager.cpp` used `st.is<CANCELED>()` (spelled with one L). `CANCELED` is not in `ErrorCode`; ADL resolved it to `PCacheStatus::CANCELED = 9` from a proto enum, so the check compared against 9 and was always false. When FE returned `TStatusCode.CANCELLED` (value 1) to tell BE a job was done, BE never pushed the `job_id` into `cancelled_jobs`, leaving a zombie entry in `_tablet_replica_cache` that every subsequent `commit_rowset` re-queried.

Fix: use `st.is<ErrorCode::CANCELLED>()`, matching the same namespace-qualified form used elsewhere in the file.

Bug 2 (FE): the branch that handles a missing or finished job logged `job.getJobId()`:

```java
if (job == null || job.isDone()) {
    LOG.info("warmup job {} is not running, notify caller BE {} to cancel job",
            job.getJobId(), clientAddr); // NPE when job == null
    ...
}
```

Once a cancelled job was removed from `cloudWarmUpJobs` past `history_cloud_warm_up_job_keep_max_second`, `job` is null and the log call NPE'd. With bug 1 keeping the stale id in BE cache, BE kept hitting this RPC forever; each failure took the `apache::thrift::TException` branch in `thrift_rpc_helper.cpp`, which sleeps 2s inside `CloudWarmUpManager::_mtx`. That serialised `bthread_fork_join(commit_rowset)`, blocked heavy-pool threads in `CloudTabletsChannel::close`, and backed up the heavy-pool queue, leading to load timeouts and query `Fragment RPC Phase1` latency in the 10s range.

Fix: log `request.getWarmUpJobId()` instead; it is guaranteed set by the enclosing `request.isSetWarmUpJobId()` check.

(cherry picked from commit b1c1a5a)
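The FE-side fix can be illustrated with a minimal, self-contained sketch. It is not the Doris code: `WarmUpJobLookupSketch`, `Job`, `Code`, and both methods are hypothetical stand-ins, and the map plays the role of `cloudWarmUpJobs`. The point it shows is that the log message is built from the id carried by the request, so the branch is safe whether the job is finished or already expired and absent.

```java
import java.util.Map;

// Hypothetical stand-ins for illustration only; not the real Doris classes.
final class WarmUpJobLookupSketch {
    enum Code { OK, CANCELLED }

    static final class Job {
        final long id;
        final boolean done;

        Job(long id, boolean done) {
            this.id = id;
            this.done = done;
        }
    }

    // Builds the log line from the requested id, never from the job object.
    static String notRunningMessage(long requestedJobId, String clientAddr) {
        return "warmup job " + requestedJobId
                + " is not running, notify caller BE " + clientAddr + " to cancel job";
    }

    // Returns CANCELLED for a missing (expired) or finished job, OK otherwise.
    static Code lookup(Map<Long, Job> jobs, long requestedJobId, String clientAddr) {
        Job job = jobs.get(requestedJobId); // null once the job has been expired
        if (job == null || job.done) {
            // Safe even when job == null: only the request's id is dereferenced.
            System.out.println(notRunningMessage(requestedJobId, clientAddr));
            return Code.CANCELLED;
        }
        return Code.OK;
    }
}
```

Calling `lookup` with an id that is not in the map returns `Code.CANCELLED` instead of throwing, which is the behaviour the caller needs in order to drop its stale entry.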
Pull request overview
This PR fixes a two-sided failure mode in the cloud event-driven warm-up flow: FE could throw an NPE when a warm-up job had already been expired/removed, and BE could fail to recognize job cancellation and therefore keep stale warm-up job entries in its tablet replica cache.
Changes:
- FE: Avoid NPE in `FrontendServiceImpl.getTabletReplicaInfos()` when `CloudWarmUpJob` is null by logging `request.getWarmUpJobId()` and returning `CANCELLED`.
- BE: Correctly detect cancellation using `Status::is<ErrorCode::CANCELLED>()` and clear `_tablet_replica_cache` entries for cancelled jobs.
- Add FE unit test and a cloud regression test covering cancel + expire behavior and the stale-cache cleanup path.
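The BE-side cleanup is C++ in the actual change; the following is only a Java model of the idea, kept in the same language as the FE snippet. Every name here (`ReplicaCacheEvictionSketch`, `cache`, `refresh`, the `String` payload) is an assumption made for illustration. It shows the intended flow: ask FE about each cached job id, collect the ids answered with `CANCELLED`, and evict them so later commits stop re-querying a dead job.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongToIntFunction;

// Hypothetical model of the cache cleanup; not the real BE implementation.
final class ReplicaCacheEvictionSketch {
    static final int OK = 0;
    static final int CANCELLED = 1; // value quoted for TStatusCode.CANCELLED in the description

    // job id -> cached replica info (a placeholder String in this sketch)
    final Map<Long, String> cache = new HashMap<>();

    // Queries FE once per cached job id and evicts the ids reported as cancelled.
    List<Long> refresh(LongToIntFunction queryFe) {
        List<Long> cancelledJobs = new ArrayList<>();
        for (Long jobId : cache.keySet()) {
            // Comparing against the wrong constant (for example 9) would never
            // match here, and the stale entry would stay in the cache forever.
            if (queryFe.applyAsInt(jobId) == CANCELLED) {
                cancelledJobs.add(jobId);
            }
        }
        // Remove after the loop to avoid mutating the map while iterating over it.
        for (Long jobId : cancelledJobs) {
            cache.remove(jobId);
        }
        return cancelledJobs;
    }
}
```

Collecting the ids first and erasing afterwards mirrors the `cancelled_jobs` list mentioned in the description and keeps the iteration free of concurrent-modification problems.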
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_cancel_expired.groovy | New regression test reproducing cancel+expire warm-up issues and validating BE cache cleanup behavior. |
| fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java | Adds a unit test ensuring getTabletReplicaInfos returns CANCELLED instead of NPE when warm-up job is missing. |
| fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java | Fixes the null-job logging path to avoid calling job.getJobId() when job is null. |
| be/src/cloud/cloud_warm_up_manager.cpp | Fixes cancellation detection so BE can clear stale warm-up job cache entries. |

    def clearFileCache = { ip, port ->
        def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true"
        def response = new URL(url).text
        def json = new JsonSlurper().parseText(response)
        if (json.status != "OK") {
            throw new RuntimeException("Clear cache on ${ip}:${port} failed: ${json.status}")
        }

    def url = "http://${ip}:${port}/brpc_metrics"
    if ((context.config.otherConfigs.get("enableTLS")?.toString()?.equalsIgnoreCase("true")) ?: false) {
        url = url.replace("http://", "https://") + " --cert " + context.config.otherConfigs.get("trustCert") + " --cacert " + context.config.otherConfigs.get("trustCACert") + " --key " + context.config.otherConfigs.get("trustCAKey")
    }
    def metrics = new URL(url).text

run buildall

skip buildall

PR approved by at least one committer and no changes requested.

PR approved by anyone and no changes requested.

Pick #62805