Skip to content

Commit

Permalink
CELIX-???: Extracted pubsub_serializer to a stand-alone service
Browse files Browse the repository at this point in the history
  • Loading branch information
rlenferink committed Mar 30, 2017
1 parent 544906f commit eb6b496
Show file tree
Hide file tree
Showing 28 changed files with 763 additions and 135 deletions.
1 change: 1 addition & 0 deletions pubsub/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ if (PUBSUB)

add_subdirectory(pubsub_topology_manager)
add_subdirectory(pubsub_discovery)
add_subdirectory(pubsub_serializer_json)
add_subdirectory(pubsub_admin_zmq)
add_subdirectory(pubsub_admin_udp_mc)
add_subdirectory(examples)
Expand Down
11 changes: 11 additions & 0 deletions pubsub/deploy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ add_deploy("pubsub_publisher_udp_mc"
org.apache.celix.pubsub_admin.PubSubAdminUdpMc
org.apache.celix.pubsub_publisher.PoiPublisher
org.apache.celix.pubsub_publisher.PoiPublisher2
org.apache.celix.pubsub_serializer.PubSubSerializerJson
)

add_deploy("pubsub_subscriber_udp_mc"
Expand All @@ -38,6 +39,7 @@ add_deploy("pubsub_subscriber_udp_mc"
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
org.apache.celix.pubsub_admin.PubSubAdminUdpMc
org.apache.celix.pubsub_subscriber.PoiSubscriber
org.apache.celix.pubsub_serializer.PubSubSerializerJson
)

add_deploy("pubsub_subscriber2_udp_mc"
Expand All @@ -49,6 +51,7 @@ add_deploy("pubsub_subscriber2_udp_mc"
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
org.apache.celix.pubsub_admin.PubSubAdminUdpMc
org.apache.celix.pubsub_subscriber.PoiSubscriber
org.apache.celix.pubsub_serializer.PubSubSerializerJson
)

if (BUILD_PUBSUB_PSA_ZMQ)
Expand All @@ -65,6 +68,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
org.apache.celix.pubsub_admin.PubSubAdminUdpMc
org.apache.celix.pubsub_publisher.PoiPublisher
org.apache.celix.pubsub_publisher.PoiPublisher2
org.apache.celix.pubsub_serializer.PubSubSerializerJson
PROPERTIES
poi1.psa=zmq
poi2.psa=udp
Expand All @@ -80,6 +84,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_admin.PubSubAdminUdpMc
org.apache.celix.pubsub_subscriber.PoiSubscriber
org.apache.celix.pubsub_serializer.PubSubSerializerJson
PROPERTIES
poi1.psa=zmq
poi2.psa=udp
Expand All @@ -96,6 +101,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_publisher.PoiPublisher
org.apache.celix.pubsub_subscriber.PoiSubscriber
org.apache.celix.pubsub_serializer.PubSubSerializerJson
)

add_deploy("pubsub_publisher_zmq"
Expand All @@ -108,6 +114,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_publisher.PoiPublisher
org.apache.celix.pubsub_publisher.PoiPublisher2
org.apache.celix.pubsub_serializer.PubSubSerializerJson
PROPERTIES
pubsub.scope=my_small_scope
)
Expand All @@ -121,6 +128,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_subscriber.PoiSubscriber
org.apache.celix.pubsub_serializer.PubSubSerializerJson
)

add_deploy("pubsub_subscriber2_zmq"
Expand All @@ -132,6 +140,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_subscriber.PoiSubscriber
org.apache.celix.pubsub_serializer.PubSubSerializerJson
)

# ZMQ Multipart
Expand All @@ -144,6 +153,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_subscriber.MpSubscriber
org.apache.celix.pubsub_serializer.PubSubSerializerJson
)

add_deploy("pubsub_mp_publisher_zmq"
Expand All @@ -155,6 +165,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_publisher.MpPublisher
org.apache.celix.pubsub_serializer.PubSubSerializerJson
)

endif()
12 changes: 1 addition & 11 deletions pubsub/pubsub_admin_udp_mc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,6 @@ include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
include_directories("private/include")
include_directories("public/include")
include_directories("${JANSSON_INCLUDE_DIR}")
if (SERIALIZER_PATH)
include_directories("${SERIALIZER_PATH}/include")
endif()
if (SERIALIZER_LIB_INCLUDE_DIR)
include_directories("${SERIALIZER_LIB_INCLUDE_DIR}")
endif()
if (SERIALIZER_LIB_DIR)
link_directories("${SERIALIZER_LIB_DIR}")
endif()

add_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc
BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_udp_multicast"
Expand All @@ -47,11 +38,10 @@ add_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
${PUBSUB_SERIALIZER_SRC}
)

set_target_properties(org.apache.celix.pubsub_admin.PubSubAdminUdpMc PROPERTIES INSTALL_RPATH "$ORIGIN")
target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminUdpMc celix_framework celix_utils celix_dfi ${JANSSON_LIBRARIES} ${SERIALIZER_LIBRARY})
target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminUdpMc celix_framework celix_utils celix_dfi ${JANSSON_LIBRARIES})

install_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc)

Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@
#define PUBSUB_ADMIN_IMPL_H_

#include "pubsub_admin.h"
#include "pubsub_serializer.h"
#include "log_helper.h"

struct pubsub_admin {

pubsub_serializer_service_pt serializerSvc;

bundle_context_pt bundle_context;
log_helper_pt loghelper;

Expand Down Expand Up @@ -70,4 +73,7 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco
celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);

celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);

#endif /* PUBSUB_ADMIN_IMPL_H_ */
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "publisher.h"
#include "pubsub_endpoint.h"
#include "pubsub_common.h"
#include "pubsub_serializer.h"

