Skip to content

Commit

Permalink
Merge 701eae4 into 147e8bf
Browse files Browse the repository at this point in the history
  • Loading branch information
rlenferink committed Dec 22, 2016
2 parents 147e8bf + 701eae4 commit 203ea38
Show file tree
Hide file tree
Showing 98 changed files with 12,195 additions and 0 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ add_subdirectory(log_service)
#add_subdirectory(event_admin)# event_admin is unstable
add_subdirectory(dependency_manager)
add_subdirectory(dependency_manager_cxx)
add_subdirectory(pubsub)

#Example as last, because some example will check if underlining options are enabled
add_subdirectory(examples)
42 changes: 42 additions & 0 deletions cmake/FindCZMQ.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


# - Try to find CZMQ
# Once done this will define
# CZMQ_FOUND - System has Zmq
# CZMQ_INCLUDE_DIRS - The Zmq include directories
# CZMQ_LIBRARIES - The libraries needed to use Zmq
# CZMQ_DEFINITIONS - Compiler switches required for using Zmq

find_path(CZMQ_INCLUDE_DIR czmq.h
/usr/include
/usr/local/include )

find_library(CZMQ_LIBRARY NAMES czmq
PATHS /usr/lib /usr/local/lib /usr/lib64 /usr/local/lib64 )

set(CZMQ_LIBRARIES ${CZMQ_LIBRARY} )
set(CZMQ_INCLUDE_DIRS ${CZMQ_INCLUDE_DIR} )

include(FindPackageHandleStandardArgs)
# handle the QUIETLY and REQUIRED arguments and set CZMQ_FOUND to TRUE
# if all listed variables are TRUE
find_package_handle_standard_args(Czmq DEFAULT_MSG
CZMQ_LIBRARY CZMQ_INCLUDE_DIR)

mark_as_advanced(CZMQ_INCLUDE_DIR CZMQ_LIBRARY )
42 changes: 42 additions & 0 deletions cmake/FindZMQ.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


# - Try to find ZMQ
# Once done this will define
# ZMQ_FOUND - System has Zmq
# ZMQ_INCLUDE_DIRS - The Zmq include directories
# ZMQ_LIBRARIES - The libraries needed to use Zmq
# ZMQ_DEFINITIONS - Compiler switches required for using Zmq

find_path(ZMQ_INCLUDE_DIR zmq.h zmq_utils.h
/usr/include
/usr/local/include )

find_library(ZMQ_LIBRARY NAMES zmq
PATHS /usr/lib /usr/local/lib /usr/lib64 /usr/local/lib64 )

set(ZMQ_LIBRARIES ${ZMQ_LIBRARY} )
set(ZMQ_INCLUDE_DIRS ${ZMQ_INCLUDE_DIR} )

include(FindPackageHandleStandardArgs)
# handle the QUIETLY and REQUIRED arguments and set ZMQ_FOUND to TRUE
# if all listed variables are TRUE
find_package_handle_standard_args(Zmq DEFAULT_MSG
ZMQ_LIBRARY ZMQ_INCLUDE_DIR)

mark_as_advanced(ZMQ_INCLUDE_DIR ZMQ_LIBRARY )
48 changes: 48 additions & 0 deletions pubsub/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

celix_subproject(PUBSUB "Option to build the pubsub bundles" OFF)
if (PUBSUB)

include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
include_directories("${PROJECT_SOURCE_DIR}/framework/public/include")

option(ENABLE_ZMQ_SECURITY "Enable security for ZeroMQ" OFF)

