Skip to content

Commit

Permalink
Support client cert verification
Browse files Browse the repository at this point in the history
  • Loading branch information
Ravi Shankar committed Mar 4, 2019
1 parent fae8a99 commit 7b12744
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 14 deletions.
18 changes: 9 additions & 9 deletions src/client.rs
Expand Up @@ -15,12 +15,13 @@ use std::{
sync::Arc,
};
use tokio_executor;
use url::Url;

use error::NatsError;
use net::*;
use protocol::{commands::*, Op};

pub use net::NatsClientTlsConfig;

/// Sink (write) part of a TCP stream
type NatsSink = stream::SplitSink<NatsConnection>;
/// Stream (read) part of a TCP stream
Expand Down Expand Up @@ -139,6 +140,9 @@ pub struct NatsClientOptions {
pub connect_command: ConnectCommand,
/// Cluster URI in the IP:PORT format
pub cluster_uri: String,
/// TLS configuration for this client.
#[builder(default)]
pub tls_config: NatsClientTlsConfig,
}

impl NatsClientOptions {
Expand Down Expand Up @@ -187,8 +191,10 @@ impl NatsClient {
///
/// Returns `impl Future<Item = Self, Error = NatsError>`
pub fn from_options(opts: NatsClientOptions) -> impl Future<Item = Self, Error = NatsError> + Send + Sync {
let tls_required = opts.connect_command.tls_required;
let tls_required = opts.connect_command.tls_required
|| opts.tls_config.identity.is_some() || opts.tls_config.root_cert.is_some();

let tls_config = opts.tls_config.clone();
let cluster_uri = opts.cluster_uri.clone();
let cluster_sa = if let Ok(sockaddr) = SocketAddr::from_str(&cluster_uri) {
Ok(sockaddr)
Expand All @@ -203,13 +209,7 @@ impl NatsClient {
.from_err()
.and_then(move |cluster_sa| {
if tls_required {
match Url::parse(&cluster_uri) {
Ok(url) => match url.host_str() {
Some(host) => future::ok(Either::B(connect_tls(host.to_string(), cluster_sa))),
None => future::err(NatsError::TlsHostMissingError),
},
Err(e) => future::err(e.into()),
}
future::ok(Either::B(connect_tls(cluster_uri, cluster_sa, tls_config)))
} else {
future::ok(Either::A(connect(cluster_sa)))
}
Expand Down
7 changes: 5 additions & 2 deletions src/net/connection.rs
Expand Up @@ -9,7 +9,7 @@ use tokio_executor;
use error::NatsError;
use protocol::Op;

use super::connection_inner::NatsConnectionInner;
use super::{NatsClientTlsConfig, connection_inner::NatsConnectionInner};

macro_rules! reco {
($conn:ident) => {
Expand Down Expand Up @@ -39,6 +39,8 @@ pub struct NatsConnection {
pub(crate) addr: SocketAddr,
/// Host of the server; Only used if connecting to a TLS-enabled server
pub(crate) host: Option<String>,
/// TLS config for client verification; Only used if configured previously
pub(crate) tls_config: NatsClientTlsConfig,
/// Inner dual `Stream`/`Sink` of the TCP connection
pub(crate) inner: Arc<RwLock<NatsConnectionInner>>,
/// Current state of the connection
Expand All @@ -55,12 +57,13 @@ impl NatsConnection {
let inner_state = Arc::clone(&self.state);
let is_tls = self.is_tls;
let maybe_host = self.host.clone();
let tls_config = self.tls_config.clone();
NatsConnectionInner::connect_tcp(&self.addr)
.and_then(move |socket| {
if is_tls {
Either::A(
// This unwrap is safe because the value would always be present if `is_tls` is true
NatsConnectionInner::upgrade_tcp_to_tls(&maybe_host.unwrap(), socket)
NatsConnectionInner::upgrade_tcp_to_tls(&maybe_host.unwrap(), socket, tls_config)
.map(NatsConnectionInner::from),
)
} else {
Expand Down
13 changes: 12 additions & 1 deletion src/net/connection_inner.rs
@@ -1,3 +1,4 @@
use super::NatsClientTlsConfig;
use codec::OpCodec;
use futures::prelude::*;
use native_tls::TlsConnector as NativeTlsConnector;
Expand Down Expand Up @@ -29,8 +30,18 @@ impl NatsConnectionInner {
pub(crate) fn upgrade_tcp_to_tls(
host: &str,
socket: TcpStream,
config: NatsClientTlsConfig,
) -> impl Future<Item = TlsStream<TcpStream>, Error = NatsError> {
let tls_connector = NativeTlsConnector::builder().build().unwrap();
let mut builder = NativeTlsConnector::builder();
if let Some(i) = config.identity().unwrap() {
builder.identity(i);
}

if let Some(c) = config.root_cert().unwrap() {
builder.add_root_certificate(c);
}

let tls_connector = builder.build().unwrap();
let tls_stream: TlsConnector = tls_connector.into();
debug!(target: "nitox", "Connecting to {} through TLS over TCP", host);
tls_stream.connect(&host, socket).from_err()
Expand Down
60 changes: 58 additions & 2 deletions src/net/mod.rs
@@ -1,4 +1,5 @@
use futures::prelude::*;
use native_tls::{Certificate, Identity};
use parking_lot::RwLock;
use std::net::SocketAddr;
use std::sync::Arc;
Expand All @@ -13,12 +14,65 @@ use self::connection_inner::*;

pub(crate) use self::connection::NatsConnection;

/// TLS configuration for the client.
#[derive(Clone, Default)]
pub struct NatsClientTlsConfig {
pub(crate) identity: Option<Arc<(Vec<u8>, String)>>,
pub(crate) root_cert: Option<Arc<Vec<u8>>>,
}

impl NatsClientTlsConfig {
/// Set the identity from a DER-formatted PKCS #12 archive using the the given password to decrypt the key.
pub fn pkcs12_identity<B>(mut self, der_bytes: B, password: &str) -> Result<Self, NatsError>
where B: AsRef<[u8]>
{
self.identity = Some(Arc::new((der_bytes.as_ref().into(), password.into())));
self.identity()?;
Ok(self)
}

/// Set the root certificate in DER-format.
pub fn root_cert_der<B>(mut self, der_bytes: B) -> Result<Self, NatsError>
where B: AsRef<[u8]>
{
self.root_cert = Some(Arc::new(der_bytes.as_ref().into()));
self.root_cert()?;
Ok(self)
}

pub(crate) fn identity(&self) -> Result<Option<Identity>, NatsError> {
if let Some((b, p)) = self.identity.as_ref().map(|s| &**s) {
Ok(Some(Identity::from_pkcs12(b, p)?))
} else {
Ok(None)
}
}

pub(crate) fn root_cert(&self) -> Result<Option<Certificate>, NatsError> {
if let Some(b) = self.root_cert.as_ref() {
Ok(Some(Certificate::from_der(b)?))
} else {
Ok(None)
}
}
}

impl ::std::fmt::Debug for NatsClientTlsConfig {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
f.debug_struct("NatsClientTlsConfig")
.field("identity_exists", &self.identity.is_some())
.field("root_cert_exists", &self.root_cert.is_some())
.finish()
}
}

/// Connect to a raw TCP socket
pub(crate) fn connect(addr: SocketAddr) -> impl Future<Item = NatsConnection, Error = NatsError> {
NatsConnectionInner::connect_tcp(&addr).map(move |socket| {
debug!(target: "nitox", "Connected through TCP");
NatsConnection {
is_tls: false,
tls_config: Default::default(),
addr,
host: None,
state: Arc::new(RwLock::new(NatsConnectionState::Connected)),
Expand All @@ -28,16 +82,18 @@ pub(crate) fn connect(addr: SocketAddr) -> impl Future<Item = NatsConnection, Er
}

/// Connect to a TLS over TCP socket. Upgrade is performed automatically
pub(crate) fn connect_tls(host: String, addr: SocketAddr) -> impl Future<Item = NatsConnection, Error = NatsError> {
pub(crate) fn connect_tls(host: String, addr: SocketAddr, tls_config: NatsClientTlsConfig) -> impl Future<Item = NatsConnection, Error = NatsError> {
let inner_host = host.clone();
let inner_config = tls_config.clone();
NatsConnectionInner::connect_tcp(&addr)
.and_then(move |socket| {
debug!(target: "nitox", "Connected through TCP, upgrading to TLS");
NatsConnectionInner::upgrade_tcp_to_tls(&host, socket)
NatsConnectionInner::upgrade_tcp_to_tls(&host, socket, tls_config)
}).map(move |socket| {
debug!(target: "nitox", "Connected through TCP over TLS");
NatsConnection {
is_tls: true,
tls_config: inner_config,
addr,
host: Some(inner_host),
state: Arc::new(RwLock::new(NatsConnectionState::Connected)),
Expand Down

0 comments on commit 7b12744

Please sign in to comment.