diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java index 95cc8519746..e56d97c9910 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -80,7 +81,7 @@ public void scanNotActiveBroker() { RemotingUtil.closeChannel(channel); } this.executor.submit(() -> - notifyBrokerInActive(next.getKey().getClusterName(), next.getValue().getBrokerName(), next.getKey().getBrokerAddr(), next.getValue().getBrokerId())); + notifyBrokerInActive(next.getKey().getClusterName(), next.getValue().getBrokerName(), next.getKey().getBrokerAddr(), next.getValue().getBrokerId())); log.warn("The broker channel {} expired, brokerInfo {}, expired {}ms", next.getValue().getChannel(), next.getKey(), timeoutMillis); } } @@ -102,15 +103,15 @@ public void addBrokerLifecycleListener(BrokerLifecycleListener listener) { @Override public void registerBroker(String clusterName, String brokerName, String brokerAddr, - long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset) { + long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset) { final BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr); final BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(addrInfo, - new BrokerLiveInfo(brokerName, - brokerAddr, - brokerId, - System.currentTimeMillis(), - timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis, - channel, epoch == null ? -1 : epoch, maxOffset == null ? -1 : maxOffset)); + new BrokerLiveInfo(brokerName, + brokerAddr, + brokerId, + System.currentTimeMillis(), + timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis, + channel, epoch == null ? -1 : epoch, maxOffset == null ? -1 : maxOffset)); if (prevBrokerLiveInfo == null) { log.info("new broker registered, {}, brokerId:{}", addrInfo, brokerId); } @@ -127,24 +128,22 @@ public void changeBrokerMetadata(String clusterName, String brokerAddr, Long bro } @Override - public void onBrokerHeartbeat(String clusterName, String brokerAddr, Integer epoch, Long maxOffset, Long confirmOffset) { + public void onBrokerHeartbeat(String clusterName, String brokerAddr, Integer epoch, Long maxOffset, + Long confirmOffset) { BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr); BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo); - int realEpoch = epoch == null ? -1 : epoch; - long realMaxOffset = maxOffset == null ? -1 : maxOffset; - long realConfirmOffset = confirmOffset == null ? -1 : confirmOffset; - if (prev != null) { - prev.setLastUpdateTimestamp(System.currentTimeMillis()); - if (realEpoch > prev.getEpoch()) { - prev.setEpoch(realEpoch); - prev.setMaxOffset(realMaxOffset); - prev.setConfirmOffset(realConfirmOffset); - } else if (realEpoch == prev.getEpoch()) { - if (realMaxOffset > prev.getMaxOffset()) { - prev.setMaxOffset(realMaxOffset); - prev.setConfirmOffset(realConfirmOffset); - } - } + if (null == prev) { + return; + } + int realEpoch = Optional.ofNullable(epoch).orElse(-1); + long realMaxOffset = Optional.ofNullable(maxOffset).orElse(-1L); + long realConfirmOffset = Optional.ofNullable(confirmOffset).orElse(-1L); + + prev.setLastUpdateTimestamp(System.currentTimeMillis()); + if (realEpoch > prev.getEpoch() || (realEpoch == prev.getEpoch() && realMaxOffset > prev.getMaxOffset())) { + prev.setEpoch(realEpoch); + prev.setMaxOffset(realMaxOffset); + prev.setConfirmOffset(realConfirmOffset); } } @@ -156,7 +155,7 @@ public void onBrokerChannelClose(Channel channel) { log.info("Channel {} inactive, broker {}, addr:{}, id:{}", entry.getValue().getChannel(), entry.getValue().getBrokerName(), entry.getKey().getBrokerAddr(), entry.getValue().getBrokerId()); addrInfo = entry.getKey(); this.executor.submit(() -> - notifyBrokerInActive(entry.getKey().getClusterName(), entry.getValue().getBrokerName(), entry.getKey().getBrokerAddr(), entry.getValue().getBrokerId())); + notifyBrokerInActive(entry.getKey().getClusterName(), entry.getValue().getBrokerName(), entry.getKey().getBrokerAddr(), entry.getValue().getBrokerId())); break; } }