#define UDP_BASE_PORT 49152
#define UDP_MAX_PORT 65000
Expand All @@ -41,12 +42,15 @@ typedef struct pubsub_udp_msg {
} *pubsub_udp_msg_pt;

typedef struct topic_publication *topic_publication_pt;
celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP,char* bindIP, topic_publication_pt *out);
celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, topic_publication_pt *out);
celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);

celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);

celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);

celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@

#include "pubsub_endpoint.h"
#include "pubsub_common.h"
#include "pubsub_serializer.h"

typedef struct topic_subscription* topic_subscription_pt;

celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, char* scope, char* topic,topic_subscription_pt* out);
celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out);
celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
Expand All @@ -48,6 +49,9 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);

celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);

celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription);
celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription);
unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription);
Expand Down
3 changes: 3 additions & 0 deletions pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context)
pubsubAdminSvc->matchPublisher = pubsubAdmin_matchPublisher;
pubsubAdminSvc->matchSubscriber = pubsubAdmin_matchSubscriber;

pubsubAdminSvc->setSerializer = pubsubAdmin_setSerializer;
pubsubAdminSvc->removeSerializer = pubsubAdmin_removeSerializer;

activator->adminService = pubsubAdminSvc;

status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration);
Expand Down
62 changes: 59 additions & 3 deletions pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu

int i;

status += pubsub_topicSubscriptionCreate(admin->ifIpAddress, admin->bundle_context, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC,&any_sub);
status += pubsub_topicSubscriptionCreate(admin->ifIpAddress, admin->bundle_context, admin->serializerSvc, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC,&any_sub);

/* Connect all internal publishers */
celixThreadMutex_lock(&admin->localPublicationsLock);
Expand Down Expand Up @@ -334,7 +334,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint

if(subscription == NULL) {

status += pubsub_topicSubscriptionCreate(admin->ifIpAddress, admin->bundle_context, subEP->scope, subEP->topic,&subscription);
status += pubsub_topicSubscriptionCreate(admin->ifIpAddress, admin->bundle_context, admin->serializerSvc, subEP->scope, subEP->topic,&subscription);

/* Try to connect internal publishers */
if(factory!=NULL){
Expand Down Expand Up @@ -431,7 +431,7 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_

if (factory == NULL) {
topic_publication_pt pub = NULL;
status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP,admin->mcIpAddress,&pub);
status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, admin->serializerSvc, admin->mcIpAddress,&pub);
if(status == CELIX_SUCCESS){
status = pubsub_topicPublicationStart(admin->bundle_context,pub,&factory);
if(status==CELIX_SUCCESS && factory !=NULL){
Expand Down Expand Up @@ -645,6 +645,62 @@ celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoin
return status;
}

celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
celix_status_t status = CELIX_SUCCESS;
admin->serializerSvc = serializerSvc;

/* Add serializer to all topic_publication_pt */
celixThreadMutex_lock(&admin->localPublicationsLock);
hash_map_iterator_pt lp_iter = hashMapIterator_create(admin->localPublications);
while(hashMapIterator_hasNext(lp_iter)){
service_factory_pt factory = (service_factory_pt) hashMapIterator_nextValue(lp_iter);
topic_publication_pt topic_pub = (topic_publication_pt) factory->handle;
pubsub_topicPublicationAddSerializer(topic_pub, admin->serializerSvc);
}
hashMapIterator_destroy(lp_iter);
celixThreadMutex_unlock(&admin->localPublicationsLock);

/* Add serializer to all topic_subscription_pt */
celixThreadMutex_lock(&admin->subscriptionsLock);
hash_map_iterator_pt subs_iter = hashMapIterator_create(admin->subscriptions);
while(hashMapIterator_hasNext(subs_iter)){
topic_subscription_pt topic_sub = (topic_subscription_pt) hashMapIterator_nextValue(subs_iter);
pubsub_topicSubscriptionAddSerializer(topic_sub, admin->serializerSvc);
}
hashMapIterator_destroy(subs_iter);
celixThreadMutex_unlock(&admin->subscriptionsLock);

return status;
}

celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
celix_status_t status = CELIX_SUCCESS;
admin->serializerSvc = NULL;

/* Remove serializer from all topic_publication_pt */
celixThreadMutex_lock(&admin->localPublicationsLock);
hash_map_iterator_pt lp_iter = hashMapIterator_create(admin->localPublications);
while(hashMapIterator_hasNext(lp_iter)){
service_factory_pt factory = (service_factory_pt) hashMapIterator_nextValue(lp_iter);
topic_publication_pt topic_pub = (topic_publication_pt) factory->handle;
pubsub_topicPublicationRemoveSerializer(topic_pub, admin->serializerSvc);
}
hashMapIterator_destroy(lp_iter);
celixThreadMutex_unlock(&admin->localPublicationsLock);

/* Remove serializer from all topic_subscription_pt */
celixThreadMutex_lock(&admin->subscriptionsLock);
hash_map_iterator_pt subs_iter = hashMapIterator_create(admin->subscriptions);
while(hashMapIterator_hasNext(subs_iter)){
topic_subscription_pt topic_sub = (topic_subscription_pt) hashMapIterator_nextValue(subs_iter);
pubsub_topicSubscriptionRemoveSerializer(topic_sub, admin->serializerSvc);
}
hashMapIterator_destroy(subs_iter);
celixThreadMutex_unlock(&admin->subscriptionsLock);

return status;
}

static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, pubsub_endpoint_pt psEP, double* score){
celix_status_t status = CELIX_SUCCESS;

Expand Down
Loading

0 comments on commit eb6b496

Please sign in to comment.