Skip to content
Permalink
Browse files

Update to wire3.0.0-rc03 (#1217)

  • Loading branch information...
oldergod committed Oct 7, 2019
1 parent 6b21377 commit 08b531b1bc9f1c3f551fe2849a10392284b87d15
@@ -87,9 +87,9 @@ ext.dep = [
"tracingJaeger": "com.uber.jaeger:jaeger-core:0.24.0",
"tracingZipkin": "io.opentracing.brave:brave-opentracing:0.29.0",
"vitess": "io.vitess:vitess-jdbc:3.0.0",
"wireRuntime": "com.squareup.wire:wire-runtime:3.0.0-rc01",
"wireGrpcClient": "com.squareup.wire:wire-grpc-client:3.0.0-rc01",
"wirePlugin": "com.squareup.wire:wire-gradle-plugin:3.0.0-rc01",
"wireRuntime": "com.squareup.wire:wire-runtime:3.0.0-rc03",
"wireGrpcClient": "com.squareup.wire:wire-grpc-client:3.0.0-rc03",
"wirePlugin": "com.squareup.wire:wire-gradle-plugin:3.0.0-rc03",
"zipkinBrave": "io.zipkin.brave:brave:4.17.2",
"zipkinReporter": "io.zipkin.reporter2:zipkin-sender-okhttp3:2.4.1",
"zookeeper": "org.apache.zookeeper:zookeeper:3.5.4-beta",
@@ -31,13 +31,15 @@ protobuf {

wire {
kotlin {
rpcRole = 'client'
}

// Generate service interfaces also.
kotlin {
includes = ['routeguide.RouteGuide']
exclusive = false
blockingServices = true
rpcRole = 'server'
rpcCallStyle = 'blocking'
singleMethodServices = true
}
}
@@ -47,7 +49,7 @@ sourceSets {
main.java.srcDirs += 'build/generated/source/proto/main/java'

// TODO(jwilson): we do this to make IntelliJ happy but the Wire Gradle plugin should do that.
main.java.srcDirs += 'build/generated/src/main/java'
main.java.srcDirs += 'build/generated/source/wire'
}

dependencies {
@@ -56,6 +58,7 @@ dependencies {
compile dep.grpcNetty
compile dep.grpcProtobuf
compile dep.grpcStub
compile dep.kotlinxCoroutines
compile dep.wireGrpcClient
compile dep.wireRuntime
compile project(':misk')
@@ -67,4 +70,4 @@ afterEvaluate { project ->
outputDirectory = "$rootDir/docs/0.x"
outputFormat = 'gfm'
}
}
}
@@ -11,7 +11,7 @@ import misk.security.ssl.SslLoader
import misk.security.ssl.TrustStoreConfig
import okhttp3.HttpUrl
import okhttp3.OkHttpClient
import routeguide.RouteGuide
import routeguide.RouteGuideClient
import javax.inject.Named
import javax.inject.Singleton

@@ -53,5 +53,6 @@ class MiskGrpcClientModule : KAbstractModule() {

@Provides
@Singleton
fun provideRouteGuide(grpcClient: GrpcClient): RouteGuide = grpcClient.create(RouteGuide::class)
}
fun provideRouteGuide(grpcClient: GrpcClient): RouteGuideClient =
grpcClient.create(RouteGuideClient::class)
}
@@ -4,12 +4,12 @@ import misk.web.actions.WebAction
import misk.web.interceptors.LogRequestResponse
import routeguide.Feature
import routeguide.Point
import routeguide.RouteGuideGetFeature
import routeguide.RouteGuideGetFeatureBlockingServer
import javax.inject.Inject

class GetFeatureGrpcAction @Inject constructor() : WebAction, RouteGuideGetFeature {
class GetFeatureGrpcAction @Inject constructor() : WebAction, RouteGuideGetFeatureBlockingServer {
@LogRequestResponse(sampling = 1.0, includeBody = true)
override fun GetFeature(request: Point): Feature {
return Feature(name = "maple tree", location = request)
}
}
}
@@ -5,13 +5,13 @@ import com.squareup.wire.MessageSource
import misk.grpc.consumeEachAndClose
import misk.web.actions.WebAction
import misk.web.interceptors.LogRequestResponse
import routeguide.RouteGuideRouteChat
import routeguide.RouteGuideRouteChatBlockingServer
import routeguide.RouteNote
import javax.inject.Inject
import javax.inject.Singleton

@Singleton
class RouteChatGrpcAction @Inject constructor() : WebAction, RouteGuideRouteChat {
class RouteChatGrpcAction @Inject constructor() : WebAction, RouteGuideRouteChatBlockingServer {
var welcomeMessage: String? = null

@LogRequestResponse(sampling = 1.0, includeBody = true)
@@ -10,9 +10,7 @@ import javax.inject.Named
/** A module that runs a Misk gRPC server: Wire protos and a Jetty backend. */
class RouteGuideMiskServiceModule : KAbstractModule() {
override fun configure() {
install(WebTestingModule(webConfig = WebTestingModule.TESTING_WEB_CONFIG.copy(
http2 = true
)))
install(WebTestingModule(webConfig = WebTestingModule.TESTING_WEB_CONFIG.copy(http2 = true)))
install(WebActionModule.create<GetFeatureGrpcAction>())
install(WebActionModule.create<RouteChatGrpcAction>())
}
@@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
syntax = "proto2";

package routeguide;

@@ -51,29 +51,29 @@ service RouteGuide {
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
optional int32 latitude = 1;
optional int32 longitude = 2;
}

// A latitude-longitude rectangle, represented as two diagonally opposite
// points "lo" and "hi".
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
optional Point lo = 1;

// The other corner of the rectangle.
Point hi = 2;
optional Point hi = 2;
}

// A feature names something at a given point.
//
// If a feature could not be named, the name is empty.
message Feature {
// The name of the feature.
string name = 1;
optional string name = 1;

// The point where the feature is detected.
Point location = 2;
optional Point location = 2;
}

// Not used in the RPC. Instead, this is here for the form serialized to disk.
@@ -84,10 +84,10 @@ message FeatureDatabase {
// A RouteNote is a message sent while at a given point.
message RouteNote {
// The location from which the message is sent.
Point location = 1;
optional Point location = 1;

// The message to be sent.
string message = 2;
optional string message = 2;
}

// A RouteSummary is received in response to a RecordRoute rpc.
@@ -97,14 +97,14 @@ message RouteNote {
// the distance between each point.
message RouteSummary {
// The number of points received.
int32 point_count = 1;
optional int32 point_count = 1;

// The number of known features passed while traversing the route.
int32 feature_count = 2;
optional int32 feature_count = 2;

// The distance covered in metres.
int32 distance = 3;
optional int32 distance = 3;

// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
optional int32 elapsed_time = 4;
}
@@ -15,7 +15,7 @@ import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import routeguide.Feature
import routeguide.Point
import routeguide.RouteGuide
import routeguide.RouteGuideClient
import routeguide.RouteNote
import javax.inject.Inject
import javax.inject.Provider
@@ -28,7 +28,7 @@ class MiskClientMiskServerTest {
RouteGuideMiskServiceModule(),
LogCollectorModule())

@Inject lateinit var routeGuideProvider: Provider<RouteGuide>
@Inject lateinit var routeGuideProvider: Provider<RouteGuideClient>
@Inject lateinit var logCollector: LogCollector
@Inject lateinit var routeChatGrpcAction: RouteChatGrpcAction

@@ -46,7 +46,7 @@ class MiskClientMiskServerTest {
runBlocking {
val routeGuide = routeGuideProvider.get()

val returnValue = routeGuide.GetFeature(point)
val returnValue = routeGuide.GetFeature().execute(point)
assertThat(returnValue).isEqualTo(feature)
}

@@ -62,7 +62,7 @@ class MiskClientMiskServerTest {
runBlocking {
val routeGuide = routeGuideProvider.get()

val (sendChannel, receiveChannel) = routeGuide.RouteChat()
val (sendChannel, receiveChannel) = routeGuide.RouteChat().execute()
sendChannel.send(RouteNote(message = "a"))
assertThat(receiveChannel.receive()).isEqualTo(RouteNote(message = "ACK: a"))
sendChannel.send(RouteNote(message = "b"))
@@ -84,7 +84,8 @@ class MiskClientMiskServerTest {
runBlocking {
val routeGuide = routeGuideProvider.get()

val (sendChannel, receiveChannel: ReceiveChannel<RouteNote>) = routeGuide.RouteChat()
val (sendChannel, receiveChannel: ReceiveChannel<RouteNote>) =
routeGuide.RouteChat().execute()
assertThat(receiveChannel.receive()).isEqualTo(RouteNote(message = "welcome"))
sendChannel.close()
}
@@ -11,7 +11,7 @@ import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import routeguide.Feature
import routeguide.Point
import routeguide.RouteGuide
import routeguide.RouteGuideClient
import routeguide.RouteNote
import javax.inject.Inject
import javax.inject.Provider
@@ -24,16 +24,14 @@ class MiskClientProtocServerTest {
RouteGuideProtocServiceModule(),
MiskTestingServiceModule())

@Inject lateinit var routeGuideProvider: Provider<RouteGuide>
@Inject lateinit var routeGuideProvider: Provider<RouteGuideClient>

@Test
fun requestResponse() {
runBlocking {
val routeGuide = routeGuideProvider.get()

val feature = routeGuide.GetFeature(Point(
latitude = 43,
longitude = -80))
val feature = routeGuide.GetFeature().execute(Point(latitude = 43, longitude = -80))
assertThat(feature).isEqualTo(Feature(
name = "pine tree",
location = Point(latitude = 43, longitude = -80)
@@ -46,7 +44,7 @@ class MiskClientProtocServerTest {
runBlocking {
val routeGuide = routeGuideProvider.get()

val (sendChannel, receiveChannel) = routeGuide.RouteChat()
val (sendChannel, receiveChannel) = routeGuide.RouteChat().execute()
sendChannel.send(RouteNote(message = "Taco cat"))
assertThat(receiveChannel.receive().message).isEqualTo("tac ocaT")
sendChannel.send(RouteNote(message = "A nut for a jar of tuna"))
@@ -1,25 +1,16 @@
package misk

import com.google.common.util.concurrent.Service
import com.google.common.util.concurrent.ServiceManager
import com.google.inject.Injector
import com.google.inject.Provides
import com.google.inject.Scopes
import misk.concurrent.SleeperModule
import misk.environment.RealEnvVarModule
import misk.healthchecks.HealthCheck
import misk.inject.KAbstractModule
import misk.inject.asSingleton
import misk.metrics.MetricsModule
import misk.moshi.MoshiModule
import misk.prometheus.PrometheusHistogramRegistryModule
import misk.resources.ResourceLoaderModule
import misk.time.ClockModule
import misk.time.TickerModule
import misk.tokens.TokenGeneratorModule
import mu.KotlinLogging
import javax.inject.Provider
import javax.inject.Singleton

/**
* Install this module in real environments.
@@ -4,13 +4,12 @@ import com.squareup.wire.MessageSink
import com.squareup.wire.ProtoAdapter
import okio.Buffer
import okio.BufferedSink
import java.io.Closeable

/**
* Writes a sequence of gRPC messages as an HTTP/2 stream.
*
* This is derived from Wire's GrpcMessageSink.kt.
* https://github.com/square/wire/blob/master/wire-grpc-client/src/main/java/com/squareup/wire/GrpcMessageSink.kt
* https://github.com/square/wire/search?q=GrpcMessageSink&type=Code
*
* @param sink the HTTP/2 stream body.
* @param messageAdapter a proto adapter for each message.
@@ -19,24 +18,35 @@ import java.io.Closeable
internal class GrpcMessageSink<T : Any> constructor(
private val sink: BufferedSink,
private val messageAdapter: ProtoAdapter<T>,
private val grpcEncoding: String = "identity"
) : MessageSink<T>, Closeable by sink {
private val grpcEncoding: String
) : MessageSink<T> {
private var closed = false
override fun write(message: T) {
val messageEncoding = grpcEncoding.toGrpcEncoder()
val encodingSink = messageEncoding.encode(sink)

val compressedFlag = if (grpcEncoding == "identity") 0 else 1
encodingSink.writeByte(compressedFlag)
check(!closed) { "closed" }

val encodedMessage = Buffer()
messageAdapter.encode(encodedMessage, message)
grpcEncoding.toGrpcEncoder().encode(encodedMessage).use { encodingSink ->
messageAdapter.encode(encodingSink, message)
}

val compressedFlag = if (grpcEncoding == "identity") 0 else 1
sink.writeByte(compressedFlag)
// TODO: fail if the message size is more than MAX_INT
encodingSink.writeInt(encodedMessage.size.toInt())
encodingSink.writeAll(encodedMessage)

sink.writeInt(encodedMessage.size.toInt())
sink.writeAll(encodedMessage)
sink.flush()
}

override fun cancel() {
check(!closed) { "closed" }
// TODO: Cancel the Jetty request.
}

override fun close() {
if (closed) return
closed = true
sink.close()
}

override fun toString() = "GrpcMessageSink"
}
}
@@ -12,7 +12,7 @@ import java.net.ProtocolException
* Reads an HTTP/2 stream as a sequence of gRPC messages.
*
* This is derived from Wire's GrpcMessageSource.kt.
* https://github.com/square/wire/blob/master/wire-grpc-client/src/main/java/com/squareup/wire/GrpcMessageSource.kt
* https://github.com/square/wire/search?q=GrpcMessageSource&type=Code
*
* @param source the HTTP/2 stream body.
* @param messageAdapter a proto adapter for each message.
@@ -43,10 +43,11 @@ internal class GrpcMessageSource<T : Any>(

val encodedLength = source.readInt().toLong() and 0xffffffffL

val encodedMessage = Buffer()
encodedMessage.write(source, encodedLength)
val encodedMessage = Buffer().write(source, encodedLength)

return messageAdapter.decode(messageDecoding.decode(encodedMessage).buffer())
return messageDecoding.decode(encodedMessage).buffer().use {
messageAdapter.decode(it)
}
}

override fun toString() = "GrpcMessageSource"

0 comments on commit 08b531b

Please sign in to comment.
You can’t perform that action at this time.