Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce CallOptions, to support future per-call options (like timeouts) #275

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package com.connectrpc.conformance.client.java

import com.connectrpc.Headers
import com.connectrpc.CallOptions
import com.connectrpc.conformance.client.adapt.BidiStreamClient
import com.connectrpc.conformance.v1.BidiStreamRequest
import com.connectrpc.conformance.v1.BidiStreamResponse
Expand All @@ -26,7 +26,7 @@ class JavaBidiStreamClient(
BidiStreamRequest.getDefaultInstance(),
BidiStreamResponse.getDefaultInstance(),
) {
override suspend fun execute(headers: Headers): BidiStream<BidiStreamRequest, BidiStreamResponse> {
return BidiStream.new(client.bidiStream(headers))
override suspend fun execute(options: CallOptions): BidiStream<BidiStreamRequest, BidiStreamResponse> {
return BidiStream.new(client.bidiStream(options))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package com.connectrpc.conformance.client.java

import com.connectrpc.Headers
import com.connectrpc.CallOptions
import com.connectrpc.conformance.client.adapt.ClientStreamClient
import com.connectrpc.conformance.v1.ClientStreamRequest
import com.connectrpc.conformance.v1.ClientStreamResponse
Expand All @@ -26,7 +26,7 @@ class JavaClientStreamClient(
ClientStreamRequest.getDefaultInstance(),
ClientStreamResponse.getDefaultInstance(),
) {
override suspend fun execute(headers: Headers): ClientStream<ClientStreamRequest, ClientStreamResponse> {
return ClientStream.new(client.clientStream(headers))
override suspend fun execute(options: CallOptions): ClientStream<ClientStreamRequest, ClientStreamResponse> {
return ClientStream.new(client.clientStream(options))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package com.connectrpc.conformance.client.java

import com.connectrpc.Headers
import com.connectrpc.CallOptions
import com.connectrpc.ResponseMessage
import com.connectrpc.UnaryBlockingCall
import com.connectrpc.conformance.client.adapt.UnaryClient
Expand All @@ -29,19 +29,19 @@ class JavaIdempotentUnaryClient(
IdempotentUnaryRequest.getDefaultInstance(),
IdempotentUnaryResponse.getDefaultInstance(),
) {
override suspend fun execute(req: IdempotentUnaryRequest, headers: Headers): ResponseMessage<IdempotentUnaryResponse> {
return client.idempotentUnary(req, headers)
override suspend fun execute(req: IdempotentUnaryRequest, options: CallOptions): ResponseMessage<IdempotentUnaryResponse> {
return client.idempotentUnary(req, options)
}

override fun execute(
req: IdempotentUnaryRequest,
headers: Headers,
options: CallOptions,
onFinish: (ResponseMessage<IdempotentUnaryResponse>) -> Unit,
): Cancelable {
return client.idempotentUnary(req, headers, onFinish)
return client.idempotentUnary(req, options, onFinish)
}

override fun blocking(req: IdempotentUnaryRequest, headers: Headers): UnaryBlockingCall<IdempotentUnaryResponse> {
return client.idempotentUnaryBlocking(req, headers)
override fun blocking(req: IdempotentUnaryRequest, options: CallOptions): UnaryBlockingCall<IdempotentUnaryResponse> {
return client.idempotentUnaryBlocking(req, options)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package com.connectrpc.conformance.client.java

import com.connectrpc.Headers
import com.connectrpc.CallOptions
import com.connectrpc.conformance.client.adapt.ResponseStream
import com.connectrpc.conformance.client.adapt.ServerStreamClient
import com.connectrpc.conformance.v1.ConformanceServiceClient
Expand All @@ -27,8 +27,8 @@ class JavaServerStreamClient(
ServerStreamRequest.getDefaultInstance(),
ServerStreamResponse.getDefaultInstance(),
) {
override suspend fun execute(req: ServerStreamRequest, headers: Headers): ResponseStream<ServerStreamResponse> {
val stream = client.serverStream(headers)
override suspend fun execute(req: ServerStreamRequest, options: CallOptions): ResponseStream<ServerStreamResponse> {
val stream = client.serverStream(options)
val sendResult: Result<Unit>
try {
sendResult = stream.sendAndClose(req)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package com.connectrpc.conformance.client.java

import com.connectrpc.Headers
import com.connectrpc.CallOptions
import com.connectrpc.ResponseMessage
import com.connectrpc.UnaryBlockingCall
import com.connectrpc.conformance.client.adapt.UnaryClient
Expand All @@ -29,19 +29,19 @@ class JavaUnaryClient(
UnaryRequest.getDefaultInstance(),
UnaryResponse.getDefaultInstance(),
) {
override suspend fun execute(req: UnaryRequest, headers: Headers): ResponseMessage<UnaryResponse> {
return client.unary(req, headers)
override suspend fun execute(req: UnaryRequest, options: CallOptions): ResponseMessage<UnaryResponse> {
return client.unary(req, options)
}

override fun execute(
req: UnaryRequest,
headers: Headers,
options: CallOptions,
onFinish: (ResponseMessage<UnaryResponse>) -> Unit,
): Cancelable {
return client.unary(req, headers, onFinish)
return client.unary(req, options, onFinish)
}

override fun blocking(req: UnaryRequest, headers: Headers): UnaryBlockingCall<UnaryResponse> {
return client.unaryBlocking(req, headers)
override fun blocking(req: UnaryRequest, options: CallOptions): UnaryBlockingCall<UnaryResponse> {
return client.unaryBlocking(req, options)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package com.connectrpc.conformance.client.java

import com.connectrpc.Headers
import com.connectrpc.CallOptions
import com.connectrpc.ResponseMessage
import com.connectrpc.UnaryBlockingCall
import com.connectrpc.conformance.client.adapt.UnaryClient
Expand All @@ -29,19 +29,19 @@ class JavaUnimplementedClient(
UnimplementedRequest.getDefaultInstance(),
UnimplementedResponse.getDefaultInstance(),
) {
override suspend fun execute(req: UnimplementedRequest, headers: Headers): ResponseMessage<UnimplementedResponse> {
return client.unimplemented(req, headers)
override suspend fun execute(req: UnimplementedRequest, options: CallOptions): ResponseMessage<UnimplementedResponse> {
return client.unimplemented(req, options)
}

override fun execute(
req: UnimplementedRequest,
headers: Headers,
options: CallOptions,
onFinish: (ResponseMessage<UnimplementedResponse>) -> Unit,
): Cancelable {
return client.unimplemented(req, headers, onFinish)
return client.unimplemented(req, options, onFinish)
}

override fun blocking(req: UnimplementedRequest, headers: Headers): UnaryBlockingCall<UnimplementedResponse> {
return client.unimplementedBlocking(req, headers)
override fun blocking(req: UnimplementedRequest, options: CallOptions): UnaryBlockingCall<UnimplementedResponse> {
return client.unimplementedBlocking(req, options)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package com.connectrpc.conformance.client.javalite

import com.connectrpc.Headers
import com.connectrpc.CallOptions
import com.connectrpc.conformance.client.adapt.BidiStreamClient
import com.connectrpc.conformance.v1.BidiStreamRequest
import com.connectrpc.conformance.v1.BidiStreamResponse
Expand All @@ -26,7 +26,7 @@ class JavaLiteBidiStreamClient(
BidiStreamRequest.getDefaultInstance(),
BidiStreamResponse.getDefaultInstance(),
) {
override suspend fun execute(headers: Headers): BidiStream<BidiStreamRequest, BidiStreamResponse> {
return BidiStream.new(client.bidiStream(headers))
override suspend fun execute(options: CallOptions): BidiStream<BidiStreamRequest, BidiStreamResponse> {
return BidiStream.new(client.bidiStream(options))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package com.connectrpc.conformance.client.javalite

import com.connectrpc.Headers
import com.connectrpc.CallOptions
import com.connectrpc.conformance.client.adapt.ClientStreamClient
import com.connectrpc.conformance.v1.ClientStreamRequest
import com.connectrpc.conformance.v1.ClientStreamResponse
Expand All @@ -26,7 +26,7 @@ class JavaLiteClientStreamClient(
ClientStreamRequest.getDefaultInstance(),
ClientStreamResponse.getDefaultInstance(),
) {
override suspend fun execute(headers: Headers): ClientStream<ClientStreamRequest, ClientStreamResponse> {
return ClientStream.new(client.clientStream(headers))
override suspend fun execute(options: CallOptions): ClientStream<ClientStreamRequest, ClientStreamResponse> {
return ClientStream.new(client.clientStream(options))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package com.connectrpc.conformance.client.javalite

import com.connectrpc.Headers
import com.connectrpc.CallOptions
import com.connectrpc.ResponseMessage
import com.connectrpc.UnaryBlockingCall
import com.connectrpc.conformance.client.adapt.UnaryClient
Expand All @@ -29,22 +29,19 @@ class JavaLiteIdempotentUnaryClient(
IdempotentUnaryRequest.getDefaultInstance(),
IdempotentUnaryResponse.getDefaultInstance(),
) {
override suspend fun execute(
req: IdempotentUnaryRequest,
headers: Headers,
): ResponseMessage<IdempotentUnaryResponse> {
return client.idempotentUnary(req, headers)
override suspend fun execute(req: IdempotentUnaryRequest, options: CallOptions): ResponseMessage<IdempotentUnaryResponse> {
return client.idempotentUnary(req, options)
}

override fun execute(
req: IdempotentUnaryRequest,
headers: Headers,
options: CallOptions,
onFinish: (ResponseMessage<IdempotentUnaryResponse>) -> Unit,
): Cancelable {
return client.idempotentUnary(req, headers, onFinish)
return client.idempotentUnary(req, options, onFinish)
}

override fun blocking(req: IdempotentUnaryRequest, headers: Headers): UnaryBlockingCall<IdempotentUnaryResponse> {
return client.idempotentUnaryBlocking(req, headers)
override fun blocking(req: IdempotentUnaryRequest, options: CallOptions): UnaryBlockingCall<IdempotentUnaryResponse> {
return client.idempotentUnaryBlocking(req, options)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package com.connectrpc.conformance.client.javalite

import com.connectrpc.Headers
import com.connectrpc.CallOptions
import com.connectrpc.conformance.client.adapt.ResponseStream
import com.connectrpc.conformance.client.adapt.ServerStreamClient
import com.connectrpc.conformance.v1.ConformanceServiceClient
Expand All @@ -27,8 +27,8 @@ class JavaLiteServerStreamClient(
ServerStreamRequest.getDefaultInstance(),
ServerStreamResponse.getDefaultInstance(),
) {
override suspend fun execute(req: ServerStreamRequest, headers: Headers): ResponseStream<ServerStreamResponse> {
val stream = client.serverStream(headers)
override suspend fun execute(req: ServerStreamRequest, options: CallOptions): ResponseStream<ServerStreamResponse> {
val stream = client.serverStream(options)
val sendResult: Result<Unit>
try {
sendResult = stream.sendAndClose(req)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package com.connectrpc.conformance.client.javalite

import com.connectrpc.Headers
import com.connectrpc.CallOptions
import com.connectrpc.ResponseMessage
import com.connectrpc.UnaryBlockingCall
import com.connectrpc.conformance.client.adapt.UnaryClient
Expand All @@ -29,22 +29,19 @@ class JavaLiteUnaryClient(
UnaryRequest.getDefaultInstance(),
UnaryResponse.getDefaultInstance(),
) {
override suspend fun execute(
req: UnaryRequest,
headers: Headers,
): ResponseMessage<UnaryResponse> {
return client.unary(req, headers)
override suspend fun execute(req: UnaryRequest, options: CallOptions): ResponseMessage<UnaryResponse> {
return client.unary(req, options)
}

override fun execute(
req: UnaryRequest,
headers: Headers,
options: CallOptions,
onFinish: (ResponseMessage<UnaryResponse>) -> Unit,
): Cancelable {
return client.unary(req, headers, onFinish)
return client.unary(req, options, onFinish)
}

override fun blocking(req: UnaryRequest, headers: Headers): UnaryBlockingCall<UnaryResponse> {
return client.unaryBlocking(req, headers)
override fun blocking(req: UnaryRequest, options: CallOptions): UnaryBlockingCall<UnaryResponse> {
return client.unaryBlocking(req, options)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package com.connectrpc.conformance.client.javalite

import com.connectrpc.Headers
import com.connectrpc.CallOptions
import com.connectrpc.ResponseMessage
import com.connectrpc.UnaryBlockingCall
import com.connectrpc.conformance.client.adapt.UnaryClient
Expand All @@ -29,19 +29,19 @@ class JavaLiteUnimplementedClient(
UnimplementedRequest.getDefaultInstance(),
UnimplementedResponse.getDefaultInstance(),
) {
override suspend fun execute(req: UnimplementedRequest, headers: Headers): ResponseMessage<UnimplementedResponse> {
return client.unimplemented(req, headers)
override suspend fun execute(req: UnimplementedRequest, options: CallOptions): ResponseMessage<UnimplementedResponse> {
return client.unimplemented(req, options)
}

override fun execute(
req: UnimplementedRequest,
headers: Headers,
options: CallOptions,
onFinish: (ResponseMessage<UnimplementedResponse>) -> Unit,
): Cancelable {
return client.unimplemented(req, headers, onFinish)
return client.unimplemented(req, options, onFinish)
}

override fun blocking(req: UnimplementedRequest, headers: Headers): UnaryBlockingCall<UnimplementedResponse> {
return client.unimplementedBlocking(req, headers)
override fun blocking(req: UnimplementedRequest, options: CallOptions): UnaryBlockingCall<UnimplementedResponse> {
return client.unimplementedBlocking(req, options)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.connectrpc.conformance.client

import com.connectrpc.CallOptions
import com.connectrpc.ConnectException
import com.connectrpc.Headers
import com.connectrpc.ProtocolClientConfig
Expand Down Expand Up @@ -132,7 +133,7 @@ class Client(
val canceler = client.execute(
args.invokeStyle,
msg,
req.requestHeaders,
CallOptions.headers(req.requestHeaders),
resp::complete,
)
when (val cancel = req.cancel) {
Expand Down Expand Up @@ -161,7 +162,7 @@ class Client(
) {
throw RuntimeException("client stream calls can only support `BeforeCloseSend` and 'AfterCloseSendMs' cancellation field, instead got ${req.cancel!!::class.simpleName}")
}
return client.execute(req.requestHeaders) { stream ->
return client.execute(CallOptions.headers(req.requestHeaders)) { stream ->
var numUnsent = 0
for (i in req.requestMessages.indices) {
if (req.requestDelayMs > 0) {
Expand Down Expand Up @@ -217,7 +218,7 @@ class Client(
val msg = fromAny(req.requestMessages[0], client.reqTemplate, SERVER_STREAM_REQUEST_NAME)
var sent = false
try {
return client.execute(msg, req.requestHeaders) { stream ->
return client.execute(msg, CallOptions.headers(req.requestHeaders)) { stream ->
sent = true
val cancel = req.cancel
if (cancel is Cancel.AfterCloseSendMs) {
Expand Down Expand Up @@ -255,7 +256,7 @@ class Client(
client: BidiStreamClient<Req, Resp>,
req: ClientCompatRequest,
): ClientResponseResult {
return client.execute(req.requestHeaders) { stream ->
return client.execute(CallOptions.headers(req.requestHeaders)) { stream ->
var numUnsent = 0
for (i in req.requestMessages.indices) {
if (req.requestDelayMs > 0) {
Expand Down Expand Up @@ -296,7 +297,7 @@ class Client(
client: BidiStreamClient<Req, Resp>,
req: ClientCompatRequest,
): ClientResponseResult {
return client.execute(req.requestHeaders) { stream ->
return client.execute(CallOptions.headers(req.requestHeaders)) { stream ->
val cancel = req.cancel
val payloads: MutableList<MessageLite> = mutableListOf()
for (i in req.requestMessages.indices) {
Expand Down
Loading