Skip to content

Commit

Permalink
WebSocket support.
Browse files Browse the repository at this point in the history
Draft WebSocket support.
  • Loading branch information
neotyk committed May 21, 2012
1 parent 391739a commit 6b066f1
Showing 1 changed file with 109 additions and 1 deletion.
110 changes: 109 additions & 1 deletion src/clj/http/async/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
(:import (java.io ByteArrayOutputStream)
(java.util.concurrent LinkedBlockingQueue)
(com.ning.http.client AsyncHttpClient AsyncHttpClientConfig$Builder)
(com.ning.http.client.websocket WebSocket
WebSocketUpgradeHandler$Builder
WebSocketListener
WebSocketByteListener
WebSocketTextListener
WebSocketCloseCodeReasonListener)
(com.ning.http.client.providers.netty NettyAsyncHttpProviderConfig)))

;; # Client Lifecycle
Expand Down Expand Up @@ -275,6 +281,108 @@
[resp]
(:raw-url resp))

;; websocket
(defprotocol IWebSocket
(-sendText [this text])
(-sendByte [this byte]))

(extend-protocol IWebSocket
WebSocket
(-sendText [ws text]
(.sendTextMessage ws text))
(-sendByte [ws byte]
(.sendMessage ws byte)))

(defn send
"Send message via WebSocket."
[ws & {text :text
byte :byte}]
(when (satisfies? IWebSocket ws)
(if text
(-sendText ws text)
(-sendByte ws byte))))

(defn create-ws-text-listener
"Creates WebSocket text listener."
[cb]
(reify
WebSocketTextListener
(^{:tag void} onMessage [_ #^String s]
(println "ws msg:" s)
(cb s))
(^{:tag void} onFragment [_ #^String s #^boolean last]
(println "ws frag:" s ", last:" last))

WebSocketListener
(^{:tag void} onOpen [_ #^WebSocket ws]
(println "ws open:" ws))
(^{:tag void} onClose [_ #^WebSocket ws]
(println "ws close:" ws))
(^{:tag void} onError [_ #^Throwable t]
(println "ws error:" t))


WebSocketCloseCodeReasonListener
(onClose [_ ws code reason]
(println "ws close:" ws ", code:" code ", reason:" reason))
))

(defn create-ws-byte-listener
"Creates WebSocket binary listner."
[cb]
(reify
WebSocketByteListener
(^{:tag void} onMessage [_ #^bytes b]
(println "ws msg:" (alength b))
(cb b))
(^{:tag void} onFragment [_ #^bytes b #^boolean last]
(println "ws frag:" (alength b) ", last:" last))

WebSocketListener
(^{:tag void} onOpen [_ #^WebSocket ws]
(println "ws open:" ws))
(^{:tag void} onClose [_ #^WebSocket ws]
(println "ws close:" ws))
(^{:tag void} onError [_ #^Throwable t]
(println "ws error:" t))


WebSocketCloseCodeReasonListener
(onClose [_ ws code reason]
(println "ws close:" ws ", code:" code ", reason:" reason))
))

(defn websocket
"Opens WebSocket connection."
{:tag WebSocket}
[client #^String url & {text-cb :text byte-cb :byte}]
(let [b (WebSocketUpgradeHandler$Builder.)]
(when text-cb (.addWebSocketListener b (create-ws-text-listener text-cb)))
(when byte-cb (.addWebSocketListener b (create-ws-byte-listener byte-cb)))
(.get (.executeRequest client (prepare-request :get url) (.build b)))))

;; closing
(defprotocol IClosable
(-close [this])
(-open? [this]))

(extend-protocol IClosable
AsyncHttpClient
(-close [client] (.close client))
(-open? [client] (not (.isClosed client)))

WebSocket
(-close [soc] (.close soc))
(-open? [soc] (.isOpen soc)))

(defn close
"Closes client."
([client] (.close client)))
[client]
(when (satisfies? IClosable client)
(-close client)))

(defn open?
"Checks if client is open."
[client]
(when (satisfies? IClosable client)
(-open? client)))

0 comments on commit 6b066f1

Please sign in to comment.