Skip to content

Commit

Permalink
Fixed coverity issues
Browse files Browse the repository at this point in the history
  • Loading branch information
rlenferink committed Feb 6, 2017
1 parent 1af1ff0 commit 8c84284
Show file tree
Hide file tree
Showing 16 changed files with 253 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,32 @@ struct publisherActivator {
};

celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
celix_status_t status = CELIX_SUCCESS;

struct publisherActivator * act = malloc(sizeof(*act));

const char* fwUUID = NULL;

bundleContext_getProperty(context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
if(fwUUID==NULL){
if(fwUUID == NULL){
printf("MP_PUBLISHER: Cannot retrieve fwUUID.\n");
return CELIX_INVALID_BUNDLE_CONTEXT;
status = CELIX_INVALID_BUNDLE_CONTEXT;
}

bundle_pt bundle = NULL;
long bundleId = 0;
bundleContext_getBundle(context,&bundle);
bundle_getBundleId(bundle,&bundleId);

arrayList_create(&(act->trackerList));
act->client = publisher_create(act->trackerList,fwUUID,bundleId);
*userData = act;
if (status == CELIX_SUCCESS){
bundle_pt bundle = NULL;
long bundleId = 0;
bundleContext_getBundle(context,&bundle);
bundle_getBundleId(bundle,&bundleId);

arrayList_create(&(act->trackerList));
act->client = publisher_create(act->trackerList,fwUUID,bundleId);
*userData = act;
} else {
free(act);
}

return CELIX_SUCCESS;
return status;
}

celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
if (mc_ip == NULL) {
const char *mc_prefix = NULL;
const char *interface = NULL;
int b0, b1, b2, b3;
int b0 = 224, b1 = 100, b2 = 1, b3 = 1;
bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , &mc_prefix);
if(mc_prefix == NULL) {
mc_prefix = DEFAULT_MC_PREFIX;
Expand All @@ -127,22 +127,28 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
int sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
if(sendSocket == -1) {
perror("pubsubAdmin_create:socket");
return CELIX_SERVICE_EXCEPTION;
}
char loop = 1;
if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) {
perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)");
return CELIX_SERVICE_EXCEPTION;
status = CELIX_SERVICE_EXCEPTION;
}

struct in_addr multicast_interface;
inet_aton(if_ip, &multicast_interface);
if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)) != 0) {
perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)");
return CELIX_SERVICE_EXCEPTION;
}
if (status == CELIX_SUCCESS){
char loop = 1;
if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) {
perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)");
status = CELIX_SERVICE_EXCEPTION;
}

if (status == CELIX_SUCCESS){
struct in_addr multicast_interface;
inet_aton(if_ip, &multicast_interface);
if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)) != 0) {
perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)");
status = CELIX_SERVICE_EXCEPTION;
}

(*admin)->sendSocket = sendSocket;
(*admin)->sendSocket = sendSocket;
}

}

}
#endif
Expand All @@ -162,6 +168,10 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
(*admin)->mcIpAddress = strdup(DEFAULT_MC_IP);
}

if (status != CELIX_SUCCESS){
pubsubAdmin_destroy(*admin);
}

}

return status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,65 +226,79 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
if (!hashMap_containsKey(ts->socketMap, pubURL)){

celixThreadMutex_lock(&ts->ts_lock);

int *recvSocket = calloc(sizeof(int), 1);
*recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
if (*recvSocket < 0) {
perror("pubsub_topicSubscriptionCreate:socket");
return CELIX_SERVICE_EXCEPTION;
status = CELIX_SERVICE_EXCEPTION;
}

int reuse = 1;
if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) {
perror("setsockopt() SO_REUSEADDR");
return CELIX_SERVICE_EXCEPTION;
if (status == CELIX_SUCCESS){
int reuse = 1;
if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) {
perror("setsockopt() SO_REUSEADDR");
status = CELIX_SERVICE_EXCEPTION;
}
}

// TODO Check if there is a better way to parse the URL to IP/Portnr
//replace ':' by spaces
char *url = strdup(pubURL);
char *pt = url;
while((pt=strchr(pt, ':')) != NULL) {
*pt = ' ';
}
char mcIp[100];
unsigned short mcPort;
sscanf(url, "udp //%s %hu", mcIp, &mcPort);
free (url);

printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort);

struct ip_mreq mc_addr;
mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress);
printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress);
if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
perror("setsockopt() IP_ADD_MEMBERSHIP");
return CELIX_SERVICE_EXCEPTION;
}
if (status == CELIX_SUCCESS){
// TODO Check if there is a better way to parse the URL to IP/Portnr
//replace ':' by spaces
char *url = strdup(pubURL);
char *pt = url;
while((pt=strchr(pt, ':')) != NULL) {
*pt = ' ';
}
char mcIp[100];
unsigned short mcPort;
sscanf(url, "udp //%s %hu", mcIp, &mcPort);
free (url);

printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort);

struct ip_mreq mc_addr;
mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress);
printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress);
if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
perror("setsockopt() IP_ADD_MEMBERSHIP");
status = CELIX_SERVICE_EXCEPTION;
}

struct sockaddr_in mcListenAddr;
mcListenAddr.sin_family = AF_INET;
mcListenAddr.sin_addr.s_addr = INADDR_ANY;
mcListenAddr.sin_port = htons(mcPort);
if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
perror("bind()");
return CELIX_SERVICE_EXCEPTION;
}
if (status == CELIX_SUCCESS){
struct sockaddr_in mcListenAddr;
mcListenAddr.sin_family = AF_INET;
mcListenAddr.sin_addr.s_addr = INADDR_ANY;
mcListenAddr.sin_port = htons(mcPort);
if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
perror("bind()");
status = CELIX_SERVICE_EXCEPTION;
}
}

