From cb467c38d0992c3a4f5bd55a55243a47f17d8ba9 Mon Sep 17 00:00:00 2001 From: Julien Fleury Date: Thu, 23 Aug 2018 17:26:27 +0200 Subject: [PATCH 01/10] Write send-all! function, keep servers and connections in atoms, handle multiple opened servers --- README.md | 39 +++++++++++--- project.clj | 6 ++- src/websocket_server/core.clj | 39 ++++++++++++-- test/websocket_server/core_test.clj | 79 +++++++++++++++++++++++++---- 4 files changed, 141 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 9040dad..e838e04 100644 --- a/README.md +++ b/README.md @@ -10,11 +10,7 @@ be paired with [fentontravers/websocket-client](https://github.com/ftravers/webs # Usage ```clojure -(require '[websocket-server.core :refer [start-ws-server]]) - -;; After we start the server a function is returned -;; that we use for stopping the server. -(defonce ws-server (atom nil)) +(require '[websocket-server.core :refer :all]) (defn request-handler-upcase-string "The function that will take incoming data off the websocket, @@ -22,13 +18,22 @@ be paired with [fentontravers/websocket-client](https://github.com/ftravers/webs whatever is received." [data] (clojure.string/upper-case (str data))) +; Always call this module functions with a port, unless you want to apply it to every server you opened on all ports. +(def port 8899) + (defn start "Demonstrate how to use the websocket server library." [] - (let [port 8899] - (reset! ws-server (start-ws-server port request-handler-upcase-string)))) + (start-ws-server port request-handler-upcase-string)) + +(defn send-all! + [data] + (send-all! port data)) -(defn stop "Stop websocket server" [] (@ws-server)) +(defn stop + "Stop websocket server" + [] + (stop-ws-server port)) ``` Here is another example that expects EDN in the form of a map that @@ -46,3 +51,21 @@ back. (hash-map :count) str)) ``` + +# Multiple servers usage + +```clojure +(start-ws-server 8000 request-handler-1) +(start-ws-server 8001 request-handler-2) +(start-ws-server 8002 request-handler-3) + +; Send "Hello" to all channels opened to websocket on port 8000 +(send-all! 8000 "Hello") + +(stop-ws-server 8002) + +; Send "Hi!" to all channels opened to websocket on port 8000 or 8001 +(send-all! "Hello") + +(stop-all-ws-servers) +``` \ No newline at end of file diff --git a/project.clj b/project.clj index 4c1e463..c764cda 100644 --- a/project.clj +++ b/project.clj @@ -9,5 +9,9 @@ [org.clojure/core.async "0.2.395"]] :target-path "target/%s" - :profiles {:dev {:source-paths ["dev" "src"]} + :jvm-opts ["--add-modules" "java.xml.bind"] + + :profiles {:dev {:source-paths ["dev" "src"] + :dependencies [[stylefruits/gniazdo "1.0.1"] + [org.clojure/core.async "0.4.474"]]} :uberjar {:aot :all}}) diff --git a/src/websocket_server/core.clj b/src/websocket_server/core.clj index 69a6787..8f91a22 100644 --- a/src/websocket_server/core.clj +++ b/src/websocket_server/core.clj @@ -2,11 +2,18 @@ (:require [org.httpkit.server :as http] [taoensso.timbre :as log])) -(defn websocket-server [cb req] +; In case we want to start multiple servers, we will keep them as port -> server +(defonce channel-hub (atom {})) +(defonce servers (atom {})) + +(defn websocket-server [port cb req] (http/with-channel req channel + (swap! channel-hub assoc-in [port channel] req) (http/on-close channel - (fn [status] (log/debug (str "Websocket channel closed with status: " status)))) + (fn [status] + (log/debug (str "Websocket channel closed with status: " status)) + (swap! channel-hub update port dissoc channel))) (http/on-receive channel (fn [data] @@ -16,7 +23,31 @@ (log/debug "RESP: " resp) (http/send! channel resp))))))) +(defn send-all! + ([data] + ; Warning: When calling this function without port, you are sending data on every websocket server opened, and every channel + (doseq [port (keys @channel-hub)] + (send-all! port data))) + ([port data] + (doseq [channel (keys (@channel-hub port))] + (http/send! channel data)))) + +(defn stop-ws-server [port] + (when-not (nil? (@servers port)) + ;; graceful shutdown: wait 100ms for existing requests to be finished + ;; :timeout is optional, when no timeout, stop immediately + ((@servers port) :timeout 100) + (swap! servers dissoc port))) + (defn start-ws-server [port callback] - (http/run-server (partial websocket-server callback) - {:port port})) + (if (@servers port) + (stop-ws-server port)) + (let [server + (http/run-server (partial websocket-server port callback) + {:port port})] + (swap! servers assoc port server) + server)) +(defn stop-all-ws-servers [] + (doseq [port (keys @servers)] + (stop-ws-server port))) diff --git a/test/websocket_server/core_test.clj b/test/websocket_server/core_test.clj index 9686d22..6bf7fda 100644 --- a/test/websocket_server/core_test.clj +++ b/test/websocket_server/core_test.clj @@ -1,19 +1,80 @@ (ns websocket-server.core-test - (:require [websocket-server.core :refer [start-ws-server send!]] - [clojure.edn :as edn])) + (:require + [clojure.test :refer :all] + [clojure.core.async :refer [chan !!]] + [websocket-server.core :refer :all] + [clojure.edn :as edn] + [gniazdo.core :as ws])) -(defonce ws-server (atom nil)) +(defn request-handler-upcase [data] + (clojure.string/upper-case (str data))) -(defn request-handler-upcase [channel data] - (send! channel (clojure.string/upper-case (str data)))) +(def port 7890) -(defn start [] +(defn start "Demonstrate how to use the websocket server library." - (let [port 7890] - (reset! ws-server (start-ws-server port request-handler-upcase)))) + [] + (start-ws-server port request-handler-upcase)) -(defn stop "Stop websocket server" [] (@ws-server)) +(defn stop + "Stop websocket server" + [] + (stop-ws-server port)) (defn restart [] (stop) (start)) +(deftest send-msg-and-check-resp + (start-ws-server port request-handler-upcase) + (let [ch (chan) + client-ws + (ws/connect + (str "ws://localhost:" port) + :on-receive #(>!! ch %))] + (is (some? client-ws)) + (ws/send-msg client-ws "Hello") + (is (= "HELLO" (!! ch [i %]))))] + (send-all! port "Test") + (let [resps (set (repeatedly n #(!! ch [i %]))))] + (ws/close (nth clients-ws 2)) + (stop-ws-server 8002) + (send-all! "Test1") + (let [resps (set (repeatedly (dec n) #( Date: Fri, 24 Aug 2018 15:22:35 +0200 Subject: [PATCH 02/10] input and output functions --- README.md | 19 +++++++++ dev/user.clj | 3 +- project.clj | 5 +-- src/websocket_server/core.clj | 72 ++++++++++++++++++++++------------- 4 files changed, 67 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index e838e04..63f9034 100644 --- a/README.md +++ b/README.md @@ -68,4 +68,23 @@ back. (send-all! "Hello") (stop-all-ws-servers) +``` + +# Use transformer functions when receiving and sending data + +```clojure +(require '[clojure.data.json :as json]) + +(defn request-handler-json + [[action data]] + [action + (case action + "upcase" (request-handler-upcase-string data) + data)]) + +; json/read-str will be applied before sending data to request-json-handler +; json/write-str will be applied before sending data from request-json-handler +; back on the websocket, or when using (send-all! port data) +(start-ws-server 8000 request-handler-json json/read-str json/write-str) + ``` \ No newline at end of file diff --git a/dev/user.clj b/dev/user.clj index dab1364..063436b 100644 --- a/dev/user.clj +++ b/dev/user.clj @@ -1,5 +1,4 @@ (ns user - (:require [clojure.tools.namespace.repl :refer [refresh]] - [clojure.repl :refer [doc source]] + (:require [clojure.repl :refer [doc source]] [clojure.pprint :refer [pprint pp]] [clojure.stacktrace :as st])) diff --git a/project.clj b/project.clj index c764cda..8de4e82 100644 --- a/project.clj +++ b/project.clj @@ -6,11 +6,10 @@ :dependencies [[org.clojure/clojure "1.9.0-alpha16"] [http-kit "2.2.0"] [com.taoensso/timbre "4.8.0"] - [org.clojure/core.async "0.2.395"]] + [org.clojure/core.async "0.2.395"] + [org.clojure/data.json "0.2.6"]] :target-path "target/%s" - :jvm-opts ["--add-modules" "java.xml.bind"] - :profiles {:dev {:source-paths ["dev" "src"] :dependencies [[stylefruits/gniazdo "1.0.1"] [org.clojure/core.async "0.4.474"]]} diff --git a/src/websocket_server/core.clj b/src/websocket_server/core.clj index 8f91a22..da331bf 100644 --- a/src/websocket_server/core.clj +++ b/src/websocket_server/core.clj @@ -1,27 +1,29 @@ (ns websocket-server.core (:require [org.httpkit.server :as http] - [taoensso.timbre :as log])) + [taoensso.timbre :as log] + [clojure.data.json :as json])) ; In case we want to start multiple servers, we will keep them as port -> server (defonce channel-hub (atom {})) (defonce servers (atom {})) (defn websocket-server [port cb req] - (http/with-channel req channel - (swap! channel-hub assoc-in [port channel] req) - (http/on-close - channel - (fn [status] - (log/debug (str "Websocket channel closed with status: " status)) - (swap! channel-hub update port dissoc channel))) - (http/on-receive - channel - (fn [data] - (if (http/websocket? channel) - (let [resp (cb data)] - (log/debug "RECV: " data) - (log/debug "RESP: " resp) - (http/send! channel resp))))))) + (let [{:keys [in-fn out-fn]} (@servers port)] + (http/with-channel req channel + (swap! channel-hub assoc-in [port channel] req) + (http/on-close + channel + (fn [status] + (log/debug (str "Websocket channel closed with status: " status)) + (swap! channel-hub update port dissoc channel))) + (http/on-receive + channel + (fn [data] + (if (http/websocket? channel) + (let [resp (cb (in-fn data))] + (log/debug "RECV: " data) + (log/debug "RESP: " resp) + (http/send! channel (out-fn resp))))))))) (defn send-all! ([data] @@ -30,24 +32,40 @@ (send-all! port data))) ([port data] (doseq [channel (keys (@channel-hub port))] - (http/send! channel data)))) + (http/send! channel ((get-in @servers [port :out-fn]) data))))) (defn stop-ws-server [port] - (when-not (nil? (@servers port)) + (when (@servers port) ;; graceful shutdown: wait 100ms for existing requests to be finished ;; :timeout is optional, when no timeout, stop immediately - ((@servers port) :timeout 100) + ((get-in @servers [port :server]) :timeout 100) (swap! servers dissoc port))) -(defn start-ws-server [port callback] - (if (@servers port) - (stop-ws-server port)) - (let [server - (http/run-server (partial websocket-server port callback) - {:port port})] - (swap! servers assoc port server) - server)) +(defn start-ws-server + ([port callback] + (start-ws-server port callback identity identity)) + ([port callback in-fn out-fn] + (if (@servers port) + (stop-ws-server port)) + (let [server + (http/run-server (partial websocket-server port callback) + {:port port})] + (swap! servers assoc port + {:server server + :in-fn in-fn + :out-fn out-fn}) + server))) (defn stop-all-ws-servers [] (doseq [port (keys @servers)] (stop-ws-server port))) + +(comment + (def port 8899) + (start-ws-server port + (fn [[action data]] + (println action data) + [action (clojure.string/upper-case (str data))]) + json/read-str json/write-str) + (send-all! port ["~#'" "Message from backend"]) + (stop-ws-server port)) \ No newline at end of file From 4241339f4c5049c62c66042ea60478df736eec3c Mon Sep 17 00:00:00 2001 From: Julien Fleury Date: Fri, 24 Aug 2018 17:03:16 +0200 Subject: [PATCH 03/10] Example of use with https://github.com/ftravers/reframe-websocket/ --- ...Leiningen__org_clojure_data_json_0_2_6.xml | 9 +++++++ src/websocket_server/core.clj | 25 ++++++++++++++++--- 2 files changed, 30 insertions(+), 4 deletions(-) create mode 100644 .idea/libraries/Leiningen__org_clojure_data_json_0_2_6.xml diff --git a/.idea/libraries/Leiningen__org_clojure_data_json_0_2_6.xml b/.idea/libraries/Leiningen__org_clojure_data_json_0_2_6.xml new file mode 100644 index 0000000..bf1254f --- /dev/null +++ b/.idea/libraries/Leiningen__org_clojure_data_json_0_2_6.xml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/src/websocket_server/core.clj b/src/websocket_server/core.clj index da331bf..563d148 100644 --- a/src/websocket_server/core.clj +++ b/src/websocket_server/core.clj @@ -62,10 +62,27 @@ (comment (def port 8899) - (start-ws-server port + (defn handle [data] + (println "Message handled:" data) + data) + ; Example to work with https://github.com/ftravers/transit-websocket-client/ + (start-ws-server + port (fn [[action data]] - (println action data) - [action (clojure.string/upper-case (str data))]) + [action (handle data)]) json/read-str json/write-str) - (send-all! port ["~#'" "Message from backend"]) + (send-all! port ["~#'" (str [[:back-msg] "Message from backend"])]) + ; Example to work with https://github.com/ftravers/reframe-websocket/ + (start-ws-server + port + (fn [[store-path data]] + [store-path (handle data)]) + (fn [s] + (let [[_ rf-msg] (json/read-str s)] + (read-string rf-msg))) + (fn [msg] + (json/write-str + ["~#'" (str msg)]))) + (send-all! port [[:back-msg] "Message from backend"]) + (send-all! port [[:back-msg] {:map 134 :text "EDN from backend"}]) (stop-ws-server port)) \ No newline at end of file From 9056eac3f07539af1691a42d6bc8562b6099ae4f Mon Sep 17 00:00:00 2001 From: Julien Fleury Date: Fri, 24 Aug 2018 17:09:09 +0200 Subject: [PATCH 04/10] 0.4.12 --- project.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project.clj b/project.clj index 8de4e82..640915d 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject fentontravers/websocket-server "0.4.12-SNAPSHOT" +(defproject fentontravers/websocket-server "0.4.12" :description "WebSocket Server Library" :url "https://github.com/ftravers/websocket-server" :license {:name "Eclipse Public License" From 035cf704722acd5c655d6487452c9ed5b0a78bb7 Mon Sep 17 00:00:00 2001 From: Juleffel Date: Fri, 24 Aug 2018 17:17:07 +0200 Subject: [PATCH 05/10] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 63f9034..f11f773 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ back. ; json/read-str will be applied before sending data to request-json-handler ; json/write-str will be applied before sending data from request-json-handler -; back on the websocket, or when using (send-all! port data) +; back on the websocket, or when using send-all! (start-ws-server 8000 request-handler-json json/read-str json/write-str) -``` \ No newline at end of file +``` From 84da243ee121880b706b4ed69af4b03932b8214c Mon Sep 17 00:00:00 2001 From: Juleffel Date: Mon, 27 Aug 2018 09:40:20 +0200 Subject: [PATCH 06/10] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f11f773..bd099fe 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # WebSocket Server This is the server side of the websocket connection. This is meant to -be paired with [fentontravers/websocket-client](https://github.com/ftravers/websocket-client). +be paired with [juleffel/websocket-client](https://github.com/juleffel/websocket-client). # Clojars From d59f9e245b4fc1568e9bac308967bafc3e47f24c Mon Sep 17 00:00:00 2001 From: Juleffel Date: Mon, 27 Aug 2018 09:41:02 +0200 Subject: [PATCH 07/10] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index bd099fe..71cfb5d 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,11 @@ # WebSocket Server This is the server side of the websocket connection. This is meant to -be paired with [juleffel/websocket-client](https://github.com/juleffel/websocket-client). +be paired with [fentontravers/websocket-client](https://github.com/fentontravers/websocket-client). # Clojars -![](https://clojars.org/fentontravers/websocket-server/latest-version.svg) +![https://clojars.org/juleffel/websocket-server/](https://clojars.org/juleffel/websocket-server/latest-version.svg) # Usage From c4665a8b24faeb4a7f02ba9a9b1e086f899d087d Mon Sep 17 00:00:00 2001 From: Juleffel Date: Mon, 27 Aug 2018 09:44:54 +0200 Subject: [PATCH 08/10] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 71cfb5d..65312ae 100644 --- a/README.md +++ b/README.md @@ -5,8 +5,8 @@ be paired with [fentontravers/websocket-client](https://github.com/fentontravers # Clojars -![https://clojars.org/juleffel/websocket-server/](https://clojars.org/juleffel/websocket-server/latest-version.svg) - +![Foo](https://clojars.org/juleffel/websocket-server/latest-version.svg)] + # Usage ```clojure From 4614f64979376bd05315af9fc3c9af17d1930493 Mon Sep 17 00:00:00 2001 From: Juleffel Date: Mon, 27 Aug 2018 09:47:12 +0200 Subject: [PATCH 09/10] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 65312ae..9d72af3 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ be paired with [fentontravers/websocket-client](https://github.com/fentontravers # Clojars -![Foo](https://clojars.org/juleffel/websocket-server/latest-version.svg)] +![Foo](https://clojars.org/juleffel/websocket-server/latest-version.svg) # Usage From 7521d4b1744893036767c6119f6928ff32633dd5 Mon Sep 17 00:00:00 2001 From: Julien Fleury Date: Mon, 27 Aug 2018 17:43:02 +0200 Subject: [PATCH 10/10] Use options keys instead of positional arguments for all the optional args --- README.md | 13 ++++++---- project.clj | 2 +- src/websocket_server/core.clj | 40 +++++++++++++++++------------ test/websocket_server/core_test.clj | 8 +++--- 4 files changed, 36 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index 63f9034..7849fd6 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ be paired with [fentontravers/websocket-client](https://github.com/ftravers/webs (defn start "Demonstrate how to use the websocket server library." [] - (start-ws-server port request-handler-upcase-string)) + (start-ws-server port :on-receive request-handler-upcase-string)) (defn send-all! [data] @@ -55,9 +55,9 @@ back. # Multiple servers usage ```clojure -(start-ws-server 8000 request-handler-1) -(start-ws-server 8001 request-handler-2) -(start-ws-server 8002 request-handler-3) +(start-ws-server 8000 :on-receive request-handler-1) +(start-ws-server 8001 :on-receive request-handler-2) +(start-ws-server 8002 :on-receive request-handler-3) ; Send "Hello" to all channels opened to websocket on port 8000 (send-all! 8000 "Hello") @@ -85,6 +85,9 @@ back. ; json/read-str will be applied before sending data to request-json-handler ; json/write-str will be applied before sending data from request-json-handler ; back on the websocket, or when using (send-all! port data) -(start-ws-server 8000 request-handler-json json/read-str json/write-str) +(start-ws-server 8000 + :on-receive request-handler-json + :in-fn json/read-str + :out-fn json/write-str) ``` \ No newline at end of file diff --git a/project.clj b/project.clj index 640915d..18d71e4 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject fentontravers/websocket-server "0.4.12" +(defproject juleffel/websocket-server "0.4.13" :description "WebSocket Server Library" :url "https://github.com/ftravers/websocket-server" :license {:name "Eclipse Public License" diff --git a/src/websocket_server/core.clj b/src/websocket_server/core.clj index 563d148..9dd7465 100644 --- a/src/websocket_server/core.clj +++ b/src/websocket_server/core.clj @@ -7,20 +7,22 @@ (defonce channel-hub (atom {})) (defonce servers (atom {})) -(defn websocket-server [port cb req] +(defn websocket-server [port on-open on-receive on-close req] (let [{:keys [in-fn out-fn]} (@servers port)] (http/with-channel req channel (swap! channel-hub assoc-in [port channel] req) + (if on-open (on-open channel req)) (http/on-close channel (fn [status] (log/debug (str "Websocket channel closed with status: " status)) + (if on-close (on-close status)) (swap! channel-hub update port dissoc channel))) (http/on-receive channel (fn [data] - (if (http/websocket? channel) - (let [resp (cb (in-fn data))] + (if (and on-receive (http/websocket? channel)) + (let [resp (on-receive (in-fn data))] (log/debug "RECV: " data) (log/debug "RESP: " resp) (http/send! channel (out-fn resp))))))))) @@ -42,19 +44,18 @@ (swap! servers dissoc port))) (defn start-ws-server - ([port callback] - (start-ws-server port callback identity identity)) - ([port callback in-fn out-fn] - (if (@servers port) - (stop-ws-server port)) - (let [server - (http/run-server (partial websocket-server port callback) - {:port port})] - (swap! servers assoc port - {:server server - :in-fn in-fn - :out-fn out-fn}) - server))) + [port & {:keys [on-open on-receive on-close in-fn out-fn] + :or {in-fn identity, out-fn identity}}] + (if (@servers port) + (stop-ws-server port)) + (let [server + (http/run-server (partial websocket-server port on-open on-receive on-close) + {:port port})] + (swap! servers assoc port + {:server server + :in-fn in-fn + :out-fn out-fn}) + server)) (defn stop-all-ws-servers [] (doseq [port (keys @servers)] @@ -68,18 +69,23 @@ ; Example to work with https://github.com/ftravers/transit-websocket-client/ (start-ws-server port + :on-receive (fn [[action data]] [action (handle data)]) - json/read-str json/write-str) + :in-fn json/read-str + :out-fn json/write-str) (send-all! port ["~#'" (str [[:back-msg] "Message from backend"])]) ; Example to work with https://github.com/ftravers/reframe-websocket/ (start-ws-server port + :on-receive (fn [[store-path data]] [store-path (handle data)]) + :in-fn (fn [s] (let [[_ rf-msg] (json/read-str s)] (read-string rf-msg))) + :out-fn (fn [msg] (json/write-str ["~#'" (str msg)]))) diff --git a/test/websocket_server/core_test.clj b/test/websocket_server/core_test.clj index 6bf7fda..e2130bc 100644 --- a/test/websocket_server/core_test.clj +++ b/test/websocket_server/core_test.clj @@ -14,7 +14,7 @@ (defn start "Demonstrate how to use the websocket server library." [] - (start-ws-server port request-handler-upcase)) + (start-ws-server port :on-receive request-handler-upcase)) (defn stop "Stop websocket server" @@ -24,7 +24,7 @@ (defn restart [] (stop) (start)) (deftest send-msg-and-check-resp - (start-ws-server port request-handler-upcase) + (start-ws-server port :on-receive request-handler-upcase) (let [ch (chan) client-ws (ws/connect @@ -37,7 +37,7 @@ (stop-ws-server port)) (deftest send-all-test - (start-ws-server port #(throw (Exception. "Shouldn't be called as we are initiating messages on the server side"))) + (start-ws-server port :on-receive #(throw (Exception. "Shouldn't be called as we are initiating messages on the server side"))) (let [ch (chan) n 3 clients-ws @@ -58,7 +58,7 @@ (deftest multiple-servers-test (let [n 3] (doseq [i (range 3)] - (start-ws-server (+ 8000 i) #(throw (Exception. "Shouldn't be called as we are initiating messages on the server side")))) + (start-ws-server (+ 8000 i) :on-receive #(throw (Exception. "Shouldn't be called as we are initiating messages on the server side")))) (let [ch (chan) clients-ws (doall