Add connection timeout (#38)
* Fix some docs

* Add a connect_timeout option

For clearer control over connection timeouts.

* Add CHANGELOG entry

* Separate out connection timeout test

* Update to tonic 0.9

* Tidy up docs

* Remove redundant into_iter calls

At the request of clippy

* Fmt

* Remove unnecessary clone

Co-authored-by: Conrad Ludgate <oon@conradludgate.com>

* Default connection timeout to timeout

If it exists

* Fix comment typos

---------

Co-authored-by: Conrad Ludgate <oon@conradludgate.com>
ThomWright and conradludgate committed Apr 11, 2023
1 parent 811331d commit 8026765
Showing 12 changed files with 123 additions and 32 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
@@ -1,10 +1,22 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

* Added `connect_timeout` to `LoadBalancedChannelBuilder`

## 0.5.1 (2023-02-23)

* Make `LoadBalancedChannelBuilder` `Send`

## 0.5.0 (2022-08-05)

* Trim dependencies

### Breaking changes
8 changes: 4 additions & 4 deletions README.md
@@ -7,9 +7,9 @@
[![CI](https://github.com/TrueLayer/ginepro/workflows/CI/badge.svg)](https://github.com/TrueLayer/ginepro/actions)
[![Coverage Status](https://coveralls.io/repos/github/TrueLayer/ginepro/badge.svg?branch=main&t=UWgSpm)](https://coveralls.io/github/TrueLayer/ginepro?branch=main)

# Overview
## Overview

`ginepro` enriches [tonic](https://github.com/hyperium/tonic) by periodcally updating the list of
`ginepro` enriches [tonic](https://github.com/hyperium/tonic) by periodically updating the list of
servers that are available through a `ServiceDiscovery` interface that currently is implemented for DNS.

## How to install
@@ -19,7 +19,7 @@ Add `ginepro` to your dependencies
```toml
[dependencies]
# ...
ginepro = "0.3.0"
ginepro = "0.5.1"
```

## Getting started
@@ -47,7 +47,7 @@ let grpc_client = TesterClient::new(load_balanced_channel);

For more examples, have a look at the [examples](ginepro/examples) directory.

#### License
## License

<sup>
Licensed under either of <a href="LICENSE-APACHE">Apache License, Version
2 changes: 1 addition & 1 deletion ginepro/Cargo.toml
@@ -11,7 +11,7 @@ categories = ["asynchronous", "web-programming"]
readme = "../README.md"

[dependencies]
tonic = { version = "0.8", features = ["tls"] }
tonic = { version = "0.9", features = ["tls"] }
tower = { version = "0.4", default-features = false, features = ["discover"] }
anyhow = "1"
tokio = { version = "1", features = ["full"] }
16 changes: 15 additions & 1 deletion ginepro/src/balanced_channel.rs
@@ -100,6 +100,7 @@ pub struct LoadBalancedChannelBuilder<T, S> {
probe_interval: Option<Duration>,
resolution_strategy: ResolutionStrategy,
timeout: Option<Duration>,
connect_timeout: Option<Duration>,
tls_config: Option<ClientTlsConfig>,
lookup_service: Option<T>,
}
@@ -120,6 +121,7 @@
service_definition,
probe_interval: None,
timeout: None,
connect_timeout: None,
tls_config: None,
lookup_service: None,
resolution_strategy: ResolutionStrategy::Lazy,
@@ -137,6 +139,7 @@ where
probe_interval: self.probe_interval,
tls_config: self.tls_config,
timeout: self.timeout,
connect_timeout: self.connect_timeout,
resolution_strategy: self.resolution_strategy,
}
}
@@ -156,14 +159,24 @@ where
}
}

/// Set a timeout that will be applied to every new `Endpoint`.
/// Set a request timeout that will be applied to every new `Endpoint`.
pub fn timeout(self, timeout: Duration) -> LoadBalancedChannelBuilder<T, S> {
Self {
timeout: Some(timeout),
..self
}
}

/// Set a connection timeout that will be applied to every new `Endpoint`.
///
/// Defaults to the overall request `timeout` if not set.
pub fn connect_timeout(self, connection_timeout: Duration) -> LoadBalancedChannelBuilder<T, S> {
Self {
connect_timeout: Some(connection_timeout),
..self
}
}

/// Set the [`ResolutionStrategy`].
///
/// Default set to [`ResolutionStrategy::Lazy`].
@@ -220,6 +233,7 @@ where
.map_err(|err| anyhow::anyhow!(err))?,
dns_lookup: lookup_service,
endpoint_timeout: self.timeout,
endpoint_connect_timeout: self.connect_timeout.or(self.timeout),
probe_interval: self
.probe_interval
.unwrap_or_else(|| Duration::from_secs(10)),
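
For reference, a minimal usage sketch of the new builder option. This is not part of the diff: the hostname, port, and durations are illustrative, and it assumes the default lazy resolution strategy and a usable system DNS configuration.

```rust
use ginepro::LoadBalancedChannel;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Illustrative values only; not taken from this commit.
    let channel = LoadBalancedChannel::builder(("my.hostname", 5000))
        // Per-request timeout applied to every endpoint.
        .timeout(Duration::from_secs(5))
        // Time allowed to establish a connection to an endpoint.
        // When left unset, it now defaults to the request `timeout`.
        .connect_timeout(Duration::from_secs(1))
        .channel()
        .await
        .expect("failed to build the load-balanced channel");

    // `channel` can then be handed to a generated tonic client,
    // e.g. `TesterClient::new(channel)`.
    drop(channel);
}
```
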
4 changes: 2 additions & 2 deletions ginepro/src/dns_resolver.rs
@@ -14,12 +14,12 @@ pub struct DnsResolver {
}

impl DnsResolver {
/// Construct a new [`DnsResolver`] from env and system configration, e.g `resolv.conf`.
/// Construct a new [`DnsResolver`] from env and system configuration, e.g `resolv.conf`.
pub async fn from_system_config() -> Result<Self, anyhow::Error> {
let (config, mut opts) = system_conf::read_system_conf()
.context("failed to read dns services from system configuration")?;

// We do not want any caching on out side.
// We do not want any caching on our side.
opts.cache_size = 0;

let dns = AsyncResolver::tokio(config, opts).expect("resolver must be valid");
4 changes: 2 additions & 2 deletions ginepro/src/lib.rs
@@ -46,7 +46,7 @@
//! use shared_proto::pb::tester_client::TesterClient;
//! use std::convert::TryInto;
//!
//! // Create a load balanced channel with the default lookup implementation.
//! // Create a load balanced channel with a dummy lookup implementation.
//! let load_balanced_channel = LoadBalancedChannel::builder(("my.hostname", 5000))
//! .lookup_service(DummyLookupService)
//! .channel()
@@ -56,7 +56,7 @@
//! let tester_client = TesterClient::new(load_balanced_channel);
//! }
//! ```
//! For systems with lower churn, the probe interval can be lowered.
//! For systems with higher churn, the probe interval can be lowered.
//!
//! ```rust
//! #[tokio::main]
21 changes: 10 additions & 11 deletions ginepro/src/service_probe.rs
@@ -36,6 +36,7 @@ where
dns_lookup: Lookup,
probe_interval: tokio::time::Duration,
endpoint_timeout: Option<tokio::time::Duration>,
endpoint_connect_timeout: Option<tokio::time::Duration>,
/// The set of last reported endpoints by `dns_lookup`.
endpoints: HashSet<SocketAddr>,
endpoint_reporter: Sender<Change<SocketAddr, Endpoint>>,
@@ -56,6 +57,8 @@ where
pub probe_interval: tokio::time::Duration,
/// A timeout that will be applied to every endpoint.
pub endpoint_timeout: Option<tokio::time::Duration>,
/// A connection timeout that will be applied to every endpoint.
pub endpoint_connect_timeout: Option<tokio::time::Duration>,
}

impl<Lookup: LookupService> GrpcServiceProbe<Lookup> {
@@ -70,6 +73,7 @@ impl<Lookup: LookupService> GrpcServiceProbe<Lookup> {
dns_lookup: config.dns_lookup,
probe_interval: config.probe_interval,
endpoint_timeout: config.endpoint_timeout,
endpoint_connect_timeout: config.endpoint_connect_timeout,
endpoints: HashSet::new(),
endpoint_reporter,
scheme: http::uri::Scheme::HTTP,
@@ -139,18 +143,10 @@
) -> Vec<Change<SocketAddr, Endpoint>> {
let mut changeset = Vec::new();

let remove_set: HashSet<SocketAddr> = self
.endpoints
.difference(endpoints)
.copied()
.into_iter()
.collect();
let remove_set: HashSet<SocketAddr> =
self.endpoints.difference(endpoints).copied().collect();

let add_set: HashSet<SocketAddr> = endpoints
.difference(&self.endpoints)
.copied()
.into_iter()
.collect();
let add_set: HashSet<SocketAddr> = endpoints.difference(&self.endpoints).copied().collect();

changeset.extend(
add_set
@@ -224,6 +220,9 @@ impl<Lookup: LookupService> GrpcServiceProbe<Lookup> {
if let Some(ref timeout) = self.endpoint_timeout {
endpoint = endpoint.timeout(*timeout);
}
if let Some(ref connect_timeout) = self.endpoint_connect_timeout {
endpoint = endpoint.connect_timeout(*connect_timeout)
}

Some(endpoint)
}
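
For context, the two per-endpoint settings above map directly onto tonic's `Endpoint` builder. A standalone sketch of that per-endpoint configuration; the URI and durations are illustrative, not taken from this commit.

```rust
use std::time::Duration;
use tonic::transport::Endpoint;

fn configured_endpoint() -> Endpoint {
    // Illustrative values only.
    Endpoint::from_static("http://127.0.0.1:5000")
        // Bounds each request made over the resulting channel.
        .timeout(Duration::from_millis(500))
        // Bounds only the initial connection attempt.
        .connect_timeout(Duration::from_millis(500))
}
```
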
4 changes: 2 additions & 2 deletions shared_proto/Cargo.toml
@@ -6,8 +6,8 @@ edition = "2021"
publish = false

[dependencies]
tonic = "0.8"
tonic = "0.9"
prost = "0.11"

[build-dependencies]
tonic-build = "0.8"
tonic-build = "0.9"
2 changes: 1 addition & 1 deletion shared_proto/build.rs
@@ -1,4 +1,4 @@
//! Compile a grpc service defintion to be exposed and used
//! Compile a grpc service definition to be exposed and used
//! for testing in this crate and others that want to test
//! tonic functionality.

4 changes: 2 additions & 2 deletions tests/Cargo.toml
@@ -8,7 +8,7 @@ publish = false
[dependencies]
ginepro = { path = "../ginepro" }

tonic = { version = "0.8", features = ["tls"] }
tonic = { version = "0.9", features = ["tls"] }
openssl = "0.10"
tracing = { version = "0.1", features = ["log", "attributes"] }
futures = "0.3.12"
@@ -21,5 +21,5 @@ tower-layer = "0.3"
[dev-dependencies]
anyhow = "1"
async-trait = "0.1"
tonic-health = "0.7"
tonic-health = "0.9"
shared-proto = { path = "../shared_proto"}
2 changes: 0 additions & 2 deletions tests/tests/all/lookup.rs
@@ -109,8 +109,6 @@ impl LookupService for TestDnsResolver {

Ok(ips
.values()
.cloned()
.into_iter()
.map(|address| address.parse().expect("not a valid ip address"))
.collect())
}
76 changes: 72 additions & 4 deletions tests/tests/all/service_probe.rs
@@ -39,7 +39,7 @@ async fn load_balance_succeeds_with_churn() {
.expect("failed to init");
let mut client = TesterClient::new(load_balanced_channel);

let servers: Vec<String> = (0..10).into_iter().map(|s| s.to_string()).collect();
let servers: Vec<String> = (0..10).map(|s| s.to_string()).collect();
let mut servers_called = Vec::new();

// Act
@@ -82,7 +82,7 @@ async fn load_balance_happy_path_scenario_calls_all_endpoints() {
// 2. Do 20 gRPC calls.
// 3. Assert that all 3 servers have been called.
// What we want to test:
// A common load balaning scenario in which you have more calls
// A common load balancing scenario in which you have more calls
// than servers, and you want all servers to be called.

let num_calls = 20;
@@ -147,14 +147,14 @@ async fn load_balance_happy_path_scenario_calls_all_endpoints() {
}

#[tokio::test]
async fn connection_timeout_is_not_fatal() {
async fn os_connection_timeout_is_not_fatal() {
// Scenario:
// The DNS probe returns an IP that we fail to connect to.
// We want to ensure that our client keeps working as expected
// as long as another good server comes up.
// Steps:
// * Discover an IP without a backing server (`ghost_server`)
// * See the client call fail
// * See the client call fail with an OS timeout
// * Discover an IP with a backing server (`good_server`)
// * Wait for discovery update to happen in the probe task
// * See the client call succeed
@@ -179,6 +179,74 @@ async fn connection_timeout_is_not_fatal() {
.test(tonic::Request::new(Ping {}))
.await
.expect_err("The call without a backing server should fail");

resolver
.remove_ip_and_not_server("ghost_server".into())
.await;

resolver
.add_server_with_provided_impl(
"good_server".to_string(),
TesterImpl {
sender: Arc::clone(&sender),
name: "good_server".to_string(),
},
)
.await;

// Give time to the DNS probe to add the new good server
tokio::time::sleep(probe_interval * 5).await;

let res = client
.test(tonic::Request::new(Ping {}))
.await
.expect("failed to call server");

let server = receiver.recv().await.expect("");
assert_eq!(
server,
get_payload_raw(res.into_inner().payload.expect("no payload"))
);
}

#[tokio::test]
async fn connection_timeout_is_applied() {
// Scenario:
// The DNS probe returns an IP that we fail to connect to. We want to ensure
// that 1. our client keeps working as expected as long as another good
// server comes up and 2. our connection timeout is applied correctly.
// Steps:
// * Discover an IP without a backing server (`ghost_server`)
// * See the client call fail quickly from our own timeout
// * Discover an IP with a backing server (`good_server`)
// * Wait for discovery update to happen in the probe task
// * See the client call succeed
let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
let sender = Arc::new(Mutex::new(sender));
let mut resolver = TestDnsResolver::default();
let probe_interval = tokio::time::Duration::from_millis(3);

let load_balanced_channel = LoadBalancedChannelBuilder::new_with_service(("test", 5000))
.lookup_service(resolver.clone())
.timeout(tokio::time::Duration::from_millis(500))
.connect_timeout(tokio::time::Duration::from_millis(500))
.dns_probe_interval(probe_interval)
.channel()
.await
.expect("failed to init");
let mut client = TesterClient::new(load_balanced_channel);

resolver
.add_ip_without_server("ghost_server".into(), "127.0.0.124:5000".into())
.await;

// Check our timeout is applied when we fail to connect
let req = client.test(tonic::Request::new(Ping {}));
let res = tokio::time::timeout(Duration::from_secs(1), req)
.await
.expect("took longer than connect_timeout to fail");
res.expect_err("The call without a backing server should fail");

resolver
.remove_ip_and_not_server("ghost_server".into())
.await;
