Skip to content

Commit

Permalink
fix(PriorityFailoverCheckTask): 修复 failover 实例无法被 gc 的问题 (#43)
Browse files Browse the repository at this point in the history
* fix(PriorityFailoverCheckTask): 添加验证 failover 实例无法被 gc 的测试用例

Co-authored-by: wangzhiqian
  • Loading branch information
ymwangzq committed Mar 1, 2021
1 parent a389d55 commit 2ecdae6
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 19 deletions.
20 changes: 18 additions & 2 deletions src/main/java/com/github/phantomthief/failover/impl/GcUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,28 @@
import java.lang.ref.ReferenceQueue;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;


/**
* @author huangli
* Created on 2019-12-30
*/
final class GcUtil {

private static ConcurrentHashMap<Reference<Object>, Runnable> refMap = new ConcurrentHashMap<>();
private static final Logger logger = LoggerFactory.getLogger(GcUtil.class);

private static final ConcurrentHashMap<Reference<Object>, Runnable> refMap = new ConcurrentHashMap<>();
private static final ReferenceQueue<Object> REF_QUEUE = new ReferenceQueue<>();

@VisibleForTesting
static ConcurrentHashMap<Reference<Object>, Runnable> getRefMap() {
return refMap;
}

public static void register(Object resource, Runnable cleaner) {
if (resource != null && cleaner != null) {
PhantomReference<Object> ref = new PhantomReference<>(resource, REF_QUEUE);
Expand All @@ -26,7 +38,11 @@ public static void doClean() {
Reference<?> ref = REF_QUEUE.poll();
while (ref != null) {
Runnable cleaner = refMap.remove(ref);
cleaner.run();
try {
cleaner.run();
} catch (Throwable t) {
logger.warn("Failover GC doClean failed", t);
}
ref = REF_QUEUE.poll();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.util.HashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -21,12 +23,39 @@ class PriorityFailoverCheckTask<T> implements Runnable {

private final PriorityFailoverConfig<T> config;

private volatile ScheduledFuture<?> future;
private final AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<>();

private final HashMap<T, ResInfo<T>> resourcesMap;
private final GroupInfo<T>[] groups;

private volatile boolean closed;
private final AtomicBoolean closed = new AtomicBoolean(false);

/**
* 有时在 ResInfo 持有的资源对象中,会持有 failover 实例,以便调用 failover 的 success/fail 等方法
* 一旦在注册到 GcUtil 的 CloseRunnable 中持有了 PriorityFailoverCheckTask 的引用,将导致 failover
* 始终存在引用,无法被关闭和清理。所以这里单独构造一个 static class,避免隐式持有 this.resourcesMap
* 导致 failover 无法被 gc 掉
*/
private static class CloseRunnable implements Runnable {
private final AtomicBoolean closed;
private final AtomicReference<ScheduledFuture<?>> futureRef;

CloseRunnable(AtomicBoolean closed,
AtomicReference<ScheduledFuture<?>> futureRef) {
this.closed = closed;
this.futureRef = futureRef;
}

@Override
public void run() {
// 这里代码和 close 一样,由于此时 failover 已经被 gc 掉了,所以不需要再加锁了
closed.set(true);
ScheduledFuture<?> scheduledFuture = futureRef.get();
if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
scheduledFuture.cancel(true);
}
}
}

PriorityFailoverCheckTask(PriorityFailoverConfig<T> config, PriorityFailover<T> failover) {
this.config = config;
Expand All @@ -36,20 +65,18 @@ class PriorityFailoverCheckTask<T> implements Runnable {
if (config.isStartCheckTaskImmediately()) {
ensureStart();
}
GcUtil.register(failover, this::close);
GcUtil.register(failover, new CloseRunnable(closed, futureRef));
GcUtil.doClean();
} else {
future = null;
}
}

public void ensureStart() {
if (future == null) {
if (futureRef.get() == null) {
synchronized (this) {
if (future == null && config.getChecker() != null) {
future = config.getCheckExecutor().scheduleWithFixedDelay(
if (futureRef.get() == null && config.getChecker() != null) {
futureRef.set(config.getCheckExecutor().scheduleWithFixedDelay(
this, config.getCheckDuration().toMillis(),
config.getCheckDuration().toMillis(), TimeUnit.MILLISECONDS);
config.getCheckDuration().toMillis(), TimeUnit.MILLISECONDS));
}
}
}
Expand All @@ -67,13 +94,13 @@ public void run() {
try {
for (ResInfo<T> r : resourcesMap.values()) {
try {
if (closed) {
if (closed.get()) {
return;
}
if (config.getWeightFunction().needCheck(r.maxWeight,
r.minWeight, r.priority, r.currentWeight, r.resource)) {
boolean ok = config.getChecker().test(r.resource);
if (closed) {
if (closed.get()) {
return;
}
PriorityFailover.updateWeight(ok, r, config, groups);
Expand All @@ -93,13 +120,14 @@ public void run() {
}

public synchronized void close() {
closed = true;
if (future != null && !future.isCancelled()) {
future.cancel(true);
closed.set(true);
ScheduledFuture<?> scheduledFuture = futureRef.get();
if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
scheduledFuture.cancel(true);
}
}

boolean isClosed() {
return closed;
return closed.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class WeightFailoverCheckTask<T> {
private static final Logger logger = LoggerFactory.getLogger(WeightFailoverCheckTask.class);

static final int CLEAN_INIT_DELAY_SECONDS = 5;
private static final int CLEAN_DELAY_SECONDS = 10;
static final int CLEAN_DELAY_SECONDS = 10;

static {
SharedCheckExecutorHolder.getInstance().scheduleWithFixedDelay(WeightFailoverCheckTask::doClean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,24 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.github.phantomthief.failover.impl.PriorityFailover.ResStatus;
import com.google.common.util.concurrent.Uninterruptibles;

/**
* @author huangli
Expand Down Expand Up @@ -514,4 +520,49 @@ public void testGc() {
assertTrue(task.isClosed());
}

private static class MockResource {
private final String resource;
private volatile PriorityFailover<MockResource> failover;

private MockResource(String resource) {
this.resource = resource;
}

void setFailover(
PriorityFailover<MockResource> failover) {
this.failover = failover;
}
}

/**
* 通常都会在 resource 中持有 failover 引用,测试下这种场景下也能够被正常 gc
*/
@SuppressWarnings("UnusedAssignment")
@Test
void testGc2() {
int beforeSize = GcUtil.getRefMap().size();
List<MockResource> resources = Stream.of("a", "b").map(MockResource::new).collect(Collectors.toList());

PriorityFailover<MockResource> failover = PriorityFailover.<MockResource> newBuilder()
.addResources(resources)
.checkDuration(Duration.ofMillis(1))
.checker(o -> true)
.build();
for (MockResource r : resources) {
r.setFailover(failover);
}
failover = null;
resources = null;

int counter = 0;
while (GcUtil.getRefMap().size() > beforeSize && counter < 20) {
counter++;
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
System.gc();
GcUtil.doClean();
}

int afterSize = GcUtil.getRefMap().size();
Assertions.assertEquals(beforeSize, afterSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ public void test() throws Exception {
for (int i = 0; i < 5000; i++) {
byte[] bs = new byte[1 * 1024 * 1024];
}
System.gc();

Thread.sleep(WeightFailoverCheckTask.CLEAN_INIT_DELAY_SECONDS * 1000);
Thread.sleep((1 + WeightFailoverCheckTask.CLEAN_DELAY_SECONDS) * 1000);

Assertions.assertTrue(closed.get());
Assertions.assertTrue(recoveryFuture.get().isCancelled());
Expand Down

0 comments on commit 2ecdae6

Please sign in to comment.