Skip to content

Commit

Permalink
update conformance client to use new timeout capabilities
Browse files Browse the repository at this point in the history
  • Loading branch information
jhump committed May 17, 2024
1 parent 5372d44 commit 2e9bf73
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import com.connectrpc.conformance.client.adapt.ServerStreamClient
import com.connectrpc.conformance.v1.ConformanceServiceClient
import com.connectrpc.conformance.v1.ServerStreamRequest
import com.connectrpc.conformance.v1.ServerStreamResponse
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.withTimeout

class JavaServerStreamClient(
private val client: ConformanceServiceClient,
Expand All @@ -33,6 +35,21 @@ class JavaServerStreamClient(
try {
sendResult = stream.sendAndClose(req)
if (sendResult.isFailure) {
// It can't be because stream.sendClose was already closed. So the operation
// must have already failed. Extract the reason via a call to receive. But
// if something is awry, don't block forever on the receive call.
try {
withTimeout(50) {
// Waits up to 50 milliseconds.
stream.responseChannel().receive()
}
} catch (_: TimeoutCancellationException) {
// Receive did not complete :(
} catch (ex: Throwable) {
throw ex
}
// Either receive did not complete or it did not fail (which
// shouldn't actually be possible).
throw sendResult.exceptionOrNull()!!
}
} catch (ex: Throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import com.connectrpc.conformance.client.adapt.ServerStreamClient
import com.connectrpc.conformance.v1.ConformanceServiceClient
import com.connectrpc.conformance.v1.ServerStreamRequest
import com.connectrpc.conformance.v1.ServerStreamResponse
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.withTimeout

class JavaLiteServerStreamClient(
private val client: ConformanceServiceClient,
Expand All @@ -33,6 +35,21 @@ class JavaLiteServerStreamClient(
try {
sendResult = stream.sendAndClose(req)
if (sendResult.isFailure) {
// It can't be because stream.sendClose was already closed. So the operation
// must have already failed. Extract the reason via a call to receive. But
// if something is awry, don't block forever on the receive call.
try {
withTimeout(50) {
// Waits up to 50 milliseconds.
stream.responseChannel().receive()
}
} catch (_: TimeoutCancellationException) {
// Receive did not complete :(
} catch (ex: Throwable) {
throw ex
}
// Either receive did not complete or it did not fail (which
// shouldn't actually be possible).
throw sendResult.exceptionOrNull()!!
}
} catch (ex: Throwable) {
Expand Down
9 changes: 1 addition & 8 deletions conformance/client/known-failing-stream-cases.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1 @@
# We currently rely on OkHttp's "call timeout" to handle
# RPC deadlines, but that is not enforced when the request
# body is duplex. So timeouts don't currently work with
# bidi streams.
Timeouts/HTTPVersion:2/**/bidi-stream/**

# Deadline headers are not currently set.
Deadline Propagation/**
# Currently there are zero failing tests.
3 changes: 1 addition & 2 deletions conformance/client/known-failing-unary-cases.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
# Deadline headers are not currently set.
Deadline Propagation/**
# Currently there are zero failing tests.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ import com.connectrpc.conformance.client.adapt.Invoker
import com.connectrpc.conformance.client.adapt.ResponseStream
import com.connectrpc.conformance.client.adapt.ServerStreamClient
import com.connectrpc.conformance.client.adapt.UnaryClient
import com.connectrpc.http.Cancelable
import com.connectrpc.http.HTTPClientInterface
import com.connectrpc.http.Timeout
import com.connectrpc.impl.ProtocolClient
import com.connectrpc.okhttp.ConnectOkHttpClient
import com.connectrpc.protocols.GETConfiguration
Expand All @@ -57,6 +59,8 @@ import java.security.spec.PKCS8EncodedKeySpec
import java.time.Duration
import java.util.Base64
import kotlin.reflect.cast
import kotlin.time.DurationUnit
import kotlin.time.toDuration

/**
* The conformance client. This contains the logic for invoking an
Expand Down Expand Up @@ -441,9 +445,6 @@ class Client(
val certs = certs(req)
clientBuilder = clientBuilder.sslSocketFactory(certs.sslSocketFactory(), certs.trustManager)
}
if (req.timeoutMs != 0) {
clientBuilder = clientBuilder.callTimeout(Duration.ofMillis(req.timeoutMs.toLong()))
}
// TODO: need to support max receive bytes and use req.receiveLimitBytes
val getConfig = if (req.useGetHttpMethod) GETConfiguration.Enabled else GETConfiguration.Disabled
val requestCompression =
Expand All @@ -458,11 +459,25 @@ class Client(
} else {
emptyList()
}
val httpClient = clientBuilder.build()
val httpClient = ConnectOkHttpClient.configureClient(clientBuilder).build()
var connectHttpClient: HTTPClientInterface = ConnectOkHttpClient(httpClient)
args.verbose.withPrefix("http client interface: ").verbosity(3) {
connectHttpClient = TracingHTTPClient(connectHttpClient, this)
}
var timeoutScheduler = Timeout.DEFAULT_SCHEDULER
args.verbose.verbosity(3) {
val verbosePrinter = this
timeoutScheduler = object : Timeout.Scheduler {
override fun scheduleTimeout(delay: kotlin.time.Duration, action: Cancelable): Timeout {
verbosePrinter.println("Scheduling timeout in $delay...")
val timeout = Timeout.DEFAULT_SCHEDULER.scheduleTimeout(delay) {
verbosePrinter.println("Timeout elapsed! Cancelling...")
action()
}
return timeout
}
}
}
return Pair(
httpClient,
ProtocolClient(
Expand All @@ -474,6 +489,14 @@ class Client(
getConfiguration = getConfig,
requestCompression = requestCompression,
compressionPools = compressionPools,
timeoutScheduler = timeoutScheduler,
timeoutOracle = {
if (req.timeoutMs == 0) {
null
} else {
req.timeoutMs.toDuration(DurationUnit.MILLISECONDS)
}
},
),
),
)
Expand Down

0 comments on commit 2e9bf73

Please sign in to comment.