Skip to content

Commit

Permalink
Update to wire3.0.0-rc03
Browse files Browse the repository at this point in the history
  • Loading branch information
oldergod committed Oct 6, 2019
1 parent 6b21377 commit e403828
Show file tree
Hide file tree
Showing 15 changed files with 66 additions and 63 deletions.
6 changes: 3 additions & 3 deletions dependencies.gradle
Expand Up @@ -87,9 +87,9 @@ ext.dep = [
"tracingJaeger": "com.uber.jaeger:jaeger-core:0.24.0",
"tracingZipkin": "io.opentracing.brave:brave-opentracing:0.29.0",
"vitess": "io.vitess:vitess-jdbc:3.0.0",
"wireRuntime": "com.squareup.wire:wire-runtime:3.0.0-rc01",
"wireGrpcClient": "com.squareup.wire:wire-grpc-client:3.0.0-rc01",
"wirePlugin": "com.squareup.wire:wire-gradle-plugin:3.0.0-rc01",
"wireRuntime": "com.squareup.wire:wire-runtime:3.0.0-rc03",
"wireGrpcClient": "com.squareup.wire:wire-grpc-client:3.0.0-rc03",
"wirePlugin": "com.squareup.wire:wire-gradle-plugin:3.0.0-rc03",
"zipkinBrave": "io.zipkin.brave:brave:4.17.2",
"zipkinReporter": "io.zipkin.reporter2:zipkin-sender-okhttp3:2.4.1",
"zookeeper": "org.apache.zookeeper:zookeeper:3.5.4-beta",
Expand Down
8 changes: 5 additions & 3 deletions misk-grpc-tests/build.gradle
Expand Up @@ -31,13 +31,15 @@ protobuf {

wire {
kotlin {
rpcRole = 'client'
}

// Generate service interfaces also.
kotlin {
includes = ['routeguide.RouteGuide']
exclusive = false
blockingServices = true
rpcRole = 'server'
rpcCallStyle = 'blocking'
singleMethodServices = true
}
}
Expand All @@ -47,7 +49,7 @@ sourceSets {
main.java.srcDirs += 'build/generated/source/proto/main/java'

// TODO(jwilson): we do this to make IntelliJ happy but the Wire Gradle plugin should do that.
main.java.srcDirs += 'build/generated/src/main/java'
main.java.srcDirs += 'build/generated/source/wire'
}

dependencies {
Expand All @@ -67,4 +69,4 @@ afterEvaluate { project ->
outputDirectory = "$rootDir/docs/0.x"
outputFormat = 'gfm'
}
}
}
Expand Up @@ -11,7 +11,7 @@ import misk.security.ssl.SslLoader
import misk.security.ssl.TrustStoreConfig
import okhttp3.HttpUrl
import okhttp3.OkHttpClient
import routeguide.RouteGuide
import routeguide.RouteGuideClient
import javax.inject.Named
import javax.inject.Singleton

Expand Down Expand Up @@ -53,5 +53,6 @@ class MiskGrpcClientModule : KAbstractModule() {

@Provides
@Singleton
fun provideRouteGuide(grpcClient: GrpcClient): RouteGuide = grpcClient.create(RouteGuide::class)
}
fun provideRouteGuide(grpcClient: GrpcClient): RouteGuideClient =
grpcClient.create(RouteGuideClient::class)
}
Expand Up @@ -4,12 +4,12 @@ import misk.web.actions.WebAction
import misk.web.interceptors.LogRequestResponse
import routeguide.Feature
import routeguide.Point
import routeguide.RouteGuideGetFeature
import routeguide.RouteGuideGetFeatureBlockingServer
import javax.inject.Inject

class GetFeatureGrpcAction @Inject constructor() : WebAction, RouteGuideGetFeature {
class GetFeatureGrpcAction @Inject constructor() : WebAction, RouteGuideGetFeatureBlockingServer {
@LogRequestResponse(sampling = 1.0, includeBody = true)
override fun GetFeature(request: Point): Feature {
return Feature(name = "maple tree", location = request)
}
}
}
Expand Up @@ -5,13 +5,13 @@ import com.squareup.wire.MessageSource
import misk.grpc.consumeEachAndClose
import misk.web.actions.WebAction
import misk.web.interceptors.LogRequestResponse
import routeguide.RouteGuideRouteChat
import routeguide.RouteGuideRouteChatBlockingServer
import routeguide.RouteNote
import javax.inject.Inject
import javax.inject.Singleton

