From 7828357b718833175cf46ed5d4b07aa7dec8e208 Mon Sep 17 00:00:00 2001 From: Benjamin Fry Date: Sat, 16 Sep 2017 08:11:05 -0700 Subject: [PATCH] recursive state machine for CNAME chains --- integration-tests/tests/client_tests.rs | 3 + integration-tests/tests/lookup_tests.rs | 37 +++ .../tests/secure_client_handle_tests.rs | 3 + resolver/src/lookup_state.rs | 212 ++++++++++++++---- 4 files changed, 210 insertions(+), 45 deletions(-) diff --git a/integration-tests/tests/client_tests.rs b/integration-tests/tests/client_tests.rs index 82a65f7b23..6cabdc0816 100644 --- a/integration-tests/tests/client_tests.rs +++ b/integration-tests/tests/client_tests.rs @@ -246,6 +246,7 @@ fn test_timeout_query_tcp() { test_timeout_query(client); } +// TODO: this test is flaky #[test] #[ignore] #[allow(deprecated)] @@ -260,6 +261,7 @@ fn test_dnssec_rollernet_td_udp() { ).unwrap(); } +// TODO: this test is flaky #[test] #[ignore] #[allow(deprecated)] @@ -274,6 +276,7 @@ fn test_dnssec_rollernet_td_tcp() { ).unwrap(); } +// TODO: this test is flaky #[test] #[ignore] #[allow(deprecated)] diff --git a/integration-tests/tests/lookup_tests.rs b/integration-tests/tests/lookup_tests.rs index 82c7b13a0f..ed59834cc4 100644 --- a/integration-tests/tests/lookup_tests.rs +++ b/integration-tests/tests/lookup_tests.rs @@ -98,6 +98,43 @@ fn test_cname_lookup() { let mut io_loop = Core::new().unwrap(); let lookup = io_loop.run(lookup).unwrap(); + assert_eq!( + *lookup.iter().next().unwrap(), + RData::A(Ipv4Addr::new(93, 184, 216, 34)) + ); +} + +#[test] +fn test_chained_cname_lookup() { + let resp_query = Query::query( + domain::Name::from_str("www.example.com.").unwrap(), + RecordType::A, + ); + let cname_record = cname_record( + domain::Name::from_str("www.example.com.").unwrap(), + domain::Name::from_str("v4.example.com.").unwrap(), + ); + let v4_record = v4_record( + domain::Name::from_str("v4.example.com.").unwrap(), + Ipv4Addr::new(93, 184, 216, 34), + ); + + // The first response should be a cname, the second will be the actual record + let message1 = message(resp_query.clone(), vec![cname_record], vec![], vec![]); + let message2 = message(resp_query, vec![v4_record], vec![], vec![]); + + // the mock pops messages... + let client = MockClientHandle::mock(vec![message2, message1]); + + let lookup = InnerLookupFuture::lookup( + vec![domain::Name::from_str("www.example.com.").unwrap()], + RecordType::A, + CachingClient::new(0, client), + ); + + let mut io_loop = Core::new().unwrap(); + let lookup = io_loop.run(lookup).unwrap(); + assert_eq!( *lookup.iter().next().unwrap(), RData::A(Ipv4Addr::new(93, 184, 216, 34)) diff --git a/integration-tests/tests/secure_client_handle_tests.rs b/integration-tests/tests/secure_client_handle_tests.rs index 8f69f2663b..722b50a9a4 100644 --- a/integration-tests/tests/secure_client_handle_tests.rs +++ b/integration-tests/tests/secure_client_handle_tests.rs @@ -124,18 +124,21 @@ where assert!(response.answers().is_empty()); } +// TODO: this test is flaky #[test] #[ignore] fn test_dnssec_rollernet_td_udp() { with_udp(dnssec_rollernet_td_test); } +// TODO: this test is flaky #[test] #[ignore] fn test_dnssec_rollernet_td_tcp() { with_udp(dnssec_rollernet_td_test); } +// TODO: this test is flaky #[test] #[ignore] fn test_dnssec_rollernet_td_tcp_mixed_case() { diff --git a/resolver/src/lookup_state.rs b/resolver/src/lookup_state.rs index a1d9e4225b..4bd5c3832a 100644 --- a/resolver/src/lookup_state.rs +++ b/resolver/src/lookup_state.rs @@ -7,6 +7,7 @@ //! Caching related functionality for the Resolver. +use std::cell::RefCell; use std::io; use std::mem; use std::sync::{Arc, Mutex, TryLockError}; @@ -24,6 +25,11 @@ use lru_cache::LruCache; /// Maximum TTL as defined in https://tools.ietf.org/html/rfc2181 const MAX_TTL: u32 = 2147483647_u32; +const MAX_QUERY_DEPTH: u8 = 8; // arbitrarily chosen number... + +thread_local! { + static QUERY_DEPTH: RefCell = RefCell::new(0); +} #[derive(Debug)] struct LruValue { @@ -77,6 +83,21 @@ impl DnsLru { lookup } + fn duplicate(&mut self, query: Query, lookup: Lookup, ttl: u32, now: Instant) -> Lookup { + let ttl = Duration::from_secs(ttl as u64); + let ttl_until = now + ttl; + + self.0.insert( + query, + LruValue { + lookup: Some(lookup.clone()), + ttl_until, + }, + ); + + lookup + } + fn nx_error(query: Query) -> io::Error { io::Error::new( io::ErrorKind::AddrNotAvailable, @@ -138,19 +159,23 @@ pub struct CachingClient { impl CachingClient { #[doc(hidden)] pub fn new(max_size: usize, client: C) -> Self { - CachingClient { - lru: Arc::new(Mutex::new(DnsLru::new(max_size))), - client, - } + Self::with_cache(Arc::new(Mutex::new(DnsLru::new(max_size))), client) + } + + fn with_cache(lru: Arc>, client: C) -> Self { + CachingClient { lru, client } } /// Perform a lookup against this caching client, looking first in the cache for a result pub fn lookup(&mut self, query: Query) -> Box> { - Box::new(QueryState::lookup( - query, - &mut self.client, - self.lru.clone(), - )) + QUERY_DEPTH.with(|c| *c.borrow_mut() += 1); + + Box::new( + QueryState::lookup(query, &mut self.client, self.lru.clone()).then(|f| { + QUERY_DEPTH.with(|c| *c.borrow_mut() -= 1); + f + }), + ) } } @@ -184,12 +209,13 @@ impl Future for FromCache { } /// This is the Future responsible for performing an actual query. -struct QueryFuture { +struct QueryFuture { message_future: Box>, query: Query, cache: Arc>, /// is this a DNSSec validating client? dnssec: bool, + client: CachingClient, } enum Records { @@ -197,20 +223,31 @@ enum Records { Exists(Vec<(RData, u32)>), /// Records do not exist, ttl for negative caching NoData(Option), + /// Future lookup for recursive cname records + CnameChain(Box>, u32), + /// Already cached, chained queries + Chained(Lookup, u32), } -impl QueryFuture { - fn handle_noerror(&self, mut message: Message) -> Records { +impl QueryFuture { + fn next_query(&mut self, query: Query, cname_ttl: u32, message: Message) -> Records { + if QUERY_DEPTH.with(|c| *c.borrow() >= MAX_QUERY_DEPTH) { + self.handle_nxdomain(message, true) + } else { + Records::CnameChain(self.client.lookup(query), cname_ttl) + } + } + + fn handle_noerror(&mut self, mut message: Message) -> Poll { // TODO: promote from UDP/DTLS to TCP/TLS to work around packet size limits // if message.truncated() { // client.promote_to_tcp_requery() + // state.next_query_promotion... // } - // TODO: here we might be getting CNAME records back, we should do a chained lookup. - // needs to cary a reference to the CachingClient for these chained lookups... - // seek out CNAMES // TODO: figure out how to get rid of this clone + let mut cname_ttl = 0; let mut was_cname = false; let mut search_name: Name = self.query.name().clone(); while let Some(cname) = message.answers().iter().find(|r| { @@ -218,6 +255,7 @@ impl QueryFuture { }) { was_cname = true; + cname_ttl = cname.ttl(); if let &RData::CNAME(ref name) = cname.rdata() { if search_name == *name { break; // already searched for this name @@ -247,17 +285,20 @@ impl QueryFuture { .collect::>(); if !records.is_empty() { - Records::Exists(records) + Ok(Async::Ready(Records::Exists(records))) } else { - // TODO: if it was a cname and we have no records - // then the actual record was not sent with the original response - // issue a follow up query for the CNAME - // if was_cname { new_state = CNAME::query() } else { // below } - - // TODO: review See https://tools.ietf.org/html/rfc2308 for NoData section - // Note on DNSSec, in secure_client_hanle, if verify_nsec fails then the request fails. - // this will mean that no unverified negative caches will make it to this point and be stored - self.handle_nxdomain(message, true) + // It was a CNAME, but not included in the request... + if was_cname { + let next_query = Query::query(search_name, self.query.query_type()); + Ok(Async::Ready( + self.next_query(next_query, cname_ttl, message), + )) + } else { + // TODO: review See https://tools.ietf.org/html/rfc2308 for NoData section + // Note on DNSSec, in secure_client_hanle, if verify_nsec fails then the request fails. + // this will mean that no unverified negative caches will make it to this point and be stored + Ok(Async::Ready(self.handle_nxdomain(message, true))) + } } } @@ -295,7 +336,7 @@ impl QueryFuture { } } -impl Future for QueryFuture { +impl Future for QueryFuture { type Item = Records; type Error = io::Error; @@ -310,7 +351,7 @@ impl Future for QueryFuture { message, false, /* false b/c DNSSec should not cache NXDomain */ ))), - ResponseCode::NoError => Ok(Async::Ready(self.handle_noerror(message))), + ResponseCode::NoError => self.handle_noerror(message), r @ _ => Err(io::Error::new( io::ErrorKind::Other, format!("DNS Error: {}", r), @@ -357,8 +398,15 @@ impl Future for InsertCache { Records::Exists(rdata) => Ok(Async::Ready( lru.insert(query, rdata, Instant::now()), )), + Records::Chained(lookup, ttl) => Ok(Async::Ready(lru.duplicate( + query, + lookup, + ttl, + Instant::now(), + ))), Records::NoData(Some(ttl)) => Err(lru.negative(query, ttl, Instant::now())), - _ => Err(DnsLru::nx_error(query)), + Records::NoData(None) | + Records::CnameChain(..) => Err(DnsLru::nx_error(query)), } } } @@ -369,7 +417,9 @@ enum QueryState { /// In the FromCache state we evaluate cache entries for any results FromCache(FromCache, C), /// In the query state there is an active query that's been started, see Self::lookup() - Query(QueryFuture), + Query(QueryFuture), + /// CNAME lookup (internally it is making cached queries + CnameChain(Box>, Query, u32, Arc>), /// State of adding the item to the cache InsertCache(InsertCache), /// A state which should not occur @@ -392,6 +442,7 @@ impl QueryState { // TODO: with specialization, could we define a custom query only on the FromCache type? match from_cache_state { QueryState::FromCache(from_cache, mut client) => { + let cache = from_cache.cache; let query = from_cache.query; let message_future = client.lookup(query.clone()); mem::replace( @@ -399,8 +450,9 @@ impl QueryState { QueryState::Query(QueryFuture { message_future, query, - cache: from_cache.cache, + cache: cache.clone(), dnssec: client.is_verifying_dnssec(), + client: CachingClient::with_cache(cache, client), }), ); } @@ -408,7 +460,7 @@ impl QueryState { } } - fn cache(&mut self, rdatas: Records) { + fn cname(&mut self, future: Box>, cname_ttl: u32) { // The error state, this query is complete... let query_state = mem::replace(self, QueryState::Error); @@ -418,19 +470,67 @@ impl QueryState { query, cache, dnssec: _, + client: _, }) => { mem::replace( self, - QueryState::InsertCache(InsertCache { - rdatas, - query, - cache, - }), + QueryState::CnameChain(future, query, cname_ttl, cache), ); } _ => panic!("bad state, expected Query"), } } + + fn cache(&mut self, rdatas: Records) { + // The error state, this query is complete... + let query_state = mem::replace(self, QueryState::Error); + + match query_state { + QueryState::Query(QueryFuture { + message_future: _, + query, + cache, + dnssec: _, + client: _, + }) => { + match rdatas { + // There are Cnames to lookup + Records::CnameChain(..) => { + panic!("CnameChain should have been polled in poll() of QueryState"); + } + rdatas @ _ => { + mem::replace( + self, + QueryState::InsertCache(InsertCache { + rdatas, + query, + cache, + }), + ); + } + } + } + QueryState::CnameChain(_, query, _, cache) => { + match rdatas { + // There are Cnames to lookup + Records::CnameChain(..) => { + panic!("CnameChain should have been polled in poll() of QueryState"); + } + rdatas @ _ => { + mem::replace( + self, + QueryState::InsertCache(InsertCache { + rdatas, + query, + cache, + }), + ); + } + } + } + _ => panic!("bad state, expected Query"), + } + } } impl Future for QueryState { @@ -439,7 +539,7 @@ impl Future for QueryState { fn poll(&mut self) -> Poll { // first transition any polling that is needed (mutable refs...) - let poll; + let records: Option; match *self { QueryState::FromCache(ref mut from_cache, ..) => { match from_cache.poll() { @@ -450,15 +550,29 @@ impl Future for QueryState { Err(error) => return Err(error), }; - poll = Ok(Async::NotReady); + records = None; } QueryState::Query(ref mut query, ..) => { - poll = query.poll().map_err(|e| e.into()); + let poll = query.poll().map_err(|e| e.into()); + match poll { + Ok(Async::NotReady) => { + return Ok(Async::NotReady); + } + Ok(Async::Ready(rdatas)) => records = Some(rdatas), // handled in next match + Err(e) => { + return Err(e); + } + } + } + QueryState::CnameChain(ref mut future, _, ttl, _) => { + let poll = future.poll(); match poll { Ok(Async::NotReady) => { return Ok(Async::NotReady); } - Ok(Async::Ready(_)) => (), // handled in next match + Ok(Async::Ready(lookup)) => { + records = Some(Records::Chained(lookup, ttl)); + } Err(e) => { return Err(e); } @@ -474,14 +588,22 @@ impl Future for QueryState { match *self { QueryState::FromCache(..) => self.query_after_cache(), QueryState::Query(..) => { - match poll { - Ok(Async::Ready(rdatas)) => { - self.cache(rdatas); + match records { + Some(Records::CnameChain(future, ttl)) => self.cname(future, ttl), + Some(records) => { + self.cache(records); } - _ => panic!("should have returned earlier"), + None => panic!("should have returned earlier"), + } + } + QueryState::CnameChain(..) => { + match records { + Some(records) => self.cache(records), + None => panic!("should have returned earlier"), } } - _ => panic!("should have returned earlier"), + QueryState::InsertCache(..) | + QueryState::Error => panic!("should have returned earlier"), } task::current().notify(); // yield