Skip to content
Permalink
Browse files
Add test_package for pubsub and cxx_shell, add missing requirement fo…
…r rapidjson.
  • Loading branch information
PengZheng committed Mar 20, 2022
1 parent 1bb7c66 commit ee74c8fb8b8afdcc9dcacda1e15c965a7673daab
Showing 5 changed files with 308 additions and 3 deletions.
@@ -198,6 +198,8 @@ def requirements(self):
self.requires("openssl/1.1.1k")
if self.options.build_remote_service_admin or self.options.build_shell_bonjour:
self.requires("libxml2/[~2.9.9]")
if self.options.build_cxx_remote_service_admin:
self.requires("rapidjson/[~1.1.0]")
if self.options.build_shell_bonjour:
# TODO: CC=cc is fixed in the official mdnsresponder Makefile, patching is needed to make cross-compile work
# https://github.com/conan-io/conan-center-index/issues/9711
@@ -216,7 +218,7 @@ def _configure_cmake(self):
for opt, val in self.options.values.items():
self._cmake.definitions[opt.upper()] = self.options.get_safe(opt, False)
self._cmake.definitions["CMAKE_PROJECT_Celix_INCLUDE"] = os.path.join(self.build_folder, "conan_paths.cmake")
# the followint is workaround https://github.com/conan-io/conan/issues/7192
# the following is workaround for https://github.com/conan-io/conan/issues/7192
self._cmake.definitions["CMAKE_EXE_LINKER_FLAGS"] = "-Wl,--unresolved-symbols=ignore-in-shared-libs"
self.output.info(self._cmake.definitions)
v = tools.Version(self.version)
@@ -16,6 +16,7 @@

cmake_minimum_required (VERSION 3.14)
project(test_package)
include(${CMAKE_BINARY_DIR}/conan_paths.cmake)

set(CMAKE_CXX_STANDARD 17)

@@ -73,6 +74,13 @@ if (TEST_SHELL)
target_link_libraries(use_shell PRIVATE Celix::framework Celix::shell_api)
celix_get_bundle_file(Celix::shell SHELL_BUNDLE)
target_compile_definitions(use_shell PRIVATE SHELL_BUNDLE_LOCATION="${SHELL_BUNDLE}")
option(TEST_CXX_SHELL "Test CXX shell" OFF)
if (TEST_CXX_SHELL)
add_executable(use_cxx_shell test_cxx_shell.cpp)
target_link_libraries(use_cxx_shell PRIVATE Celix::framework Celix::shell_api)
celix_get_bundle_file(Celix::ShellCxx CXX_SHELL_BUNDLE)
target_compile_definitions(use_cxx_shell PRIVATE CXX_SHELL_BUNDLE_LOCATION="${CXX_SHELL_BUNDLE}")
endif ()
endif ()

option(TEST_REMOTE_SHELL "Test remote shell" OFF)
@@ -113,3 +121,23 @@ if (TEST_PUSHSTREAMS)
add_executable(use_pushstreams test_pushstreams.cpp)
target_link_libraries(use_pushstreams PRIVATE Celix::PushStreams)
endif ()

option(TEST_PUBSUB "Test PubSub" OFF)
if (TEST_PUBSUB)
add_celix_bundle(my_pubsub_admin
SOURCES
my_psa_activator.c
)
target_link_libraries(my_pubsub_admin PRIVATE Celix::framework Celix::dfi Celix::log_helper Celix::utils)
target_link_libraries(my_pubsub_admin PRIVATE Celix::pubsub_spi Celix::pubsub_utils )
add_celix_container(use_my_psa
BUNDLES
Celix::celix_pubsub_serializer_json
Celix::celix_pubsub_serializer_avrobin
Celix::celix_pubsub_topology_manager
my_pubsub_admin
hello
PROPERTIES
PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
)
endif ()
@@ -21,7 +21,7 @@

class TestPackageConan(ConanFile):
settings = "os", "arch", "compiler", "build_type"
generators = "cmake_paths"
generators = "cmake_paths", "cmake_find_package"

