-
Notifications
You must be signed in to change notification settings - Fork 5
/
core.clj
426 lines (383 loc) · 16.1 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
(ns seql.core
"A way to interact with stored entities"
(:require [next.jdbc :as jdbc]
[next.jdbc.result-set :as rs]
[clojure.string :as str]
[clojure.spec.alpha :as s]
[honeysql.core :as sql]
[honeysql.helpers :as h]
[camel-snake-kebab.core :as csk]
[seql.spec]))
;; SQL Query Builder
;; =================
(defn entity-schema
"Look up entity in schema. Takes a schema and an entity. The entity
argument can take be of two different shapes: it can be a keyword
that will match the namespace of the entity in question or a coll of
namespaced keyword (ident name) and arguments"
[{:keys [schema]} entity]
(if (keyword? entity)
[entity (get schema entity)]
(let [[ident-name & args] entity
entity-key (keyword (namespace ident-name))]
[entity-key (get schema entity-key) ident-name args])))
(defn transform-out
"Takes a namespaced keyword and prepare name transforms.
Returns a tuple of database name to aliased name."
[k]
(let [entity (namespace k)
sql-name (csk/->snake_case (name k))]
[(keyword (str entity "." sql-name))
(keyword (str entity "__" (name k)))]))
(defn transform-for-join
"Takes a namespace keyword and returns an identifier in valid SQL
format"
[k]
(let [entity (namespace k)
sql-name (csk/->snake_case (name k))]
(keyword (str entity "." sql-name))))
(defn table-field
[table field]
(keyword (format "%s.%s"
(name table)
(name field))))
(defn add-ident
"Add a where clause for an ident"
[q entity-name ident arg]
(h/merge-where q [:= (table-field entity-name ident) arg]))
(defmulti process-join
"Process join by relation type. Takes a base query, a map with
`:entity` and `:table` keys and a map of options to build the
relation"
#(:type %3))
(defmethod process-join :one-to-many
[q {:keys [entity table]} {:keys [local-id remote-id remote-name]}]
(update q :left-join conj
[table entity]
[:=
(transform-for-join local-id)
(transform-for-join (or remote-name remote-id))]))
(defmethod process-join :one-to-one
[q {:keys [entity table]} {:keys [local-id remote-id remote-name]}]
(update q :left-join conj
[table entity]
[:=
(transform-for-join local-id)
(transform-for-join (or remote-name remote-id))]))
(defn process-field
"Add necessary stanzas to realize field targeting with SQL"
[schema {:keys [relations compounds fields entity]}]
(let [rel-set (set (keys relations))
compound-set (set (keys compounds))
field-set (set fields)]
(fn [q field]
(cond
(and (map? field) (contains? rel-set (-> (keys field) first)))
(let [rel-key (first (keys field))
subfields (first (vals field))
rel-schema (get relations rel-key)
subentity (:remote-entity rel-schema)]
(reduce (process-field schema (get schema subentity))
(process-join q
(assoc (get schema subentity)
:entity subentity)
rel-schema)
subfields))
(contains? compound-set field)
(let [src (:source (get compounds field))]
(-> q
(update :select concat (mapv transform-out src))
(update-in [::meta ::fields] conj field)))
(contains? field-set field)
(-> q
(update :select conj (transform-out field))
(update-in [::meta ::fields] conj field))
:else
(throw (ex-info "unknown field type"
{:type :error/illegal-argument
:field field}))))))
(defn build-query
"Build a base query preparing joins when needed"
[env entity fields]
(let [[entity-name entity-def ident args] (entity-schema env entity)]
(cond-> (reduce (process-field (:schema env) entity-def)
{:from [[(:table entity-def) entity-name]]
:select []
:left-join []
::meta {::entities #{entity}
::entity entity
::fields []}}
fields)
(some? ident)
(add-ident entity-name ident (first args)))))
(defn prepare-field
"Conditionally apply field transform on field id if schema defines
any"
[schema field value]
(if-let [f (second (get-in schema [(keyword (namespace field)) :transforms field]))]
(f value)
value))
(defn add-condition
"If conditions are provided, add them to the query"
[schema q [condition & args]]
(let [entity (-> condition namespace keyword)
table (get-in schema [entity :table])
params (get-in schema [entity :conditions condition])
type (:type params)]
(cond
(= type :static)
(h/merge-where q [:= (table-field table (:field params))
(prepare-field schema
(:field params)
(:value params))])
(= type :field)
(if-not (= 1 (count args))
(throw (ex-info (format "bad arity for field condition: %s" condition)
{:type :error/illegal-argument
:code 400
:condition condition
:args args}))
(h/merge-where q [:= (table-field table (:field params))
(prepare-field schema
(:field params)
(first args))]))
:else
(if-not (= (:arity params) (count args))
(throw (ex-info (format "bad arity for condition: %s" condition)
{:type :error/illegal-argument
:code 400
:condition condition
:args args}))
(h/merge-where q (apply (:handler params) args))))))
(defn sql-query
"Build a SQL query for the pull-syntax expressed. This is an incremental
data-based creation of a "
[env entity fields conditions]
(let [[_ entity-def _ _] (entity-schema env entity)
res (reduce #(add-condition (:schema env) %1 %2)
(build-query env entity (or fields (:defaults entity-def)))
conditions)]
[(dissoc res ::meta) (::meta res)]))
;; Result transformations follows
;; ==============================
(defn extract-ident
"When a query is targetting an ident,
extract the first result"
[entity result]
(if (vector? entity)
(first result)
result))
(defn qualify-key
"Given the enforced terminology at query time,
yield back a qualified keyword"
[k]
(let [[ns tail] (str/split (name k) #"__" 2)]
(keyword (csk/->kebab-case ns)
(csk/->kebab-case tail))))
(defn qualify-result
"Qualify a result with the appropriate namespace"
[m]
(reduce-kv #(assoc %1 (qualify-key %2) %3) {} m))
(defn process-transforms-fn
"Yield a function which processes records and applies predefined
transforms"
[schema type]
(let [;; FIXME we could imagine memoizing this,
;; schemas are quite static
transforms (into {}
(comp (map val)
(map #(get % :transforms)))
schema)
extract (case type :deserialize first :serialize second)]
(fn [m]
(into {}
(map (fn [[k v]]
(let [transform (extract (get transforms k))]
[k (cond-> v
(and (some? v)
(some? transform))
transform)])))
m))))
(defn compound-extra-fields
"Figure out which fields aren't needed once compounds have been
processed on a record"
[compounds fields]
(let [compound-fields (-> fields
(filter #(contains? (set (keys compounds)) %))
(mapcat (:source #(get compounds %))))]
(set
(remove (set fields) compound-fields))))
(defn merge-compounds-fn
"Merge compounds into record"
[compounds]
(fn [record k]
(let [{:keys [source handler]} (get compounds k)
extract (apply juxt source)]
(assoc record k (apply handler (extract record))))))
(defn process-compounds-fn
"Yield a schema-specific function to process compounds on the fly"
[schema {::keys [fields]}]
(let [;; FIXME another one we could memoize
compounds (into {}
(comp (map val)
(map :compounds))
schema)
extra-fields (compound-extra-fields compounds fields)]
(fn [m]
(->> fields
(filter #(contains? (set (keys compounds)) %))
(reduce (merge-compounds-fn compounds) m)
(filter #(not (contains? extra-fields (key %))))
(into {})))))
(defn recompose-relations
"The join query perfomed by `query` returns a flat list of entries,
potentially unsorted (this is database implementation specific)
recompose a tree of entities as specified in fields.
"
[schema fields records]
(letfn [(add-relation-fn [group]
(fn [record relation]
(let [rel-key (first (keys relation))
rel-namespace (-> rel-key namespace keyword)
rel-fields (first (vals relation))
rel-type (get-in schema [rel-namespace :relations rel-key :type])]
(if (= :one-to-one rel-type)
(assoc record rel-key (first (walk-tree rel-fields group)))
(assoc record rel-key (walk-tree rel-fields group))))))
(walk-tree [fields records]
(let [plain-fields (remove map? fields)
relations (filter map? fields)
partitioner (apply juxt plain-fields)
extract #(select-keys % plain-fields)
groups (->> (sort-by partitioner records)
(partition-by partitioner))]
(if (empty? relations)
(remove #(every? nil? (vals %))
(map #(extract (first %)) groups))
(for [g groups]
(reduce (add-relation-fn g) (extract (first g)) relations)))))]
(walk-tree fields records)))
(defn query
"Look up entities."
([env entity]
(let [[_ entity-def _ _] (entity-schema env entity)]
(query env entity (:fields entity-def) [])))
([env entity fields]
(query env entity fields []))
([env entity fields conditions]
(s/assert ::query-args [env entity fields conditions])
(let [[q qmeta] (sql-query env entity fields conditions)
schema (:schema env)]
(->> (jdbc/plan (:jdbc env) (sql/format q))
(into [] (comp
(map qualify-result)
(map (process-transforms-fn schema :deserialize))
(map (process-compounds-fn schema qmeta))))
(recompose-relations schema fields)
(extract-ident entity)))))
;; Mutation support
;; ================
(defn find-mutation
"Fetch mutation description"
[env mutation]
(let [entity (-> mutation namespace keyword)]
(or (get-in env [:schema entity :mutations mutation])
(throw (ex-info (format "unknown mutation: %s" mutation)
{:type :error/illegal-argument
:code 400
:mutation mutation})))))
(defn find-listeners
"Fetch listeners"
[env mutation]
(let [entity (-> mutation namespace keyword)]
(or (get-in env [:schema entity :listeners mutation]) {})))
(defn success-result?
[result]
(some-> result first :next.jdbc/update-count pos?))
(defn mutate!
"Perform a mutation. Since mutations are spec'd, parameters are
expected to conform it."
([env mutation params]
(mutate! env mutation params {}))
([env mutation params metadata]
(s/assert ::mutate-args [env mutation params])
(let [{:keys [spec handler pre]} (find-mutation env mutation)
listeners (find-listeners env mutation)]
(when-not (s/valid? spec params)
(throw (ex-info (format "mutation params do not conform to %s: %s"
spec
(s/explain-str spec params))
{:type :error/illegal-argument
:code 400
:explain (s/explain-str spec params)})))
(let [transform (process-transforms-fn (:schema env)
:serialize)
transformed-params (transform params)
statement (-> transformed-params
(handler)
(sql/format))
result (jdbc/with-transaction [jdbc (:jdbc env)]
;; if we have preconditions check these first
(when (seq pre)
(run! (fn [{:keys [name query valid?]
:or {valid? seq}
:as pre}]
(let [result (jdbc/execute! jdbc
(-> transformed-params
(query)
(sql/format)))]
(when-not (valid? result)
(throw (ex-info (format "Precondition %s on mutation %s failed"
name
mutation)
{:type :error/mutation-failed
:code 409
:mutation mutation
:params params
:pre (dissoc pre :valid? :query)})))))
pre))
(jdbc/execute! jdbc statement))]
(when-not (success-result? result)
(throw (ex-info (format "the mutation has failed: %s" mutation)
{:type :error/mutation-failed
:code 404 ;; Likely the mutation has failed
;; because the where clauses did
;; not match
:mutation mutation
:params params})))
(run! (fn [[key listener]]
(listener {:key key
:mutation mutation
:result result
:params params
:metadata metadata}))
listeners)
result))))
;; Environment modifiers
;; =====================
(defn add-listener!
"Given an environment, add a mutation handler.
The handlers is bound by `key`, if specified, otherwise the `key` will
default to the mutation key. Yields an updated environment"
([env mutation handler]
(add-listener! env mutation mutation handler))
([env mutation key handler]
(let [entity (-> mutation namespace keyword)]
(update-in env
[:schema entity :listeners mutation key]
(fn [h]
(when h
(throw (ex-info (format "Listener already registered for %s"
key)
{:type ::already-registered-error
:key key})))
handler)))))
(defn remove-listener!
"Given an environment, remove a mutation handler by `key` if
specified, otherwise it will remove a handler that match the
mutation `key`. Yields an updated environment."
([env mutation]
(remove-listener! env mutation mutation))
([env mutation key]
(let [entity (-> mutation namespace keyword)]
(update-in env [:schema entity :listeners mutation] dissoc key))))