Perl6 ZMQ binding
Perl 6 C Makefile
Switch branches/tags
Nothing to show
Latest commit 9f40bc9 Dec 17, 2017 @gabrielash gabrielash bugfix echoserver
Permalink
Failed to load latest commit information.
lib bugfix echoserver Dec 17, 2017
t bugfix echoserver Dec 17, 2017
xt meta Sep 4, 2017
.gitignore -Makefile Sep 4, 2017
.travis.yml Update .travis.yml Sep 15, 2017
LICENSE.md 0.0.1 Aug 27, 2017
META6.json bugfix echoserver Dec 17, 2017
README.md bugfix echoserver Dec 17, 2017

README.md

Net::ZMQ

SYNOPSIS

Net::ZMQ is a Perl6 binding library for ZeroMQ

Introduction

Version 0.1.2

Status

This is in development. The only certainty is that the tests pass on my machine.

Alternatives

There is an an earlier project on github: https://github.com/arnsholt/Net-ZMQ I started this one primarily to learn both Perl6 and ZMQ. The older project may be more stable and suitable to your needs. If you do boldly go and use this one, please share bugs and fixes!

ZMQ Versions

Current development is with ZeroMQ 4.2.

Portability

Development is on linux/x64. Due to some pointer voodoo, it is possible the code will break on other architectures/OSes. This should not be too hard to fix, but it depends on other people trying it on other platforms.

Example Code

use v6;
use Net::ZMQ::V4::Constants;
use Net::ZMQ::Context;
use Net::ZMQ::Socket;
use Net::ZMQ::Message;

my Context $ctx .= new :throw-everything;
my Socket $s1 .= new($ctx, :pair, :throw-everything);
my Socket $s2 .= new($ctx, :pair, :throw-everything);

my $endpoint = 'inproc://con';
$s1.bind($endpoint);
$s2.connect($endpoint);

my $counter = 0;
my $callme = sub ($d, $h) { say 'sending ++$counter'};

MsgBuilder.new\
      .add('a short envelope' )\
      .add( :newline )\
      .add( :empty )\
      .add('a very long story', :max-part-size(255), :newline )\
      .add('another long chunk à la française', :divide-into(3), :newline )\
      .add( :empty )\
      .finalize\
      .send($s1, :callback( $callme ));

my $message = $s2.receive( :slurp );
say $message;

$s1.unbind.close;
$s2.disconnect.close;

Documentation

Net::ZMQ::V4::Constants

holds all the constants from zmq.h v4. They are grouped with tags. The tags not loaded by default are

  • :EVENT
  • :DEPRECATED
  • :DRAFT Experimental, not in stable version
  • :RADIO
  • :IOPLEX multiplexing
  • :SECURITY

Net::ZMQ::V4::LowLevel

holds NativeCall bindings for all the functions in zmq.h most calls are machine generated and the only check is that they compile. constant ZMQ_LOW_LEVEL_FUNCTIONS_TESTED holds a list of the calls used and tested in the module so far. loading Net::ZMQ::V4::Version prints it

Net::ZMQ::V4::Version

use in order to chack version compatibility. It exports

  • verion()
  • version-major()

Net::ZMQ::Context, ::Socket, ::Message, ::Poll

These are the main classes providing a higher-level Perl6 OO interface to ZMQ

Context
         .new( :throw-everything(True))      # set to True to throw non fatal errors
         .terminate() 			                 # manually release all resources (gc would do that)
         .shutdown()			                   # close all sockets
         .get-option(name)                   # get Context option
         .set-option(name, value)	           # set Context option

          options can also be accessed through methods with the name of the option
          with/without get- and set- prefixes.
             e.g get: .get-io-threads()  .io-threads()
             set: .set-io-threads(2) .io-threads(2)
          Net::ZMQ::ContextOptions holds the dispatch table
Socket
Attributes
  context   - the zmq-context; must be supplied to new()
  type      - the ZMQ Socket Type constant: One of
    :pair :publisher :subscriber :client :server :dealer :router :pull :push :xpub :xsub :stream
    must be supplied to new()
  last-error - the last zmq error reported
  throw-everything  - when true, all non-fatal errors except EAGAIN (async) throw
  async-fail-throw  - when true, EAGAIN (async) throws; when false EAGAIN returns Any
  max-send-bytes    - largest single part send in bytes
  max-recv-number   - longest charcter string representing an integer number
                      in a single, integer message part
  max-recv-bytes    - bytes threshhold for truncating receive methods

Methods
Methods that do not return a useful value return self on success and Any on failure.
Send methods return the number of bytes sent or Any.

Socket Wrapper Methods
    close()
    bind( endpoint --> self )         ;endpoint must be a string with a valid zmq endpoint
    unbind( endpoint = last-endpoint  --> self )
    connect( endpoint  --> self )
    disconnect( endpoint = last-endpoint --> self )

Send Methods
      -part sends with SNDMORE flag (incomplete)
      -split causes input to be split and sent in message parts
      -async duh!
      all methods return the number of bytes sent or Any
    send( Str , :async, :part, :enc --> Int)
    send( Int, :async, :part -->Int )
    send( buf8, :async, :part, :max-send-bytes -->Int )
    send(Str, Int split-at :split! :async, :part. :enc  -->Int )
    send(buf8, Int split-at :split! :async, :part -->Int )
    send(buf8, Array splits, :part, :async, :callback, :max-send-bytes -->Int )
    send(:empty!, :async, :part -->Int )        

