diff --git a/pom.xml b/pom.xml index 1fc793a..90131d3 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ 1.7.21 - 19.0 + 20.0 0.1.8 5.5.0 diff --git a/src/main/java/com/github/phantomthief/collection/impl/BackPressureHandler.java b/src/main/java/com/github/phantomthief/collection/impl/BackPressureHandler.java new file mode 100644 index 0000000..83bd817 --- /dev/null +++ b/src/main/java/com/github/phantomthief/collection/impl/BackPressureHandler.java @@ -0,0 +1,19 @@ +package com.github.phantomthief.collection.impl; + +import java.util.concurrent.locks.Condition; + +import javax.annotation.Nullable; + +/** + * @author w.vela + * Created on 2019-07-30. + */ +class BackPressureHandler implements RejectHandler { + + @Override + public boolean onReject(T element, @Nullable Condition condition) throws Throwable { + assert condition != null; + condition.awaitUninterruptibly(); + return true; + } +} diff --git a/src/main/java/com/github/phantomthief/collection/impl/GenericSimpleBufferTriggerBuilder.java b/src/main/java/com/github/phantomthief/collection/impl/GenericSimpleBufferTriggerBuilder.java index a42ade5..997624e 100644 --- a/src/main/java/com/github/phantomthief/collection/impl/GenericSimpleBufferTriggerBuilder.java +++ b/src/main/java/com/github/phantomthief/collection/impl/GenericSimpleBufferTriggerBuilder.java @@ -148,12 +148,24 @@ public GenericSimpleBufferTriggerBuilder maxBufferCount(long count, } @CheckReturnValue - public GenericSimpleBufferTriggerBuilder - rejectHandler(Consumer rejectHandler) { + public GenericSimpleBufferTriggerBuilder rejectHandler(Consumer rejectHandler) { builder.rejectHandler(rejectHandler); return this; } + /** + * 开启背压(back-pressure)能力 + * 注意,当开启背压时,需要配合 {@link #maxBufferCount(long)} + * 并且不要设置 {@link #rejectHandler} + * + * 当buffer达到最大值时,会阻塞入队线程,直到消费完当前buffer后再继续执行 + */ + @CheckReturnValue + public GenericSimpleBufferTriggerBuilder enableBackPressure() { + builder.enableBackPressure(); + return this; + } + /** * use for debug and stats, like trigger thread's name. */ diff --git a/src/main/java/com/github/phantomthief/collection/impl/RejectHandler.java b/src/main/java/com/github/phantomthief/collection/impl/RejectHandler.java new file mode 100644 index 0000000..3c0658f --- /dev/null +++ b/src/main/java/com/github/phantomthief/collection/impl/RejectHandler.java @@ -0,0 +1,18 @@ +package com.github.phantomthief.collection.impl; + +import java.util.concurrent.locks.Condition; + +import javax.annotation.Nullable; + +/** + * @author w.vela + * Created on 2019-07-30. + */ +interface RejectHandler { + + /** + * execute on caller thread + * @return {@code true} 继续执行, {@code false} 阻止执行 + */ + boolean onReject(T element, @Nullable Condition condition) throws Throwable; +} diff --git a/src/main/java/com/github/phantomthief/collection/impl/SimpleBufferTrigger.java b/src/main/java/com/github/phantomthief/collection/impl/SimpleBufferTrigger.java index b9bf663..c2b65e0 100644 --- a/src/main/java/com/github/phantomthief/collection/impl/SimpleBufferTrigger.java +++ b/src/main/java/com/github/phantomthief/collection/impl/SimpleBufferTrigger.java @@ -1,5 +1,6 @@ package com.github.phantomthief.collection.impl; +import static com.google.common.base.Throwables.throwIfUnchecked; import static java.lang.System.currentTimeMillis; import static java.util.concurrent.TimeUnit.DAYS; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -13,9 +14,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.function.Supplier; import java.util.function.ToIntBiFunction; @@ -40,9 +41,10 @@ public class SimpleBufferTrigger implements BufferTrigger { private final BiConsumer exceptionHandler; private final AtomicReference buffer = new AtomicReference<>(); private final long maxBufferCount; - private final Consumer rejectHandler; + private final RejectHandler rejectHandler; private final ReadLock readLock; private final WriteLock writeLock; + private final Condition writeCondition; private volatile long lastConsumeTimestamp = currentTimeMillis(); @@ -58,9 +60,11 @@ public class SimpleBufferTrigger implements BufferTrigger { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); + writeCondition = writeLock.newCondition(); } else { readLock = null; writeLock = null; + writeCondition = null; } builder.scheduledExecutorService.schedule( new TriggerRunnable(builder.scheduledExecutorService, builder.triggerStrategy), @@ -96,10 +100,10 @@ public static SimpleBufferTriggerBuilder> newCounte public void enqueue(E element) { long currentCount = counter.get(); if (maxBufferCount > 0 && currentCount >= maxBufferCount) { - if (rejectHandler != null) { - rejectHandler.accept(element); + boolean pass = fireRejectHandler(element); + if (!pass) { + return; } - return; } boolean locked = false; if (readLock != null) { @@ -123,6 +127,26 @@ public void enqueue(E element) { } } + private boolean fireRejectHandler(E element) { + boolean pass = false; + if (rejectHandler != null) { + if (writeLock != null && writeCondition != null) { + writeLock.lock(); + } + try { + pass = rejectHandler.onReject(element, writeCondition); + } catch (Throwable e) { + throwIfUnchecked(e); + throw new RuntimeException(e); + } finally { + if (writeLock != null && writeCondition != null) { + writeLock.unlock(); + } + } + } + return pass; + } + @Override public void manuallyDoTrigger() { synchronized (SimpleBufferTrigger.this) { @@ -140,6 +164,9 @@ private void doConsume() { old = buffer.getAndSet(bufferFactory.get()); } finally { counter.set(0); + if (writeCondition != null) { + writeCondition.signalAll(); + } if (writeLock != null) { writeLock.unlock(); } diff --git a/src/main/java/com/github/phantomthief/collection/impl/SimpleBufferTriggerBuilder.java b/src/main/java/com/github/phantomthief/collection/impl/SimpleBufferTriggerBuilder.java index f94fdcf..015b2f9 100644 --- a/src/main/java/com/github/phantomthief/collection/impl/SimpleBufferTriggerBuilder.java +++ b/src/main/java/com/github/phantomthief/collection/impl/SimpleBufferTriggerBuilder.java @@ -39,7 +39,7 @@ public class SimpleBufferTriggerBuilder { ThrowableConsumer consumer; BiConsumer exceptionHandler; long maxBufferCount = -1; - Consumer rejectHandler; + RejectHandler rejectHandler; String name; boolean disableSwitchLock; @@ -159,11 +159,40 @@ public SimpleBufferTriggerBuilder maxBufferCount(long count, /** * it's better dealing this in container */ - public SimpleBufferTriggerBuilder - rejectHandler(Consumer rejectHandler) { + public SimpleBufferTriggerBuilder rejectHandler(Consumer rejectHandler) { checkNotNull(rejectHandler); + return this. rejectHandlerEx((e, h) -> { + rejectHandler.accept(e); + return false; + }); + } + + /** + * 开启背压(back-pressure)能力 + * 注意,当开启背压时,需要配合 {@link #maxBufferCount(long)} + * 并且不要设置 {@link #rejectHandler} + * + * 当buffer达到最大值时,会阻塞入队线程,直到消费完当前buffer后再继续执行 + */ + public SimpleBufferTriggerBuilder enableBackPressure() { + if (this.rejectHandler != null) { + throw new IllegalStateException("cannot enable back-pressure while reject handler was set."); + } SimpleBufferTriggerBuilder thisBuilder = (SimpleBufferTriggerBuilder) this; - thisBuilder.rejectHandler = (Consumer) rejectHandler; + thisBuilder.rejectHandler = new BackPressureHandler(); + return thisBuilder; + } + + /** + * it's better dealing this in container + */ + private SimpleBufferTriggerBuilder rejectHandlerEx(RejectHandler rejectHandler) { + checkNotNull(rejectHandler); + if (this.rejectHandler instanceof BackPressureHandler) { + throw new IllegalStateException("cannot set reject handler while enable back-pressure."); + } + SimpleBufferTriggerBuilder thisBuilder = (SimpleBufferTriggerBuilder) this; + thisBuilder.rejectHandler = (RejectHandler) rejectHandler; return thisBuilder; } @@ -176,16 +205,28 @@ public SimpleBufferTriggerBuilder name(String name) { } public BufferTrigger build() { + check(); return new LazyBufferTrigger<>(() -> { ensure(); - SimpleBufferTriggerBuilder builder = (SimpleBufferTriggerBuilder) SimpleBufferTriggerBuilder.this; + SimpleBufferTriggerBuilder builder = + (SimpleBufferTriggerBuilder) SimpleBufferTriggerBuilder.this; return new SimpleBufferTrigger<>(builder); }); } - private void ensure() { + private void check() { checkNotNull(consumer); + if (rejectHandler instanceof BackPressureHandler) { + if (disableSwitchLock) { + throw new IllegalStateException("back-pressure cannot work together with switch lock disabled."); + } + if (maxBufferCount <= 0) { + throw new IllegalStateException("back-pressure need to set maxBufferCount."); + } + } + } + private void ensure() { if (triggerStrategy == null) { logger.warn("no trigger strategy found. using NO-OP trigger"); triggerStrategy = (t, n) -> empty(); diff --git a/src/test/java/com/github/phantomthief/collection/impl/BackPressureTest.java b/src/test/java/com/github/phantomthief/collection/impl/BackPressureTest.java new file mode 100644 index 0000000..373146c --- /dev/null +++ b/src/test/java/com/github/phantomthief/collection/impl/BackPressureTest.java @@ -0,0 +1,89 @@ +package com.github.phantomthief.collection.impl; + +import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static java.util.Collections.synchronizedList; +import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.TimeUnit.DAYS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; + +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.phantomthief.collection.BufferTrigger; + +/** + * @author w.vela + * Created on 2019-07-30. + */ +class BackPressureTest { + + private static final Logger logger = LoggerFactory.getLogger(BackPressureTest.class); + + @Test + void test() { + List consumed = new ArrayList<>(); + BufferTrigger buffer = BufferTrigger.> simple() + .enableBackPressure() + .maxBufferCount(10) + .interval(1, SECONDS) + .setContainer(() -> synchronizedList(new ArrayList<>()), List::add) + .consumer(it -> { + logger.info("do consuming...{}", it); + sleepUninterruptibly(1, SECONDS); + consumed.addAll(it); + logger.info("consumer done.{}", it); + }) + .build(); + long cost = System.currentTimeMillis(); + ExecutorService executor = newFixedThreadPool(10); + for (int i = 0; i < 30; i++) { + int j = i; + executor.execute(() -> { + buffer.enqueue("" + j); + logger.info("enqueued:{}", j); + }); + } + shutdownAndAwaitTermination(executor, 1, DAYS); + buffer.manuallyDoTrigger(); + assertEquals(30, consumed.size()); + cost = System.currentTimeMillis() - cost; + assertTrue(cost >= SECONDS.toMillis(3)); + } + + @Test + void testNoBlock() { + List consumed = new ArrayList<>(); + BufferTrigger buffer = BufferTrigger.> simple() + .maxBufferCount(10) + .interval(1, SECONDS) + .setContainer(() -> synchronizedList(new ArrayList<>()), List::add) + .consumer(it -> { + logger.info("do consuming...{}", it); + sleepUninterruptibly(1, SECONDS); + consumed.addAll(it); + logger.info("consumer done.{}", it); + }) + .build(); + long cost = System.currentTimeMillis(); + ExecutorService executor = newFixedThreadPool(10); + for (int i = 0; i < 30; i++) { + int j = i; + executor.execute(() -> { + buffer.enqueue("" + j); + logger.info("enqueued:{}", j); + }); + } + shutdownAndAwaitTermination(executor, 1, DAYS); + buffer.manuallyDoTrigger(); + cost = System.currentTimeMillis() - cost; + assertTrue(cost <= 1200); + } +} diff --git a/src/test/java/com/github/phantomthief/collection/impl/BuilderTest.java b/src/test/java/com/github/phantomthief/collection/impl/BuilderTest.java new file mode 100644 index 0000000..da3f1b0 --- /dev/null +++ b/src/test/java/com/github/phantomthief/collection/impl/BuilderTest.java @@ -0,0 +1,44 @@ +package com.github.phantomthief.collection.impl; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.Test; + +import com.github.phantomthief.collection.BufferTrigger; + +/** + * @author w.vela + * Created on 2019-07-30. + */ +class BuilderTest { + + @Test + void testBuilder() { + assertThrows(NullPointerException.class, () -> + BufferTrigger.simple() + .build()); + assertThrows(IllegalStateException.class, () -> + BufferTrigger.simple() + .consumer(it -> {}) + .enableBackPressure() + .disableSwitchLock() + .build()); + assertThrows(IllegalStateException.class, () -> + BufferTrigger.simple() + .consumer(it -> {}) + .enableBackPressure() + .rejectHandler(it -> {}) + .build()); + assertThrows(IllegalStateException.class, () -> + BufferTrigger.simple() + .consumer(it -> {}) + .rejectHandler(it -> {}) + .enableBackPressure() + .build()); + assertThrows(IllegalStateException.class, () -> + BufferTrigger.simple() + .consumer(it -> {}) + .enableBackPressure() + .build()); + } +}