Skip to content

Commit

Permalink
Clear registrations on disconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
Laurence Armstrong committed Nov 11, 2018
1 parent 59300f3 commit 9f6e8cc
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 8 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Expand Up @@ -22,7 +22,7 @@ plugins {

val kotlinVersion: String by extra
val kotlinTestVersion = "3.1.10"
val coroutinesVersion = "1.0.0"
val coroutinesVersion = "1.0.1"
val logbackVersion = "1.2.1"

subprojects {
Expand Down
Expand Up @@ -2,6 +2,7 @@ package com.laurencegarmstrong.kwamp.router.core

import com.laurencegarmstrong.kwamp.core.*
import com.laurencegarmstrong.kwamp.core.messages.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

Expand All @@ -13,10 +14,10 @@ class Dealer(
private val procedureLock = ReentrantLock()
private val procedures = HashMap<Uri, Long>()
private val procedureRegistrations = IdentifiableSet<ProcedureConfig>(linearIdGenerator)
private val sessionProcedures = ConcurrentHashMap<Long, MutableSet<Long>>()

private val invocations = IdentifiableSet<InvocationConfig>(randomIdGenerator)


fun registerProcedure(
session: WampSession,
registrationMessage: Register
Expand All @@ -34,18 +35,28 @@ class Dealer(
)
}
}
sessionProcedures.computeIfAbsent(session.id) { hashSetOf() }.add(procedureConfig.registrationId)
messageSender.sendRegistered(session.connection, registrationMessage.requestId, procedureConfig.registrationId)
}

fun unregisterProcedure(session: WampSession, unregisterMessage: Unregister) {
sessionProcedures[session.id]?.remove(unregisterMessage.registration)
try {
removeRegisteredProcedure(unregisterMessage.registration)
} catch (error: NoSuchRegistrationException) {
throw NoSuchRegistrationErrorException(unregisterMessage.requestId)
}
messageSender.sendUnregistered(session.connection, unregisterMessage.requestId)
}

private fun removeRegisteredProcedure(registrationId: Long) {
procedureLock.withLock {
//TODO clear associated invocations
val procedureConfig = procedureRegistrations.remove(unregisterMessage.registration)
?: throw NoSuchRegistrationErrorException(unregisterMessage.requestId)
val procedureConfig = procedureRegistrations.remove(registrationId)
?: throw NoSuchRegistrationException()

procedures.remove(procedureConfig.uri)!!
}
messageSender.sendUnregistered(session.connection, unregisterMessage.requestId)
}

fun callProcedure(callerSession: WampSession, callMessage: Call) {
Expand Down Expand Up @@ -97,8 +108,16 @@ class Dealer(
errorMessage.argumentsKw
)
}

fun cleanSessionResources(sessionId: Long) {
sessionProcedures.remove(sessionId)?.forEach { procedureId ->
removeRegisteredProcedure(procedureId)
}
}
}

class NoSuchRegistrationException : IllegalArgumentException()

data class ProcedureConfig(val uri: Uri, val procedureProvidingSession: WampSession, val registrationId: Long)

data class InvocationConfig(
Expand Down
Expand Up @@ -36,7 +36,9 @@ class Realm(
is ProtocolViolationException -> messageSender.sendAbort(connection, fatalException)
else -> fatalException?.run { printStackTrace() }
}
sessions.endSession(id)
sessions.endSession(id)?.also {
dealer.cleanSessionResources(id)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion kwamp-router-example/build.gradle.kts
Expand Up @@ -14,7 +14,7 @@ repositories {
maven(url = "https://dl.bintray.com/kotlin/ktor")
}

val ktorVersion = "0.9.5-rc13"
val ktorVersion = "1.0.0-beta-3"

dependencies {
implementation(project(":kwamp-router-core"))
Expand Down
Expand Up @@ -20,7 +20,7 @@ class TestConnection(
) {
val incoming = Channel<ByteArray>(channelCapacity)
val outgoing = Channel<ByteArray>(channelCapacity)
val connection = Connection(incoming, outgoing, { }, messageSerializer)
val connection = Connection(incoming, outgoing, { incoming.close() }, messageSerializer)

fun send(message: Message) {
runBlocking {
Expand Down
Expand Up @@ -3,6 +3,7 @@ package com.laurencegarmstrong.kwamp.router.core.conversations.scripts
import com.laurencegarmstrong.kwamp.conversations.core.TestConnection
import com.laurencegarmstrong.kwamp.conversations.core.defaultRouter
import com.laurencegarmstrong.kwamp.core.Uri
import com.laurencegarmstrong.kwamp.core.WampClose
import com.laurencegarmstrong.kwamp.core.messages.*
import com.laurencegarmstrong.kwamp.router.core.conversations.infrastructure.RouterConversation
import io.kotlintest.be
Expand Down Expand Up @@ -72,6 +73,51 @@ class Rpc : StringSpec({
}
}
}

"Remove procedure registrations on disconnect" {
val clientA = TestConnection()
val clientB = TestConnection()
RouterConversation(
defaultRouter(),
clientA,
clientB
) {
clientA.startsASession()
clientB.startsASession()

clientA willSend {
Register(
123L,
emptyMap(),
Uri("clientA.proc")
)
}
clientA shouldReceiveMessage { message: Registered ->
message.requestId should be(123L)
message.registration should be(1L)
}

clientA willSend {
Goodbye(emptyMap(), WampClose.GOODBYE_AND_OUT.uri)
}

clientA shouldReceiveMessage { message: Goodbye ->
message.reason should be(WampClose.GOODBYE_AND_OUT.uri)
}

clientB willSend {
Register(
123L,
emptyMap(),
Uri("clientA.proc")
)
}
clientB shouldReceiveMessage { message: Registered ->
message.requestId should be(123L)
message.registration should be(1L)
}
}
}
})


0 comments on commit 9f6e8cc

Please sign in to comment.