Skip to content

Commit

Permalink
Local endpoints call discovery listener
Browse files Browse the repository at this point in the history
  • Loading branch information
richiprosima committed Sep 9, 2016
1 parent c394c60 commit 24825bb
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 77 deletions.
4 changes: 4 additions & 0 deletions src/cpp/rtps/builtin/discovery/endpoint/EDPSimple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ bool EDPSimple::processLocalReaderProxyData(ReaderProxyData* rdata)
}
}
lock.unlock();
if(this->mp_subListen->getAttachedListener() != nullptr)
this->mp_subListen->getAttachedListener()->onNewCacheChangeAdded(mp_SubReader.first, change);
mp_SubWriter.second->add_change(change);
return true;
}
Expand Down Expand Up @@ -305,6 +307,8 @@ bool EDPSimple::processLocalWriterProxyData(WriterProxyData* wdata)
}
}
lock.unlock();
if(this->mp_pubListen->getAttachedListener() != nullptr)
this->mp_pubListen->getAttachedListener()->onNewCacheChangeAdded(mp_PubReader.first, change);
mp_PubWriter.second->add_change(change);
return true;
}
Expand Down
132 changes: 67 additions & 65 deletions src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,44 +61,45 @@ void EDPSimplePUBListener::onNewCacheChangeAdded(RTPSReader* /*reader*/, const C
if(writerProxyData.readFromCDRMessage(&tempMsg))
{
change->instanceHandle = writerProxyData.key();
if(writerProxyData.guid().guidPrefix != mp_SEDP->mp_RTPSParticipant->getGuid().guidPrefix)
{
//LOOK IF IS AN UPDATED INFORMATION
WriterProxyData* wdata = nullptr;
ParticipantProxyData* pdata = nullptr;
if(this->mp_SEDP->mp_PDP->addWriterProxyData(&writerProxyData,true,&wdata,&pdata)) //ADDED NEW DATA
{
//CHECK the locators:
pdata->mp_mutex->lock();
if(wdata->unicastLocatorList().empty() && wdata->multicastLocatorList().empty())
{
wdata->unicastLocatorList(pdata->m_defaultUnicastLocatorList);
wdata->multicastLocatorList(pdata->m_defaultMulticastLocatorList);
}
wdata->isAlive(true);
pdata->mp_mutex->unlock();
mp_SEDP->pairingWriterProxy(pdata, wdata);
}
else if(pdata == nullptr) //RTPSParticipant NOT FOUND
{
logWarning(RTPS_EDP,"Received message from UNKNOWN RTPSParticipant, removing");
}
else //NOT ADDED BECAUSE IT WAS ALREADY THERE
{
pdata->mp_mutex->lock();
wdata->update(&writerProxyData);
pdata->mp_mutex->unlock();
mp_SEDP->pairingWriterProxy(pdata, wdata);
}
}
else
logInfo(RTPS_EDP,"Message from own RTPSParticipant, ignoring");
if(writerProxyData.guid().guidPrefix == mp_SEDP->mp_RTPSParticipant->getGuid().guidPrefix)
{
logInfo(RTPS_EDP,"Message from own RTPSParticipant, ignoring");
mp_SEDP->mp_PubReader.second->remove_change(change);
return;
}
//LOOK IF IS AN UPDATED INFORMATION
WriterProxyData* wdata = nullptr;
ParticipantProxyData* pdata = nullptr;
if(this->mp_SEDP->mp_PDP->addWriterProxyData(&writerProxyData,true,&wdata,&pdata)) //ADDED NEW DATA
{
//CHECK the locators:
pdata->mp_mutex->lock();
if(wdata->unicastLocatorList().empty() && wdata->multicastLocatorList().empty())
{
wdata->unicastLocatorList(pdata->m_defaultUnicastLocatorList);
wdata->multicastLocatorList(pdata->m_defaultMulticastLocatorList);
}
wdata->isAlive(true);
pdata->mp_mutex->unlock();
mp_SEDP->pairingWriterProxy(pdata, wdata);
}
else if(pdata == nullptr) //RTPSParticipant NOT FOUND
{
logWarning(RTPS_EDP,"Received message from UNKNOWN RTPSParticipant, removing");
}
else //NOT ADDED BECAUSE IT WAS ALREADY THERE
{
pdata->mp_mutex->lock();
wdata->update(&writerProxyData);
pdata->mp_mutex->unlock();
mp_SEDP->pairingWriterProxy(pdata, wdata);
}
}

//Call the slave, if it exists
if(attached_listener != nullptr){
attached_listener_mutex.lock();
attached_listener->onNewCacheChangeAdded(nullptr,change_in);
attached_listener->onNewCacheChangeAdded(this->mp_SEDP->mp_PubReader.first, change_in);
attached_listener_mutex.unlock();
}
}
Expand Down Expand Up @@ -190,43 +191,44 @@ void EDPSimpleSUBListener::onNewCacheChangeAdded(RTPSReader* /*reader*/, const C
if(readerProxyData.readFromCDRMessage(&tempMsg))
{
change->instanceHandle = readerProxyData.m_key;
if(readerProxyData.m_guid.guidPrefix != mp_SEDP->mp_RTPSParticipant->getGuid().guidPrefix)
{
//LOOK IF IS AN UPDATED INFORMATION
ReaderProxyData* rdata = nullptr;
ParticipantProxyData* pdata = nullptr;
if(this->mp_SEDP->mp_PDP->addReaderProxyData(&readerProxyData,true,&rdata,&pdata)) //ADDED NEW DATA
{
pdata->mp_mutex->lock();
//CHECK the locators:
if(rdata->m_unicastLocatorList.empty() && rdata->m_multicastLocatorList.empty())
{
rdata->m_unicastLocatorList = pdata->m_defaultUnicastLocatorList;
rdata->m_multicastLocatorList = pdata->m_defaultMulticastLocatorList;
}
rdata->m_isAlive = true;
pdata->mp_mutex->unlock();
mp_SEDP->pairingReaderProxy(pdata, rdata);
}
else if(pdata == nullptr) //RTPSParticipant NOT FOUND
{
logWarning(RTPS_EDP,"From UNKNOWN RTPSParticipant, removing");
}
else //NOT ADDED BECAUSE IT WAS ALREADY THERE
{
pdata->mp_mutex->lock();
rdata->update(&readerProxyData);
pdata->mp_mutex->unlock();
mp_SEDP->pairingReaderProxy(pdata, rdata);
}
}
else
if(readerProxyData.m_guid.guidPrefix == mp_SEDP->mp_RTPSParticipant->getGuid().guidPrefix)
{
logInfo(RTPS_EDP,"From own RTPSParticipant, ignoring");
mp_SEDP->mp_SubReader.second->remove_change(change);
return;
}
//LOOK IF IS AN UPDATED INFORMATION
ReaderProxyData* rdata = nullptr;
ParticipantProxyData* pdata = nullptr;
if(this->mp_SEDP->mp_PDP->addReaderProxyData(&readerProxyData,true,&rdata,&pdata)) //ADDED NEW DATA
{
pdata->mp_mutex->lock();
//CHECK the locators:
if(rdata->m_unicastLocatorList.empty() && rdata->m_multicastLocatorList.empty())
{
rdata->m_unicastLocatorList = pdata->m_defaultUnicastLocatorList;
rdata->m_multicastLocatorList = pdata->m_defaultMulticastLocatorList;
}
rdata->m_isAlive = true;
pdata->mp_mutex->unlock();
mp_SEDP->pairingReaderProxy(pdata, rdata);
}
else if(pdata == nullptr) //RTPSParticipant NOT FOUND
{
logWarning(RTPS_EDP,"From UNKNOWN RTPSParticipant, removing");
}
else //NOT ADDED BECAUSE IT WAS ALREADY THERE
{
pdata->mp_mutex->lock();
rdata->update(&readerProxyData);
pdata->mp_mutex->unlock();
mp_SEDP->pairingReaderProxy(pdata, rdata);
}
}

//Call the slave, if it exists
if(attached_listener != nullptr)
attached_listener->onNewCacheChangeAdded(nullptr,change);
attached_listener->onNewCacheChangeAdded(this->mp_SEDP->mp_SubReader.first, change);

}
else
Expand Down
71 changes: 59 additions & 12 deletions test/unittest/rtps/ros2features/ros2feature_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,12 @@ TEST(ros2features, SlaveListenerCallback_DynamicMode){
bool result;
p_attr.rtps.builtin.domainId = (uint32_t)boost::interprocess::ipcdetail::get_current_process_id() % 230 + 35;
my_participant = Domain::createParticipant(p_attr);

ASSERT_NE(my_participant, nullptr);

std::pair<StatefulReader*,StatefulReader*> EDP_Readers = my_participant->getEDPReaders();
result = EDP_Readers.second->setListener(&slave_listener);
ASSERT_EQ(result,true);

ASSERT_EQ(Domain::registerType(my_participant, &my_type), true);

pub_attr.topic.topicKind = NO_KEY;
Expand All @@ -259,12 +263,8 @@ TEST(ros2features, SlaveListenerCallback_DynamicMode){
my_publisher = Domain::createPublisher(my_participant, pub_attr, &my_dummy_listener);
ASSERT_NE(my_publisher, nullptr);

std::pair<StatefulReader*,StatefulReader*> EDP_Readers = my_participant->getEDPReaders();
result = EDP_Readers.second->setListener(&slave_listener);
ASSERT_EQ(result,true);

slave_listener.mapmutex.lock();
ASSERT_EQ(slave_listener.topicNtypes.size(),0);
ASSERT_EQ(slave_listener.topicNtypes.size(),1);
slave_listener.mapmutex.unlock();

Participant *my_participant2;
Expand Down Expand Up @@ -307,8 +307,12 @@ TEST(ros2features, SlaveListenerCallback_StaticMode){
bool result;
p_attr.rtps.builtin.domainId = (uint32_t)boost::interprocess::ipcdetail::get_current_process_id() % 230;
my_participant = Domain::createParticipant(p_attr);

ASSERT_NE(my_participant, nullptr);

std::pair<StatefulReader*,StatefulReader*> EDP_Readers = my_participant->getEDPReaders();
result = EDP_Readers.second->setListener(&slave_listener);
ASSERT_EQ(result,true);

ASSERT_EQ(Domain::registerType(my_participant, &my_type), true);

pub_attr.topic.topicKind = NO_KEY;
Expand All @@ -317,12 +321,8 @@ TEST(ros2features, SlaveListenerCallback_StaticMode){
my_publisher = Domain::createPublisher(my_participant, pub_attr, &my_dummy_listener);
ASSERT_NE(my_publisher, nullptr);

std::pair<StatefulReader*,StatefulReader*> EDP_Readers = my_participant->getEDPReaders();
result = EDP_Readers.second->setListener(&slave_listener);
ASSERT_EQ(result,true);

slave_listener.mapmutex.lock();
ASSERT_EQ(slave_listener.topicNtypes.size(),0);
ASSERT_EQ(slave_listener.topicNtypes.size(),1);
slave_listener.mapmutex.unlock();

Participant *my_participant2;
Expand Down Expand Up @@ -354,6 +354,53 @@ TEST(ros2features, SlaveListenerCallback_StaticMode){
eprosima::fastrtps::Domain::removeParticipant(my_participant2);
}

TEST(ros2features, SlaveListenerCallbackWithOneParticipant_StaticMode){
Participant *my_participant;
Publisher *my_publisher;
ParticipantAttributes p_attr;
PublisherAttributes pub_attr;
HelloWorldType my_type;
gettopicnamesandtypesReaderListener slave_listener;
bool result;
p_attr.rtps.builtin.domainId = (uint32_t)boost::interprocess::ipcdetail::get_current_process_id() % 230;
my_participant = Domain::createParticipant(p_attr);
ASSERT_NE(my_participant, nullptr);

std::pair<StatefulReader*,StatefulReader*> EDP_Readers = my_participant->getEDPReaders();
result = EDP_Readers.second->setListener(&slave_listener);
ASSERT_EQ(result,true);

ASSERT_EQ(Domain::registerType(my_participant, &my_type), true);

pub_attr.topic.topicKind = NO_KEY;
pub_attr.topic.topicDataType = "HelloWorldType";
pub_attr.historyMemoryPolicy = PREALLOCATED_MEMORY_MODE;
my_publisher = Domain::createPublisher(my_participant, pub_attr, nullptr);
ASSERT_NE(my_publisher, nullptr);

slave_listener.mapmutex.lock();
ASSERT_EQ(slave_listener.topicNtypes.size(),1);
slave_listener.mapmutex.unlock();

Publisher *my_publisher2;
PublisherAttributes pub_attr2;

pub_attr2.topic.topicKind = NO_KEY;
pub_attr2.topic.topicDataType = "HelloWorldType";
pub_attr2.topic.topicName = "OtherTopic";
pub_attr2.historyMemoryPolicy = PREALLOCATED_MEMORY_MODE;
my_publisher2 = Domain::createPublisher(my_participant, pub_attr2, nullptr);
ASSERT_NE(my_publisher2, nullptr);

slave_listener.mapmutex.lock();
ASSERT_EQ(slave_listener.topicNtypes.size(), 2);
slave_listener.mapmutex.unlock();

EDP_Readers.second->setListener(nullptr);

eprosima::fastrtps::Domain::removeParticipant(my_participant);
}


int main(int argc, char **argv)
{
Expand Down

0 comments on commit 24825bb

Please sign in to comment.