Skip to content

Commit

Permalink
Fix unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
codelipenghui committed Dec 27, 2019
1 parent f659e8c commit ef23a4b
Showing 1 changed file with 31 additions and 26 deletions.
Expand Up @@ -1292,40 +1292,45 @@ public synchronized void updateLedgersIdsComplete(Stat stat) {
if (log.isDebugEnabled()) {
log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size());
}
// Process all the pending addEntry requests

// Avoid use same OpAddEntry between different ledger handle
int pendingSize = pendingAddEntries.size();
OpAddEntry op;
OpAddEntry existsOp;
do {
op = pendingAddEntries.poll();
if (op != null) {
existsOp = pendingAddEntries.poll();
if (existsOp != null) {
// If op is used by another ledger handle, we need to close it and create a new one
if (op.ledger != null) {
op.close();
op = OpAddEntry.create(op.ml, op.data, op.callback, op.ctx);
if (existsOp.ledger != null) {
existsOp.close();
existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.callback, existsOp.ctx);
}
op.setLedger(currentLedger);
pendingAddEntries.add(op);
++currentLedgerEntries;
currentLedgerSize += op.data.readableBytes();
existsOp.setLedger(currentLedger);
pendingAddEntries.add(existsOp);
}
} while (existsOp != null && --pendingSize > 0);

if (log.isDebugEnabled()) {
log.debug("[{}] Sending {}", name, op);
}
// Process all the pending addEntry requests
for (OpAddEntry op : pendingAddEntries) {
++currentLedgerEntries;
currentLedgerSize += op.data.readableBytes();

if (currentLedgerIsFull()) {
STATE_UPDATER.set(this, State.ClosingLedger);
op.setCloseWhenDone(true);
op.initiate();
if (log.isDebugEnabled()) {
log.debug("[{}] Stop writing into ledger {} queue={}", name, currentLedger.getId(),
pendingAddEntries.size());
}
break;
} else {
op.initiate();
if (log.isDebugEnabled()) {
log.debug("[{}] Sending {}", name, op);
}

if (currentLedgerIsFull()) {
STATE_UPDATER.set(this, State.ClosingLedger);
op.setCloseWhenDone(true);
op.initiate();
if (log.isDebugEnabled()) {
log.debug("[{}] Stop writing into ledger {} queue={}", name, currentLedger.getId(),
pendingAddEntries.size());
}
break;
} else {
op.initiate();
}
} while (op != null && --pendingSize > 0);
}
}

// //////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit ef23a4b

Please sign in to comment.