Skip to content

Commit

Permalink
Back pressure (#15)
Browse files Browse the repository at this point in the history
* [feature] add back-pressure to simple buffer trigger.
  • Loading branch information
PhantomThief committed Jul 30, 2019
1 parent c9656ff commit ba14a08
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<properties>
<slf4j-api.version>1.7.21</slf4j-api.version>
<guava.version>19.0</guava.version>
<guava.version>20.0</guava.version>
<more-lambdas.version>0.1.8</more-lambdas.version>

<junit.version>5.5.0</junit.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.github.phantomthief.collection.impl;

import java.util.concurrent.locks.Condition;

import javax.annotation.Nullable;

/**
* @author w.vela
* Created on 2019-07-30.
*/
class BackPressureHandler<T> implements RejectHandler<T> {

@Override
public boolean onReject(T element, @Nullable Condition condition) throws Throwable {
assert condition != null;
condition.awaitUninterruptibly();
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,24 @@ public GenericSimpleBufferTriggerBuilder<E, C> maxBufferCount(long count,
}

@CheckReturnValue
public GenericSimpleBufferTriggerBuilder<E, C>
rejectHandler(Consumer<? super E> rejectHandler) {
public GenericSimpleBufferTriggerBuilder<E, C> rejectHandler(Consumer<? super E> rejectHandler) {
builder.rejectHandler(rejectHandler);
return this;
}

/**
* 开启背压(back-pressure)能力
* 注意,当开启背压时,需要配合 {@link #maxBufferCount(long)}
* 并且不要设置 {@link #rejectHandler}
*
* 当buffer达到最大值时,会阻塞入队线程,直到消费完当前buffer后再继续执行
*/
@CheckReturnValue
public GenericSimpleBufferTriggerBuilder<E, C> enableBackPressure() {
builder.enableBackPressure();
return this;
}

/**
* use for debug and stats, like trigger thread's name.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.github.phantomthief.collection.impl;

import java.util.concurrent.locks.Condition;

import javax.annotation.Nullable;

/**
* @author w.vela
* Created on 2019-07-30.
*/
interface RejectHandler<T> {

/**
* execute on caller thread
* @return {@code true} 继续执行, {@code false} 阻止执行
*/
boolean onReject(T element, @Nullable Condition condition) throws Throwable;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.github.phantomthief.collection.impl;

import static com.google.common.base.Throwables.throwIfUnchecked;
import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand All @@ -13,9 +14,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.ToIntBiFunction;

Expand All @@ -40,9 +41,10 @@ public class SimpleBufferTrigger<E, C> implements BufferTrigger<E> {
private final BiConsumer<Throwable, C> exceptionHandler;
private final AtomicReference<C> buffer = new AtomicReference<>();
private final long maxBufferCount;
private final Consumer<E> rejectHandler;
private final RejectHandler<E> rejectHandler;
private final ReadLock readLock;
private final WriteLock writeLock;
private final Condition writeCondition;

private volatile long lastConsumeTimestamp = currentTimeMillis();

Expand All @@ -58,9 +60,11 @@ public class SimpleBufferTrigger<E, C> implements BufferTrigger<E> {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
writeCondition = writeLock.newCondition();
} else {
readLock = null;
writeLock = null;
writeCondition = null;
}
builder.scheduledExecutorService.schedule(
new TriggerRunnable(builder.scheduledExecutorService, builder.triggerStrategy),
Expand Down Expand Up @@ -96,10 +100,10 @@ public static SimpleBufferTriggerBuilder<Object, Map<Object, Integer>> newCounte
public void enqueue(E element) {
long currentCount = counter.get();
if (maxBufferCount > 0 && currentCount >= maxBufferCount) {
if (rejectHandler != null) {
rejectHandler.accept(element);
boolean pass = fireRejectHandler(element);
if (!pass) {
return;
}
return;
}
boolean locked = false;
if (readLock != null) {
Expand All @@ -123,6 +127,26 @@ public void enqueue(E element) {
}
}

private boolean fireRejectHandler(E element) {
boolean pass = false;
if (rejectHandler != null) {
if (writeLock != null && writeCondition != null) {
writeLock.lock();
}
try {
pass = rejectHandler.onReject(element, writeCondition);
} catch (Throwable e) {
throwIfUnchecked(e);
throw new RuntimeException(e);
} finally {
if (writeLock != null && writeCondition != null) {
writeLock.unlock();
}
}
}
return pass;
}

@Override
public void manuallyDoTrigger() {
synchronized (SimpleBufferTrigger.this) {
Expand All @@ -140,6 +164,9 @@ private void doConsume() {
old = buffer.getAndSet(bufferFactory.get());
} finally {
counter.set(0);
if (writeCondition != null) {
writeCondition.signalAll();
}
if (writeLock != null) {
writeLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class SimpleBufferTriggerBuilder<E, C> {
ThrowableConsumer<C, Throwable> consumer;
BiConsumer<Throwable, C> exceptionHandler;
long maxBufferCount = -1;
Consumer<E> rejectHandler;
RejectHandler<E> rejectHandler;
String name;
boolean disableSwitchLock;

Expand Down Expand Up @@ -159,11 +159,40 @@ public <E1, C1> SimpleBufferTriggerBuilder<E1, C1> maxBufferCount(long count,
/**
* it's better dealing this in container
*/
public <E1, C1> SimpleBufferTriggerBuilder<E1, C1>
rejectHandler(Consumer<? super E1> rejectHandler) {
public <E1, C1> SimpleBufferTriggerBuilder<E1, C1> rejectHandler(Consumer<? super E1> rejectHandler) {
checkNotNull(rejectHandler);
return this.<E1, C1> rejectHandlerEx((e, h) -> {
rejectHandler.accept(e);
return false;
});
}

/**
* 开启背压(back-pressure)能力
* 注意,当开启背压时,需要配合 {@link #maxBufferCount(long)}
* 并且不要设置 {@link #rejectHandler}
*
* 当buffer达到最大值时,会阻塞入队线程,直到消费完当前buffer后再继续执行
*/
public <E1, C1> SimpleBufferTriggerBuilder<E1, C1> enableBackPressure() {
if (this.rejectHandler != null) {
throw new IllegalStateException("cannot enable back-pressure while reject handler was set.");
}
SimpleBufferTriggerBuilder<E1, C1> thisBuilder = (SimpleBufferTriggerBuilder<E1, C1>) this;
thisBuilder.rejectHandler = (Consumer<E1>) rejectHandler;
thisBuilder.rejectHandler = new BackPressureHandler<E1>();
return thisBuilder;
}

/**
* it's better dealing this in container
*/
private <E1, C1> SimpleBufferTriggerBuilder<E1, C1> rejectHandlerEx(RejectHandler<? super E1> rejectHandler) {
checkNotNull(rejectHandler);
if (this.rejectHandler instanceof BackPressureHandler) {
throw new IllegalStateException("cannot set reject handler while enable back-pressure.");
}
SimpleBufferTriggerBuilder<E1, C1> thisBuilder = (SimpleBufferTriggerBuilder<E1, C1>) this;
thisBuilder.rejectHandler = (RejectHandler<E1>) rejectHandler;
return thisBuilder;
}

Expand All @@ -176,16 +205,28 @@ public SimpleBufferTriggerBuilder<E, C> name(String name) {
}

public <E1> BufferTrigger<E1> build() {
check();
return new LazyBufferTrigger<>(() -> {
ensure();
SimpleBufferTriggerBuilder<E1, C> builder = (SimpleBufferTriggerBuilder<E1, C>) SimpleBufferTriggerBuilder.this;
SimpleBufferTriggerBuilder<E1, C> builder =
(SimpleBufferTriggerBuilder<E1, C>) SimpleBufferTriggerBuilder.this;
return new SimpleBufferTrigger<>(builder);
});
}

private void ensure() {
private void check() {
checkNotNull(consumer);
if (rejectHandler instanceof BackPressureHandler) {
if (disableSwitchLock) {
throw new IllegalStateException("back-pressure cannot work together with switch lock disabled.");
}
if (maxBufferCount <= 0) {
throw new IllegalStateException("back-pressure need to set maxBufferCount.");
}
}
}

private void ensure() {
if (triggerStrategy == null) {
logger.warn("no trigger strategy found. using NO-OP trigger");
triggerStrategy = (t, n) -> empty();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.github.phantomthief.collection.impl;

import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.util.Collections.synchronizedList;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.phantomthief.collection.BufferTrigger;

/**
* @author w.vela
* Created on 2019-07-30.
*/
class BackPressureTest {

private static final Logger logger = LoggerFactory.getLogger(BackPressureTest.class);

@Test
void test() {
List<String> consumed = new ArrayList<>();
BufferTrigger<String> buffer = BufferTrigger.<String, List<String>> simple()
.enableBackPressure()
.maxBufferCount(10)
.interval(1, SECONDS)
.setContainer(() -> synchronizedList(new ArrayList<>()), List::add)
.consumer(it -> {
logger.info("do consuming...{}", it);
sleepUninterruptibly(1, SECONDS);
consumed.addAll(it);
logger.info("consumer done.{}", it);
})
.build();
long cost = System.currentTimeMillis();
ExecutorService executor = newFixedThreadPool(10);
for (int i = 0; i < 30; i++) {
int j = i;
executor.execute(() -> {
buffer.enqueue("" + j);
logger.info("enqueued:{}", j);
});
}
shutdownAndAwaitTermination(executor, 1, DAYS);
buffer.manuallyDoTrigger();
assertEquals(30, consumed.size());
cost = System.currentTimeMillis() - cost;
assertTrue(cost >= SECONDS.toMillis(3));
}

@Test
void testNoBlock() {
List<String> consumed = new ArrayList<>();
BufferTrigger<String> buffer = BufferTrigger.<String, List<String>> simple()
.maxBufferCount(10)
.interval(1, SECONDS)
.setContainer(() -> synchronizedList(new ArrayList<>()), List::add)
.consumer(it -> {
logger.info("do consuming...{}", it);
sleepUninterruptibly(1, SECONDS);
consumed.addAll(it);
logger.info("consumer done.{}", it);
})
.build();
long cost = System.currentTimeMillis();
ExecutorService executor = newFixedThreadPool(10);
for (int i = 0; i < 30; i++) {
int j = i;
executor.execute(() -> {
buffer.enqueue("" + j);
logger.info("enqueued:{}", j);
});
}
shutdownAndAwaitTermination(executor, 1, DAYS);
buffer.manuallyDoTrigger();
cost = System.currentTimeMillis() - cost;
assertTrue(cost <= 1200);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.github.phantomthief.collection.impl;

import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.api.Test;

import com.github.phantomthief.collection.BufferTrigger;

/**
* @author w.vela
* Created on 2019-07-30.
*/
class BuilderTest {

@Test
void testBuilder() {
assertThrows(NullPointerException.class, () ->
BufferTrigger.simple()
.build());
assertThrows(IllegalStateException.class, () ->
BufferTrigger.simple()
.consumer(it -> {})
.enableBackPressure()
.disableSwitchLock()
.build());
assertThrows(IllegalStateException.class, () ->
BufferTrigger.simple()
.consumer(it -> {})
.enableBackPressure()
.rejectHandler(it -> {})
.build());
assertThrows(IllegalStateException.class, () ->
BufferTrigger.simple()
.consumer(it -> {})
.rejectHandler(it -> {})
.enableBackPressure()
.build());
assertThrows(IllegalStateException.class, () ->
BufferTrigger.simple()
.consumer(it -> {})
.enableBackPressure()
.build());
}
}

0 comments on commit ba14a08

Please sign in to comment.