diff --git a/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/ProxyResponseHandler.java b/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/ProxyResponseHandler.java index ef39681cd..52ec84d7e 100644 --- a/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/ProxyResponseHandler.java +++ b/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/ProxyResponseHandler.java @@ -55,7 +55,10 @@ public ProxyResponseHandler(Gson gson, String requestTopic, String replyToTopic, WebSocketKafkaProxyClient webSocketKafkaProxyClient = new WebSocketKafkaProxyClient((message, client) -> {}); webSocketKafkaProxyClient.setWebSocketKafkaProxyConfig(proxyConfig); this.proxyClient = webSocketKafkaProxyClient; - this.proxyClient.start(); + } + + public void start() { + proxyClient.start(); } @Override diff --git a/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/config/FrontendProxyClientConfig.java b/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/config/FrontendProxyClientConfig.java index f18920934..db9e85b6d 100644 --- a/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/config/FrontendProxyClientConfig.java +++ b/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/config/FrontendProxyClientConfig.java @@ -91,7 +91,11 @@ public NotificationHandler notificationHandler(Gson gson, RequestResponseMatcher public WorkerPool workerPool(Gson gson, RequestResponseMatcher requestResponseMatcher, WebSocketKafkaProxyConfig proxyConfig) { final ProxyResponseHandler[] workHandlers = new ProxyResponseHandler[proxyConfig.getWorkerThreads()]; IntStream.range(0, proxyConfig.getWorkerThreads()).forEach( - nbr -> workHandlers[nbr] = new ProxyResponseHandler(gson, REQUEST_TOPIC, RESPONSE_TOPIC, proxyConfig, requestResponseMatcher) + nbr -> { + ProxyResponseHandler handler = new ProxyResponseHandler(gson, REQUEST_TOPIC, RESPONSE_TOPIC, proxyConfig, requestResponseMatcher); + handler.start(); + workHandlers[nbr] = handler; + } ); final RingBuffer ringBuffer = RingBuffer.createMultiProducer(ServerEvent::new, proxyConfig.getBufferSize(), getWaitStrategy(proxyConfig.getWaitStrategy())); final SequenceBarrier barrier = ringBuffer.newBarrier();