Skip to content

Commit

Permalink
INT-3722: Additional Fix: TCP OG with Caching CF
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-3722

While the initial fix solved the reported problem, it caused a memory leak when
the gateway completed its work before the reader thread "closed" the connection,
releasing it to the pool. This left the connection in the deferred close state.

Handle the condition where the gateway signals it has completed its work
before the connection is released to the pool.
  • Loading branch information
garyrussell committed May 27, 2015
1 parent d17785d commit 3339313
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 9 deletions.
Expand Up @@ -16,7 +16,9 @@

package org.springframework.integration.ip.tcp.connection;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

Expand Down Expand Up @@ -48,6 +50,8 @@ public class CachingClientConnectionFactory extends AbstractClientConnectionFact
private final Map<String, CachedConnection> deferredClosures =
new ConcurrentHashMap<String, CachedConnection>();

private final Set<String> okToRelease = new HashSet<String>();

private volatile boolean deferClose;

/**
Expand Down Expand Up @@ -149,9 +153,14 @@ public void enableCloseDeferral(boolean defer) {

@Override
public void closeDeferred(String connectionId) {
CachedConnection deferred = this.deferredClosures.remove(connectionId);
if (deferred != null) {
deferred.doClose();
synchronized(this.okToRelease) {
CachedConnection deferred = this.deferredClosures.remove(connectionId);
if (deferred != null) {
deferred.doClose();
}
else {
this.okToRelease.add(connectionId);
}
}
}

Expand All @@ -165,12 +174,14 @@ public CachedConnection(TcpConnectionSupport connection) {
}

@Override
public synchronized void close() {
if (deferClose && !this.released) {
deferredClosures.put(getConnectionId(), this);
}
else {
doClose();
public void close() {
synchronized(okToRelease) {
if (deferClose && !this.released && !okToRelease.remove(getConnectionId())) {
deferredClosures.put(getConnectionId(), this);
}
else {
doClose();
}
}
}

Expand Down
Expand Up @@ -457,6 +457,7 @@ public void integrationTest() throws Exception {
}

@Test
// @Repeat(1000) // INT-3722
public void gatewayIntegrationTest() throws Exception {
final List<String> connectionIds = new ArrayList<String>();
final AtomicBoolean okToRun = new AtomicBoolean(true);
Expand Down

0 comments on commit 3339313

Please sign in to comment.