From 8c84284f6f63de076bbc1d8f66ac77115d2e5707 Mon Sep 17 00:00:00 2001 From: Roy Lenferink Date: Mon, 6 Feb 2017 17:43:19 +0100 Subject: [PATCH] Fixed coverity issues --- .../publisher/private/src/mp_pub_activator.c | 28 +++-- .../private/src/pubsub_admin_impl.c | 38 +++--- .../private/src/topic_subscription.c | 112 ++++++++++-------- .../private/src/pubsub_admin_impl.c | 36 +++--- .../private/src/topic_publication.c | 2 +- .../private/src/topic_subscription.c | 12 +- .../public/include/pubsub_serializer.h | 5 +- .../pubsub_common/public/src/dyn_msg_utils.c | 44 ++++--- .../pubsub/pubsub_common/public/src/etcd.c | 6 +- .../pubsub_common/public/src/log_helper.c | 48 ++++---- .../public/src/pubsub_endpoint.c | 63 +++++----- .../public/src/pubsub_serializer.c | 24 ++-- .../private/src/etcd_watcher.c | 8 +- .../private/src/pubsub_discovery_impl.c | 4 +- .../private/src/pstm_activator.c | 15 +++ .../private/src/pubsub_topology_manager.c | 14 ++- 16 files changed, 253 insertions(+), 206 deletions(-) diff --git a/celix-pubsub/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c b/celix-pubsub/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c index 231157acc..5c8b14527 100644 --- a/celix-pubsub/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c +++ b/celix-pubsub/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c @@ -44,26 +44,32 @@ struct publisherActivator { }; celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) { + celix_status_t status = CELIX_SUCCESS; + struct publisherActivator * act = malloc(sizeof(*act)); const char* fwUUID = NULL; bundleContext_getProperty(context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); - if(fwUUID==NULL){ + if(fwUUID == NULL){ printf("MP_PUBLISHER: Cannot retrieve fwUUID.\n"); - return CELIX_INVALID_BUNDLE_CONTEXT; + status = CELIX_INVALID_BUNDLE_CONTEXT; } - bundle_pt bundle = NULL; - long bundleId = 0; - bundleContext_getBundle(context,&bundle); - bundle_getBundleId(bundle,&bundleId); - - arrayList_create(&(act->trackerList)); - act->client = publisher_create(act->trackerList,fwUUID,bundleId); - *userData = act; + if (status == CELIX_SUCCESS){ + bundle_pt bundle = NULL; + long bundleId = 0; + bundleContext_getBundle(context,&bundle); + bundle_getBundleId(bundle,&bundleId); + + arrayList_create(&(act->trackerList)); + act->client = publisher_create(act->trackerList,fwUUID,bundleId); + *userData = act; + } else { + free(act); + } - return CELIX_SUCCESS; + return status; } celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) { diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c index 9182e1bb0..bd3bb2fab 100644 --- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c +++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c @@ -104,7 +104,7 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad if (mc_ip == NULL) { const char *mc_prefix = NULL; const char *interface = NULL; - int b0, b1, b2, b3; + int b0 = 224, b1 = 100, b2 = 1, b3 = 1; bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , &mc_prefix); if(mc_prefix == NULL) { mc_prefix = DEFAULT_MC_PREFIX; @@ -127,22 +127,28 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad int sendSocket = socket(AF_INET, SOCK_DGRAM, 0); if(sendSocket == -1) { perror("pubsubAdmin_create:socket"); - return CELIX_SERVICE_EXCEPTION; - } - char loop = 1; - if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) { - perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)"); - return CELIX_SERVICE_EXCEPTION; + status = CELIX_SERVICE_EXCEPTION; } - struct in_addr multicast_interface; - inet_aton(if_ip, &multicast_interface); - if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)) != 0) { - perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)"); - return CELIX_SERVICE_EXCEPTION; - } + if (status == CELIX_SUCCESS){ + char loop = 1; + if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) { + perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)"); + status = CELIX_SERVICE_EXCEPTION; + } + + if (status == CELIX_SUCCESS){ + struct in_addr multicast_interface; + inet_aton(if_ip, &multicast_interface); + if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)) != 0) { + perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)"); + status = CELIX_SERVICE_EXCEPTION; + } - (*admin)->sendSocket = sendSocket; + (*admin)->sendSocket = sendSocket; + } + + } } #endif @@ -162,6 +168,10 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad (*admin)->mcIpAddress = strdup(DEFAULT_MC_IP); } + if (status != CELIX_SUCCESS){ + pubsubAdmin_destroy(*admin); + } + } return status; diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c index 0907f16d0..0caf0842f 100644 --- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c +++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c @@ -226,65 +226,79 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts if (!hashMap_containsKey(ts->socketMap, pubURL)){ celixThreadMutex_lock(&ts->ts_lock); + int *recvSocket = calloc(sizeof(int), 1); *recvSocket = socket(AF_INET, SOCK_DGRAM, 0); if (*recvSocket < 0) { perror("pubsub_topicSubscriptionCreate:socket"); - return CELIX_SERVICE_EXCEPTION; + status = CELIX_SERVICE_EXCEPTION; } - int reuse = 1; - if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) { - perror("setsockopt() SO_REUSEADDR"); - return CELIX_SERVICE_EXCEPTION; + if (status == CELIX_SUCCESS){ + int reuse = 1; + if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) { + perror("setsockopt() SO_REUSEADDR"); + status = CELIX_SERVICE_EXCEPTION; + } } - // TODO Check if there is a better way to parse the URL to IP/Portnr - //replace ':' by spaces - char *url = strdup(pubURL); - char *pt = url; - while((pt=strchr(pt, ':')) != NULL) { - *pt = ' '; - } - char mcIp[100]; - unsigned short mcPort; - sscanf(url, "udp //%s %hu", mcIp, &mcPort); - free (url); - - printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort); - - struct ip_mreq mc_addr; - mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp); - mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress); - printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress); - if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) { - perror("setsockopt() IP_ADD_MEMBERSHIP"); - return CELIX_SERVICE_EXCEPTION; - } + if (status == CELIX_SUCCESS){ + // TODO Check if there is a better way to parse the URL to IP/Portnr + //replace ':' by spaces + char *url = strdup(pubURL); + char *pt = url; + while((pt=strchr(pt, ':')) != NULL) { + *pt = ' '; + } + char mcIp[100]; + unsigned short mcPort; + sscanf(url, "udp //%s %hu", mcIp, &mcPort); + free (url); + + printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort); + + struct ip_mreq mc_addr; + mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp); + mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress); + printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress); + if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) { + perror("setsockopt() IP_ADD_MEMBERSHIP"); + status = CELIX_SERVICE_EXCEPTION; + } - struct sockaddr_in mcListenAddr; - mcListenAddr.sin_family = AF_INET; - mcListenAddr.sin_addr.s_addr = INADDR_ANY; - mcListenAddr.sin_port = htons(mcPort); - if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) { - perror("bind()"); - return CELIX_SERVICE_EXCEPTION; - } + if (status == CELIX_SUCCESS){ + struct sockaddr_in mcListenAddr; + mcListenAddr.sin_family = AF_INET; + mcListenAddr.sin_addr.s_addr = INADDR_ANY; + mcListenAddr.sin_port = htons(mcPort); + if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) { + perror("bind()"); + status = CELIX_SERVICE_EXCEPTION; + } + } + + if (status == CELIX_SUCCESS){ + #if defined(__APPLE__) && defined(__MACH__) + //TODO: Use kqueue for OSX + #else + struct epoll_event ev; + memset(&ev, 0, sizeof(ev)); + ev.events = EPOLLIN; + ev.data.fd = *recvSocket; + if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) { + perror("epoll_ctl() EPOLL_CTL_ADD"); + status = CELIX_SERVICE_EXCEPTION; + } + #endif + } -#if defined(__APPLE__) && defined(__MACH__) - //TODO: Use kqueue for OSX -#else - struct epoll_event ev; - memset(&ev, 0, sizeof(ev)); - ev.events = EPOLLIN; - ev.data.fd = *recvSocket; - if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) { - perror("epoll_ctl() EPOLL_CTL_ADD"); - return CELIX_SERVICE_EXCEPTION; } -#endif - hashMap_put(ts->socketMap, pubURL, (void*)recvSocket); + if (status == CELIX_SUCCESS){ + hashMap_put(ts->socketMap, pubURL, (void*)recvSocket); + }else{ + free(recvSocket); + } celixThreadMutex_unlock(&ts->ts_lock); @@ -444,9 +458,9 @@ static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_pt msg){ bool validVersion = checkVersion(msgVersion,&msg->header); if(validVersion){ - int rc = pubsubSerializer_deserialize(msgType, (const void *) msg->payload, &msgInst); + celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) msg->payload, &msgInst); - if (rc != -1) { + if (status == CELIX_SUCCESS) { bool release = true; pubsub_multipart_callbacks_t mp_callbacks; mp_callbacks.handle = sub; diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c index 8e14800d5..e67089966 100644 --- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c +++ b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c @@ -505,7 +505,7 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint /* And check also for ANY subscription */ topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC); if (any_sub != NULL && pubEP->endpoint != NULL) { - pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint); + pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, pubEP->endpoint); } free(scope_topic); @@ -559,22 +559,24 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi arrayList_remove(ext_pub_list,i); } } - // Check if there are more publsihers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic) - for(i=0; iendpoint,p->endpoint) == 0) { - count++; - } - } + // Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic) + int ext_pub_list_size = arrayList_size(ext_pub_list); + for(i=0; iendpoint,p->endpoint) == 0) { + count++; + } + } + + if(ext_pub_list_size == 0){ + hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic); + char* topic = (char*)hashMapEntry_getKey(entry); + array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry); + hashMap_remove(admin->externalPublications,topic); + arrayList_destroy(list); + free(topic); + } - } - if(arrayList_size(ext_pub_list)==0){ - hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic); - char* topic = (char*)hashMapEntry_getKey(entry); - array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry); - hashMap_remove(admin->externalPublications,topic); - arrayList_destroy(list); - free(topic); } celixThreadMutex_unlock(&admin->externalPublicationsLock); @@ -591,7 +593,7 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi /* And check also for ANY subscription */ topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC); if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){ - pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint); + pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint); } free(scope_topic); celixThreadMutex_unlock(&admin->subscriptionsLock); diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c index b76b1ce64..1a036dbcb 100644 --- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c +++ b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c @@ -189,7 +189,7 @@ celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p snprintf(ep,EP_ADDRESS_LEN,"tcp://%s:%u",bindIP,port); snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); //NOTE using a different bind addres than endpoint address - rv = zsock_bind (socket, bindAddress); + rv = zsock_bind (socket, "%s", bindAddress); if (rv == -1) { perror("Error for zmq_bind"); } diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c index f58f516ba..cb9aff51e 100644 --- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c +++ b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c @@ -336,7 +336,7 @@ celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL){ celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->socket_lock); - if(!zsock_is(ts->zmq_socket) || zsock_connect(ts->zmq_socket,pubURL) != 0){ + if(!zsock_is(ts->zmq_socket) || zsock_connect(ts->zmq_socket, "%s", pubURL) != 0){ status = CELIX_SERVICE_EXCEPTION; } celixThreadMutex_unlock(&ts->socket_lock); @@ -366,7 +366,7 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->socket_lock); - if(!zsock_is(ts->zmq_socket) || zsock_disconnect(ts->zmq_socket,pubURL) != 0){ + if(!zsock_is(ts->zmq_socket) || zsock_disconnect(ts->zmq_socket, "%s", pubURL) != 0){ status = CELIX_SERVICE_EXCEPTION; } celixThreadMutex_unlock(&ts->socket_lock); @@ -497,9 +497,9 @@ static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){ if(validVersion){ - int rc = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), &msgInst); + celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), &msgInst); - if (rc != -1) { + if (status == CELIX_SUCCESS) { bool release = true; mp_handle_pt mp_handle = create_mp_handle(msgTypes,msg_list); @@ -735,9 +735,9 @@ static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_ms bool validVersion = checkVersion(msgVersion,header); if(validVersion){ - int rc = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(c_msg->payload), &msgInst); + celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(c_msg->payload), &msgInst); - if(rc != -1){ + if(status == CELIX_SUCCESS){ unsigned int* msgId = calloc(1,sizeof(unsigned int)); *msgId = header->type; msg_map_entry_pt entry = calloc(1,sizeof(struct msg_map_entry)); diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h index c1f9a4b50..565bac4c7 100644 --- a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h +++ b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h @@ -29,11 +29,12 @@ #include "bundle.h" #include "hash_map.h" +#include "celix_errno.h" typedef struct _pubsub_message_type pubsub_message_type; -int pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen); -int pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output); +celix_status_t pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen); +celix_status_t pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output); unsigned int pubsubSerializer_hashCode(const char *string); version_pt pubsubSerializer_getVersion(pubsub_message_type *msgType); diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c b/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c index 8309c113c..11e4507e7 100644 --- a/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c +++ b/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c @@ -54,13 +54,14 @@ void fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle){ char *metaInfPath = NULL; root = getMsgDescriptionDir(bundle); - asprintf(&metaInfPath, "%s/META-INF/descriptors", root); - addMsgDescriptorsFromBundle(root, bundle, msgTypesMap); - addMsgDescriptorsFromBundle(metaInfPath, bundle, msgTypesMap); + if(root != NULL){ + asprintf(&metaInfPath, "%s/META-INF/descriptors", root); - free(metaInfPath); - if(root!=NULL){ + addMsgDescriptorsFromBundle(root, bundle, msgTypesMap); + addMsgDescriptorsFromBundle(metaInfPath, bundle, msgTypesMap); + + free(metaInfPath); free(root); } } @@ -127,25 +128,30 @@ static void addMsgDescriptorsFromBundle(const char *root, bundle_pt bundle, hash snprintf(path, 128, "%s/%s", root, entry->d_name); FILE *stream = fopen(path,"r"); - dyn_message_type* msgType = NULL; + if (stream != NULL){ + dyn_message_type* msgType = NULL; - int rc = dynMessage_parse(stream, &msgType); - if (rc == 0 && msgType!=NULL) { + int rc = dynMessage_parse(stream, &msgType); + if (rc == 0 && msgType!=NULL) { - char* msgName = NULL; - dynMessage_getName(msgType,&msgName); + char* msgName = NULL; + dynMessage_getName(msgType,&msgName); - if(msgName!=NULL){ - unsigned int* msgId = malloc(sizeof(unsigned int)); - *msgId = utils_stringHash(msgName); - hashMap_put(msgTypesMap,msgId,msgType); - } + if(msgName!=NULL){ + unsigned int* msgId = malloc(sizeof(unsigned int)); + *msgId = utils_stringHash(msgName); + hashMap_put(msgTypesMap,msgId,msgType); + } + } + else{ + printf("DMU: cannot parse message from descriptor %s\n.",path); + } + fclose(stream); + }else{ + printf("DMU: cannot open descriptor file %s\n.",path); } - else{ - printf("DMU: cannot parse message from descriptor %s\n.",path); - } - fclose(stream); + } entry = readdir(dir); } diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c b/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c index bbb17c377..99ec87a78 100644 --- a/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c +++ b/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c @@ -289,15 +289,13 @@ int etcd_set_with_check(const char* key, const char* value, int ttl, bool always int result = 0; if (etcd_get(key, &etcd_value, NULL) == 0) { if (strcmp(etcd_value, value) != 0) { - printf("[ETCDLIB} WARNING: value already exists and is different\n"); + printf("[ETCDLIB] WARNING: value already exists and is different\n"); printf(" key = %s\n", key); printf(" old value = %s\n", etcd_value); printf(" new value = %s\n", value); result = -1; } - if (etcd_value) { - free(etcd_value); - } + free(etcd_value); } if(always_write || !result) { result = etcd_set(key, value, ttl, false); diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c b/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c index 7a633634e..b18ef36c7 100644 --- a/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c +++ b/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c @@ -149,9 +149,6 @@ celix_status_t logHelper_destroy(log_helper_pt* loghelper) { return status; } - - - celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... ) { celix_status_t status = CELIX_SUCCESS; @@ -169,7 +166,6 @@ celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* m int i = 0; for (; i < arrayList_size(loghelper->logServices); i++) { - log_service_pt logService = arrayList_get(loghelper->logServices, i); if (logService != NULL) { @@ -179,31 +175,29 @@ celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* m } pthread_mutex_unlock(&loghelper->logListLock); - } + if (!logged && loghelper->stdOutFallback) { + char *levelStr = NULL; + + switch (level) { + case OSGI_LOGSERVICE_ERROR: + levelStr = "ERROR"; + break; + case OSGI_LOGSERVICE_WARNING: + levelStr = "WARNING"; + break; + case OSGI_LOGSERVICE_INFO: + levelStr = "INFO"; + break; + case OSGI_LOGSERVICE_DEBUG: + default: + levelStr = "DEBUG"; + break; + } - if (!logged && loghelper->stdOutFallback) { - char *levelStr = NULL; - - switch (level) { - case OSGI_LOGSERVICE_ERROR: - levelStr = "ERROR"; - break; - case OSGI_LOGSERVICE_WARNING: - levelStr = "WARNING"; - break; - case OSGI_LOGSERVICE_INFO: - levelStr = "INFO"; - break; - case OSGI_LOGSERVICE_DEBUG: - default: - levelStr = "DEBUG"; - break; - } - - printf("%s: %s\n", levelStr, msg); - } - + printf("%s: %s\n", levelStr, msg); + } + } return status; } diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c index 4af52ac34..8586203da 100644 --- a/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c +++ b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c @@ -35,36 +35,43 @@ #include "constants.h" #include "subscriber.h" -celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId, const char* endpoint, pubsub_endpoint_pt* psEp) { +celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId, const char* endpoint, pubsub_endpoint_pt* out) { celix_status_t status = CELIX_SUCCESS; - *psEp = calloc(1, sizeof(**psEp)); + + pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp)); if (fwUUID != NULL) { - (*psEp)->frameworkUUID = strdup(fwUUID); + psEp->frameworkUUID = strdup(fwUUID); } if (scope != NULL) { - (*psEp)->scope = strdup(scope); + psEp->scope = strdup(scope); } if (topic != NULL) { - (*psEp)->topic = strdup(topic); + psEp->topic = strdup(topic); } - (*psEp)->serviceID = serviceId; + psEp->serviceID = serviceId; if (endpoint != NULL) { - (*psEp)->endpoint = strdup(endpoint); + psEp->endpoint = strdup(endpoint); } + if (status != CELIX_SUCCESS) { + pubsubEndpoint_destroy(psEp); + } else { + *out = psEp; + } + return status; } -celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* psEp){ +celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* out){ celix_status_t status = CELIX_SUCCESS; - *psEp = calloc(1,sizeof(**psEp)); + pubsub_endpoint_pt psEp = calloc(1,sizeof(*psEp)); bundle_pt bundle = NULL; bundle_context_pt ctxt = NULL; @@ -82,58 +89,49 @@ celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt re const char* serviceId = NULL; serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId); - if(fwUUID!=NULL){ - (*psEp)->frameworkUUID=strdup(fwUUID); + psEp->frameworkUUID = strdup(fwUUID); } if(scope!=NULL){ - (*psEp)->scope=strdup(scope); + psEp->scope = strdup(scope); } else { - (*psEp)->scope=strdup(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT); + psEp->scope = strdup(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT); } if(topic!=NULL){ - (*psEp)->topic=strdup(topic); + psEp->topic = strdup(topic); } if(serviceId!=NULL){ - (*psEp)->serviceID = strtol(serviceId,NULL,10); + psEp->serviceID = strtol(serviceId,NULL,10); } - if (!(*psEp)->frameworkUUID || !(*psEp)->serviceID || !(*psEp)->scope || !(*psEp)->topic) { + if (!psEp->frameworkUUID || !psEp->serviceID || !psEp->scope || !psEp->topic) { fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: incomplete description!."); status = CELIX_BUNDLE_EXCEPTION; } + if (status != CELIX_SUCCESS) { + pubsubEndpoint_destroy(psEp); + } else { + *out = psEp; + } + return status; } celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){ - if(psEp->frameworkUUID!=NULL){ + if (psEp != NULL) { free(psEp->frameworkUUID); - psEp->frameworkUUID = NULL; - } - - if(psEp->scope!=NULL){ free(psEp->scope); - psEp->scope = NULL; - } - - if(psEp->topic!=NULL){ free(psEp->topic); - psEp->topic = NULL; - } - - if(psEp->endpoint!=NULL){ free(psEp->endpoint); - psEp->endpoint = NULL; } - free(psEp); - return CELIX_SUCCESS; + return CELIX_SUCCESS; } bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){ @@ -145,7 +143,6 @@ bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){ ((psEp1->endpoint==NULL && psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/ ); - } char *createScopeTopicKey(const char* scope, const char* topic) { diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c index 85ef86826..bb6096a4b 100644 --- a/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c +++ b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c @@ -42,35 +42,39 @@ struct _pubsub_message_type { /* _dyn_message_type */ version_pt msgVersion; }; -int pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen){ - - int rc = 0; +celix_status_t pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen){ + celix_status_t status = CELIX_SUCCESS; dyn_type *type = NULL; dynMessage_getMessageType((dyn_message_type *) msgType, &type); char *jsonOutput = NULL; - rc = jsonSerializer_serialize(type, (void *) input, &jsonOutput); + int rc = jsonSerializer_serialize(type, (void *) input, &jsonOutput); + if (rc != 0){ + status = CELIX_BUNDLE_EXCEPTION; + } *output = (void *) jsonOutput; *outputLen = strlen(jsonOutput) + 1; - return rc; + return status; } -int pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output){ - - int rc = 0; +celix_status_t pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output){ + celix_status_t status = CELIX_SUCCESS; dyn_type *type = NULL; dynMessage_getMessageType((dyn_message_type *) msgType, &type); void *textOutput = NULL; - rc = jsonSerializer_deserialize(type, (const char *) input, &textOutput); + int rc = jsonSerializer_deserialize(type, (const char *) input, &textOutput); + if (rc != 0){ + status = CELIX_BUNDLE_EXCEPTION; + } *output = textOutput; - return rc; + return status; } unsigned int pubsubSerializer_hashCode(const char *string){ diff --git a/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c b/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c index a3940457a..0d8468ea9 100644 --- a/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c +++ b/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c @@ -243,19 +243,17 @@ celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_c (*watcher)->scope = strdup(scope); (*watcher)->topic = strdup(topic); - celixThreadMutex_create(&(*watcher)->watcherLock, NULL); celixThreadMutex_lock(&(*watcher)->watcherLock); - if ((status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher)) != CELIX_SUCCESS) { - return status; + status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher); + if (status == CELIX_SUCCESS) { + (*watcher)->running = true; } - (*watcher)->running = true; celixThreadMutex_unlock(&(*watcher)->watcherLock); - return status; } diff --git a/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c b/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c index 1b6aca9cd..0c7d6c4f2 100644 --- a/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c +++ b/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c @@ -131,7 +131,6 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) { etcdWatcher_stop(wi->watcher); } hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&ps_discovery->watchersMutex); celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex); @@ -168,6 +167,7 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) { hashMapIterator_destroy(iter); hashMap_destroy(ps_discovery->watchers, true, true); celixThreadMutex_unlock(&ps_discovery->watchersMutex); + return status; } @@ -316,7 +316,7 @@ celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt free(pub_key); if(pubEP_list==NULL){ printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",pubEP->topic); - return CELIX_ILLEGAL_STATE; + status = CELIX_ILLEGAL_STATE; } else{ diff --git a/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c index ae7b4a907..a35d161a2 100644 --- a/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c +++ b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c @@ -149,8 +149,23 @@ celix_status_t bundleActivator_create(bundle_context_pt context, void **userData if (status == CELIX_SUCCESS) { *userData = activator; } + if (status != CELIX_SUCCESS){ + serviceTracker_destroy(activator->pubsubAdminTracker); + } + } + if (status != CELIX_SUCCESS){ + serviceTracker_destroy(activator->pubsubDiscoveryTracker); } } + if (status != CELIX_SUCCESS){ + pubsub_topologyManager_destroy(activator->manager); + } + } + if (status != CELIX_SUCCESS){ // an exception occurred so free allocated memory + logHelper_stop(activator->loghelper); + logHelper_destroy(&activator->loghelper); + free(activator); + } return status; diff --git a/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c index 5ba131538..a6541b9b2 100644 --- a/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c +++ b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c @@ -372,7 +372,7 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_ref celixThreadMutex_unlock(&manager->psaListLock); } else{ - status=CELIX_INVALID_BUNDLE_CONTEXT; + status = CELIX_INVALID_BUNDLE_CONTEXT; } return status; @@ -457,7 +457,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r } else{ - status=CELIX_INVALID_BUNDLE_CONTEXT; + status = CELIX_INVALID_BUNDLE_CONTEXT; } return status; @@ -631,14 +631,14 @@ celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_ celixThreadMutex_unlock(&manager->psaListLock); } - free(topic); } else{ status=CELIX_INVALID_BUNDLE_CONTEXT; } - free(scope); + free(topic); + free(scope); } return status; @@ -731,8 +731,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra celixThreadMutex_unlock(&manager->psaListLock); pubsubEndpoint_destroy(pubcmp); - free(pub_scope); - free(pub_topic); + free(pub_key); } @@ -741,6 +740,9 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra else{ status=CELIX_INVALID_BUNDLE_CONTEXT; } + + free(pub_scope); + free(pub_topic); } return status;