Skip to content

Commit

Permalink
Updating C# Examples
Browse files Browse the repository at this point in the history
  • Loading branch information
metadings committed Jan 16, 2015
1 parent e363d46 commit aef1dd9
Show file tree
Hide file tree
Showing 25 changed files with 262 additions and 204 deletions.
11 changes: 6 additions & 5 deletions examples/C#/HWServer.cs
Expand Up @@ -13,18 +13,19 @@ static partial class Program
public static void HWServer(IDictionary<string, string> dict, string[] args)
{
using (var context = ZContext.Create())
using (var responder = ZSocket.Create(context, ZSocketType.REP)) {

using (var responder = ZSocket.Create(context, ZSocketType.REP))
{
responder.Bind("tcp://*:5555");

while (true) {
while (true)
{

using (ZFrame request = responder.ReceiveFrame())
using (ZFrame request = responder.ReceiveFrame())
{
Console.WriteLine("Received {0}", request);
Thread.Sleep(1);

using (ZFrame reply = ZFrame.Create("World"))
using (ZFrame reply = ZFrame.Create("World"))
{
responder.SendFrame(reply);
}
Expand Down
13 changes: 8 additions & 5 deletions examples/C#/Identity.cs
Expand Up @@ -13,17 +13,19 @@ static partial class Program
public static void Identity(IDictionary<string, string> dict, string[] args)
{
using (var context = ZContext.Create())
using (var sink = ZSocket.Create(context, ZSocketType.ROUTER)) {

using (var sink = ZSocket.Create(context, ZSocketType.ROUTER))
{
sink.Bind("inproc://example");

using (var anonymous = ZSocket.Create(context, ZSocketType.REQ)) {
using (var anonymous = ZSocket.Create(context, ZSocketType.REQ))
{
anonymous.Connect("inproc://example");
anonymous.SendFrame(ZFrame.Create("ROUTER uses REQ's generated UUID"));
}
Identity_Dump(sink);

using (var identified = ZSocket.Create(context, ZSocketType.REQ)) {
using (var identified = ZSocket.Create(context, ZSocketType.REQ))
{
identified.Identity = Encoding.UTF8.GetBytes("PEER2");
identified.Connect("inproc://example");
identified.SendFrame(ZFrame.Create("ROUTER uses REQ's socket identity"));
Expand All @@ -34,7 +36,8 @@ public static void Identity(IDictionary<string, string> dict, string[] args)

static void Identity_Dump(ZSocket sink)
{
using (ZMessage msg = sink.ReceiveMessage()) {
using (ZMessage msg = sink.ReceiveMessage())
{
Console.WriteLine("---");
Console.WriteLine("[0] {0}", msg[0].ReadString());
Console.WriteLine("[1] {0}", msg[1].ReadString());
Expand Down
35 changes: 22 additions & 13 deletions examples/C#/Interrupt.cs
Expand Up @@ -13,20 +13,27 @@ static partial class Program
public static void Interrupt(IDictionary<string, string> dict, string[] args)
{
using (var context = ZContext.Create())
using (var responder = ZSocket.Create(context, ZSocketType.REP)) {
using (var responder = ZSocket.Create(context, ZSocketType.REP))
{

var thread = new Thread(() => {
Console.CancelKeyPress += (sender, e) => {
var thread = new Thread(() =>
{
Console.CancelKeyPress += (sender, e) =>
{
// e.Cancel = false;
context.Terminate();
};
while (true) {
if (Console.KeyAvailable) {
while (true)
{
if (Console.KeyAvailable)
{
ConsoleKeyInfo info = Console.ReadKey(true);
if (info.Modifiers == ConsoleModifiers.Control && info.Key == ConsoleKey.C) {
if (info.Modifiers == ConsoleModifiers.Control && info.Key == ConsoleKey.C)
{
break;
}
if (info.Key == ConsoleKey.Escape) {
if (info.Key == ConsoleKey.Escape)
{
context.Terminate();
break;
}
Expand All @@ -37,28 +44,30 @@ public static void Interrupt(IDictionary<string, string> dict, string[] args)
thread.Start();
thread.Join(64);


responder.Bind("tcp://*:5555");

ZError error;
ZFrame request;
while (true) {

while (true)
{
if (null == (request = responder.ReceiveFrame(out error)))
{
if (error == ZError.ETERM) {
if (error == ZError.ETERM)
{
Console.WriteLine("Terminating, you have pressed ESC.");
break;
}
throw new ZException(error);
}

using (request) {
using (request)
{
string respondText = "Hello";
Console.WriteLine("Received: {0}!", respondText, request.ReadString());

Console.Write("Sending {0}... ", respondText);
using (var response = ZFrame.Create(respondText)) {
using (var response = ZFrame.Create(respondText))
{
responder.SendFrame(response);
}
}
Expand Down
86 changes: 43 additions & 43 deletions examples/C#/LBBroker.cs
Expand Up @@ -33,7 +33,7 @@ static void LBBroker_Client(ZContext context, int i)
{
request.Add(ZFrame.Create("Hello"));

// Send request
// Send request
client.SendMessage(request);
}

Expand Down Expand Up @@ -69,39 +69,39 @@ static void LBBroker_Worker(ZContext context, int i)
worker.SendFrame(ready);
}

ZError error;
ZMessage request;
ZError error;
ZMessage request;

while (true)
{
// Get request
if (null == (request = worker.ReceiveMessage(out error)))
{
// We are using "out error",
// to NOT throw a ZException ETERM
if (error == ZError.ETERM)
break;

throw new ZException(error);
}

using (request)
{
string worker_id = request[0].ReadString();

string requestText = request[2].ReadString();
Console.WriteLine("WORKER{0}: {1}", i, requestText);

// Send reply
using (var commit = new ZMessage())
{
commit.Add(ZFrame.Create(worker_id));
commit.Add(ZFrame.Create(string.Empty));
commit.Add(ZFrame.Create("OK"));

worker.SendMessage(commit);
}
}
if (null == (request = worker.ReceiveMessage(out error)))
{
// We are using "out error",
// to NOT throw a ZException ETERM
if (error == ZError.ETERM)
break;

throw new ZException(error);
}

using (request)
{
string worker_id = request[0].ReadString();

string requestText = request[2].ReadString();
Console.WriteLine("WORKER{0}: {1}", i, requestText);

// Send reply
using (var commit = new ZMessage())
{
commit.Add(ZFrame.Create(worker_id));
commit.Add(ZFrame.Create(string.Empty));
commit.Add(ZFrame.Create("OK"));

worker.SendMessage(commit);
}
}
}
}
}
Expand Down Expand Up @@ -146,7 +146,7 @@ public static void LBBroker(IDictionary<string, string> dict, string[] args)
// requeue that worker and forward the reply to the original client
// using the reply envelope.

// Queue of available workers
// Queue of available workers
var worker_queue = new List<string>();

var pollers = new ZPollItem[]
Expand All @@ -155,14 +155,14 @@ public static void LBBroker(IDictionary<string, string> dict, string[] args)
ZPollItem.CreateReceiver(frontend)
};

ZError error;
ZMessage incoming;
ZError error;
ZMessage incoming;

while (true)
{
if (pollers[0].PollIn(out incoming, out error, TimeSpan.FromMilliseconds(64)))
{
// Handle worker activity on backend
// Handle worker activity on backend

// incoming[0] is worker_id
string worker_id = incoming[0].ReadString();
Expand All @@ -187,15 +187,15 @@ public static void LBBroker(IDictionary<string, string> dict, string[] args)
outgoing.Add(ZFrame.Create(string.Empty));
outgoing.Add(ZFrame.Create(reply));

// Send
// Send
frontend.SendMessage(outgoing);
}

if (--clients == 0)
{
// break the while (true) when all clients said Hello
break;
}
if (--clients == 0)
{
// break the while (true) when all clients said Hello
break;
}
}
}
if (worker_queue.Count > 0)
Expand All @@ -204,7 +204,7 @@ public static void LBBroker(IDictionary<string, string> dict, string[] args)

if (pollers[1].PollIn(out incoming, out error, TimeSpan.FromMilliseconds(64)))
{
// Here is how we handle a client request
// Here is how we handle a client request

// incoming[0] is client_id
string client_id = incoming[0].ReadString();
Expand All @@ -222,11 +222,11 @@ public static void LBBroker(IDictionary<string, string> dict, string[] args)
outgoing.Add(ZFrame.Create(string.Empty));
outgoing.Add(ZFrame.Create(requestText));

// Send
// Send
backend.SendMessage(outgoing);
}

// Dequeue the next worker identity
// Dequeue the next worker identity
worker_queue.RemoveAt(0);
}
}
Expand Down
17 changes: 10 additions & 7 deletions examples/C#/MSPoller.cs
Expand Up @@ -14,26 +14,29 @@ public static void MSPoller(IDictionary<string, string> dict, string[] args)
{
using (var context = ZContext.Create())
using (var receiver = ZSocket.Create(context, ZSocketType.PULL))
using (var subscriber = ZSocket.Create(context, ZSocketType.SUB)) {

using (var subscriber = ZSocket.Create(context, ZSocketType.SUB))
{
receiver.Connect("tcp://127.0.0.1:5557");

subscriber.Connect("tcp://127.0.0.1:5556");
subscriber.SetOption(ZSocketOption.SUBSCRIBE, "72622 ");

var pollers = new ZPollItem[] {
var pollers = new ZPollItem[]
{
ZPollItem.CreateReceiver(receiver),
ZPollItem.CreateReceiver(subscriber)
};

ZError error;
ZMessage msg;
while (true) {

if (pollers[0].PollIn(out msg, out error, TimeSpan.FromMilliseconds(64))) {
while (true)
{
if (pollers[0].PollIn(out msg, out error, TimeSpan.FromMilliseconds(64)))
{
// Process task
}
if (pollers[1].PollIn(out msg, out error, TimeSpan.FromMilliseconds(64))) {
if (pollers[1].PollIn(out msg, out error, TimeSpan.FromMilliseconds(64)))
{
// Process weather update
}
}
Expand Down
13 changes: 8 additions & 5 deletions examples/C#/MSReader.cs
Expand Up @@ -14,20 +14,23 @@ public static void MSReader(IDictionary<string, string> dict, string[] args)
{
using (var context = ZContext.Create())
using (var receiver = ZSocket.Create(context, ZSocketType.PULL))
using (var subscriber = ZSocket.Create(context, ZSocketType.SUB)) {

using (var subscriber = ZSocket.Create(context, ZSocketType.SUB))
{
receiver.Connect("tcp://127.0.0.1:5557");

subscriber.Connect("tcp://127.0.0.1:5556");
subscriber.SetOption(ZSocketOption.SUBSCRIBE, "72622 ");

ZError error;
ZFrame frame;
while (true) {
if (null != (frame = receiver.ReceiveFrame(ZSocketFlags.DontWait, out error))) {
while (true)
{
if (null != (frame = receiver.ReceiveFrame(ZSocketFlags.DontWait, out error)))
{
// Process task
}
if (null != (frame = subscriber.ReceiveFrame(ZSocketFlags.DontWait, out error))) {
if (null != (frame = subscriber.ReceiveFrame(ZSocketFlags.DontWait, out error)))
{
// Process weather update
}
Thread.Sleep(1);
Expand Down
23 changes: 13 additions & 10 deletions examples/C#/MTRelay.cs
Expand Up @@ -12,9 +12,9 @@ static partial class Program
{
public static void MTRelay(IDictionary<string, string> dict, string[] args)
{
using (var context = ZContext.Create())
using (var receiver = ZSocket.Create(context, ZSocketType.PAIR)) {

using (var context = ZContext.Create())
using (var receiver = ZSocket.Create(context, ZSocketType.PAIR))
{
receiver.Bind("inproc://step3");

var thread = new Thread(() => MTRelay_step2(context));
Expand All @@ -26,27 +26,30 @@ public static void MTRelay(IDictionary<string, string> dict, string[] args)
}
}

static void MTRelay_step2(ZContext context) {
using (var receiver = ZSocket.Create(context, ZSocketType.PAIR)) {

static void MTRelay_step2(ZContext context)
{
using (var receiver = ZSocket.Create(context, ZSocketType.PAIR))
{
receiver.Bind("inproc://step2");

var thread = new Thread(() => MTRelay_step1(context));
thread.Start();

receiver.ReceiveFrame();
}
using (var xmitter = ZSocket.Create(context, ZSocketType.PAIR)) {

using (var xmitter = ZSocket.Create(context, ZSocketType.PAIR))
{
xmitter.Connect("inproc://step3");

Console.WriteLine("Step 2 ready, signaling step 3");
xmitter.SendFrame(ZFrame.Create("READY"));
}
}

static void MTRelay_step1(ZContext context) {
using (var xmitter = ZSocket.Create(context, ZSocketType.PAIR)) {
static void MTRelay_step1(ZContext context)
{
using (var xmitter = ZSocket.Create(context, ZSocketType.PAIR))
{

xmitter.Connect("inproc://step2");

Expand Down

0 comments on commit aef1dd9

Please sign in to comment.