Distributed, asynchronous message passing system for Clozure Common Lisp.
Warning: Erlangen is immature, experimental software, subject to bugs and changes.
You can now start Clozure Common Lisp and load Erlangen:
    (ql:quickload :erlangen)
    (use-package :erlangen)
Alternatively, you can build Erlangen as a stand-alone executable that boots
into a REPL that uses the packages
    cd ~/quicklisp/local-projects/erlangen
    make bin/erlangen-kernel && bin/erlangen-kernel
Programming a Parallelized Map with Asynchronous Agents and Message Passing
Let’s jump straight into a practical example. The Erlangen repository contains
the erlangen.examples package, which implements parallel-map, a parallelized
map function. It’s like Common Lisp’s map over a single vector, except that it
spreads the work across multiple concurrent agents (fancy processes/native
threads).
parallel-map returns the results of calling a function on the elements in a
vector in a new vector. It uses up to level agents to parallelize the
workload. Optionally, a result-type can be specified as the element type of
the result vector. Here is roughly how it works:
- it spawns some worker agents and attaches to them in monitor mode, so that it will receive an exit notification for each worker
- it sends each worker agent a message with a chunk of work
- it waits for and receives the exit notification of each worker, which contains the chunk’s result, and inserts that result into the final result vector
The worker agents initially do nothing at all. They each just wait to receive a function to execute, and quit when they are done. Note that we use the Trivia pattern matching library to match received messages.
    (defun worker ()
      (ematch (receive)
        ((and (type function) function)
         (funcall function))))
Eventually, each worker will receive a map-chunk closure, which maps the
mapping function over the chunk of the vector bounded by start and end.
    (defun map-chunk (function vector start end result-type)
      (lambda ()
        (let ((results (make-array (- end start) :element-type result-type)))
          (loop for i from start below end do
               (setf (aref results (- i start))
                     (funcall function (aref vector i))))
          (values start end results))))
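To see what a worker ends up returning, the closure can be called directly, without any agents involved. The sketch below repeats map-chunk so that it runs on its own in any Common Lisp; the input vector and the bounds 1 and 3 are made up for illustration.

```lisp
;; map-chunk as defined above, repeated so this snippet is self-contained.
(defun map-chunk (function vector start end result-type)
  (lambda ()
    (let ((results (make-array (- end start) :element-type result-type)))
      (loop for i from start below end do
           (setf (aref results (- i start))
                 (funcall function (aref vector i))))
      (values start end results))))

;; Call the closure in place of a worker and collect its three return values.
(multiple-value-list
 (funcall (map-chunk #'1+ #(2 4 6 8 10) 1 3 t)))
;; → (1 3 #(5 7))
```

The closure returns the chunk’s bounds along with its results, so whoever collects the values knows where in the final vector the chunk belongs.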
Now let’s look at
parallel-map. To distribute the work, it computes
- length—the length of our input vector
- n-chunks—the number of chunks we will divide the work up into
- chunk-size—the minimum length (in elements) of a chunk
and spawns a worker agent for each chunk.
    (defun parallel-map (function vector &key (level 2) (result-type t))
      (let* ((length (length vector))
             (n-chunks (min level length))
             (chunk-size (floor length n-chunks))
             (workers (loop for i from 1 to n-chunks collect
                           (spawn 'worker :attach :monitor))))
Next, it sends each worker a closure for the chunk it should process. It divides the work into n-chunks intervals of at least chunk-size elements each, which together cover the whole vector.
        (loop for worker in workers
              for chunk from 1
              for start from 0 by chunk-size
              for end = (if (< chunk n-chunks)
                            (+ start chunk-size)
                            length)
           do (send (map-chunk function vector start end result-type) worker))
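The interval arithmetic is easier to check in isolation. The following is a minimal sketch that computes the same bounds without spawning anything; chunk-bounds is a hypothetical helper written for this illustration and is not part of Erlangen.

```lisp
;; Hypothetical helper (not part of Erlangen): returns the (start . end)
;; pairs that the loop above would pass to map-chunk.
(defun chunk-bounds (length level)
  (let* ((n-chunks (min level length))
         (chunk-size (floor length n-chunks)))
    (loop for chunk from 1 to n-chunks
          for start from 0 by chunk-size
          for end = (if (< chunk n-chunks)
                        (+ start chunk-size)
                        length)
          collect (cons start end))))

(chunk-bounds 7 3)  ; → ((0 . 2) (2 . 4) (4 . 7))
(chunk-bounds 3 8)  ; → ((0 . 1) (1 . 2) (2 . 3))
```

The last chunk absorbs the remainder, which is why chunk-size is only a minimum. Like the code it mirrors, this sketch assumes a non-empty vector: for a length of 0, n-chunks is 0 and floor would signal a division-by-zero error.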
Finally, it allocates a vector to store the results in, and waits to receive
each chunk result. If any worker exits unexpectedly, parallel-map exits with
that worker’s exit reason. Again, because we attached to the worker agents in
monitor mode, all remaining workers will also receive the exit signal.
        (loop with results = (make-array length :element-type result-type)
              for worker in workers do
             (ematch (receive)
               ((list (type agent) :ok start end chunk-result)
                (replace results chunk-result :start1 start :end1 end))
               ((list (type agent) :exit reason)
                (exit reason)))
           finally (return results))))
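Because every notification carries its own start and end, the order in which chunk results arrive should not matter for the final vector. The sketch below illustrates this with plain Common Lisp and no agents; assemble-chunks is a hypothetical helper and the chunk data is made up.

```lisp
;; Hypothetical helper (not part of Erlangen): copies each chunk result
;; into place, in whatever order the chunks are listed.
(defun assemble-chunks (length chunks)
  (let ((results (make-array length)))
    (loop for (start end chunk-result) in chunks
          do (replace results chunk-result :start1 start :end1 end))
    results))

;; Chunks deliberately listed out of order.
(assemble-chunks 7 (list (list 4 7 #(11 13 15))
                         (list 0 2 #(3 5))
                         (list 2 4 #(7 9))))
;; → #(3 5 7 9 11 13 15)
```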
Now we can spawn parallel-map agents like this:
    (spawn '(parallel-map 1+ #(2 4 6 8 10 12 14) :level 3) :attach :monitor)
    (receive)
    → (#<AGENT #x302002A191ED> :OK #(3 5 7 9 11 13 15))
What fun are agents if they aren’t distributed over a network? Erlangen comes with built-in support for distribution via TCP/IP. Each instance of Erlangen can act as a node and talk to other Erlangen nodes. To facilitate port discovery of remote nodes, there needs to be a port mapper running on the host. To build and run the Erlangen port mapper, execute the commands
    make bin/erlangen-port-mapper
    bin/erlangen-port-mapper localhost &
in a shell in the root of the Erlangen repository. Now build the Erlangen kernel in the same way to conveniently run additional Erlangen instances, and use it to start a node named map-node.
    make bin/erlangen-kernel
    bin/erlangen-kernel -n -e '(node :host "localhost" :name "map-node")'
Hint: if you use Emacs, you can start a new Erlangen instance with Slime via
C-u M-x slime RET /path/to/erlangen-kernel.
Finally, we can also make our initial Erlangen instance a node, and offload some work to map-node:
    (spawn '(node :host "localhost"))
    (spawn '(erlangen.examples:parallel-map 1+ #(2 4 6 8 10 12 14))
           :attach :monitor
           :node "map-node")
    (receive)
    → ("localhost/map-node/0" :OK #(3 5 7 9 11 13 15))
What happened? We spawned an agent on the remote map-node instance to run
parallel-map, and received its exit notification transparently over the
network.