@Singleton
class RouteChatGrpcAction @Inject constructor() : WebAction, RouteGuideRouteChat {
class RouteChatGrpcAction @Inject constructor() : WebAction, RouteGuideRouteChatBlockingServer {
var welcomeMessage: String? = null

@LogRequestResponse(sampling = 1.0, includeBody = true)
Expand Down
Expand Up @@ -10,9 +10,7 @@ import javax.inject.Named
/** A module that runs a Misk gRPC server: Wire protos and a Jetty backend. */
class RouteGuideMiskServiceModule : KAbstractModule() {
override fun configure() {
install(WebTestingModule(webConfig = WebTestingModule.TESTING_WEB_CONFIG.copy(
http2 = true
)))
install(WebTestingModule(webConfig = WebTestingModule.TESTING_WEB_CONFIG.copy(http2 = true)))
install(WebActionModule.create<GetFeatureGrpcAction>())
install(WebActionModule.create<RouteChatGrpcAction>())
}
Expand Down
28 changes: 14 additions & 14 deletions misk-grpc-tests/src/main/proto/routeguide/RouteGuideProto.proto
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
syntax = "proto2";

package routeguide;

Expand Down Expand Up @@ -51,29 +51,29 @@ service RouteGuide {
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
optional int32 latitude = 1;
optional int32 longitude = 2;
}

// A latitude-longitude rectangle, represented as two diagonally opposite
// points "lo" and "hi".
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
optional Point lo = 1;

// The other corner of the rectangle.
Point hi = 2;
optional Point hi = 2;
}

// A feature names something at a given point.
//
// If a feature could not be named, the name is empty.
message Feature {
// The name of the feature.
string name = 1;
optional string name = 1;

// The point where the feature is detected.
Point location = 2;
optional Point location = 2;
}

// Not used in the RPC. Instead, this is here for the form serialized to disk.
Expand All @@ -84,10 +84,10 @@ message FeatureDatabase {
// A RouteNote is a message sent while at a given point.
message RouteNote {
// The location from which the message is sent.
Point location = 1;
optional Point location = 1;

// The message to be sent.
string message = 2;
optional string message = 2;
}

// A RouteSummary is received in response to a RecordRoute rpc.
Expand All @@ -97,14 +97,14 @@ message RouteNote {
// the distance between each point.
message RouteSummary {
// The number of points received.
int32 point_count = 1;
optional int32 point_count = 1;

// The number of known features passed while traversing the route.
int32 feature_count = 2;
optional int32 feature_count = 2;

// The distance covered in metres.
int32 distance = 3;
optional int32 distance = 3;

// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
optional int32 elapsed_time = 4;
}
Expand Up @@ -15,7 +15,7 @@ import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import routeguide.Feature
import routeguide.Point
import routeguide.RouteGuide
import routeguide.RouteGuideClient
import routeguide.RouteNote
import javax.inject.Inject
import javax.inject.Provider
Expand All @@ -28,7 +28,7 @@ class MiskClientMiskServerTest {
RouteGuideMiskServiceModule(),
LogCollectorModule())

@Inject lateinit var routeGuideProvider: Provider<RouteGuide>
@Inject lateinit var routeGuideProvider: Provider<RouteGuideClient>
@Inject lateinit var logCollector: LogCollector
@Inject lateinit var routeChatGrpcAction: RouteChatGrpcAction

Expand All @@ -46,7 +46,7 @@ class MiskClientMiskServerTest {
runBlocking {
val routeGuide = routeGuideProvider.get()

val returnValue = routeGuide.GetFeature(point)
val returnValue = routeGuide.GetFeature().execute(point)
assertThat(returnValue).isEqualTo(feature)
}

Expand All @@ -62,7 +62,7 @@ class MiskClientMiskServerTest {
runBlocking {
val routeGuide = routeGuideProvider.get()

val (sendChannel, receiveChannel) = routeGuide.RouteChat()
val (sendChannel, receiveChannel) = routeGuide.RouteChat().execute()
sendChannel.send(RouteNote(message = "a"))
assertThat(receiveChannel.receive()).isEqualTo(RouteNote(message = "ACK: a"))
sendChannel.send(RouteNote(message = "b"))
Expand All @@ -84,7 +84,8 @@ class MiskClientMiskServerTest {
runBlocking {
val routeGuide = routeGuideProvider.get()

val (sendChannel, receiveChannel: ReceiveChannel<RouteNote>) = routeGuide.RouteChat()
val (sendChannel, receiveChannel: ReceiveChannel<RouteNote>) =
routeGuide.RouteChat().execute()
assertThat(receiveChannel.receive()).isEqualTo(RouteNote(message = "welcome"))
sendChannel.close()
}
Expand Down
Expand Up @@ -11,7 +11,7 @@ import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import routeguide.Feature
import routeguide.Point
import routeguide.RouteGuide
import routeguide.RouteGuideClient
import routeguide.RouteNote
import javax.inject.Inject
import javax.inject.Provider
Expand All @@ -24,16 +24,14 @@ class MiskClientProtocServerTest {
RouteGuideProtocServiceModule(),
MiskTestingServiceModule())

@Inject lateinit var routeGuideProvider: Provider<RouteGuide>
@Inject lateinit var routeGuideProvider: Provider<RouteGuideClient>

@Test
fun requestResponse() {
runBlocking {
val routeGuide = routeGuideProvider.get()

val feature = routeGuide.GetFeature(Point(
latitude = 43,
longitude = -80))
val feature = routeGuide.GetFeature().execute(Point(latitude = 43, longitude = -80))
assertThat(feature).isEqualTo(Feature(
name = "pine tree",
location = Point(latitude = 43, longitude = -80)
Expand All @@ -46,7 +44,7 @@ class MiskClientProtocServerTest {
runBlocking {
val routeGuide = routeGuideProvider.get()

val (sendChannel, receiveChannel) = routeGuide.RouteChat()
val (sendChannel, receiveChannel) = routeGuide.RouteChat().execute()
sendChannel.send(RouteNote(message = "Taco cat"))
assertThat(receiveChannel.receive().message).isEqualTo("tac ocaT")
sendChannel.send(RouteNote(message = "A nut for a jar of tuna"))
Expand Down
9 changes: 0 additions & 9 deletions misk/src/main/kotlin/misk/MiskServiceModule.kt
@@ -1,25 +1,16 @@
package misk

import com.google.common.util.concurrent.Service
import com.google.common.util.concurrent.ServiceManager
import com.google.inject.Injector
import com.google.inject.Provides
import com.google.inject.Scopes
import misk.concurrent.SleeperModule
import misk.environment.RealEnvVarModule
import misk.healthchecks.HealthCheck
import misk.inject.KAbstractModule
import misk.inject.asSingleton
import misk.metrics.MetricsModule
import misk.moshi.MoshiModule
import misk.prometheus.PrometheusHistogramRegistryModule
import misk.resources.ResourceLoaderModule
import misk.time.ClockModule
import misk.time.TickerModule
import misk.tokens.TokenGeneratorModule
import mu.KotlinLogging
import javax.inject.Provider
import javax.inject.Singleton

/**
* Install this module in real environments.
Expand Down
22 changes: 17 additions & 5 deletions misk/src/main/kotlin/misk/grpc/GrpcMessageSink.kt
Expand Up @@ -4,13 +4,12 @@ import com.squareup.wire.MessageSink
import com.squareup.wire.ProtoAdapter
import okio.Buffer
import okio.BufferedSink
import java.io.Closeable

/**
* Writes a sequence of gRPC messages as an HTTP/2 stream.
*
* This is derived from Wire's GrpcMessageSink.kt.
* https://github.com/square/wire/blob/master/wire-grpc-client/src/main/java/com/squareup/wire/GrpcMessageSink.kt
* https://github.com/square/wire/search?q=GrpcMessageSink&type=Code
*
* @param sink the HTTP/2 stream body.
* @param messageAdapter a proto adapter for each message.
Expand All @@ -19,9 +18,11 @@ import java.io.Closeable
internal class GrpcMessageSink<T : Any> constructor(
private val sink: BufferedSink,
private val messageAdapter: ProtoAdapter<T>,
private val grpcEncoding: String = "identity"
) : MessageSink<T>, Closeable by sink {
private val grpcEncoding: String
) : MessageSink<T> {
private var closed = false
override fun write(message: T) {
check(!closed) { "closed" }
val messageEncoding = grpcEncoding.toGrpcEncoder()
val encodingSink = messageEncoding.encode(sink)

Expand All @@ -38,5 +39,16 @@ internal class GrpcMessageSink<T : Any> constructor(
sink.flush()
}

override fun cancel() {
check(!closed) { "closed" }
// TODO: Cancel the Jetty request.
}

override fun close() {
if (closed) return
closed = true
sink.close()
}

override fun toString() = "GrpcMessageSink"
}
}
2 changes: 1 addition & 1 deletion misk/src/main/kotlin/misk/grpc/GrpcMessageSource.kt
Expand Up @@ -12,7 +12,7 @@ import java.net.ProtocolException
* Reads an HTTP/2 stream as a sequence of gRPC messages.
*
* This is derived from Wire's GrpcMessageSource.kt.
* https://github.com/square/wire/blob/master/wire-grpc-client/src/main/java/com/squareup/wire/GrpcMessageSource.kt
* https://github.com/square/wire/search?q=GrpcMessageSource&type=Code
*
* @param source the HTTP/2 stream body.
* @param messageAdapter a proto adapter for each message.
Expand Down
4 changes: 2 additions & 2 deletions misk/src/main/kotlin/misk/grpc/GrpcResponseFeatureBinding.kt
Expand Up @@ -25,7 +25,7 @@ internal class GrpcResponseFeatureBinding(
// TODO(jwilson): permit non-identity GRPC encoding.

val responseBody = subject.takeResponseBody()
val messageSink = GrpcMessageSink(responseBody, adapter)
val messageSink = GrpcMessageSink(responseBody, adapter, grpcEncoding = "identity")

subject.httpCall.setResponseHeader("Content-Type",
MediaTypes.APPLICATION_GRPC_MEDIA_TYPE.toString())
Expand Down Expand Up @@ -75,4 +75,4 @@ internal class GrpcResponseFeatureBinding(
}
}
}
}
}
2 changes: 1 addition & 1 deletion misk/src/test/kotlin/misk/grpc/GrpcConnectivityTest.kt
Expand Up @@ -68,7 +68,7 @@ class GrpcConnectivityTest {
}

override fun writeTo(sink: BufferedSink) {
val writer = GrpcMessageSink(sink, HelloRequest.ADAPTER)
val writer = GrpcMessageSink(sink, HelloRequest.ADAPTER, "gzip")
writer.write(HelloRequest("jesse!"))
}
})
Expand Down

0 comments on commit e403828

Please sign in to comment.