Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix service behavior when the service is an empty input (e.g. std_srvs/Trigger) #75

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 33 additions & 18 deletions rosrust/src/tcpros/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ impl Service {
}
}

enum RequestType {
Probe,
Action,
}

fn listen_for_clients<T, U, V, F>(service: &str, node_name: &str, handler: F, connections: V)
where
T: ServicePair,
Expand All @@ -77,41 +82,51 @@ where
let handler = Arc::new(handler);
for mut stream in connections {
// Service request starts by exchanging connection headers
if let Err(err) = exchange_headers::<T, _>(&mut stream, service, node_name) {
// Connection can be closed when a client checks for a service.
if !err.is_closed_connection() {
error!(
"Failed to exchange headers for service '{}': {}",
service, err
);
match exchange_headers::<T, _>(&mut stream, service, node_name) {
Err(err) => {
// Connection can be closed when a client checks for a service.
if !err.is_closed_connection() {
error!(
"Failed to exchange headers for service '{}': {}",
service, err
);
}
continue;
}
continue;
}

// Spawn a thread for handling requests
spawn_request_handler::<T, U, F>(stream, Arc::clone(&handler));
// Spawn a thread for handling requests
Ok(RequestType::Action) => {
spawn_request_handler::<T, U, F>(stream, Arc::clone(&handler))
}
Ok(RequestType::Probe) => (),
}
}
}

fn exchange_headers<T, U>(stream: &mut U, service: &str, node_name: &str) -> Result<()>
fn exchange_headers<T, U>(stream: &mut U, service: &str, node_name: &str) -> Result<RequestType>
where
T: ServicePair,
U: std::io::Write + std::io::Read,
{
read_request::<T, U>(stream, service)?;
write_response::<T, U>(stream, node_name)
let req_type = read_request::<T, U>(stream, service)?;
write_response::<T, U>(stream, node_name)?;
Ok(req_type)
}

fn read_request<T: ServicePair, U: std::io::Read>(stream: &mut U, service: &str) -> Result<()> {
fn read_request<T: ServicePair, U: std::io::Read>(
stream: &mut U,
service: &str,
) -> Result<RequestType> {
let fields = header::decode(stream)?;
header::match_field(&fields, "service", service)?;
if fields.get("callerid").is_none() {
bail!(ErrorKind::HeaderMissingField("callerid".into()));
}
if header::match_field(&fields, "probe", "1").is_ok() {
return Ok(());
return Ok(RequestType::Probe);
}
header::match_field(&fields, "md5sum", &T::md5sum())
header::match_field(&fields, "md5sum", &T::md5sum())?;
Ok(RequestType::Action)
}

fn write_response<T, U>(stream: &mut U, node_name: &str) -> Result<()>
Expand Down Expand Up @@ -157,7 +172,7 @@ where
// TODO: validate message length
let _length = stream.read_u32::<LittleEndian>();
// Break out of loop in case of failure to read request
while let Ok(req) = RosMsg::decode(&mut stream) {
if let Ok(req) = RosMsg::decode(&mut stream) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't clients able to send multiple requests via the same connection to a service?
That method greatly reduces communication overhead, since header exchanges can often be much larger than the actual service payloads.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good catch, you're right this does prevent persistent connections.

Looks like service_client_link.cpp is where roscpp handles these things. I see two differences from service.rs:

  1. It looks for "persistent" in the header to know whether there might be more messages.
  2. It reads the length of the message before each new message is deserialized.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you just leave a TODO above the if line, and I can take on handling both cases

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nvm, I've just made an issue and will handle it afterwards.

// Call function that handles request and returns response
match handler(req) {
Ok(res) => {
Expand Down