Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

lrqueue2.cs bug fix, peering1.cs finished, peering2.cs works but seem…

…s to suffer from starvation. c/peering.cs there seems to be a typo in the example
  • Loading branch information...
commit 3d3800259735de2de487321b19d93f75e8f6b379 1 parent 8429462
@ptomasroos ptomasroos authored
View
1  examples/C#/lruqueue2.cs
@@ -88,7 +88,6 @@ public static void Main(string[] args)
// Handle worker activity on backend
backend.PollInHandler += (socket, revents) =>
{
- ZmqMessage
var zmsg = new ZMessage(socket);
// Use worker address for LRU routing
workerQueue.Enqueue(zmsg.Unwrap());
View
4 examples/C#/peering1.cs
@@ -1,5 +1,5 @@
//
-// Broker peering simulation (part 1) in Python
+// Broker peering simulation (part 1) in C#
// Prototypes the state flow
// Note! ipc doesnt work on windows and therefore type peering1 8001 8002 8003
@@ -7,8 +7,6 @@
// Email: ptomasroos@gmail.com
using System;
-using System.Linq;
-using System.Collections.Generic;
using System.Text;
using System.Threading;
using ZMQ;
View
228 examples/C#/peering2.cs
@@ -1,7 +1,12 @@
//
-// Broker peering simulation (part 1) in Python
-// Prototypes the state flow
-// Note! ipc doesnt work on windows and therefore type peering1 8001 8002 8003
+// Broker peering simulation (part 2) in C#
+// Prototypes the request-reply flow
+//
+// While this example runs in a single process, that is just to make
+// it easier to start and stop the example. Each thread has its own
+// context and conceptually acts as a separate process.
+//
+// Note! ipc doesnt work on windows and therefore type peering2 801 802 803
// Author: Tomas Roos
// Email: ptomasroos@gmail.com
@@ -17,13 +22,19 @@ namespace ZMQGuide
{
internal class Program
{
+ private const int numberOfClients = 10;
+ private const int numberOfWorkers = 3;
+ private static string cloudFeAddress;
+ private static string localBeAddress;
+ private static string localFeAddress;
+ private static readonly Random randomizer = new Random(DateTime.Now.Millisecond);
+ private static List<string> peers = new List<string>();
+
public static void Main(string[] args)
{
- var randomizer = new Random(DateTime.Now.Millisecond);
-
if (args.Length < 2)
{
- Console.WriteLine("Usage: peering1 <myself> <peer_1> … <peer_N>");
+ Console.WriteLine("Usage: peering2 <myself> <peer_1> … <peer_N>");
return;
}
@@ -32,40 +43,209 @@ public static void Main(string[] args)
using (var context = new Context(1))
{
- using (Socket statebe = context.Socket(SocketType.PUB), statefe = context.Socket(SocketType.SUB))
+ using (Socket cloudfe = context.Socket(SocketType.ROUTER), cloudbe = context.Socket(SocketType.ROUTER),
+ localfe = context.Socket(SocketType.ROUTER), localbe = context.Socket(SocketType.ROUTER))
{
- var bindAddress = "tcp://127.0.0.1:" + myself;
- statebe.Bind(bindAddress);
- Thread.Sleep(1000);
+ cloudFeAddress = "tcp://127.0.0.1:" + myself;
+ cloudfe.Identity = Encoding.Unicode.GetBytes(myself);
+ cloudfe.Bind(cloudFeAddress);
+ cloudbe.Identity = Encoding.Unicode.GetBytes(myself);
for (int arg = 1; arg < args.Length; arg++)
{
var endpoint = "tcp://127.0.0.1:" + args[arg];
- statefe.Connect(endpoint);
- statefe.Subscribe(string.Empty, Encoding.Unicode);
- Thread.Sleep(1000);
+ peers.Add(endpoint);
+ Console.WriteLine("I: connecting to cloud frontend at " + endpoint);
+ cloudbe.Connect(endpoint);
}
- statefe.PollInHandler += (socket, revents) =>
- {
- string peerName = socket.Recv(Encoding.Unicode);
- string available = socket.Recv(Encoding.Unicode);
+ localFeAddress = cloudFeAddress + "1";
+ localfe.Bind(localFeAddress);
+ localBeAddress = cloudFeAddress + "2";
+ localbe.Bind(localBeAddress);
+
+ Console.WriteLine("Press Enter when all brokers are started: ");
+ Console.ReadKey();
+
+ var workers = new List<Thread>();
+ for (int workerNumber = 0; workerNumber < numberOfWorkers; workerNumber++)
+ {
+ workers.Add(new Thread(WorkerTask));
+ workers[workerNumber].Start();
+ }
+
+ var clients = new List<Thread>();
+ for (int clientNumber = 0; clientNumber < numberOfClients; clientNumber++)
+ {
+ clients.Add(new Thread(ClientTask));
+ clients[clientNumber].Start();
+ }
+
+ var workerQueue = new Queue<byte[]>();
+
+ var localfeReady = false;
+ var cloudfeReady = false;
+
+ var backends = new PollItem[2];
+ backends[0] = localbe.CreatePollItem(IOMultiPlex.POLLIN);
+ backends[0].PollInHandler += (socket, revents) =>
+ {
+ var zmsg = new ZMessage(socket);
+
+ // Use worker address for LRU routing
+ workerQueue.Enqueue(zmsg.Unwrap());
+
+ if (zmsg.BodyToString() != "READY")
+ {
+ SendReply(zmsg, cloudfe, localfe);
+ }
+ };
+
+ backends[1] = cloudbe.CreatePollItem(IOMultiPlex.POLLIN);
+ backends[1].PollInHandler += (socket, revents) =>
+ {
+ var zmsg = new ZMessage(socket);
+ // We don't use peer broker address for anything
+ zmsg.Unwrap();
+
+ SendReply(zmsg, cloudfe, localfe);
+ };
+
+ var frontends = new PollItem[2];
+ frontends[0] = cloudfe.CreatePollItem(IOMultiPlex.POLLIN);
+ frontends[0].PollInHandler += (socket, revents) =>
+ {
+ cloudfeReady = true;
+ };
+
+ frontends[1] = localfe.CreatePollItem(IOMultiPlex.POLLIN);
+ frontends[1].PollInHandler += (socket, revents) =>
+ {
+ localfeReady = true;
+ };
- Console.WriteLine("{0} - {1} workers free\n", peerName, available);
- };
while (true)
{
- int count = Context.Poller(1000 * 1000, statefe);
-
- if (count == 0)
+ var timeout = (workerQueue.Count > 0 ? 1000000 : -1);
+ var rc = Context.Poller(backends, timeout);
+
+ if (rc == -1)
+ break; // Interrupted
+
+ while (workerQueue.Count > 0)
{
- statebe.Send(myself, Encoding.Unicode);
- statebe.Send(randomizer.Next(10).ToString(), Encoding.Unicode);
+ Context.Poller(frontends, 0);
+ bool reRoutable;
+
+ ZMessage msg;
+
+ if (cloudfeReady)
+ {
+ cloudfeReady = false;
+ msg = new ZMessage(cloudfe);
+ reRoutable = false;
+ }
+ else if (localfeReady)
+ {
+ localfeReady = false;
+ msg = new ZMessage(localfe);
+ reRoutable = true;
+ }
+ else
+ {
+ break;
+ }
+
+ //if (reRoutable && workerQueue.Count > 0 && randomizer.Next(3) == 0)
+ if (reRoutable && peers.Count > 0 && randomizer.Next(4) == 0)
+ {
+ var randomPeer = randomizer.Next(1, args.Length - 1);
+ var endpoint = "tcp://127.0.0.1:" + args[randomPeer];
+ msg.Wrap(Encoding.Unicode.GetBytes(endpoint), new byte[0]);
+ msg.Send(cloudbe);
+ }
+ else
+ {
+ msg.Wrap(workerQueue.Dequeue(), new byte[0]);
+ msg.Send(localbe);
+ }
}
}
}
}
}
+
+ private static void SendReply(ZMessage msg, Socket cloudfe, Socket localfe)
+ {
+ var address = Encoding.Unicode.GetString(msg.Address);
+ // Route reply to cloud if it's addressed to a broker
+
+ if (peers.Count(peerAddress => peerAddress == address) == 1)
+ {
+ Console.WriteLine("Sending to cloud frontend");
+ msg.Send(cloudfe);
+ }
+ else
+ {
+ Console.WriteLine("Sending to local frontend");
+ msg.Send(localfe);
+ }
+ }
+
+ private static void WorkerTask()
+ {
+ using (var ctx = new Context(1))
+ {
+ using (var worker = ctx.Socket(SocketType.REQ))
+ {
+ ZHelpers.SetID(worker, Encoding.Unicode);
+ worker.Connect(localBeAddress);
+
+ var msg = new ZMessage("READY");
+ msg.Send(worker);
+
+ while (true)
+ {
+ var recvMsg = new ZMessage(worker);
+ Console.WriteLine("Worker: {0}", recvMsg.BodyToString());
+
+ Thread.Sleep(1000);
+
+ recvMsg.StringToBody("OK");
+ recvMsg.Send(worker);
+ //var okmsg = new ZMessage("OK");
+ //okmsg.Send(worker);
+ }
+ }
+ }
+ }
+
+ private static void ClientTask()
+ {
+ using (var ctx = new Context(1))
+ {
+ using (var client = ctx.Socket(SocketType.REQ))
+ {
+ ZHelpers.SetID(client, Encoding.Unicode);
+ client.Connect(localFeAddress);
+
+ while (true)
+ {
+ client.Send("HELLO", Encoding.Unicode);
+ string reply = client.Recv(Encoding.Unicode);
+
+ if (string.IsNullOrEmpty(reply))
+ {
+ break;
+ }
+
+ Console.WriteLine("Client: {0}", reply);
+
+ Thread.Sleep(1000);
+ }
+ }
+ }
+ }
}
}
View
2  examples/C/peering2.c
@@ -90,7 +90,7 @@ int main (int argc, char *argv [])
// Connect cloud backend to all peers
void *cloudbe = zsocket_new (ctx, ZMQ_ROUTER);
- zsockopt_set_identity (cloudfe, self);
+ zsockopt_set_identity (cloudbe, self);
int argn;
for (argn = 2; argn < argc; argn++) {
char *peer = argv [argn];
Please sign in to comment.
Something went wrong with that request. Please try again.