Skip to content

Conversation

@mattisonchao
Copy link
Member

@mattisonchao mattisonchao commented Aug 2, 2022

Motivation

Relate issue: #16907

List<Entry> entries = Lists.newArrayListWithExpectedSize(positions.size());
if (positions.isEmpty()) {
callback.readEntriesComplete(entries, ctx);
return Collections.emptySet();
}
// filters out messages which are already acknowledged
Set<Position> alreadyAcknowledgedPositions = Sets.newHashSet();
lock.readLock().lock();
try {
positions.stream()
.filter(position -> individualDeletedMessages.contains(((PositionImpl) position).getLedgerId(),
((PositionImpl) position).getEntryId())
|| ((PositionImpl) position).compareTo(markDeletePosition) <= 0)
.forEach(alreadyAcknowledgedPositions::add);
} finally {
lock.readLock().unlock();
}
final int totalValidPositions = positions.size() - alreadyAcknowledgedPositions.size();
final AtomicReference<ManagedLedgerException> exception = new AtomicReference<>();
ReadEntryCallback cb = new ReadEntryCallback() {
int pendingCallbacks = totalValidPositions;
@Override
public synchronized void readEntryComplete(Entry entry, Object ctx) {
if (exception.get() != null) {
// if there is already a failure for a different position, we should release the entry straight away
// and not add it to the list
entry.release();
if (--pendingCallbacks == 0) {
callback.readEntriesFailed(exception.get(), ctx);
}
} else {
entries.add(entry);
if (--pendingCallbacks == 0) {
if (sortEntries) {
entries.sort(ENTRY_COMPARATOR);
}
callback.readEntriesComplete(entries, ctx);
}
}
}
@Override
public synchronized void readEntryFailed(ManagedLedgerException mle, Object ctx) {
log.warn("[{}][{}] Error while replaying entries", ledger.getName(), name, mle);
if (exception.compareAndSet(null, mle)) {
// release the entries just once, any further read success will release the entry straight away
entries.forEach(Entry::release);
}
if (--pendingCallbacks == 0) {
callback.readEntriesFailed(exception.get(), ctx);
}
}
};
positions.stream().filter(position -> !alreadyAcknowledgedPositions.contains(position))
.forEach(p ->{
if (((PositionImpl) p).compareTo(this.readPosition) == 0) {
this.setReadPosition(this.readPosition.getNext());
log.warn("[{}][{}] replayPosition{} equals readPosition{}," + " need set next readPositio",
ledger.getName(), name, p, this.readPosition);
}
ledger.asyncReadEntry((PositionImpl) p, cb, ctx);
});
return alreadyAcknowledgedPositions;

We have to avoid using ArrayList in the concurrent environment.

Modifications

  • Use synchronizedList to instead of ArrayList

Verifying this change

  • Make sure that the change passes the CI checks.

Documentation

  • doc-not-needed
    (Please explain why)

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Aug 2, 2022
@mattisonchao mattisonchao marked this pull request as ready for review August 2, 2022 02:13
public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions,
ReadEntriesCallback callback, Object ctx, boolean sortEntries) {
List<Entry> entries = Lists.newArrayListWithExpectedSize(positions.size());
List<Entry> entries = Collections.synchronizedList(new ArrayList<>(positions.size()));
Copy link
Member

@shibd shibd Aug 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why local variables need care about concurrency?

Copy link
Member Author

@mattisonchao mattisonchao Aug 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will invoke the entries.add method at callback in the different threads. ( If we need to read entries from the different ledger.)

@mattisonchao mattisonchao changed the title Avoid using ArrayList in the concurrent environment. [fix][broker] Avoid using ArrayList in the concurrent environment. Aug 2, 2022
@mattisonchao mattisonchao self-assigned this Aug 2, 2022
Copy link
Member

@tisonkun tisonkun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can this patch fix the NPE issue reported originally?

BTW, you don't have to sync for forEach because compareAndSet will return true only once.

@tisonkun
Copy link
Member

tisonkun commented Aug 2, 2022

I think the issue here can be that the caller calls readEntryComplete with null parameter.

IIRC bookkeeper handles callbacks one by one so it may not in a concurrent context.

@mattisonchao
Copy link
Member Author

Sorry, I missed the synchronized in the callback.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants