Skip to content

Commit

Permalink
Merge 060eb37 into dd530d8
Browse files Browse the repository at this point in the history
  • Loading branch information
PhantomThief committed Feb 4, 2021
2 parents dd530d8 + 060eb37 commit 7df97c7
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.github.phantomthief.collection.impl;

import static java.lang.System.nanoTime;

import java.util.concurrent.locks.Condition;

import javax.annotation.Nullable;
Expand All @@ -15,13 +17,20 @@ class BackPressureHandler<T> implements RejectHandler<T> {

private static final Logger logger = LoggerFactory.getLogger(BackPressureHandler.class);

private static GlobalBackPressureListener globalBackPressureListener = null;

@Nullable
private final BackPressureListener<T> listener;
private String name;

BackPressureHandler(BackPressureListener<T> listener) {
this.listener = listener;
}

void setName(String name) {
this.name = name;
}

@Override
public boolean onReject(T element, @Nullable Condition condition) {
if (listener != null) {
Expand All @@ -31,8 +40,28 @@ public boolean onReject(T element, @Nullable Condition condition) {
logger.error("", e);
}
}
if (globalBackPressureListener != null) {
try {
globalBackPressureListener.onHandle(name, element);
} catch (Throwable e) {
logger.error("", e);
}
}
assert condition != null;
long startNano = nanoTime();
condition.awaitUninterruptibly();
long blockInNano = nanoTime() - startNano;
if (globalBackPressureListener != null) {
try {
globalBackPressureListener.postHandle(name, element, blockInNano);
} catch (Throwable e) {
logger.error("", e);
}
}
return true;
}

static void setupGlobalBackPressureListener(GlobalBackPressureListener listener) {
globalBackPressureListener = listener;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.github.phantomthief.collection.impl;

import javax.annotation.Nullable;

/**
* @author w.vela
* Created on 2021-02-04.
*/
public interface GlobalBackPressureListener {

void onHandle(@Nullable String name, Object element);

void postHandle(@Nullable String name, Object element, long blockInNano);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class SimpleBufferTrigger<E, C> implements BufferTrigger<E> {

private static final long DEFAULT_NEXT_TRIGGER_PERIOD = TimeUnit.SECONDS.toMillis(1);

private final String name;
private final AtomicLong counter = new AtomicLong();
private final ThrowableConsumer<C, Throwable> consumer;
private final ToIntBiFunction<C, E> queueAdder;
Expand All @@ -63,6 +64,7 @@ public class SimpleBufferTrigger<E, C> implements BufferTrigger<E> {
* 使用提供的构造器创建SimpleBufferTrigger实例
*/
SimpleBufferTrigger(SimpleBufferTriggerBuilder<E, C> builder) {
this.name = builder.name;
this.queueAdder = builder.queueAdder;
this.bufferFactory = builder.bufferFactory;
this.consumer = builder.consumer;
Expand Down Expand Up @@ -328,4 +330,8 @@ public void run() {
}
}
}

public static void setupGlobalBackPressure(GlobalBackPressureListener listener) {
BackPressureHandler.setupGlobalBackPressureListener(listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,9 @@ private void ensure() {
scheduledExecutorService = makeScheduleExecutor();
usingInnerExecutor = true;
}
if (name != null && rejectHandler instanceof BackPressureHandler) {
((BackPressureHandler<E>) rejectHandler).setName(name);
}
}

private ScheduledExecutorService makeScheduleExecutor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
import java.util.List;
import java.util.concurrent.ExecutorService;

import javax.annotation.Nullable;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.phantomthief.collection.BufferTrigger;
import com.google.common.util.concurrent.AtomicLongMap;

/**
* @author w.vela
Expand Down Expand Up @@ -89,4 +92,54 @@ void testNoBlock() {
cost = System.currentTimeMillis() - cost;
assertTrue(cost <= 1200);
}

@Test
void testGlobalHandler() {
AtomicLongMap<String> onHandle = AtomicLongMap.create();
AtomicLongMap<String> postHandle = AtomicLongMap.create();
SimpleBufferTrigger.setupGlobalBackPressure(new GlobalBackPressureListener() {
@Override
public void onHandle(@Nullable String name, Object element) {
onHandle.incrementAndGet(name);
}

@Override
public void postHandle(@Nullable String name, Object element, long blockInNano) {
postHandle.addAndGet(name, blockInNano);
}
});
List<String> consumed = new ArrayList<>();
List<String> backPressured = Collections.synchronizedList(new ArrayList<>());
String name = "test-1";
BufferTrigger<String> buffer = BufferTrigger.<String, List<String>> simple()
.maxBufferCount(10)
.enableBackPressure(backPressured::add)
.interval(1, SECONDS)
.setContainer(() -> synchronizedList(new ArrayList<>()), List::add)
.consumer(it -> {
logger.info("do consuming...{}", it);
sleepUninterruptibly(1, SECONDS);
consumed.addAll(it);
logger.info("consumer done.{}", it);
})
.name(name)
.build();
long cost = System.currentTimeMillis();
ExecutorService executor = newFixedThreadPool(10);
for (int i = 0; i < 30; i++) {
int j = i;
executor.execute(() -> {
buffer.enqueue("" + j);
logger.info("enqueued:{}", j);
});
}
shutdownAndAwaitTermination(executor, 1, DAYS);
assertTrue(backPressured.size() > 10);
assertTrue(onHandle.get(name) > 10);
buffer.manuallyDoTrigger();
assertEquals(30, consumed.size());
cost = System.currentTimeMillis() - cost;
assertTrue(cost >= SECONDS.toMillis(3));
assertTrue(postHandle.get(name) >= SECONDS.toNanos(3));
}
}

0 comments on commit 7df97c7

Please sign in to comment.