Skip to content

Commit

Permalink
add doc-strings to aleph.format, and take advantage of proxy-channels…
Browse files Browse the repository at this point in the history
… to return result-channels for every write
  • Loading branch information
ztellman committed Mar 19, 2011
1 parent 1f7c0cc commit bebaf35
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 104 deletions.
124 changes: 98 additions & 26 deletions src/aleph/formats.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
;; the terms of this license.
;; You must not remove this notice, or any other, from this software.

(ns
(ns ^{:author "Zachary Tellman"}
aleph.formats
(:require
[clojure.contrib.json :as json]
Expand All @@ -18,6 +18,9 @@
InputStreamReader
PrintWriter
ByteArrayOutputStream]
[java.net
URLEncoder
URLDecoder]
[java.nio
ByteBuffer]
[org.jboss.netty.handler.codec.base64
Expand All @@ -31,45 +34,62 @@

;;;

(defn byte-array? [data]
(defn byte-array?
"Returns true if 'x' is a byte array."
[x]
(and
(.isArray (class data))
(= (.getComponentType (class data)) Byte/TYPE)))
(.isArray (class x))
(= (.getComponentType (class x)) Byte/TYPE)))

(defn channel-buffer? [buf]
(instance? ChannelBuffer buf))
(defn channel-buffer?
"Returns true if 'x' is a Netty ChannelBuffer."
[x]
(instance? ChannelBuffer x))

(defn channel-buffer->input-stream
"Transforms a Netty ChannelBuffer to an InputStream."
[^ChannelBuffer buf]
(when buf
(ChannelBufferInputStream. buf)))

(defn channel-buffer->byte-buffer
"Transforms a Netty ChannelBuffer to a ByteBuffer. This may involve concatenating several ByteBuffers together,
to avoid this use channel-buffer->byte-buffers instead."
[^ChannelBuffer buf]
(when buf
(.toByteBuffer buf)))

(defn channel-buffer->byte-buffers
"Transforms a Netty ChannelBuffer into a sequence of one or more ByteBuffers."
[^ChannelBuffer buf]
(when buf
(seq (.toByteBuffers buf))))

(defn byte-buffers->channel-buffer [bufs]
(defn byte-buffers->channel-buffer
"Transforms a sequence of ByteBuffers into a Netty ChannelBuffer."
[bufs]
(ChannelBuffers/wrappedBuffer (into-array ByteBuffer (remove nil? bufs))))

(defn channel-buffer->string
"Transforms a Netty ChannelBuffer into a string. By default, 'charset' is UTF-8."
([buf]
(channel-buffer->string buf "UTF-8"))
([buf charset]
(when buf
(.toString ^ChannelBuffer buf ^String charset))))

(defn concat-channel-buffers [& bufs]
(defn concat-channel-buffers
"Takes one or more Netty ChannelBuffers, and returns a ChannelBuffer representing them as a contiguous byte sequence.
This does not require any copies."
[& bufs]
(ChannelBuffers/wrappedBuffer (into-array ChannelBuffer (remove nil? bufs))))

(defn concat-byte-buffers [& bufs]
(defn concat-byte-buffers
"Takes one or more ByteBuffers, and returns a ByteBuffer representing them as a contiguous sequence. When more than one
buffer is passed in, this requires copying all buffers onto a new buffer."
[& bufs]
(if (= 1 (count bufs))
(first bufs)
(.duplicate ^ByteBuffer (first bufs))
(let [size (apply + (map #(.remaining ^ByteBuffer %) bufs))
buf (ByteBuffer/allocate size)]
(doseq [b bufs]
Expand All @@ -79,6 +99,7 @@
;;;

(defn byte-buffer->string
"Transforms a ByteBuffer into a string. By default, 'charset' is UTF-8."
([buf]
(byte-buffer->string buf "UTF-8"))
([^ByteBuffer buf charset]
Expand All @@ -88,6 +109,7 @@
(String. ary ^String charset)))))

(defn string->byte-buffer
"Transforms a string into a ByteBuffer. By default, 'charset' is UTF-8."
([s]
(string->byte-buffer s "UTF-8"))
([s charset]
Expand All @@ -97,6 +119,8 @@
;;;

(defn input-stream->byte-buffer
"Transforms an InputStream into a ByteBuffer. If the stream isn't closed, this function will
block until it has been closed."
[^InputStream stream]
(when stream
(let [available (.available stream)]
Expand All @@ -110,21 +134,27 @@
(recur ary (+ offset byte-count) bufs)))))))))

(defn byte-buffer->channel-buffer
"Transforms a ByteBuffer into a Netty ChannelBuffer."
[^ByteBuffer buf]
(when buf
(ByteBufferBackedChannelBuffer. buf)))

(defn input-stream->channel-buffer
"Transforms an InputStream into a Netty ChannelBuffer. If the stream isn't closed, this function
will block until it has been closed."
[stream]
(-> stream input-stream->byte-buffer byte-buffer->channel-buffer))

(defn string->channel-buffer
"Transforms a string into a Netty ChannelBuffer. By default, 'charset' is UTF-8."
([s]
(string->channel-buffer s "UTF-8"))
([s charset]
(-> s (string->byte-buffer charset) byte-buffer->channel-buffer)))

(defn to-channel-buffer
"Transforms the data into a Netty ChannelBuffer. 'charset' is only used when the data is a string, and
by default is UTF-8."
([data]
(to-channel-buffer data "utf-8"))
([data charset]
Expand All @@ -136,7 +166,9 @@
(and (sequential? data) (every? #(instance? ByteBuffer %) data)) (byte-buffers->channel-buffer data)
(byte-array? data) (-> data ByteBuffer/wrap byte-buffer->channel-buffer))))

(defn to-channel-buffer? [data]
(defn to-channel-buffer?
"Returns true if the data can be transformed to a Netty ChannelBuffer."
[data]
(or
(instance? ChannelBuffer data)
(instance? ByteBuffer data)
Expand All @@ -147,48 +179,59 @@

;;;

(defn base64-encode [string]
(-> string
string->channel-buffer
(Base64/encode)
channel-buffer->string))
(defn base64-encode
"Encodes the string into a base64 representation."
[string]
(-> string string->channel-buffer Base64/encode channel-buffer->string))

(defn base64-decode [string]
(-> string
string->channel-buffer
(Base64/decode)
channel-buffer->string))
(defn base64-decode
"Decodes a base64 encoded string."
[string]
(-> string string->channel-buffer Base64/decode channel-buffer->string))

;;;

(defn data->json->channel-buffer [data]
(defn data->json->channel-buffer
"Transforms a Clojure data structure to JSON, and returns a Netty ChannelBuffer representing the encoded data."
[data]
(let [output (ByteArrayOutputStream.)
writer (PrintWriter. output)]
(json/write-json data writer)
(.flush writer)
(-> output .toByteArray ByteBuffer/wrap byte-buffer->channel-buffer)))

(defn data->json->string [data]
(defn data->json->string
"Transforms a Clojure data structure to JSON, and returns a string representation of the encoded data."
[data]
(-> data data->json->channel-buffer channel-buffer->string))

(defn channel-buffer->json->data [buf]
(defn channel-buffer->json->data
"Takes a Netty ChannelBuffer containing JSON, and returns a Clojure data structure representation."
[buf]
(-> buf channel-buffer->input-stream InputStreamReader. (json/read-json-from true false nil)))

(defn string->json->data [s]
(defn string->json->data
"Takes a string representation of JSON, and returns it as a Clojure data structure."
[s]
(-> s string->channel-buffer channel-buffer->json->data))

;;;

(defn channel-buffer->xml->data [buf]
(defn channel-buffer->xml->data
"Takes a Netty ChannelBuffer containing XML, and returns a Clojure hash representing the parsed data."
[buf]
(-> buf channel-buffer->input-stream xml/parse))

(defn string->xml->data
"Takes an XML string, and returns a Clojure hash representing the parsed data."
([s]
(string->xml->data s "utf-8"))
([s charset]
(-> s (string->channel-buffer charset) channel-buffer->input-stream xml/parse)))

(defn data->xml->string
"Takes a Clojure data structure representing a parse tree or prxml structure, and returns an XML string.
By default, 'charset' is UTF-8."
([x]
(data->xml->string x "utf-8"))
([x charset]
Expand All @@ -199,6 +242,8 @@
(map? x) (xml/emit-element x)))))

(defn data->xml->channel-buffer
"Takes a Clojure data structure representing a parse tree or prxml structure, and returns a Netty ChannelBuffer.
By default, 'charset' is UTF-8."
([x]
(data->xml->channel-buffer x "utf-8"))
([x charset]
Expand All @@ -215,3 +260,30 @@

;;;

(defn url-decode
"Takes a URL-encoded string, and returns a standard representation of the string. By default, 'charset' is UTF-8.
'options' may contain a :url-decodings hash, which maps encoded strings onto how they should be decoded. For instance,
the Java decoder doesn't recognize %99 as a trademark symbol, even though it's sometimes encoded that way. To allow
for this encoding, you can simply use:
(url-decode s \"utf-8\" {:url-decodings {\"%99\" \"\\u2122\"}})"
([s]
(url-decode s "utf-8"))
([s charset]
(url-decode s charset nil))
([s charset options]
(let [s (reduce (fn [s [from to]] (.replace ^String s (str from) to)) s (:url-decodings options))]
(URLDecoder/decode s charset))))

(defn url-encode
"URL-encodes a string. By default 'charset' is UTF-8.
'options' may contain a :url-encodings hash, which can specify custom encodings for certain characters or substrings."
([s]
(url-encode s "utf-8"))
([s charset]
(url-encode s charset nil))
([s charset options]
(let [s (reduce (fn [s [from to]] (.replace ^String s (str from) to)) s (:url-encodings options))]
(URLEncoder/encode s charset))))
4 changes: 2 additions & 2 deletions src/aleph/http/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
(run-pipeline in
read-channel
(fn [^HttpChunk response]
(let [body (:body (decode-aleph-msg {:headers headers :body (.getContent response)} (:auto-transform options)))]
(let [body (:body (decode-aleph-msg {:headers headers :body (.getContent response)} options))]
(if (.isLast response)
(close out)
(do
Expand Down Expand Up @@ -83,7 +83,7 @@
read-channel
(fn [chunk]
(when chunk
(enqueue out (DefaultHttpChunk. (:body (encode-aleph-msg {:headers headers :body chunk} (:auto-transform options))))))
(enqueue out (DefaultHttpChunk. (:body (encode-aleph-msg {:headers headers :body chunk} options)))))
(if (drained? in)
(enqueue out HttpChunk/LAST_CHUNK)
(restart)))))
Expand Down
27 changes: 17 additions & 10 deletions src/aleph/http/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

;;;

(def request-methods
(def keyword->request-method
{:get HttpMethod/GET
:put HttpMethod/PUT
:delete HttpMethod/DELETE
Expand All @@ -54,10 +54,16 @@
:options HttpMethod/OPTIONS
:head HttpMethod/HEAD})

(def request-method->keyword
(zipmap
(vals keyword->request-method)
(keys keyword->request-method)))

;;;

(defn encode-aleph-msg [aleph-msg auto-transform?]
(let [headers (:headers aleph-msg)
(defn encode-aleph-msg [aleph-msg options]
(let [auto-transform? (:auto-transform options)
headers (:headers aleph-msg)
body (:body aleph-msg)
content-type (str/lower-case (or (headers "content-type") (headers "Content-Type") "text/plain"))
charset (or (headers "charset") (headers "Charset") "utf-8")]
Expand All @@ -80,8 +86,9 @@
:else
aleph-msg)))

(defn decode-aleph-msg [aleph-msg auto-transform?]
(let [headers (:headers aleph-msg)
(defn decode-aleph-msg [aleph-msg options]
(let [auto-transform? (:auto-transform options)
headers (:headers aleph-msg)
content-type (str/lower-case (or (headers "content-type") (headers "Content-Type") "text/plain"))
charset (or (headers "charset") (headers "Charset") "utf-8")]
(cond
Expand Down Expand Up @@ -123,7 +130,7 @@
(defn- netty-request-method
"Get HTTP method from Netty request."
[^HttpRequest req]
{:request-method (->> req .getMethod .getName .toLowerCase keyword)})
{:request-method (->> req .getMethod request-method->keyword)})

(defn- netty-request-uri
"Get URI from Netty request."
Expand All @@ -146,7 +153,7 @@
:body (let [body (:body
(decode-aleph-msg
{:headers headers :body (.getContent req)}
(:auto-transform options)))]
options))]
(if (final-netty-message? req)
body
(let [ch (channel)]
Expand Down Expand Up @@ -176,7 +183,7 @@
(if (channel? body)
(.setHeader netty-msg "Transfer-Encoding" "chunked")
(do
(.setContent netty-msg (:body (encode-aleph-msg msg (:auto-transform options))))
(.setContent netty-msg (:body (encode-aleph-msg msg options)))
(HttpHeaders/setContentLength netty-msg (-> netty-msg .getContent .readableBytes))))
(HttpHeaders/setContentLength netty-msg 0))
netty-msg))
Expand All @@ -186,7 +193,7 @@
uri (URI. scheme nil host port (:uri request) (:query-string request) (:fragment request))
req (DefaultHttpRequest.
HttpVersion/HTTP_1_1
(request-methods (:request-method request))
(keyword->request-method (:request-method request))
(str
(when-not (= \/ (-> uri .getPath first))
"/")
Expand Down Expand Up @@ -219,4 +226,4 @@
(defn transform-netty-response [^HttpResponse response headers options]
{:status (-> response .getStatus .getCode)
:headers headers
:body (:body (decode-aleph-msg {:headers headers, :body (.getContent response)} (:auto-transform options)))})
:body (:body (decode-aleph-msg {:headers headers, :body (.getContent response)} options))})
Loading

0 comments on commit bebaf35

Please sign in to comment.