diff --git a/.idea/libraries/Leiningen__org_clojure_data_json_0_2_6.xml b/.idea/libraries/Leiningen__org_clojure_data_json_0_2_6.xml new file mode 100644 index 0000000..bf1254f --- /dev/null +++ b/.idea/libraries/Leiningen__org_clojure_data_json_0_2_6.xml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/README.md b/README.md index 9040dad..7e77bc2 100644 --- a/README.md +++ b/README.md @@ -1,20 +1,16 @@ # WebSocket Server This is the server side of the websocket connection. This is meant to -be paired with [fentontravers/websocket-client](https://github.com/ftravers/websocket-client). +be paired with [fentontravers/websocket-client](https://github.com/fentontravers/websocket-client). # Clojars -![](https://clojars.org/fentontravers/websocket-server/latest-version.svg) - +![Foo](https://clojars.org/juleffel/websocket-server/latest-version.svg) + # Usage ```clojure -(require '[websocket-server.core :refer [start-ws-server]]) - -;; After we start the server a function is returned -;; that we use for stopping the server. -(defonce ws-server (atom nil)) +(require '[websocket-server.core :refer :all]) (defn request-handler-upcase-string "The function that will take incoming data off the websocket, @@ -22,13 +18,22 @@ be paired with [fentontravers/websocket-client](https://github.com/ftravers/webs whatever is received." [data] (clojure.string/upper-case (str data))) +; Always call this module functions with a port, unless you want to apply it to every server you opened on all ports. +(def port 8899) + (defn start "Demonstrate how to use the websocket server library." [] - (let [port 8899] - (reset! ws-server (start-ws-server port request-handler-upcase-string)))) + (start-ws-server port :on-receive request-handler-upcase-string)) + +(defn send-all! + [data] + (send-all! port data)) -(defn stop "Stop websocket server" [] (@ws-server)) +(defn stop + "Stop websocket server" + [] + (stop-ws-server port)) ``` Here is another example that expects EDN in the form of a map that @@ -46,3 +51,42 @@ back. (hash-map :count) str)) ``` + +# Multiple servers usage + +```clojure +(start-ws-server 8000 :on-receive request-handler-1) +(start-ws-server 8001 :on-receive request-handler-2) +(start-ws-server 8002 :on-receive request-handler-3) + +; Send "Hello" to all channels opened to websocket on port 8000 +(send-all! 8000 "Hello") + +(stop-ws-server 8002) + +; Send "Hi!" to all channels opened to websocket on port 8000 or 8001 +(send-all! "Hello") + +(stop-all-ws-servers) +``` + +# Use transformer functions when receiving and sending data + +```clojure +(require '[clojure.data.json :as json]) + +(defn request-handler-json + [[action data]] + [action + (case action + "upcase" (request-handler-upcase-string data) + data)]) + +; json/read-str will be applied before sending data to request-json-handler +; json/write-str will be applied before sending data from request-json-handler +; back on the websocket, or when using send-all! +(start-ws-server 8000 + :on-receive request-handler-json + :in-fn json/read-str + :out-fn json/write-str) +``` diff --git a/dev/user.clj b/dev/user.clj index dab1364..063436b 100644 --- a/dev/user.clj +++ b/dev/user.clj @@ -1,5 +1,4 @@ (ns user - (:require [clojure.tools.namespace.repl :refer [refresh]] - [clojure.repl :refer [doc source]] + (:require [clojure.repl :refer [doc source]] [clojure.pprint :refer [pprint pp]] [clojure.stacktrace :as st])) diff --git a/project.clj b/project.clj index 4c1e463..18d71e4 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject fentontravers/websocket-server "0.4.12-SNAPSHOT" +(defproject juleffel/websocket-server "0.4.13" :description "WebSocket Server Library" :url "https://github.com/ftravers/websocket-server" :license {:name "Eclipse Public License" @@ -6,8 +6,11 @@ :dependencies [[org.clojure/clojure "1.9.0-alpha16"] [http-kit "2.2.0"] [com.taoensso/timbre "4.8.0"] - [org.clojure/core.async "0.2.395"]] + [org.clojure/core.async "0.2.395"] + [org.clojure/data.json "0.2.6"]] :target-path "target/%s" - :profiles {:dev {:source-paths ["dev" "src"]} + :profiles {:dev {:source-paths ["dev" "src"] + :dependencies [[stylefruits/gniazdo "1.0.1"] + [org.clojure/core.async "0.4.474"]]} :uberjar {:aot :all}}) diff --git a/src/websocket_server/core.clj b/src/websocket_server/core.clj index 69a6787..9dd7465 100644 --- a/src/websocket_server/core.clj +++ b/src/websocket_server/core.clj @@ -1,22 +1,94 @@ (ns websocket-server.core (:require [org.httpkit.server :as http] - [taoensso.timbre :as log])) + [taoensso.timbre :as log] + [clojure.data.json :as json])) -(defn websocket-server [cb req] - (http/with-channel req channel - (http/on-close - channel - (fn [status] (log/debug (str "Websocket channel closed with status: " status)))) - (http/on-receive - channel - (fn [data] - (if (http/websocket? channel) - (let [resp (cb data)] - (log/debug "RECV: " data) - (log/debug "RESP: " resp) - (http/send! channel resp))))))) +; In case we want to start multiple servers, we will keep them as port -> server +(defonce channel-hub (atom {})) +(defonce servers (atom {})) -(defn start-ws-server [port callback] - (http/run-server (partial websocket-server callback) - {:port port})) +(defn websocket-server [port on-open on-receive on-close req] + (let [{:keys [in-fn out-fn]} (@servers port)] + (http/with-channel req channel + (swap! channel-hub assoc-in [port channel] req) + (if on-open (on-open channel req)) + (http/on-close + channel + (fn [status] + (log/debug (str "Websocket channel closed with status: " status)) + (if on-close (on-close status)) + (swap! channel-hub update port dissoc channel))) + (http/on-receive + channel + (fn [data] + (if (and on-receive (http/websocket? channel)) + (let [resp (on-receive (in-fn data))] + (log/debug "RECV: " data) + (log/debug "RESP: " resp) + (http/send! channel (out-fn resp))))))))) +(defn send-all! + ([data] + ; Warning: When calling this function without port, you are sending data on every websocket server opened, and every channel + (doseq [port (keys @channel-hub)] + (send-all! port data))) + ([port data] + (doseq [channel (keys (@channel-hub port))] + (http/send! channel ((get-in @servers [port :out-fn]) data))))) + +(defn stop-ws-server [port] + (when (@servers port) + ;; graceful shutdown: wait 100ms for existing requests to be finished + ;; :timeout is optional, when no timeout, stop immediately + ((get-in @servers [port :server]) :timeout 100) + (swap! servers dissoc port))) + +(defn start-ws-server + [port & {:keys [on-open on-receive on-close in-fn out-fn] + :or {in-fn identity, out-fn identity}}] + (if (@servers port) + (stop-ws-server port)) + (let [server + (http/run-server (partial websocket-server port on-open on-receive on-close) + {:port port})] + (swap! servers assoc port + {:server server + :in-fn in-fn + :out-fn out-fn}) + server)) + +(defn stop-all-ws-servers [] + (doseq [port (keys @servers)] + (stop-ws-server port))) + +(comment + (def port 8899) + (defn handle [data] + (println "Message handled:" data) + data) + ; Example to work with https://github.com/ftravers/transit-websocket-client/ + (start-ws-server + port + :on-receive + (fn [[action data]] + [action (handle data)]) + :in-fn json/read-str + :out-fn json/write-str) + (send-all! port ["~#'" (str [[:back-msg] "Message from backend"])]) + ; Example to work with https://github.com/ftravers/reframe-websocket/ + (start-ws-server + port + :on-receive + (fn [[store-path data]] + [store-path (handle data)]) + :in-fn + (fn [s] + (let [[_ rf-msg] (json/read-str s)] + (read-string rf-msg))) + :out-fn + (fn [msg] + (json/write-str + ["~#'" (str msg)]))) + (send-all! port [[:back-msg] "Message from backend"]) + (send-all! port [[:back-msg] {:map 134 :text "EDN from backend"}]) + (stop-ws-server port)) \ No newline at end of file diff --git a/test/websocket_server/core_test.clj b/test/websocket_server/core_test.clj index 9686d22..e2130bc 100644 --- a/test/websocket_server/core_test.clj +++ b/test/websocket_server/core_test.clj @@ -1,19 +1,80 @@ (ns websocket-server.core-test - (:require [websocket-server.core :refer [start-ws-server send!]] - [clojure.edn :as edn])) + (:require + [clojure.test :refer :all] + [clojure.core.async :refer [chan !!]] + [websocket-server.core :refer :all] + [clojure.edn :as edn] + [gniazdo.core :as ws])) -(defonce ws-server (atom nil)) +(defn request-handler-upcase [data] + (clojure.string/upper-case (str data))) -(defn request-handler-upcase [channel data] - (send! channel (clojure.string/upper-case (str data)))) +(def port 7890) -(defn start [] +(defn start "Demonstrate how to use the websocket server library." - (let [port 7890] - (reset! ws-server (start-ws-server port request-handler-upcase)))) + [] + (start-ws-server port :on-receive request-handler-upcase)) -(defn stop "Stop websocket server" [] (@ws-server)) +(defn stop + "Stop websocket server" + [] + (stop-ws-server port)) (defn restart [] (stop) (start)) +(deftest send-msg-and-check-resp + (start-ws-server port :on-receive request-handler-upcase) + (let [ch (chan) + client-ws + (ws/connect + (str "ws://localhost:" port) + :on-receive #(>!! ch %))] + (is (some? client-ws)) + (ws/send-msg client-ws "Hello") + (is (= "HELLO" (!! ch [i %]))))] + (send-all! port "Test") + (let [resps (set (repeatedly n #(!! ch [i %]))))] + (ws/close (nth clients-ws 2)) + (stop-ws-server 8002) + (send-all! "Test1") + (let [resps (set (repeatedly (dec n) #(