diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/service/SendDataToFlinkService.java b/collector/src/main/java/com/navercorp/pinpoint/collector/service/SendDataToFlinkService.java index 303ea9e52e37..808359931bbf 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/service/SendDataToFlinkService.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/service/SendDataToFlinkService.java @@ -20,7 +20,6 @@ import org.apache.thrift.TBase; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; -import org.springframework.stereotype.Component; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -32,7 +31,7 @@ public class SendDataToFlinkService { private final Logger logger = LogManager.getLogger(this.getClass()); - private volatile List>> flinkTcpDataSenderList = new CopyOnWriteArrayList<>(); + private volatile List>> dataSenderList = new CopyOnWriteArrayList<>(); private final AtomicInteger callCount = new AtomicInteger(1); protected void sendData(TBase data) { @@ -53,12 +52,13 @@ protected void sendData(TBase data) { } private TcpDataSender> roundRobinTcpDataSender() { - if (flinkTcpDataSenderList.isEmpty()) { + final List>> copyList = this.dataSenderList; + if (copyList.isEmpty()) { return null; } int count = callCount.getAndIncrement(); - int tcpDataSenderIndex = count % flinkTcpDataSenderList.size(); + int tcpDataSenderIndex = count % copyList.size(); if (tcpDataSenderIndex < 0) { tcpDataSenderIndex = tcpDataSenderIndex * -1; @@ -66,7 +66,7 @@ protected void sendData(TBase data) { } try { - return flinkTcpDataSenderList.get(tcpDataSenderIndex); + return copyList.get(tcpDataSenderIndex); } catch (Exception e) { logger.warn("not get FlinkTcpDataSender", e); } @@ -75,6 +75,6 @@ protected void sendData(TBase data) { } public void replaceFlinkTcpDataSenderList(List flinkTcpDataSenderList) { - this.flinkTcpDataSenderList = new CopyOnWriteArrayList<>(flinkTcpDataSenderList); + this.dataSenderList = new CopyOnWriteArrayList<>(flinkTcpDataSenderList); } }