if (status == CELIX_SUCCESS){
#if defined(__APPLE__) && defined(__MACH__)
//TODO: Use kqueue for OSX
#else
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
ev.events = EPOLLIN;
ev.data.fd = *recvSocket;
if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) {
perror("epoll_ctl() EPOLL_CTL_ADD");
status = CELIX_SERVICE_EXCEPTION;
}
#endif
}

#if defined(__APPLE__) && defined(__MACH__)
//TODO: Use kqueue for OSX
#else
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
ev.events = EPOLLIN;
ev.data.fd = *recvSocket;
if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) {
perror("epoll_ctl() EPOLL_CTL_ADD");
return CELIX_SERVICE_EXCEPTION;
}
#endif

hashMap_put(ts->socketMap, pubURL, (void*)recvSocket);
if (status == CELIX_SUCCESS){
hashMap_put(ts->socketMap, pubURL, (void*)recvSocket);
}else{
free(recvSocket);
}

celixThreadMutex_unlock(&ts->ts_lock);

Expand Down Expand Up @@ -444,9 +458,9 @@ static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_pt msg){
bool validVersion = checkVersion(msgVersion,&msg->header);

if(validVersion){
int rc = pubsubSerializer_deserialize(msgType, (const void *) msg->payload, &msgInst);
celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) msg->payload, &msgInst);

if (rc != -1) {
if (status == CELIX_SUCCESS) {
bool release = true;
pubsub_multipart_callbacks_t mp_callbacks;
mp_callbacks.handle = sub;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint
/* And check also for ANY subscription */
topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
if (any_sub != NULL && pubEP->endpoint != NULL) {
pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint);
pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, pubEP->endpoint);
}
free(scope_topic);

Expand Down Expand Up @@ -559,22 +559,24 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
arrayList_remove(ext_pub_list,i);
}
}
// Check if there are more publsihers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
for(i=0; i<arrayList_size(ext_pub_list);i++) {
pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
if (strcmp(pubEP->endpoint,p->endpoint) == 0) {
count++;
}
}
// Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
int ext_pub_list_size = arrayList_size(ext_pub_list);
for(i=0; i<ext_pub_list_size; i++) {
pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
if (strcmp(pubEP->endpoint,p->endpoint) == 0) {
count++;
}
}

if(ext_pub_list_size == 0){
hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
char* topic = (char*)hashMapEntry_getKey(entry);
array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
hashMap_remove(admin->externalPublications,topic);
arrayList_destroy(list);
free(topic);
}

}
if(arrayList_size(ext_pub_list)==0){
hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
char* topic = (char*)hashMapEntry_getKey(entry);
array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
hashMap_remove(admin->externalPublications,topic);
arrayList_destroy(list);
free(topic);
}

celixThreadMutex_unlock(&admin->externalPublicationsLock);
Expand All @@ -591,7 +593,7 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
/* And check also for ANY subscription */
topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint);
}
free(scope_topic);
celixThreadMutex_unlock(&admin->subscriptionsLock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p

snprintf(ep,EP_ADDRESS_LEN,"tcp://%s:%u",bindIP,port);
snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); //NOTE using a different bind addres than endpoint address
rv = zsock_bind (socket, bindAddress);
rv = zsock_bind (socket, "%s", bindAddress);
if (rv == -1) {
perror("Error for zmq_bind");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL){
celix_status_t status = CELIX_SUCCESS;
celixThreadMutex_lock(&ts->socket_lock);
if(!zsock_is(ts->zmq_socket) || zsock_connect(ts->zmq_socket,pubURL) != 0){
if(!zsock_is(ts->zmq_socket) || zsock_connect(ts->zmq_socket, "%s", pubURL) != 0){
status = CELIX_SERVICE_EXCEPTION;
}
celixThreadMutex_unlock(&ts->socket_lock);
Expand Down Expand Up @@ -366,7 +366,7 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
celix_status_t status = CELIX_SUCCESS;

celixThreadMutex_lock(&ts->socket_lock);
if(!zsock_is(ts->zmq_socket) || zsock_disconnect(ts->zmq_socket,pubURL) != 0){
if(!zsock_is(ts->zmq_socket) || zsock_disconnect(ts->zmq_socket, "%s", pubURL) != 0){
status = CELIX_SERVICE_EXCEPTION;
}
celixThreadMutex_unlock(&ts->socket_lock);
Expand Down Expand Up @@ -497,9 +497,9 @@ static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){

if(validVersion){

int rc = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), &msgInst);
celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), &msgInst);

if (rc != -1) {
if (status == CELIX_SUCCESS) {
bool release = true;

mp_handle_pt mp_handle = create_mp_handle(msgTypes,msg_list);
Expand Down Expand Up @@ -735,9 +735,9 @@ static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_ms
bool validVersion = checkVersion(msgVersion,header);

if(validVersion){
int rc = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(c_msg->payload), &msgInst);
celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(c_msg->payload), &msgInst);

if(rc != -1){
if(status == CELIX_SUCCESS){
unsigned int* msgId = calloc(1,sizeof(unsigned int));
*msgId = header->type;
msg_map_entry_pt entry = calloc(1,sizeof(struct msg_map_entry));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@

#include "bundle.h"
#include "hash_map.h"
#include "celix_errno.h"

typedef struct _pubsub_message_type pubsub_message_type;

int pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen);
int pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output);
celix_status_t pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen);
celix_status_t pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output);

unsigned int pubsubSerializer_hashCode(const char *string);
version_pt pubsubSerializer_getVersion(pubsub_message_type *msgType);
Expand Down

0 comments on commit 8c84284

Please sign in to comment.