From 2b47bcd9dee0451a7b87b7658e5a648e16f92718 Mon Sep 17 00:00:00 2001 From: Xin Liao Date: Thu, 30 Apr 2026 10:16:03 +0800 Subject: [PATCH] [fix](cloud) avoid NPE and clear stale cache on warmup job cancel/expire (#62805) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix two related bugs on the event-driven warm-up path. Together they stall BE's heavy work pool when a warm-up job is cancelled or expired while BE still has its id in `_tablet_replica_cache`. `be/src/cloud/cloud_warm_up_manager.cpp` used `st.is<CANCELED>()` (one L). `CANCELED` is not in `ErrorCode`; ADL resolved it to `PCacheStatus::CANCELED = 9` from a proto enum, so the check compared against 9 and was always false. When FE returned `TStatusCode.CANCELLED` (value 1) to tell BE a job was done, BE never pushed the `job_id` into `cancelled_jobs`, leaving a zombie entry in `_tablet_replica_cache` that every subsequent `commit_rowset` re-queried. Fix: use `st.is<ErrorCode::CANCELLED>()`, matching the same namespace-qualified form used elsewhere in the file. ```java if (job == null || job.isDone()) { LOG.info("warmup job {} is not running, notify caller BE {} to cancel job", job.getJobId(), clientAddr); // NPE when job == null ... } ``` Once a cancelled job was removed from `cloudWarmUpJobs` past `history_cloud_warm_up_job_keep_max_second`, `job` is null and the log call NPE'd. With bug #1 keeping the stale id in BE cache, BE kept hitting this RPC forever; each failure took the `apache::thrift::TException` branch in `thrift_rpc_helper.cpp`, which sleeps 2s inside `CloudWarmUpManager::_mtx`. That serialised `bthread_fork_join(commit_rowset)`, blocked heavy-pool threads in `CloudTabletsChannel::close`, and backed up the heavy-pool queue — leading to load timeouts and query `Fragment RPC Phase1` latency in the 10s range. Fix: log `request.getWarmUpJobId()` instead; it is guaranteed set by the enclosing `request.isSetWarmUpJobId()` check. 
(cherry picked from commit b1c1a5ad6df63c9d57d4d3153b86f6d8485d0539) --- be/src/cloud/cloud_warm_up_manager.cpp | 2 +- .../doris/service/FrontendServiceImpl.java | 2 +- .../service/FrontendServiceImplTest.java | 42 ++++ ...arm_up_cluster_event_cancel_expired.groovy | 203 ++++++++++++++++++ 4 files changed, 247 insertions(+), 2 deletions(-) create mode 100644 regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_cancel_expired.groovy diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp index a397498a56d6cf..b63af0592d2252 100644 --- a/be/src/cloud/cloud_warm_up_manager.cpp +++ b/be/src/cloud/cloud_warm_up_manager.cpp @@ -541,7 +541,7 @@ std::vector<TReplicaInfo> CloudWarmUpManager::get_replica_info(int64_t tablet_id auto st = Status::create(result.status); if (!st.ok()) { - if (st.is<CANCELED>()) { + if (st.is<ErrorCode::CANCELLED>()) { LOG(INFO) << "get_replica_info: warm up job cancelled, tablet_id=" << tablet_id << ", job_id=" << job_id; cancelled_jobs.push_back(job_id); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 7b57377d173bfe..513bf0a4f5d33c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2783,7 +2783,7 @@ public TGetTabletReplicaInfosResult getTabletReplicaInfos(TGetTabletReplicaInfos .getCloudWarmUpJob(request.getWarmUpJobId()); if (job == null || job.isDone()) { LOG.info("warmup job {} is not running, notify caller BE {} to cancel job", - job.getJobId(), clientAddr); + request.getWarmUpJobId(), clientAddr); // notify client to cancel this job result.setStatus(new TStatus(TStatusCode.CANCELLED)); return result; diff --git a/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java index f78d1bdf6e7710..c748266ecf860d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java @@ -21,6 +21,8 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; +import org.apache.doris.cloud.CacheHotspotManager; +import org.apache.doris.cloud.catalog.CloudEnv; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.nereids.parser.NereidsParser; @@ -37,6 +39,8 @@ import org.apache.doris.thrift.TFetchSchemaTableDataResult; import org.apache.doris.thrift.TGetDbsParams; import org.apache.doris.thrift.TGetDbsResult; +import org.apache.doris.thrift.TGetTabletReplicaInfosRequest; +import org.apache.doris.thrift.TGetTabletReplicaInfosResult; import org.apache.doris.thrift.TMetadataTableRequestParams; import org.apache.doris.thrift.TMetadataType; import org.apache.doris.thrift.TNullableStringLiteral; @@ -53,9 +57,12 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.UUID; @@ -247,4 +254,39 @@ public void testShowUser() { TShowUserResult result = impl.showUser(request); System.out.println(result); } + + @Test + public void testGetTabletReplicaInfosNullJobReturnsCancelledWithoutNpe() { + String originalCloudUniqueId = Config.cloud_unique_id; + Config.cloud_unique_id = "gettabletreplicainfostest"; + + CloudEnv cloudEnv = Mockito.mock(CloudEnv.class); + CacheHotspotManager cacheHotspotManager = Mockito.mock(CacheHotspotManager.class); + 
Mockito.when(cloudEnv.getCacheHotspotMgr()).thenReturn(cacheHotspotManager); + Mockito.when(cacheHotspotManager.getCloudWarmUpJob(123456L)).thenReturn(null); + + MockedStatic envMock = Mockito.mockStatic(Env.class); + try { + envMock.when(Env::getCurrentEnv).thenReturn(cloudEnv); + + FrontendServiceImpl frontendService = new FrontendServiceImpl(exeEnv); + TGetTabletReplicaInfosRequest request = new TGetTabletReplicaInfosRequest(); + request.setTabletIds(Collections.singletonList(789L)); + request.setWarmUpJobId(123456L); + + TGetTabletReplicaInfosResult result; + try { + result = frontendService.getTabletReplicaInfos(request); + } catch (NullPointerException e) { + throw new AssertionError("getTabletReplicaInfos must not NPE when the " + + "warm-up job has been removed from CacheHotspotManager", e); + } + + Assert.assertNotNull(result.getStatus()); + Assert.assertEquals(TStatusCode.CANCELLED, result.getStatus().getStatusCode()); + } finally { + envMock.close(); + Config.cloud_unique_id = originalCloudUniqueId; + } + } } diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_cancel_expired.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_cancel_expired.groovy new file mode 100644 index 00000000000000..31c28cbf2b7afa --- /dev/null +++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_cancel_expired.groovy @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import groovy.json.JsonSlurper + +// Covers a two-part bug on the event-driven warm-up path: +// 1. BE side (cloud_warm_up_manager.cpp): `st.is<CANCELED>()` used the +// proto enum value PCacheStatus::CANCELED=9 instead of +// ErrorCode::CANCELLED=1, so BE never cleared a cancelled job from +// `_tablet_replica_cache`. +// 2. FE side (FrontendServiceImpl.getTabletReplicaInfos): when the job +// had been removed from `cloudWarmUpJobs` (after +// history_cloud_warm_up_job_keep_max_second), the branch still +// called `job.getJobId()` on a null reference, throwing NPE to BE. +// +// After the fix, once a warm-up job is cancelled BE must drop it from +// its cache on the next RPC (via the CANCELLED TStatus), so that later +// expiry removal on FE never receives follow-up requests with the dead +// job_id and no NPE path is exercised. +suite('test_warm_up_cluster_event_cancel_expired', 'docker') { + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + // Keep expiry small so FE removes the cancelled job quickly. 
+ 'history_cloud_warm_up_job_keep_max_second=30', + ] + options.beConfigs += [ + 'file_cache_enter_disk_resource_limit_mode_percent=99', + 'enable_evict_file_cache_in_advance=false', + 'file_cache_background_monitor_interval_ms=1000', + ] + options.cloudMode = true + + def clearFileCache = { ip, port -> + def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true" + def response = new URL(url).text + def json = new JsonSlurper().parseText(response) + if (json.status != "OK") { + throw new RuntimeException("Clear cache on ${ip}:${port} failed: ${json.status}") + } + } + + def clearFileCacheOnAllBackends = { + def backends = sql """SHOW BACKENDS""" + for (be in backends) { + clearFileCache(be[1], be[4]) + } + sleep(10000) + } + + def getBrpcMetrics = { ip, port, name -> + def url = "http://${ip}:${port}/brpc_metrics" + if ((context.config.otherConfigs.get("enableTLS")?.toString()?.equalsIgnoreCase("true")) ?: false) { + url = url.replace("http://", "https://") + " --cert " + context.config.otherConfigs.get("trustCert") + " --cacert " + context.config.otherConfigs.get("trustCACert") + " --key " + context.config.otherConfigs.get("trustCAKey") + } + def metrics = new URL(url).text + def matcher = metrics =~ ~"${name}\\s+(\\d+)" + if (matcher.find()) { + return matcher[0][1] as long + } else { + throw new RuntimeException("${name} not found for ${ip}:${port}") + } + } + + def getSkippedRowsetSum = { cluster -> + def backends = sql """SHOW BACKENDS""" + def cluster_bes = backends.findAll { + it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") + } + long sum = 0 + for (be in cluster_bes) { + sum += getBrpcMetrics(be[1], be[5], "file_cache_event_driven_warm_up_skipped_rowset_num") + } + return sum + } + + docker(options) { + def clusterSrc = "warmup_source" + def clusterDst = "warmup_target" + + cluster.addBackend(1, clusterSrc) + cluster.addBackend(1, clusterDst) + + sql """use @${clusterSrc}""" + sql """CREATE TABLE IF NOT EXISTS t_exp ( + id INT, v 
STRING + ) DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 3 + PROPERTIES ("file_cache_ttl_seconds" = "3600")""" + + // 1. Start event-driven LOAD warm-up job; this is the only mode + // that populates `_tablet_replica_cache[jobId]` on BE. + def jobRows = sql """ + WARM UP CLUSTER ${clusterDst} WITH CLUSTER ${clusterSrc} + PROPERTIES ( + "sync_mode" = "event_driven", + "sync_event" = "load" + ) + """ + def jobId = jobRows[0][0] + logger.info("event-driven warm-up jobId=${jobId}") + clearFileCacheOnAllBackends() + sleep(15000) + + // 2. Drive some loads so BE caches the job_id in _tablet_replica_cache + // (we don't care about the warmed data itself for this test). + for (int i = 0; i < 20; i++) { + sql """INSERT INTO t_exp VALUES (${i}, 'x')""" + } + sleep(5000) + + // 3. Cancel the job. After the fix, BE sees TStatus.CANCELLED on + // the next get_replica_info and drops the job from + // _tablet_replica_cache; before the fix (CANCELED typo) it + // would keep the entry. + sql """CANCEL WARM UP JOB WHERE ID = ${jobId}""" + def st = sql """SHOW WARM UP JOB WHERE ID = ${jobId}""" + assertEquals("CANCELLED", st[0][3]) + + // 4. One more batch so BE actually sees the CANCELLED status + // and (with the fix) purges its cache entry. + for (int i = 0; i < 20; i++) { + sql """INSERT INTO t_exp VALUES (${100 + i}, 'y')""" + } + sleep(5000) + + // 5. Baseline for the skipped-rowset counter. After BE has + // cleaned its cache, subsequent warm_up_rowset calls return + // early (empty replicas -> "skipping rowset") and bump this + // counter on every commit. If the typo is unfixed the counter + // stays flat because BE keeps calling FE. + def skippedBaseline = getSkippedRowsetSum(clusterDst) + logger.info("skipped_rowset baseline=${skippedBaseline}") + + // 6. Wait past history_cloud_warm_up_job_keep_max_second plus one + // JobDaemon cycle (~20s, CYCLE_COUNT_TO_CHECK_EXPIRE=20 ticks + // at 1s each) so FE removes the cancelled job from its map. 
+ logger.info("waiting for FE to expire+remove cancelled job") + def removed = false + for (int i = 0; i < 60; i++) { + def rows = sql """SHOW WARM UP JOB WHERE ID = ${jobId}""" + if (rows.isEmpty()) { + removed = true + logger.info("job ${jobId} removed from FE after ${i}s") + break + } + sleep(1000) + } + assertTrue(removed, "FE should have removed expired warm-up job") + + // 7. After FE removal, run more loads. Buggy code path: + // BE still has job_id in _tablet_replica_cache -> RPC to FE -> + // FE's LOG.info("...", job.getJobId(), ...) NPE's on + // `job == null` -> BE falls into the 2s thrift exception + // sleep *inside* CloudWarmUpManager::_mtx, serialising every + // commit_rowset and blowing up heavy_work_pool. + // + // Fixed code path: BE cache already cleaned in step 4, so + // warm_up_rowset takes the empty-replicas fast path, loads + // stay fast, and FE never gets called with the dead job_id. + def t0 = System.currentTimeMillis() + for (int i = 0; i < 20; i++) { + sql """INSERT INTO t_exp VALUES (${200 + i}, 'z')""" + } + def elapsedMs = System.currentTimeMillis() - t0 + logger.info("20 INSERTs after FE removal took ${elapsedMs}ms") + + // 20 INSERTs, each commit would cost ~2s of serialised sleep in + // the buggy case (>= 40s). Threshold of 30s is a generous + // upper bound that the fixed path comfortably meets but the + // buggy path cannot. + assertTrue(elapsedMs < 30_000, + "post-removal inserts should not be blocked by NPE sleeps, " + + "took ${elapsedMs}ms") + + // 8. On the fixed path every commit short-circuits through + // g_file_cache_event_driven_warm_up_skipped_rowset_num. + // We expect it to grow; on the buggy path it would be flat + // since BE never stopped pursuing FE replicas. 
+ def skippedAfter = getSkippedRowsetSum(clusterDst) + logger.info("skipped_rowset after=${skippedAfter}") + assertTrue(skippedAfter > skippedBaseline, + "BE should skip warm_up_rowset for tablets after cancel+expire, " + + "baseline=${skippedBaseline} after=${skippedAfter}") + + sql """DROP TABLE IF EXISTS t_exp""" + } +}