Browse files

WIP multi-remote vs distributed TS

  • Loading branch information...
1 parent c15331b commit 9686a2c52239583b72775a6c0443c24577840065 @antoniogarrote committed Jun 2, 2010
View
265 src/plaza/triple_spaces/distributed_server.clj
@@ -5,46 +5,251 @@
(ns plaza.triple-spaces.distributed-server
(:use [saturnine]
[saturnine.handler]
+ [plaza utils]
[plaza.triple-spaces.server.rabbit :as rabbit]
[clojure.contrib.logging :only [log]]
(plaza.triple-spaces core server)
- (plaza.triple-spaces.distributed-server auxiliary)
+ (plaza.triple-spaces.server auxiliary)
+ (plaza.triple-spaces.multi-remote-server auxiliary)
(plaza.rdf core sparql)
(plaza.rdf.implementations sesame))
(:import [java.util UUID]
[java.net URL]))
+(defn gen-query
+ ([pattern-or-vector filters-pre]
+ (let [pattern-pre (if (:pattern (meta pattern-or-vector)) pattern-or-vector (make-pattern pattern-or-vector))
+ vars-pre (pattern-collect-vars pattern-pre)
+ vars (if-not (empty? vars-pre) vars-pre [:p])
+ [pattern filters] (if-not (empty? vars-pre)
+ [pattern-pre filters-pre]
+ (let [s (nth (first pattern-pre) 0)
+ p (nth (first pattern-pre) 1)
+ o (nth (first pattern-pre) 2)]
+ [(cons [s ?p o] (rest pattern-pre))
+ (cons (f :sameTerm ?p p) filters-pre)]))
+ query (if (empty? filters)
+ (defquery
+ (query-set-pattern pattern)
+ (query-set-type :select)
+ (query-set-vars vars))
+ (defquery
+ (query-set-pattern pattern)
+ (query-set-type :select)
+ (query-set-vars vars)
+ (query-set-filters filters)))]
+ query)))
+
+(defn process-in-blocking-op
+ "Process a response from the triple space to a previously requested RDB operation"
+ ([name should-deliver rabbit-conn options prom]
+ (do (log :info "about to block...")
+ (let [read (rabbit/consume-n-messages rabbit-conn name (str "queue-client-" (:client-id options)) 1)
+ result (-> (java.io.BufferedReader. (java.io.StringReader. (first read)))
+ (read-ts-response)
+ (read-token-separator)
+ (read-success)
+ (read-token-separator)
+ (return-rdf-stanzas))]
+ (when should-deliver (deliver-notify rabbit-conn name "in" (pack-stanzas result)))
+ (deliver prom (map #(with-model (defmodel) (model-to-triples (document-to-model (java.io.ByteArrayInputStream. (.getBytes %1)) :xml))) result))))))
;;
-;; DistributedTripleSpace
+;; RemoteTripleSpace
;;
+(deftype DistributedTripleSpace [name model queue rabbit-conn options] TripleSpace
+
+ ;; rd operation
+ (rd [this pattern] (rd this pattern []))
+
+ (rd [this pattern filters] (query-triples model (gen-query pattern filters)))
+
+ ;; rdb operation
+ (rdb [this pattern] (rdb this pattern []))
+
+ (rdb [this pattern filters]
+ (let [query (gen-query pattern filters)
+ ts (query-triples model query)]
+ (if (empty? ts)
+ (let [prom (promise)]
+ (log :info "storing remote state in queue for rdb")
+ (plaza.triple-spaces.multi-remote-server.auxiliary/store-blocking-rd queue [(query-to-string query) :rdb (:client-id options)])
+ (process-in-blocking-op name false rabbit-conn options prom)
+ @prom) ts)))
+
+
+ ;; in operation
+ (in [this pattern] (in this pattern []))
+
+ (in [this pattern filters]
+ (model-critical-write model
+ (let [triples-to-remove (query-triples model (gen-query pattern filters))]
+ (if (empty? triples-to-remove) triples-to-remove
+ (let [flattened-triples (flatten-1 triples-to-remove)]
+ ;; deleting read triples
+ (with-model model (model-remove-triples flattened-triples))
+ ;; delivering notifications
+ (let [w (java.io.StringWriter.)
+ triples (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "<ts:tokensep/>") w))
+ w triples-to-remove)]
+ (deliver-notify rabbit-conn name "in" (.toString w)))
+ ;; returning triple sets
+ triples-to-remove)))))
+
+
+ ;; inb operation
+ (inb [this pattern] (inb this pattern []))
+
+ (inb [this pattern filters]
+ (model-critical-write model
+ (let [query (gen-query pattern filters)
+ triples-to-remove (query-triples model query)]
+ (if (empty? triples-to-remove)
+ ;; no triples? block and wait for response
+ (let [prom (promise)]
+ (log :info "storing remote state in queue for inb")
+ (plaza.triple-spaces.multi-remote-server.auxiliary/store-blocking-rd queue [(query-to-string query) :inb (:client-id options)])
+ (process-in-blocking-op name true rabbit-conn options prom)
+ @prom)
+ ;; triples? delete and notify
+ (let [flattened-triples (flatten-1 triples-to-remove)]
+ ;; deleting read triples
+ (with-model model (model-remove-triples flattened-triples))
+ ;; delivering notifications
+ (let [w (java.io.StringWriter.)
+ triples (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "<ts:tokensep/>") w))
+ w triples-to-remove)]
+ (deliver-notify rabbit-conn name "in" (.toString w)))
+ ;; returning triple sets
+ triples-to-remove)))))
+
+
+ ;; out operation
+ (out [this triples]
+ (model-critical-write model
+ (with-model model (model-add-triples triples))
+ (redis/with-server queue
+ (let [ks (redis/keys (queue-key-pattern))
+ vs (map #(fmt-in %1) (if (empty? ks) [] (apply redis/mget ks)))
+ queues-red (reduce (fn [acum [k v]] (if (= 0 (redis/del k)) acum (conj acum [k v]))) [] (zipmap ks vs))]
+ (redis/atomically
+ (loop [queues queues-red
+ to-delete []
+ keys-to-delete []]
+ (if (empty? queues)
+ (do (with-model model (model-remove-triples to-delete))
+ (doseq [k keys-to-delete] (redis/del k)))
+ (let [[key [pattern kind-op client-id]] (first queues)
+ results (query-triples model pattern)]
+ (log :info (str "*** checking queued " kind-op " -> " pattern " ? " (empty? results)))
+ (if (empty? results)
+ (recur (rest queues)
+ to-delete
+ keys-to-delete)
+ (let [w (java.io.StringWriter.)
+ respo (response
+ (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "</ts:response>") w))
+ w results))]
+ (log :info (str "*** queue to blocked client: \r\n" respo))
+ (rabbit/publish rabbit-conn name (str "exchange-" name) client-id respo)
+ (recur (rest queues)
+ (if (= kind-op :inb) (conj to-delete (flatten-1 results)) to-delete)
+ (conj keys-to-delete key))))))))
+ ;; delivering notifications
+ (let [w (java.io.StringWriter.)
+ triples (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "</ts:response>") w))
+ w [triples])]
+ (deliver-notify rabbit-conn name "out" (.toString w)))
+ triples))))
+
+
+ ;; swap operation
+ (swap [this pattern triples] (swap this pattern triples []))
+
+ (swap [this pattern triples filters]
+ (model-critical-write model
+ (let [w (java.io.StringWriter.)
+ triples-to-remove (query-triples model (gen-query pattern filters))
+ flattened-triples (if (empty? triples-to-remove) [] (flatten-1 triples-to-remove))]
+ (when-not (empty? triples-to-remove)
+ (with-model model
+ (model-remove-triples flattened-triples)
+ (model-add-triples triples))
+
+ ;; Processing queues
+ (redis/with-server queue
+ (let [ks (redis/keys (queue-key-pattern))
+ vs (map #(fmt-in %1) (if (empty? ks) [] (apply redis/mget ks)))
+ queues-red (reduce (fn [acum [k v]] (if (= 0 (redis/del k)) acum (conj acum [k v]))) [] (zipmap ks vs))]
+ (redis/atomically
+ (loop [queues queues-red
+ keys-to-delete []]
+ (if (empty? queues)
+ (doseq [k keys-to-delete] (redis/del k))
+ (let [[key [pattern kind-op client-id]] (first queues)
+ results (query-triples model pattern)]
+ (log :info (str "checking queued " kind-op " -> " pattern " ? " (empty? results)))
+ (if (empty? results)
+ (recur (rest queues)
+ keys-to-delete)
+ (let [w (java.io.StringWriter.)
+ respo (response
+ (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "</ts:response>") w))
+ w results))]
+ (when (= kind-op :inb)
+ (with-model model (model-remove-triples (flatten-1 results))))
+ (rabbit/publish rabbit-conn name (str "exchange-" name) client-id respo)
+ (recur (rest queues)
+ (conj keys-to-delete key)))))))))))
+ triples-to-remove)))
+
+
+ ;; notify operation
+ (notify [this op pattern f] (notify this op pattern [] f))
+
+ (notify [this op pattern filters f]
+ (parse-notify-response name pattern filters f (plaza.utils/keyword-to-string op) rabbit-conn options))
+
+
+ ;; inspect
+ (inspect [this] :not-yet)
+
+ ;; clean
+ (clean [this] :ok))
+
+(defn make-distributed-triple-space
+ "Creates a new distributed triple space"
+ ([name model & options]
+ (let [uuid (.toString (UUID/randomUUID))
+ opt-map (-> (apply array-map options)
+ (assoc :routing-key uuid)
+ (assoc :queue (str "box-" uuid)))
+ rabbit-conn (rabbit/connect opt-map)]
+
+ ;; Creating a channel and declaring exchanges
+ (rabbit/make-channel rabbit-conn name)
+ (rabbit/declare-exchange rabbit-conn name (str "exchange-" name))
+ (rabbit/declare-exchange rabbit-conn name (str "exchange-out-" name))
+ (rabbit/declare-exchange rabbit-conn name (str "exchange-in-" name))
+
+ ;; Creating queues and bindings
+ (rabbit/make-queue rabbit-conn name (str "queue-client-" uuid) (str "exchange-" name) uuid)
-(defhandler #^{:doc
- "The MessageParser handler for parsing incoming messages"}
- MessageParser [name model redis-conn rabbit-conn options]
- (connect [_] (println "Connecting"))
- (upstream [this msg] (let [parsed (plaza.triple-spaces.server.auxiliary/parse-message msg)]
- (log :info (str "*** parsed: " parsed "\r\n\r\n"))
- (log :info (str "*** " parsed " " name " " model " " redis-conn " " rabbit-conn " " options))
- (send-down (apply-operation-dist parsed name model redis-conn rabbit-conn options))))
- (error [this msg]
- (log :error msg)
- (plaza.triple-spaces.server.auxiliary/failure msg)))
-
-
-(defn start-distributed-triple-server
- "Starts a distributd triple server using rabbitmq, redis and a triple store as infrastructure"
- [name port model & options]
- (let [opt-map (apply array-map options)
- rabbit-conn (apply rabbit/connect options)
- redis-conn (build-redis-connection-hash opt-map)]
-
- ;; Declaring channels and exchanges for this triple space
- (make-channel rabbit-conn name)
- (declare-exchange rabbit-conn name (str "exchange-" name))
- (declare-exchange rabbit-conn name (str "exchange-out-" name))
- (declare-exchange rabbit-conn name (str "exchange-in-" name))
-
- ;; start the server
- (start-server port :string :print (MessageParser. name model redis-conn rabbit-conn opt-map))))
+ ;; startint the server
+ (let [ts (plaza.triple-spaces.distributed-server.DistributedTripleSpace. name
+ model
+ (build-redis-connection-hash opt-map)
+ rabbit-conn
+ (assoc opt-map :client-id uuid))]
+ (fn [] ts)))))
View
32 src/plaza/triple_spaces/distributed_server/redis.clj
@@ -1,32 +0,0 @@
-;; @author amitrahtore
-;; @from http://github.com/amitrathore/redis-clojure/blob/master/src/redis.clj
-;; @date 01.06.2010
-
-(ns plaza.triple-spaces.distributed-server.redis
- (:refer-clojure :exclude [get set type keys sort])
- (:use redis.internal))
-
-(defmacro atomically
- "Execute all redis commands in body atomically, ie. sandwiched in a
- MULTI/EXEC statement. If an exception is thrown the EXEC command
- will be terminated by a DISCARD, no operations will be performed and
- the exception will be rethrown."
- [& body]
- `(do
- (plaza.triple-spaces.distributed-server.redis/multi)
- (try
- (do
- ~@body
- (plaza.triple-spaces.distributed-server.redis/exec))
- (catch Exception e#
- (plaza.triple-spaces.distributed-server.redis/discard)
- (throw e#)))))
-;;
-;; Commands
-;;
-(defcommands
- ;; MULTI/EXEC/DISCARD
- (multi [] :inline)
- (exec [] :inline)
- (discard [] :inline)
-)
View
50 src/plaza/triple_spaces/multi_remote_server.clj
@@ -0,0 +1,50 @@
+;; @author Antonio Garrote
+;; @email antoniogarrote@gmail.com
+;; @date 28.05.2010
+
+(ns plaza.triple-spaces.multi-remote-server
+ (:use [saturnine]
+ [saturnine.handler]
+ [plaza.triple-spaces.server.rabbit :as rabbit]
+ [clojure.contrib.logging :only [log]]
+ (plaza.triple-spaces core server)
+ (plaza.triple-spaces.distributed-server auxiliary)
+ (plaza.rdf core sparql)
+ (plaza.rdf.implementations sesame))
+ (:import [java.util UUID]
+ [java.net URL]))
+
+
+
+;;
+;; MultiRemoteTripleSpace
+;;
+
+(defhandler #^{:doc
+ "The MessageParser handler for parsing incoming messages"}
+ MessageParser [name model redis-conn rabbit-conn options]
+ (connect [_] (println "Connecting"))
+ (upstream [this msg] (let [parsed (plaza.triple-spaces.server.auxiliary/parse-message msg)]
+ (log :info (str "*** parsed: " parsed "\r\n\r\n"))
+ (log :info (str "*** " parsed " " name " " model " " redis-conn " " rabbit-conn " " options))
+ (send-down (apply-operation-dist parsed name model redis-conn rabbit-conn options))))
+ (error [this msg]
+ (log :error msg)
+ (plaza.triple-spaces.server.auxiliary/failure msg)))
+
+
+(defn start-multi-remote-triple-server
+ "Starts a distributd triple server using rabbitmq, redis and a triple store as infrastructure"
+ [name port model & options]
+ (let [opt-map (apply array-map options)
+ rabbit-conn (apply rabbit/connect options)
+ redis-conn (build-redis-connection-hash opt-map)]
+
+ ;; Declaring channels and exchanges for this triple space
+ (make-channel rabbit-conn name)
+ (declare-exchange rabbit-conn name (str "exchange-" name))
+ (declare-exchange rabbit-conn name (str "exchange-out-" name))
+ (declare-exchange rabbit-conn name (str "exchange-in-" name))
+
+ ;; start the server
+ (start-server port :string :print (MessageParser. name model redis-conn rabbit-conn opt-map))))
View
202 src/plaza/triple_spaces/multi_remote_server/auxiliary.clj
@@ -0,0 +1,202 @@
+;; @author Antonio Garrote
+;; @email antoniogarrote@gmail.com
+;; @date 31.05.2010
+
+(ns plaza.triple-spaces.multi-remote-server.auxiliary
+ (:use (plaza.rdf core sparql)
+ (plaza utils)
+ (plaza.rdf.implementations jena)
+ [plaza.triple-spaces.server.rabbit :as rabbit]
+ [plaza.triple-spaces.server auxiliary]
+ [clojure.contrib.logging :only [log]])
+ (:require [clojure.contrib.string :as string]
+ [redis])
+ (:import [java.util UUID]
+ [com.rabbitmq.client
+ ConnectionParameters
+ Connection
+ Channel
+ AMQP
+ ConnectionFactory
+ Consumer
+ QueueingConsumer]))
+
+(defn build-redis-connection-hash
+ [opts-hash] {:host (:redis-host opts-hash) :port (:redis-port opts-hash) :db (:redis-db opts-hash)})
+
+(defmacro fmt-out
+ [& forms]
+ `(pr-str ~@forms))
+
+(defn fmt-in
+ [msg]
+ (read-string msg))
+
+(defn- apply-rd-operation
+ "Applies a TS rd operation over a model"
+ ([pattern model]
+ (let [w (java.io.StringWriter.)]
+ (response
+ (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "<ts:tokensep/>") w))
+ w (query-triples model pattern))))))
+
+(defn queue-gen-key
+ [[pattern type client-id]]
+ (str "blk:" client-id))
+
+(defn queue-key-pattern
+ [] "blk:*")
+
+(defn store-blocking-rd
+ ([redis [pattern type client-id]]
+ (redis/with-server redis (redis/set (queue-gen-key [pattern type client-id])
+ (fmt-out [pattern type client-id])))))
+
+
+(defn- apply-rdb-operation
+ "Applies a TS rd operation over a model"
+ ([pattern model queues client-id]
+ (let [w (java.io.StringWriter.)
+ results (query-triples model pattern)]
+ (if (empty? results)
+ (do (store-blocking-rd queues [pattern :rdb client-id])
+ (blocking-response))
+ (response
+ (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "</ts:response>") w))
+ w results))))))
+
+(defn- apply-out-operation
+ "Applies a TS rd operation over a model"
+ ([rdf-document model name rabbit-conn queues-ref]
+ (with-model model (document-to-model (java.io.ByteArrayInputStream. (.getBytes rdf-document)) :xml))
+ (model-critical-write model
+ (redis/with-server queues-ref
+ (let [ks (redis/keys (queue-key-pattern))
+ vs (map #(fmt-in %1) (if (empty? ks) [] (apply redis/mget ks)))
+ queues-red (reduce (fn [acum [k v]] (if (= 0 (redis/del k)) acum (conj acum [k v]))) [] (zipmap ks vs))]
+ (redis/atomically
+ (loop [queues queues-red
+ to-delete []
+ keys-to-delete []]
+ (if (empty? queues)
+ (do (with-model model (model-remove-triples to-delete))
+ (doseq [k keys-to-delete] (redis/del k)))
+ (let [[key [pattern kind-op client-id]] (first queues)
+ results (query-triples model pattern)]
+ (log :info (str "*** checking queued " kind-op " -> " pattern " ? " (empty? results)))
+ (if (empty? results)
+ (recur (rest queues)
+ to-delete
+ keys-to-delete)
+ (let [w (java.io.StringWriter.)
+ respo (response
+ (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "</ts:response>") w))
+ w results))]
+ (log :info (str "*** queue to blocked client: \r\n" respo))
+ (rabbit/publish rabbit-conn name (str "exchange-" name) client-id respo)
+ (recur (rest queues)
+ (if (= kind-op :inb) (conj to-delete (flatten-1 results)) to-delete)
+ (conj keys-to-delete key))))))))
+ (success))))))
+
+(defn- apply-in-operation
+ "Applies a TS in operation over a model"
+ ([pattern model name]
+ (model-critical-write model
+ (let [w (java.io.StringWriter.)
+ triples-to-remove (query-triples model pattern)
+ flattened-triples (flatten-1 triples-to-remove)]
+ ;; deleting read triples
+ (with-model model (model-remove-triples flattened-triples))
+ ;; returning triple sets
+ (let [triples (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "</ts:response>") w))
+ w triples-to-remove)]
+ (response triples))))))
+
+(defn- apply-inb-operation
+ "Applies a TS inb operation over a model"
+ ([pattern model name rabbit-conn queues client-id]
+ (model-critical-write model
+ (let [w (java.io.StringWriter.)
+ triples-to-remove (query-triples model pattern)]
+ (if (empty? triples-to-remove)
+ (dosync (log :info "*** Storing INB operation in the queue")
+ (store-blocking-rd queues [pattern :rdb client-id])
+ (blocking-response))
+ (do
+ ;; deleting read triples
+ (with-model model (model-remove-triples (flatten-1 triples-to-remove)))
+ ;; returning triple sets
+ (let [triples (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "</ts:response>") w))
+ w triples-to-remove)]
+ (response triples))))))))
+
+(defn apply-swap-operation-dist
+ "Applies a TS swap operation over a model"
+ ([pattern rdf-document model name rabbit-conn queues-ref]
+ (model-critical-write model
+ (let [w (java.io.StringWriter.)
+ triples-to-remove (query-triples model pattern)
+ flattened-triples (if (empty? triples-to-remove) [] (flatten-1 triples-to-remove))]
+ (when-not (empty? triples-to-remove)
+ (with-model model
+ (model-remove-triples flattened-triples)
+ (document-to-model (java.io.ByteArrayInputStream. (.getBytes rdf-document)) :xml))
+
+ ;; Processing queues
+ (redis/with-server queues-ref
+ (let [ks (redis/keys (queue-key-pattern))
+ vs (map #(fmt-in %1) (if (empty? ks) [] (apply redis/mget ks)))
+ queues-red (reduce (fn [acum [k v]] (if (= 0 (redis/del k)) acum (conj acum [k v]))) [] (zipmap ks vs))]
+ (redis/atomically
+ (loop [queues queues-red
+ keys-to-delete []]
+ (if (empty? queues)
+ (doseq [k keys-to-delete] (redis/del k))
+ (let [[key [pattern kind-op client-id]] (first queues)
+ results (query-triples model pattern)]
+ (log :info (str "checking queued " kind-op " -> " pattern " ? " (empty? results)))
+ (if (empty? results)
+ (recur (rest queues)
+ keys-to-delete)
+ (let [w (java.io.StringWriter.)
+ respo (response
+ (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "</ts:response>") w))
+ w results))]
+ (when (= kind-op :inb)
+ (with-model model (model-remove-triples (flatten-1 results))))
+ (rabbit/publish rabbit-conn name (str "exchange-" name) client-id respo)
+ (recur (rest queues)
+ (conj keys-to-delete key)))))))))))
+
+ ;; Sending the response back to the original client
+ (response
+ (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "</ts:response>") w))
+ w triples-to-remove))))))
+
+(defn apply-operation-dist
+ "Applies an operation over a Plaza model"
+ ([message name model queues rabbit-conn options]
+ (log :info "*** about to apply operation")
+ (condp = (:operation message)
+ "rd" (apply-rd-operation (:pattern message) model)
+ "rdb" (apply-rdb-operation (:pattern message) model queues (:client-id message))
+ "out" (apply-out-operation (:value message) model name rabbit-conn queues)
+ "in" (apply-in-operation (:pattern message) model name)
+ "inb" (apply-inb-operation (:pattern message) model name rabbit-conn queues (:client-id message))
+ "swap" (apply-swap-operation-dist (:pattern message) (:value message) model name rabbit-conn queues)
+ (throw (Exception. "Unsupported operation")))))
View
3 src/plaza/triple_spaces/server/rabbit.clj
@@ -32,7 +32,8 @@
"Connects to a RabbitMQ server.
Args: :username :password :host :port :virtual-host"
([& args]
- (let [{:keys [username password virtual-host port host]} (check-default-values (apply array-map args) *default-rabbit-parameters*)
+ (let [args-map (if (map? (first args)) (first args) (apply hash-map args))
+ {:keys [username password virtual-host port host]} (check-default-values args-map *default-rabbit-parameters*)
#^ConnectionParameters params (doto (new ConnectionParameters)
(.setUsername username)
(.setPassword password)
View
176 test/plaza/triple_spaces/distributed_server_test.clj
@@ -0,0 +1,176 @@
+(ns plaza.triple-spaces.distributed-server-test
+ (:use
+ [plaza.rdf core]
+ [plaza.triple-spaces distributed-server]
+ [plaza.triple-spaces server]
+ [plaza.triple-spaces core]
+ [plaza.rdf core sparql predicates]
+ [plaza.rdf.implementations jena] :reload-all)
+ (:use [clojure.test]))
+
+(init-jena-framework)
+
+(defn- clean-ts
+ ([ts] (in ts [[?s ?p ?o]])))
+
+(defn- breathe
+ ([] (Thread/sleep 2000)))
+
+(defn- build-mulgara
+ ([] (build-model :mulgara :rmi "rmi://localhost/server1")))
+
+(def *should-test* true)
+
+(when *should-test*
+
+ (deftest test-out-rd
+ (let [m (build-mulgara)]
+ (def-ts :ts (make-distributed-triple-space "test" m :redis-host "localhost" :redis-db "testdist" :redis-port 6379))
+ (clean-ts (ts :ts))
+ (out (ts :ts) [[:a :b :c]])
+ (let [res (rd (ts :ts) [[?s ?p ?o]])]
+ (is (= (resource-id (first (ffirst (rd (ts :ts) [[?s ?p ?o]])))) "http://plaza.org/ontologies/a"))
+ (is (= (resource-id (second (ffirst (rd (ts :ts) [[?s ?p ?o]])))) "http://plaza.org/ontologies/b"))
+ (is (= (resource-id (nth (ffirst (rd (ts :ts) [[?s ?p ?o]])) 2)) "http://plaza.org/ontologies/c")))
+ (clean (ts :ts))
+ (unregister-ts :ts)
+ (breathe)))
+
+ (deftest test-out-rd-2
+ (let [m (build-mulgara)]
+ (def-ts :ts (make-distributed-triple-space "test" m :redis-host "localhost" :redis-db "testdist" :redis-port 6379))
+ (clean-ts (ts :ts))
+ (out (ts :ts) [[:a :b :c]])
+ (out (ts :ts) [[:a :b :d]])
+ (let [res (rd (ts :ts) [[?s ?p ?o]])]
+ (is (= (resource-id (first (ffirst res))) "http://plaza.org/ontologies/a"))
+ (is (= (resource-id (second (ffirst res))) "http://plaza.org/ontologies/b"))
+ (is (= (resource-id (nth (ffirst res) 2)) "http://plaza.org/ontologies/d"))
+ (is (= (resource-id (first (first (second res)))) "http://plaza.org/ontologies/a"))
+ (is (= (resource-id (second (first (second res)))) "http://plaza.org/ontologies/b"))
+ (is (= (resource-id (nth (first (second res)) 2)) "http://plaza.org/ontologies/c")))
+ (unregister-ts :ts))
+ (breathe))
+
+ (deftest test-out-in-rd
+ (let [m (build-mulgara)]
+ (def-ts :ts (make-distributed-triple-space "test" m :redis-host "localhost" :redis-db "testdist" :redis-port 6379))
+ (clean-ts (ts :ts))
+ (out (ts :ts) [[:a :b :c]])
+ (let [res (in (ts :ts) [[?s ?p ?o]])
+ resb (rd (ts :ts) [[?s ?p ?o]])]
+ (is (= (resource-id (first (ffirst res))) "http://plaza.org/ontologies/a"))
+ (is (= (resource-id (second (ffirst res))) "http://plaza.org/ontologies/b"))
+ (is (= (resource-id (nth (ffirst res) 2)) "http://plaza.org/ontologies/c"))
+ (is (empty? resb)))
+ (clean (ts :ts))
+ (unregister-ts :ts)
+ (breathe)))
+
+ (deftest test-swap
+ (let [m (build-mulgara)]
+ (def-ts :ts (make-distributed-triple-space "test" m :redis-host "localhost" :redis-db "testdist" :redis-port 6379))
+ (clean-ts (ts :ts))
+ (out (ts :ts) [[:a :b :c]])
+ (let [res (swap (ts :ts) [[?s ?p :c]] [[:e :f :g]])
+ resb (rd (ts :ts) [[?s ?p ?o]])]
+ (is (= (resource-id (first (ffirst res))) "http://plaza.org/ontologies/a"))
+ (is (= (resource-id (second (ffirst res))) "http://plaza.org/ontologies/b"))
+ (is (= (resource-id (nth (ffirst res) 2)) "http://plaza.org/ontologies/c"))
+ (is (= (resource-id (first (ffirst resb))) "http://plaza.org/ontologies/e"))
+ (is (= (resource-id (second (ffirst resb))) "http://plaza.org/ontologies/f"))
+ (is (= (resource-id (nth (ffirst resb) 2)) "http://plaza.org/ontologies/g")))
+ (clean (ts :ts))
+ (unregister-ts :ts)
+ (breathe)))
+
+ (deftest test-out-rdb
+ (let [m (build-mulgara)
+ m2 (build-mulgara)]
+ (def-ts :ts (make-distributed-triple-space "test" m :redis-host "localhost" :redis-db "testdist" :redis-port 6379))
+ (def-ts :ts2 (make-distributed-triple-space "test" m2 :redis-host "localhost" :redis-db "testdist" :redis-port 6379))
+ (clean-ts (ts :ts))
+ (let [prom (promise)]
+ (.start (Thread. (fn [] (let [res (rdb (ts :ts) [[?s ?p ?o]])] (deliver prom [res (ts :ts)])))))
+ (breathe)
+ (out (ts :ts2) [[:a :b :c]])
+ (let [[res tsp] @prom]
+ (is (= (resource-id (first (ffirst res))) "http://plaza.org/ontologies/a"))
+ (is (= (resource-id (second (ffirst res))) "http://plaza.org/ontologies/b"))
+ (is (= (resource-id (nth (ffirst res) 2)) "http://plaza.org/ontologies/c"))
+ (clean (ts :ts))
+ (unregister-ts :ts)
+ (unregister-ts :ts2)
+ (breathe)))))
+
+ (deftest test-out-inb
+ (let [m (build-mulgara)
+ m2 (build-mulgara)]
+ (def-ts :ts (make-distributed-triple-space "test" m :redis-host "localhost" :redis-db "testdist" :redis-port 6379))
+ (def-ts :ts2 (make-distributed-triple-space "test" m2 :redis-host "localhost" :redis-db "testdist" :redis-port 6379))
+ (clean-ts (ts :ts))
+ (let [prom (promise)]
+ (.start (Thread. (fn [] (let [res (inb (ts :ts) [[?s ?p ?o]])] (deliver prom [res (ts :ts)])))))
+ (out (ts :ts) [[:a :b :c]])
+ (let [[res tsp] @prom]
+ (is (= (resource-id (first (ffirst res))) "http://plaza.org/ontologies/a"))
+ (is (= (resource-id (second (ffirst res))) "http://plaza.org/ontologies/b"))
+ (is (= (resource-id (nth (ffirst res) 2)) "http://plaza.org/ontologies/c"))
+ (let [res (rd (ts :ts) [[?s ?p ?o]])]
+ (is (empty? res)))
+ (clean (ts :ts))
+ (unregister-ts :ts)
+ (unregister-ts :ts2)
+ (breathe)))))
+
+(deftest test-notify-out
+ (let [m (build-mulgara)
+ m2 (build-mulgara)]
+ (def-ts :ts (make-distributed-triple-space "test" m2 :redis-host "localhost" :redis-db "testdist" :redis-port 6379))
+ (def-ts :ts2 (make-distributed-triple-space "test" m2 :redis-host "localhost" :redis-db "testdist" :redis-port 6379))
+ (clean-ts (ts :ts))
+ (let [q (java.util.concurrent.LinkedBlockingQueue.)]
+ (.start (Thread. #(notify (ts :ts2) :out [[?s :e ?o]] (fn [triples] (.put q triples)))))
+
+ (out (ts :ts) [[:a :b :c]])
+ (out (ts :ts) [[:a :e :c]])
+ (out (ts :ts) [[:a :b :f]])
+ (out (ts :ts) [[:a :e :f]])
+
+ (is (not (nil? (.take q))))
+ (is (not (nil? (.take q))))
+
+ (let [res (.poll q 1 java.util.concurrent.TimeUnit/SECONDS)]
+ (is (empty? res))))
+ (clean (ts :ts))
+ (unregister-ts :ts)
+ (unregister-ts :ts2)
+ (breathe)))
+
+ ); end *should-test*
+
+;
+;(deftest test-notify-in
+; (let [*s* (start-triple-server "test" 7555 (build-model :jena))]
+; (def-ts :ts (make-remote-triple-space "test" :ts-host "localhost" :ts-port 7555))
+; (clean-ts (ts :ts))
+; (let [q (java.util.concurrent.LinkedBlockingQueue.)]
+; (out (ts :ts) [[:a :e :c]])
+; (out (ts :ts) [[:a :e :d]])
+; (out (ts :ts) [[:a :b :c]])
+;
+; (.start (Thread. #(notify (ts :ts) :in [[?s :e ?o]] (fn [triples] (.put q triples)))))
+;
+; (in (ts :ts) [[?s :e :c]])
+; (in (ts :ts) [[?s :b :c]])
+; (in (ts :ts) [[?s :e :d]])
+;
+; (is (not (nil? (.take q))))
+; (is (not (nil? (.take q))))
+;
+; (let [res (.poll q 1 java.util.concurrent.TimeUnit/SECONDS)]
+; (is (empty? res))))
+; (unregister-ts :ts)
+; (stop-server *s*)
+; (breathe)))
+;

0 comments on commit 9686a2c

Please sign in to comment.