Skip to content

Commit

Permalink
Fix building PubsubAdminUDP for OSX
Browse files Browse the repository at this point in the history
TODO: Implement epoll alternative (kqueue) for OSX
  • Loading branch information
rlenferink committed Feb 6, 2017
1 parent 45ecd64 commit 1af1ff0
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,15 @@ celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt ref
celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service){
pubsub_sender_pt manager = (pubsub_sender_pt)handle;
celix_thread_t *tid = hashMap_get(manager->tid_map, service);

#if defined(__APPLE__) && defined(__MACH__)
uint64_t threadid;
pthread_threadid_np(tid->thread, &threadid);
printf("PUBLISHER: publish service unexporting (%s) %llu!\n",manager->ident, threadid);
#else
printf("PUBLISHER: publish service unexporting (%s) %li!\n",manager->ident, tid->thread);
#endif

stop=true;
celixThread_join(*tid,NULL);
free(tid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,16 @@

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#if defined(__APPLE__) && defined(__MACH__)
#include <sys/event.h>
#include <sys/time.h>
#else
#include <sys/epoll.h>
#endif

#include "utils.h"
#include "celix_errno.h"
#include "constants.h"
Expand Down Expand Up @@ -95,7 +101,11 @@ celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundl
topic_subscription_pt ts = (topic_subscription_pt) calloc(1,sizeof(*ts));
ts->context = bundle_context;
ts->ifIpAddress = strdup(ifIp);
#if defined(__APPLE__) && defined(__MACH__)
//TODO: Use kqueue for OSX
#else
ts->topicEpollFd = epoll_create1(0);
#endif
if(ts->topicEpollFd == -1) {
status += CELIX_SERVICE_EXCEPTION;
}
Expand Down Expand Up @@ -155,7 +165,11 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){

hashMap_destroy(ts->socketMap,false,false);
largeUdp_destroy(ts->largeUdpHandle);
#if defined(__APPLE__) && defined(__MACH__)
//TODO: Use kqueue for OSX
#else
close(ts->topicEpollFd);
#endif

celixThreadMutex_unlock(&ts->ts_lock);

Expand Down Expand Up @@ -257,7 +271,9 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
return CELIX_SERVICE_EXCEPTION;
}


#if defined(__APPLE__) && defined(__MACH__)
//TODO: Use kqueue for OSX
#else
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
ev.events = EPOLLIN;
Expand All @@ -266,6 +282,8 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
perror("epoll_ctl() EPOLL_CTL_ADD");
return CELIX_SERVICE_EXCEPTION;
}
#endif

hashMap_put(ts->socketMap, pubURL, (void*)recvSocket);

celixThreadMutex_unlock(&ts->ts_lock);
Expand All @@ -280,6 +298,10 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
celix_status_t status = CELIX_SUCCESS;

if (hashMap_containsKey(ts->socketMap, pubURL)){

#if defined(__APPLE__) && defined(__MACH__)
//TODO: Use kqueue for OSX
#else
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));

Expand All @@ -294,6 +316,8 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
free(s);

celixThreadMutex_unlock(&ts->ts_lock);
#endif

}

return status;
Expand Down Expand Up @@ -454,6 +478,18 @@ static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_pt msg){
static void* udp_recv_thread_func(void * arg) {
topic_subscription_pt sub = (topic_subscription_pt) arg;

#if defined(__APPLE__) && defined(__MACH__)
//TODO: use kqueue for OSX
//struct kevent events[MAX_EPOLL_EVENTS];
while (sub->running) {
int nfds = 0;
if(nfds > 0) {
pubsub_udp_msg_pt udpMsg = NULL;
process_msg(sub, udpMsg);
}
}
#else

struct epoll_event events[MAX_EPOLL_EVENTS];

while (sub->running) {
Expand All @@ -480,6 +516,7 @@ static void* udp_recv_thread_func(void * arg) {
}
}
}
#endif

return NULL;
}
Expand Down

0 comments on commit 1af1ff0

Please sign in to comment.