This repository has been archived by the owner on Dec 12, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
web3_events.cljs
102 lines (86 loc) · 5.14 KB
/
web3_events.cljs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
(ns district.server.web3-events
(:require [cljs-web3-next.eth :as web3-eth]
[district.server.config :refer [config]]
[district.server.smart-contracts :as smart-contracts]
[district.server.web3 :refer [web3]]
[medley.core :as medley]
[mount.core :as mount :refer [defstate]]
[taoensso.timbre :as log]))
;; `start` and `stop` are defined further down in this namespace; they are
;; declared here so the state definition below can refer to them.
(declare start)
(declare stop)
;; Mount state for the web3-events component.
;; :start calls `start` with the `:web3-events` entry of `@config` merged with
;;        the `:web3-events` entry of `(mount/args)`; on key conflicts the
;;        value from `(mount/args)` wins because it comes last in `merge`.
;; :stop  calls `stop` with this state.
;; NOTE(review): `:on-reload :noop` is mount's reload option - presumably it
;; keeps the running state untouched when the namespace is recompiled; confirm
;; against the mount documentation.
(defstate ^{:on-reload :noop} web3-events
:start (start (merge (:web3-events @config)
(:web3-events (mount/args))))
:stop (stop web3-events))
(defn register-callback!
  "Registers `callback` for the contract/event pair that `event-key` maps to in
  the `:events` map of the `web3-events` state, and returns the callback id.

  The special key `::after-past-events-dummy-key` is not looked up in `:events`;
  it always maps to a fixed dummy contract/event pair. When `callback-id` is
  not supplied, a random UUID string is used instead.

  Throws a `js/Error` when no contract key can be resolved for `event-key`."
  [event-key callback & [callback-id]]
  (let [after-past-events? (= event-key ::after-past-events-dummy-key)
        [contract-key event] (if after-past-events?
                               [::after-past-events-dummy-contract ::after-past-events-dummy-event]
                               (get (:events @web3-events) event-key))
        callback-id (or callback-id (str (random-uuid)))
        ;; Only the first two elements of the resolved entry form the path.
        callback-path [contract-key event]]
    (when-not contract-key
      (throw (js/Error. (str "Trying to register callback for non existing event " event-key))))
    ;; Store the callback itself and a reverse index (id -> path) in one swap,
    ;; so the callback can later be removed knowing only its id.
    (swap! (:callbacks @web3-events)
           #(-> %
                (assoc-in (conj callback-path callback-id) callback)
                (assoc-in [:callback-id->path callback-id] callback-path)))
    callback-id))
;; Registers `callback` under the special `::after-past-events-dummy-key`, i.e.
;; in the dummy contract/event slot that `dispatch-after-past-events-callbacks!`
;; reads. Returns the generated callback id (see `register-callback!`).
(defn register-after-past-events-dispatched-callback! [callback]
(register-callback! ::after-past-events-dummy-key callback))
(defn unregister-callbacks!
  "Removes every callback whose id is in `callback-ids` from the `:callbacks`
  atom of the `web3-events` state, together with its entry in the
  `:callback-id->path` reverse index. Returns `callback-ids` unchanged.

  Ids that are not registered are skipped. Without that guard the path would
  be built from `nil`, yielding the one-element path `(callback-id)`, and
  `medley/dissoc-in` would then remove a top-level key of the callbacks map -
  wiping a whole contract's callbacks if the id happened to equal a contract
  key."
  [callback-ids]
  (doseq [callback-id callback-ids]
    (swap! (:callbacks @web3-events)
           (fn [callbacks]
             ;; Resolve the path inside the swap fn so lookup and removal
             ;; operate on the same snapshot of the map.
             (if-let [path (get-in callbacks [:callback-id->path callback-id])]
               (-> callbacks
                   (medley/dissoc-in (into path [callback-id]))
                   (medley/dissoc-in [:callback-id->path callback-id]))
               callbacks))))
  callback-ids)
