Skip to content

Commit

Permalink
Reduce amount of proxy ws client
Browse files Browse the repository at this point in the history
  • Loading branch information
apanevin committed May 25, 2018
1 parent d74c180 commit f25a5ac
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
Expand Up @@ -55,7 +55,10 @@ public ProxyResponseHandler(Gson gson, String requestTopic, String replyToTopic,
WebSocketKafkaProxyClient webSocketKafkaProxyClient = new WebSocketKafkaProxyClient((message, client) -> {});
webSocketKafkaProxyClient.setWebSocketKafkaProxyConfig(proxyConfig);
this.proxyClient = webSocketKafkaProxyClient;
this.proxyClient.start();
}

public void start() {
proxyClient.start();
}

@Override
Expand Down
Expand Up @@ -91,7 +91,11 @@ public NotificationHandler notificationHandler(Gson gson, RequestResponseMatcher
public WorkerPool<ServerEvent> workerPool(Gson gson, RequestResponseMatcher requestResponseMatcher, WebSocketKafkaProxyConfig proxyConfig) {
final ProxyResponseHandler[] workHandlers = new ProxyResponseHandler[proxyConfig.getWorkerThreads()];
IntStream.range(0, proxyConfig.getWorkerThreads()).forEach(
nbr -> workHandlers[nbr] = new ProxyResponseHandler(gson, REQUEST_TOPIC, RESPONSE_TOPIC, proxyConfig, requestResponseMatcher)
nbr -> {
ProxyResponseHandler handler = new ProxyResponseHandler(gson, REQUEST_TOPIC, RESPONSE_TOPIC, proxyConfig, requestResponseMatcher);
handler.start();
workHandlers[nbr] = handler;
}
);
final RingBuffer<ServerEvent> ringBuffer = RingBuffer.createMultiProducer(ServerEvent::new, proxyConfig.getBufferSize(), getWaitStrategy(proxyConfig.getWaitStrategy()));
final SequenceBarrier barrier = ringBuffer.newBarrier();
Expand Down

0 comments on commit f25a5ac

Please sign in to comment.