From e98cfbfb7689586e756ab7b6122470b9a508c23f Mon Sep 17 00:00:00 2001 From: freemandealer Date: Sat, 11 Apr 2026 12:34:46 +0800 Subject: [PATCH 1/3] [fix](cloud) Deduplicate pending one-shot warm up jobs Issue Number: N/A Related PR: selectdb/selectdb-core#8320 Problem Summary: Equivalent one-shot warm up requests could accumulate duplicate PENDING jobs for the same destination. Reuse the oldest matching pending job for TABLE and CLUSTER once jobs instead of appending another duplicate. None - Test: No need to test (picked directly per request without compile/test) - Behavior changed: Yes (equivalent pending one-shot warm up requests now reuse the existing job id) - Does this need documentation: No --- .../doris/cloud/CacheHotspotManager.java | 172 ++++++++++++- .../cloud/cache/CacheHotspotManagerTest.java | 236 +++++++++++++++++- 2 files changed, 398 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java index 500f65de153df4..68a68f99494cbe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java @@ -67,6 +67,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -82,6 +83,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReentrantLock; public class CacheHotspotManager extends MasterDaemon { public static final int MAX_SHOW_ENTRIES = 2000; @@ -109,6 +111,8 @@ public class CacheHotspotManager extends MasterDaemon { private ConcurrentMap runnableCloudWarmUpJobs = Maps.newConcurrentMap(); + private final ConcurrentMap oncePendingCreateLocks = Maps.newConcurrentMap(); + private final ThreadPoolExecutor cloudWarmUpThreadPool = ThreadPoolManager.newDaemonCacheThreadPool( Config.max_active_cloud_warm_up_job, "cloud-warm-up-pool", true); @@ -148,10 +152,154 @@ public String toString() { } } + private static class OncePendingJobKey { + private final JobType jobType; + private final String srcName; + private final String dstName; + private final List normalizedTables; + private final boolean force; + + OncePendingJobKey(JobType jobType, String srcName, String dstName, + List normalizedTables, boolean force) { + this.jobType = jobType; + this.srcName = normalizeNullableName(srcName); + this.dstName = normalizeNullableName(dstName); + this.normalizedTables = normalizedTables.isEmpty() + ? Collections.emptyList() + : Collections.unmodifiableList(new ArrayList<>(normalizedTables)); + this.force = force; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof OncePendingJobKey)) { + return false; + } + OncePendingJobKey jobKey = (OncePendingJobKey) o; + return force == jobKey.force + && jobType == jobKey.jobType + && Objects.equals(srcName, jobKey.srcName) + && Objects.equals(dstName, jobKey.dstName) + && Objects.equals(normalizedTables, jobKey.normalizedTables); + } + + @Override + public int hashCode() { + return Objects.hash(jobType, srcName, dstName, normalizedTables, force); + } + + @Override + public String toString() { + return "OncePendingWarmUpJob{" + + "jobType=" + jobType + + ", src='" + srcName + '\'' + + ", dst='" + dstName + '\'' + + ", tables=" + normalizedTables + + ", force=" + force + + '}'; + } + } + // Tracks long-running jobs (event-driven and periodic). // Ensures only one active job exists per tuple. private Set repeatJobDetectionSet = ConcurrentHashMap.newKeySet(); + private static String normalizeNullableName(String value) { + return value == null ? "" : value; + } + + private static String normalizeTableKey(Triple tableTriple) { + String dbName = normalizeNullableName(tableTriple.getLeft()); + String tableName = normalizeNullableName(tableTriple.getMiddle()); + String partitionName = normalizeNullableName(tableTriple.getRight()); + if (partitionName.isEmpty()) { + return dbName + "." + tableName; + } + return dbName + "." + tableName + "." + partitionName; + } + + private static List normalizeTables(List> tables) { + if (tables == null || tables.isEmpty()) { + return Collections.emptyList(); + } + HashSet normalizedTables = new HashSet<>(); + for (Triple table : tables) { + normalizedTables.add(normalizeTableKey(table)); + } + List sortedTables = new ArrayList<>(normalizedTables); + Collections.sort(sortedTables); + return sortedTables; + } + + private boolean isClusterOnceCommand(WarmUpClusterCommand command) { + Map properties = command.getProperties(); + if (properties == null) { + return true; + } + String syncMode = properties.get("sync_mode"); + return !"periodic".equals(syncMode) && !"event_driven".equals(syncMode); + } + + private OncePendingJobKey buildOncePendingJobKey(WarmUpClusterCommand command) { + if (command.isWarmUpWithTable()) { + return new OncePendingJobKey(JobType.TABLE, "", command.getDstCluster(), + normalizeTables(command.getTables()), command.isForce()); + } + if (!isClusterOnceCommand(command)) { + return null; + } + return new OncePendingJobKey(JobType.CLUSTER, command.getSrcCluster(), + command.getDstCluster(), Collections.emptyList(), false); + } + + private OncePendingJobKey buildOncePendingJobKey(CloudWarmUpJob job) { + if (!job.isOnce()) { + return null; + } + if (job.getJobType() == JobType.TABLE) { + return new OncePendingJobKey(JobType.TABLE, "", job.getDstClusterName(), + normalizeTables(job.tables), job.force); + } + if (job.getJobType() == JobType.CLUSTER) { + return new OncePendingJobKey(JobType.CLUSTER, job.getSrcClusterName(), + job.getDstClusterName(), Collections.emptyList(), false); + } + return null; + } + + private CloudWarmUpJob findExistingPendingOnceJob(OncePendingJobKey key) { + CloudWarmUpJob selectedJob = null; + for (CloudWarmUpJob job : cloudWarmUpJobs.values()) { + if (job.getJobState() != JobState.PENDING || !job.isOnce()) { + continue; + } + OncePendingJobKey existingKey = buildOncePendingJobKey(job); + if (!key.equals(existingKey)) { + continue; + } + if (selectedJob == null + || job.getCreateTimeMs() < selectedJob.getCreateTimeMs() + || (job.getCreateTimeMs() == selectedJob.getCreateTimeMs() + && job.getJobId() < selectedJob.getJobId())) { + selectedJob = job; + } + } + return selectedJob; + } + + private ReentrantLock getOncePendingCreateLock(OncePendingJobKey key) { + ReentrantLock lock = oncePendingCreateLocks.get(key); + if (lock != null) { + return lock; + } + ReentrantLock newLock = new ReentrantLock(); + ReentrantLock existingLock = oncePendingCreateLocks.putIfAbsent(key, newLock); + return existingLock == null ? newLock : existingLock; + } + private void registerJobForRepeatDetection(CloudWarmUpJob job, boolean replay) throws AnalysisException { if (job.isDone()) { return; @@ -781,6 +929,26 @@ public Map> warmUpNewClusterByTable(long jobId, String dstClu } public long createJob(WarmUpClusterCommand stmt) throws AnalysisException { + OncePendingJobKey oncePendingJobKey = buildOncePendingJobKey(stmt); + if (oncePendingJobKey != null) { + ReentrantLock createLock = getOncePendingCreateLock(oncePendingJobKey); + createLock.lock(); + try { + CloudWarmUpJob existingPendingJob = findExistingPendingOnceJob(oncePendingJobKey); + if (existingPendingJob != null) { + LOG.info("reuse existing pending warm up job {} for key {}", + existingPendingJob.getJobId(), oncePendingJobKey); + return existingPendingJob.getJobId(); + } + return createJobInternal(stmt); + } finally { + createLock.unlock(); + } + } + return createJobInternal(stmt); + } + + private long createJobInternal(WarmUpClusterCommand stmt) throws AnalysisException { long jobId = Env.getCurrentEnv().getNextId(); CloudWarmUpJob warmUpJob; if (stmt.isWarmUpWithTable()) { @@ -800,6 +968,9 @@ public long createJob(WarmUpClusterCommand stmt) throws AnalysisException { .setJobType(JobType.CLUSTER); Map properties = stmt.getProperties(); + if (properties == null) { + properties = Collections.emptyMap(); + } if ("periodic".equals(properties.get("sync_mode"))) { String syncIntervalSecStr = properties.get("sync_interval_sec"); if (syncIntervalSecStr == null) { @@ -831,7 +1002,6 @@ public long createJob(WarmUpClusterCommand stmt) throws AnalysisException { } warmUpJob = builder.build(); } - addCloudWarmUpJob(warmUpJob); Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(warmUpJob); diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java index caf62468528f9d..da963e146f8303 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java @@ -17,30 +17,70 @@ package org.apache.doris.cloud.cache; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Tablet; import org.apache.doris.cloud.CacheHotspotManager; +import org.apache.doris.cloud.CloudWarmUpJob; +import org.apache.doris.cloud.CloudWarmUpJob.JobState; +import org.apache.doris.cloud.CloudWarmUpJob.JobType; +import org.apache.doris.cloud.CloudWarmUpJob.SyncEvent; +import org.apache.doris.cloud.CloudWarmUpJob.SyncMode; import org.apache.doris.cloud.catalog.CloudTablet; import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.Triple; +import org.apache.doris.nereids.trees.plans.commands.WarmUpClusterCommand; +import org.apache.doris.persist.EditLog; import org.apache.doris.system.Backend; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.mockito.MockedStatic; import org.mockito.Mockito; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; public class CacheHotspotManagerTest { private CacheHotspotManager cacheHotspotManager; private CloudSystemInfoService cloudSystemInfoService; - private Partition partition; + private boolean originalRunningUnitTest; + private AtomicLong nextJobId; + private Env env; + private EditLog editLog; + private MockedStatic envMockedStatic; + + @Before + public void setUp() { + originalRunningUnitTest = FeConstants.runningUnitTest; + FeConstants.runningUnitTest = true; + nextJobId = new AtomicLong(1000L); + env = Mockito.mock(Env.class); + editLog = Mockito.mock(EditLog.class); + Mockito.when(env.getNextId()).thenAnswer(invocation -> nextJobId.getAndIncrement()); + Mockito.when(env.getEditLog()).thenReturn(editLog); + envMockedStatic = Mockito.mockStatic(Env.class); + envMockedStatic.when(Env::getCurrentEnv).thenReturn(env); + cloudSystemInfoService = new CloudSystemInfoService(); + cacheHotspotManager = new CacheHotspotManager(cloudSystemInfoService); + } + + @After + public void tearDown() { + envMockedStatic.close(); + FeConstants.runningUnitTest = originalRunningUnitTest; + } @Test public void testWarmUpNewClusterByTable() { @@ -84,22 +124,200 @@ public void testWarmUpNewClusterByTable() { } }); - // Setup mock data long jobId = 1L; String dstClusterName = "test_cluster"; List> tables = new ArrayList<>(); tables.add(Triple.of("test_db", "test_table", "")); - // force = true Map> result = cacheHotspotManager.warmUpNewClusterByTable( jobId, dstClusterName, tables, true); - Assert.assertEquals(result.size(), 1); - Assert.assertEquals(result.get(11L).get(0).getId(), 1001L); + Assert.assertEquals(1, result.size()); + Assert.assertEquals(1001L, result.get(11L).get(0).getId()); - // force = false - RuntimeException exception = Assert.assertThrows(RuntimeException.class, () -> { - cacheHotspotManager.warmUpNewClusterByTable(jobId, dstClusterName, tables, false); - }); + RuntimeException exception = Assert.assertThrows(RuntimeException.class, () -> + cacheHotspotManager.warmUpNewClusterByTable(jobId, dstClusterName, tables, false)); Assert.assertEquals("The cluster " + dstClusterName + " cache size is not enough", exception.getMessage()); } + + @Test + public void testCreateTableOnceJobDedupesPendingOrderDifference() throws AnalysisException { + long firstJobId = cacheHotspotManager.createJob(newTableStmt("dst", false, + Triple.of("db1", "tbl1", ""), + Triple.of("db2", "tbl2", "p1"))); + long reusedJobId = cacheHotspotManager.createJob(newTableStmt("dst", false, + Triple.of("db2", "tbl2", "p1"), + Triple.of("db1", "tbl1", ""))); + + Assert.assertEquals(firstJobId, reusedJobId); + Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size()); + Mockito.verify(env, Mockito.times(1)).getNextId(); + Mockito.verify(editLog, Mockito.times(1)).logModifyCloudWarmUpJob(Mockito.any(CloudWarmUpJob.class)); + } + + @Test + public void testCreateTableOnceJobDedupesDuplicateTableEntries() throws AnalysisException { + long firstJobId = cacheHotspotManager.createJob(newTableStmt("dst", false, + Triple.of("db1", "tbl1", ""), + Triple.of("db1", "tbl1", ""))); + long reusedJobId = cacheHotspotManager.createJob(newTableStmt("dst", false, + Triple.of("db1", "tbl1", ""))); + + Assert.assertEquals(firstJobId, reusedJobId); + Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size()); + } + + @Test + public void testCreateTableOnceJobDoesNotDedupDifferentForce() throws AnalysisException { + long forceFalseJobId = cacheHotspotManager.createJob(newTableStmt("dst", false, + Triple.of("db1", "tbl1", ""))); + long forceTrueJobId = cacheHotspotManager.createJob(newTableStmt("dst", true, + Triple.of("db1", "tbl1", ""))); + + Assert.assertNotEquals(forceFalseJobId, forceTrueJobId); + Assert.assertEquals(2, cacheHotspotManager.getCloudWarmUpJobs().size()); + } + + @Test + public void testCreateClusterOnceJobDedupesPendingJob() throws AnalysisException { + long firstJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false)); + long reusedJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false)); + + Assert.assertEquals(firstJobId, reusedJobId); + Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size()); + } + + @Test + public void testCreateClusterOnceJobDedupesRegardlessOfForceFlag() throws AnalysisException { + long firstJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false)); + long reusedJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", true)); + + Assert.assertEquals(firstJobId, reusedJobId); + Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size()); + } + + @Test + public void testCreateClusterOnceJobAllowsNewPendingWhenOnlyRunningExists() throws AnalysisException { + CloudWarmUpJob runningJob = newClusterJob(10L, "src", "dst", SyncMode.ONCE, JobState.RUNNING, 100L); + cacheHotspotManager.addCloudWarmUpJob(runningJob); + + long newJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false)); + + Assert.assertNotEquals(runningJob.getJobId(), newJobId); + Assert.assertEquals(2, cacheHotspotManager.getCloudWarmUpJobs().size()); + Assert.assertEquals(JobState.PENDING, cacheHotspotManager.getCloudWarmUpJob(newJobId).getJobState()); + } + + @Test + public void testCreateClusterOnceJobReusesPendingWhenRunningAndPendingExist() throws AnalysisException { + CloudWarmUpJob runningJob = newClusterJob(10L, "src", "dst", SyncMode.ONCE, JobState.RUNNING, 100L); + CloudWarmUpJob pendingJob = newClusterJob(11L, "src", "dst", SyncMode.ONCE, JobState.PENDING, 200L); + cacheHotspotManager.addCloudWarmUpJob(runningJob); + cacheHotspotManager.addCloudWarmUpJob(pendingJob); + + long reusedJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false)); + + Assert.assertEquals(pendingJob.getJobId(), reusedJobId); + Assert.assertEquals(2, cacheHotspotManager.getCloudWarmUpJobs().size()); + } + + @Test + public void testCreateOnceJobIgnoresFinishedHistory() throws AnalysisException { + CloudWarmUpJob finishedJob = newClusterJob(10L, "src", "dst", SyncMode.ONCE, JobState.FINISHED, 100L); + cacheHotspotManager.addCloudWarmUpJob(finishedJob); + + long newJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false)); + + Assert.assertNotEquals(finishedJob.getJobId(), newJobId); + Assert.assertEquals(2, cacheHotspotManager.getCloudWarmUpJobs().size()); + Assert.assertEquals(JobState.PENDING, cacheHotspotManager.getCloudWarmUpJob(newJobId).getJobState()); + } + + @Test + public void testCreateClusterOnceJobReusesOldestHistoricalPendingDuplicateAfterReplay() throws Exception { + CloudWarmUpJob newerPendingJob = newClusterJob(20L, "src", "dst", SyncMode.ONCE, JobState.PENDING, 200L); + CloudWarmUpJob olderPendingJob = newClusterJob(30L, "src", "dst", SyncMode.ONCE, JobState.PENDING, 100L); + cacheHotspotManager.replayCloudWarmUpJob(newerPendingJob); + cacheHotspotManager.replayCloudWarmUpJob(olderPendingJob); + + long reusedJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false)); + + Assert.assertEquals(olderPendingJob.getJobId(), reusedJobId); + Assert.assertEquals(2, cacheHotspotManager.getCloudWarmUpJobs().size()); + } + + @Test + public void testCreatePeriodicJobUnaffected() throws AnalysisException { + WarmUpClusterCommand periodicStmt = newClusterStmt("dst", "src", false, periodicProperties(60)); + long firstJobId = cacheHotspotManager.createJob(periodicStmt); + AnalysisException exception = Assert.assertThrows(AnalysisException.class, () -> + cacheHotspotManager.createJob(newClusterStmt("dst", "src", false, periodicProperties(60)))); + + Assert.assertEquals(1000L, firstJobId); + Assert.assertTrue(exception.getMessage().contains("already has a runnable job")); + Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size()); + } + + @Test + public void testCreateEventDrivenJobUnaffected() throws AnalysisException { + WarmUpClusterCommand eventDrivenStmt = newClusterStmt("dst", "src", false, eventDrivenProperties("load")); + long firstJobId = cacheHotspotManager.createJob(eventDrivenStmt); + AnalysisException exception = Assert.assertThrows(AnalysisException.class, () -> + cacheHotspotManager.createJob(newClusterStmt("dst", "src", false, eventDrivenProperties("load")))); + + Assert.assertEquals(1000L, firstJobId); + Assert.assertTrue(exception.getMessage().contains("already has a runnable job")); + Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size()); + } + + private WarmUpClusterCommand newTableStmt(String dstClusterName, boolean force, + Triple... tables) { + WarmUpClusterCommand stmt = new WarmUpClusterCommand(new ArrayList<>(), + null, dstClusterName, force, true); + for (Triple table : tables) { + stmt.getTables().add(table); + } + return stmt; + } + + private WarmUpClusterCommand newClusterStmt(String dstClusterName, String srcClusterName, boolean force) { + return newClusterStmt(dstClusterName, srcClusterName, force, new HashMap<>()); + } + + private WarmUpClusterCommand newClusterStmt(String dstClusterName, String srcClusterName, + boolean force, Map properties) { + return new WarmUpClusterCommand(null, srcClusterName, dstClusterName, force, false, properties); + } + + private Map periodicProperties(long syncIntervalSec) { + Map properties = new HashMap<>(); + properties.put("sync_mode", "periodic"); + properties.put("sync_interval_sec", String.valueOf(syncIntervalSec)); + return properties; + } + + private Map eventDrivenProperties(String syncEvent) { + Map properties = new HashMap<>(); + properties.put("sync_mode", "event_driven"); + properties.put("sync_event", syncEvent); + return properties; + } + + private CloudWarmUpJob newClusterJob(long jobId, String srcClusterName, String dstClusterName, + SyncMode syncMode, JobState jobState, long createTimeMs) { + CloudWarmUpJob.Builder builder = new CloudWarmUpJob.Builder() + .setJobId(jobId) + .setSrcClusterName(srcClusterName) + .setDstClusterName(dstClusterName) + .setJobType(JobType.CLUSTER) + .setSyncMode(syncMode); + if (syncMode == SyncMode.PERIODIC) { + builder.setSyncInterval(60L); + } else if (syncMode == SyncMode.EVENT_DRIVEN) { + builder.setSyncEvent(SyncEvent.LOAD); + } + CloudWarmUpJob job = builder.build(); + job.setJobState(jobState); + job.setCreateTimeMs(createTimeMs); + return job; + } } From ac53c1aa7de01b4c5c2c8cb9cbce1edc35ac1b22 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Thu, 23 Apr 2026 15:18:06 +0800 Subject: [PATCH 2/3] [fix](cloud) Release once warm up dedupe locks after last waiter ### What problem does this PR solve? Issue Number: None Related PR: None Problem Summary: Replace once-pending warm up creation locks with reference-counted per-key locks so dedupe entries are removed after the last holder or waiter exits instead of growing without bound. ### Release note None ### Check List (For Author) - Test: Unit Test - ./run-fe-ut.sh --run org.apache.doris.cloud.cache.CacheHotspotManagerTest (with CUSTOM_MVN=/tmp/mvn-fe-core-am) - Behavior changed: Yes (once warm up dedupe lock entries are released after the last holder or waiter exits; duplicate job reuse behavior stays the same) - Does this need documentation: No --- .../doris/cloud/CacheHotspotManager.java | 54 ++++++++-- .../cloud/cache/CacheHotspotManagerTest.java | 99 +++++++++++++++++++ 2 files changed, 143 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java index 68a68f99494cbe..72208bf57a5832 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java @@ -52,6 +52,7 @@ import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; @@ -111,7 +112,8 @@ public class CacheHotspotManager extends MasterDaemon { private ConcurrentMap runnableCloudWarmUpJobs = Maps.newConcurrentMap(); - private final ConcurrentMap oncePendingCreateLocks = Maps.newConcurrentMap(); + private final ConcurrentMap oncePendingCreateLocks + = Maps.newConcurrentMap(); private final ThreadPoolExecutor cloudWarmUpThreadPool = ThreadPoolManager.newDaemonCacheThreadPool( Config.max_active_cloud_warm_up_job, "cloud-warm-up-pool", true); @@ -203,6 +205,30 @@ public String toString() { } } + private static class RefCountedPendingCreateLock { + private final ReentrantLock lock = new ReentrantLock(); + + // Tracks holders and waiters that retained the entry before locking. + private volatile int refCount = 1; + + void retain() { + ++refCount; + } + + int release() { + Preconditions.checkState(refCount > 0, "once pending create lock ref count underflow"); + return --refCount; + } + + void lock() { + lock.lock(); + } + + void unlock() { + lock.unlock(); + } + } + // Tracks long-running jobs (event-driven and periodic). // Ensures only one active job exists per tuple. private Set repeatJobDetectionSet = ConcurrentHashMap.newKeySet(); @@ -290,14 +316,21 @@ private CloudWarmUpJob findExistingPendingOnceJob(OncePendingJobKey key) { return selectedJob; } - private ReentrantLock getOncePendingCreateLock(OncePendingJobKey key) { - ReentrantLock lock = oncePendingCreateLocks.get(key); - if (lock != null) { - return lock; - } - ReentrantLock newLock = new ReentrantLock(); - ReentrantLock existingLock = oncePendingCreateLocks.putIfAbsent(key, newLock); - return existingLock == null ? newLock : existingLock; + private RefCountedPendingCreateLock retainOncePendingCreateLock(OncePendingJobKey key) { + return oncePendingCreateLocks.compute(key, (ignored, existingLock) -> { + if (existingLock == null) { + return new RefCountedPendingCreateLock(); + } + existingLock.retain(); + return existingLock; + }); + } + + private void releaseOncePendingCreateLock(OncePendingJobKey key, RefCountedPendingCreateLock lock) { + oncePendingCreateLocks.compute(key, (ignored, existingLock) -> { + Preconditions.checkState(existingLock == lock, "unexpected once pending create lock entry"); + return existingLock.release() == 0 ? null : existingLock; + }); } private void registerJobForRepeatDetection(CloudWarmUpJob job, boolean replay) throws AnalysisException { @@ -931,7 +964,7 @@ public Map> warmUpNewClusterByTable(long jobId, String dstClu public long createJob(WarmUpClusterCommand stmt) throws AnalysisException { OncePendingJobKey oncePendingJobKey = buildOncePendingJobKey(stmt); if (oncePendingJobKey != null) { - ReentrantLock createLock = getOncePendingCreateLock(oncePendingJobKey); + RefCountedPendingCreateLock createLock = retainOncePendingCreateLock(oncePendingJobKey); createLock.lock(); try { CloudWarmUpJob existingPendingJob = findExistingPendingOnceJob(oncePendingJobKey); @@ -943,6 +976,7 @@ public long createJob(WarmUpClusterCommand stmt) throws AnalysisException { return createJobInternal(stmt); } finally { createLock.unlock(); + releaseOncePendingCreateLock(oncePendingJobKey, createLock); } } return createJobInternal(stmt); diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java index da963e146f8303..dcf27f7604e236 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java @@ -44,12 +44,19 @@ import org.mockito.MockedStatic; import org.mockito.Mockito; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; public class CacheHotspotManagerTest { @@ -245,6 +252,67 @@ public void testCreateClusterOnceJobReusesOldestHistoricalPendingDuplicateAfterR Assert.assertEquals(2, cacheHotspotManager.getCloudWarmUpJobs().size()); } + @Test + public void testCreateTableOnceJobRemovesLockEntryWhenCreateFails() throws Exception { + boolean previousRunningUnitTest = FeConstants.runningUnitTest; + FeConstants.runningUnitTest = false; + cacheHotspotManager = new CacheHotspotManager(cloudSystemInfoService) { + @Override + public Map> warmUpNewClusterByTable(long jobId, String dstClusterName, + List> tables, boolean isForce) { + throw new RuntimeException("mock create failure"); + } + }; + + try { + RuntimeException exception = Assert.assertThrows(RuntimeException.class, () -> + cacheHotspotManager.createJob(newTableStmt("dst", false, Triple.of("db1", "tbl1", "")))); + + Assert.assertEquals("mock create failure", exception.getMessage()); + Assert.assertEquals(0, getOncePendingCreateLockCount()); + Assert.assertEquals(0, cacheHotspotManager.getCloudWarmUpJobs().size()); + } finally { + FeConstants.runningUnitTest = previousRunningUnitTest; + } + } + + @Test + public void testConcurrentCreateClusterOnceJobReleasesRefCountedLockAfterWaiterCompletes() throws Exception { + CountDownLatch firstCreateEntered = new CountDownLatch(1); + CountDownLatch allowFirstCreateToContinue = new CountDownLatch(1); + AtomicInteger getNextIdCalls = new AtomicInteger(); + Mockito.when(env.getNextId()).thenAnswer(invocation -> { + if (getNextIdCalls.incrementAndGet() == 1) { + firstCreateEntered.countDown(); + Assert.assertTrue(allowFirstCreateToContinue.await(5, TimeUnit.SECONDS)); + } + return nextJobId.getAndIncrement(); + }); + + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + Future firstCreate = executor.submit(() -> cacheHotspotManager.createJob( + newClusterStmt("dst", "src", false))); + Assert.assertTrue(firstCreateEntered.await(5, TimeUnit.SECONDS)); + + Future secondCreate = executor.submit(() -> cacheHotspotManager.createJob( + newClusterStmt("dst", "src", false))); + waitForOncePendingCreateLockRefCount(2, 5000L); + + allowFirstCreateToContinue.countDown(); + + long firstJobId = firstCreate.get(5, TimeUnit.SECONDS); + long secondJobId = secondCreate.get(5, TimeUnit.SECONDS); + Assert.assertEquals(firstJobId, secondJobId); + Assert.assertEquals(1, getNextIdCalls.get()); + Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size()); + Assert.assertEquals(0, getOncePendingCreateLockCount()); + } finally { + allowFirstCreateToContinue.countDown(); + executor.shutdownNow(); + } + } + @Test public void testCreatePeriodicJobUnaffected() throws AnalysisException { WarmUpClusterCommand periodicStmt = newClusterStmt("dst", "src", false, periodicProperties(60)); @@ -320,4 +388,35 @@ private CloudWarmUpJob newClusterJob(long jobId, String srcClusterName, String d job.setCreateTimeMs(createTimeMs); return job; } + + private int getOncePendingCreateLockCount() throws Exception { + return getOncePendingCreateLocks().size(); + } + + private int getOnlyOncePendingCreateLockRefCount() throws Exception { + Map locks = getOncePendingCreateLocks(); + Assert.assertEquals(1, locks.size()); + Object lockEntry = locks.values().iterator().next(); + Field refCountField = lockEntry.getClass().getDeclaredField("refCount"); + refCountField.setAccessible(true); + return refCountField.getInt(lockEntry); + } + + private Map getOncePendingCreateLocks() throws Exception { + Field locksField = CacheHotspotManager.class.getDeclaredField("oncePendingCreateLocks"); + locksField.setAccessible(true); + return (Map) locksField.get(cacheHotspotManager); + } + + private void waitForOncePendingCreateLockRefCount(int expectedRefCount, long timeoutMs) throws Exception { + long deadlineMs = System.currentTimeMillis() + timeoutMs; + while (System.currentTimeMillis() < deadlineMs) { + if (getOncePendingCreateLockCount() == 1 + && getOnlyOncePendingCreateLockRefCount() == expectedRefCount) { + return; + } + Thread.sleep(10L); + } + Assert.fail("Timed out waiting for once pending create lock ref count " + expectedRefCount); + } } From d2b8bbbedd9d7cadd5f29ab390a0fcc299ce2807 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Wed, 27 May 2026 02:28:00 +0800 Subject: [PATCH 3/3] [fix](fe) Fix FE build and unit tests after rebase ### What problem does this PR solve? Issue Number: None Related PR: #8320 Problem Summary: After rebasing onto latest master, FE build failed because Immutables generated FlightAuthResult code references SuppressFBWarnings but fe-core did not include the findbugs annotations artifact. Full FE UT also failed because the COS test expected constructor-time region validation although the typed S3-compatible properties now support endpoint-only configs, the authentication handler test expected an OIDC plugin that is not present in the current authentication plugin modules, the warm up job lock concurrency test relied on a main-thread static Env mock from worker threads, and java-common readable vector metadata omitted the const-column flag that its reader and meta size accounting expect. This change adds the annotation dependency for generated Immutables code, updates the COS missing-region assertion to the native presigned URL path, aligns authentication plugin manager tests with the password and LDAP plugins available on the test classpath, registers the warm up lock concurrency test Env mock inside each worker thread, and writes the const flag into vector metadata to keep writable and readable metadata layouts consistent. ### Release note None ### Check List (For Author) - Test: - ./build.sh --fe - ./run-fe-ut.sh - mvn test -pl fe-filesystem/fe-filesystem-cos -am -Dcheckstyle.skip=true -DfailIfNoTests=false -Dmaven.build.cache.enabled=false -Dtest=CosObjStorageTest - mvn test -pl fe-authentication/fe-authentication-handler -am -Dcheckstyle.skip=true -DfailIfNoTests=false -Dmaven.build.cache.enabled=false -Dtest=AuthenticationPluginManagerTest - mvn test -pl fe-core -am -Dcheckstyle.skip=true -DfailIfNoTests=false -Dmaven.build.cache.enabled=false -Dtest=org.apache.doris.cloud.cache.CacheHotspotManagerTest#testConcurrentCreateClusterOnceJobReleasesRefCountedLockAfterWaiterCompletes - mvn test -pl be-java-extensions/java-common -am -Dcheckstyle.skip=true -DfailIfNoTests=false -Dmaven.build.cache.enabled=false -Dtest=JniScannerTest - Behavior changed: No - Does this need documentation: No --- .../org/apache/doris/common/jni/vec/VectorColumn.java | 1 + .../fe-authentication-handler/pom.xml | 6 ++++++ .../handler/AuthenticationPluginManagerTest.java | 8 ++++---- fe/fe-core/pom.xml | 4 ++++ .../doris/cloud/cache/CacheHotspotManagerTest.java | 11 +++++++++-- .../doris/filesystem/cos/CosObjStorageTest.java | 10 +++++----- fe/pom.xml | 7 +++++++ 7 files changed, 36 insertions(+), 11 deletions(-) diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java index 36d2329da89976..949b19d1443b99 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java @@ -1605,6 +1605,7 @@ public byte[] getBytesVarbinary(int rowId) { } public void updateMeta(VectorColumn meta) { + meta.appendLong(isConst ? 1 : 0); if (columnType.isUnsupported()) { meta.appendLong(0); } else if (columnType.isStringType()) { diff --git a/fe/fe-authentication/fe-authentication-handler/pom.xml b/fe/fe-authentication/fe-authentication-handler/pom.xml index e13be7bca8fb08..540dbd47764073 100644 --- a/fe/fe-authentication/fe-authentication-handler/pom.xml +++ b/fe/fe-authentication/fe-authentication-handler/pom.xml @@ -72,5 +72,11 @@ under the License. ${project.version} test + + org.apache.doris + fe-authentication-plugin-ldap + ${project.version} + test + diff --git a/fe/fe-authentication/fe-authentication-handler/src/test/java/org/apache/doris/authentication/handler/AuthenticationPluginManagerTest.java b/fe/fe-authentication/fe-authentication-handler/src/test/java/org/apache/doris/authentication/handler/AuthenticationPluginManagerTest.java index 6cde32bb90f0f1..57b8d3985a9311 100644 --- a/fe/fe-authentication/fe-authentication-handler/src/test/java/org/apache/doris/authentication/handler/AuthenticationPluginManagerTest.java +++ b/fe/fe-authentication/fe-authentication-handler/src/test/java/org/apache/doris/authentication/handler/AuthenticationPluginManagerTest.java @@ -70,7 +70,7 @@ void testPluginsAutoLoaded() { // Then Assertions.assertNotNull(pluginNames); Assertions.assertFalse(pluginNames.isEmpty(), "Should load at least built-in plugins"); - Assertions.assertTrue(pluginNames.contains("oidc"), "Should include oidc plugin"); + Assertions.assertTrue(pluginNames.contains("ldap"), "Should include ldap plugin"); Assertions.assertTrue(pluginNames.contains("password"), "Should include password plugin"); } @@ -148,9 +148,9 @@ void testGetFactory() { Assertions.assertTrue(factory.isPresent()); Assertions.assertEquals("password", factory.get().name()); - Optional oidcFactory = pluginManager.getFactory("oidc"); - Assertions.assertTrue(oidcFactory.isPresent()); - Assertions.assertEquals("oidc", oidcFactory.get().name()); + Optional ldapFactory = pluginManager.getFactory("ldap"); + Assertions.assertTrue(ldapFactory.isPresent()); + Assertions.assertEquals("ldap", ldapFactory.get().name()); } @Test diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 6e6ec0969b54c6..c8bf307d90db84 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -731,6 +731,10 @@ under the License. org.immutables value + + com.google.code.findbugs + annotations + io.trino trino-main diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java index dcf27f7604e236..017f91e594676a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java @@ -291,11 +291,11 @@ public void testConcurrentCreateClusterOnceJobReleasesRefCountedLockAfterWaiterC ExecutorService executor = Executors.newFixedThreadPool(2); try { - Future firstCreate = executor.submit(() -> cacheHotspotManager.createJob( + Future firstCreate = executor.submit(() -> createJobWithThreadLocalEnv( newClusterStmt("dst", "src", false))); Assert.assertTrue(firstCreateEntered.await(5, TimeUnit.SECONDS)); - Future secondCreate = executor.submit(() -> cacheHotspotManager.createJob( + Future secondCreate = executor.submit(() -> createJobWithThreadLocalEnv( newClusterStmt("dst", "src", false))); waitForOncePendingCreateLockRefCount(2, 5000L); @@ -393,6 +393,13 @@ private int getOncePendingCreateLockCount() throws Exception { return getOncePendingCreateLocks().size(); } + private long createJobWithThreadLocalEnv(WarmUpClusterCommand command) throws AnalysisException { + try (MockedStatic threadLocalEnvMock = Mockito.mockStatic(Env.class)) { + threadLocalEnvMock.when(Env::getCurrentEnv).thenReturn(env); + return cacheHotspotManager.createJob(command); + } + } + private int getOnlyOncePendingCreateLockRefCount() throws Exception { Map locks = getOncePendingCreateLocks(); Assert.assertEquals(1, locks.size()); diff --git a/fe/fe-filesystem/fe-filesystem-cos/src/test/java/org/apache/doris/filesystem/cos/CosObjStorageTest.java b/fe/fe-filesystem/fe-filesystem-cos/src/test/java/org/apache/doris/filesystem/cos/CosObjStorageTest.java index 33d5f09396005f..12b85f64894af7 100644 --- a/fe/fe-filesystem/fe-filesystem-cos/src/test/java/org/apache/doris/filesystem/cos/CosObjStorageTest.java +++ b/fe/fe-filesystem/fe-filesystem-cos/src/test/java/org/apache/doris/filesystem/cos/CosObjStorageTest.java @@ -140,7 +140,7 @@ void getPresignedUrl_missingBucketThrowsIOException() { } @Test - void constructor_missingRegionFailsTypedValidation() { + void getPresignedUrl_missingRegionThrowsIOException() { COSClient mockCos = Mockito.mock(COSClient.class); Map props = new HashMap<>(); props.put("COS_ENDPOINT", "https://cos.myqcloud.com"); @@ -149,11 +149,11 @@ void constructor_missingRegionFailsTypedValidation() { props.put("COS_BUCKET", "my-bucket-1234"); // no region - IllegalArgumentException exception = Assertions.assertThrows( - IllegalArgumentException.class, () -> new TestableCosObjStorage(props, mockCos)); + CosObjStorage storage = new TestableCosObjStorage(props, mockCos); + + IOException exception = Assertions.assertThrows(IOException.class, () -> storage.getPresignedUrl("some/key")); - Assertions.assertTrue(exception.getMessage().contains("Invalid S3 filesystem properties")); - Assertions.assertTrue(exception.getMessage().contains("Region is not set")); + Assertions.assertTrue(exception.getMessage().contains("COS region for presigned URL is required")); } @Test diff --git a/fe/pom.xml b/fe/pom.xml index af8947325cccee..b02bc2e7cfa817 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -384,6 +384,7 @@ under the License. 12.0.34 11.0.26 2.9.3 + 3.0.1 2.5.2 78.1 0.5.4 @@ -1766,6 +1767,12 @@ under the License. ${immutables.version} provided + + com.google.code.findbugs + annotations + ${findbugs-annotations.version} + provided + io.trino trino-main