Skip to content

Commit

Permalink
improved benchmark to cover use case mentioned in issue #61
Browse files Browse the repository at this point in the history
  • Loading branch information
Neverlord committed Aug 22, 2012
1 parent 2ad44eb commit b519309
Showing 1 changed file with 58 additions and 35 deletions.
93 changes: 58 additions & 35 deletions unit_testing/test__remote_actor.cpp
Expand Up @@ -9,22 +9,21 @@
#include "cppa/cppa.hpp"
#include "cppa/exception.hpp"

using std::cout;
using std::cerr;
using std::endl;

using namespace std;
using namespace cppa;

namespace {

std::vector<string_pair> get_kv_pairs(int argc, char** argv, int begin = 1) {
std::vector<string_pair> result;
typedef vector<actor_ptr> actor_vector;

vector<string_pair> get_kv_pairs(int argc, char** argv, int begin = 1) {
vector<string_pair> result;
for (int i = begin; i < argc; ++i) {
auto vec = split(argv[i], '=');
if (vec.size() != 2) {
cerr << "\"" << argv[i] << "\" is not a key-value pair" << endl;
}
else if (std::any_of(result.begin(), result.end(),
else if (any_of(result.begin(), result.end(),
[&](const string_pair& p) { return p.first == vec[0]; })) {
cerr << "key \"" << vec[0] << "\" is already defined" << endl;
}
Expand All @@ -48,6 +47,12 @@ struct reflector : public event_based_actor {

// receive seven reply messages (2 local, 5 remote)
void spawn5_server(actor_ptr client, bool inverted) {
auto default_case = others() >> [] {
cout << "unexpected message; "
<< __FILE__ << " line " << __LINE__ << ": "
<< to_string(self->last_dequeued())
<< endl;
};
group_ptr grp;
if (!inverted) {
grp = group::get("local", "foobar");
Expand All @@ -63,24 +68,40 @@ void spawn5_server(actor_ptr client, bool inverted) {
spawn_in_group<reflector>(grp);
spawn_in_group<reflector>(grp);
receive_response (sync_send(client, atom("Spawn5"), grp)) (
on(atom("ok")) >> [&] {
on(atom("ok"), arg_match) >> [&](const actor_vector& vec) {
send(grp, "Hello reflectors!", 5.0);
if (vec.size() != 5) {
cout << "remote client did not spawn five reflectors!\n";
}
for (auto& a : vec) {
self->monitor(a);
}
},
after(std::chrono::seconds(10)) >> [&] {
throw std::runtime_error("timeout");
default_case,
after(chrono::seconds(10)) >> [&] {
throw runtime_error("timeout");
}
);
cout << "wait for reflected messages\n";
// receive seven reply messages (2 local, 5 remote)
int x = 0;
receive_for(x, 7) (
on("Hello reflectors!", 5.0) >> [] { },
others() >> [&] {
cout << "unexpected message; "
<< __FILE__ << " line " << __LINE__ << ": "
<< to_string(self->last_dequeued())
<< endl;
}
on("Hello reflectors!", 5.0) >> [] { }
);
cout << "wait for DOWN messages\n";
// wait for DOWN messages
{int i = 0; receive_for(i, 5) (
on(atom("DOWN"), arg_match) >> [](std::uint32_t reason) {
if (reason != exit_reason::normal) {
cout << "reflector exited for non-normal exit reason!" << endl;
}
},
default_case,
after(chrono::seconds(2)) >> [&] {
i = 4;
cout << "received timeout while waiting for DOWN messages!\n";
}
);}
// wait for locally spawned reflectors
await_all_others_done();
send(client, atom("Spawn5Done"));
Expand All @@ -90,10 +111,11 @@ void spawn5_client() {
bool spawned_reflectors = false;
do_receive (
on(atom("Spawn5"), arg_match) >> [&](const group_ptr& grp) {
actor_vector vec;
for (int i = 0; i < 5; ++i) {
spawn_in_group<reflector>(grp);
vec.push_back(spawn_in_group<reflector>(grp));
}
reply(atom("ok"));
reply(atom("ok"), std::move(vec));
spawned_reflectors = true;
},
on(atom("GetGroup")) >> [] {
Expand All @@ -107,14 +129,14 @@ void spawn5_client() {
);
}

int client_part(const std::vector<string_pair>& args) {
int client_part(const vector<string_pair>& args) {
CPPA_TEST(test__remote_actor_client_part);
auto i = std::find_if(args.begin(), args.end(),
auto i = find_if(args.begin(), args.end(),
[](const string_pair& p) { return p.first == "port"; });
if (i == args.end()) {
throw std::runtime_error("no port specified");
throw runtime_error("no port specified");
}
auto port = static_cast<std::uint16_t>(std::stoi(i->second));
auto port = static_cast<uint16_t>(stoi(i->second));
auto server = remote_actor("localhost", port);
send(server, atom("SpawnPing"));
receive (
Expand All @@ -126,7 +148,7 @@ int client_part(const std::vector<string_pair>& args) {
receive_response (sync_send(server, atom("SyncMsg"))) (
others() >> [&] {
if (self->last_dequeued() != make_cow_tuple(atom("SyncReply"))) {
std::ostringstream oss;
ostringstream oss;
oss << "unexpected message; "
<< __FILE__ << " line " << __LINE__ << ": "
<< to_string(self->last_dequeued()) << endl;
Expand All @@ -136,7 +158,7 @@ int client_part(const std::vector<string_pair>& args) {
send(server, atom("Done"));
}
},
after(std::chrono::seconds(5)) >> [&] {
after(chrono::seconds(5)) >> [&] {
cerr << "sync_send timed out!" << endl;
send(server, atom("Timeout"));
}
Expand All @@ -147,7 +169,7 @@ int client_part(const std::vector<string_pair>& args) {
<< __FILE__ << " line " << __LINE__ << ": "
<< to_string(self->last_dequeued()));
},
after(std::chrono::seconds(0)) >> [&] { }
after(chrono::seconds(0)) >> [&] { }
);
// test 100 sync_messages
for (int i = 0; i < 100; ++i) {
Expand All @@ -160,7 +182,7 @@ int client_part(const std::vector<string_pair>& args) {
<< __FILE__ << " line " << __LINE__ << ": "
<< to_string(self->last_dequeued()));
},
after(std::chrono::seconds(10)) >> [&] {
after(chrono::seconds(10)) >> [&] {
CPPA_ERROR("unexpected timeout!");
}
);
Expand All @@ -177,8 +199,9 @@ int client_part(const std::vector<string_pair>& args) {
} // namespace <anonymous>

int main(int argc, char** argv) {
std::cout.unsetf(std::ios_base::unitbuf);
std::string app_path = argv[0];
announce<actor_vector>();
cout.unsetf(ios_base::unitbuf);
string app_path = argv[0];
bool run_remote_actor = true;
if (argc > 1) {
if (strcmp(argv[1], "run_remote_actor=false") == 0) {
Expand All @@ -191,7 +214,7 @@ int main(int argc, char** argv) {
}
CPPA_TEST(test__remote_actor);
//auto ping_actor = spawn(ping, 10);
std::uint16_t port = 4242;
uint16_t port = 4242;
bool success = false;
do {
try {
Expand All @@ -204,14 +227,14 @@ int main(int argc, char** argv) {
}
}
while (!success);
std::thread child;
std::ostringstream oss;
thread child;
ostringstream oss;
if (run_remote_actor) {
oss << app_path << " run=remote_actor port=" << port;// << " &>client.txt";
// execute client_part() in a separate process,
// connected via localhost socket
child = std::thread([&oss]() {
std::string cmdstr = oss.str();
child = thread([&oss]() {
string cmdstr = oss.str();
if (system(cmdstr.c_str()) != 0) {
cerr << "FATAL: command \"" << cmdstr << "\" failed!" << endl;
abort();
Expand Down Expand Up @@ -241,7 +264,7 @@ int main(int argc, char** argv) {
on(atom("Done")) >> [] {
// everything's fine
},
on(atom("Failure"), arg_match) >> [&](const std::string& str) {
on(atom("Failure"), arg_match) >> [&](const string& str) {
CPPA_ERROR(str);
},
on(atom("Timeout")) >> [&] {
Expand Down

0 comments on commit b519309

Please sign in to comment.