Skip to content

Commit

Permalink
Merge pull request #951 from Netflix/dgs-improvements
Browse files Browse the repository at this point in the history
Set the errors property in the payload when processing subscription events.
  • Loading branch information
srinivasankavitha committed Mar 31, 2022
2 parents 0823675 + bc797ae commit 5bfe8dd
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 3 deletions.
Expand Up @@ -118,7 +118,7 @@ class DgsWebSocketHandler(private val dgsQueryExecutor: DgsQueryExecutor) : Text
}

override fun onNext(er: ExecutionResult) {
val message = OperationMessage(GQL_DATA, DataPayload(er.getData()), id)
val message = OperationMessage(GQL_DATA, DataPayload(er.getData(), er.errors), id)
val jsonMessage = TextMessage(objectMapper.writeValueAsBytes(message))
logger.debug("Sending subscription data: {}", jsonMessage)

Expand Down
Expand Up @@ -19,11 +19,26 @@ package com.netflix.graphql.dgs.subscriptions.websockets
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.netflix.graphql.dgs.DgsQueryExecutor
import com.netflix.graphql.types.subscription.*
import com.netflix.graphql.types.subscription.DataPayload
import com.netflix.graphql.types.subscription.GQL_CONNECTION_ACK
import com.netflix.graphql.types.subscription.GQL_CONNECTION_INIT
import com.netflix.graphql.types.subscription.GQL_CONNECTION_TERMINATE
import com.netflix.graphql.types.subscription.GQL_DATA
import com.netflix.graphql.types.subscription.GQL_ERROR
import com.netflix.graphql.types.subscription.GQL_START
import com.netflix.graphql.types.subscription.GQL_STOP
import com.netflix.graphql.types.subscription.OperationMessage
import graphql.ExceptionWhileDataFetching
import graphql.ExecutionResult
import io.mockk.*
import graphql.execution.ResultPath
import io.mockk.Runs
import io.mockk.every
import io.mockk.impl.annotations.MockK
import io.mockk.junit5.MockKExtension
import io.mockk.just
import io.mockk.mockkClass
import io.mockk.slot
import io.mockk.verify
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -169,6 +184,18 @@ class DgsWebsocketHandlerTest {
}
}

@Test
fun testWithErrorAfterData() {
connect(session1)
nextWithError(session1, 2)
disconnect(session1)

// ACK, ERROR
verify(exactly = 4) {
session1.sendMessage(any())
}
}

@Test
fun testWithStop() {
connect(session1)
Expand Down Expand Up @@ -228,6 +255,7 @@ class DgsWebsocketHandlerTest {
val results = (1..nrOfResults).map {
val result1 = mockkClass(ExecutionResult::class)
every { result1.getData<Any>() } returns it
every { result1.errors } returns emptyList()
result1
}

Expand All @@ -246,6 +274,7 @@ class DgsWebsocketHandlerTest {
val results = (1..nrOfResults).map {
val result1 = mockkClass(ExecutionResult::class)
every { result1.getData<Any>() } returns it
every { result1.errors } returns emptyList()
result1
}

Expand All @@ -263,6 +292,7 @@ class DgsWebsocketHandlerTest {
val results = (1..nrOfResults).map {
val result1 = mockkClass(ExecutionResult::class)
every { result1.getData<Any>() } returns it
every { result1.errors } returns emptyList()
result1
}

Expand All @@ -278,7 +308,37 @@ class DgsWebsocketHandlerTest {
every { executionResult.getData<Publisher<ExecutionResult>>() } returns Mono.error(RuntimeException("That's wrong!"))
every { dgsQueryExecutor.execute("{ hello }", emptyMap()) } returns executionResult

val slot = slot<TextMessage>()
every { webSocketSession.sendMessage(capture(slot)) } just Runs

dgsWebsocketHandler.handleTextMessage(webSocketSession, queryMessage)

val returnMessage = jacksonObjectMapper().readValue<OperationMessage>(slot.captured.asBytes())
assertThat(returnMessage.type).isEqualTo(GQL_ERROR)
assertThat((returnMessage.payload as DataPayload).errors?.size).isEqualTo(1)
assertThat((returnMessage.payload as DataPayload).errors?.get(0)).isEqualTo("That's wrong!")
}

private fun nextWithError(webSocketSession: WebSocketSession, nrOfResults: Int) {
val results = (1..nrOfResults).map {
val result1 = mockkClass(ExecutionResult::class)
every { result1.getData<Any>() } returns null
every { result1.errors } returns listOf(ExceptionWhileDataFetching(ResultPath.rootPath(), RuntimeException("Error in data fetcher"), null))
result1
}
every { webSocketSession.isOpen } returns true
every { executionResult.getData<Publisher<ExecutionResult>>() } returns Mono.just(results).flatMapMany { Flux.fromIterable(results) }
every { dgsQueryExecutor.execute("{ hello }", emptyMap()) } returns executionResult

val slotList = mutableListOf<TextMessage>()
every { webSocketSession.sendMessage(capture(slotList)) } just Runs

dgsWebsocketHandler.handleTextMessage(webSocketSession, queryMessage)

val returnMessage = jacksonObjectMapper().readValue<OperationMessage>(slotList[0].asBytes())
assertThat(returnMessage.type).isEqualTo(GQL_DATA)
assertThat((returnMessage.payload as DataPayload).errors?.size).isEqualTo(1)
assertThat(((returnMessage.payload as DataPayload).errors?.get(0) as Map<String, String>)["message"]).isEqualTo("Exception while fetching data () : Error in data fetcher")
}

private fun stop(webSocketSession: WebSocketSession) {
Expand Down

0 comments on commit 5bfe8dd

Please sign in to comment.