Skip to content

Commit

Permalink
Merge branch 'main' into feature/policy-query2
Browse files Browse the repository at this point in the history
  • Loading branch information
bplatz committed Jun 7, 2024
2 parents d2d5d28 + 8f1f988 commit c938ac6
Show file tree
Hide file tree
Showing 15 changed files with 272 additions and 220 deletions.
7 changes: 7 additions & 0 deletions src/clj/fluree/db/database/async.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@
(>! error-ch e))))
match-ch))

(-activate-alias [db alias']
(when (= alias alias')
db))

(-aliases [_]
[alias])

jld-response/NodeFormatter
(-forward-properties [_ iri select-spec context compact-fn cache fuel-tracker error-ch]
(let [prop-ch (async/chan)]
Expand Down
69 changes: 36 additions & 33 deletions src/clj/fluree/db/db/json_ld.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,6 @@
", however provided db is only at t value: " (:t db))
{:status 500 :error :db/indexing}))))

(defn force-index-update
[{:keys [commit] :as db} {data-map :data, :keys [spot post opst tspo] :as commit-index}]
(let [index-t (:t data-map)
commit* (assoc commit :index commit-index)]
(-> db
(empty-novelty index-t)
(assoc :commit commit*
:novelty* (empty-novelty db index-t)
:spot spot
:post post
:opst opst
:tspo tspo)
(assoc-in [:stats :indexed] index-t))))

(defn newer-index?
[commit {data-map :data, :as _commit-index}]
(if data-map
Expand All @@ -122,9 +108,18 @@
(defn index-update
"If provided commit-index is newer than db's commit index, updates db by cleaning novelty.
If it is not newer, returns original db."
[{:keys [commit] :as db} commit-index]
[{:keys [commit] :as db} {data-map :data, :keys [spot post opst tspo] :as commit-index}]
(if (newer-index? commit commit-index)
(force-index-update db commit-index)
(let [index-t (:t data-map)
commit* (assoc commit :index commit-index)]
(-> db
(empty-novelty index-t)
(assoc :commit commit*
:spot spot
:post post
:opst opst
:tspo tspo)
(assoc-in [:stats :indexed] index-t)))
db))

(defn match-id
Expand Down Expand Up @@ -222,6 +217,7 @@
(async/close! matched-ch))
matched-ch))


;; TODO - can use transient! below
(defn stage-update-novelty
"If a db is staged more than once, any retractions in a previous stage will
Expand Down Expand Up @@ -315,6 +311,10 @@
(recur r (assoc sid->s-flakes sid (into (set s-flakes) existing-flakes))))
sid->s-flakes))))

(defn get-max-ns-code
[ns-codes]
(->> ns-codes keys (apply max)))

(defn final-db
"Returns map of all elements for a stage transaction required to create an
updated db."
Expand All @@ -323,16 +323,16 @@
(let [[add remove] (if stage-update?
(stage-update-novelty (get-in db [:novelty :spot]) new-flakes)
[new-flakes nil])

mods (<? (modified-subjects (policy/root db) add))

db-after (-> db
(update :staged conj [txn author-did annotation])
(assoc :policy policy) ;; re-apply policy to db-after
(assoc :t t)
(commit-data/update-novelty add remove)
(commit-data/add-tt-id)
(vocab/hydrate-schema add mods))]
mods (<? (modified-subjects (policy/root db) add))
max-ns-code (get-max-ns-code (:namespace-codes db))
db-after (-> db
(update :staged conj [txn author-did annotation])
(assoc :t t
:max-namespace-code max-ns-code
:policy policy) ; re-apply policy to db-after
(commit-data/update-novelty add remove)
(commit-data/add-tt-id)
(vocab/hydrate-schema add mods))]
{:add add :remove remove :db-after db-after :db-before db-before :mods mods :context context})))

