Browse files

delphi lazy and simple pirate examples

  • Loading branch information...
1 parent 5e0e9af commit dfc0434de952fb01acd1da4db140098d238e8ea4 @bvarga bvarga committed Jan 17, 2013
Showing with 302 additions and 0 deletions.
  1. +99 −0 examples/Delphi/lpclient.dpr
  2. +53 −0 examples/Delphi/lpserver.dpr
  3. +84 −0 examples/Delphi/spqueue.dpr
  4. +66 −0 examples/Delphi/spworker.dpr
View
99 examples/Delphi/lpclient.dpr
@@ -0,0 +1,99 @@
+program lpclient;
+//
+// Lazy Pirate client
+// Use zmq_poll to do a safe request-reply
+// To run, start lpserver and then randomly kill/restart it
+// @author Varga Balázs <bb.varga@gmail.com>
+//
+{$APPTYPE CONSOLE}
+
+uses
+ SysUtils
+ , zmqapi
+ ;
+
+const
+ REQUEST_TIMEOUT = 2500; // msecs, (> 1000!)
+ REQUEST_RETRIES = 3; // Before we abandon
+ SERVER_ENDPOINT = 'tcp://localhost:5555';
+
+var
+ ctx: TZMQContext;
+ client: TZMQSocket;
+ sequence,
+ retries_left,
+ expect_reply: Integer;
+ request,
+ reply: Utf8String;
+ poller: TZMQPoller;
+begin
+ ctx := TZMQContext.create;
+ Writeln( 'I: connecting to server...' );
+ client := ctx.Socket( stReq );
+ client.Linger := 0;
+ client.connect( SERVER_ENDPOINT );
+
+ poller := TZMQPoller.Create( true );
+ poller.Register( client, [pePollIn] );
+
+ sequence := 0;
+ retries_left := REQUEST_RETRIES;
+ while ( retries_left > 0 ) and not ctx.Terminated do
+ try
+ // We send a request, then we work to get a reply
+ inc( sequence );
+ request := Format( '%d', [sequence] );
+ client.send( request );
+
+ expect_reply := 1;
+ while ( expect_reply > 0 ) do
+ begin
+ // Poll socket for a reply, with timeout
+ poller.poll( REQUEST_TIMEOUT );
+
+ // Here we process a server reply and exit our loop if the
+ // reply is valid. If we didn't a reply we close the client
+ // socket and resend the request. We try a number of times
+ // before finally abandoning:
+
+ if pePollIn in poller.PollItem[0].revents then
+ begin
+ // We got a reply from the server, must match sequence
+ client.recv( reply );
+ if StrToInt( reply ) = sequence then
+ begin
+ Writeln( Format( 'I: server replied OK (%s)', [reply] ) );
+ retries_left := REQUEST_RETRIES;
+ expect_reply := 0;
+ end else
+ Writeln( Format( 'E: malformed reply from server: %s', [ reply ] ) );
+
+ end else
+ begin
+ dec( retries_left );
+
+ if retries_left = 0 then
+ begin
+ Writeln( 'E: server seems to be offline, abandoning' );
+ break;
+ end else
+ begin
+ Writeln( 'W: no response from server, retrying...' );
+ // Old socket is confused; close it and open a new one
+ poller.Deregister( client, [pePollIn] );
+ client.Free;
+ Writeln( 'I: reconnecting to server...' );
+ client := ctx.Socket( stReq );
+ client.Linger := 0;
+ client.connect( SERVER_ENDPOINT );
+ poller.Register( client, [pePollIn] );
+ // Send request again, on new socket
+ client.send( request );
+ end;
+ end;
+ end;
+ except
+ end;
+ poller.Free;
+ ctx.Free;
+end.
View
53 examples/Delphi/lpserver.dpr
@@ -0,0 +1,53 @@
+program lpserver;
+//
+// Lazy Pirate server
+// Binds REQ socket to tcp://*:5555
+// Like hwserver except:
+// - echoes request as-is
+// - randomly runs slowly, or exits to simulate a crash.
+// @author Varga Balázs <bb.varga@gmail.com>
+//
+{$APPTYPE CONSOLE}
+
+uses
+ SysUtils
+ , zmqapi
+ ;
+
+var
+ context: TZMQContext;
+ server: TZMQSocket;
+ cycles: Integer;
+ request: Utf8String;
+begin
+ Randomize;
+
+ context := TZMQContext.create;
+ server := context.socket( stRep );
+ server.bind( 'tcp://*:5555' );
+
+ cycles := 0;
+ while not context.Terminated do
+ try
+ server.recv( request );
+ inc( cycles );
+
+ // Simulate various problems, after a few cycles
+ if ( cycles > 3 ) and ( random(3) = 0) then
+ begin
+ Writeln( 'I: simulating a crash' );
+ break;
+ end else
+ if ( cycles > 3 ) and ( random(3) = 0 ) then
+ begin
+ Writeln( 'I: simulating CPU overload' );
+ sleep(2000);
+ end;
+
+ Writeln( Format( 'I: normal request (%s)', [request] ) );
+ sleep (1000); // Do some heavy work
+ server.send( request );
+ except
+ end;
+ context.Free;
+end.
View
84 examples/Delphi/spqueue.dpr
@@ -0,0 +1,84 @@
+program spqueue;
+//
+// Simple Pirate broker
+// This is identical to load-balancing pattern, with no reliability
+// mechanisms. It depends on the client for recovery. Runs forever.
+// @author Varga Balázs <bb.varga@gmail.com>
+//
+{$APPTYPE CONSOLE}
+
+uses
+ SysUtils
+ , zmqapi
+ ;
+
+const
+ WORKER_READY = '\001'; // Signals worker is ready
+
+var
+ ctx: TZMQContext;
+ frontend,
+ backend: TZMQSocket;
+ workers: TZMQMsg;
+ poller: TZMQPoller;
+ pc: Integer;
+ msg: TZMQMsg;
+ identity,
+ frame: TZMQFrame;
+begin
+ ctx := TZMQContext.create;
+ frontend := ctx.Socket( stRouter );
+ backend := ctx.Socket( stRouter );
+ frontend.bind( 'tcp://*:5555' ); // For clients
+ backend.bind( 'tcp://*:5556' ); // For workers
+
+ // Queue of available workers
+ workers := TZMQMsg.create;
+
+ poller := TZMQPoller.Create( true );
+ poller.Register( backend, [pePollIn] );
+ poller.Register( frontend, [pePollIn] );
+
+
+ // The body of this example is exactly the same as lbbroker2.
+ while not ctx.Terminated do
+ try
+ // Poll frontend only if we have available workers
+ if workers.size > 0 then
+ pc := 2
+ else
+ pc := 1;
+ poller.poll( 1000, pc );
+
+ // Handle worker activity on backend
+ if pePollIn in poller.PollItem[0].revents then
+ begin
+ // Use worker identity for load-balancing
+ backend.recv( msg );
+ identity := msg.unwrap;
+ workers.add( identity );
+
+ // Forward message to client if it's not a READY
+ frame := msg.first;
+ if frame.asUtf8String = WORKER_READY then
+ begin
+ msg.Free;
+ msg := nil;
+ end else
+ frontend.send( msg );
+ end;
+
+ if pePollIn in poller.PollItem[1].revents then
+ begin
+ // Get client request, route to first available worker
+ frontend.recv( msg );
+ msg.wrap( workers.pop );
+ backend.send( msg );
+ end;
+ except
+ end;
+
+ workers.Free;
+ ctx.Free;
+end.
+
View
66 examples/Delphi/spworker.dpr
@@ -0,0 +1,66 @@
+program spworker;
+//
+// Simple Pirate worker
+// Connects REQ socket to tcp://*:5556
+// Implements worker part of load-balancing
+// @author Varga Balázs <bb.varga@gmail.com>
+//
+{$APPTYPE CONSOLE}
+
+uses
+ SysUtils
+ , zmqapi
+ , zhelpers
+ ;
+
+const
+ WORKER_READY = '\001'; // Signals worker is ready
+
+var
+ ctx: TZMQContext;
+ worker: TZMQSocket;
+ identity: String;
+ frame: TZMQFrame;
+ cycles: Integer;
+ msg: TZMQMsg;
+begin
+ ctx := TZMQContext.create;
+ worker := ctx.Socket( stReq );
+
+ // Set random identity to make tracing easier
+ identity := s_random( 8 );
+ worker.Identity := identity;
+ worker.connect( 'tcp://localhost:5556' );
+
+ // Tell broker we're ready for work
+ Writeln( Format( 'I: (%s) worker ready', [identity] ) );
+ frame := TZMQFrame.create;
+ frame.asUtf8String := WORKER_READY;
+ worker.send( frame );
+ cycles := 0;
+ while not ctx.Terminated do
+ try
+ worker.recv( msg );
+
+ // Simulate various problems, after a few cycles
+ Inc( cycles );
+
+ if ((cycles > 3) and (random(5) = 0)) then
+ begin
+ Writeln( Format( 'I: (%s) simulating a crash', [identity] ) );
+ msg.Free;
+ msg := nil;
+ break;
+ end else
+ if ( (cycles > 3) and (random(5) = 0) ) then
+ begin
+ Writeln( Format( 'I: (%s) simulating CPU overload', [identity] ));
+ sleep (3000);
+ end;
+ Writeln( Format('I: (%s) normal reply', [identity]) );
+ sleep(1000); // Do some heavy work
+ worker.send( msg );
+ except
+ end;
+ ctx.Free;
+end.

0 comments on commit dfc0434

Please sign in to comment.