Skip to content


Browse files Browse the repository at this point in the history
Switch from usocket to cl-async
  • Loading branch information
borodust committed Apr 19, 2017
1 parent 1ad1d04 commit e8a41e5
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 72 deletions.
8 changes: 5 additions & 3 deletions client/src/connector.lisp
Expand Up @@ -28,16 +28,18 @@
(loop while enabled-p
do (log-errors
(usocket:wait-for-input connection)
(let ((message (decode-message (connection-stream-of this))))
(let* ((stream (connection-stream-of this))
(message (decode-message stream)))
(if-let ((reply-id (getf message :reply-for)))
(with-instance-lock-held (this)
(if-let ((handler (gethash reply-id message-table)))
(remhash reply-id message-table)
(funcall handler message))
(log:error "Handler not found for message with id ~A" reply-id)))
(encode-message (process-command (getf message :command) message)
(connection-stream-of this)))))
(encode-message (process-command (getf message :command) message) stream)
(force-output stream)))))
finally (usocket:socket-close connection)))))

Expand Down
2 changes: 1 addition & 1 deletion mortar-combat.asd
Expand Up @@ -57,7 +57,7 @@
:author "Pavel Korolev"
:mailto ""
:license "GPLv3"
:depends-on (log4cl cl-muth usocket flexi-streams mortar-combat/common
:depends-on (log4cl cl-muth cl-async flexi-streams mortar-combat/common
cl-bodge/engine cl-bodge/utils ironclad uuid)
:serial t
:pathname "proxy/"
Expand Down
18 changes: 9 additions & 9 deletions proxy/peer.lisp
Expand Up @@ -15,11 +15,11 @@

(defun register-peer (registry connection name)
(with-slots (peer-table peer-by-id) registry
(with-hash-entries ((info connection)
(with-hash-entries ((peer-by-info connection)
(peer-by-name name))
(when info
(error "Peer was already registered for provided connection ~A" info))
(when peer-by-info
(error "Peer was already registered for provided connection ~A" peer-by-info))
(unless peer-by-name
(let* ((id (loop for id = (make-random-uuid)
while (gethash id peer-by-id)
Expand All @@ -28,8 +28,8 @@
:id id
:name name
:info-connection connection)))
(setf info peer
name peer
(setf peer-by-info peer
peer-by-name peer
(gethash id peer-by-id) peer)

Expand All @@ -46,7 +46,7 @@

(defun update-peer-proxy-connection (registry peer proxy-connection)
(with-slots (peer-table) registry
(with-slots ((peer-proxy proxy-connection)) peer
(remhash (proxy-connection-of peer) peer-table)
(setf peer-proxy proxy-connection
(gethash proxy-connection peer-table) peer))))
(remhash (proxy-connection-of peer) peer-table)
(setf (gethash proxy-connection peer-table) peer))
(with-slots ((peer-proxy proxy-connection) proxy-stream) peer
(setf peer-proxy proxy-connection)))
111 changes: 52 additions & 59 deletions proxy/proxy.lisp
Expand Up @@ -8,94 +8,87 @@

(define-constant +server-version+ 1)
(define-constant +routing-buffer-size+ (* 64 1024))
(define-constant +client-socket-timeout+ (* 5 60))

(defclass mortar-combat-proxy (enableable generic-system)
((proxy-socket :initform nil)
((proxy-server :initform nil)
(peer-registry :initform (make-instance 'peer-registry) :reader peer-registry-of)
(arena-registry :initform (make-instance 'arena-registry) :reader arena-registry-of)
(arenas :initform (make-hash-table :test #'equal) :reader arena-list-of)
(routing-buffer :initform (make-array +routing-buffer-size+
:element-type '(unsigned-byte 8)))
(info-socket :initform nil)))
(info-server :initform nil)))

(defun reply-to (message)
(process-command (getf message :command) message))

(defun process-request ()
;; fixme: record connection's last communication timestamp
;; to autoclose idle connections
(let ((stream (usocket:socket-stream *connection*)))
(when (listen stream)
;; fixme: make async: read available chunk, don't wait for more
(let ((message (decode-message stream)))
(when (listp message)
(encode-message (reply-to message) stream)
(force-output stream))))))
(defun process-request (stream)
(let ((message (decode-message stream)))
(when (listp message)
(encode-message (reply-to message) stream)
(force-output stream))))

(defun pour-stream (source-peer destination-peer)
(defun pour-stream (source-stream destination-stream)
(with-slots (routing-buffer) *system*
(when-let ((src-conn (proxy-connection-of source-peer))
(dst-conn (proxy-connection-of destination-peer)))
(let ((source-stream (usocket:socket-stream src-conn))
(destination-stream (usocket:socket-stream dst-conn)))
(when (listen source-stream)
;; no need to do full copy, hence no loop: let server do other work in between
(let ((bytes-read (read-sequence routing-buffer source-stream)))
(write-sequence routing-buffer destination-stream :end bytes-read)
(force-output destination-stream)))))))

(defun route-stream ()
(when-let ((arena (find-arena-by-peer (arena-registry-of *system*) *peer*)))
(let ((arena-server (server-of arena)))
(if (eq arena-server *peer*)
(loop for client in (clients-of arena)
do (pour-stream arena-server client))
(pour-stream *peer* arena-server)))))
(when (and source-stream destination-stream )
(loop with buf-len = (length routing-buffer)
for bytes-read = (read-sequence routing-buffer source-stream)
do (write-sequence routing-buffer destination-stream :end bytes-read)
while (= bytes-read buf-len))
(force-output destination-stream))))

(defun process-input ()
(if (and *peer* (eq (proxy-connection-of *peer*) *connection*))
(defun route-stream (stream)
(when-let ((arena (find-arena-by-peer (arena-registry-of *system*) *peer*)))
(let ((arena-server (server-of arena)))
(flet ((wrap-into-stream (peer)
;; fixme: find a way to avoid stream instantiating
(make-instance 'as:async-output-stream
:socket (proxy-connection-of peer))))
(if (eq arena-server *peer*)
(loop for client in (clients-of arena)
do (pour-stream stream (wrap-into-stream client)))
(pour-stream stream (wrap-into-stream arena-server)))))))

(defmethod initialize-system :after ((this mortar-combat-proxy))
(with-slots (proxy-socket info-socket peer-registry) this
(setf proxy-socket (usocket:socket-listen #(127 0 0 1) 8222
:element-type '(unsigned-byte 8))
info-socket (usocket:socket-listen #(127 0 0 1) 8778
:element-type '(unsigned-byte 8)))
(in-new-thread "socket-listener"
(let ((sockets (list proxy-socket info-socket))
(*system* this))
(flet ((%accept (passive-socket)
(push (usocket:socket-accept passive-socket) (cddr sockets))))
(loop while (enabledp this) do
(loop for rest-connections on (cdr (usocket:wait-for-input sockets))
for *connection* = (second rest-connections)
when (and *connection* (usocket:socket-state *connection*))
do (let ((*peer* (find-peer-by-property (peer-registry-of this) *connection*)))
(when (process-input)
(pop (cdr rest-connections)))))
((usocket:socket-state info-socket) (%accept info-socket))
((usocket:socket-state proxy-socket) (%accept proxy-socket))))))))))
(with-slots (proxy-server info-server peer-registry) this
(labels ((process-condition (e)
(log:error "Server event: ~A" e))
(on-accept (socket)
(declare (ignorable socket))
#++ (as:set-socket-timeouts socket +client-socket-timeout+ nil))
(process-input (socket stream)
(let* ((*system* this)
(*connection* socket)
(*peer* (find-peer-by-property (peer-registry-of this) *connection*)))
(if (and *peer* (eq (proxy-connection-of *peer*) *connection*))
(route-stream stream)
(process-request stream)))))
(make-server (host port)
(as:tcp-server host port #'process-input
:event-cb #'process-condition
:connect-cb #'on-accept
:stream t)))
(in-new-thread "connector-thread"
(as:with-event-loop ()
(setf proxy-server (make-server "" 8222)
info-server (make-server "" 8778)))))))

(defmethod make-system-context ((this mortar-combat-proxy))
(make-instance 'mortar-combat-context))

(defmethod discard-system :before ((this mortar-combat-proxy))
(with-slots (proxy-socket info-socket) this
(usocket:socket-close proxy-socket)
(usocket:socket-close info-socket)))
(with-slots (proxy-server info-server) this
(as:close-tcp-server proxy-server)
(as:close-tcp-server info-server)))

(define-system-function find-arena mortar-combat-proxy (name)
Expand Down

0 comments on commit e8a41e5

Please sign in to comment.