Skip to content

Commit

Permalink
[pinpoint-apm#8865] Correct inconsistent synchronization in volatile …
Browse files Browse the repository at this point in the history
…field
  • Loading branch information
emeroad committed May 17, 2022
1 parent 131da94 commit df2b2ee
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.thrift.TBase;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -32,7 +31,7 @@
public class SendDataToFlinkService {
private final Logger logger = LogManager.getLogger(this.getClass());

private volatile List<TcpDataSender<TBase<?, ?>>> flinkTcpDataSenderList = new CopyOnWriteArrayList<>();
private volatile List<TcpDataSender<TBase<?, ?>>> dataSenderList = new CopyOnWriteArrayList<>();
private final AtomicInteger callCount = new AtomicInteger(1);

protected void sendData(TBase<?, ?> data) {
Expand All @@ -53,20 +52,21 @@ protected void sendData(TBase<?, ?> data) {
}

private TcpDataSender<TBase<?, ?>> roundRobinTcpDataSender() {
if (flinkTcpDataSenderList.isEmpty()) {
final List<TcpDataSender<TBase<?, ?>>> copyList = this.dataSenderList;
if (copyList.isEmpty()) {
return null;
}

int count = callCount.getAndIncrement();
int tcpDataSenderIndex = count % flinkTcpDataSenderList.size();
int tcpDataSenderIndex = count % copyList.size();

if (tcpDataSenderIndex < 0) {
tcpDataSenderIndex = tcpDataSenderIndex * -1;
callCount.set(0);
}

try {
return flinkTcpDataSenderList.get(tcpDataSenderIndex);
return copyList.get(tcpDataSenderIndex);
} catch (Exception e) {
logger.warn("not get FlinkTcpDataSender", e);
}
Expand All @@ -75,6 +75,6 @@ protected void sendData(TBase<?, ?> data) {
}

public void replaceFlinkTcpDataSenderList(List<FlinkTcpDataSender> flinkTcpDataSenderList) {
this.flinkTcpDataSenderList = new CopyOnWriteArrayList<>(flinkTcpDataSenderList);
this.dataSenderList = new CopyOnWriteArrayList<>(flinkTcpDataSenderList);
}
}

0 comments on commit df2b2ee

Please sign in to comment.