/
async.clj
112 lines (91 loc) · 3.96 KB
/
async.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
(ns clj-bitstamp.async
(:require
[clojure.core.async :as async]
[cheshire.core :as cheshire]
[cheshire.parse :as parse]
[tol.core :as tol])
(:import
com.pusher.client.channel.ChannelEventListener
com.pusher.client.channel.SubscriptionEventListener
com.pusher.client.connection.ConnectionEventListener
com.pusher.client.connection.ConnectionState
com.pusher.client.Pusher))
;; Fallback Pusher application key: `new-pusher` passes it to the `Pusher`
;; constructor when the options map carries no `pusher-key`.
(def default-pusher-key "de504dc5763aeef9ff52")
(defn state->keyword
  "Maps a Pusher `ConnectionState` constant to its keyword counterpart
  (`:all`, `:connecting`, `:connected`, `:disconnecting`, `:disconnected`).
  Returns nil for any value that matches none of the listed constants."
  [state]
  (condp = state
    ConnectionState/ALL           :all
    ConnectionState/CONNECTING    :connecting
    ConnectionState/CONNECTED     :connected
    ConnectionState/DISCONNECTING :disconnecting
    ConnectionState/DISCONNECTED  :disconnected
    ;; trailing clause is the fall-through default
    nil))
(defn- connection-listener
  "Builds a `ConnectionEventListener` that forwards connection events to
  `callback`, always passing `pusher` as the first argument:

  - state change: `(callback pusher :change {:current kw :previous kw})`, where
    both states are converted with `state->keyword`
  - error: `(callback pusher :error {:message .. :code .. :exception ..})`,
    where `code` is turned into a keyword"
  [pusher callback]
  (let [notify (partial callback pusher)]
    (reify ConnectionEventListener
      (onConnectionStateChange [_ state-change]
        (let [current  (state->keyword (.getCurrentState state-change))
              previous (state->keyword (.getPreviousState state-change))]
          (notify :change {:current current
                           :previous previous})))
      (onError [_ message code exception]
        (notify :error {:message message
                        :code (keyword code)
                        :exception exception})))))
(defn- channel-listener
  "Builds a `ChannelEventListener` whose `onSubscriptionSucceeded` calls
  `(callback listener channel-name)`, where `listener` is the reified
  listener itself.

  NOTE(review): `onSubscriptionSucceeded` is the only method implemented
  here - confirm no other method of the interface is ever invoked on the
  returned object."
  [callback]
  (reify ChannelEventListener
    (onSubscriptionSucceeded [listener subscribed-channel]
      (callback listener subscribed-channel))))
(defn- coerce-orderbook
  "Normalizes a decoded payload `ob`.

  `:timestamp` is always passed through `tol/->int`. When `str-big-decimals?`
  is truthy, the `:asks` and `:bids` entries of `ob` are additionally rewritten
  so that every element of each entry goes through `bigdec`, and the result is
  merged over the timestamp-coerced map.

  NOTE(review): presumably `tol/update-values` applies the given function to
  each value of the map - confirm against the `tol` library.
  NOTE(review): `update` calls `tol/->int` with nil when `:timestamp` is
  absent - confirm that is acceptable for every payload routed here."
  [str-big-decimals? ob]
  (let [levels->bigdec (partial map (fn [level] (mapv bigdec level)))]
    (cond-> (update ob :timestamp tol/->int)
      ;; only touch the price/volume sides when coercion was requested
      str-big-decimals?
      (merge (tol/update-values levels->bigdec (select-keys ob [:asks :bids]))))))
(defn subscription-listener
  "Builds a `SubscriptionEventListener` that decodes each event's JSON `data`
  with keywordized keys and `parse/*use-bigdecimals?*` bound to `true`, runs
  the result through `coerce-orderbook` (honouring `str-big-decimals?`), and
  then calls `(callback channel-name event-keyword coerced-data)`.

  The callback is invoked inside the same dynamic binding as the decoding."
  [callback str-big-decimals?]
  (reify SubscriptionEventListener
    (onEvent [_ channel-name event-name data]
      (binding [parse/*use-bigdecimals?* true]
        (let [payload   (cheshire/decode data true)
              event-key (keyword event-name)
              orderbook (coerce-orderbook str-big-decimals? payload)]
          (callback channel-name event-key orderbook))))))
(defn disconnect
  "Invokes the no-argument `disconnect` member on `pusher` and returns
  whatever that call returns."
  [pusher]
  (. pusher disconnect))
(defn connect
  "Invokes the no-argument `connect` member on `pusher` and returns whatever
  that call returns."
  [pusher]
  (. pusher connect))
(defn pusher-factory
  "Wires an existing `pusher` to two core.async channels and starts it.

  Steps, in order: subscribes `pusher` to `channel-name`, connects it with a
  connection listener registered for `ConnectionState/ALL`, then binds
  `event-name` on the subscription with a subscription listener.

  Returns `[pusher pusher-channel status-ch data-ch]`:
  - `status-ch` receives `[action data]` tuples from the connection listener
  - `data-ch` receives `[channel-name event-name data]` tuples for the bound
    event

  Options read here: `channel-name`, `event-name`, `status-buffer-or-n` and
  `data-buffer-or-n` (each handed to `async/chan`), and `str-big-decimals?`
  (defaults to `true`). `pusher-key` is destructured but not used by this
  function."
  [pusher {:keys [channel-name pusher-key event-name status-buffer-or-n data-buffer-or-n str-big-decimals?]
           :as opts
           :or {str-big-decimals? true}}]
  (let [status-ch    (async/chan status-buffer-or-n)
        data-ch      (async/chan data-buffer-or-n)
        ;; Both sinks publish with `>!!`, i.e. from the thread that invokes
        ;; the listener.
        put-status!  (fn [_pusher action payload]
                       (async/>!! status-ch [action payload]))
        put-data!    (fn [channel event payload]
                       (async/>!! data-ch [channel event payload]))
        subscription (.subscribe pusher channel-name)
        all-states   (into-array [ConnectionState/ALL])]
    (.connect pusher (connection-listener pusher put-status!) all-states)
    (.bind subscription
           (name event-name)
           (subscription-listener put-data! str-big-decimals?))
    [pusher subscription status-ch data-ch]))
(defn new-pusher
  "Creates a `Pusher` client, connects it, subscribes to the requested channel
  (all via `pusher-factory`) and returns the tuple
  `[pusher pusher-channel status-ch data-ch]`:

  - `pusher`         the Pusher instance
  - `pusher-channel` the subscribed Channel instance
  - `status-ch`      an async channel carrying connection messages as
                     `[action data]` tuples
  - `data-ch`        an async channel carrying data of the subscribed channel
                     as `[channel-name event-name data]` tuples

  Takes a single options map:

  - `channel-name`       name of the channel to subscribe to
  - `pusher-key`         Pusher key; falls back to `default-pusher-key`
  - `event-name`         event name to bind on the subscribed channel
  - `status-buffer-or-n` buffer-or-n for the status channel
  - `data-buffer-or-n`   buffer-or-n for the data channel
  - `str-big-decimals?`  when `true`, coerce volume and price (given as
                         strings) to `BigDecimal`; default `true`"
  [{:keys [channel-name pusher-key event-name status-buffer-or-n data-buffer-or-n] :as opts}]
  (let [api-key (or pusher-key default-pusher-key)
        client  (Pusher. api-key)]
    (pusher-factory client opts)))