Permalink
Browse files

fixed one race condition in distributed triple spaces

  • Loading branch information...
1 parent 0c7ca21 commit 3770ac06765924622cd8a90f196ee697a177a262 @antoniogarrote committed Jun 5, 2010
@@ -44,17 +44,52 @@
(defn process-in-blocking-op
"Process a response from the triple space to a previously requested RDB operation"
([name should-deliver rabbit-conn options prom]
- (do (log :info "about to block...")
- (let [read (rabbit/consume-n-messages rabbit-conn name (str "queue-client-" (:client-id options)) 1)
- result (-> (java.io.BufferedReader. (java.io.StringReader. (first read)))
- (read-ts-response)
- (read-token-separator)
- (read-success)
- (read-token-separator)
- (return-rdf-stanzas))]
- (when should-deliver (deliver-notify rabbit-conn name "in" (pack-stanzas result)))
- (deliver prom (map #(with-model (defmodel) (model-to-triples (document-to-model (java.io.ByteArrayInputStream. (.getBytes %1)) :xml))) result))))))
+ (do ;(log :info (str "about to block queue : " (:cliend-id options) " and options: " options))
+ (let [read (rabbit/consume-n-messages rabbit-conn name (str "queue-client-" (:client-id options)) 1)
+ result (-> (java.io.BufferedReader. (java.io.StringReader. (first read)))
+ (read-ts-response)
+ (read-token-separator)
+ (read-success)
+ (read-token-separator)
+ (return-rdf-stanzas))]
+ (when should-deliver (deliver-notify rabbit-conn name "in" (pack-stanzas result)))
+ (deliver prom (map #(with-model (defmodel) (model-to-triples (document-to-model (java.io.ByteArrayInputStream. (.getBytes %1)) :xml))) result))))))
+(defn remove-and-notify
+ "Removes triples from the model and notify all the queues that the triples have been extracted"
+ ([model triples-to-remove rabbit-conn name]
+ (let [flattened-triples (flatten-1 triples-to-remove)]
+ ;; deleting read triples
+ (with-model model (model-remove-triples flattened-triples))
+ ;; delivering notifications
+ (let [w (java.io.StringWriter.)
+ triples (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "<ts:tokensep/>") w))
+ w triples-to-remove)]
+ (.write w "</ts:response>")
+ (deliver-notify rabbit-conn name "in" (.toString w)))
+ ;; returning triple sets
+ triples-to-remove)))
+
+(defn results-to-rdf-response
+ ([results]
+ (let [w (java.io.StringWriter.)]
+ (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "<ts:tokensep/>") w))
+ w results)
+ (response (.toString w)))))
+
+(defn results-to-rdf-stanzas
+ ([results]
+ (let [w (java.io.StringWriter.)]
+ (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "<ts:tokensep/>") w))
+ w results)
+ (.write w "</ts:response>")
+ (.toString w))))
;;
;; RemoteTripleSpace
;;
@@ -90,13 +125,7 @@
;; deleting read triples
(with-model model (model-remove-triples flattened-triples))
;; delivering notifications
- (let [w (java.io.StringWriter.)
- triples (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
- (output-string m w :xml)
- (.write w "<ts:tokensep/>") w))
- w triples-to-remove)]
- (.write w "</ts:response>")
- (deliver-notify rabbit-conn name "in" (.toString w)))
+ (deliver-notify rabbit-conn name "in" (results-to-rdf-stanzas triples-to-remove))
;; returning triple sets
triples-to-remove)))))
@@ -112,32 +141,28 @@
;; no triples? block and wait for response
:should-block
;; triples? delete and notify
- (let [flattened-triples (flatten-1 triples-to-remove)]
- ;; deleting read triples
- (model-critical-write model (with-model model (model-remove-triples flattened-triples)))
- ;; delivering notifications
- (let [w (java.io.StringWriter.)
- triples (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
- (output-string m w :xml)
- (.write w "<ts:tokensep/>") w))
- w triples-to-remove)]
- (.write w "</ts:response>")
- (deliver-notify rabbit-conn name "in" (.toString w)))
- ;; returning triple sets
- triples-to-remove))))]
+ (remove-and-notify model triples-to-remove rabbit-conn name))))]
(if (= :should-block res-inb)
(let [prom (promise)]
(log :info "storing remote state in queue for inb")
(plaza.triple-spaces.multi-remote-server.auxiliary/store-blocking-rd queue [(query-to-string query) :inb (:client-id options)])
(process-in-blocking-op name true rabbit-conn options prom)
- @prom)
+ ; we try to read again to prevent possible race
+ ; condition with writer process writing
+ ; while we were registering the queue
+ (let [triples-to-remove-2 (query-triples model query)]
+ (if (empty? triples-to-remove-2)
+ @prom
+ (model-critical-write model (remove-and-notify model triples-to-remove-2 rabbit-conn name)))))
+ ; we have read the right result, we don't need to block and wait
res-inb)))
-
;; out operation
(out [this triples]
(model-critical-write model
+ ;; Adding the triples
(with-model model (model-add-triples triples))
+ ;; Processing pending queues for inb or rdb operations
(redis/with-server queue
(let [ks (redis/keys (queue-key-pattern))
vs (map #(fmt-in %1) (if (empty? ks) [] (apply redis/mget ks)))
@@ -156,26 +181,14 @@
(recur (rest queues)
to-delete
keys-to-delete)
- (let [w (java.io.StringWriter.)
- respo (response
- (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
- (output-string m w :xml)
- (.write w "<ts:tokensep/>") w))
- w results))]
- (.write w "</ts:response>")
+ (let [respo (results-to-rdf-response results)]
(log :info (str "*** queue to blocked client: \r\n" respo))
(rabbit/publish rabbit-conn name (str "exchange-" name) client-id respo)
(recur (rest queues)
(if (= kind-op :inb) (conj to-delete (flatten-1 results)) to-delete)
(conj keys-to-delete key))))))))
- ;; delivering notifications
- (let [w (java.io.StringWriter.)
- triples (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
- (output-string m w :xml)
- (.write w "<ts:tokensep/>") w))
- w [triples])]
- (.write w "</ts:response>")
- (deliver-notify rabbit-conn name "out" (.toString w)))
+ ;; delivering notifications for notify op. readers
+ (deliver-notify rabbit-conn name "out" (results-to-rdf-stanzas [triples]))
triples))))
@@ -184,8 +197,7 @@
(swap [this pattern triples filters]
(model-critical-write model
- (let [w (java.io.StringWriter.)
- triples-to-remove (query-triples model (gen-query pattern filters))
+ (let [triples-to-remove (query-triples model (gen-query pattern filters))
flattened-triples (if (empty? triples-to-remove) [] (flatten-1 triples-to-remove))]
(when-not (empty? triples-to-remove)
(with-model model
@@ -208,18 +220,11 @@
(if (empty? results)
(recur (rest queues)
keys-to-delete)
- (let [w (java.io.StringWriter.)
- respo (response
- (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
- (output-string m w :xml)
- (.write w "<ts:tokensep/>") w))
- w results))]
- (.write w "</ts:response>")
- (when (= kind-op :inb)
- (with-model model (model-remove-triples (flatten-1 results))))
- (rabbit/publish rabbit-conn name (str "exchange-" name) client-id respo)
- (recur (rest queues)
- (conj keys-to-delete key)))))))))))
+ (do (when (= kind-op :inb)
+ (with-model model (model-remove-triples (flatten-1 results))))
+ (rabbit/publish rabbit-conn name (str "exchange-" name) client-id (results-to-rdf-response results))
+ (recur (rest queues)
+ (conj keys-to-delete key)))))))))))
triples-to-remove)))
@@ -150,7 +150,7 @@
(clean-ts (ts :ts))
(let [prom (promise)]
(.start (Thread. (fn [] (let [res (inb (ts :ts) [[?s ?p ?o]])] (deliver prom [res (ts :ts)])))))
- (breathe)
+; (breathe)
(out (ts :ts) [[:a :b :c]])
(let [[res tsp] @prom]
(is (= (resource-id (first (ffirst res))) "http://plaza.org/ontologies/a"))

0 comments on commit 3770ac0

Please sign in to comment.