Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 48 additions & 18 deletions acp/src/commonMain/kotlin/com/agentclientprotocol/client/Client.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import kotlinx.atomicfu.update
import kotlinx.collections.immutable.persistentMapOf
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.update
import kotlinx.serialization.json.JsonElement

private val logger = KotlinLogging.logger {}
Expand All @@ -37,6 +40,7 @@ public class Client(
private val _sessions = atomic(persistentMapOf<SessionId, CompletableDeferred<ClientSessionImpl>>())
private val _clientInfo = CompletableDeferred<ClientInfo>()
private val _agentInfo = CompletableDeferred<AgentInfo>()
private val _currentlyInitializingSessionsCount = MutableStateFlow(0)

init {
// Set up request handlers for incoming agent requests
Expand Down Expand Up @@ -158,15 +162,18 @@ public class Client(
* @return a [ClientSession] instance for the new session
*/
public suspend fun newSession(sessionParameters: SessionCreationParameters, operationsFactory: ClientOperationsFactory): ClientSession {
val newSessionResponse = AcpMethod.AgentMethods.SessionNew(protocol,
NewSessionRequest(
sessionParameters.cwd,
sessionParameters.mcpServers,
sessionParameters._meta
return withInitializingSession {
val newSessionResponse = AcpMethod.AgentMethods.SessionNew(
protocol,
NewSessionRequest(
sessionParameters.cwd,
sessionParameters.mcpServers,
sessionParameters._meta
)
)
)
val sessionId = newSessionResponse.sessionId
return createSession(sessionId, sessionParameters, newSessionResponse, operationsFactory)
val sessionId = newSessionResponse.sessionId
return@withInitializingSession createSession(sessionId, sessionParameters, newSessionResponse, operationsFactory)
}
}

/**
Expand All @@ -180,15 +187,18 @@ public class Client(
* @return a [ClientSession] instance for the new session
*/
public suspend fun loadSession(sessionId: SessionId, sessionParameters: SessionCreationParameters, operationsFactory: ClientOperationsFactory): ClientSession {
val loadSessionResponse = AcpMethod.AgentMethods.SessionLoad(protocol,
LoadSessionRequest(
sessionId,
sessionParameters.cwd,
sessionParameters.mcpServers,
sessionParameters._meta
))

return createSession(sessionId, sessionParameters, loadSessionResponse, operationsFactory)
return withInitializingSession {
val loadSessionResponse = AcpMethod.AgentMethods.SessionLoad(
protocol,
LoadSessionRequest(
sessionId,
sessionParameters.cwd,
sessionParameters.mcpServers,
sessionParameters._meta
)
)
return@withInitializingSession createSession(sessionId, sessionParameters, loadSessionResponse, operationsFactory)
}
}

private suspend fun createSession(sessionId: SessionId, sessionParameters: SessionCreationParameters, sessionResponse: AcpCreatedSessionResponse, factory: ClientOperationsFactory): ClientSession {
Expand All @@ -215,7 +225,27 @@ public class Client(
return completableDeferred.getCompleted()
}

private suspend fun getSessionOrThrow(sessionId: SessionId): ClientSessionImpl = (_sessions.value[sessionId] ?: acpFail("Session $sessionId not found")).await()
private suspend fun getSessionOrThrow(sessionId: SessionId): ClientSessionImpl {
_sessions.value[sessionId]?.let {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is somewhat redundant, i.e. the whole completableDeferred could be removed and replaced with just a map to the session. In case of only one session initialization the behavious will be exactly the same, because the _currentlyInitializingSessionsCount will become 0 right after the sessionDeferred.complete(session).

And case of simultanious session initialization seems to be rare (i.e. we shouldn't optimize for it)

return it.await()
}
// try to wait for all pending sessions to initialize
_currentlyInitializingSessionsCount.first { it == 0 }
// try to get the session again
_sessions.value[sessionId]?.let {
return it.await()
}
acpFail("Session $sessionId not found")
}

private suspend fun<T> withInitializingSession(block: suspend () -> T): T {
_currentlyInitializingSessionsCount.update { it + 1 }
try {
return block()
} finally {
_currentlyInitializingSessionsCount.update { it - 1 }
}
}
}

private inline fun <reified TInterface> sessionMethodNotFound(method: AcpMethod): Nothing {
Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ plugins {
private val buildNumber: String? = System.getenv("GITHUB_RUN_NUMBER")
private val isReleasePublication = System.getenv("RELEASE_PUBLICATION")?.toBoolean() ?: false

private val baseVersion = "0.7.1"
private val baseVersion = "0.7.2"

allprojects {
group = "com.agentclientprotocol"
Expand Down
Loading