Skip to content

Commit

Permalink
Revert "[code healthy] remove deprecated RecoverableFailover."
Browse files Browse the repository at this point in the history
This reverts commit 459d3a2.
  • Loading branch information
w.vela committed Jan 25, 2018
1 parent 459d3a2 commit 956d9a3
Show file tree
Hide file tree
Showing 3 changed files with 322 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.github.phantomthief.failover.impl;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import javax.annotation.CheckReturnValue;

/**
* @author w.vela
*/
@Deprecated
public class GenericRecoverableCheckFailoverBuilder<E> {

private final RecoverableCheckFailoverBuilder<Object> builder;

GenericRecoverableCheckFailoverBuilder(RecoverableCheckFailoverBuilder<Object> builder) {
this.builder = builder;
}

@CheckReturnValue
public RecoverableCheckFailoverBuilder<Object> setFailCount(int failCount) {
return builder.setFailCount(failCount);
}

@CheckReturnValue
public GenericRecoverableCheckFailoverBuilder<E> setChecker(Predicate<? super E> checker) {
builder.setChecker(checker);
return this;
}

@CheckReturnValue
public GenericRecoverableCheckFailoverBuilder<E>
setRecoveryCheckDuration(long recoveryCheckDuration, TimeUnit unit) {
builder.setRecoveryCheckDuration(recoveryCheckDuration, unit);
return this;
}

@CheckReturnValue
public GenericRecoverableCheckFailoverBuilder<E> setFailDuration(long failDuration,
TimeUnit unit) {
builder.setFailDuration(failDuration, unit);
return this;
}

@CheckReturnValue
public GenericRecoverableCheckFailoverBuilder<E>
setReturnOriginalWhileAllFailed(boolean returnOriginalWhileAllFailed) {
builder.setReturnOriginalWhileAllFailed(returnOriginalWhileAllFailed);
return this;
}

public RecoverableCheckFailover<E> build(List<? extends E> original) {
return builder.build(original);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package com.github.phantomthief.failover.impl;

import static com.github.phantomthief.failover.util.SharedCheckExecutorHolder.getInstance;
import static com.github.phantomthief.util.MoreSuppliers.lazy;
import static com.google.common.collect.EvictingQueue.create;
import static java.lang.System.currentTimeMillis;
import static java.util.Collections.emptySet;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;
import static org.slf4j.LoggerFactory.getLogger;

import java.io.Closeable;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Predicate;

import org.slf4j.Logger;

import com.github.phantomthief.failover.Failover;
import com.github.phantomthief.util.MoreSuppliers.CloseableSupplier;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.EvictingQueue;

/**
* 一个简易的failover/failback策略类
* failover条件是一段时间内出错次数超过一个阈值
* failback策略是定期检查可用
*
* @author w.vela
*/
@Deprecated
public class RecoverableCheckFailover<T> implements Failover<T>, Closeable {

private static Logger logger = getLogger(RecoverableCheckFailover.class);
private final List<T> original;
private final long failDuration;
private final Set<T> failedList = new CopyOnWriteArraySet<>();
private final LoadingCache<T, EvictingQueue<Long>> failCountMap;
private final boolean returnOriginalWhileAllFailed;
private final CloseableSupplier<ScheduledFuture<?>> recoveryFuture;

private volatile boolean closed;

RecoverableCheckFailover(List<T> original, Predicate<T> checker, int failCount,
long failDuration, long recoveryCheckDuration, boolean returnOriginalWhileAllFailed) {
this.returnOriginalWhileAllFailed = returnOriginalWhileAllFailed;
this.original = original;
this.failDuration = failDuration;
this.failCountMap = CacheBuilder.newBuilder().weakKeys()
.build(new CacheLoader<T, EvictingQueue<Long>>() {

@Override
public EvictingQueue<Long> load(T key) throws Exception {
return create(failCount);
}
});
recoveryFuture = lazy(() -> getInstance().scheduleWithFixedDelay(() -> {
if (closed) {
tryCloseScheduler();
return;
}
if (failedList == null || failedList.isEmpty()) {
return;
}
try {
// 考虑到COWArraySet不支持iterator.remove,所以这里使用搜集->统一清理的策略
List<T> covered = failedList.stream() //
.filter(checker) //
.peek(obj -> logger.info("obj:{} is recovered during test.", obj)) //
.collect(toList());
failedList.removeAll(covered);
} catch (Throwable e) {
logger.error("Ops.", e);
}
}, recoveryCheckDuration, recoveryCheckDuration, MILLISECONDS));
}

public static RecoverableCheckFailoverBuilder<Object> newBuilder() {
return new RecoverableCheckFailoverBuilder<>();
}

public static <E> GenericRecoverableCheckFailoverBuilder<E> newGenericBuilder() {
return new GenericRecoverableCheckFailoverBuilder<>(newBuilder());
}

@Override
public void fail(T object) {
if (!getAll().contains(object)) {
logger.warn("invalid fail obj:{}, it's not in original list.", object);
return;
}
logger.warn("server {} failed.", object);
boolean addToFail = false;
EvictingQueue<Long> evictingQueue = failCountMap.getUnchecked(object);
synchronized (evictingQueue) {
evictingQueue.add(currentTimeMillis());
if (evictingQueue.remainingCapacity() == 0
&& evictingQueue.element() >= currentTimeMillis() - failDuration) {
addToFail = true;
}
}
if (addToFail) {
failedList.add(object);
}
recoveryFuture.get();
}

@Override
public void down(T object) {
if (!getAll().contains(object)) {
logger.warn("invalid fail obj:{}, it's not in original list.", object);
return;
}
logger.warn("server {} down.", object);
failedList.add(object);
recoveryFuture.get();
}

@Override
public List<T> getAvailable() {
return getAvailableExclude(emptySet());
}

@Override
public List<T> getAvailableExclude(Collection<T> exclusions) {
List<T> availables = original.stream()
.filter(obj -> !getFailed().contains(obj))
.filter(obj -> !exclusions.contains(obj))
.collect(toList());
if (availables.isEmpty() && returnOriginalWhileAllFailed) {
return original;
} else {
return availables;
}
}

@Override
public Set<T> getFailed() {
return failedList;
}

@Override
public List<T> getAll() {
return original;
}

public synchronized void close() {
closed = true;
tryCloseScheduler();
}

private void tryCloseScheduler() {
recoveryFuture.ifPresent(future -> {
if (!future.isCancelled()) {
if (!future.cancel(true)) {
logger.warn("fail to close failover:{}", this);
}
}
});
}

@Override
public String toString() {
return "RecoverableCheckFailover [" + original + "]";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.github.phantomthief.failover.impl;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import javax.annotation.CheckReturnValue;

import org.slf4j.Logger;

@Deprecated
public final class RecoverableCheckFailoverBuilder<T> {

private static final int DEFAULT_FAIL_COUNT = 10;
private static final long DEFAULT_FAIL_DURATION = MINUTES.toMillis(1);
private static final long DEFAULT_RECOVERY_CHECK_DURATION = SECONDS.toMillis(5);
private static final Logger logger = getLogger(RecoverableCheckFailover.class);
private int failCount;
private long failDuration;
private long recoveryCheckDuration;
private boolean returnOriginalWhileAllFailed;
private Predicate<T> checker;

@CheckReturnValue
public RecoverableCheckFailoverBuilder<T> setFailCount(int failCount) {
this.failCount = failCount;
return this;
}

@SuppressWarnings("unchecked")
@CheckReturnValue
public <E> RecoverableCheckFailoverBuilder<E> setChecker(Predicate<? super E> checker) {
RecoverableCheckFailoverBuilder<E> thisBuilder = (RecoverableCheckFailoverBuilder<E>) this;
thisBuilder.checker = thisBuilder.catching((Predicate<E>) checker);
return thisBuilder;
}

@CheckReturnValue
public RecoverableCheckFailoverBuilder<T> setRecoveryCheckDuration(long recoveryCheckDuration,
TimeUnit unit) {
this.recoveryCheckDuration = unit.toMillis(recoveryCheckDuration);
return this;
}

@CheckReturnValue
public RecoverableCheckFailoverBuilder<T> setFailDuration(long failDuration, TimeUnit unit) {
this.failDuration = unit.toMillis(failDuration);
return this;
}

@CheckReturnValue
public RecoverableCheckFailoverBuilder<T>
setReturnOriginalWhileAllFailed(boolean returnOriginalWhileAllFailed) {
this.returnOriginalWhileAllFailed = returnOriginalWhileAllFailed;
return this;
}

@SuppressWarnings("unchecked")
public <E> RecoverableCheckFailover<E> build(List<? extends E> original) {
RecoverableCheckFailoverBuilder<E> thisBuilder = (RecoverableCheckFailoverBuilder<E>) this;
thisBuilder.ensure();
return new RecoverableCheckFailover<>((List<E>) original, thisBuilder.checker, failCount,
failDuration, recoveryCheckDuration, returnOriginalWhileAllFailed);
}

private void ensure() {
checkNotNull(checker);

if (failCount <= 0) {
failCount = DEFAULT_FAIL_COUNT;
}
if (failDuration <= 0) {
failDuration = DEFAULT_FAIL_DURATION;
}
if (recoveryCheckDuration <= 0) {
recoveryCheckDuration = DEFAULT_RECOVERY_CHECK_DURATION;
}
}

private Predicate<T> catching(Predicate<T> predicate) {
return t -> {
try {
return predicate.test(t);
} catch (Throwable e) {
logger.error("Ops. fail to test:{}", t, e);
return false;
}
};
}
}

0 comments on commit 956d9a3

Please sign in to comment.