Skip to content

Commit

Permalink
chore: added client id to the callback for pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
ADRFranklin committed May 29, 2023
1 parent e5c655b commit b1fc93e
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 27 deletions.
30 changes: 16 additions & 14 deletions src/impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ int Impl::HDel(int client_id, std::string key, std::string field)
auto req = client->hdel(key, std::vector<std::string>{ field });
client->sync_commit();
auto r = req.get();

if (r.is_error()) {
logprintf("ERROR: %s", r.error().c_str());
return 2;
Expand Down Expand Up @@ -404,8 +404,20 @@ int Impl::Subscribe(AMX* amx, std::string host, int port, std::string auth, std:
sub->auth(auth);
}

sub->subscribe(channel, [amx, callback](const std::string& chan, const std::string& msg) {
clientData cd;
cd.subscriber = sub;
cd.channel = channel;
cd.host = host;
cd.port = port;
cd.auth = auth;
cd.isPubSub = true;
clients[context_count] = cd;

id = context_count++;

sub->subscribe(channel, [id, amx, callback](const std::string& chan, const std::string& msg) {
message m;
m.clientId = id;
m.amx = amx;
m.channel = chan;
m.msg = msg;
Expand All @@ -418,17 +430,6 @@ int Impl::Subscribe(AMX* amx, std::string host, int port, std::string auth, std:

sub->commit();

clientData cd;
cd.subscriber = sub;
cd.channel = channel;
cd.host = host;
cd.port = port;
cd.auth = auth;
cd.isPubSub = true;
clients[context_count] = cd;

id = context_count++;

return 0;
}

Expand All @@ -443,7 +444,7 @@ int Impl::Unsubscribe(int client_id)

cd.subscriber->unsubscribe(cd.channel);
cd.subscriber->commit();

clients.erase(client_id);

return 0;
Expand Down Expand Up @@ -492,6 +493,7 @@ void Impl::amx_tick()
*/
amx_Push(amx, m.msg.length());
amx_PushString(amx, &amx_addr, &phys_addr, m.msg.c_str(), 0, 0);
amx_Push(amx, m.clientId);

amx_Exec(amx, &amx_ret, amx_idx);
amx_Release(amx, amx_addr);
Expand Down
1 change: 1 addition & 0 deletions src/impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ struct subscription {
};

struct message {
int clientId;
std::string channel;
std::string msg;
std::string callback;
Expand Down
23 changes: 10 additions & 13 deletions test.pwn
Original file line number Diff line number Diff line change
Expand Up @@ -232,19 +232,17 @@ Test:MessageBindReply()
ASSERT(ret == 0);
}

TestClose:MessageBindReply()
{
// Redis_Unsubscribe(pubsub_1);
}

forward Receive(data[]);
public Receive(data[])
forward Receive(PubSub:id, data[]);
public Receive(PubSub:id, data[])
{
ASSERT(id == pubsub_1);
if(!strcmp(data, "hello world!")) {
printf("\n\nPASS!\n\n*** Redis bind message callback 'Receive' returned the correct value: '%s' test passed!", data);
} else {
printf("\n\nFAIL!\n\n*** Redis bind message callback 'Receive' returned the incorrect value: '%s'", data);
}

Redis_Unsubscribe(pubsub_1);
}


Expand Down Expand Up @@ -280,17 +278,16 @@ Test:MultiMessage()
ASSERT(ret == 0);
}

TestClose:MultiMessage()
forward Receive2(PubSub:id, data[]);
public Receive2(PubSub:id, data[])
{
// Redis_Unsubscribe(pubsub_2);
}
ASSERT(id == pubsub_2);

forward Receive2(data[]);
public Receive2(data[])
{
if(!strcmp(data, "to receive2"))
printf("\n\nPASS!\n\n*** Redis bind message callback 'Receive2' returned the correct value: '%s' test passed!", data);

else
printf("\n\nFAIL!\n\n*** Redis bind message callback 'Receive2' returned the incorrect value: '%s'", data);

Redis_Unsubscribe(pubsub_2);
}

0 comments on commit b1fc93e

Please sign in to comment.