Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce incubating WebSocketNetworkTransport #5678

Merged
merged 5 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion libraries/apollo-api/api/apollo-api.api
Original file line number Diff line number Diff line change
Expand Up @@ -1290,9 +1290,13 @@ public final class com/apollographql/apollo3/exception/RouterError : com/apollog
public final fun getErrors ()Ljava/util/List;
}

public final class com/apollographql/apollo3/exception/SubscriptionConnectionException : com/apollographql/apollo3/exception/ApolloException {
public fun <init> (Ljava/lang/Object;)V
public final fun getPayload ()Ljava/lang/Object;
}

public final class com/apollographql/apollo3/exception/SubscriptionOperationException : com/apollographql/apollo3/exception/ApolloException {
public fun <init> (Ljava/lang/String;Ljava/lang/Object;)V
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/Object;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun getPayload ()Ljava/lang/Object;
}

Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,21 @@ class ApolloNetworkException(
/**
* The server could not process a subscription and sent an error.
*
* This typically happens if there is a validation error. This is a terminal event.
* This typically happens if there is a validation error.
*
* @param operationName the name of the subscription that triggered the error.
* @param payload the payload returned by the server.
*/
class SubscriptionOperationException(
operationName: String,
val payload: Any? = null,
val payload: Any?,
) : ApolloException(message = "Operation error $operationName")

class SubscriptionConnectionException(
val payload: Any?,
) : ApolloException(message = "Websocket error")


/**
* The router sent one or several errors.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.apollographql.apollo3.network.ws.incubating

import com.apollographql.apollo3.api.ApolloRequest
import com.apollographql.apollo3.api.NullableAnyAdapter
import com.apollographql.apollo3.api.Operation
import com.apollographql.apollo3.api.http.DefaultHttpRequestComposer
import com.apollographql.apollo3.api.http.DefaultHttpRequestComposer.Companion.appendQueryParameters
import com.apollographql.apollo3.api.json.buildJsonByteString
import com.apollographql.apollo3.api.json.jsonReader
import com.apollographql.apollo3.api.json.readAny
import com.apollographql.apollo3.api.json.writeAny
import com.apollographql.apollo3.api.toJsonString
import okio.Buffer

class AppSyncWsProtocol(
val authorization: suspend () -> Any? = { null },
) : WsProtocol {
override val name: String
get() = "graphql-ws"

override suspend fun connectionInit(): ClientMessage {
return mapOf("type" to "connection_init").toClientMessage()
}

override suspend fun <D : Operation.Data> operationStart(request: ApolloRequest<D>): ClientMessage {
// AppSync encodes the data as a String
val data = NullableAnyAdapter.toJsonString(DefaultHttpRequestComposer.composePayload(request))


return mapOf(
"type" to "start",
"id" to request.requestUuid.toString(),
"payload" to mapOf(
"data" to data,
"extensions" to mapOf(
"authorization" to authorization()
)
)
).toClientMessage()
}

override fun <D : Operation.Data> operationStop(request: ApolloRequest<D>): ClientMessage {
return mapOf(
"type" to "stop",
"id" to request.requestUuid.toString(),
).toClientMessage()
}

override fun ping(): ClientMessage? {
return mapOf("type" to "ping").toClientMessage()
}

override fun pong(): ClientMessage? {
return mapOf("type" to "pong").toClientMessage()
}

override fun parseServerMessage(text: String): ServerMessage {
val map = try {
@Suppress("UNCHECKED_CAST")
Buffer().writeUtf8(text).jsonReader().readAny() as Map<String, Any?>
} catch (e: Exception) {
return ParseErrorServerMessage("Cannot parse server message: '$this'")
}

val type = map["type"] as? String
if (type == null) {
return ParseErrorServerMessage("No 'type' found in server message: '$this'")
}

return when (type) {
"connection_ack" -> ConnectionAckServerMessage
"connection_error" -> ConnectionErrorServerMessage(map["payload"])
"ka" -> PingServerMessage
"data", "complete" -> {
val id = map["id"] as? String
when {
id == null -> ParseErrorServerMessage("No 'id' found in message: '$text'")
type == "data" -> ResponseServerMessage(id, map["payload"], false)
type == "complete" -> CompleteServerMessage(id)
else -> error("") // make the compiler happy
}
}
"error" -> {
val id = map["id"] as? String
if (id != null) {
OperationErrorServerMessage(id, map["payload"], true)
} else {
ParseErrorServerMessage("General error: $text")
}
}

else -> ParseErrorServerMessage("Unknown type: '$type' found in server message: '$text'")
}
}

companion object {
/**
* Helper method that builds the final URL. It will append the authorization and payload arguments as query parameters.
* This method can be used for both the HTTP URL as well as the WebSocket URL
*
* Example:
* ```
* buildUrl(
* baseUrl = "https://example1234567890000.appsync-realtime-api.us-east-1.amazonaws.com/graphql",
* // This example uses an API key. See the AppSync documentation for information on what to pass
* authorization = mapOf(
* "host" to "example1234567890000.appsync-api.us-east-1.amazonaws.com",
* "x-api-key" to "da2-12345678901234567890123456"
* )
* )
* ```
*
* @param baseUrl The base web socket URL.
* @param authorization The authorization as per the AppSync documentation.
* @param payload An optional payload. Defaults to an empty object.
*/
fun buildUrl(
baseUrl: String,
authorization: Map<String, Any?>,
payload: Map<String, Any?> = emptyMap(),
): String =
baseUrl
.appendQueryParameters(mapOf(
"header" to authorization.base64Encode(),
"payload" to payload.base64Encode(),
))

private fun Map<String, Any?>.base64Encode(): String {
return buildJsonByteString {
writeAny(this@base64Encode)
}.base64()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.apollographql.apollo3.network.ws.incubating

import com.apollographql.apollo3.api.json.buildJsonString
import com.apollographql.apollo3.api.json.writeAny

sealed interface ClientMessage
class TextClientMessage(val text: String): ClientMessage
class DataClientMessage(val data: ByteArray): ClientMessage

internal fun Any?.toClientMessage(): ClientMessage {
return buildJsonString {
writeAny(this@toClientMessage)
}.let { TextClientMessage(it) }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.apollographql.apollo3.network.ws.incubating

import com.apollographql.apollo3.api.ApolloRequest
import com.apollographql.apollo3.api.Operation
import com.apollographql.apollo3.api.http.DefaultHttpRequestComposer
import com.apollographql.apollo3.api.json.jsonReader
import com.apollographql.apollo3.api.json.readAny
import okio.Buffer

class GraphQLWsProtocol(
val connectionParams: suspend () -> Any?,
) : WsProtocol {
override val name: String
get() = "graphql-transport-ws"

override suspend fun connectionInit(): ClientMessage {
val map = mutableMapOf<String, Any?>()
map.put("type", "connection_init")
val params = connectionParams()
if (params != null) {
map.put("payload", params)
}

return map.toClientMessage()
}

override suspend fun <D : Operation.Data> operationStart(request: ApolloRequest<D>): ClientMessage {
return mapOf(
"type" to "subscribe",
"id" to request.requestUuid.toString(),
"payload" to DefaultHttpRequestComposer.composePayload(request)
).toClientMessage()
}

override fun <D : Operation.Data> operationStop(request: ApolloRequest<D>): ClientMessage {
return mapOf(
"type" to "complete",
"id" to request.requestUuid.toString(),
).toClientMessage()
}

override fun ping(): ClientMessage? {
return mapOf("type" to "ping").toClientMessage()
}

override fun pong(): ClientMessage? {
return mapOf("type" to "pong").toClientMessage()
}

override fun parseServerMessage(text: String): ServerMessage {
val map = try {
@Suppress("UNCHECKED_CAST")
Buffer().writeUtf8(text).jsonReader().readAny() as Map<String, Any?>
} catch (e: Exception) {
return ParseErrorServerMessage("Cannot parse server message: '$text'")
}

val type = map["type"] as? String
if (type == null) {
return ParseErrorServerMessage("No 'type' found in server message: '$text'")
}

return when (type) {
"connection_ack" -> ConnectionAckServerMessage
"ping" -> PingServerMessage
"pong" -> PongServerMessage
"next", "complete", "error" -> {
val id = map["id"] as? String
when {
id == null -> ParseErrorServerMessage("No 'id' found in message: '$text'")
type == "next" -> ResponseServerMessage(id, map["payload"], false)
type == "complete" -> CompleteServerMessage(id)
type == "error" -> ResponseServerMessage(id, mapOf("errors" to map["payload"]), true)
else -> error("") // make the compiler happy
}
}

else -> ParseErrorServerMessage("Unknown type: '$type' found in server message: '$text'")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.apollographql.apollo3.network.ws.incubating

import com.apollographql.apollo3.api.json.ApolloJsonElement
import com.apollographql.apollo3.exception.ApolloException

internal interface OperationListener {
/**
* A response was received
*
* [response] is the Kotlin representation of a GraphQL response.
*
* ```kotlin
* mapOf(
* "data" to ...
* "errors" to listOf(...)
* )
* ```
*/
fun onResponse(response: ApolloJsonElement)

/**
* The operation terminated successfully. No future calls to this listener are made.
*/
fun onComplete()

/**
* The server sent an error. That error may be terminal in which case, no future calls to this listener are made.
*/
fun onError(payload: ApolloJsonElement, terminal: Boolean)

/**
* The transport failed. No future calls to this listener are made.
*/
fun onTransportError(cause: ApolloException)
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.apollographql.apollo3.network.ws.incubating

import com.apollographql.apollo3.api.json.ApolloJsonElement

sealed interface ServerMessage
object ConnectionAckServerMessage : ServerMessage
object ConnectionKeepAliveServerMessage : ServerMessage
object PingServerMessage : ServerMessage
object PongServerMessage : ServerMessage
class ConnectionErrorServerMessage(val payload: Any?) : ServerMessage

/**
* A GraphQL response was received
*
* @param response, a GraphQL response, possibly containing errors.
* @param complete, whether this is a terminal message for the given operation.
*/
class ResponseServerMessage(val id: String, val response: Any?, val complete: Boolean) : ServerMessage

/**
* The subscription completed normally
* This is a terminal message for the given operation.
*/
class CompleteServerMessage(val id: String) : ServerMessage

/**
* There was an error with the operation that cannot be represented by a GraphQL response
*
* @param payload additional information regarding the error. It may represent a GraphQL error
* but it doesn't have to
*/
class OperationErrorServerMessage(val id: String, val payload: ApolloJsonElement, val terminal: Boolean) : ServerMessage

/**
* Special Server message that indicates a malformed message
*/
class ParseErrorServerMessage(val errorMessage: String) : ServerMessage