Skip to content

Commit

Permalink
Fixed potential deadlocks Coverity issues
Browse files Browse the repository at this point in the history
  • Loading branch information
gricciardi committed Jun 22, 2017
1 parent ea11a78 commit e172288
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 18 deletions.
5 changes: 5 additions & 0 deletions pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ struct pubsub_admin {
unsigned int maxPort;
};

/* Note: correct locking order is
* 1. subscriptionsLock
* 2. publications locks
*/

celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin);
celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin);

Expand Down
4 changes: 2 additions & 2 deletions pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
return pubsubAdmin_addAnySubscription(admin,subEP);
}

celixThreadMutex_lock(&admin->subscriptionsLock);
/* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */
celixThreadMutex_lock(&admin->localPublicationsLock);
celixThreadMutex_lock(&admin->externalPublicationsLock);
Expand Down Expand Up @@ -378,9 +379,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
}

if(status==CELIX_SUCCESS){
celixThreadMutex_lock(&admin->subscriptionsLock);
hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
celixThreadMutex_unlock(&admin->subscriptionsLock);
}
}

Expand All @@ -392,6 +391,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
free(scope_topic);
celixThreadMutex_unlock(&admin->externalPublicationsLock);
celixThreadMutex_unlock(&admin->localPublicationsLock);
celixThreadMutex_unlock(&admin->subscriptionsLock);

return status;

Expand Down
30 changes: 23 additions & 7 deletions pubsub/pubsub_admin_zmq/private/src/topic_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@

struct topic_publication {
zsock_t* zmq_socket;
celix_thread_mutex_t socket_lock; //Protects zmq_socket access
zcert_t * zmq_cert;
char* endpoint;
service_registration_pt svcFactoryReg;
array_list_pt pub_ep_list; //List<pubsub_endpoint>
hash_map_pt boundServices; //<bundle_pt,bound_service>
celix_thread_mutex_t tp_lock;
celix_thread_mutex_t tp_lock; // Protects topic_publication data structure
pubsub_serializer_service_t* serializerSvc;
};

Expand All @@ -79,11 +80,19 @@ typedef struct publish_bundle_bound_service {
char *topic;
pubsub_msg_serializer_map_t* map;
unsigned short getCount;
celix_thread_mutex_t mp_lock;
celix_thread_mutex_t mp_lock; //Protects publish_bundle_bound_service data structure
bool mp_send_in_progress;
array_list_pt mp_parts;
} publish_bundle_bound_service_t;

/* Note: correct locking order is
* 1. tp_lock
* 2. mp_lock
* 3. socket_lock
*
* tp_lock and socket_lock are independent.
*/

typedef struct pubsub_msg {
pubsub_msg_header_pt header;
char* payload;
Expand Down Expand Up @@ -211,6 +220,8 @@ celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p
pub->zmq_socket = socket;
pub->serializerSvc = NULL;

celixThreadMutex_create(&(pub->socket_lock),NULL);

#ifdef BUILD_WITH_ZMQ_SECURITY
if (pubEP->is_secure){
pub->zmq_cert = pub_cert;
Expand Down Expand Up @@ -241,7 +252,6 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
hashMap_destroy(pub->boundServices,false,false);

pub->svcFactoryReg = NULL;
zsock_destroy(&(pub->zmq_socket));
#ifdef BUILD_WITH_ZMQ_SECURITY
zcert_destroy(&(pub->zmq_cert));
#endif
Expand All @@ -250,6 +260,12 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){

celixThreadMutex_destroy(&(pub->tp_lock));

celixThreadMutex_lock(&(pub->socket_lock));
zsock_destroy(&(pub->zmq_socket));
celixThreadMutex_unlock(&(pub->socket_lock));

celixThreadMutex_destroy(&(pub->socket_lock));

free(pub);

return status;
Expand Down Expand Up @@ -570,16 +586,16 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
}
else{
arrayList_add(bound->mp_parts,msg);
celixThreadMutex_lock(&(bound->parent->tp_lock));
celixThreadMutex_lock(&(bound->parent->socket_lock));
snd = send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts);
bound->mp_send_in_progress = false;
celixThreadMutex_unlock(&(bound->parent->tp_lock));
celixThreadMutex_unlock(&(bound->parent->socket_lock));
}
break;
case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG: //Normal send case
celixThreadMutex_lock(&(bound->parent->tp_lock));
celixThreadMutex_lock(&(bound->parent->socket_lock));
snd = send_pubsub_msg(bound->parent->zmq_socket,msg,true);
celixThreadMutex_unlock(&(bound->parent->tp_lock));
celixThreadMutex_unlock(&(bound->parent->socket_lock));
break;
default:
printf("TP: ERROR: Invalid MP flags combination\n");
Expand Down
22 changes: 13 additions & 9 deletions pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ struct topic_subscription {
zsock_t* zmq_socket;
zcert_t * zmq_cert;
zcert_t * zmq_pub_cert;
pthread_mutex_t socket_lock;
pthread_mutex_t socket_lock; //Protects zmq_socket access
service_tracker_pt tracker;
array_list_pt sub_ep_list;
celix_thread_t recv_thread;
bool running;
celix_thread_mutex_t ts_lock;
celix_thread_mutex_t ts_lock; //Protects topic_subscription data structure access
bundle_context_pt context;

hash_map_pt msgSerializerMapMap; // key = service ptr, value = pubsub_msg_serializer_map_t*
Expand All @@ -80,6 +80,11 @@ struct topic_subscription {
pubsub_serializer_service_t* serializerSvc;
};

/* Note: correct locking order is
* 1. socket_lock
* 2. ts_lock
*/

typedef struct complete_zmq_msg {
zframe_t* header;
zframe_t* payload;
Expand Down Expand Up @@ -284,17 +289,18 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
celixThreadMutex_destroy(&ts->pendingDisconnections_lock);

celixThreadMutex_lock(&ts->socket_lock);
zsock_destroy(&(ts->zmq_socket));
#ifdef BUILD_WITH_ZMQ_SECURITY
zcert_destroy(&(ts->zmq_cert));
zcert_destroy(&(ts->zmq_pub_cert));
#endif
celixThreadMutex_unlock(&ts->socket_lock);
celixThreadMutex_destroy(&ts->socket_lock);

celixThreadMutex_unlock(&ts->ts_lock);

celixThreadMutex_lock(&ts->socket_lock);
zsock_destroy(&(ts->zmq_socket));
celixThreadMutex_unlock(&ts->socket_lock);
celixThreadMutex_destroy(&ts->socket_lock);


free(ts);

Expand Down Expand Up @@ -623,8 +629,6 @@ static void* zmq_recv_thread_func(void * arg) {
zframe_destroy(&headerMsg);
} else {

celixThreadMutex_lock(&sub->ts_lock);

//Let's fetch all the messages from the socket
array_list_pt msg_list = NULL;
arrayList_create(&msg_list);
Expand Down Expand Up @@ -669,8 +673,8 @@ static void* zmq_recv_thread_func(void * arg) {
}
}

celixThreadMutex_lock(&sub->ts_lock);
process_msg(sub, msg_list);

celixThreadMutex_unlock(&sub->ts_lock);

}
Expand Down

0 comments on commit e172288

Please sign in to comment.