Skip to content
9 changes: 9 additions & 0 deletions .idea/libraries/Leiningen__org_clojure_data_json_0_2_6.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 55 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,34 +1,39 @@
# WebSocket Server

This is the server side of the websocket connection. This is meant to
be paired with [fentontravers/websocket-client](https://github.com/ftravers/websocket-client).
be paired with [fentontravers/websocket-client](https://github.com/fentontravers/websocket-client).

# Clojars

![](https://clojars.org/fentontravers/websocket-server/latest-version.svg)
<a href="https://clojars.org/juleffel/websocket-server/" target="_blank">![Foo](https://clojars.org/juleffel/websocket-server/latest-version.svg)</a>

# Usage

```clojure
(require '[websocket-server.core :refer [start-ws-server]])

;; After we start the server a function is returned
;; that we use for stopping the server.
(defonce ws-server (atom nil))
(require '[websocket-server.core :refer :all])

(defn request-handler-upcase-string
"The function that will take incoming data off the websocket,
process it and return a reponse. In our case we'll simply UPPERCASE
whatever is received."
[data] (clojure.string/upper-case (str data)))

; Always call this module functions with a port, unless you want to apply it to every server you opened on all ports.
(def port 8899)

(defn start
"Demonstrate how to use the websocket server library."
[]
(let [port 8899]
(reset! ws-server (start-ws-server port request-handler-upcase-string))))
(start-ws-server port :on-receive request-handler-upcase-string))

(defn send-all!
[data]
(send-all! port data))

(defn stop "Stop websocket server" [] (@ws-server))
(defn stop
"Stop websocket server"
[]
(stop-ws-server port))
```

Here is another example that expects EDN in the form of a map that
Expand All @@ -46,3 +51,42 @@ back.
(hash-map :count)
str))
```

# Multiple servers usage

```clojure
(start-ws-server 8000 :on-receive request-handler-1)
(start-ws-server 8001 :on-receive request-handler-2)
(start-ws-server 8002 :on-receive request-handler-3)

; Send "Hello" to all channels opened to websocket on port 8000
(send-all! 8000 "Hello")

(stop-ws-server 8002)

; Send "Hi!" to all channels opened to websocket on port 8000 or 8001
(send-all! "Hello")

(stop-all-ws-servers)
```

# Use transformer functions when receiving and sending data

```clojure
(require '[clojure.data.json :as json])

(defn request-handler-json
[[action data]]
[action
(case action
"upcase" (request-handler-upcase-string data)
data)])

; json/read-str will be applied before sending data to request-json-handler
; json/write-str will be applied before sending data from request-json-handler
; back on the websocket, or when using send-all!
(start-ws-server 8000
:on-receive request-handler-json
:in-fn json/read-str
:out-fn json/write-str)
```
3 changes: 1 addition & 2 deletions dev/user.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
(ns user
(:require [clojure.tools.namespace.repl :refer [refresh]]
[clojure.repl :refer [doc source]]
(:require [clojure.repl :refer [doc source]]
[clojure.pprint :refer [pprint pp]]
[clojure.stacktrace :as st]))
9 changes: 6 additions & 3 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
(defproject fentontravers/websocket-server "0.4.12-SNAPSHOT"
(defproject juleffel/websocket-server "0.4.13"
:description "WebSocket Server Library"
:url "https://github.com/ftravers/websocket-server"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.9.0-alpha16"]
[http-kit "2.2.0"]
[com.taoensso/timbre "4.8.0"]
[org.clojure/core.async "0.2.395"]]
[org.clojure/core.async "0.2.395"]
[org.clojure/data.json "0.2.6"]]
:target-path "target/%s"

:profiles {:dev {:source-paths ["dev" "src"]}
:profiles {:dev {:source-paths ["dev" "src"]
:dependencies [[stylefruits/gniazdo "1.0.1"]
[org.clojure/core.async "0.4.474"]]}
:uberjar {:aot :all}})
106 changes: 89 additions & 17 deletions src/websocket_server/core.clj
Original file line number Diff line number Diff line change
@@ -1,22 +1,94 @@
(ns websocket-server.core
(:require [org.httpkit.server :as http]
[taoensso.timbre :as log]))
[taoensso.timbre :as log]
[clojure.data.json :as json]))

(defn websocket-server [cb req]
(http/with-channel req channel
(http/on-close
channel
(fn [status] (log/debug (str "Websocket channel closed with status: " status))))
(http/on-receive
channel
(fn [data]
(if (http/websocket? channel)
(let [resp (cb data)]
(log/debug "RECV: " data)
(log/debug "RESP: " resp)
(http/send! channel resp)))))))
; In case we want to start multiple servers, we will keep them as port -> server
(defonce channel-hub (atom {}))
(defonce servers (atom {}))

(defn start-ws-server [port callback]
(http/run-server (partial websocket-server callback)
{:port port}))
(defn websocket-server [port on-open on-receive on-close req]
(let [{:keys [in-fn out-fn]} (@servers port)]
(http/with-channel req channel
(swap! channel-hub assoc-in [port channel] req)
(if on-open (on-open channel req))
(http/on-close
channel
(fn [status]
(log/debug (str "Websocket channel closed with status: " status))
(if on-close (on-close status))
(swap! channel-hub update port dissoc channel)))
(http/on-receive
channel
(fn [data]
(if (and on-receive (http/websocket? channel))
(let [resp (on-receive (in-fn data))]
(log/debug "RECV: " data)
(log/debug "RESP: " resp)
(http/send! channel (out-fn resp)))))))))

(defn send-all!
([data]
; Warning: When calling this function without port, you are sending data on every websocket server opened, and every channel
(doseq [port (keys @channel-hub)]
(send-all! port data)))
([port data]
(doseq [channel (keys (@channel-hub port))]
(http/send! channel ((get-in @servers [port :out-fn]) data)))))

(defn stop-ws-server [port]
(when (@servers port)
;; graceful shutdown: wait 100ms for existing requests to be finished
;; :timeout is optional, when no timeout, stop immediately
((get-in @servers [port :server]) :timeout 100)
(swap! servers dissoc port)))

(defn start-ws-server
[port & {:keys [on-open on-receive on-close in-fn out-fn]
:or {in-fn identity, out-fn identity}}]
(if (@servers port)
(stop-ws-server port))
(let [server
(http/run-server (partial websocket-server port on-open on-receive on-close)
{:port port})]
(swap! servers assoc port
{:server server
:in-fn in-fn
:out-fn out-fn})
server))

(defn stop-all-ws-servers []
(doseq [port (keys @servers)]
(stop-ws-server port)))

(comment
(def port 8899)
(defn handle [data]
(println "Message handled:" data)
data)
; Example to work with https://github.com/ftravers/transit-websocket-client/
(start-ws-server
port
:on-receive
(fn [[action data]]
[action (handle data)])
:in-fn json/read-str
:out-fn json/write-str)
(send-all! port ["~#'" (str [[:back-msg] "Message from backend"])])
; Example to work with https://github.com/ftravers/reframe-websocket/
(start-ws-server
port
:on-receive
(fn [[store-path data]]
[store-path (handle data)])
:in-fn
(fn [s]
(let [[_ rf-msg] (json/read-str s)]
(read-string rf-msg)))
:out-fn
(fn [msg]
(json/write-str
["~#'" (str msg)])))
(send-all! port [[:back-msg] "Message from backend"])
(send-all! port [[:back-msg] {:map 134 :text "EDN from backend"}])
(stop-ws-server port))
79 changes: 70 additions & 9 deletions test/websocket_server/core_test.clj
Original file line number Diff line number Diff line change
@@ -1,19 +1,80 @@
(ns websocket-server.core-test
(:require [websocket-server.core :refer [start-ws-server send!]]
[clojure.edn :as edn]))
(:require
[clojure.test :refer :all]
[clojure.core.async :refer [chan <!! >!!]]
[websocket-server.core :refer :all]
[clojure.edn :as edn]
[gniazdo.core :as ws]))

(defonce ws-server (atom nil))
(defn request-handler-upcase [data]
(clojure.string/upper-case (str data)))

(defn request-handler-upcase [channel data]
(send! channel (clojure.string/upper-case (str data))))
(def port 7890)

(defn start []
(defn start
"Demonstrate how to use the websocket server library."
(let [port 7890]
(reset! ws-server (start-ws-server port request-handler-upcase))))
[]
(start-ws-server port :on-receive request-handler-upcase))

(defn stop "Stop websocket server" [] (@ws-server))
(defn stop
"Stop websocket server"
[]
(stop-ws-server port))

(defn restart [] (stop) (start))

(deftest send-msg-and-check-resp
(start-ws-server port :on-receive request-handler-upcase)
(let [ch (chan)
client-ws
(ws/connect
(str "ws://localhost:" port)
:on-receive #(>!! ch %))]
(is (some? client-ws))
(ws/send-msg client-ws "Hello")
(is (= "HELLO" (<!! ch)))
(ws/close client-ws))
(stop-ws-server port))

(deftest send-all-test
(start-ws-server port :on-receive #(throw (Exception. "Shouldn't be called as we are initiating messages on the server side")))
(let [ch (chan)
n 3
clients-ws
(doall
(for [i (range n)]
(ws/connect
(str "ws://localhost:" port)
:on-receive #(>!! ch [i %]))))]
(send-all! port "Test")
(let [resps (set (repeatedly n #(<!! ch)))]
(is (= resps
(set (for [i (range n)] [i "Test"]))))
(doseq [client-ws clients-ws]
(ws/close client-ws))))
(stop-ws-server port))


(deftest multiple-servers-test
(let [n 3]
(doseq [i (range 3)]
(start-ws-server (+ 8000 i) :on-receive #(throw (Exception. "Shouldn't be called as we are initiating messages on the server side"))))
(let [ch (chan)
clients-ws
(doall
(for [i (range n)]
(ws/connect
(str "ws://localhost:" (+ 8000 i))
:on-receive #(>!! ch [i %]))))]
(ws/close (nth clients-ws 2))
(stop-ws-server 8002)
(send-all! "Test1")
(let [resps (set (repeatedly (dec n) #(<!! ch)))]
(is (= resps
(set (for [i (range (dec n))] [i "Test1"])))))
(send-all! 8000 "Test2")
(let [resp (<!! ch)]
(is (= resp [0 "Test2"])))
(ws/close (nth clients-ws 0))
(ws/close (nth clients-ws 1))
(stop-all-ws-servers))))