Permalink
Browse files

Adding some python and erlang examples

  • Loading branch information...
1 parent b91356c commit 54f49e9a4d3609bdb31b95cf2c9420ddfd95167c @ianbarber committed Jun 8, 2011
Showing with 154 additions and 0 deletions.
  1. +17 −0 log/loglocal.py
  2. +16 −0 log/logserv.erl
  3. +36 −0 queue/queue.erl
  4. +10 −0 repeater/listener.php
  5. +29 −0 repeater/repeater.erl
  6. +15 −0 reqrep/erlrep.erl
  7. +16 −0 statusupdate/eventhub.erl
  8. +15 −0 statusupdate/eventhub.py
View
17 log/loglocal.py
@@ -0,0 +1,17 @@
+import zmq
+bufSz = 3; ctx = zmq.Context()
+inp = ctx.socket(zmq.PULL)
+out = ctx.socket(zmq.PUSH)
+inp.bind("ipc:///tmp/logger")
+out.connect("tcp://localhost:5555")
+msgs = []
+
+while True:
+ msg = inp.recv()
+ print "Received Log"
+ msgs.append(msg)
+ if(len(msgs) == bufSz):
+ print "Forwarding Buffer"
+ for i, msg in enumerate(msgs):
+ out.send(msg, (0, zmq.SNDMORE)[i < bufSz-1])
+ msgs = []
View
16 log/logserv.erl
@@ -0,0 +1,16 @@
+-module(logserv).
+-export([run/0]).
+
+run() ->
+ {ok, Ctx} = erlzmq:context(),
+ {ok, In} = erlzmq:socket(Ctx, [pull, {active,true}]),
+ ok = erlzmq:bind(In, "tcp://*:5555"),
+ loop(In).
+
+loop(In) ->
+ receive
+ {zmq, In, Msg} ->
+ {ok,{obj, [{_, Time}, {_, Log}] }, [] } = rfc4627:decode(Msg),
+ io:format("~B ~s~n", [Time, Log]),
+ loop(In)
+ end.
View
36 queue/queue.erl
@@ -0,0 +1,36 @@
+-module(queue).
+-export([run/0]).
+
+run() ->
+ {ok, Ctx} = erlzmq:context(),
+ {ok, Front} = erlzmq:socket(Ctx, [xrep, {active,true}]),
+ {ok, Back} = erlzmq:socket(Ctx, [xreq, {active,true}]),
+ ok = erlzmq:bind(Front, "tcp://*:5454"),
+ ok = erlzmq:bind(Back, "tcp://*:5455"),
+
+ loop(Front, Back),
+
+ ok = erlzmq:close(Front),
+ ok = erlzmq:close(Back),
+ ok = erlzmq:term(Ctx).
+
+% using my fork of the bindings here, as multipart support in
+% erlzmq2 works slightly differently in master
+loop(Front, Back) ->
+ receive
+ {zmq, Front, Msg} ->
+ io:format("Sending Back: ~p~n",[Msg]),
+ sendall(Back, Msg),
+ loop(Front, Back);
+ {zmq, Back, Msg} ->
+ io:format("Sending Front: ~p~n",[Msg]),
+ sendall(Front, Msg),
+ loop(Front, Back)
+ end.
+
+sendall(To, [Part|[]]) ->
+ erlzmq:send(To, Part);
+
+sendall(To, [Part|Msg]) ->
+ erlzmq:send(To, Part, [sndmore]),
+ sendall(To, Msg).
View
10 repeater/listener.php
@@ -0,0 +1,10 @@
+<?php
+
+$ctx = new ZMQContext();
+$sock = $ctx->getSocket(ZMQ::SOCKET_SUB);
+$sock->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "");
+$sock->connect("tcp://localhost:5656");
+
+while(true) {
+ var_dump(json_decode($sock->recv()));
+}
View
29 repeater/repeater.erl
@@ -0,0 +1,29 @@
+-module(repeater).
+-export([init/1, terminate/1, handle_event/2]).
+-export([run/0]).
+
+init(_) ->
+ spawn_link(?MODULE, run, []).
+
+terminate(Pid) ->
+ Pid ! {cmd, kill}.
+
+handle_event(Event, Pid) ->
+ Pid ! Event.
+
+run() ->
+ {ok, Ctx} = erlzmq:context(),
+ {ok, Sock} = erlzmq:socket(Ctx, [pub]),
+ ok = erlzmq:bind(Sock, "tcp://*:5656"),
+ loop(Sock).
+
+loop(Sock) ->
+ receive
+ {cmd, kill} ->
+ ok;
+ {Action, Id, Event} ->
+ Json = rfc4627:encode({obj, [{action, Action}, {id, Id}, {event, Event}]}),
+ erlzmq:send(Sock, list_to_binary(Json)),
+ loop(Sock)
+ end.
+
View
15 reqrep/erlrep.erl
@@ -0,0 +1,15 @@
+-module(erlrep).
+-export[run/0].
+
+run() ->
+ {ok, Context} = erlzmq:context(),
+ {ok, Socket} = erlzmq:socket(Context, rep),
+ ok = erlzmq:bind(Socket, "tcp://*:5454"),
+ loop(Socket).
+
+loop(Socket) ->
+ {ok, Msg} = erlzmq:recv(Socket),
+ Reply = binary_to_list(Msg) ++ " World",
+ io:format("Sending ~s~n", [Reply]),
+ ok = erlzmq:send(Socket, list_to_binary(Reply)),
+ loop(Socket).
View
16 statusupdate/eventhub.erl
@@ -0,0 +1,16 @@
+-module(eventhub).
+-export([run/0]).
+
+run() ->
+ {ok, Ctx} = erlzmq:context(),
+ {ok, Out} = erlzmq:socket(Ctx, [pub]),
+ ok = erlzmq:connect(Out, "epgm://eth0;239.192.0.1:7601"),
+ erlzmq:setsockopt(Out, [{rate, 10000}]),
+ {ok, In} = erlzmq:socket(Ctx, [pull]),
+ ok = erlzmq:bind(In, "tcp://*:6767"),
+ loop(In, Out).
+
+loop(In, Out) ->
+ {ok, Msg} = erlzmq:recv(In),
+ ok = erlzmq:send(Out, Msg),
+ loop(In, Out).
View
15 statusupdate/eventhub.py
@@ -0,0 +1,15 @@
+import time
+import zmq
+from zmq.devices.basedevice import ProcessDevice
+
+pd = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUB)
+pd.bind_in("tcp://*:6767")
+pd.connect_out("epgm://eth0;239.192.0.1:7601")
+pd.setsockopt_out(zmq.RATE, 10000)
+pd.start()
+
+# Do other things here
+
+# This is just to pretend do some foreground work.
+while True:
+ time.sleep(100)

0 comments on commit 54f49e9

Please sign in to comment.