def requirements(self):
# strictly speaking, `requires`, which should be set by 'conan test', is not needed by test_package
@@ -35,15 +35,18 @@ def build(self):
cmake.definitions["TEST_LOG_SERVICE"] = self.options["celix"].build_log_service
cmake.definitions["TEST_SYSLOG_WRITER"] = self.options["celix"].build_syslog_writer
cmake.definitions["TEST_SHELL"] = self.options["celix"].build_shell
if self.options["celix"].build_shell:
cmake.definitions["TEST_CXX_SHELL"] = self.options["celix"].celix_cxx
cmake.definitions["TEST_REMOTE_SHELL"] = self.options["celix"].build_remote_shell
cmake.definitions["TEST_SHELL_TUI"] = self.options["celix"].build_shell_tui
cmake.definitions["TEST_SHELL_WUI"] = self.options["celix"].build_shell_wui
cmake.definitions["TEST_ETCD_LIB"] = self.options["celix"].build_celix_etcdlib
cmake.definitions["TEST_LAUNCHER"] = self.options["celix"].build_launcher
cmake.definitions["TEST_PROMISES"] = self.options["celix"].build_promises
cmake.definitions["TEST_PUSHSTREAMS"] = self.options["celix"].build_pushstreams
cmake.definitions["TEST_PUBSUB"] = self.options["celix"].build_pubsub
cmake.definitions["CMAKE_PROJECT_test_package_INCLUDE"] = os.path.join(self.build_folder, "conan_paths.cmake")
# the followint is workaround https://github.com/conan-io/conan/issues/7192
# the following is workaround https://github.com/conan-io/conan/issues/7192
cmake.definitions["CMAKE_EXE_LINKER_FLAGS"] = "-Wl,--unresolved-symbols=ignore-in-shared-libs"
cmake.configure()
cmake.build()
@@ -59,6 +62,8 @@ def test(self):
self.run("./use_syslog_writer", cwd=os.path.join("deploy", "use_syslog_writer"), run_environment=True)
if self.options["celix"].build_shell:
self.run("./use_shell", run_environment=True)
if self.options["celix"].celix_cxx:
self.run("./use_cxx_shell", run_environment=True)
if self.options["celix"].build_remote_shell:
self.run("./use_remote_shell", cwd=os.path.join("deploy", "use_remote_shell"), run_environment=True)
if self.options["celix"].build_shell_tui:
@@ -73,4 +78,6 @@ def test(self):
self.run("./use_promises", run_environment=True)
if self.options["celix"].build_pushstreams:
self.run("./use_pushstreams", run_environment=True)
if self.options["celix"].build_pubsub:
self.run("./use_my_psa", cwd=os.path.join("deploy", "use_my_psa"), run_environment=True)

@@ -0,0 +1,184 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/

#include <celix_api.h>
#include <pubsub_serializer.h>
#include <celix_log_helper.h>
#include <pubsub_admin.h>
#include <pubsub_constants.h>
#include <celix_constants.h>
#include <stdlib.h>

typedef struct my_psa_activator {
celix_log_helper_t *logHelper;
long serializersTrackerId;
pubsub_admin_service_t adminService;
long adminSvcId;
} my_psa_activator_t;

static void myPsa_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
my_psa_activator_t *act = handle;
const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
celix_logHelper_info(act->logHelper, "Serializer Added: %s=%s %s=%ld\n",
PUBSUB_SERIALIZER_TYPE_KEY, serType, OSGI_FRAMEWORK_SERVICE_ID, svcId);

}

static void myPsa_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
my_psa_activator_t *act = handle;
const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
celix_logHelper_info(act->logHelper, "Serializer Removed: %s=%s %s=%ld\n",
PUBSUB_SERIALIZER_TYPE_KEY, serType, OSGI_FRAMEWORK_SERVICE_ID, svcId);
}

static celix_status_t matchPublisher(void *handle, long svcRequesterBndId,
const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties,
double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
(void)handle;
(void)svcRequesterBndId;
(void)svcFilter;
(void)outTopicProperties;
(void)outScore;
(void)outSerializerSvcId;
(void)outProtocolSvcId;
return CELIX_SUCCESS;
}

static celix_status_t matchSubscriber(void *handle, long svcProviderBndId,
const celix_properties_t *svcProperties, celix_properties_t **outTopicProperties,
double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
(void)handle;
(void)svcProviderBndId;
(void)svcProperties;
(void)outTopicProperties;
(void)outScore;
(void)outSerializerSvcId;
(void)outProtocolSvcId;
return CELIX_SUCCESS;

}

static celix_status_t matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *match) {
(void)handle;
(void)endpoint;
(void)match;
return CELIX_SUCCESS;
}

static celix_status_t setupTopicSender(void *handle, const char *scope, const char *topic,
const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId,
celix_properties_t **publisherEndpoint) {
(void)handle;
(void)scope;
(void)topic;
(void)topicProperties;
(void)serializerSvcId;
(void)protocolSvcId;
(void)publisherEndpoint;
return CELIX_SUCCESS;
}

static celix_status_t teardownTopicSender(void *handle, const char *scope, const char *topic) {
(void)handle;
(void)scope;
(void)topic;
return CELIX_SUCCESS;
}

static celix_status_t setupTopicReceiver(void *handle, const char *scope, const char *topic,
const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId,
celix_properties_t **subscriberEndpoint) {
(void)handle;
(void)scope;
(void)topic;
(void)topicProperties;
(void)serializerSvcId;
(void)protocolSvcId;
(void)subscriberEndpoint;
return CELIX_SUCCESS;
}

static celix_status_t teardownTopicReceiver(void *handle, const char *scope, const char *topic) {
(void)handle;
(void)scope;
(void)topic;
return CELIX_SUCCESS;
}

static celix_status_t addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
(void)handle;
(void)endpoint;
return CELIX_SUCCESS;
}

