Skip to content

Commit

Permalink
CELIX-408: Major refactoring of the pubsub serializer design and usag…
Browse files Browse the repository at this point in the history
…e. example of udpmc and zmq seesm to be working. combi and zmq multicast are not stable yet.

- The serializer is refactored to miminize to needed callback directly to the services. The serializer is now a two step approach.
  The serializer service can be used to create a map os msg serializers.
  And the msg serializer are serializer structs (with function ptrs) to serialize a specific msg
- Where feasible replace _pt types with _t types. Removing the pointer in the typedef makes it possible to add const info and sizeof calls of the typedef
  • Loading branch information
pnoltes committed Apr 11, 2017
1 parent 97df926 commit 7efe433
Show file tree
Hide file tree
Showing 28 changed files with 582 additions and 689 deletions.
2 changes: 1 addition & 1 deletion cmake/cmake_celix/BundlePackaging.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ function(bundle_libs)
if (ADD_TO_MANIFEST)
list(APPEND LIBS "$<TARGET_SONAME_FILE_NAME:${LIB}>")
endif()
list(APPEND DEPS "${OUT}") #NOTE depending on ${OUT} not on $<TARGET_FILE:${LIB}>.
list(APPEND DEPS "${OUT}") #NOTE depending on ${OUT} not on $<TARGET_FILE:${LIB}>.
endif()

get_target_property(IS_LIB ${BUNDLE} "BUNDLE_TARGET_IS_LIB")
Expand Down
8 changes: 4 additions & 4 deletions dfi/private/src/json_serializer.c
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ static int jsonSerializer_parseSequence(dyn_type *seq, json_t *array, void *seqL
return status;
}

int jsonSerializer_serialize(dyn_type *type, void *input, char **output) {
int jsonSerializer_serialize(dyn_type *type, const void* input, char **output) {
int status = OK;

json_t *root = NULL;
Expand All @@ -294,11 +294,11 @@ int jsonSerializer_serialize(dyn_type *type, void *input, char **output) {
return status;
}

int jsonSerializer_serializeJson(dyn_type *type, void *input, json_t **out) {
return jsonSerializer_writeAny(type, input, out);
int jsonSerializer_serializeJson(dyn_type *type, const void* input, json_t **out) {
return jsonSerializer_writeAny(type, (void*)input /*TODO update static function to take const void**/, out);
}

static int jsonSerializer_writeAny(dyn_type *type, void *input, json_t **out) {
static int jsonSerializer_writeAny(dyn_type *type, void* input, json_t **out) {
int status = OK;

int descriptor = dynType_descriptorType(type);
Expand Down
4 changes: 2 additions & 2 deletions dfi/public/include/json_serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ DFI_SETUP_LOG_HEADER(jsonSerializer);
int jsonSerializer_deserialize(dyn_type *type, const char *input, void **result);
int jsonSerializer_deserializeJson(dyn_type *type, json_t *input, void **result);

int jsonSerializer_serialize(dyn_type *type, void *input, char **output);
int jsonSerializer_serializeJson(dyn_type *type, void *input, json_t **out);
int jsonSerializer_serialize(dyn_type *type, const void* input, char **output);
int jsonSerializer_serializeJson(dyn_type *type, const void* input, json_t **out);

#endif
1 change: 0 additions & 1 deletion pubsub/pubsub_admin_udp_mc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ add_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc
private/src/topic_subscription.c
private/src/topic_publication.c
private/src/large_udp.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

struct pubsub_admin {

pubsub_serializer_service_pt serializerSvc;
pubsub_serializer_service_t* serializerSvc;

bundle_context_pt bundle_context;
log_helper_pt loghelper;
Expand Down Expand Up @@ -73,7 +73,7 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco
celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);

celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);

#endif /* PUBSUB_ADMIN_IMPL_H_ */
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ typedef struct pubsub_udp_msg {
struct pubsub_msg_header header;
unsigned int payloadSize;
char payload[];
} *pubsub_udp_msg_pt;
} pubsub_udp_msg_t;

typedef struct topic_publication *topic_publication_pt;
celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, topic_publication_pt *out);
celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, topic_publication_pt *out);
celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);

celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);

celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);

celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

typedef struct topic_subscription* topic_subscription_pt;

celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out);
celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out);
celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
Expand All @@ -49,8 +49,8 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);

celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);

celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription);
celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription);
Expand Down
4 changes: 2 additions & 2 deletions pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoin
return status;
}

celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){
celix_status_t status = CELIX_SUCCESS;
admin->serializerSvc = serializerSvc;

Expand Down Expand Up @@ -673,7 +673,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize
return status;
}

celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){
celix_status_t status = CELIX_SUCCESS;
admin->serializerSvc = NULL;

Expand Down
Loading

0 comments on commit 7efe433

Please sign in to comment.