Skip to content

Commit

Permalink
Merge pull request #647 from fluree/feature/federated-time-travel-sin…
Browse files Browse the repository at this point in the history
…gle-t
  • Loading branch information
mpoffald authored Nov 6, 2023
2 parents b3e4cea + 18f7036 commit 0065b71
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 20 deletions.
65 changes: 45 additions & 20 deletions src/fluree/db/api/query.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -171,36 +171,61 @@
:fql (query-fql db query)
:sparql (query-sparql db query)))

(defn contextualize-ledger-400-error
[info-str e]
(let [e-data (ex-data e)]
(if (= 400
(:status e-data))
(ex-info
(str info-str
(ex-message e))
e-data
e)
e)))

(defn load-alias
[conn alias t opts]
(go-try
(let [address (<? (nameservice/primary-address conn alias nil))
ledger (<? (jld-ledger/load conn address))
db (ledger-proto/-db ledger)]
(<? (restrict-db db t opts)))))
(try
(let [address (<? (nameservice/primary-address conn alias nil))
ledger (<? (jld-ledger/load conn address))
db (ledger-proto/-db ledger)]
(<? (restrict-db db t opts)))
(catch Exception e
(throw (contextualize-ledger-400-error
(str "Error loading ledger " alias ": ")
e))))))

(defn load-aliases
[conn aliases opts]
(go-try
(loop [[alias & r] aliases
db-map {}]
(if alias
;; TODO: allow restricting federated dataset components individually by t
(let [db (<? (load-alias conn alias nil opts))
db-map* (assoc db-map alias db)]
(recur r db-map*))
db-map))))
[conn aliases global-t opts]
(do (when (some? global-t)
(try (util/str->epoch-ms global-t)
(catch Exception e
(throw
(contextualize-ledger-400-error
(str "Error in federated query: top-level `t` values "
"must be iso-8601 wall-clock times. ")
e)))))
(go-try
(loop [[alias & r] aliases
db-map {}]
(if alias
;; TODO: allow restricting federated dataset components individually by t
(let [db (<? (load-alias conn alias global-t opts))
db-map* (assoc db-map alias db)]
(recur r db-map*))
db-map)))))

