Skip to content

Commit

Permalink
Update simple subscription test to use single client (ExpediaGroup#1012)
Browse files Browse the repository at this point in the history
Co-authored-by: Shane Myrick <accounts@shanemyrick.com>
  • Loading branch information
smyrick and Shane Myrick committed Jan 14, 2021
1 parent ac53a4d commit 2b64aed
Showing 1 changed file with 20 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import kotlin.random.Random
class SimpleSubscriptionIT(@LocalServerPort private var port: Int) {

private val objectMapper = jacksonObjectMapper()
private val client = ReactorNettyWebSocketClient()
private val uri: URI = URI.create("ws://localhost:$port$SUBSCRIPTION_ENDPOINT")

@Test
fun `verify singleValueSubscription query`() {
Expand Down Expand Up @@ -129,9 +131,6 @@ class SimpleSubscriptionIT(@LocalServerPort private var port: Int) {
private fun subscribe(query: String, initPayload: Any? = null): TestPublisher<String> {
val output = TestPublisher.create<String>()

val client = ReactorNettyWebSocketClient()
val uri = URI.create("ws://localhost:$port$SUBSCRIPTION_ENDPOINT")

client.execute(uri) { session -> executeSubscription(session, initPayload, query, output) }.subscribe()

return output
Expand All @@ -148,28 +147,26 @@ class SimpleSubscriptionIT(@LocalServerPort private var port: Int) {
val startMessage = getStartMessage(query, id)

return session.send(Flux.just(session.textMessage(initMessage)))
.then(
session.send(Flux.just(session.textMessage(startMessage)))
.thenMany(
session.receive()
.map { objectMapper.readValue<SubscriptionOperationMessage>(it.payloadAsText) }
.doOnNext {
if (it.type == ServerMessages.GQL_DATA.type) {
val data = objectMapper.writeValueAsString(it.payload)
output.next(data)
} else if (it.type == ServerMessages.GQL_COMPLETE.type) {
output.complete()
}
}
)
.doOnError {
output.error(it)
}
.doOnComplete {
output.complete()
.then(session.send(Flux.just(session.textMessage(startMessage))))
.thenMany(
session.receive()
.map { objectMapper.readValue<SubscriptionOperationMessage>(it.payloadAsText) }
.doOnNext {
if (it.type == ServerMessages.GQL_DATA.type) {
val data = objectMapper.writeValueAsString(it.payload)
output.next(data)
} else if (it.type == ServerMessages.GQL_COMPLETE.type) {
output.complete()
}
}
.then()
)
.doOnError {
output.error(it)
}
.doOnComplete {
output.complete()
}
.then()
}

private fun SubscriptionOperationMessage.toJson() = objectMapper.writeValueAsString(this)
Expand Down

0 comments on commit 2b64aed

Please sign in to comment.