Skip to content

Commit

Permalink
optimize lock.
Browse files Browse the repository at this point in the history
  • Loading branch information
w.vela committed Dec 13, 2016
1 parent 9db063b commit 85d15e4
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class BatchConsumeBlockingQueueTrigger<E> implements BufferTrigger<E> {
private final ThrowableConsumer<List<E>, Exception> consumer;
private final BiConsumer<Throwable, List<E>> exceptionHandler;
private final ScheduledExecutorService scheduledExecutorService;
private final Object lock = new Object();

BatchConsumeBlockingQueueTrigger(long consumePeriod, int batchConsumerSize,
BiConsumer<Throwable, List<E>> exceptionHandler,
Expand Down Expand Up @@ -61,7 +62,7 @@ public void enqueue(E element) {
try {
queue.put(element);
if (queue.size() >= batchConsumerSize) {
synchronized (BatchConsumeBlockingQueueTrigger.this) {
synchronized (lock) {
if (queue.size() >= batchConsumerSize) {
this.scheduledExecutorService.execute(this::doBatchConsumer);
}
Expand All @@ -78,7 +79,7 @@ public void manuallyDoTrigger() {
}

private void doBatchConsumer() {
synchronized (BatchConsumeBlockingQueueTrigger.this) {
synchronized (lock) {
while (!queue.isEmpty()) {
List<E> toConsumeData = new ArrayList<>(min(batchConsumerSize, queue.size()));
queue.drainTo(toConsumeData, batchConsumerSize);
Expand Down

0 comments on commit 85d15e4

Please sign in to comment.