Skip to content

Commit

Permalink
Fix memory leak in WebSocket proxy (#790)
Browse files Browse the repository at this point in the history
  • Loading branch information
massakam authored and Yuki Shiga committed Sep 26, 2017
1 parent 5f378f6 commit bace0b4
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 19 deletions.
Expand Up @@ -117,7 +117,7 @@ public void onWebSocketError(Throwable cause) {
try {
close();
} catch (IOException e) {
log.error("Failed in closing producer for topic[{}] with error: [{}]: ", topic, e.getMessage());
log.error("Failed in closing WebSocket session for topic {} with error: {}", topic, e.getMessage());
}
}

Expand Down
Expand Up @@ -88,7 +88,10 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request, Ser

try {
this.consumer = service.getPulsarClient().subscribe(topic, subscription, conf);
this.service.addConsumer(this);
if (!this.service.addConsumer(this)) {
log.warn("[{}:{}] Failed to add consumer handler for topic {}", request.getRemoteAddr(),
request.getRemotePort(), topic);
}
receiveMessage();
} catch (Exception e) {
log.warn("[{}:{}] Failed in creating subscription {} on topic {}", request.getRemoteAddr(),
Expand Down Expand Up @@ -186,7 +189,9 @@ public void onWebSocketText(String message) {
@Override
public void close() throws IOException {
if (consumer != null) {
this.service.removeConsumer(this);
if (!this.service.removeConsumer(this)) {
log.warn("[{}] Failed to remove consumer handler", consumer.getTopic());
}
consumer.closeAsync().thenAccept(x -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Closed consumer asynchronously", consumer.getTopic());
Expand Down
Expand Up @@ -87,7 +87,10 @@ public ProducerHandler(WebSocketService service, HttpServletRequest request, Ser
try {
ProducerConfiguration conf = getProducerConfiguration();
this.producer = service.getPulsarClient().createProducer(topic, conf);
this.service.addProducer(this);
if (!this.service.addProducer(this)) {
log.warn("[{}:{}] Failed to add producer handler for topic {}", request.getRemoteAddr(),
request.getRemotePort(), topic);
}
} catch (Exception e) {
log.warn("[{}:{}] Failed in creating producer on topic {}", request.getRemoteAddr(),
request.getRemotePort(), topic, e);
Expand All @@ -103,7 +106,9 @@ public ProducerHandler(WebSocketService service, HttpServletRequest request, Ser
@Override
public void close() throws IOException {
if (producer != null) {
this.service.removeProducer(this);
if (!this.service.removeProducer(this)) {
log.warn("[{}] Failed to remove producer handler", producer.getTopic());
}
producer.closeAsync().thenAccept(x -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Closed producer asynchronously", producer.getTopic());
Expand Down
Expand Up @@ -82,7 +82,10 @@ public ReaderHandler(WebSocketService service, HttpServletRequest request, Servl
try {
this.reader = service.getPulsarClient().createReader(topic, getMessageId(), conf);
this.subscription = ((ReaderImpl)this.reader).getConsumer().getSubscription();
this.service.addReader(this);
if (!this.service.addReader(this)) {
log.warn("[{}:{}] Failed to add reader handler for topic {}", request.getRemoteAddr(),
request.getRemotePort(), topic);
}
receiveMessage();
} catch (Exception e) {
log.warn("[{}:{}] Failed in creating reader {} on topic {}", request.getRemoteAddr(),
Expand Down Expand Up @@ -171,7 +174,9 @@ public void onWebSocketText(String message) {
@Override
public void close() throws IOException {
if (reader != null) {
this.service.removeReader(this);
if (!this.service.removeReader(this)) {
log.warn("[{}] Failed to remove reader handler", reader.getTopic());
}
reader.closeAsync().thenAccept(x -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Closed reader asynchronously", reader.getTopic());
Expand Down
Expand Up @@ -23,7 +23,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -42,6 +41,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
import org.apache.pulsar.websocket.stats.ProxyStats;
import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
Expand All @@ -52,8 +52,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

import io.netty.util.concurrent.DefaultThreadFactory;

/**
Expand All @@ -78,9 +76,9 @@ public class WebSocketService implements Closeable {
private ConfigurationCacheService configurationCacheService;

private ClusterData localCluster;
private final ConcurrentOpenHashMap<String, List<ProducerHandler>> topicProducerMap;
private final ConcurrentOpenHashMap<String, List<ConsumerHandler>> topicConsumerMap;
private final ConcurrentOpenHashMap<String, List<ReaderHandler>> topicReaderMap;
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ProducerHandler>> topicProducerMap;
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ConsumerHandler>> topicConsumerMap;
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ReaderHandler>> topicReaderMap;
private final ProxyStats proxyStats;

public WebSocketService(WebSocketProxyConfiguration config) {
Expand Down Expand Up @@ -275,11 +273,12 @@ public boolean isAuthorizationEnabled() {
}

public boolean addProducer(ProducerHandler producer) {
return topicProducerMap.computeIfAbsent(producer.getProducer().getTopic(), topic -> Lists.newArrayList())
return topicProducerMap
.computeIfAbsent(producer.getProducer().getTopic(), topic -> new ConcurrentOpenHashSet<>())
.add(producer);
}

public ConcurrentOpenHashMap<String, List<ProducerHandler>> getProducers() {
public ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ProducerHandler>> getProducers() {
return topicProducerMap;
}

Expand All @@ -292,11 +291,12 @@ public boolean removeProducer(ProducerHandler producer) {
}

public boolean addConsumer(ConsumerHandler consumer) {
return topicConsumerMap.computeIfAbsent(consumer.getConsumer().getTopic(), topic -> Lists.newArrayList())
return topicConsumerMap
.computeIfAbsent(consumer.getConsumer().getTopic(), topic -> new ConcurrentOpenHashSet<>())
.add(consumer);
}

public ConcurrentOpenHashMap<String, List<ConsumerHandler>> getConsumers() {
public ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ConsumerHandler>> getConsumers() {
return topicConsumerMap;
}

Expand All @@ -309,11 +309,11 @@ public boolean removeConsumer(ConsumerHandler consumer) {
}

public boolean addReader(ReaderHandler reader) {
return topicReaderMap.computeIfAbsent(reader.getConsumer().getTopic(), topic -> Lists.newArrayList())
return topicReaderMap.computeIfAbsent(reader.getConsumer().getTopic(), topic -> new ConcurrentOpenHashSet<>())
.add(reader);
}

public ConcurrentOpenHashMap<String, List<ReaderHandler>> getReaders() {
public ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ReaderHandler>> getReaders() {
return topicReaderMap;
}

Expand Down

0 comments on commit bace0b4

Please sign in to comment.