Skip to content

Commit

Permalink
capicxx-someip-runtime 3.1.12.6
Browse files Browse the repository at this point in the history
  • Loading branch information
juergengehring committed Jan 25, 2018
1 parent 69a4939 commit 64cbeec
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 29 deletions.
6 changes: 6 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
Changes
=======

v3.1.12.6
- Fixed hang-up in proxy destruction when an async method call was done
  and the proxy is not available.
- Ensure that all async method calls done while the proxy is not
  available are answered with CAPI::CallStatus::NOT_AVAILABLE.

v3.1.12.5
- Fix possible heap corruption in conjunction with multiple mainloops
- Optimize (de)serialization of byte arrays
Expand Down
8 changes: 5 additions & 3 deletions include/CommonAPI/SomeIP/Connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,16 +269,16 @@ class Connection:

std::shared_ptr<vsomeip::application> application_;

mutable std::map<session_id_t, Message> sendAndBlockAnswers_;
mutable std::map<session_id_fake_t, Message> sendAndBlockAnswers_;
mutable std::condition_variable_any sendAndBlockCondition_;

std::shared_ptr<std::thread> asyncAnswersCleanupThread_;
std::mutex cleanupMutex_;
mutable std::mutex cleanupMutex_;
mutable std::condition_variable cleanupCondition_;
std::atomic<bool> cleanupCancelled_;

mutable std::recursive_mutex sendReceiveMutex_;
typedef std::map<session_id_t,
typedef std::map<session_id_fake_t,
std::tuple<
std::chrono::steady_clock::time_point,
std::shared_ptr<vsomeip::message>,
Expand Down Expand Up @@ -327,6 +327,8 @@ class Connection:

std::map<service_id_t, std::map<instance_id_t, std::map<eventgroup_id_t, SubsciptionHandler_t>>> subscription_;
std::mutex subscriptionMutex_;

std::map<std::shared_ptr<vsomeip::message>, session_id_fake_t> errorResponses_;
};


