/
signal.clj
186 lines (163 loc) · 5.47 KB
/
signal.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
(ns io.github.humbleui.signal
(:refer-clojure :exclude [mapv reset! swap!])
(:require
[io.github.humbleui.core :as core]
[io.github.humbleui.protocols :as protocols])
(:import
[clojure.lang IDeref]
[java.lang.ref Reference WeakReference]))
;; Dependency-tracking context.
;; read-dirty binds this to a volatile holding a transient set while a signal's
;; value-fn runs; Signal's deref conj!s the signal being read into it.
;; When nil (the root value, and the binding read-check uses while refreshing
;; inputs) deref does not record anything.
(def ^:private ^:dynamic *context*
nil)
;; Collector for triggered effects.
;; reset! binds this to a volatile holding a set; set-state! conj'es every
;; :eager signal it is asked to mark with a non-:clean state. reset! derefs the
;; collected signals once propagation is done.
;; When nil (the root value) set-state! collects nothing.
(def ^:private ^:dynamic *effects*
nil)
(defn make-ref
  "Wraps `val` in a `java.lang.ref.WeakReference` and returns the reference.
  Used for the entries of a signal's `:outputs` collection."
  [val]
  (new WeakReference val))
(defn read-ref
  "Returns the referent of the `WeakReference` `ref`, or nil when the
  reference has been cleared."
  [^WeakReference ref]
  (. ref (get)))
