Skip to content

Commit

Permalink
CELIX-408: Updates pubsub test, Fixes some issues in the runtime scri…
Browse files Browse the repository at this point in the history
…pt and adds cmake test for pubsub based on runtime scripts

- Note the BUILD_PUBSUB_TESTS is not yet enabled, because the test is not stable.
  • Loading branch information
pnoltes committed Apr 12, 2017
1 parent 2d9c77d commit 9906d9d
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 34 deletions.
26 changes: 18 additions & 8 deletions cmake/cmake_celix/runtime_common.sh.in
Expand Up @@ -59,7 +59,7 @@ function rt_run_deployment() {
cd ${DEPLOYMENT_DIR} #assuming absolute dir
if [ -d .cache ] ; then
echo "Clearing cache"
rm -fr .cache #clear cache
rm -fr .cache
fi
. ./release.sh #run deployment release
if [ "${USE_TERM}" = "TRUE" ] ; then
Expand Down Expand Up @@ -126,17 +126,27 @@ function rt_wait_for() {
RESULT=1
fi

echo "Signalling pids:${PIDS}"
kill ${PIDS}

for PID in ${PIDS}; do
if wait ${PID}; then
echo "${PID} exited normal"
echo "Signalling pids: ${PIDS}"
kill -2 ${PIDS}
TIMES=15
for (( I=1; I<=TIMES; I++ ))
do
echo "Checking if ${PIDS} are still running. ${I}/${TIMES}"
kill -0 ${PIDS} &> /dev/null
if [ $? -eq 1 ] ; then #e.g. no such process result
#all process stopped
break
else
echo "${PID} exited with error"
sleep 1
fi
done

kill -0 ${PIDS} &> /dev/null
if [ $? -eq 0 ] ; then
echo "Using kill -9 to ensure processes are killed"
kill -9 ${PIDS}
fi

echo ${RESULT}
}

Expand Down
1 change: 1 addition & 0 deletions launcher/CMakeLists.txt
Expand Up @@ -41,6 +41,7 @@ if (LAUNCHER)
if (CPPUTEST_FOUND)
#Test running which start celix and run CppUTest RUN_ALL_TESTS.
#Using this test running it is possible to create bundles containing CppUTests.
include_directories(celix_test_runner ${CPPUTEST_INCLUDE_DIRS})
add_executable(celix_test_runner
private/src/celix_test_runner.cpp
)
Expand Down
5 changes: 5 additions & 0 deletions pubsub/CMakeLists.txt
Expand Up @@ -36,7 +36,12 @@ if (PUBSUB)
add_subdirectory(deploy)
add_subdirectory(keygen)
add_subdirectory(mock)


if (ENABLE_TESTING)
option(BUILD_PUBSUB_TESTS "Enable Tests for PUBSUB" ON)
endif()
if (ENABLE_TESTING AND BUILD_PUBSUB_TESTS)
add_subdirectory(test)
endif()

Expand Down
19 changes: 13 additions & 6 deletions pubsub/test/CMakeLists.txt
Expand Up @@ -33,15 +33,16 @@ add_bundle(pubsub_sut
)
bundle_files(pubsub_sut
msg_descriptors/msg.descriptor
msg_descriptors/sync.descriptor
DESTINATION "META-INF/descriptors/messages"
)
add_deploy(pubsub_udpmc_sut
NAME deploy_sut
BUNDLES
org.apache.celix.pubsub_serializer.PubSubSerializerJson
org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
#org.apache.celix.pubsub_admin.PubSubAdminUdpMc
org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_admin.PubSubAdminUdpMc
#org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
pubsub_sut
DIR ${PROJECT_BINARY_DIR}/runtimes/test/pubsub/udpmc
Expand All @@ -55,6 +56,7 @@ add_bundle(pubsub_tst
)
bundle_files(pubsub_tst
msg_descriptors/msg.descriptor
msg_descriptors/sync.descriptor
DESTINATION "META-INF/descriptors/messages"
)
add_deploy(pubsub_udpmc_tst
Expand All @@ -63,15 +65,15 @@ add_deploy(pubsub_udpmc_tst
org.apache.celix.pubsub_serializer.PubSubSerializerJson
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
#org.apache.celix.pubsub_admin.PubSubAdminUdpMc
org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_admin.PubSubAdminUdpMc
#org.apache.celix.pubsub_admin.PubSubAdminZmq
pubsub_tst
DIR ${PROJECT_BINARY_DIR}/runtimes/test/pubsub/udpmc
LAUNCHER celix_test_runner
)

if (ETCD_CMD)
add_runtime(pubsub_rt_test_udpmc
add_runtime(pubsub_test_udpmc_runtime
NAME udpmc
GROUP test/pubsub
DEPLOYMENTS
Expand All @@ -81,7 +83,12 @@ if (ETCD_CMD)
etcd
WAIT_FOR
pubsub_udpmc_tst
USE_TERM
#USE_TERM
#LOG_TO_FILES
)

add_test(NAME pubsub_udpmc_test
COMMAND ./start.sh
WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_test_udpmc_runtime,RUNTIME_LOCATION>
)
endif ()
9 changes: 9 additions & 0 deletions pubsub/test/msg_descriptors/sync.descriptor
@@ -0,0 +1,9 @@
:header
type=message
name=sync
version=1.0.0
:annotations
classname=org.example.Sync
:types
:message
{F nop}
2 changes: 2 additions & 0 deletions pubsub/test/test/msg.h
Expand Up @@ -22,6 +22,8 @@

#include <stdint.h>

#define MSG_NAME "msg"

typedef struct msg {
uint32_t seqNr;
} msg_t;
Expand Down
2 changes: 1 addition & 1 deletion pubsub/test/test/sut_activator.c
Expand Up @@ -83,7 +83,7 @@ celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt __att

static int sut_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, pubsub_multipart_callbacks_t *callbacks, bool *release) {
struct activator* act = handle;
printf("Received msg %s, sending back\n", msgType);
printf("Received msg '%s', sending back\n", msgType);
pthread_mutex_lock(&act->mutex);
if (act->pubSvc != NULL) {
unsigned int sendId = 0;
Expand Down
29 changes: 29 additions & 0 deletions pubsub/test/test/sync.h
@@ -0,0 +1,29 @@
/**
*Licensed to the Apache Software Foundation (ASF) under one
*or more contributor license agreements. See the NOTICE file
*distributed with this work for additional information
*regarding copyright ownership. The ASF licenses this file
*to you under the Apache License, Version 2.0 (the
*"License"); you may not use this file except in compliance
*with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*Unless required by applicable law or agreed to in writing,
*software distributed under the License is distributed on an
*"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
*specific language governing permissions and limitations
*under the License.
*/

#ifndef SYNC_H
#define SYNC_H

#define SYNC_NAME "sync"

typedef struct sync {
float nop; //need at least one entry
} sync_t;

#endif //SYNC_H
52 changes: 33 additions & 19 deletions pubsub/test/test/tst_activator.cpp
Expand Up @@ -29,6 +29,7 @@
#include "pubsub/publisher.h"

#include "msg.h"
#include "sync.h"

#include <CppUTest/TestHarness.h>
#include <CppUTestExt/MockSupport.h>
Expand All @@ -39,8 +40,6 @@ static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId
static int tst_pubAdded(void *handle, service_reference_pt reference, void *service);
static int tst_pubRemoved(void *handle, service_reference_pt reference, void *service);

#define MSG_NAME "msg"

struct activator {
pubsub_subscriber_t subSvc;
service_registration_pt reg = nullptr;
Expand All @@ -49,6 +48,10 @@ struct activator {

pthread_mutex_t mutex; //protects below
pubsub_publisher_t* pubSvc = nullptr;

unsigned int syncId = 0;
bool gotSync = 0;

unsigned int msgId = 0;
unsigned int count = 0;

Expand Down Expand Up @@ -96,7 +99,11 @@ celix_status_t bundleActivator_destroy(__attribute__((unused)) void * userData,
static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, pubsub_multipart_callbacks_t *callbacks, bool *release) {
struct activator* act = static_cast<struct activator*>(handle);
pthread_mutex_lock(&act->mutex);
act->count += 1;
if (msgTypeId == act->syncId) {
act->gotSync = true;
} else if (msgTypeId == act->msgId) {
act->count += 1;
}
pthread_mutex_unlock(&act->mutex);
return CELIX_SUCCESS;
}
Expand All @@ -105,7 +112,9 @@ static int tst_pubAdded(void *handle, service_reference_pt reference, void *serv
struct activator* act = static_cast<struct activator*>(handle);
pthread_mutex_lock(&act->mutex);
act->pubSvc = static_cast<pubsub_publisher_t*>(service);
act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &g_act.msgId);
act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &act->msgId);
act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, SYNC_NAME, &act->syncId);

pthread_mutex_unlock(&act->mutex);
return CELIX_SUCCESS;

Expand All @@ -126,43 +135,48 @@ TEST_GROUP(PUBSUB_INT_GROUP)
{
void setup() {
constexpr int TRIES = 25;
constexpr int TIMEOUT = 1000000;
constexpr int TIMEOUT = 250000;

pthread_mutex_lock(&g_act.mutex);
CHECK_EQUAL(true, g_act.started);
g_act.gotSync = false;
pthread_mutex_unlock(&g_act.mutex);

//check if publisher is available
unsigned int msgId = 0;
unsigned int syncId = 0;
for (int i = 0; i < TRIES; ++i) {
pthread_mutex_lock(&g_act.mutex);
msgId = g_act.msgId;
syncId = g_act.syncId;
pthread_mutex_unlock(&g_act.mutex);
if (msgId == 0) {
printf("publisher still nullptr / msg Id is still 0, waiting for a while\n");
if (syncId == 0) {
printf("publisher still nullptr / sync msg type id is still 0, waiting for a while\n");
usleep(TIMEOUT);
} else {
break;
}
}
CHECK(msgId != 0);
CHECK(syncId != 0);

//check if message are returned
msg_t initMsg;
initMsg.seqNr = 0;
int count = 0;
sync_t syncMsg;
bool gotSync = false;
for (int i = 0; i < TRIES; ++i) {
printf("Sending sync message. Try %d/%d\n", i+1, TRIES);
pthread_mutex_lock(&g_act.mutex);
g_act.pubSvc->send(g_act.pubSvc->handle, g_act.msgId, &initMsg);
g_act.pubSvc->send(g_act.pubSvc->handle, g_act.syncId, &syncMsg);
pthread_mutex_unlock(&g_act.mutex);
usleep(TIMEOUT);
pthread_mutex_lock(&g_act.mutex);
count = g_act.count;
gotSync = g_act.gotSync;
pthread_mutex_unlock(&g_act.mutex);
if (count > 0) {
if (gotSync) {
break;
} else {
printf("No return message received, waiting for a while. %d/%d\n", i+1, TRIES);
}
}
CHECK(count > 0);
if (!gotSync) {
printf("No sync message received, bailing\n");
}
CHECK(gotSync);
}

void teardown() {
Expand Down

0 comments on commit 9906d9d

Please sign in to comment.