Skip to content
Permalink
Browse files

Upgrade to Wire 3.0.0-rc01

This rearranges several gRPC classes.
  • Loading branch information...
swankjesse committed Aug 6, 2019
1 parent 7c0d985 commit bfa27cf7379f4e2bd9e97f477c3cbd2b7354703f
@@ -84,9 +84,9 @@ ext.dep = [
"tracingJaeger": "com.uber.jaeger:jaeger-core:0.24.0",
"tracingZipkin": "io.opentracing.brave:brave-opentracing:0.29.0",
"vitess": "io.vitess:vitess-jdbc:3.0.0",
"wireGrpcClient": "com.squareup.wire:wire-grpc-client:3.0.0-alpha01",
"wirePlugin": "com.squareup.wire:wire-gradle-plugin:3.0.0-alpha01",
"wireRuntime": "com.squareup.wire:wire-runtime:3.0.0-alpha01",
"wireRuntime": "com.squareup.wire:wire-runtime:3.0.0-rc01",
"wireGrpcClient": "com.squareup.wire:wire-grpc-client:3.0.0-rc01",
"wirePlugin": "com.squareup.wire:wire-gradle-plugin:3.0.0-rc01",
"zipkinBrave": "io.zipkin.brave:brave:4.17.2",
"zipkinReporter": "io.zipkin.reporter2:zipkin-sender-okhttp3:2.4.1",
"zookeeper": "org.apache.zookeeper:zookeeper:3.5.4-beta",
@@ -32,6 +32,14 @@ protobuf {
wire {
kotlin {
}

// Generate service interfaces also.
kotlin {
includes = ['routeguide.RouteGuide']
exclusive = false
blockingServices = true
singleMethodServices = true
}
}

sourceSets {
@@ -49,6 +57,7 @@ dependencies {
compile dep.grpcProtobuf
compile dep.grpcStub
compile dep.wireGrpcClient
compile dep.wireRuntime
compile project(':misk')
compile project(':misk-testing')
}

This file was deleted.

@@ -1,16 +1,15 @@
package misk.grpc.miskserver

import misk.web.Grpc
import misk.web.actions.WebAction
import misk.web.interceptors.LogRequestResponse
import routeguide.Feature
import routeguide.Point
import routeguide.RouteGuideGetFeature
import javax.inject.Inject

class GetFeatureGrpcAction @Inject constructor() : WebAction {
@Grpc("/routeguide.RouteGuide/GetFeature")
class GetFeatureGrpcAction @Inject constructor() : WebAction, RouteGuideGetFeature {
@LogRequestResponse(sampling = 1.0, includeBody = true)
fun sayHello(point: Point): Feature {
return Feature(name = "maple tree", location = point)
override fun GetFeature(request: Point): Feature {
return Feature(name = "maple tree", location = request)
}
}
@@ -1,24 +1,23 @@
package misk.grpc.miskserver

import misk.grpc.GrpcReceiveChannel
import misk.grpc.GrpcSendChannel
import com.squareup.wire.MessageSink
import com.squareup.wire.MessageSource
import misk.grpc.consumeEachAndClose
import misk.web.Grpc
import misk.web.actions.WebAction
import misk.web.interceptors.LogRequestResponse
import routeguide.RouteGuideRouteChat
import routeguide.RouteNote
import javax.inject.Inject

class RouteChatGrpcAction @Inject constructor() : WebAction {
@Grpc("/routeguide.RouteGuide/RouteChat")
class RouteChatGrpcAction @Inject constructor() : WebAction, RouteGuideRouteChat {
@LogRequestResponse(sampling = 1.0, includeBody = true)
fun chat(
request: GrpcReceiveChannel<RouteNote>,
response: GrpcSendChannel<RouteNote>
override fun RouteChat(
request: MessageSource<RouteNote>,
response: MessageSink<RouteNote>
) {
response.use {
request.consumeEachAndClose { routeNote ->
response.send(RouteNote(message = "ACK: ${routeNote.message}"))
response.write(RouteNote(message = "ACK: ${routeNote.message}"))
}
}
}
@@ -69,7 +69,7 @@ class MiskClientMiskServerTest {

// Confirm interceptors were invoked.
assertThat(logCollector.takeMessage(RequestLoggingInterceptor::class)).isEqualTo(
"RouteChatGrpcAction principal=unknown request=[GrpcReceiveChannel, GrpcSendChannel]")
"RouteChatGrpcAction principal=unknown request=[GrpcMessageSource, GrpcMessageSink]")
assertThat(logCollector.takeMessage(RequestLoggingInterceptor::class)).isEqualTo(
"RouteChatGrpcAction principal=unknown time=0.000 ns response=kotlin.Unit")
}
@@ -1,17 +1,34 @@
package misk.grpc

import com.squareup.wire.MessageSink
import com.squareup.wire.MessageSource
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.lang.reflect.WildcardType
import kotlin.reflect.KType
import kotlin.reflect.jvm.javaType

/**
* Returns the channel element type, like `MyRequest` if this is `Channel<MyRequest>`. Returns null
* if this is not a channel.
* Returns the stream element type, like `MyRequest` if this is `MessageSource<MyRequest>`.
* Returns null if this is not a [MessageSource] or [MessageSink].
*/
internal fun KType.streamElementType(): Type? {
// Unbox the type parameter.
val parameterizedType = javaType as? ParameterizedType ?: return null
if (parameterizedType.rawType != GrpcReceiveChannel::class.java &&
parameterizedType.rawType != GrpcSendChannel::class.java) return null
return parameterizedType.actualTypeArguments[0]
if (parameterizedType.rawType != MessageSource::class.java &&
parameterizedType.rawType != MessageSink::class.java) return null
// Remove the wildcard, like 'out MessageSource' (Kotlin) or '? super MessageSource' (Java).
return when (val typeArgument = parameterizedType.actualTypeArguments[0]) {
is WildcardType -> typeArgument.lowerBounds[0]
else -> typeArgument //
}
}

fun <T : Any> MessageSource<T>.consumeEachAndClose(block: (T) -> Unit) {
use {
while (true) {
val message = read() ?: return
block(message)
}
}
}

This file was deleted.

@@ -0,0 +1,33 @@
package misk.grpc

import okio.BufferedSource
import okio.GzipSource
import okio.Source
import java.net.ProtocolException

/**
* This is derived from Wire's GrpcDecoder.kt.
* https://github.com/square/wire/blob/master/wire-grpc-client/src/main/java/com/squareup/wire/GrpcDecoder.kt
*/
internal sealed class GrpcDecoder(val name: String) {
/** Returns a stream that decodes `source`. */
abstract fun decode(source: BufferedSource): Source

internal object IdentityGrpcDecoder : GrpcDecoder("identity") {
override fun decode(source: BufferedSource) = source
}

internal object GzipGrpcDecoder : GrpcDecoder("gzip") {
override fun decode(source: BufferedSource) = GzipSource(source)
}
}

internal fun String.toGrpcDecoding(): GrpcDecoder {
return when (this) {
"identity" -> GrpcDecoder.IdentityGrpcDecoder
"gzip" -> GrpcDecoder.GzipGrpcDecoder
"deflate" -> throw ProtocolException("deflate not yet supported")
"snappy" -> throw ProtocolException("snappy not yet supported")
else -> throw ProtocolException("unsupported grpc-encoding: $this")
}
}
@@ -0,0 +1,33 @@
package misk.grpc

import okio.BufferedSink
import okio.buffer
import okio.gzip
import java.net.ProtocolException

/**
* This is derived from Wire's GrpcEncoder.kt.
* https://github.com/square/wire/blob/master/wire-grpc-client/src/main/java/com/squareup/wire/GrpcEncoder.kt
*/
internal sealed class GrpcEncoder(val name: String) {
/** Returns a stream that decodes `source`. */
abstract fun encode(sink: BufferedSink): BufferedSink

internal object IdentityGrpcEncoder : GrpcEncoder("identity") {
override fun encode(sink: BufferedSink) = sink
}

internal object GzipGrpcEncoder : GrpcEncoder("gzip") {
override fun encode(sink: BufferedSink) = sink.gzip().buffer()
}
}

internal fun String.toGrpcEncoder(): GrpcEncoder {
return when (this) {
"identity" -> GrpcEncoder.IdentityGrpcEncoder
"gzip" -> GrpcEncoder.GzipGrpcEncoder
"deflate" -> throw ProtocolException("deflate not yet supported")
"snappy" -> throw ProtocolException("snappy not yet supported")
else -> throw ProtocolException("unsupported grpc-encoding: $this")
}
}

This file was deleted.

@@ -0,0 +1,42 @@
package misk.grpc

import com.squareup.wire.MessageSink
import com.squareup.wire.ProtoAdapter
import okio.Buffer
import okio.BufferedSink
import java.io.Closeable

/**
* Writes a sequence of gRPC messages as an HTTP/2 stream.
*
* This is derived from Wire's GrpcMessageSink.kt.
* https://github.com/square/wire/blob/master/wire-grpc-client/src/main/java/com/squareup/wire/GrpcMessageSink.kt
*
* @param sink the HTTP/2 stream body.
* @param messageAdapter a proto adapter for each message.
* @param grpcEncoding the content coding for the stream body.
*/
internal class GrpcMessageSink<T : Any> constructor(
private val sink: BufferedSink,
private val messageAdapter: ProtoAdapter<T>,
private val grpcEncoding: String = "identity"
) : MessageSink<T>, Closeable by sink {
override fun write(message: T) {
val messageEncoding = grpcEncoding.toGrpcEncoder()
val encodingSink = messageEncoding.encode(sink)

val compressedFlag = if (grpcEncoding == "identity") 0 else 1
encodingSink.writeByte(compressedFlag)

val encodedMessage = Buffer()
messageAdapter.encode(encodedMessage, message)

// TODO: fail if the message size is more than MAX_INT
encodingSink.writeInt(encodedMessage.size.toInt())
encodingSink.writeAll(encodedMessage)

sink.flush()
}

override fun toString() = "GrpcMessageSink"
}
@@ -0,0 +1,53 @@
package misk.grpc

import com.squareup.wire.MessageSource
import com.squareup.wire.ProtoAdapter
import okio.Buffer
import okio.BufferedSource
import okio.buffer
import java.io.Closeable
import java.net.ProtocolException

/**
* Reads an HTTP/2 stream as a sequence of gRPC messages.
*
* This is derived from Wire's GrpcMessageSource.kt.
* https://github.com/square/wire/blob/master/wire-grpc-client/src/main/java/com/squareup/wire/GrpcMessageSource.kt
*
* @param source the HTTP/2 stream body.
* @param messageAdapter a proto adapter for each message.
* @param grpcEncoding the "grpc-encoding" header, or null if it is absent.
*/
internal class GrpcMessageSource<T : Any>(
private val source: BufferedSource,
private val messageAdapter: ProtoAdapter<T>,
private val grpcEncoding: String? = null
) : MessageSource<T>, Closeable by source {
override fun read(): T? {
if (source.exhausted()) return null

// Length-Prefixed-Message → Compressed-Flag Message-Length Message
// Compressed-Flag → 0 / 1 # encoded as 1 byte unsigned integer
// Message-Length → {length of Message} # encoded as 4 byte unsigned integer
// Message → *{binary octet}

val compressedFlag = source.readByte()
val messageDecoding: GrpcDecoder = when {
compressedFlag.toInt() == 0 -> GrpcDecoder.IdentityGrpcDecoder
compressedFlag.toInt() == 1 -> {
grpcEncoding?.toGrpcDecoding() ?: throw ProtocolException(
"message is encoded but message-encoding header was omitted")
}
else -> throw ProtocolException("unexpected compressed-flag: $compressedFlag")
}

val encodedLength = source.readInt().toLong() and 0xffffffffL

val encodedMessage = Buffer()
encodedMessage.write(source, encodedLength)

return messageAdapter.decode(messageDecoding.decode(encodedMessage).buffer())
}

override fun toString() = "GrpcMessageSource"
}

0 comments on commit bfa27cf

Please sign in to comment.
You can’t perform that action at this time.