Skip to content

Commit

Permalink
Pooling workflows and Shared connections fun. (#1116)
Browse files Browse the repository at this point in the history
* Changed the map holding the consumers, producers and listeners to use thread safe maps.

* Switched so we still use a weak map as the base map, but wrap it with Collections.synchronizedMap.
Also added a unit test that consistently fails when not wrapping the weak map.
  • Loading branch information
aaron-mcgrath-adp committed Apr 14, 2023
1 parent bb15b98 commit 353025f
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;

import javax.validation.Valid;

import org.apache.commons.lang3.BooleanUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.adaptris.annotation.AdvancedConfig;
import com.adaptris.annotation.InputFieldDefault;
import com.adaptris.core.util.LifecycleHelper;
Expand Down Expand Up @@ -57,9 +60,12 @@ public abstract class AdaptrisConnectionImp implements AdaptrisConnection, State
* </p>
*/
public AdaptrisConnectionImp() {
consumers = Collections.newSetFromMap(new WeakHashMap<AdaptrisMessageConsumer, Boolean>());
producers = Collections.newSetFromMap(new WeakHashMap<AdaptrisMessageProducer, Boolean>());
listeners = Collections.newSetFromMap(new WeakHashMap<StateManagedComponent, Boolean>());
consumers = Collections.newSetFromMap(
Collections.synchronizedMap(new WeakHashMap<AdaptrisMessageConsumer, Boolean>()));
producers = Collections.newSetFromMap(
Collections.synchronizedMap(new WeakHashMap<AdaptrisMessageProducer, Boolean>()));
listeners = Collections.newSetFromMap(
Collections.synchronizedMap(new WeakHashMap<StateManagedComponent, Boolean>()));
state = ClosedState.getInstance();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,23 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;

import org.awaitility.Awaitility;
import org.junit.Test;
import com.adaptris.core.stubs.MockConnection;
import com.adaptris.core.stubs.MockMessageConsumer;
import com.adaptris.core.stubs.MockMessageProducer;
import com.adaptris.interlok.junit.scaffolding.jms.MockConsumer;
import com.adaptris.interlok.junit.scaffolding.jms.MockProducer;

public class AdaptrisConnectionTest extends com.adaptris.interlok.junit.scaffolding.BaseCase {

Expand All @@ -45,6 +55,44 @@ public void testConnectionErrorHandler() throws Exception {
mc.setConnectionErrorHandler(nc);
assertEquals(nc, mc.getConnectionErrorHandler());
}

// INTERLOK-4039
@Test
public void testConcurrentListenerRegistration() throws Exception {
int threadCount = 100;
final MockConnection connection = new MockConnection();

ThreadFactory tf = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r);
}
};
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(threadCount, tf);

List<Callable<Boolean>> callables = new ArrayList<>();
for(int index = 0; index < threadCount; index ++) {
Callable<Boolean> call = () -> {
StateManagedComponent comp = new ServiceList();
AdaptrisMessageConsumer consumer = new MockConsumer();
AdaptrisMessageProducer producer = new MockProducer();
connection.addExceptionListener(comp);
connection.addMessageConsumer(consumer);
connection.addMessageProducer(producer);
return true;
};
callables.add(call);
}

newFixedThreadPool.invokeAll(callables);

Awaitility
.await()
.atMost(Duration.ofSeconds(10))
.with()
.pollInterval(Duration.ofMillis(100))
.untilTrue(new AtomicBoolean(connection.retrieveExceptionListeners().size() == threadCount));
}

@Test
public void testAddMessageConsumer() throws Exception {
Expand Down

0 comments on commit 353025f

Please sign in to comment.