-
-
Notifications
You must be signed in to change notification settings - Fork 137
/
history.cljc
221 lines (183 loc) · 8.68 KB
/
history.cljc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
(ns fulcro.history
(:require #?(:clj [clojure.future :refer :all])
[fulcro.logging :as log]
[fulcro.util :as util]
[clojure.set :as set]
[clojure.spec.alpha :as s]))
(defn is-timestamp? [t]
#?(:clj (instance? java.util.Date t)
:cljs (instance? js/Date t)))
(s/def ::max-size pos-int?)
(s/def ::db-before map?)
(s/def ::db-after map?)
(s/def ::client-time is-timestamp?)
(s/def ::tx vector?)
(s/def ::tx-time int?)
(s/def ::tx-result (s/or :nil nil? :map map?))
(s/def ::network-sends (s/map-of keyword? vector?)) ; map of sends that became active due to this tx
(s/def ::history-step (s/keys :req [::db-after ::db-before] :opt [::tx ::tx-result :fulcro.client.impl.data-fetch/network-result ::network-sends ::client-time]))
(s/def ::history-steps (s/map-of int? ::history-step))
(s/def ::active-remotes (s/map-of keyword? (s/map-of pos-int? pos-int?))) ; map of remote to the tx-time of any send(s) that are still active
(s/def ::history (s/keys :opt [::active-remotes] :req [::max-size ::history-steps]))
(s/def ::history-atom (s/and #(util/atom? %) #(s/valid? ::history (deref %))))
(def max-tx-time #?(:clj Long/MAX_VALUE :cljs 9200000000000000000))
(defn- decrement-or-remove [m k]
(if (= 1 (get m k 1))
(dissoc m k)
(update m k dec)))
(defn add-or-increment [m k]
(update m k (fnil inc 0)))
(defn remote-activity-started
"Record that remote activity started for the given remote at the given tx-time. Returns a new history."
[history remote tx-time]
(if (and remote tx-time)
(update-in history [::active-remotes remote] add-or-increment tx-time)
history))
(s/fdef remote-activity-started
:args (s/cat :hist ::history :remote (s/or :missing nil? :remote keyword?) :time ::tx-time)
:ret ::history)
(defn remote-activity-finished
"Record that remote activity finished for the given remote at the given tx-time. Returns a new history."
[history remote tx-time]
(if (and remote tx-time)
(update-in history [::active-remotes remote] decrement-or-remove tx-time)
history))
(s/fdef remote-activity-finished
:args (s/cat :hist ::history :remote (s/or :missing nil? :remote keyword?) :time ::tx-time)
:ret ::history)
(defn oldest-active-network-request
"Returns the tx time for the oldest in-flight send that is active. Returns Long/MAX_VALUE if none are active."
[{:keys [::active-remotes] :as history}]
(reduce min max-tx-time (apply concat (some->> active-remotes vals (map keys)))))
(s/fdef oldest-active-network-request
:args (s/cat :hist ::history)
:ret int?)
(s/fdef oldest-active-network-request
:args (s/cat :hist ::history)
:ref int?)
(defn gc-history
"Returns a new history that has been reduced in size to target levels."
[{:keys [::max-size ::history-steps] :as history}]
(if (and (pos-int? max-size) (> (count history-steps) max-size))
(let [oldest-required-history-step (oldest-active-network-request history)
current-size (count history-steps)
overage (- current-size max-size) ; guaranteed positive by `if` above
ordered-step-keys (sort (keys history-steps))
proposed-keeper-keys (drop overage ordered-step-keys)
real-keeper-keys (if (> (first proposed-keeper-keys) oldest-required-history-step)
(do
(log/warn "History has grown beyond max size due to network congestion.")
(drop-while (fn [t] (< t oldest-required-history-step)) ordered-step-keys))
proposed-keeper-keys)]
(update history ::history-steps select-keys real-keeper-keys))
history))
(s/fdef gc-history
:args (s/cat :hist ::history)
:ref ::history)
(defn compressible-tx [tx] (vary-meta tx assoc ::compressible? true))
(s/fdef compressible-tx
:args (s/cat :tx (s/or :missing nil? :tx vector?))
:ret (s/or :missing nil? :tx vector?))
(defn compressible-tx?
"Returns true if the given transaction is marked as compressible."
[tx]
(boolean (some-> tx meta ::compressible?)))
(s/fdef compressible-tx?
:args (s/cat :tx (s/or :missing nil? :tx vector?))
:ret boolean?)
(defn last-tx-time
"Returns the most recent transition edge time recorded in the given history."
[{:keys [::history-steps] :as history}]
(reduce max 0 (keys history-steps)))
(s/fdef last-tx-time
:args (s/cat :hist ::history)
:ret int?)
(defn record-history-step
"Record a history step in the reconciler. "
[{:keys [::history-steps ::active-remotes] :as history} tx-time
{:keys [::tx] :as step}]
(let [last-time (last-tx-time history)
gc? (= 0 (mod tx-time 10))
last-tx (get-in history-steps [last-time ::tx] [])
all-active-steps (reduce set/union #{} (some->> active-remotes vals (map (comp set keys))))
compressible? (and (compressible-tx? tx)
(compressible-tx? last-tx)
(-> (contains? all-active-steps last-time) not))
new-history (cond-> (assoc-in history [::history-steps tx-time] step)
compressible? (update ::history-steps dissoc last-time))]
(if gc?
(gc-history new-history)
new-history)))
(s/fdef record-history-step
:args (s/cat :hist ::history :time ::tx-time :step ::history-step)
:ret ::history)
(defn new-history [size]
{::max-size size ::history-steps {} ::active-remotes {}})
(s/fdef new-history
:args (s/cat :size pos-int?)
:ret ::history)
(defn ordered-steps
"Returns the current valid sequence of step times in the given history as a sorted vector."
[history]
(some-> history ::history-steps keys sort vec))
(s/fdef ordered-steps
:args (s/cat :hist ::history)
:ret (s/or :v (s/every int? :kind vector?) :nothing nil?))
(defn get-step
"Returns a step from the given history that has the given tx-time. If tx-time specifies a spot where there is a gap in the history
(there are steps before and after), then it will return the earlier step, unless the latter was compressible, in which case
it will return the step into which the desired spot was compressed. "
[{:keys [::history-steps] :as history} tx-time]
(if-let [exact-step (get history-steps tx-time)]
exact-step
(let [timeline (ordered-steps history)
[before after] (split-with #(> tx-time %) timeline)
step-before (get history-steps (last before))
step-after (get history-steps (first after))]
(cond
(and step-before step-after (-> step-after ::tx compressible-tx?)) step-after
(and step-before step-after) step-before
:otherwise nil))))
(s/fdef get-step
:args (s/cat :hist ::history :time ::tx-time)
:ret (s/or :nothing nil? :step ::history-step))
(defn history-navigator
"Returns a navigator of history. Use focus-next, focus-previous, and current-step."
[history]
(let [steps (ordered-steps history)]
{:legal-steps steps
:history history
:index (dec (count steps))}))
(declare current-step)
(defn nav-position
"Gives back navigation position as a pair [current-index count-of-steps]"
[history-nav]
[(:index history-nav) (count (:legal-steps history-nav))])
(defn focus-next
"Returns a new history navigation with the focus on the next step (or the last if already there). See history-navigator"
[history-nav]
(let [{:keys [index history legal-steps]} history-nav
last-legal-idx (dec (count legal-steps))]
(update history-nav :index (fn [i] (if (< i last-legal-idx) (inc i) i)))))
(defn focus-previous
"Returns a new history navigation with the focus on the prior step (or the first if already there). See history-navigator"
[history-nav]
(let [{:keys [index history legal-steps]} history-nav]
(update history-nav :index (fn [i] (if (zero? i) 0 (dec i))))))
(defn focus-start
"Returns a new history navigation with the focus on the prior step (or the first if already there). See history-navigator"
[history-nav]
(let [{:keys [index history legal-steps]} history-nav]
(assoc history-nav :index 0)))
(defn focus-end
"Returns a new history navigation with the focus on the prior step (or the first if already there). See history-navigator"
[history-nav]
(let [{:keys [index history legal-steps]} history-nav]
(assoc history-nav :index (-> legal-steps count dec))))
(defn current-step
"Get the current history step from the history-nav. See history-navigator."
[history-nav]
(let [{:keys [index history legal-steps]} history-nav
history-step-tx-time (get legal-steps index)
history-step (get-step history history-step-tx-time)]
history-step))