Skip to content

Commit

Permalink
Wait for publisher and subscription to match (ros2#1777)
Browse files Browse the repository at this point in the history
Fix ros2#1775

Connext takes significantly longer for discovery to happened compared to the other RMWs.
So, waiting an arbitrary amount of time for a message to be received is brittle.

By instead waiting for the publisher and subscription to match, the tests become more robust.

Signed-off-by: Jacob Perron <jacob@openrobotics.org>
  • Loading branch information
jacobperron authored and Nico Neumann committed Dec 15, 2021
1 parent 48e2606 commit eeb2f02
Showing 1 changed file with 41 additions and 1 deletion.
42 changes: 41 additions & 1 deletion rclcpp/test/rclcpp/test_subscription_with_type_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ class CLASSNAME (test_intra_process_within_one_node, RMW_IMPLEMENTATION) : publi
public:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
if (!rclcpp::ok()) {
rclcpp::init(0, nullptr);
}
}

static void TearDownTestCase()
Expand Down Expand Up @@ -128,6 +130,20 @@ void wait_for_message_to_be_received(
executor.spin_once(g_sleep_per_loop);
}
}

bool wait_for_match(
const std::shared_ptr<rclcpp::SubscriptionBase> sub,
const std::shared_ptr<rclcpp::PublisherBase> pub)
{
int i = 0;
bool matched = false;
while (!matched && i < g_max_loops) {
matched = sub->get_publisher_count() > 0 && pub->get_subscription_count() > 0;
std::this_thread::sleep_for(g_sleep_per_loop);
}
return matched;
}

/*
* Testing publisher creation signatures with a type adapter.
*/
Expand Down Expand Up @@ -175,6 +191,7 @@ TEST_F(
ASSERT_STREQ(message_data.c_str(), msg.c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -191,6 +208,7 @@ TEST_F(
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -206,6 +224,7 @@ TEST_F(
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -222,6 +241,7 @@ TEST_F(
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -237,6 +257,7 @@ TEST_F(
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -253,6 +274,7 @@ TEST_F(
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -268,6 +290,7 @@ TEST_F(
ASSERT_STREQ(message_data.c_str(), msg.data.c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -284,6 +307,7 @@ TEST_F(
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -299,6 +323,7 @@ TEST_F(
ASSERT_STREQ(message_data.c_str(), msg->data.c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -315,6 +340,7 @@ TEST_F(
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -330,6 +356,7 @@ TEST_F(
ASSERT_STREQ(message_data.c_str(), msg->data.c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -346,6 +373,7 @@ TEST_F(
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand Down Expand Up @@ -381,6 +409,7 @@ TEST_F(
ASSERT_STREQ(message_data.c_str(), msg.c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -398,6 +427,7 @@ TEST_F(
ASSERT_FALSE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -413,6 +443,7 @@ TEST_F(
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -430,6 +461,7 @@ TEST_F(
ASSERT_FALSE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -445,6 +477,7 @@ TEST_F(
ASSERT_STREQ(message_data.c_str(), (*msg).c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -461,6 +494,7 @@ TEST_F(
ASSERT_FALSE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -476,6 +510,7 @@ TEST_F(
ASSERT_STREQ(message_data.c_str(), msg.data.c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -492,6 +527,7 @@ TEST_F(
ASSERT_FALSE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -507,6 +543,7 @@ TEST_F(
ASSERT_STREQ(message_data.c_str(), msg->data.c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -523,6 +560,7 @@ TEST_F(
ASSERT_FALSE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -538,6 +576,7 @@ TEST_F(
ASSERT_STREQ(message_data.c_str(), msg->data.c_str());
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand All @@ -554,6 +593,7 @@ TEST_F(
ASSERT_FALSE(message_info.get_rmw_message_info().from_intra_process);
};
auto sub = node->create_subscription<StringTypeAdapter>(topic_name, 1, callback);
ASSERT_TRUE(wait_for_match(sub, pub));
pub->publish(msg);
ASSERT_FALSE(is_received);
wait_for_message_to_be_received(is_received, node);
Expand Down

0 comments on commit eeb2f02

Please sign in to comment.