(defn dispatch
  "Handles one event (or an error) by forwarding it to the registered callbacks.

  * Logs an error when `err` is set; otherwise logs the event at info level,
    but only when `:dispatch-logging?` is enabled in the `web3-events` state.
  * When `err` is set and `:on-error` in the state is a function, calls it
    with `err` and `evt`.
  * Unless `err` is set while `:dispatch-on-error?` is falsy, calls every
    callback registered under `[(:contract-key contract) event]` with `err`
    and `evt`, and returns the realized sequence of their results. Returns
    nil when the callbacks are skipped."
  [err {:keys [:contract :event] :as evt}]
  (cond
    err
    (log/error "Error Dispatching" {:err err :event evt} ::event-dispatch)

    (:dispatch-logging? @web3-events)
    (log/info "Dispatching event" {:err err :event evt} ::event-dispatch))
  (when err
    (let [on-error (:on-error @web3-events)]
      (when (fn? on-error)
        (on-error err evt))))
  ;; Errors reach the regular callbacks only when explicitly enabled.
  (when (or (not err)
            (:dispatch-on-error? @web3-events))
    (let [callback-fns (vals (get-in @(:callbacks @web3-events)
                                     [(:contract-key contract) event]))]
      ;; `doall` forces the lazy seq so every callback runs right now.
      (doall (map (fn [callback] (callback err evt)) callback-fns)))))
(defn- start-dispatching-latest-events!
  "Asks the node for the current block number and then subscribes, via
  `smart-contracts/subscribe-events`, to every contract/event pair that
  currently has registered callbacks, starting from that block. The resulting
  subscriptions are stored in the `:event-filters` atom of the `web3-events`
  state, replacing whatever was there before.

  `events` is only used for logging. An error reported by
  `web3-eth/get-block-number` is logged rather than silently dropped;
  subscribing is still attempted afterwards, as before."
  [events]
  (web3-eth/get-block-number
   @web3
   (fn [err last-block-number]
     (when err
       (log/error "Error getting last block number" {:err err} ::start-dispatching-latest-events))
     (let [;; :callback-id->path is the reverse index, not a contract entry.
           contract->events (dissoc @(:callbacks @web3-events) :callback-id->path)
           event-filters (doall
                          (for [[contract event->callbacks] contract->events
                                [event callbacks] event->callbacks]
                            (smart-contracts/subscribe-events contract
                                                              event
                                                              {:from-block last-block-number
                                                               :latest-event? true}
                                                              (vals callbacks))))]
       (log/info "Subscribed to future events" {:events (keys events)
                                                :from-block last-block-number})
       (reset! (:event-filters @web3-events) event-filters)))))
(defn- dispatch-after-past-events-callbacks!
  "Calls, with no arguments, every callback registered in the dummy
  after-past-events contract/event slot and then unregisters all of them.
  Returns the ids of the callbacks that were unregistered."
  []
  (let [id->callback (get-in @(:callbacks @web3-events)
                             [::after-past-events-dummy-contract ::after-past-events-dummy-event])]
    ;; The callbacks are taken from the snapshot above, so registrations made
    ;; while they run are neither called nor removed here.
    (run! (fn [callback] (callback)) (vals id->callback))
    (unregister-callbacks! (keys id->callback))))
(defn start
  "Start function of the `web3-events` state.

  Asynchronously checks via `web3-eth/is-listening?` that the node is
  reachable. If it is not, a `js/Error` is thrown from inside that callback.
  Otherwise past events for `events` are replayed in order through `dispatch`,
  from `from-block` (default 0) up to \"latest\". Once the replay finishes,
  the after-past-events callbacks are run and subscriptions for new events
  are started.

  Returns `opts` immediately, extended with a fresh `:callbacks` atom (empty
  map) and a fresh `:event-filters` atom (nil)."
  [{:keys [:events :from-block] :as opts}]
  (let [replay-opts {:from-block (or from-block 0)
                     :to-block "latest"
                     :on-finish (fn []
                                  (dispatch-after-past-events-callbacks!)
                                  (start-dispatching-latest-events! events))}]
    (web3-eth/is-listening?
     @web3
     (fn [_ listening?]
       (when-not listening?
         (throw (js/Error. "Can't connect to Ethereum node")))
       (smart-contracts/replay-past-events-in-order events dispatch replay-opts))))
  ;; The state value is returned right away; the replay above runs later.
  (assoc opts
         :callbacks (atom {})
         :event-filters (atom nil)))
(defn stop
  "Stop function of the `web3-events` state: logs the configured `:events` and
  unsubscribes every subscription held in the `:event-filters` atom.
  `web3-events` is the derefable state passed in by the state definition; it
  shadows the namespace-level state of the same name. Returns nil."
  [web3-events]
  (let [state @web3-events]
    (log/info "Stopping web3-events" (:events state))
    ;; NOTE(review): the other web3-eth calls in this file receive `@web3` as
    ;; their first argument - confirm `unsubscribe` really takes only the
    ;; subscription.
    (run! (fn [subscription]
            (web3-eth/unsubscribe subscription))
          @(:event-filters state))))