(defn load-dataset
[conn defaults named opts]
[conn defaults named global-t opts]
(go-try
(if (and (= (count defaults) 1)
(empty? named))
(let [alias (first defaults)]
(<? (load-alias conn alias nil opts))) ; return an unwrapped db if the data set
; consists of one ledger
(<? (load-alias conn alias global-t opts))) ; return an unwrapped db if the data set
; consists of one ledger
(let [all-aliases (->> defaults (concat named) distinct)
db-map (<? (load-aliases conn all-aliases opts))
db-map (<? (load-aliases conn all-aliases global-t opts))
default-coll (-> db-map
(select-keys defaults)
vals)
Expand All @@ -212,13 +237,13 @@
(go-try
(let [{query :subject, did :did} (or (<? (cred/verify query))
{:subject query})
{:keys [opts] :as query*} (update query :opts sanitize-query-options did)
{:keys [t opts] :as query*} (update query :opts sanitize-query-options did)

default-aliases (some-> query* :from util/sequential)
named-aliases (some-> query* :from-named util/sequential)]
(if (or (seq default-aliases)
(seq named-aliases))
(let [ds (<? (load-dataset conn default-aliases named-aliases opts))
(let [ds (<? (load-dataset conn default-aliases named-aliases t opts))
query** (update query* :opts dissoc :meta :max-fuel ::util/track-fuel?)
max-fuel (:max-fuel opts)
default-ctx (conn-proto/-default-context conn)
Expand Down
137 changes: 137 additions & 0 deletions test/fluree/db/query/time_travel_test.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns fluree.db.query.time-travel-test
(:require [clojure.test :refer [deftest is testing]]
[clojure.string :as str]
[fluree.db.json-ld.api :as fluree]
[fluree.db.test-utils :as test-utils]
[fluree.db.util.core :as util]))
Expand Down Expand Up @@ -89,3 +90,139 @@
(is (= 4 (count all-movies)))
(is (util/exception? too-early))
(is (re-matches #"There is no data as of .+" (ex-message too-early))))))

(deftest ^:integration query-connection-time-travel
(testing "query-connection queries with time travel"
(let [t1 "2023-11-04T00:00:00Z"
query-time "2023-11-05T00:00:00Z"
t2 "2023-11-06T00:00:00Z"
conn @(fluree/connect {:method :memory
:defaults
{:context {"id" "@id",
"type" "@type",
"ex" "http://example.org/",
"f" "https://ns.flur.ee/ledger#",
"rdf" "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"rdfs" "http://www.w3.org/2000/01/rdf-schema#",
"schema" "http://schema.org/",
"xsd" "http://www.w3.org/2001/XMLSchema#"}
:context-type :string}})

ledger1 (with-redefs [util/current-time-iso (fn [] t1)]
@(fluree/create-with-txn conn
{"f:ledger" "test/time1"
"@graph" [{"@id" "ex:time-test"
"@type" "ex:foo"
"ex:time" 1}]}
{:context-type :string}))
ledger2 (with-redefs [util/current-time-iso (fn [] t1)]
@(fluree/create-with-txn conn
{"f:ledger" "test/time2"
"@graph" [{"@id" "ex:time-test"
"ex:p1" "value1"}
{"@id" "ex:foo"
"ex:p2" "t1"}]}
{:context-type :string}))
_ (with-redefs [util/current-time-iso (fn [] t2)]
@(fluree/transact! conn {"f:ledger" "test/time1"
"@graph" [{"@id" "ex:time-test"
"ex:time" 2}]}
{:context-type :string}))
_ (with-redefs [util/current-time-iso (fn [] t2)]
@(fluree/transact! conn
{"f:ledger" "test/time2"
"@graph" [{"@id" "ex:time-test"
"ex:p1" "value2"}
{"@id" "ex:foo"
"ex:p2" "t2"}]}
{:context-type :string}))]
(testing "Single ledger"
(let [q '{:from "test/time1"
:select {"ex:time-test" ["*"]}
:t 1}]
(is (= [{"id" "ex:time-test"
"type" "ex:foo"
"ex:time" 1}]
@(fluree/query-connection conn q))
"should return only results for `t` of `1`"))
(let [q {:from "test/time1"
:select {"ex:time-test" ["*"]}
:t query-time}]
(is (= [{"id" "ex:time-test"
"type" "ex:foo"
"ex:time" 1}]
@(fluree/query-connection conn q))
"should return only results for `t` of `1`"))
(let [q '{:from "test/time1"
:select {"ex:time-test" ["*"]}
:t "1988-05-30T12:40:44.823Z"}
invalid-time (try @(fluree/query-connection conn q)
(catch Exception e e))]

(is (util/exception? invalid-time))
(is (= 400 (-> invalid-time ex-data :status)))
(is (str/includes? (ex-message invalid-time)
"There is no data as of"))
(is (str/includes? (ex-message invalid-time)
"test/time1")
"message should report which ledger has an error")))
(testing "Across multiple ledgers"
(let [q {:from ["test/time1" "test/time2"]
:select '[?p1 ?time]
:where '{"@id" "ex:time-test"
"ex:p1" ?p1
"ex:time" ?time}
:t query-time}]
(is (= [["value1" 1]]
@(fluree/query-connection conn q))
"should return results for first commit from both ledgers"))
(testing "from-named"
(let [q {:from-named ["test/time1" "test/time2"]
:select '[?p2 ?time]
:where '[[:graph "test/time1" {"@id" "ex:time-test"
"ex:time" ?time}]
[:graph "test/time2" {"@id" "ex:foo"
"ex:p2" ?p2}]]
:t query-time}]
(is (= [["t1" 1]]
@(fluree/query-connection conn q))
"should be results as of `t` = 1 for both ledgers")))
(testing "Not all ledgers have data for given `t`"
(with-redefs [util/current-time-iso (fn [] "1970-01-01T00:12:00.00000Z")]
(let [ledger-valid @(fluree/create-with-txn conn
{"f:ledger" "test/time-before"
"@graph" [{"@id" "ex:time-test"
"ex:p1" "value"}]}
{:context-type :string})]
(let [q '{:from ["test/time1" "test/time-before"]
:select [?p1 ?time]
:where {"@id" "ex:time-test"
"ex:p1" ?p1
"ex:time" ?time}
;;`t` is valid for "ledger-valid",
;;but not "test/time1"
:t "1988-05-30T12:40:44.823Z"}
invalid-time (try @(fluree/query-connection conn q)
(catch Exception e e))]
(is (util/exception? invalid-time))
(is (= 400 (-> invalid-time ex-data :status)))
(is (str/includes? (ex-message invalid-time)
"There is no data as of"))
(is (str/includes? (ex-message invalid-time)
"test/time1")
"message should report which ledger has an error")))))
(testing "Federated queries must use wall-clock time as global `t` value"
(with-redefs [util/current-time-iso (fn [] "1970-01-01T00:12:00.00000Z")]
(let [q '{:from ["test/time1" "test/time-before"]
:select [?p1 ?time]
:where {"@id" "ex:time-test"
"ex:p1" ?p1
"ex:time" ?time}
:t 1}
invalid-time (try @(fluree/query-connection conn q)
(catch Exception e e))]
(is (util/exception? invalid-time))
(is (= 400 (-> invalid-time ex-data :status)))
(is (str/includes? (ex-message invalid-time)
"Error in federated query: top-level `t` value")
"error message should indicate invalid t value type"))))))))

0 comments on commit 0065b71

Please sign in to comment.