Skip to content

Commit

Permalink
Merge pull request adilbaig#23 from acehreli/pubsub
Browse files Browse the repository at this point in the history
Subscriber
  • Loading branch information
adilbaig committed May 19, 2016
2 parents 447832e + 12fac4a commit 3818eb1
Show file tree
Hide file tree
Showing 4 changed files with 511 additions and 27 deletions.
45 changes: 43 additions & 2 deletions benchmark/benchmark.d
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import tinyredis,
import tinyredis.redis,
tinyredis.subscriber,
tinyredis.encoder,
std.stdio,
std.getopt,
std.datetime,
Expand Down Expand Up @@ -47,6 +49,29 @@ void timeCommand(Redis redis, string command, ref StopWatch sw, const uint reqs,
writeln("");
}

void timePubSub(Redis redis, Subscriber subscriber, string command, ref StopWatch sw, const uint reqs, const uint pipeline)
{
sw.reset();
sw.start();

auto e = encode(command);
if(pipeline > 1) {
e = std.array.replicate(e, pipeline);
}

for(uint i = 0; i < reqs/pipeline; i++)
{
redis.sendRaw(e);
subscriber.processMessages();
}

sw.stop();

writefln("%d messages processed in %.3f seconds", reqs, sw.peek().msecs()/1000.0);
writefln("%d messages per second", cast(uint)std.math.round(reqs/(sw.peek().msecs()/1000.0)));
writeln("");
}

/**
* Make sure the redis server is running
*/
Expand Down Expand Up @@ -84,6 +109,22 @@ int main(string[] args)

writeln("====== SET ======");
timeCommand(redis, "SET trbck:get 12", sw, reqs, pipeline);


auto subscriber = new Subscriber();
ulong messages = 0;
ulong pmessages = 0;
subscriber.subscribe("my_channel", (channel, message) { ++messages; });
subscriber.psubscribe("my_pattern*", (pattern, channel, message) { ++pmessages; });

writeln("====== SUBSCRIBE ======");
timePubSub(redis, subscriber, "PUBLISH my_channel my_message", sw, reqs, pipeline);
if (messages != reqs)
writefln("WARNING: Expected %s messages, processed %s messages\n", reqs, messages);

writeln("====== PSUBSCRIBE ======");
timePubSub(redis, subscriber, "PUBLISH my_patternX my_message", sw, reqs, pipeline);
if (pmessages != reqs)
writefln("WARNING: Expected %s messages, processed %s messages\n", reqs, pmessages);

return 0;
}
130 changes: 112 additions & 18 deletions examples/console.d
Original file line number Diff line number Diff line change
@@ -1,31 +1,122 @@
import tinyredis,
std.stdio
import tinyredis.redis,
tinyredis.subscriber,
tinyredis.connection,
tinyredis.response,
tinyredis.parser,
std.stdio,
std.string,
std.algorithm,
std.functional
;

/**
* Callback function for SUBSCRIBE messages.
*/
void handleMessage(string channel, string message)
{
writefln("Channel '%s': %s", channel, message);
}

/**
* Callback function for PSUBSCRIBE messages.
*/
void handlePatternMessage(string pattern, string channel, string message)
{
writefln("Channel '%s' (matching '%s'): %s", channel, pattern, message);
}

/**
* Report the number of remaining subscriptions
*/
void reportSubscriptions(size_t count)
{
writefln("%s subscription%s", count, count == 1 ? "" : "s");
}

/**
* This is a simple console to demonstrate Tiny Redis
*/
void main()
{
auto redis = new Redis();

char[] buf;

auto redis = new Redis(); // Regular connection
auto sub = new Subscriber(); // Subscription connection
size_t subCount = 0; // Number of current subscriptions
bool isSubscribed = false; // Which connection to ping on (Reddis's response differs)

char[] buf;

void updateSubscriptionState() {
isSubscribed = (subCount != 0);
reportSubscriptions(subCount);
}

writeln("Press Enter to process queued messages.");
write("redis > ");
while (stdin.readln(buf))
{
string cmd = cast(string)buf[0 .. $-1];

if(cmd == "exit")
return;

if(cmd.length > 0)
string line = cast(string)buf[0 .. $-1].strip;

if(line.length > 0)
try{
Response resp = redis.send(cmd);
if (resp.isString) {
writeln('"', resp.toDiagnosticString(), '"');
} else {
writeln(resp.toDiagnosticString());
const found = line.findSplit(" ");
const cmd = found[0].toLower;
auto channels = found[2].splitter(' '); // Used only under some cases; still...

switch (cmd)
{
case "exit":
return;

case "subscribe":
// .idup because buf is shared by all command lines
channels.each!(
c => subCount = sub.subscribe(c.idup, toDelegate(&handleMessage)));
updateSubscriptionState();
break;

case "unsubscribe":
if (channels.empty)
subCount = sub.unsubscribe();
else
channels.each!(c => subCount = sub.unsubscribe(c));
updateSubscriptionState();
break;

case "psubscribe":
// .idup because buf is shared by all command lines
channels.each!(
c => subCount = sub.psubscribe(c.idup, toDelegate(&handlePatternMessage)));
updateSubscriptionState();
break;

case "punsubscribe":
if (channels.empty)
subCount = sub.punsubscribe();
else
channels.each!(c => subCount = sub.punsubscribe(c));
updateSubscriptionState();
break;

case "quit":
Response resp = sub.quit();
writeln(resp);
subCount = 0;
updateSubscriptionState();
break;

case "ping":
auto data = found[2];
Response resp = (isSubscribed ? sub.ping(data) : redis.send(line));
writeln(resp);
break;

default:
Response resp = redis.send(line);
if (resp.isString) {
writeln('"', resp.toDiagnosticString(), '"');
} else {
writeln(resp.toDiagnosticString());
}
}
}
catch(RedisResponseException e) {
Expand All @@ -34,7 +125,10 @@ void main()
catch(ConnectionException e) {
writeln("(error) ", e.msg);
}


// Opportunity to process queued messages from subscribed channels
sub.processMessages();

write("redis > ");
}
}
37 changes: 30 additions & 7 deletions source/tinyredis/connection.d
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ module tinyredis.connection;
* Authors: Adil Baig, adil.baig@aidezigns.com
*/

import std.socket : TcpSocket;
import tinyredis.parser;
import tinyredis.response;
public:
import std.socket : TcpSocket;

private:
import std.array : appender, back, popBack;
import std.string : format;
import tinyredis.parser;
import tinyredis.response;

debug(tinyredis) {
import std.stdio : writeln;
Expand Down Expand Up @@ -124,10 +129,28 @@ private :
byte[1024 * 16] buff;
size_t len = conn.receive(buff);

if(len == 0)
throw new ConnectionException("Server closed the connection!");
else if(len == TcpSocket.ERROR)
throw new ConnectionException("A socket error occurred!");
if (conn.blocking)
{
if(len == 0)
throw new ConnectionException("Server closed the connection!");
else if(len == TcpSocket.ERROR)
throw new ConnectionException("A socket error occurred!");
}
else
{
if (len == -1)
{
import core.stdc.errno;

if (errno == EWOULDBLOCK)
{
len = 0;
errno = 0;
}
else
throw new ConnectionException(format("A socket error occurred! errno: %s", errno));
}
}

buffer ~= buff[0 .. len];
debug(tinyredis) { writeln("Response : ", "'" ~ escape(cast(string)buffer) ~ "'", " Length : ", len); }
Expand Down
Loading

0 comments on commit 3818eb1

Please sign in to comment.