Skip to content

Commit

Permalink
[Client] Fix ConcurrentModificationException in sendAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Sep 1, 2021
1 parent f784379 commit 703b838
Showing 1 changed file with 33 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,32 @@
import static org.apache.pulsar.client.impl.ProducerBase.MultiSchemaMode.Enabled;
import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
import static org.apache.pulsar.common.protocol.Commands.readChecksum;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.ScheduledFuture;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
Expand Down Expand Up @@ -261,7 +259,37 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
}

protected Queue<OpSendMsg> createPendingMessagesQueue() {
return new ArrayDeque<>();
return new ArrayDeque<OpSendMsg>() {
int forEachDepth = 0;
List<OpSendMsg> postponedOpSendMgs = new ArrayList<>();

@Override
public void forEach(Consumer action) {
try {
// track any forEach call that is in progress in the current call stack
// so that adding a new item while iterating doesn't cause ConcurrentModificationException
forEachDepth++;
super.forEach(action);
} finally {
forEachDepth--;
// if this is the top-most forEach call and there are postponed items, add them
if (forEachDepth == 0 && !postponedOpSendMgs.isEmpty()) {
super.addAll(postponedOpSendMgs);
postponedOpSendMgs.clear();
}
}
}

@Override
public boolean add(OpSendMsg o) {
// postpone adding to the queue while forEach iteration is in progress
if (forEachDepth > 0) {
return postponedOpSendMgs.add(o);
} else {
return super.add(o);
}
}
};
}

public ConnectionHandler getConnectionHandler() {
Expand Down

0 comments on commit 703b838

Please sign in to comment.