set (PUBSUB_SERIALIZER_SRC "${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_serializer.c")
set (SERIALIZER_PATH "" CACHE FILEPATH "Path to the directory which will contain the serializer (include / src).")
set (SERIALIZER_LIB_INCLUDE_DIR "" CACHE FILEPATH "Path to the include dir of the addiotional libs.")
set (SERIALIZER_LIB_PATH "" CACHE FILEPATH "Path to the additional library.")
if (EXISTS ${SERIALIZER_PATH})
file (GLOB PUBSUB_SERIALIZER_SRC ${SERIALIZER_PATH}/src/*.c)

if (SERIALIZER_LIB_PATH)
get_filename_component(SERIALIZER_LIB_DIR ${SERIALIZER_LIB_PATH} DIRECTORY)
get_filename_component(SERIALIZER_LIB_FULLNAME ${SERIALIZER_LIB_PATH} NAME_WE)
string (REPLACE "lib" "" SERIALIZER_LIBRARY ${SERIALIZER_LIB_FULLNAME})
endif()
endif()

add_subdirectory(pubsub_topology_manager)
add_subdirectory(pubsub_discovery)
add_subdirectory(pubsub_admin_zmq)
add_subdirectory(pubsub_admin_udp_mc)
add_subdirectory(examples)
add_subdirectory(deploy)
add_subdirectory(keygen)

endif(PUBSUB)
71 changes: 71 additions & 0 deletions pubsub/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# PubSubAdmin

This subdirectory contains an implementation for a publish-subscribe remote services system, that use dfi library for message serialization.
For low-level communication, UDP and ZMQ is used.

# Description

This publisher / subscriber implementation is based on the concepts of the remote service admin (i.e. rsa / topology / discovery pattern).

Publishers are senders of data, subscribers can receive data. Publishers can publish/send data to certain channels (called 'topics' further on), subscribers can subscribe to these topics. For every topic a publisher service is created by the pubsub admin. This publisher is announced through etcd. So etcd is used for discovery of the publishers. Subscribers are also registered as a service by the pubsub admin and will watch etcd for changes and when a new publisher is announced, the subscriber will check if the topic matches its interests. If the subscriber is interested in/subscribed to a certain topic, a connection between publisher and subscriber will be instantiated by the pubsub admin.

The dfi library is used for message serialization. The publisher / subscriber implementation will arrange that every message which will be send gets an unique id.

For communication between publishers and subscribers UDP and ZeroMQ can be used. When using ZeroMQ it's also possible to setup a secure connection to encrypt the traffic being send between publishers and subscribers. This connection can be secured with ZeroMQ by using a curve25519 key pair per topic.

The publisher/subscriber implementation supports sending of a single message and sending of multipart messages.

## Getting started

To get the ZeroMQ pubsub admin running, [ZeroMQ](https://github.com/zeromq/libzmq) and [CZMQ](https://github.com/zeromq/czmq) needs to be installed.

Also, to make use of encrypted traffic, [OpenSSL] is required.
[OpenSSL github repo](https://github.com/openssl/openssl)

## Running instructions

### Running PSA ZMQ

For ZeroMQ without encryption, skip the steps 1-12 below
1. Run `touch ~/pubsub.keys`
1. Run `echo "aes_key:{AES_KEY here}" >> ~/pubsub.keys`
1. Run `echo "aes_iv:{AES_IV here}" >> ~/pubsub.keys`
1. Run `touch ~/pubsub.conf`
1. Run `echo "keys.file.path=$HOME" >> ~/pubsub.conf`
1. Run `echo "keys.file.name=pubsub.keys" >> ~/pubsub.conf`
1. To generate ZMQ keypairs
1. Run `pubsub/keygen/makecert cert_topic1.pub cert_topic1.key`
1. To encrypt files
1. Run `pubsub/keygen/ed_file ~/pubsub.keys cert_topic1.key cert_topic1.key.enc`
1. Store the keys in the pubsub/examples/keys/ directory
1. Build project to include these keys

For ZeroMQ without encryption, start here

1. Run `etcd`

1. Open second terminal on pubsub root
1. Run `cd deploy/pubsub/pubsub_publisher_zmq`
1. Run `cat ~/pubsub.conf >> config.properties` only for ZeroMQ with encryption
1. Run `sh run.sh`

1. Open third terminal on pubsub root
1. Run `cd deploy/pubsub/pubsub_subscriber_zmq`
1. Run `cat ~/pubsub.conf >> config.properties` only for ZeroMQ with encryption
1. Run `sh run.sh`

### Running PSA UDP-Multicast

1. Open a terminal
1. Run `etcd`

1. Open second terminal on project build location
1. Run `cd deploy/pubsub/pubsub_publisher_udp_mc`
1. Run `sh run.sh`

1. Open third terminal on project build location
1. Run `cd deploy/pubsub/pubsub_subscriber_udp_mc`
1. Run `sh run.sh`

Design information can be found at pubsub\_admin\_udp_mc/README.md

88 changes: 88 additions & 0 deletions pubsub/api/pubsub/publisher.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
*Licensed to the Apache Software Foundation (ASF) under one
*or more contributor license agreements. See the NOTICE file
*distributed with this work for additional information
*regarding copyright ownership. The ASF licenses this file
*to you under the Apache License, Version 2.0 (the
*"License"); you may not use this file except in compliance
*with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*Unless required by applicable law or agreed to in writing,
*software distributed under the License is distributed on an
*"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
*specific language governing permissions and limitations
*under the License.
*/
/*
* publisher.h
*
* \date Jan 7, 2016
* \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
* \copyright Apache License, Version 2.0
*/

#ifndef __PUBSUB_PUBLISHER_H_
#define __PUBSUB_PUBLISHER_H_

#include <stdlib.h>

#define PUBSUB_PUBLISHER_SERVICE_NAME "pubsub.publisher"
#define PUBSUB_PUBLISHER_SERVICE_VERSION "1.0.0"

//properties
#define PUBSUB_PUBLISHER_TOPIC "pubsub.topic"
#define PUBSUB_PUBLISHER_SCOPE "pubsub.scope"
#define PUBSUB_PUBLISHER_STRATEGY "pubsub.strategy"
#define PUBSUB_PUBLISHER_CONFIG "pubsub.config"

#define PUBSUB_PUBLISHER_SCOPE_DEFAULT "default"
//flags
#define PUBSUB_PUBLISHER_FIRST_MSG 01
#define PUBSUB_PUBLISHER_PART_MSG 02
#define PUBSUB_PUBLISHER_LAST_MSG 04

struct pubsub_release_callback_struct {
void *handle;
void (*release)(char *buf, void *handle);
};
typedef struct pubsub_release_callback_struct pubsub_release_callback_t;
typedef struct pubsub_release_callback_struct* pubsub_release_callback_pt;


struct pubsub_publisher {
void *handle;

/**
* Every msg is identifiable by msg type string. Because masg type string are performance wise not preferable (string compares),
* a "local" (int / platform dependent) unique id will be generated runtime
* with use of a distributed key/value store or communication between participation parties.
* this is called the local message type id. This local message type id can be requested with the localMsgIdForMsgType method.
* When return is successful the msgTypeId is always greater than 0. (Note this can be used to specify/detect uninitialized msg type ids in the consumer code).
*
* Returns 0 on success.
*/
int (*localMsgTypeIdForMsgType)(void *handle, const char *msgType, unsigned int *msgTypeId);

/**
* send is a async function, but the msg can be safely deleted after send returns.
* Returns 0 on success.
*/
int (*send)(void *handle, unsigned int msgTypeId, void *msg);


/**
* sendMultipart is a async function, but the msg can be safely deleted after send returns.
* The first (primary) message of a multipart message must have the flag PUBLISHER_PRIMARY_MSG
* The last message of a multipart message must have the flag PUBLISHER_LAST_MSG
* Returns 0 on success.
*/
int (*sendMultipart)(void *handle, unsigned int msgTypeId, void *msg, int flags);

};
typedef struct pubsub_publisher pubsub_publisher_t;
typedef struct pubsub_publisher* pubsub_publisher_pt;

#endif // __PUBSUB_PUBLISHER_H_
75 changes: 75 additions & 0 deletions pubsub/api/pubsub/subscriber.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
*Licensed to the Apache Software Foundation (ASF) under one
*or more contributor license agreements. See the NOTICE file
*distributed with this work for additional information
*regarding copyright ownership. The ASF licenses this file
*to you under the Apache License, Version 2.0 (the
*"License"); you may not use this file except in compliance
*with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*Unless required by applicable law or agreed to in writing,
*software distributed under the License is distributed on an
*"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
*specific language governing permissions and limitations
*under the License.
*/
/*
* subscriber.h
*
* \date Jan 7, 2016
* \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
* \copyright Apache License, Version 2.0
*/

#ifndef __PUBSUB_SUBSCRIBER_H_
#define __PUBSUB_SUBSCRIBER_H_

#include <stdbool.h>

#define PUBSUB_SUBSCRIBER_SERVICE_NAME "pubsub.subscriber"
#define PUBSUB_SUBSCRIBER_SERVICE_VERSION "1.0.0"

//properties
#define PUBSUB_SUBSCRIBER_TOPIC "pubsub.topic"
#define PUBSUB_SUBSCRIBER_SCOPE "pubsub.scope"
#define PUBSUB_SUBSCRIBER_STRATEGY "pubsub.strategy"
#define PUBSUB_SUBSCRIBER_CONFIG "pubsub.config"

#define PUBSUB_SUBSCRIBER_SCOPE_DEFAULT "default"

struct pubsub_multipart_callbacks_struct {
void *handle;
int (*localMsgTypeIdForMsgType)(void *handle, const char *msgType, unsigned int *msgId);
int (*getMultipart)(void *handle, unsigned int msgTypeId, bool retain, void **part);
};
typedef struct pubsub_multipart_callbacks_struct pubsub_multipart_callbacks_t;
typedef struct pubsub_multipart_callbacks_struct* pubsub_multipart_callbacks_pt;

struct pubsub_subscriber_struct {
void *handle;

/**
* When a new message for a topic is available the receive will be called.
*
* msgType contains fully qualified name of the type and msgTypeId is a local id which presents the type for performance reasons.
* Release can be used to instruct the pubsubadmin to release (free) the message when receive function returns. Set it to false to take
* over ownership of the msg (e.g. take the responsibility to free it).
*
* The callbacks argument is only valid inside the receive function, use the getMultipart callback, with retain=true, to keep multipart messages in memory.
* results of the localMsgTypeIdForMsgType callback are valid during the complete lifecycle of the component, not just a single receive call.
*
* Return 0 implies a successful handling. If return is not 0, the msg will always be released by the pubsubadmin.
*
* this method can be NULL.
*/
int (*receive)(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, pubsub_multipart_callbacks_t *callbacks, bool *release);

};
typedef struct pubsub_subscriber_struct pubsub_subscriber_t;
typedef struct pubsub_subscriber_struct* pubsub_subscriber_pt;


#endif // __PUBSUB_SUBSCRIBER_H_
Loading

0 comments on commit 203ea38

Please sign in to comment.