Skip to content

[ISSUE #9455] fix broker fail to start after time jump #9456

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public class BrokerController {
protected volatile BrokerMemberGroup brokerMemberGroup;
protected EscapeBridge escapeBridge;
protected List<BrokerAttachedPlugin> brokerAttachedPlugins = new ArrayList<>();
protected volatile long shouldStartTime;
protected volatile long startupTime;
private BrokerPreOnlineService brokerPreOnlineService;
protected volatile boolean isIsolated = false;
protected volatile long minBrokerIdInGroup = 0;
Expand Down Expand Up @@ -1764,7 +1764,7 @@ protected void startBasicService() throws Exception {

public void start() throws Exception {

this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();
this.startupTime = System.currentTimeMillis();

if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster()) {
isIsolated = true;
Expand All @@ -1785,8 +1785,9 @@ public void start() throws Exception {
@Override
public void run0() {
try {
if (System.currentTimeMillis() < shouldStartTime) {
BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
long diff = System.currentTimeMillis() - startupTime;
if (diff >= 0 && diff < messageStoreConfig.getDisappearTimeAfterStart()) {
BrokerController.LOG.info("Register to namesrv after {}", startupTime + messageStoreConfig.getDisappearTimeAfterStart());
return;
}
if (isIsolated) {
Expand Down Expand Up @@ -2533,8 +2534,8 @@ public EscapeBridge getEscapeBridge() {
return escapeBridge;
}

public long getShouldStartTime() {
return shouldStartTime;
public long getStartupTime() {
return startupTime;
}

public BrokerPreOnlineService getBrokerPreOnlineService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ public void incrementInFlightMessageNum(String topic, String group, int queueId,
}

public void decrementInFlightMessageNum(String topic, String group, long popTime, int qId, int delta) {
if (popTime < this.brokerController.getShouldStartTime()) {
if (popTime < this.brokerController.getStartupTime()) {
return;
}
decrementInFlightMessageNum(topic, group, qId, delta);
}

public void decrementInFlightMessageNum(PopCheckPoint checkPoint) {
if (checkPoint.getPopTime() < this.brokerController.getShouldStartTime()) {
if (checkPoint.getPopTime() < this.brokerController.getStartupTime()) {
return;
}
decrementInFlightMessageNum(checkPoint.getTopic(), checkPoint.getCId(), checkPoint.getQueueId(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,8 +630,9 @@ public void run() {
int slow = 1;
while (!this.isStopped()) {
try {
if (System.currentTimeMillis() < brokerController.getShouldStartTime()) {
POP_LOGGER.info("PopReviveService Ready to run after {}", brokerController.getShouldStartTime());
long diff = System.currentTimeMillis() - brokerController.getStartupTime();
if (diff >= 0 && diff < brokerController.getMessageStoreConfig().getDisappearTimeAfterStart()) {
POP_LOGGER.info("PopReviveService Ready to run after {}", brokerController.getStartupTime() + brokerController.getMessageStoreConfig().getDisappearTimeAfterStart());
this.waitForRunning(1000);
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class PopInflightMessageCounterTest {
public void testNum() {
BrokerController brokerController = mock(BrokerController.class);
long brokerStartTime = System.currentTimeMillis();
when(brokerController.getShouldStartTime()).thenReturn(brokerStartTime);
when(brokerController.getStartupTime()).thenReturn(brokerStartTime);
PopInflightMessageCounter counter = new PopInflightMessageCounter(brokerController);

final String topic = "topic";
Expand Down Expand Up @@ -67,7 +67,7 @@ public void testNum() {
public void testClearInFlightMessageNum() {
BrokerController brokerController = mock(BrokerController.class);
long brokerStartTime = System.currentTimeMillis();
when(brokerController.getShouldStartTime()).thenReturn(brokerStartTime);
when(brokerController.getStartupTime()).thenReturn(brokerStartTime);
PopInflightMessageCounter counter = new PopInflightMessageCounter(brokerController);

final String topic = "topic";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected void initializeScheduledTasks() {

@Override
public void start() throws Exception {
this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();
this.startupTime = System.currentTimeMillis();

if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster()) {
isIsolated = true;
Expand All @@ -75,8 +75,9 @@ public void start() throws Exception {
@Override
public void run0() {
try {
if (System.currentTimeMillis() < shouldStartTime) {
BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
long diff = System.currentTimeMillis() - startupTime;
if (diff >= 0 && diff < messageStoreConfig.getDisappearTimeAfterStart()) {
BrokerController.LOG.info("Register to namesrv after {}", startupTime + messageStoreConfig.getDisappearTimeAfterStart());
return;
}
if (isIsolated) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public class TimerMessageStore {
private volatile BrokerRole lastBrokerRole = BrokerRole.SLAVE;
//the dequeue is an asynchronous process, use this flag to track if the status has changed
private boolean dequeueStatusChangeFlag = false;
private long shouldStartTime;
private long startupTime;

// True if current store is master or current brokerId is equal to the minimum brokerId of the replica group in slaveActingMaster mode.
protected volatile boolean shouldRunningDequeue;
Expand Down Expand Up @@ -464,7 +464,7 @@ public static boolean isMagicOK(int magic) {
}

public void start() {
this.shouldStartTime = storeConfig.getDisappearTimeAfterStart() + System.currentTimeMillis();
this.startupTime = System.currentTimeMillis();
maybeMoveWriteTime();
enqueueGetService.start();
enqueuePutService.start();
Expand Down Expand Up @@ -1439,8 +1439,9 @@ public void run() {
TimerMessageStore.LOGGER.info(this.getServiceName() + " service start");
while (!this.isStopped()) {
try {
if (System.currentTimeMillis() < shouldStartTime) {
TimerMessageStore.LOGGER.info("TimerDequeueGetService ready to run after {}.", shouldStartTime);
long diff = System.currentTimeMillis() - startupTime;
if (diff >= 0 && diff < storeConfig.getDisappearTimeAfterStart()) {
TimerMessageStore.LOGGER.info("TimerDequeueGetService ready to run after {}.", startupTime + storeConfig.getDisappearTimeAfterStart());
waitForRunning(1000);
continue;
}
Expand Down
Loading