Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
lruqueue.cs
  • Loading branch information
ptomasroos committed Feb 10, 2012
1 parent 26fb588 commit aec4d88
Showing 1 changed file with 98 additions and 85 deletions.
183 changes: 98 additions & 85 deletions examples/C#/lruqueue.cs
Expand Up @@ -7,120 +7,86 @@
// context and conceptually acts as a separate process.
//

// Author: Michael Compton
// Email: michael.compton@littleedge.co.uk
// Author: Michael Compton, Tomas Roos
// Email: michael.compton@littleedge.co.uk, ptomasroos@gmail.com

using System;
using System.Collections.Generic;
using System.Text;
using ZMQ;
using System.Threading;

namespace lruqueue {
class Program {
// Basic request-reply client using REQ socket
//
static void ClientTask() {
using (Context ctx = new Context(1)) {
using (Socket client = ctx.Socket(SocketType.REQ)) {
ZHelpers.SetID(client, Encoding.Unicode);
client.Connect("tcp://localhost:5555");

// Send request, get reply
client.Send("HELLO", Encoding.Unicode);
string reply = client.Recv(Encoding.Unicode);
Console.WriteLine("Client: {0}", reply);
}
}
}

// Worker using REQ socket to do LRU routing
//
static void WorkerTask() {
using (Context ctx = new Context(1)) {
using (Socket worker = ctx.Socket(SocketType.REQ)) {
ZHelpers.SetID(worker, Encoding.Unicode);
worker.Connect("tcp://localhost:5556");

// Tell broker we're ready for work
worker.Send("READY", Encoding.Unicode);
while (true) {
// Read and save all frames until we get an empty frame
// In this example there is only 1 but it could be more
string address = worker.Recv(Encoding.Unicode);
string empty = worker.Recv(Encoding.Unicode);

// Get request, send reply
string request = worker.Recv(Encoding.Unicode);
Console.WriteLine("Worker: {0}", request);

worker.SendMore(address, Encoding.Unicode);
worker.SendMore();
worker.Send("OK", Encoding.Unicode);
}
}
}
}

const int NBR_CLIENTS = 10;
const int NBR_WORKERS = 3;

static void Main(string[] args) {
List<Thread> workers = new List<Thread>();
List<Thread> clients = new List<Thread>();
// Prepare our context and sockets
using (Context ctx = new Context(1)) {
using (Socket frontend = ctx.Socket(SocketType.ROUTER),
backend = ctx.Socket(SocketType.ROUTER)) {

namespace ZMQGuide
{
internal class Program
{
public static void Main(string[] args)
{
const int workersToStart = 3;
int clientsRunning = 10;

var workers = new List<Thread>();
var clients = new List<Thread>();

using (var context = new Context(1))
{
using (Socket frontend = context.Socket(SocketType.ROUTER), backend = context.Socket(SocketType.ROUTER))
{
frontend.Bind("tcp://*:5555");
backend.Bind("tcp://*:5556");

int clientNbr;
for (clientNbr = 0; clientNbr < NBR_CLIENTS; clientNbr++) {
for (int clientNumber = 0; clientNumber < clientsRunning; clientNumber++)
{
clients.Add(new Thread(ClientTask));
clients[clientNbr].Start();
clients[clientNumber].Start();
}

for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++) {
for (int workerNumber = 0; workerNumber < workersToStart; workerNumber++)
{
workers.Add(new Thread(WorkerTask));
workers[workerNbr].Start();
workers[workerNumber].Start();
}

// Logic of LRU loop
// - Poll backend always, frontend only if 1+ worker ready
// - If worker replies, queue worker as ready and forward reply
// to client if necessary
// - If client requests, pop next worker and send request to it
Queue<string> workerQueue = new Queue<string>();
var workerQueue = new Queue<string>();

// Handle worker activity on backend
backend.PollInHandler += (skt, revents) => {
backend.PollInHandler += (socket, revents) =>
{
// Queue worker address for LRU routing
string workerAddr = skt.Recv(Encoding.Unicode);
workerQueue.Enqueue(workerAddr);
string workerAddress = socket.Recv(Encoding.Unicode);
workerQueue.Enqueue(workerAddress);
// Second frame is empty
string empty = skt.Recv(Encoding.Unicode);
string empty = socket.Recv(Encoding.Unicode);
// Third frame is READY or else a client reply address
string clientAddr = skt.Recv(Encoding.Unicode);
string clientAddress = socket.Recv(Encoding.Unicode);
// If client reply, send rest back to frontend
if (!clientAddr.Equals("READY")) {
empty = skt.Recv(Encoding.Unicode);
string reply = skt.Recv(Encoding.Unicode);
frontend.SendMore(clientAddr, Encoding.Unicode);
if (!clientAddress.Equals("READY"))
{
empty = socket.Recv(Encoding.Unicode);
string reply = socket.Recv(Encoding.Unicode);
frontend.SendMore(clientAddress, Encoding.Unicode);
frontend.SendMore();
frontend.Send(reply, Encoding.Unicode);
clientNbr--; // Exit after N messages
clientsRunning--; // Exit after N messages
}
};
frontend.PollInHandler += (skt, revents) => {

frontend.PollInHandler += (socket, revents) =>
{
// Now get next client request, route to LRU worker
// Client request is [address][empty][request]
string clientAddr = skt.Recv(Encoding.Unicode);
string empty = skt.Recv(Encoding.Unicode);
string request = skt.Recv(Encoding.Unicode);
string clientAddr = socket.Recv(Encoding.Unicode);
string empty = socket.Recv(Encoding.Unicode);
string request = socket.Recv(Encoding.Unicode);
backend.SendMore(workerQueue.Dequeue(), Encoding.Unicode);
backend.SendMore();
Expand All @@ -130,12 +96,59 @@ class Program {
};


while (clientNbr > 0) { // Exit after N messages
if (workerQueue.Count > 0) {
Context.Poller(new List<Socket>(new Socket[] { frontend, backend }));
} else {
Context.Poller(new List<Socket>(new Socket[] { backend }));
}
while (clientsRunning > 0)
{ // Exit after N messages
Context.Poller(workerQueue.Count > 0
? new List<Socket>(new Socket[] {frontend, backend})
: new List<Socket>(new Socket[] {backend}));
}
}
}
}

private static void ClientTask()
{
using (var context = new Context(1))
{
using (Socket client = context.Socket(SocketType.REQ))
{
ZHelpers.SetID(client, Encoding.Unicode);
client.Connect("tcp://localhost:5555");

// Send request, get reply
client.Send("HELLO", Encoding.Unicode);
string reply = client.Recv(Encoding.Unicode);
Console.WriteLine("Client: {0}", reply);
}
}
}

private static void WorkerTask()
{
using (var context = new Context(1))
{
using (Socket worker = context.Socket(SocketType.REQ))
{
ZHelpers.SetID(worker, Encoding.Unicode);
worker.Connect("tcp://localhost:5556");

// Tell broker we're ready for work
worker.Send("READY", Encoding.Unicode);

while (true)
{
// Read and save all frames until we get an empty frame
// In this example there is only 1 but it could be more
string address = worker.Recv(Encoding.Unicode);
string empty = worker.Recv(Encoding.Unicode);

// Get request, send reply
string request = worker.Recv(Encoding.Unicode);
Console.WriteLine("Worker: {0}", request);

worker.SendMore(address, Encoding.Unicode);
worker.SendMore();
worker.Send("OK", Encoding.Unicode);
}
}
}
Expand Down

0 comments on commit aec4d88

Please sign in to comment.