Skip to content
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
84 lines (75 sloc) 2.31 KB
;; Copyright (c) Cognitect, Inc.
;; All rights reserved.
;; Namespace for moving EDN forms between readable/writeable things and
;; core.async channels.
(ns cognitect.xform.async-edn
  ;; PushbackReader is what clojure.edn/read consumes.
  (:import [java.io IOException PushbackReader])
  ;; This namespace defines its own `read`, so clojure.core/read is excluded.
  (:refer-clojure :exclude (read))
  (:require
   [clojure.core.async :as a :refer (<!! >! >!! close! go thread)]
   [clojure.java.io :as io]
   [clojure.spec.alpha :as s]
   [clojure.edn :as edn]
   [cognitect.anomalies :as anom]))
;; Spec used for channel arguments and return values in the fdefs below.
;; `any?` accepts every value, so this is a named placeholder rather than
;; a real validation.
(s/def ::channel any?)
(defmacro with-ex-anom
  "Evaluates forms inside a try block and returns the value of the last
  form. If any Throwable is thrown, returns an anomaly map instead, with
  ::anom/category set to ::anom/fault and ::anom/message set to the
  exception's message."
  [& forms]
  `(try
     ~@forms
     (catch Throwable t#
       {::anom/category ::anom/fault
        ;; .getMessage may return nil (e.g. for a bare NullPointerException);
        ;; fall back to the Throwable's string form so the message is
        ;; always a string.
        ::anom/message (or (.getMessage t#) (str t#))})))
;; Function spec for `reader`. `reader` has a two-argument arity
;; ([readable ch]) as well as the three-argument one, so :close? is
;; optional in the argument spec.
(s/fdef reader
  :args (s/cat :readable ::readable :ch ::channel :close? (s/? any?))
  :ret ::channel)
(defn reader
  "Reads EDN forms from readable, placing each one on ch. The work runs
  on a separate thread (core.async `thread`), so this call returns
  immediately.

  readable - anything accepted by clojure.java.io/reader
  ch       - destination channel for the forms
  close?   - when truthy (the default), ch is closed once reading stops,
             whether it stopped normally or not

  The underlying reader is closed when the input is exhausted, when ch
  is found closed, or when an error occurs.

  Returns a channel that receives a single map:
    {:forms n}                          input exhausted after n forms
    anomaly map that also has :forms n  ch was closed (::anom/interrupted),
                                        or a nil form was read
                                        (::anom/incorrect)
    anomaly map (::anom/fault)          an exception was thrown"
  ([readable ch]
   (reader readable ch true))
  ([readable ch close?]
   (thread
     (with-ex-anom
       (try
         (with-open [rdr (io/reader readable)]
           (let [pbr (PushbackReader. rdr)
                 ;; Unique end-of-input marker, so that literal `nil` and
                 ;; `false` forms in the input are not mistaken for EOF.
                 eof (Object.)]
             (loop [n 0]
               (let [form (edn/read {:eof eof} pbr)]
                 (cond
                   (identical? eof form)
                   {:forms n}

                   ;; Channels cannot carry nil; report it instead of
                   ;; silently ending the read.
                   (nil? form)
                   {::anom/category ::anom/incorrect
                    ::anom/message "Cannot place nil form on channel"
                    :forms n}

                   ;; >!! returns false when ch is already closed.
                   (>!! ch form)
                   (recur (inc n))

                   :else
                   {::anom/category ::anom/interrupted
                    ::anom/message "Destination channel closed"
                    :forms n})))))
         (finally
           (when close?
             (close! ch))))))))
(defn read
  "Convenience wrapper around `reader`. Reads forms from readable and
  returns a channel (buffer size 10) that yields each form. If `reader`
  reports an anomaly, the anomaly map is placed on the channel as the
  final item. The channel is closed when reading is finished."
  [readable]
  (let [ch (a/chan 10)]
    ;; reader is told not to close ch (close? = false) so that an anomaly
    ;; can still be appended before the channel is closed here.
    (go (let [result (a/<! (reader readable ch false))]
          ;; Park with <! rather than block with <!!: blocking takes
          ;; inside a go block tie up the go thread pool.
          (when (::anom/category result)
            (>! ch result))
          (close! ch)))
    ch))
;; Function spec for `writer`: takes a source channel and a writeable
;; destination, and returns a channel.
(s/fdef writer
  :ret ::channel
  :args (s/cat :ch ::channel
               :writeable ::writeable))
(defn writer
  "Takes forms from ch and appends each one to writeable using prn. The
  work runs on a separate thread (core.async `thread`), so this call
  returns immediately.

  ch        - source channel of forms
  writeable - anything accepted by clojure.java.io/writer

  The underlying writer is closed when ch closes or when an error
  occurs.

  Returns a channel that receives a single map: {:forms n} with the
  number of forms written, or an anomaly map (::anom/fault) if an
  exception was thrown."
  [ch writeable]
  (thread
    (with-ex-anom
      (with-open [w (io/writer writeable)]
        ;; Lift the print limits so large or deeply nested forms are
        ;; written in full, and send prn's output to the opened writer.
        (binding [*print-length* nil
                  *print-level* nil
                  *out* w]
          (loop [n 0]
            ;; if-some rather than if-let: only nil (channel closed) ends
            ;; the loop, so a `false` value on ch is still written.
            (if-some [form (<!! ch)]
              (do
                (prn form)
                (recur (inc n)))
              {:forms n})))))))
You can’t perform that action at this time.