Skip to content

Commit

Permalink
[ISSUE #4847]Optimize DefaultBrokerHeartbeatManager#onBrokerHeartbeat…
Browse files Browse the repository at this point in the history
… mehtod code logic (#4848)
  • Loading branch information
mxsm committed Aug 20, 2022
1 parent 02595d9 commit d117742
Showing 1 changed file with 24 additions and 25 deletions.
Expand Up @@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -80,7 +81,7 @@ public void scanNotActiveBroker() {
RemotingUtil.closeChannel(channel);
}
this.executor.submit(() ->
notifyBrokerInActive(next.getKey().getClusterName(), next.getValue().getBrokerName(), next.getKey().getBrokerAddr(), next.getValue().getBrokerId()));
notifyBrokerInActive(next.getKey().getClusterName(), next.getValue().getBrokerName(), next.getKey().getBrokerAddr(), next.getValue().getBrokerId()));
log.warn("The broker channel {} expired, brokerInfo {}, expired {}ms", next.getValue().getChannel(), next.getKey(), timeoutMillis);
}
}
Expand All @@ -102,15 +103,15 @@ public void addBrokerLifecycleListener(BrokerLifecycleListener listener) {

@Override
public void registerBroker(String clusterName, String brokerName, String brokerAddr,
long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset) {
long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset) {
final BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
final BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(addrInfo,
new BrokerLiveInfo(brokerName,
brokerAddr,
brokerId,
System.currentTimeMillis(),
timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
channel, epoch == null ? -1 : epoch, maxOffset == null ? -1 : maxOffset));
new BrokerLiveInfo(brokerName,
brokerAddr,
brokerId,
System.currentTimeMillis(),
timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
channel, epoch == null ? -1 : epoch, maxOffset == null ? -1 : maxOffset));
if (prevBrokerLiveInfo == null) {
log.info("new broker registered, {}, brokerId:{}", addrInfo, brokerId);
}
Expand All @@ -127,24 +128,22 @@ public void changeBrokerMetadata(String clusterName, String brokerAddr, Long bro
}

@Override
public void onBrokerHeartbeat(String clusterName, String brokerAddr, Integer epoch, Long maxOffset, Long confirmOffset) {
public void onBrokerHeartbeat(String clusterName, String brokerAddr, Integer epoch, Long maxOffset,
Long confirmOffset) {
BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo);
int realEpoch = epoch == null ? -1 : epoch;
long realMaxOffset = maxOffset == null ? -1 : maxOffset;
long realConfirmOffset = confirmOffset == null ? -1 : confirmOffset;
if (prev != null) {
prev.setLastUpdateTimestamp(System.currentTimeMillis());
if (realEpoch > prev.getEpoch()) {
prev.setEpoch(realEpoch);
prev.setMaxOffset(realMaxOffset);
prev.setConfirmOffset(realConfirmOffset);
} else if (realEpoch == prev.getEpoch()) {
if (realMaxOffset > prev.getMaxOffset()) {
prev.setMaxOffset(realMaxOffset);
prev.setConfirmOffset(realConfirmOffset);
}
}
if (null == prev) {
return;
}
int realEpoch = Optional.ofNullable(epoch).orElse(-1);
long realMaxOffset = Optional.ofNullable(maxOffset).orElse(-1L);
long realConfirmOffset = Optional.ofNullable(confirmOffset).orElse(-1L);

prev.setLastUpdateTimestamp(System.currentTimeMillis());
if (realEpoch > prev.getEpoch() || (realEpoch == prev.getEpoch() && realMaxOffset > prev.getMaxOffset())) {
prev.setEpoch(realEpoch);
prev.setMaxOffset(realMaxOffset);
prev.setConfirmOffset(realConfirmOffset);
}
}

Expand All @@ -156,7 +155,7 @@ public void onBrokerChannelClose(Channel channel) {
log.info("Channel {} inactive, broker {}, addr:{}, id:{}", entry.getValue().getChannel(), entry.getValue().getBrokerName(), entry.getKey().getBrokerAddr(), entry.getValue().getBrokerId());
addrInfo = entry.getKey();
this.executor.submit(() ->
notifyBrokerInActive(entry.getKey().getClusterName(), entry.getValue().getBrokerName(), entry.getKey().getBrokerAddr(), entry.getValue().getBrokerId()));
notifyBrokerInActive(entry.getKey().getClusterName(), entry.getValue().getBrokerName(), entry.getKey().getBrokerAddr(), entry.getValue().getBrokerId()));
break;
}
}
Expand Down

0 comments on commit d117742

Please sign in to comment.