(defn validate-db-update
Expand Down Expand Up @@ -370,7 +370,7 @@
[ns ns-code])))
new-namespaces)
new-ns-codes (map-invert new-ns-map)
max-namespace-code* (apply max (vals new-ns-map))]
max-namespace-code* (get-max-ns-code new-ns-codes)]
(assoc db
:namespaces new-ns-map
:namespace-codes new-ns-codes
Expand Down Expand Up @@ -613,8 +613,8 @@
assert (db-assert db-data)
nses (map :value
(get db-data const/iri-namespaces))
_ (log/debug "merge-commit new namespaces:" nses)
_ (log/debug "db max-namespace-code:"
_ (log/trace "merge-commit new namespaces:" nses)
_ (log/trace "db max-namespace-code:"
(:max-namespace-code db))
db* (with-namespaces db nses)
asserted-flakes (assert-flakes db* t-new assert)
Expand Down Expand Up @@ -687,6 +687,13 @@
(-match-class [db fuel-tracker solution s-mch error-ch]
(match-class db fuel-tracker solution s-mch error-ch))

(-activate-alias [db alias']
(when (= alias alias')
db))

(-aliases [_]
[alias])

jld-transact/Transactable
(-stage-txn [db fuel-tracker context identity annotation raw-txn parsed-txn]
(stage db fuel-tracker context identity annotation raw-txn parsed-txn))
Expand Down Expand Up @@ -806,10 +813,6 @@
:namespace-codes iri/default-namespace-codes
:schema (vocab/base-schema)}))

(defn get-max-ns-code
[ns-codes]
(->> ns-codes keys (apply max)))

(defn load-novelty
[conn indexed-db index-t commit-jsonld]
(go-try
Expand Down
53 changes: 33 additions & 20 deletions src/clj/fluree/db/json_ld/branch.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@
[commit-map]
(-> commit-map commit-data/->json-ld json-ld/expand))

(defn load-db
[conn alias branch commit]
(let [commit-jsonld (commit-map->commit-jsonld commit)]
(async-db/load conn alias branch commit-jsonld)))

(defn update-index
[{current-commit :commit, :as current-state}
{:keys [conn alias branch], indexed-commit :commit, :as indexed-db}]
Expand All @@ -47,10 +52,9 @@
current-state)
(if (older-commit? current-commit indexed-commit)
(if (newer-index? indexed-commit current-commit)
(let [latest-index (:index indexed-commit)
latest-commit (assoc current-commit :index latest-index)
latest-commit-jsonld (commit-map->commit-jsonld latest-commit)
latest-db (async-db/load conn alias branch latest-commit-jsonld)]
(let [latest-index (:index indexed-commit)
latest-commit (assoc current-commit :index latest-index)
latest-db (load-db conn alias branch latest-commit)]
(assoc current-state
:commit latest-commit
:current-db latest-db))
Expand All @@ -61,21 +65,31 @@
(commit-data/t current-commit))
current-state))))

(defn reload-with-index
[{:keys [commit] :as _db} conn alias branch index]
(let [indexed-commit (assoc commit :index index)]
(load-db conn alias branch indexed-commit)))

(defn use-latest-index
[{db-commit :commit, :as db} idx-commit conn alias branch]
(if (newer-index? idx-commit db-commit)
(let [latest-index (:index idx-commit)]
(reload-with-index db conn alias branch latest-index))
db))

(defn index-queue
[branch-state]
[conn alias branch branch-state]
(let [buf (async/sliding-buffer 1)
queue (async/chan buf)]
(go-loop []
(go-loop [last-index-commit nil]
(when-let [{:keys [db index-files-ch]} (<! queue)]
(try*
(when-let [indexed-db (<? (indexer/index db index-files-ch))]
(swap! branch-state update-index indexed-db))
(catch* e
(log/error e "Error updating index"))
(finally
(when index-files-ch
(async/close! index-files-ch))))
(recur)))
(let [db* (use-latest-index db last-index-commit conn alias branch)]
(if-let [indexed-db (try* (<? (indexer/index db* index-files-ch))
(catch* e
(log/error e "Error updating index")))]
(do (swap! branch-state update-index indexed-db)
(recur (:commit indexed-db)))
(recur last-index-commit)))))
queue))

(defn enqueue-index!
Expand All @@ -89,7 +103,7 @@
commit-map (commit-data/jsonld->clj commit-jsonld)
state (atom {:commit commit-map
:current-db initial-db})
idx-q (index-queue state)]
idx-q (index-queue conn ledger-alias branch-name state)]
{:name branch-name
:conn conn
:alias ledger-alias
Expand All @@ -110,10 +124,9 @@
{:keys [conn alias branch], new-commit :commit, :as new-db}]
(if (next-commit? current-commit new-commit)
(if (newer-index? current-commit new-commit)
(let [latest-index (:index current-commit)
latest-commit (assoc new-commit :index latest-index)
latest-commit-jsonld (commit-map->commit-jsonld latest-commit)
latest-db (async-db/load conn alias branch latest-commit-jsonld)]
(let [latest-index (:index current-commit)
latest-commit (assoc new-commit :index latest-index)
latest-db (load-db conn alias branch latest-commit)]
(assoc current-state
:commit latest-commit
:current-db latest-db))
Expand Down
23 changes: 11 additions & 12 deletions src/clj/fluree/db/json_ld/shacl.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -1102,16 +1102,15 @@
`modified-subjects` is a sequence of s-flakes of modified subjects."
[shape-db data-db modified-subjects context]
(go-try
(doseq [s-flakes modified-subjects]
(doseq [shape-sid (<? (all-node-shape-ids shape-db))]
(let [subject (-> s-flakes first flake/s)
shape (<? (build-shape shape-db shape-sid))
v-ctx {:display (make-display data-db context)
:context context
:shape-db shape-db
:data-db data-db}]
;; only enforce activated shapes
(when (not (get shape const/sh_deactivated))
(let [results (<? (validate-node-shape v-ctx shape s-flakes))]
(when results
(when-let [node-shape-sids (seq (<? (all-node-shape-ids shape-db)))]
(doseq [s-flakes modified-subjects]
(doseq [shape-sid node-shape-sids]
(let [shape (<? (build-shape shape-db shape-sid))
v-ctx {:display (make-display data-db context)
:context context
:shape-db shape-db
:data-db data-db}]
;; only enforce activated shapes
(when (not (get shape const/sh_deactivated))
(when-let [results (<? (validate-node-shape v-ctx shape s-flakes))]
(throw-shacl-violation context results)))))))))
Loading

0 comments on commit c938ac6

Please sign in to comment.