Skip to content

Commit

Permalink
Introduce incubating WebSocketNetworkTransport (#5678)
Browse files Browse the repository at this point in the history
* introduce WebSocketNetworkTransport

* remove unused symbol

* make compile

* make compile

* guard against too short timeouts
  • Loading branch information
martinbonnin committed Mar 5, 2024
1 parent 84737e9 commit 3221a8b
Show file tree
Hide file tree
Showing 15 changed files with 1,053 additions and 9 deletions.
6 changes: 5 additions & 1 deletion libraries/apollo-api/api/apollo-api.api
Original file line number Diff line number Diff line change
Expand Up @@ -1290,9 +1290,13 @@ public final class com/apollographql/apollo3/exception/RouterError : com/apollog
public final fun getErrors ()Ljava/util/List;
}

public final class com/apollographql/apollo3/exception/SubscriptionConnectionException : com/apollographql/apollo3/exception/ApolloException {
public fun <init> (Ljava/lang/Object;)V
public final fun getPayload ()Ljava/lang/Object;
}

public final class com/apollographql/apollo3/exception/SubscriptionOperationException : com/apollographql/apollo3/exception/ApolloException {
public fun <init> (Ljava/lang/String;Ljava/lang/Object;)V
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/Object;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun getPayload ()Ljava/lang/Object;
}

Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,21 @@ class ApolloNetworkException(
/**
* The server could not process a subscription and sent an error.
*
* This typically happens if there is a validation error. This is a terminal event.
* This typically happens if there is a validation error.
*
* @param operationName the name of the subscription that triggered the error.
* @param payload the payload returned by the server.
*/
class SubscriptionOperationException(
operationName: String,
val payload: Any? = null,
val payload: Any?,
) : ApolloException(message = "Operation error $operationName")

class SubscriptionConnectionException(
val payload: Any?,
) : ApolloException(message = "Websocket error")


/**
* The router sent one or several errors.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.apollographql.apollo3.network.ws.incubating

import com.apollographql.apollo3.api.ApolloRequest
import com.apollographql.apollo3.api.NullableAnyAdapter
import com.apollographql.apollo3.api.Operation
import com.apollographql.apollo3.api.http.DefaultHttpRequestComposer
import com.apollographql.apollo3.api.http.DefaultHttpRequestComposer.Companion.appendQueryParameters
import com.apollographql.apollo3.api.json.buildJsonByteString
import com.apollographql.apollo3.api.json.jsonReader
import com.apollographql.apollo3.api.json.readAny
import com.apollographql.apollo3.api.json.writeAny
import com.apollographql.apollo3.api.toJsonString
import okio.Buffer

class AppSyncWsProtocol(
val authorization: suspend () -> Any? = { null },
) : WsProtocol {
override val name: String
get() = "graphql-ws"

override suspend fun connectionInit(): ClientMessage {
return mapOf("type" to "connection_init").toClientMessage()
}

override suspend fun <D : Operation.Data> operationStart(request: ApolloRequest<D>): ClientMessage {
// AppSync encodes the data as a String
val data = NullableAnyAdapter.toJsonString(DefaultHttpRequestComposer.composePayload(request))


return mapOf(
"type" to "start",
"id" to request.requestUuid.toString(),
"payload" to mapOf(
"data" to data,
"extensions" to mapOf(
"authorization" to authorization()
)
)
).toClientMessage()
}

override fun <D : Operation.Data> operationStop(request: ApolloRequest<D>): ClientMessage {
return mapOf(
"type" to "stop",
"id" to request.requestUuid.toString(),
).toClientMessage()
}

override fun ping(): ClientMessage? {
return mapOf("type" to "ping").toClientMessage()
}

override fun pong(): ClientMessage? {
return mapOf("type" to "pong").toClientMessage()
}

override fun parseServerMessage(text: String): ServerMessage {
val map = try {
@Suppress("UNCHECKED_CAST")
Buffer().writeUtf8(text).jsonReader().readAny() as Map<String, Any?>
} catch (e: Exception) {
return ParseErrorServerMessage("Cannot parse server message: '$this'")
}

val type = map["type"] as? String
if (type == null) {
return ParseErrorServerMessage("No 'type' found in server message: '$this'")
}

return when (type) {
"connection_ack" -> ConnectionAckServerMessage
"connection_error" -> ConnectionErrorServerMessage(map["payload"])
"ka" -> PingServerMessage
"data", "complete" -> {
val id = map["id"] as? String
when {
id == null -> ParseErrorServerMessage("No 'id' found in message: '$text'")
type == "data" -> ResponseServerMessage(id, map["payload"], false)
type == "complete" -> CompleteServerMessage(id)
else -> error("") // make the compiler happy
}
}
"error" -> {
val id = map["id"] as? String
if (id != null) {
OperationErrorServerMessage(id, map["payload"], true)
} else {
ParseErrorServerMessage("General error: $text")
}
}

else -> ParseErrorServerMessage("Unknown type: '$type' found in server message: '$text'")
}
}

companion object {
/**
* Helper method that builds the final URL. It will append the authorization and payload arguments as query parameters.
* This method can be used for both the HTTP URL as well as the WebSocket URL
*
* Example:
* ```
* buildUrl(
* baseUrl = "https://example1234567890000.appsync-realtime-api.us-east-1.amazonaws.com/graphql",
* // This example uses an API key. See the AppSync documentation for information on what to pass
* authorization = mapOf(
* "host" to "example1234567890000.appsync-api.us-east-1.amazonaws.com",
* "x-api-key" to "da2-12345678901234567890123456"
* )
* )
* ```
*
* @param baseUrl The base web socket URL.
* @param authorization The authorization as per the AppSync documentation.
* @param payload An optional payload. Defaults to an empty object.
*/
fun buildUrl(
baseUrl: String,
authorization: Map<String, Any?>,
payload: Map<String, Any?> = emptyMap(),
): String =
baseUrl
.appendQueryParameters(mapOf(
"header" to authorization.base64Encode(),
"payload" to payload.base64Encode(),
))

private fun Map<String, Any?>.base64Encode(): String {
return buildJsonByteString {
writeAny(this@base64Encode)
}.base64()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.apollographql.apollo3.network.ws.incubating

import com.apollographql.apollo3.api.json.buildJsonString
import com.apollographql.apollo3.api.json.writeAny

sealed interface ClientMessage
class TextClientMessage(val text: String): ClientMessage
class DataClientMessage(val data: ByteArray): ClientMessage

internal fun Any?.toClientMessage(): ClientMessage {
return buildJsonString {
writeAny(this@toClientMessage)
}.let { TextClientMessage(it) }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.apollographql.apollo3.network.ws.incubating

import com.apollographql.apollo3.api.ApolloRequest
import com.apollographql.apollo3.api.Operation
import com.apollographql.apollo3.api.http.DefaultHttpRequestComposer
import com.apollographql.apollo3.api.json.jsonReader
import com.apollographql.apollo3.api.json.readAny
import okio.Buffer

class GraphQLWsProtocol(
val connectionParams: suspend () -> Any?,
) : WsProtocol {
override val name: String
get() = "graphql-transport-ws"

override suspend fun connectionInit(): ClientMessage {
val map = mutableMapOf<String, Any?>()
map.put("type", "connection_init")
val params = connectionParams()
if (params != null) {
map.put("payload", params)
}

return map.toClientMessage()
}

override suspend fun <D : Operation.Data> operationStart(request: ApolloRequest<D>): ClientMessage {
return mapOf(
"type" to "subscribe",
"id" to request.requestUuid.toString(),
"payload" to DefaultHttpRequestComposer.composePayload(request)
).toClientMessage()
}

override fun <D : Operation.Data> operationStop(request: ApolloRequest<D>): ClientMessage {
return mapOf(
"type" to "complete",
"id" to request.requestUuid.toString(),
).toClientMessage()
}

override fun ping(): ClientMessage? {
return mapOf("type" to "ping").toClientMessage()
}

override fun pong(): ClientMessage? {
return mapOf("type" to "pong").toClientMessage()
}

override fun parseServerMessage(text: String): ServerMessage {
val map = try {
@Suppress("UNCHECKED_CAST")
Buffer().writeUtf8(text).jsonReader().readAny() as Map<String, Any?>
} catch (e: Exception) {
return ParseErrorServerMessage("Cannot parse server message: '$text'")
}

val type = map["type"] as? String
if (type == null) {
return ParseErrorServerMessage("No 'type' found in server message: '$text'")
}

return when (type) {
"connection_ack" -> ConnectionAckServerMessage
"ping" -> PingServerMessage
"pong" -> PongServerMessage
"next", "complete", "error" -> {
val id = map["id"] as? String
when {
id == null -> ParseErrorServerMessage("No 'id' found in message: '$text'")
type == "next" -> ResponseServerMessage(id, map["payload"], false)
type == "complete" -> CompleteServerMessage(id)
type == "error" -> ResponseServerMessage(id, mapOf("errors" to map["payload"]), true)
else -> error("") // make the compiler happy
}
}

else -> ParseErrorServerMessage("Unknown type: '$type' found in server message: '$text'")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.apollographql.apollo3.network.ws.incubating

import com.apollographql.apollo3.api.json.ApolloJsonElement
import com.apollographql.apollo3.exception.ApolloException

internal interface OperationListener {
/**
* A response was received
*
* [response] is the Kotlin representation of a GraphQL response.
*
* ```kotlin
* mapOf(
* "data" to ...
* "errors" to listOf(...)
* )
* ```
*/
fun onResponse(response: ApolloJsonElement)

/**
* The operation terminated successfully. No future calls to this listener are made.
*/
fun onComplete()

/**
* The server sent an error. That error may be terminal in which case, no future calls to this listener are made.
*/
fun onError(payload: ApolloJsonElement, terminal: Boolean)

/**
* The transport failed. No future calls to this listener are made.
*/
fun onTransportError(cause: ApolloException)
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.apollographql.apollo3.network.ws.incubating

import com.apollographql.apollo3.api.json.ApolloJsonElement

sealed interface ServerMessage
object ConnectionAckServerMessage : ServerMessage
object ConnectionKeepAliveServerMessage : ServerMessage
object PingServerMessage : ServerMessage
object PongServerMessage : ServerMessage
class ConnectionErrorServerMessage(val payload: Any?) : ServerMessage

/**
* A GraphQL response was received
*
* @param response, a GraphQL response, possibly containing errors.
* @param complete, whether this is a terminal message for the given operation.
*/
class ResponseServerMessage(val id: String, val response: Any?, val complete: Boolean) : ServerMessage

/**
* The subscription completed normally
* This is a terminal message for the given operation.
*/
class CompleteServerMessage(val id: String) : ServerMessage

/**
* There was an error with the operation that cannot be represented by a GraphQL response
*
* @param payload additional information regarding the error. It may represent a GraphQL error
* but it doesn't have to
*/
class OperationErrorServerMessage(val id: String, val payload: ApolloJsonElement, val terminal: Boolean) : ServerMessage

/**
* Special Server message that indicates a malformed message
*/
class ParseErrorServerMessage(val errorMessage: String) : ServerMessage

0 comments on commit 3221a8b

Please sign in to comment.