Skip to content

Conversation

@nerzhulart
Copy link
Contributor

#3

@nerzhulart nerzhulart requested a review from erokhins November 19, 2025 18:32
@nerzhulart nerzhulart force-pushed the n500/delay-incoming-messages branch from 8ecfc3d to 90f5a44 Compare November 19, 2025 18:35
pendingIncomingRequests.update { map -> map.put(requestId, job) }
}.invokeOnCompletion {
pendingIncomingRequests.update { it.remove(requestId) }
messageProcessingMutex.withLock {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a potential deadlock here.
Let's imagine that we have one established session, then, the following steps will block us completely:

  1. we are starting another one in newSession
  2. we have sent the session/new request
  3. before the agent answers to it, it sending us sessionUpdate for already established session
  4. after that agent send us the NewSessionResponse

As you can see, on step 1 we locked this mutex, and on step 3 we suspended inside the handleIncomingMessage, so we will never process the message from step 4

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok! I reworked it using your approach with a counter

Copy link
Contributor

@erokhins erokhins left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment about deadlock :(

@nerzhulart nerzhulart force-pushed the n500/delay-incoming-messages branch from 90f5a44 to d3c47af Compare November 19, 2025 20:19
Copy link
Contributor

@erokhins erokhins left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personal preference: I would remove the sessionDeferred part.

Still, approved even in the current way -- wasn't able to break it.
P.s. probably it deserves a test -- it could be implemented by detay in the factory.createClientOperations(sessionId, sessionResponse)


private suspend fun getSessionOrThrow(sessionId: SessionId): ClientSessionImpl = (_sessions.value[sessionId] ?: acpFail("Session $sessionId not found")).await()
private suspend fun getSessionOrThrow(sessionId: SessionId): ClientSessionImpl {
_sessions.value[sessionId]?.let {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is somewhat redundant, i.e. the whole completableDeferred could be removed and replaced with just a map to the session. In case of only one session initialization the behavious will be exactly the same, because the _currentlyInitializingSessionsCount will become 0 right after the sessionDeferred.complete(session).

And case of simultanious session initialization seems to be rare (i.e. we shouldn't optimize for it)

@nerzhulart nerzhulart merged commit 2a2c4d8 into master Nov 19, 2025
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants