Skip to content

Commit

Permalink
Merge pull request #75 from scythe-robotics/fix-empty-service-msg
Browse files Browse the repository at this point in the history
Fix service behavior when the service is an empty input (e.g. `std_srvs/Trigger`)
  • Loading branch information
adnanademovic committed Mar 3, 2019
2 parents e7ad75c + 6995248 commit 036b885
Showing 1 changed file with 33 additions and 18 deletions.
51 changes: 33 additions & 18 deletions rosrust/src/tcpros/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ impl Service {
}
}

enum RequestType {
Probe,
Action,
}

fn listen_for_clients<T, U, V, F>(service: &str, node_name: &str, handler: F, connections: V)
where
T: ServicePair,
Expand All @@ -77,41 +82,51 @@ where
let handler = Arc::new(handler);
for mut stream in connections {
// Service request starts by exchanging connection headers
if let Err(err) = exchange_headers::<T, _>(&mut stream, service, node_name) {
// Connection can be closed when a client checks for a service.
if !err.is_closed_connection() {
error!(
"Failed to exchange headers for service '{}': {}",
service, err
);
match exchange_headers::<T, _>(&mut stream, service, node_name) {
Err(err) => {
// Connection can be closed when a client checks for a service.
if !err.is_closed_connection() {
error!(
"Failed to exchange headers for service '{}': {}",
service, err
);
}
continue;
}
continue;
}

// Spawn a thread for handling requests
spawn_request_handler::<T, U, F>(stream, Arc::clone(&handler));
// Spawn a thread for handling requests
Ok(RequestType::Action) => {
spawn_request_handler::<T, U, F>(stream, Arc::clone(&handler))
}
Ok(RequestType::Probe) => (),
}
}
}

fn exchange_headers<T, U>(stream: &mut U, service: &str, node_name: &str) -> Result<()>
fn exchange_headers<T, U>(stream: &mut U, service: &str, node_name: &str) -> Result<RequestType>
where
T: ServicePair,
U: std::io::Write + std::io::Read,
{
read_request::<T, U>(stream, service)?;
write_response::<T, U>(stream, node_name)
let req_type = read_request::<T, U>(stream, service)?;
write_response::<T, U>(stream, node_name)?;
Ok(req_type)
}

fn read_request<T: ServicePair, U: std::io::Read>(stream: &mut U, service: &str) -> Result<()> {
fn read_request<T: ServicePair, U: std::io::Read>(
stream: &mut U,
service: &str,
) -> Result<RequestType> {
let fields = header::decode(stream)?;
header::match_field(&fields, "service", service)?;
if fields.get("callerid").is_none() {
bail!(ErrorKind::HeaderMissingField("callerid".into()));
}
if header::match_field(&fields, "probe", "1").is_ok() {
return Ok(());
return Ok(RequestType::Probe);
}
header::match_field(&fields, "md5sum", &T::md5sum())
header::match_field(&fields, "md5sum", &T::md5sum())?;
Ok(RequestType::Action)
}

fn write_response<T, U>(stream: &mut U, node_name: &str) -> Result<()>
Expand Down Expand Up @@ -157,7 +172,7 @@ where
// TODO: validate message length
let _length = stream.read_u32::<LittleEndian>();
// Break out of loop in case of failure to read request
while let Ok(req) = RosMsg::decode(&mut stream) {
if let Ok(req) = RosMsg::decode(&mut stream) {
// Call function that handles request and returns response
match handler(req) {
Ok(res) => {
Expand Down

0 comments on commit 036b885

Please sign in to comment.