Permalink
Browse files

bundled redis library and distributed triple space

  • Loading branch information...
1 parent 13a34ea commit c15331b3e6a01414db272f515d999fa5014c5418 @antoniogarrote committed Jun 1, 2010
View
2 project.clj
@@ -7,7 +7,7 @@
[net.rootdev/java-rdfa "0.3"]
[nu.validator.htmlparser/htmlparser "1.2.0"]
[com.franz/openrdf-sesame-onejar "2.2"]
- [redis-clojure "1.0-SNAPSHOT"]
+ [commons-pool "1.5.4"]
[org.jboss.netty/netty "3.2.0.BETA1"]
[com.rabbitmq/amqp-client "1.7.2"]
[log4j/log4j "1.2.14"]]
View
1 src/plaza/rdf/models.clj
@@ -43,7 +43,6 @@
(to-map [this triples-or-vector] (let [triples (if (:triples (meta triples-or-vector)) triples-or-vector (make-triples triples-or-vector))]
(reduce (fn [ac it] (let [prop (str (resource-id (it properties)))
val (find-property prop triples)]
- (println (str "found: " val " for prop " prop))
(if (nil? val) ac (assoc ac it (nth val 2)))))
{}
(keys properties)))))
View
50 src/plaza/triple_spaces/distributed_server.clj
@@ -0,0 +1,50 @@
+;; @author Antonio Garrote
+;; @email antoniogarrote@gmail.com
+;; @date 28.05.2010
+
+(ns plaza.triple-spaces.distributed-server
+ (:use [saturnine]
+ [saturnine.handler]
+ [plaza.triple-spaces.server.rabbit :as rabbit]
+ [clojure.contrib.logging :only [log]]
+ (plaza.triple-spaces core server)
+ (plaza.triple-spaces.distributed-server auxiliary)
+ (plaza.rdf core sparql)
+ (plaza.rdf.implementations sesame))
+ (:import [java.util UUID]
+ [java.net URL]))
+
+
+
+;;
+;; DistributedTripleSpace
+;;
+
+(defhandler #^{:doc
+ "The MessageParser handler for parsing incoming messages"}
+ MessageParser [name model redis-conn rabbit-conn options]
+ (connect [_] (println "Connecting"))
+ (upstream [this msg] (let [parsed (plaza.triple-spaces.server.auxiliary/parse-message msg)]
+ (log :info (str "*** parsed: " parsed "\r\n\r\n"))
+ (log :info (str "*** " parsed " " name " " model " " redis-conn " " rabbit-conn " " options))
+ (send-down (apply-operation-dist parsed name model redis-conn rabbit-conn options))))
+ (error [this msg]
+ (log :error msg)
+ (plaza.triple-spaces.server.auxiliary/failure msg)))
+
+
+(defn start-distributed-triple-server
+ "Starts a distributd triple server using rabbitmq, redis and a triple store as infrastructure"
+ [name port model & options]
+ (let [opt-map (apply array-map options)
+ rabbit-conn (apply rabbit/connect options)
+ redis-conn (build-redis-connection-hash opt-map)]
+
+ ;; Declaring channels and exchanges for this triple space
+ (make-channel rabbit-conn name)
+ (declare-exchange rabbit-conn name (str "exchange-" name))
+ (declare-exchange rabbit-conn name (str "exchange-out-" name))
+ (declare-exchange rabbit-conn name (str "exchange-in-" name))
+
+ ;; start the server
+ (start-server port :string :print (MessageParser. name model redis-conn rabbit-conn opt-map))))
View
202 src/plaza/triple_spaces/distributed_server/auxiliary.clj
@@ -0,0 +1,202 @@
+;; @author Antonio Garrote
+;; @email antoniogarrote@gmail.com
+;; @date 31.05.2010
+
+(ns plaza.triple-spaces.distributed-server.auxiliary
+ (:use (plaza.rdf core sparql)
+ (plaza utils)
+ (plaza.rdf.implementations jena)
+ [plaza.triple-spaces.server.rabbit :as rabbit]
+ [plaza.triple-spaces.server auxiliary]
+ [clojure.contrib.logging :only [log]])
+ (:require [clojure.contrib.string :as string]
+ [redis])
+ (:import [java.util UUID]
+ [com.rabbitmq.client
+ ConnectionParameters
+ Connection
+ Channel
+ AMQP
+ ConnectionFactory
+ Consumer
+ QueueingConsumer]))
+
+(defn build-redis-connection-hash
+ [opts-hash] {:host (:redis-host opts-hash) :port (:redis-port opts-hash) :db (:redis-db opts-hash)})
+
+(defmacro fmt-out
+ [& forms]
+ `(pr-str ~@forms))
+
+(defn- fmt-in
+ [msg]
+ (read-string msg))
+
+(defn- apply-rd-operation
+ "Applies a TS rd operation over a model"
+ ([pattern model]
+ (let [w (java.io.StringWriter.)]
+ (response
+ (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "<ts:tokensep/>") w))
+ w (query-triples model pattern))))))
+
+(defn queue-gen-key
+ [[pattern type client-id]]
+ (str "blk:" client-id))
+
+(defn queue-key-pattern
+ [] "blk:*")
+
+(defn store-blocking-rd
+ ([redis [pattern type client-id]]
+ (redis/with-server redis (redis/set (queue-gen-key [pattern type client-id])
+ (fmt-out [pattern type client-id])))))
+
+
+(defn- apply-rdb-operation
+ "Applies a TS rd operation over a model"
+ ([pattern model queues client-id]
+ (let [w (java.io.StringWriter.)
+ results (query-triples model pattern)]
+ (if (empty? results)
+ (do (store-blocking-rd queues [pattern :rdb client-id])
+ (blocking-response))
+ (response
+ (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "</ts:response>") w))
+ w results))))))
+
+(defn- apply-out-operation
+ "Applies a TS rd operation over a model"
+ ([rdf-document model name rabbit-conn queues-ref]
+ (with-model model (document-to-model (java.io.ByteArrayInputStream. (.getBytes rdf-document)) :xml))
+ (model-critical-write model
+ (redis/with-server queues-ref
+ (let [ks (redis/keys (queue-key-pattern))
+ vs (map #(fmt-in %1) (if (empty? ks) [] (apply redis/mget ks)))
+ queues-red (reduce (fn [acum [k v]] (if (= 0 (redis/del k)) acum (conj acum [k v]))) [] (zipmap ks vs))]
+ (redis/atomically
+ (loop [queues queues-red
+ to-delete []
+ keys-to-delete []]
+ (if (empty? queues)
+ (do (with-model model (model-remove-triples to-delete))
+ (doseq [k keys-to-delete] (redis/del k)))
+ (let [[key [pattern kind-op client-id]] (first queues)
+ results (query-triples model pattern)]
+ (log :info (str "*** checking queued " kind-op " -> " pattern " ? " (empty? results)))
+ (if (empty? results)
+ (recur (rest queues)
+ to-delete
+ keys-to-delete)
+ (let [w (java.io.StringWriter.)
+ respo (response
+ (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "</ts:response>") w))
+ w results))]
+ (log :info (str "*** queue to blocked client: \r\n" respo))
+ (rabbit/publish rabbit-conn name (str "exchange-" name) client-id respo)
+ (recur (rest queues)
+ (if (= kind-op :inb) (conj to-delete (flatten-1 results)) to-delete)
+ (conj keys-to-delete key))))))))
+ (success))))))
+
+(defn- apply-in-operation
+ "Applies a TS in operation over a model"
+ ([pattern model name]
+ (model-critical-write model
+ (let [w (java.io.StringWriter.)
+ triples-to-remove (query-triples model pattern)
+ flattened-triples (flatten-1 triples-to-remove)]
+ ;; deleting read triples
+ (with-model model (model-remove-triples flattened-triples))
+ ;; returning triple sets
+ (let [triples (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "</ts:response>") w))
+ w triples-to-remove)]
+ (response triples))))))
+
+(defn- apply-inb-operation
+ "Applies a TS inb operation over a model"
+ ([pattern model name rabbit-conn queues client-id]
+ (model-critical-write model
+ (let [w (java.io.StringWriter.)
+ triples-to-remove (query-triples model pattern)]
+ (if (empty? triples-to-remove)
+ (dosync (log :info "*** Storing INB operation in the queue")
+ (store-blocking-rd queues [pattern :rdb client-id])
+ (blocking-response))
+ (do
+ ;; deleting read triples
+ (with-model model (model-remove-triples (flatten-1 triples-to-remove)))
+ ;; returning triple sets
+ (let [triples (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "</ts:response>") w))
+ w triples-to-remove)]
+ (response triples))))))))
+
+(defn apply-swap-operation-dist
+ "Applies a TS swap operation over a model"
+ ([pattern rdf-document model name rabbit-conn queues-ref]
+ (model-critical-write model
+ (let [w (java.io.StringWriter.)
+ triples-to-remove (query-triples model pattern)
+ flattened-triples (if (empty? triples-to-remove) [] (flatten-1 triples-to-remove))]
+ (when-not (empty? triples-to-remove)
+ (with-model model
+ (model-remove-triples flattened-triples)
+ (document-to-model (java.io.ByteArrayInputStream. (.getBytes rdf-document)) :xml))
+
+ ;; Processing queues
+ (redis/with-server queues-ref
+ (let [ks (redis/keys (queue-key-pattern))
+ vs (map #(fmt-in %1) (if (empty? ks) [] (apply redis/mget ks)))
+ queues-red (reduce (fn [acum [k v]] (if (= 0 (redis/del k)) acum (conj acum [k v]))) [] (zipmap ks vs))]
+ (redis/atomically
+ (loop [queues queues-red
+ keys-to-delete []]
+ (if (empty? queues)
+ (doseq [k keys-to-delete] (redis/del k))
+ (let [[key [pattern kind-op client-id]] (first queues)
+ results (query-triples model pattern)]
+ (log :info (str "checking queued " kind-op " -> " pattern " ? " (empty? results)))
+ (if (empty? results)
+ (recur (rest queues)
+ keys-to-delete)
+ (let [w (java.io.StringWriter.)
+ respo (response
+ (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "</ts:response>") w))
+ w results))]
+ (when (= kind-op :inb)
+ (with-model model (model-remove-triples (flatten-1 results))))
+ (rabbit/publish rabbit-conn name (str "exchange-" name) client-id respo)
+ (recur (rest queues)
+ (conj keys-to-delete key)))))))))))
+
+ ;; Sending the response back to the original client
+ (response
+ (reduce (fn [w ts] (let [m (defmodel (model-add-triples ts))]
+ (output-string m w :xml)
+ (.write w "</ts:response>") w))
+ w triples-to-remove))))))
+
+(defn apply-operation-dist
+ "Applies an operation over a Plaza model"
+ ([message name model queues rabbit-conn options]
+ (log :info "*** about to apply operation")
+ (condp = (:operation message)
+ "rd" (apply-rd-operation (:pattern message) model)
+ "rdb" (apply-rdb-operation (:pattern message) model queues (:client-id message))
+ "out" (apply-out-operation (:value message) model name rabbit-conn queues)
+ "in" (apply-in-operation (:pattern message) model name)
+ "inb" (apply-inb-operation (:pattern message) model name rabbit-conn queues (:client-id message))
+ "swap" (apply-swap-operation-dist (:pattern message) (:value message) model name rabbit-conn queues)
+ (throw (Exception. "Unsupported operation")))))
View
32 src/plaza/triple_spaces/distributed_server/redis.clj
@@ -0,0 +1,32 @@
+;; @author amitrahtore
+;; @from http://github.com/amitrathore/redis-clojure/blob/master/src/redis.clj
+;; @date 01.06.2010
+
+(ns plaza.triple-spaces.distributed-server.redis
+ (:refer-clojure :exclude [get set type keys sort])
+ (:use redis.internal))
+
+(defmacro atomically
+ "Execute all redis commands in body atomically, ie. sandwiched in a
+ MULTI/EXEC statement. If an exception is thrown the EXEC command
+ will be terminated by a DISCARD, no operations will be performed and
+ the exception will be rethrown."
+ [& body]
+ `(do
+ (plaza.triple-spaces.distributed-server.redis/multi)
+ (try
+ (do
+ ~@body
+ (plaza.triple-spaces.distributed-server.redis/exec))
+ (catch Exception e#
+ (plaza.triple-spaces.distributed-server.redis/discard)
+ (throw e#)))))
+;;
+;; Commands
+;;
+(defcommands
+ ;; MULTI/EXEC/DISCARD
+ (multi [] :inline)
+ (exec [] :inline)
+ (discard [] :inline)
+)
View
2 src/plaza/triple_spaces/server/mulgara.clj
@@ -9,7 +9,7 @@
[clojure.contrib.logging :only [log]]
(plaza.triple-spaces core)
(plaza.rdf core sparql)
- (plaza.rdf.implementations sesame))
+ (plaza.rdf.implementations jena))
(:import [java.util UUID]
[java.net URL]))
View
158 src/redis.clj
@@ -0,0 +1,158 @@
+(ns redis
+ (:refer-clojure :exclude [get set type keys sort])
+ (:use redis.internal))
+
+(defmacro with-server
+ "Evaluates body in the context of a new connection to a Redis server
+ then closes the connection.
+
+ server-spec is a map with any of the following keys:
+ :host hostname (default \"127.0.0.1\")
+ :port port (default 6379)
+ :db database to use (default 0)"
+ [server-spec & body]
+ `(with-server* ~server-spec (fn []
+ (do
+ (if (:password *connection*)
+ (redis/auth (:password *connection*)))
+ (redis/select (:db *connection*))
+ ~@body))))
+
+(defmacro atomically
+ "Execute all redis commands in body atomically, ie. sandwiched in a
+ MULTI/EXEC statement. If an exception is thrown the EXEC command
+ will be terminated by a DISCARD, no operations will be performed and
+ the exception will be rethrown."
+ [& body]
+ `(do
+ (redis/multi)
+ (try
+ (do
+ ~@body
+ (redis/exec))
+ (catch Exception e#
+ (redis/discard)
+ (throw e#)))))
+
+;;
+;; Reply conversion functions
+;;
+(defn int-to-bool
+ "Convert integer reply to a boolean value"
+ [int]
+ (= 1 int))
+
+(defn string-to-keyword
+ "Convert a string reply to a keyword"
+ [string]
+ (keyword string))
+
+(defn string-to-map
+ "Convert strings with format 'key:value\r\n'+ to a map with {key
+ value} pairs"
+ [#^String string]
+ (let [lines (.split string "(\\r\\n|:)")]
+ (apply hash-map lines)))
+
+(defn string-to-double
+ "Convert a string in floating point format to a double"
+ [string]
+ (if string
+ (Double/parseDouble string)))
+
+(defn int-to-date
+ "Return a Date representation of a UNIX timestamp"
+ [int]
+ (new java.util.Date (long int)))
+
+(defn seq-to-set
+ [sequence]
+ (clojure.core/set sequence))
+
+;;
+;; Commands
+;;
+(defcommands
+ ;; Connection handling
+ (auth [password] :inline)
+ (quit [] :inline)
+ (ping [] :inline)
+ ;; String commands
+ (set [key value] :bulk)
+ (get [key] :inline)
+ (getset [key value] :bulk)
+ (setnx [key value] :bulk int-to-bool)
+ (incr [key] :inline)
+ (incrby [key integer] :inline)
+ (decr [key] :inline)
+ (decrby [key integer] :inline)
+ (exists [key] :inline int-to-bool)
+ (mget [key & keys] :inline)
+ (mset [key value & more] :bulk)
+ (msetnx [key value & more] :bulk int-to-bool)
+ (del [key] :inline int-to-bool)
+ ;; Key space commands
+ (type [key] :inline string-to-keyword)
+ (keys [pattern] :inline)
+ (randomkey [] :inline)
+ (rename [oldkey newkey] :inline)
+ (renamenx [oldkey newkey] :inline int-to-bool)
+ (dbsize [] :inline)
+ (expire [key seconds] :inline int-to-bool)
+ (ttl [key] :inline)
+ ;; List commands
+ (rpush [key value] :bulk)
+ (lpush [key value] :bulk)
+ (llen [key] :inline)
+ (lrange [key start end] :inline)
+ (ltrim [key start end] :inline)
+ (lindex [key index] :inline)
+ (lset [key index value] :bulk)
+ (lrem [key count value] :bulk)
+ (lpop [key] :inline)
+ (rpop [key] :inline)
+ ;; Set commands
+ (sadd [key member] :bulk int-to-bool)
+ (srem [key member] :bulk int-to-bool)
+ (spop [key] :inline)
+ (smove [srckey destkey member] :bulk int-to-bool)
+ (scard [key] :inline)
+ (sismember [key member] :bulk int-to-bool)
+ (sinter [key & keys] :inline seq-to-set)
+ (sinterstore [destkey key & keys] :inline)
+ (sunion [key & keys] :inline seq-to-set)
+ (sunionstore [destkey key & keys] :inline)
+ (sdiff [key & keys] :inline seq-to-set)
+ (sdiffstore [destkey key & keys] :inline)
+ (smembers [key] :inline seq-to-set)
+ ;; ZSet commands
+ (zadd [key score member] :bulk int-to-bool)
+ (zrem [key member] :bulk int-to-bool)
+ (zincrby [key increment member] :bulk string-to-double)
+ (zscore [key member] :bulk string-to-double)
+ (zcard [key] :inline)
+ (zrange [key start end] :inline)
+ (zrevrange [key start end] :inline)
+ (zrangebyscore [key start end] :inline)
+ (zremrangebyscore [key start end] :inline)
+ ;; MULTI/EXEC/DISCARD
+ (multi [] :inline)
+ (exec [] :inline)
+ (discard [] :inline)
+ ;; Multiple database handling commands
+ (select [index] :inline)
+ (move [key dbindex] :inline)
+ (flushdb [] :inline)
+ (flushall [] :inline)
+ ;; Sorting
+ (sort [key & options] :sort)
+ ;; Persistence
+ (save [] :inline)
+ (bgsave [] :inline)
+ (bgrewriteaof [] :inline)
+ (lastsave [] :inline int-to-date)
+ (shutdown [] :inline)
+ ;; Remote control
+ (info [] :inline string-to-map)
+ ;;(monitor [] :inline))
+)
View
300 src/redis/internal.clj
@@ -0,0 +1,300 @@
+(ns redis.internal
+ (:use redis.utils)
+ (:refer-clojure :exclude [send read read-line])
+ (:import [java.io Reader BufferedReader InputStreamReader StringReader]
+ [java.net Socket]
+ [org.apache.commons.pool.impl GenericObjectPool]
+ [org.apache.commons.pool BasePoolableObjectFactory]))
+
+(set! *warn-on-reflection* true)
+
+(def *cr* 0x0d)
+(def *lf* 0x0a)
+(defn- cr? [c] (= c *cr*))
+(defn- lf? [c] (= c *lf*))
+
+(defstruct connection
+ :host :port :password :db :timeout :socket :reader :writer)
+
+(def *connection* (struct-map connection
+ :host "127.0.0.1"
+ :port 6379
+ :password nil
+ :db 0
+ :timeout 5000
+ :socket nil
+ :reader nil
+ :writer nil))
+
+(defn socket* []
+ (or (:socket *connection*)
+ (throw (Exception. "Not connected to a Redis server"))))
+
+(defn send-command
+ "Send a command string to server"
+ [#^String cmd]
+ (let [out (.getOutputStream (#^Socket socket*))
+ bytes (.getBytes cmd)]
+ (.write out bytes)))
+
+(defn- uppercase [#^String s] (.toUpperCase s))
+(defn- trim [#^String s] (.trim s))
+(defn- parse-int [#^String s] (Integer/parseInt s))
+;(defn- char-array [len] (make-array Character/TYPE len))
+
+(defn read-crlf
+ "Read a CR+LF combination from Reader"
+ [#^Reader reader]
+ (let [cr (.read reader)
+ lf (.read reader)]
+ (when-not
+ (and (cr? cr)
+ (lf? lf))
+ (throw (Exception. "Error reading CR/LF")))
+ nil))
+
+(defn read-line-crlf
+ "Read from reader until exactly a CR+LF combination is
+ found. Returns the line read without trailing CR+LF.
+
+ This is used instead of Reader.readLine() method since that method
+ tries to read either a CR, a LF or a CR+LF, which we don't want in
+ this case."
+ [#^BufferedReader reader]
+ (loop [line []
+ c (.read reader)]
+ (when (< c 0)
+ (throw (Exception. "Error reading line: EOF reached before CR/LF sequence")))
+ (if (cr? c)
+ (let [next (.read reader)]
+ (if (lf? next)
+ (apply str line)
+ (throw (Exception. "Error reading line: Missing LF"))))
+ (recur (conj line (char c))
+ (.read reader)))))
+
+;;
+;; Reply dispatching
+;;
+(defn- do-read [#^Reader reader #^chars cbuf offset length]
+ (let [nread (.read reader cbuf offset length)]
+ (if (not= nread length)
+ (recur reader cbuf (+ offset nread) (- length nread)))))
+
+(defn reply-type
+ ([#^BufferedReader reader]
+ (char (.read reader))))
+
+(defmulti parse-reply reply-type :default :unknown)
+
+(defn read-reply
+ ([]
+ (let [reader (*connection* :reader)]
+ (read-reply reader)))
+ ([#^BufferedReader reader]
+ (parse-reply reader)))
+
+(defmethod parse-reply :unknown
+ [#^BufferedReader reader]
+ (throw (Exception. (str "Unknown reply type:"))))
+
+(defmethod parse-reply \-
+ [#^BufferedReader reader]
+ (let [error (read-line-crlf reader)]
+ (throw (Exception. (str "Server error: " error)))))
+
+(defmethod parse-reply \+
+ [#^BufferedReader reader]
+ (read-line-crlf reader))
+
+(defmethod parse-reply \$
+ [#^BufferedReader reader]
+ (let [line (read-line-crlf reader)
+ length (parse-int line)]
+ (if (< length 0)
+ nil
+ (let [#^chars cbuf (char-array length)]
+ (do
+ (do-read reader cbuf 0 length)
+ (read-crlf reader) ;; CRLF
+ (String. cbuf))))))
+
+(defmethod parse-reply \*
+ [#^BufferedReader reader]
+ (let [line (read-line-crlf reader)
+ count (parse-int line)]
+ (if (< count 0)
+ nil
+ (loop [i count
+ replies []]
+ (if (zero? i)
+ replies
+ (recur (dec i) (conj replies (read-reply reader))))))))
+
+(defmethod parse-reply \:
+ [#^BufferedReader reader]
+ (let [line (trim (read-line-crlf reader))
+ int (parse-int line)]
+ int))
+
+;;
+;; Command functions
+;;
+(defn- str-join
+ "Join elements in sequence with separator"
+ [separator sequence]
+ (apply str (interpose separator sequence)))
+
+
+(defn inline-command
+ "Create a string for an inline command"
+ [name & args]
+ (let [cmd (str-join " " (conj args name))]
+ (str cmd "\r\n")))
+
+(defn bulk-command
+ "Create a string for a bulk command"
+ [name & args]
+ (let [data (str (last args))
+ data-length (count (str data))
+ args* (concat (butlast args) [data-length])
+ cmd (apply inline-command name args*)]
+ (str cmd data "\r\n")))
+
+(defn- sort-command-args-to-string
+ [args]
+ (loop [arg-strings []
+ args args]
+ (if (empty? args)
+ (str-join " " arg-strings)
+ (let [type (first args)
+ args (rest args)]
+ (condp = type
+ :by (let [pattern (first args)]
+ (recur (conj arg-strings "BY" pattern)
+ (rest args)))
+ :limit (let [start (first args)
+ end (second args)]
+ (recur (conj arg-strings "LIMIT" start end)
+ (drop 2 args)))
+ :get (let [pattern (first args)]
+ (recur (conj arg-strings "GET" pattern)
+ (rest args)))
+ :store (let [key (first args)]
+ (recur (conj arg-strings "STORE" key)
+ (rest args)))
+ :alpha (recur (conj arg-strings "ALPHA") args)
+ :asc (recur (conj arg-strings "ASC") args)
+ :desc (recur (conj arg-strings "DESC") args)
+ (throw (Exception. (str "Error parsing SORT arguments: Unknown argument: " type))))))))
+
+(defn sort-command
+ [name & args]
+ (when-not (= name "SORT")
+ (throw (Exception. "Sort command name must be 'SORT'")))
+ (let [key (first args)
+ arg-string (sort-command-args-to-string (rest args))
+ cmd (str "SORT " key)]
+ (if (empty? arg-string)
+ (str cmd "\r\n")
+ (str cmd " " arg-string "\r\n"))))
+
+(def command-fns {:inline 'inline-command
+ :bulk 'bulk-command
+ :sort 'sort-command})
+
+(defn parse-params
+ "Return a restructuring of params, which is of form:
+ [arg* (& more)?]
+ into
+ [(arg1 arg2 ..) more]"
+ [params]
+ (let [[args rest] (split-with #(not= % '&) params)]
+ [args (last rest)]))
+
+(defmacro defcommand
+ "Define a function for Redis command name with parameters
+ params. Type is one of :inline, :bulk or :sort, which determines how
+ the command string is constructued."
+ ([name params type] `(defcommand ~name ~params ~type (fn [reply#] reply#)))
+ ([name params type reply-fn] `(~name ~params ~type ~reply-fn)
+ (do
+ (let [command (uppercase (str name))
+ command-fn (type command-fns)
+ [command-params
+ command-params-rest] (parse-params params)]
+ `(defn ~name
+ ~params
+ (let [request# (apply ~command-fn
+ ~command
+ ~@command-params
+ ~command-params-rest)]
+ (send-command request#)
+ (~reply-fn (read-reply))))))))
+
+(defmacro defcommands
+ [& command-defs]
+ `(do ~@(map (fn [command-def]
+ `(defcommand ~@command-def)) command-defs)))
+
+;;
+;; connection pooling
+;;
+(def *pool* (atom nil))
+
+(defn connect-to-server
+ "Create a Socket connected to server"
+ [server]
+ (let [{:keys [host port timeout]} server
+ socket (Socket. #^String host #^Integer port)]
+ (doto socket
+ (.setTcpNoDelay true)
+ (.setKeepAlive true))))
+
+(defn new-redis-connection [server-spec]
+ (let [connection (merge *connection* server-spec)
+ #^Socket socket (connect-to-server connection)
+ input-stream (.getInputStream socket)
+ output-stream (.getOutputStream socket)
+ reader (BufferedReader. (InputStreamReader. input-stream))]
+ (assoc connection
+ :socket socket
+ :reader reader
+ :created-at (System/currentTimeMillis))))
+
+(defn connection-valid? []
+ (= "PONG" (do (send-command (inline-command "PING"))
+ (read-reply))))
+
+(defn connection-factory [server-spec]
+ (proxy [BasePoolableObjectFactory] []
+ (makeObject []
+ (new-redis-connection server-spec))
+ (validateObject [c]
+ (binding [*connection* c]
+ (connection-valid?)))
+ (destroyObject [c]
+ (.close #^Socket (:socket c)))))
+
+(defrunonce init-pool [server-spec]
+ (let [factory (connection-factory server-spec)
+ p (doto (GenericObjectPool. factory)
+ (.setMaxActive 20)
+ (.setLifo false)
+ (.setTimeBetweenEvictionRunsMillis 30000)
+ (.setWhenExhaustedAction GenericObjectPool/WHEN_EXHAUSTED_BLOCK)
+ (.setTestWhileIdle true))]
+ (reset! *pool* p)))
+
+(defn get-connection-from-pool [server-spec]
+ (init-pool server-spec)
+ (.borrowObject #^GenericObjectPool @*pool*))
+
+(defn return-connection-to-pool [c]
+ (.returnObject #^GenericObjectPool @*pool* c))
+
+(defn with-server* [server-spec func]
+ (binding [*connection* (get-connection-from-pool server-spec)]
+ (let [ret (func)]
+ (return-connection-to-pool *connection*)
+ ret)))
View
13 src/redis/utils.clj
@@ -0,0 +1,13 @@
+(ns redis.utils)
+
+(defn create-runonce [function]
+ (let [sentinel (Object.)
+ result (atom sentinel)]
+ (fn [& args]
+ (locking sentinel
+ (if (= @result sentinel)
+ (reset! result (apply function args))
+ @result)))))
+
+(defmacro defrunonce [fn-name args & body]
+ `(def ~fn-name (create-runonce (fn ~args ~@body))))

0 comments on commit c15331b

Please sign in to comment.