From f25a5ac4d301d05b195542872fc94bfe4294c63c Mon Sep 17 00:00:00 2001 From: Andrey Panevin Date: Fri, 25 May 2018 14:17:13 +0300 Subject: [PATCH] Reduce amount of proxy ws client --- .../java/com/devicehive/proxy/ProxyResponseHandler.java | 5 ++++- .../devicehive/proxy/config/FrontendProxyClientConfig.java | 6 +++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/ProxyResponseHandler.java b/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/ProxyResponseHandler.java index ef39681cd..52ec84d7e 100644 --- a/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/ProxyResponseHandler.java +++ b/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/ProxyResponseHandler.java @@ -55,7 +55,10 @@ public ProxyResponseHandler(Gson gson, String requestTopic, String replyToTopic, WebSocketKafkaProxyClient webSocketKafkaProxyClient = new WebSocketKafkaProxyClient((message, client) -> {}); webSocketKafkaProxyClient.setWebSocketKafkaProxyConfig(proxyConfig); this.proxyClient = webSocketKafkaProxyClient; - this.proxyClient.start(); + } + + public void start() { + proxyClient.start(); } @Override diff --git a/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/config/FrontendProxyClientConfig.java b/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/config/FrontendProxyClientConfig.java index f18920934..db9e85b6d 100644 --- a/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/config/FrontendProxyClientConfig.java +++ b/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/config/FrontendProxyClientConfig.java @@ -91,7 +91,11 @@ public NotificationHandler notificationHandler(Gson gson, RequestResponseMatcher public WorkerPool workerPool(Gson gson, RequestResponseMatcher requestResponseMatcher, WebSocketKafkaProxyConfig proxyConfig) { final ProxyResponseHandler[] workHandlers = new ProxyResponseHandler[proxyConfig.getWorkerThreads()]; IntStream.range(0, proxyConfig.getWorkerThreads()).forEach( - nbr -> workHandlers[nbr] = new ProxyResponseHandler(gson, REQUEST_TOPIC, RESPONSE_TOPIC, proxyConfig, requestResponseMatcher) + nbr -> { + ProxyResponseHandler handler = new ProxyResponseHandler(gson, REQUEST_TOPIC, RESPONSE_TOPIC, proxyConfig, requestResponseMatcher); + handler.start(); + workHandlers[nbr] = handler; + } ); final RingBuffer ringBuffer = RingBuffer.createMultiProducer(ServerEvent::new, proxyConfig.getBufferSize(), getWaitStrategy(proxyConfig.getWaitStrategy())); final SequenceBarrier barrier = ringBuffer.newBarrier();