Expand Down
4 changes: 3 additions & 1 deletion include/CommonAPI/SomeIP/OutputStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,9 @@ class OutputStream: public CommonAPI::OutputStream<OutputStream> {

if (!hasError()) {
_writeValue(uint32_t(_value.size()), arrayLengthWidth);
_writeRaw(reinterpret_cast<const byte_t *>(&_value[0]), _value.size());
if (_value.size()) {
_writeRaw(reinterpret_cast<const byte_t *>(&_value[0]), _value.size());
}
}

return (*this);
Expand Down
2 changes: 2 additions & 0 deletions include/CommonAPI/SomeIP/Types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ typedef uint32_t AvailabilityHandlerId_t;
typedef std::function<void (std::shared_ptr<Proxy>, service_id_t, instance_id_t, bool, void*)> AvailabilityHandler_t;
typedef std::function<bool (client_id_t, bool) > SubsciptionHandler_t;

typedef std::uint32_t session_id_fake_t;

class Message;
typedef std::function<bool (const Message &) > MessageHandler_t;

Expand Down
65 changes: 47 additions & 18 deletions src/CommonAPI/SomeIP/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,14 @@ void Connection::receive(const std::shared_ptr<vsomeip::message> &_message) {
void Connection::handleProxyReceive(const std::shared_ptr<vsomeip::message> &_message) {
sendReceiveMutex_.lock();

session_id_t sessionId = _message->get_session();
session_id_fake_t sessionId = _message->get_session();
if (!sessionId) {
auto found_Message = errorResponses_.find(_message);
if (found_Message != errorResponses_.end()) {
sessionId = found_Message->second;
errorResponses_.erase(found_Message);
}
}

// handle events
if(_message->get_message_type() == message_type_e::MT_NOTIFICATION) {
Expand Down Expand Up @@ -237,7 +244,10 @@ void Connection::handleAvailabilityChange(const service_id_t _service,
instance_id_t _instance, bool _is_available) {
if (!_is_available) {
// cancel sync calls
sendAndBlockCondition_.notify_all();
{
std::lock_guard<std::recursive_mutex> lock(sendReceiveMutex_);
sendAndBlockCondition_.notify_all();
}
// cancel asynchronous calls
cancelAsyncAnswers(_service, _instance);
}
Expand Down Expand Up @@ -314,8 +324,10 @@ void Connection::cleanup() {
watch_->pushQueue(msg_queue_entry);
} else {
try {
itsLock.unlock();
std::get<2>(its_answer.second)->onMessageReply(
CallStatus::REMOTE_ERROR, Message(response));
itsLock.lock();
} catch (const std::exception& e) {
COMMONAPI_ERROR("Message reply failed on cleanup(", e.what(), ")");
}
Expand Down Expand Up @@ -415,8 +427,8 @@ void Connection::doDisconnect() {
{
std::lock_guard<std::mutex> lg(cleanupMutex_);
cleanupCancelled_ = true;
cleanupCondition_.notify_one();
}
cleanupCondition_.notify_one();
if (asyncAnswersCleanupThread_->joinable())
asyncAnswersCleanupThread_->join();
}
Expand Down Expand Up @@ -486,23 +498,30 @@ bool Connection::sendMessageWithReplyAsync(
if (!isConnected())
return false;

std::lock_guard<std::recursive_mutex> lock(sendReceiveMutex_);
application_->send(message.message_, true);

if (_info->sender_ != 0) {
COMMONAPI_DEBUG("Message sent: SenderID: ", _info->sender_,
" - ClientID: ", message.getClientId(),
", SessionID: ", message.getSessionId());
}
{
std::lock_guard<std::recursive_mutex> lock(sendReceiveMutex_);
application_->send(message.message_, true);

auto timeoutTime = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now()
+ std::chrono::milliseconds(_info->timeout_);
if (_info->sender_ != 0) {
COMMONAPI_DEBUG("Message sent: SenderID: ", _info->sender_,
" - ClientID: ", message.getClientId(),
", SessionID: ", message.getSessionId());
}

asyncAnswers_[message.getSessionId()] = std::make_tuple(
timeoutTime, message.message_,
std::move(messageReplyAsyncHandler));
auto timeoutTime =
(std::chrono::steady_clock::time_point) std::chrono::steady_clock::now()
+ std::chrono::milliseconds(_info->timeout_);

asyncAnswers_[message.getSessionId()] = std::make_tuple(
timeoutTime, message.message_,
std::move(messageReplyAsyncHandler));
}
{
std::lock_guard<std::mutex> itsLock(cleanupMutex_);
cleanupCondition_.notify_one();
}

cleanupCondition_.notify_one();

return true;
}
Expand All @@ -516,7 +535,7 @@ Message Connection::sendMessageWithReplyAndBlock(

std::unique_lock<std::recursive_mutex> lock(sendReceiveMutex_);

std::pair<std::map<session_id_t, Message>::iterator, bool> itsAnswer;
std::pair<std::map<session_id_fake_t, Message>::iterator, bool> itsAnswer;
application_->send(message.message_, true);
if (_info->sender_ != 0) {
COMMONAPI_DEBUG("Message sent: SenderID: ", _info->sender_,
Expand Down Expand Up @@ -1155,7 +1174,17 @@ void Connection::proxyPushMessageToMainLoop(const Message &_message,
std::lock_guard<std::recursive_mutex> lock(sendReceiveMutex_);
auto timeoutTime = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now()
+ std::chrono::milliseconds(ASYNC_MESSAGE_REPLY_TIMEOUT_MS);
asyncAnswers_[_message.getSessionId()]
session_id_fake_t itsSession = _message.getSessionId();
if (itsSession == 0) {
static std::uint16_t fakeSessionId = 0;
fakeSessionId++;
if (fakeSessionId == 0) {
fakeSessionId++; // set to 1 on overflow
}
itsSession = static_cast<session_id_fake_t>(fakeSessionId << 16);
errorResponses_[_message.message_] = itsSession;
}
asyncAnswers_[itsSession]
= std::make_tuple(
timeoutTime, _message.message_,
std::move(messageReplyAsyncHandler));
Expand Down
4 changes: 3 additions & 1 deletion src/CommonAPI/SomeIP/OutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,9 @@ OutputStream& OutputStream::writeValue(const ByteBuffer &_value, const ByteBuffe

if (!hasError()) {
_writeValue(uint32_t(_value.size()), 4);
_writeRaw(static_cast<const byte_t *>(&_value[0]), _value.size());
if (_value.size()) {
_writeRaw(static_cast<const byte_t *>(&_value[0]), _value.size());
}
}

return (*this);
Expand Down
27 changes: 21 additions & 6 deletions src/CommonAPI/SomeIP/Proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void ProxyStatusEventHelper::onListenerRemoved(const Listener& _listener,
void Proxy::availabilityTimeoutThreadHandler() const {
std::unique_lock<std::mutex> threadLock(availabilityTimeoutThreadMutex_);

bool cancel = false;
bool finish = false;
bool firstIteration = true;

// the callbacks that have to be done are stored with
Expand All @@ -64,7 +64,7 @@ void Proxy::availabilityTimeoutThreadHandler() const {
> CallbackData_t;
std::list<CallbackData_t> callbacks;

while(!cancel) {
while(!finish) {

//get min timeout
timeoutsMutex_.lock();
Expand Down Expand Up @@ -185,7 +185,7 @@ void Proxy::availabilityTimeoutThreadHandler() const {
//cancel thread
timeoutsMutex_.lock();
if(timeouts_.size() == 0 && callbacks.size() == 0)
cancel = true;
finish = true;
timeoutsMutex_.unlock();
}
}
Expand Down Expand Up @@ -238,7 +238,10 @@ void Proxy::onServiceInstanceStatus(std::shared_ptr<Proxy> _proxy,
for(auto listenerIt : proxyStatusEvent_.listeners_)
proxyStatusEvent_.notifySpecificListener(listenerIt.first, availabilityStatus_);
}
_proxy->availabilityCondition_.notify_one();
{
std::lock_guard<std::mutex> itsLock(_proxy->availabilityMutex_);
_proxy->availabilityCondition_.notify_one();
}
}

Proxy::Proxy(const Address &_address,
Expand All @@ -255,9 +258,20 @@ Proxy::Proxy(const Address &_address,
}

Proxy::~Proxy() {
{
std::lock_guard<std::mutex> itsLock(timeoutsMutex_);
timeouts_.clear();
}
{
std::lock_guard<std::mutex> itsTimeoutThreadLock(availabilityTimeoutThreadMutex_);
availabilityTimeoutCondition_.notify_all();
}
if(availabilityTimeoutThread_) {
if(availabilityTimeoutThread_->joinable())
if (availabilityTimeoutThread_->get_id() == std::this_thread::get_id()) {
availabilityTimeoutThread_->detach();
} else if(availabilityTimeoutThread_->joinable()) {
availabilityTimeoutThread_->join();
}
}
getConnection()->releaseService(alias_);
getConnection()->unregisterAvailabilityHandler(alias_, availabilityHandlerId_);
Expand Down Expand Up @@ -357,7 +371,8 @@ std::future<AvailabilityStatus> Proxy::isAvailableAsync(
//start availability thread
if(!isAvailabilityTimeoutThread)
availabilityTimeoutThread_ = std::make_shared<std::thread>(
std::bind(&Proxy::availabilityTimeoutThreadHandler, this));
std::bind(&Proxy::availabilityTimeoutThreadHandler,
shared_from_this()));
} else {
//add timeout
timeouts_.push_back(std::make_tuple(timeoutPoint, _callback, std::move(promise)));
Expand Down

0 comments on commit 64cbeec

Please sign in to comment.