-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix producer stuck issue due to NPE thrown when creating a new ledger #7401
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -404,7 +404,7 @@ public void operationFailed(MetaStoreException e) { | |
}; | ||
|
||
// Create a new ledger to start writing | ||
this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); | ||
this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis(); | ||
mbean.startDataLedgerCreateOp(); | ||
|
||
asyncCreateLedger(bookKeeper, config, digestType, (rc, lh, ctx) -> { | ||
|
@@ -596,13 +596,19 @@ private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { | |
if (log.isDebugEnabled()) { | ||
log.debug("[{}] Queue addEntry request", name); | ||
} | ||
if (State.CreatingLedger == state) { | ||
long elapsedMs = System.currentTimeMillis() - this.lastLedgerCreationInitiationTimestamp; | ||
if (elapsedMs > TimeUnit.SECONDS.toMillis(2 * config.getMetadataOperationsTimeoutSeconds())) { | ||
log.info("[{}] Ledger creation was initiated {} ms ago but it never completed" + | ||
" and creation timeout task didn't kick in as well. Force to fail the create ledger operation ..."); | ||
this.createComplete(Code.TimeoutException, null, null); | ||
} | ||
} | ||
} else if (state == State.ClosedLedger) { | ||
// No ledger and no pending operations. Create a new ledger | ||
if (log.isDebugEnabled()) { | ||
log.debug("[{}] Creating a new ledger", name); | ||
} | ||
log.info("[{}] Creating a new ledger", name); | ||
if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) { | ||
this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); | ||
this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis(); | ||
mbean.startDataLedgerCreateOp(); | ||
asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap()); | ||
} | ||
|
@@ -1229,8 +1235,8 @@ public void operationComplete(Void v, Stat stat) { | |
metadataMutex.unlock(); | ||
updateLedgersIdsComplete(stat); | ||
synchronized (ManagedLedgerImpl.this) { | ||
mbean.addLedgerSwitchLatencySample(System.nanoTime() - lastLedgerCreationInitiationTimestamp, | ||
TimeUnit.NANOSECONDS); | ||
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() - lastLedgerCreationInitiationTimestamp, | ||
TimeUnit.MILLISECONDS); | ||
} | ||
} | ||
|
||
|
@@ -1380,11 +1386,9 @@ synchronized void ledgerClosed(final LedgerHandle lh) { | |
|
||
if (!pendingAddEntries.isEmpty()) { | ||
// Need to create a new ledger to write pending entries | ||
if (log.isDebugEnabled()) { | ||
log.debug("[{}] Creating a new ledger", name); | ||
} | ||
log.info("[{}] Creating a new ledger", name); | ||
STATE_UPDATER.set(this, State.CreatingLedger); | ||
this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); | ||
this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis(); | ||
mbean.startDataLedgerCreateOp(); | ||
asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap()); | ||
} | ||
|
@@ -3170,15 +3174,28 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf | |
Map<String, byte[]> finalMetadata = new HashMap<>(); | ||
finalMetadata.putAll(ledgerMetadata); | ||
finalMetadata.putAll(metadata); | ||
if (log.isDebugEnabled()) { | ||
log.debug("creating ledger, metadata: "+finalMetadata); | ||
} | ||
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), | ||
log.info("[{}] Creating ledger, metadata: {} - metadata ops timeout : {} seconds", | ||
name, finalMetadata, config.getMetadataOperationsTimeoutSeconds()); | ||
try { | ||
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), | ||
digestType, config.getPassword(), cb, ledgerCreated, finalMetadata); | ||
} catch (Throwable cause) { | ||
log.error("[{}] Encountered unexpected error when creating ledger", | ||
name, cause); | ||
cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated); | ||
return; | ||
} | ||
scheduledExecutor.schedule(() -> { | ||
if (!ledgerCreated.get()) { | ||
cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated); | ||
if (log.isDebugEnabled()) { | ||
log.debug("[{}] Timeout creating ledger", name); | ||
} | ||
} else { | ||
if (log.isDebugEnabled()) { | ||
log.debug("[{}] Ledger already created when timeout task is triggered", name); | ||
} | ||
} | ||
cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this change intentional? Shouldn't the callback only be triggered There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is intentional - when the timeout task is triggered, always execute the callback. It is totally fine because we already have the logic to ensure the callback is triggered only once. This change ensures all the logic is executed in a central place instead of being spread across multiple places, which could potentially make code maintenance much harder. |
||
}, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The try-catch here is good in any case, though we should also ensure that the BK client handles DNS errors by triggering the callback instead of throwing an exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fix was already made in the bookkeeper client. We haven't released the bookkeeper client yet.