(defmacro doouts
  "Iterates over `outputs` (a collection of references readable with
  `read-ref`), binding `sym` to each referent and evaluating `body` for it.
  References whose referent is nil are skipped.

    (doouts [out (:outputs signal)]
      (set-state! out :check))"
  [[sym outputs] & body]
  `(doseq [ref# ~outputs]
     (when-some [~sym (read-ref ref#)]
       ~@body)))
(defn disj-output
  "Removes `output` from `signal`'s `:outputs`: every reference whose referent
  is identical to `output` is dropped (via `core/without`) and the remaining
  collection is stored back on `signal`."
  [signal output]
  (->> (:outputs signal)
       (core/without (fn [ref] (identical? output (read-ref ref))))
       (protocols/-set! signal :outputs)))
(defn- set-state!
  "Requests that `signal` be marked with `state` (`:clean`, `:check` or
  `:dirty`).

  - An `:eager` signal asked to take a non-clean state is added to `*effects*`
    when that var is bound, whether or not its state actually changes.
  - When the state changes to a non-clean one, every live output is
    recursively marked `:check`.
  - A `:dirty` signal is never downgraded to `:check`. `:dirty` means a direct
    input is known to have changed; replacing it with `:check` would let
    read-check declare the signal clean and return a stale value when the
    remaining inputs turn out to be unchanged."
  [signal state]
  (when (and (= :eager (:type signal))
             *effects*
             (not= :clean state))
    (vswap! *effects* conj signal))
  (let [current    (:state signal)
        downgrade? (and (= :dirty current) (= :check state))]
    (when (and (not= state current) (not downgrade?))
      (protocols/-set! signal :state state)
      (when-not (= :clean state)
        (doouts [out (:outputs signal)]
          (set-state! out :check))))))

(defn- reset-impl!
  "Marks `signal` `:clean` and, if `value'` differs from the stored value,
  stores `value'` and `cache'` and marks every live output `:dirty`.
  When the value is unchanged neither the value nor the cache is touched.
  Returns `value'`."
  [signal value' cache']
  (protocols/-set! signal :state :clean)
  (when (not= (:value signal) value')
    (protocols/-set! signal :value value')
    (protocols/-set! signal :cache cache')
    (doouts [out (:outputs signal)]
      (set-state! out :dirty)))
  value')

(defn- read-dirty
  "Recomputes `signal` by calling its `:value-fn` with the current value and
  cache, with `*context*` bound so that every signal dereferenced during the
  call is recorded.

  For `:eager` signals the function is run for its side effects only: the
  result is ignored (so it need not be a map), the signal is marked `:clean`
  so that later `:check` notifications are evaluated against its inputs again,
  and nil is returned.

  For other signals the result must be a map with `:value` and optionally
  `:cache`. The recorded signals become the new `:inputs`; this signal is
  removed from the outputs of inputs it no longer reads and added (as a weak
  reference) to the outputs of newly read ones. The new value is then stored
  through reset-impl! and returned."
  [signal]
  (let [*context (volatile! (transient #{}))
        result   (binding [*context* *context]
                   ((:value-fn signal) (:value signal) (:cache signal)))]
    (if (= :eager (:type signal))
      (do
        ;; the effect has run; clear the flag that triggered it
        (protocols/-set! signal :state :clean)
        nil)
      (let [{value' :value
             cache' :cache} result
            inputs  (:inputs signal)
            inputs' (persistent! @*context)
            _       (assert (every? #(= :lazy (:type %)) inputs'))
            ref     (make-ref signal)]
        ;; remove from inputs we don't reference
        (doseq [input inputs
                :when (not (inputs' input))]
          (disj-output input signal))
        ;; add to newly acquired inputs
        (doseq [input inputs'
                :when (not (inputs input))]
          (let [outputs  (:outputs input)
                outputs' (conj outputs ref)]
            (protocols/-set! input :outputs outputs')))
        (protocols/-set! signal :inputs inputs')
        (reset-impl! signal value' cache')))))
(defn- read-check
  "Resolves a signal in the `:check` state.

  Dereferences the signal's inputs one at a time, with dependency tracking
  switched off (`*context*` bound to nil). As soon as doing so leaves
  `signal` in the `:dirty` state, it is recomputed with read-dirty and that
  result is returned. If all inputs are visited without that happening, the
  signal is marked `:clean` and its stored value is returned."
  [signal]
  (loop [pending (seq (:inputs signal))]
    (if-some [[input & more] pending]
      (do
        ;; refresh the input without registering it as a dependency
        (binding [*context* nil]
          (deref input))
        (if (= :dirty (:state signal))
          (read-dirty signal)
          (recur more)))
      (do
        (protocols/-set! signal :state :clean)
        (:value signal)))))
;; User APIs
;; TODO synchronize
;; Reactive cell. Fields, as they are used in this file:
;;   name     - label, only used by toString
;;   value-fn - called by read-dirty as (value-fn value cache); for :lazy
;;              signals it returns a map with :value and optionally :cache
;;   value    - last stored value
;;   cache    - auxiliary data stored next to value and passed back to value-fn
;;   inputs   - set of signals this signal depends on
;;   outputs  - collection of WeakReferences (see make-ref) to dependent signals
;;   state    - :clean, :check, :dirty or :disposed
;;   type     - :lazy or :eager
(core/deftype+ Signal [name value-fn ^:mut value ^:mut cache ^:mut inputs ^:mut outputs ^:mut state type]
Object
(toString [_]
(str "#Signal{name=" name ", state=" state ", value=" value "}"))
IDeref
;; Reading a signal: returns the stored value when :clean, recomputes when
;; :dirty, checks inputs first when :check, throws ex-info when :disposed.
(deref [this]
;; register this signal as a dependency of whatever is currently being computed
(when *context*
(vswap! *context* conj! this))
(case state
:clean value
:dirty (read-dirty this)
:check (read-check this)
:disposed (throw (ex-info "Can't read disposed signal" {})))))
(defn signal*
  "Creates a `:lazy` signal called `name` whose value is produced by
  `value-fn`, a function of the previous value and previous cache returning a
  map with `:value` and optionally `:cache`.

  The signal starts `:dirty` with no inputs or outputs and is computed once
  immediately so that its dependencies are registered. Returns the signal."
  [name value-fn]
  (doto (map->Signal {:name     name
                      :value-fn value-fn
                      :inputs   #{}
                      :outputs  #{}
                      :state    :dirty
                      :type     :lazy})
    ;; force deps
    (read-dirty)))
(defmacro signal-named
  "Observable derived computation with an explicit name.
  Expands to a `signal*` call whose value function ignores its two arguments
  and returns `{:value (do body...)}`."
  [name & body]
  (let [compute `(fn [~'_ ~'_] {:value (do ~@body)})]
    `(signal* ~name ~compute)))
(defmacro signal
  "Observable derived computation named `:anonymous`.
  Expands to a `signal*` call whose value function ignores its two arguments
  and returns `{:value (do body...)}`."
  [& body]
  (let [compute `(fn [~'_ ~'_] {:value (do ~@body)})]
    `(signal* :anonymous ~compute)))
(defmacro defsignal
  "Defines a var `name` holding a signal that is named after the quoted
  symbol `name` and computes `body`."
  [name & body]
  (let [compute `(fn [~'_ ~'_] {:value (do ~@body)})]
    `(def ~name
       (signal* '~name ~compute))))
(defn maybe-read
  "Returns the dereferenced value when given a `Signal`; returns any other
  argument unchanged."
  [signal-or-value]
  (cond-> signal-or-value
    (instance? Signal signal-or-value) deref))
(defn reset!
  "Sets `signal` to the constant `value'`.

  The signal is first detached from all of its inputs and its cache is
  replaced with nil. If the value differs from the stored one, outputs are
  marked `:dirty`; eager signals marked non-clean during that propagation are
  collected and dereferenced afterwards, outside of the `*effects*` binding.
  Returns nil."
  [signal value']
  ;; clear out all dependencies
  (doseq [upstream (:inputs signal)]
    (disj-output upstream signal))
  (protocols/-set! signal :inputs #{})
  (let [*triggered (volatile! #{})]
    ;; change value and collect all triggered effects
    (binding [*effects* *triggered]
      (reset-impl! signal value' nil))
    ;; execute effects
    (doseq [eff (deref *triggered)]
      (deref eff))))
(defn swap!
  "Reads `signal`, applies `f` to the current value followed by `args`, and
  stores the result with `reset!`. Returns what `reset!` returns."
  [signal f & args]
  (let [current (deref signal)]
    (reset! signal (apply f current args))))
(defn dispose!
  "Detaches `signal` from the outputs of all its inputs, marks it
  `:disposed` and drops its value and cache. Dereferencing a disposed signal
  throws. Returns `signal`."
  [signal]
  (doseq [upstream (:inputs signal)]
    (disj-output upstream signal))
  (protocols/-set! signal :state :disposed)
  (protocols/-set! signal :value nil)
  (protocols/-set! signal :cache nil)
  signal)
(defmacro effect
  "Creates an `:eager` signal that evaluates `body` when it is recomputed.

  `inputs` is a collection; elements that are not `Signal`s are ignored. The
  remaining signals become the effect's `:inputs`, and a weak reference to the
  effect is added to each one's `:outputs`. The effect starts `:clean`, so
  `body` is not run at creation time. Returns the effect signal.

  NOTE(review): inputs reference the effect only through `make-ref` (a
  WeakReference), so the caller presumably has to keep the returned signal
  reachable - confirm."
  [inputs & body]
  `(let [signals# (filterv (fn [x#] (instance? Signal x#)) ~inputs)
         effect#  (map->Signal
                    {:value-fn (fn [_# _#]
                                 ~@body)
                     :inputs   (set signals#)
                     :outputs  #{}
                     :state    :clean
                     :type     :eager})]
     (doseq [input# signals#]
       (protocols/-set! input# :outputs
         (conj (:outputs input#) (make-ref effect#))))
     effect#))
(defn mapv
  "Returns a signal holding `(clojure.core/mapv f @*xs)`.

  Results are reused between recomputations: the previous input collection is
  kept as the signal's cache, and an element that was present last time maps
  to the result computed for it then instead of calling `f` again. Reuse is
  decided by key presence, so results that are nil or false are reused too."
  [f *xs]
  (signal*
    :mapv
    (fn [old-val cache]
      (let [xs      @*xs
            ;; previous element -> previous result ({} on the first run)
            mapping (zipmap cache old-val)
            xs'     (clojure.core/mapv
                      (fn [x]
                        (if-some [entry (find mapping x)]
                          (val entry)
                          (f x)))
                      xs)]
        {:cache xs
         :value xs'}))))