Receive Methods
      -bin causes return type to be a byte buffer (buf8) instead of a string
      -int retrieves a single integer message
      -slurp causes all waiting parts of a message to be aseembled and returned as single object
      -truncate truncates at a maximum byte length
      -async duh!
    receive(:truncate!, :async, :bin, :enc )
    receive(:int!, :async, :max-recv-number --> Int)
    receive(:slurp!, :async, :bin, :enc)
    receive(:async, :bin, :enc)

Options Methods
    there are option getters and setter for every socket option
    the list of options is in SocketOptions.pm
    every option name creates four legal invocations
      -setters
        option-name(new-value)
        set-option:$name(new-value)
      -getters
        option-name()
        get-option-name()
        e.g.
        * .get-identity()  .identity()
        * .set-identity(id) .identity(id)
    options can also be accessed explicitly with the ZMQ option Constant.
        valid Type Objects are Str, buf8 and Int
        get-option(Int opt-contant, Type-Object return-type, Int size )

Misc Methods
    doc(-->Str) ;this

The Message classes is an OO interface to the zero-copy mechanism. It uses a builder to build an immutable message that can be sent (and re-sent) zero-copied. See example above for useage.

MsgBuilder

builds a Message object that can be used to send complex messages. uses zero-copy internally.

  USAGE example

      my MsgBuilder $builder  .= new;
      my Message $msg =
        $builder.add($envelope)\
                .add(:empty)\
                .add($content-1, :max-part-size(1024) :newline)\
                .add($content-2, :max-part-size(1024) :newline)\
                .finalize;

      $msg.send($socket);


Methods
    new()
    add( Str, :max-part-size :divide-into, :newline, :enc --> self)
    add( :empty --> self)
    add( :newline --> self)
    finalize( --> Message)
Message

Immutable message

Methods
    send(Socket, :part, :async, Callable:($,$ --> Int:D) :callback  --> Int)                  
    send-all(@sockets, :part, :async, Callable:($,$ --> Int:D) :callback  --> Int)
    bytes( --> Int)          
    segments( --> Int)  
    copy(:enc --> Str)
MsgRecv

MsgRecv accumulates message parts received on one or more sockets with minimal copying. parts can be examined, slectevely sent over sockets. and transforming functions can be queued on each part.

methods:
    slurp(Socket, :async)
              accumulate waiting message parts from a socket
    push-transform(UInt, &func)
              queue a transfrm function ( for example, encoding). The function should
              conform to :(Str:D --> Str:D|Any ) or (Blob:D --> Str:D|Any 
            . Any effectively delete the part. 
    push-transform(&func)
              queue a global transfrm function 

    set-encoding( 'ENCODING' ) 
              a wrapper of push-transform

    send(Socket, $from = 0, $n = elems, :async ) sends all or ome of the parts

    [ UInt ] returns message part, transformed by all the transformers
    
    at-raw( UInt )  returns message part as raw bunary data
PollBuilder

PollBuilder builds a polled set of receiving sockets

(Silly) Usage
my $poll = PollBuilder.new\
  .add( StrPollHandler.new( $socket-1, sub ($m) { say "got --$m-- on  socket 1";} ))\
  .add( StrPollHandler.new( $socket-2, sub ($m) { say "got --$m-- on  socket 2";} ))\
  .add( $socket-3, { False })\
  .delay( 500 )\
  .finalize;

1 while $poll.poll(:serial);
say "Done!";


 Methods
  add( PollHandler --> self)
  add( Socket, Callable:(Socket) --> self)  
  delay( Int msecs --> self)                #   -1 => blocks, 0 => no delay
  finalize( --> Poll)
PollHandler

PollHandler is an an abstract class that represents an action to do on a socket when it polls positive. It has four readymade subclasses that feed the action a different

argument:
    * StrPollHandler
    * MessagePollHandler
    * SocketPollHandler
    * MsgRecvPollHandler

Methods
  new( Socket, Callable:(T) )
  do( Socket )  --  this method is called by the Poll object and can be subclassed
                    to create new types of responses
Poll

Poll holds and polls an immutable collection of receiving sockets

Methods
poll()   
  poll returns a sequence of the results of the callback functions associated with the succesfully
  polled sockets or an empty sequence. It throws on error.
poll(:serial)
  primarily fo testing: returns a single result, from the callback of the first succesfully polled
  socket, or Any. The order is defined by the building invocation.
Proxy

runs a steerable proxy

new( :frontend!($socket.as-ptr), :backend!($socket.as-ptr)
        :capture($socket.as-ptr) , :control($socket.as-ptr))
run()
EchoServer

runs an echo server (connect with :client )

methods
  new( :uri )
  run                           blocks on the invoking thread
  detach( --> Promise)          runs in a promise
  shutdown                      stops the server  (no restart possible)

LICENSE

All files (unless noted otherwise) can be used, modified and redistributed under the terms of the Artistic License Version 2. Examples (in the documentation, in tests or distributed as separate files) can be considered public domain.

ⓒ 2017 Gabriel Ash