diff --git a/src/main/java/com/github/phantomthief/failover/impl/GcUtil.java b/src/main/java/com/github/phantomthief/failover/impl/GcUtil.java index 70e95de..bc4dc15 100644 --- a/src/main/java/com/github/phantomthief/failover/impl/GcUtil.java +++ b/src/main/java/com/github/phantomthief/failover/impl/GcUtil.java @@ -5,6 +5,11 @@ import java.lang.ref.ReferenceQueue; import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + /** * @author huangli @@ -12,9 +17,16 @@ */ final class GcUtil { - private static ConcurrentHashMap, Runnable> refMap = new ConcurrentHashMap<>(); + private static final Logger logger = LoggerFactory.getLogger(GcUtil.class); + + private static final ConcurrentHashMap, Runnable> refMap = new ConcurrentHashMap<>(); private static final ReferenceQueue REF_QUEUE = new ReferenceQueue<>(); + @VisibleForTesting + static ConcurrentHashMap, Runnable> getRefMap() { + return refMap; + } + public static void register(Object resource, Runnable cleaner) { if (resource != null && cleaner != null) { PhantomReference ref = new PhantomReference<>(resource, REF_QUEUE); @@ -26,7 +38,11 @@ public static void doClean() { Reference ref = REF_QUEUE.poll(); while (ref != null) { Runnable cleaner = refMap.remove(ref); - cleaner.run(); + try { + cleaner.run(); + } catch (Throwable t) { + logger.warn("Failover GC doClean failed", t); + } ref = REF_QUEUE.poll(); } } diff --git a/src/main/java/com/github/phantomthief/failover/impl/PriorityFailoverCheckTask.java b/src/main/java/com/github/phantomthief/failover/impl/PriorityFailoverCheckTask.java index 40fcb7f..40d41f3 100644 --- a/src/main/java/com/github/phantomthief/failover/impl/PriorityFailoverCheckTask.java +++ b/src/main/java/com/github/phantomthief/failover/impl/PriorityFailoverCheckTask.java @@ -3,6 +3,8 @@ import java.util.HashMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,12 +23,39 @@ class PriorityFailoverCheckTask implements Runnable { private final PriorityFailoverConfig config; - private volatile ScheduledFuture future; + private final AtomicReference> futureRef = new AtomicReference<>(); private final HashMap> resourcesMap; private final GroupInfo[] groups; - private volatile boolean closed; + private final AtomicBoolean closed = new AtomicBoolean(false); + + /** + * 有时在 ResInfo 持有的资源对象中,会持有 failover 实例,以便调用 failover 的 success/fail 等方法 + * 一旦在注册到 GcUtil 的 CloseRunnable 中持有了 PriorityFailoverCheckTask 的引用,将导致 failover + * 始终存在引用,无法被关闭和清理。所以这里单独构造一个 static class,避免隐式持有 this.resourcesMap + * 导致 failover 无法被 gc 掉 + */ + private static class CloseRunnable implements Runnable { + private final AtomicBoolean closed; + private final AtomicReference> futureRef; + + CloseRunnable(AtomicBoolean closed, + AtomicReference> futureRef) { + this.closed = closed; + this.futureRef = futureRef; + } + + @Override + public void run() { + // 这里代码和 close 一样,由于此时 failover 已经被 gc 掉了,所以不需要再加锁了 + closed.set(true); + ScheduledFuture scheduledFuture = futureRef.get(); + if (scheduledFuture != null && !scheduledFuture.isCancelled()) { + scheduledFuture.cancel(true); + } + } + } PriorityFailoverCheckTask(PriorityFailoverConfig config, PriorityFailover failover) { this.config = config; @@ -36,20 +65,18 @@ class PriorityFailoverCheckTask implements Runnable { if (config.isStartCheckTaskImmediately()) { ensureStart(); } - GcUtil.register(failover, this::close); + GcUtil.register(failover, new CloseRunnable(closed, futureRef)); GcUtil.doClean(); - } else { - future = null; } } public void ensureStart() { - if (future == null) { + if (futureRef.get() == null) { synchronized (this) { - if (future == null && config.getChecker() != null) { - future = config.getCheckExecutor().scheduleWithFixedDelay( + if (futureRef.get() == null && config.getChecker() != null) { + futureRef.set(config.getCheckExecutor().scheduleWithFixedDelay( this, config.getCheckDuration().toMillis(), - config.getCheckDuration().toMillis(), TimeUnit.MILLISECONDS); + config.getCheckDuration().toMillis(), TimeUnit.MILLISECONDS)); } } } @@ -67,13 +94,13 @@ public void run() { try { for (ResInfo r : resourcesMap.values()) { try { - if (closed) { + if (closed.get()) { return; } if (config.getWeightFunction().needCheck(r.maxWeight, r.minWeight, r.priority, r.currentWeight, r.resource)) { boolean ok = config.getChecker().test(r.resource); - if (closed) { + if (closed.get()) { return; } PriorityFailover.updateWeight(ok, r, config, groups); @@ -93,13 +120,14 @@ public void run() { } public synchronized void close() { - closed = true; - if (future != null && !future.isCancelled()) { - future.cancel(true); + closed.set(true); + ScheduledFuture scheduledFuture = futureRef.get(); + if (scheduledFuture != null && !scheduledFuture.isCancelled()) { + scheduledFuture.cancel(true); } } boolean isClosed() { - return closed; + return closed.get(); } } diff --git a/src/main/java/com/github/phantomthief/failover/impl/WeightFailoverCheckTask.java b/src/main/java/com/github/phantomthief/failover/impl/WeightFailoverCheckTask.java index 965cb28..d1e829a 100644 --- a/src/main/java/com/github/phantomthief/failover/impl/WeightFailoverCheckTask.java +++ b/src/main/java/com/github/phantomthief/failover/impl/WeightFailoverCheckTask.java @@ -31,7 +31,7 @@ class WeightFailoverCheckTask { private static final Logger logger = LoggerFactory.getLogger(WeightFailoverCheckTask.class); static final int CLEAN_INIT_DELAY_SECONDS = 5; - private static final int CLEAN_DELAY_SECONDS = 10; + static final int CLEAN_DELAY_SECONDS = 10; static { SharedCheckExecutorHolder.getInstance().scheduleWithFixedDelay(WeightFailoverCheckTask::doClean, diff --git a/src/test/java/com/github/phantomthief/failover/impl/PriorityFailoverTest.java b/src/test/java/com/github/phantomthief/failover/impl/PriorityFailoverTest.java index cc19046..3077ebe 100644 --- a/src/test/java/com/github/phantomthief/failover/impl/PriorityFailoverTest.java +++ b/src/test/java/com/github/phantomthief/failover/impl/PriorityFailoverTest.java @@ -5,18 +5,24 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import com.github.phantomthief.failover.impl.PriorityFailover.ResStatus; +import com.google.common.util.concurrent.Uninterruptibles; /** * @author huangli @@ -514,4 +520,49 @@ public void testGc() { assertTrue(task.isClosed()); } + private static class MockResource { + private final String resource; + private volatile PriorityFailover failover; + + private MockResource(String resource) { + this.resource = resource; + } + + void setFailover( + PriorityFailover failover) { + this.failover = failover; + } + } + + /** + * 通常都会在 resource 中持有 failover 引用,测试下这种场景下也能够被正常 gc + */ + @SuppressWarnings("UnusedAssignment") + @Test + void testGc2() { + int beforeSize = GcUtil.getRefMap().size(); + List resources = Stream.of("a", "b").map(MockResource::new).collect(Collectors.toList()); + + PriorityFailover failover = PriorityFailover. newBuilder() + .addResources(resources) + .checkDuration(Duration.ofMillis(1)) + .checker(o -> true) + .build(); + for (MockResource r : resources) { + r.setFailover(failover); + } + failover = null; + resources = null; + + int counter = 0; + while (GcUtil.getRefMap().size() > beforeSize && counter < 20) { + counter++; + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + System.gc(); + GcUtil.doClean(); + } + + int afterSize = GcUtil.getRefMap().size(); + Assertions.assertEquals(beforeSize, afterSize); + } } diff --git a/src/test/java/com/github/phantomthief/failover/impl/WeightFailoverCheckTaskTest.java b/src/test/java/com/github/phantomthief/failover/impl/WeightFailoverCheckTaskTest.java index 6117e8b..f52b805 100644 --- a/src/test/java/com/github/phantomthief/failover/impl/WeightFailoverCheckTaskTest.java +++ b/src/test/java/com/github/phantomthief/failover/impl/WeightFailoverCheckTaskTest.java @@ -31,8 +31,9 @@ public void test() throws Exception { for (int i = 0; i < 5000; i++) { byte[] bs = new byte[1 * 1024 * 1024]; } + System.gc(); - Thread.sleep(WeightFailoverCheckTask.CLEAN_INIT_DELAY_SECONDS * 1000); + Thread.sleep((1 + WeightFailoverCheckTask.CLEAN_DELAY_SECONDS) * 1000); Assertions.assertTrue(closed.get()); Assertions.assertTrue(recoveryFuture.get().isCancelled());