static celix_status_t removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
(void)handle;
(void)endpoint;
}

int psa_udpmc_start(my_psa_activator_t *act, celix_bundle_context_t *ctx) {
act->adminSvcId = -1L;
act->serializersTrackerId = -1L;
act->logHelper = celix_logHelper_create(ctx, "my_psa_admin");
celix_status_t status = CELIX_SUCCESS;

//track serializers
if (status == CELIX_SUCCESS) {
celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
opts.filter.ignoreServiceLanguage = true;
opts.callbackHandle = act;
opts.addWithProperties = myPsa_addSerializerSvc;
opts.removeWithProperties = myPsa_removeSerializerSvc;
act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
}

//register pubsub admin service
if (status == CELIX_SUCCESS) {
pubsub_admin_service_t *psaSvc = &act->adminService;
psaSvc->handle = act;
psaSvc->matchPublisher = matchPublisher;
psaSvc->matchSubscriber = matchSubscriber;
psaSvc->matchDiscoveredEndpoint = matchDiscoveredEndpoint;
psaSvc->setupTopicSender = setupTopicSender;
psaSvc->teardownTopicSender = teardownTopicSender;
psaSvc->setupTopicReceiver = setupTopicReceiver;
psaSvc->teardownTopicReceiver = teardownTopicReceiver;
psaSvc->addDiscoveredEndpoint = addDiscoveredEndpoint;
psaSvc->removeDiscoveredEndpoint = removeDiscoveredEndpoint;

celix_properties_t *props = celix_properties_create();
celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, "my_psa");

act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
}

return status;
}

int psa_udpmc_stop(my_psa_activator_t *act, celix_bundle_context_t *ctx) {
celix_bundleContext_unregisterService(ctx, act->adminSvcId);
celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
celix_logHelper_destroy(act->logHelper);
return CELIX_SUCCESS;
}

CELIX_GEN_BUNDLE_ACTIVATOR(my_psa_activator_t, psa_udpmc_start, psa_udpmc_stop);
@@ -0,0 +1,84 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/

#include <celix/BundleContext.h>
#include <celix/IShellCommand.h>
#include <celix_shell.h>
#include <celix_api.h>
#include <iostream>
#include <atomic>
#include <assert.h>

class ShellCommandImpl : public celix::IShellCommand {
public:
~ShellCommandImpl() noexcept override = default;
void executeCommand(const std::string& commandLine, const std::vector<std::string>& commandArgs, FILE* outStream, FILE* errorStream) override {
fprintf(outStream, "called cxx command with cmd line %s\n", commandLine.c_str());
fprintf(errorStream, "Arguments size is %i\n", (int)commandArgs.size());
}
};

int main() {
celix_framework_t* fw = NULL;
celix_bundle_context_t *ctx = NULL;
celix_properties_t *properties = NULL;

properties = properties_create();
properties_set(properties, "LOGHELPER_ENABLE_STDOUT_FALLBACK", "true");
properties_set(properties, "org.osgi.framework.storage.clean", "onFirstInit");
properties_set(properties, "org.osgi.framework.storage", ".cacheBundleContextTestFramework");

fw = celix_frameworkFactory_createFramework(properties);
ctx = framework_getContext(fw);
long bndId = celix_bundleContext_installBundle(ctx, CXX_SHELL_BUNDLE_LOCATION, true);
assert(bndId >= 0);

auto cxxCtx = celix::BundleContext{ctx};
std::atomic<std::size_t> commandCount{0};
auto countCb = [&commandCount](celix_shell& shell) {
celix_array_list_t* result = nullptr;
shell.getCommands(shell.handle, &result);
commandCount = celix_arrayList_size(result);
for (int i = 0; i < celix_arrayList_size(result); ++i) {
free(celix_arrayList_get(result, i));
}
celix_arrayList_destroy(result);
};
auto callCount = cxxCtx.useService<celix_shell>(CELIX_SHELL_SERVICE_NAME)
.addUseCallback(countCb)
.build();
assert(callCount == 1);
std::size_t initialCount = commandCount.load();
std::cout << "initialCount=" << initialCount << std::endl;

auto reg = cxxCtx.registerService<celix::IShellCommand>(std::make_shared<ShellCommandImpl>())
.addProperty(celix::IShellCommand::COMMAND_NAME, "cxx::example")
.addProperty(celix::IShellCommand::COMMAND_USAGE, "usage")
.addProperty(celix::IShellCommand::COMMAND_DESCRIPTION, "desc")
.build();
reg->wait();
callCount = cxxCtx.useService<celix_shell>(CELIX_SHELL_SERVICE_NAME)
.addUseCallback(countCb)
.build();
assert(callCount == 1);
std::cout << "command count after registration=" << commandCount.load() << std::endl;
reg.reset();
celix_frameworkFactory_destroyFramework(fw);
return 0;
}

0 comments on commit ee74c8f

Please sign in to comment.