Skip to content

Commit

Permalink
Some more utilities for dealing with iterators efficiently in for.clj
Browse files Browse the repository at this point in the history
  • Loading branch information
cnuernber committed Apr 10, 2021
1 parent f0e501d commit 7867bbd
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 1 deletion.
22 changes: 22 additions & 0 deletions java/tech/v3/datatype/IOReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package tech.v3.datatype;

import java.io.Reader;

/**
 * A pair of read/close callbacks that can be adapted into a {@link Reader} via
 * {@link #makeReader(Object)}.
 */
public interface IOReader {
  /**
   * Reads characters into {@code cbuf}. Called by the adapter's
   * {@code read(char[], int, int)} with exactly the arguments it received.
   *
   * @param cbuf destination buffer
   * @param off offset in {@code cbuf} at which to start storing characters
   * @param len maximum number of characters to read
   * @return the value to hand back from {@code Reader.read}; implementations are
   *     presumably expected to follow that contract (count read, or -1 at end of
   *     stream) -- confirm against the implementations
   */
  int doRead(char[] cbuf, int off, int len);

  /** Called when the adapter returned by {@link #makeReader(Object)} is closed. */
  void doClose();

  /**
   * Wraps this object in a {@link Reader} whose reads are delegated to
   * {@link #doRead(char[], int, int)} while holding the reader's lock, and whose
   * {@code close()} is delegated to {@link #doClose()} (without locking).
   *
   * @param lock object to synchronize reads on; when {@code null} the returned reader
   *     synchronizes on itself. Previously a {@code null} lock failed with a
   *     {@code NullPointerException} thrown by the {@code Reader(Object)} constructor.
   * @return a new reader backed by this object
   */
  default Reader makeReader(Object lock) {
    // A local class (rather than an anonymous one) so that both Reader constructors
    // can be used: Reader() locks on the reader itself, Reader(Object) on the argument.
    final class DelegatingReader extends Reader {
      DelegatingReader() {
        super();
      }

      DelegatingReader(Object sharedLock) {
        super(sharedLock);
      }

      @Override
      public int read(char[] cbuf, int off, int len) {
        // this.lock is the field inherited from Reader; it is never null.
        synchronized (this.lock) {
          return doRead(cbuf, off, len);
        }
      }

      @Override
      public void close() {
        doClose();
      }
    }
    return lock == null ? new DelegatingReader() : new DelegatingReader(lock);
  }
}
152 changes: 152 additions & 0 deletions src/tech/v3/datatype/mmap/reader.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
(ns tech.v3.datatype.mmap.reader
"Experimental namespace that reads data directly via mmap. So far experimental
results show that it is slower than using a buffered reader."
(:require [tech.v3.datatype.mmap :as mmap]
[tech.v3.datatype.nio-buffer :as nio-buffer]
[tech.v3.datatype.native-buffer :as native-buffer]
[tech.v3.datatype.base :as dtype-base])
(:import [java.io InputStream Reader]
[java.nio.charset Charset CharsetDecoder CoderResult]
[java.nio CharBuffer ByteBuffer]
[java.nio.file Files Paths]
[tech.v3.datatype.native_buffer NativeBuffer]
[tech.v3.datatype IOReader]))


(set! *warn-on-reflection* true)
(set! *unchecked-math* :warn-on-boxed)


(defn ->charset
  "Coerce `charset` to a `java.nio.charset.Charset`.

  * a `Charset` instance is returned unchanged,
  * a `String` is looked up with `Charset/forName`,
  * `nil` is passed through as `nil`.

  Any other type throws an `IllegalArgumentException` instead of silently
  returning nil, which would otherwise surface later as an NPE at the first
  method call on the result."
  ^Charset [charset]
  (cond
    (nil? charset)
    nil
    (instance? Charset charset)
    charset
    (instance? String charset)
    (Charset/forName charset)
    :else
    (throw (IllegalArgumentException.
            (str "Unable to convert to a Charset: " (type charset))))))


(def ^{:private true
       :tag 'long}
  read-len
  "Upper bound (8 * 1024 * 1024) on the size of each sub-buffer that
  MMapCharsetReader converts to a ByteBuffer in one step."
  (* 8 (* 1024 1024)))


;; IOReader implementation that decodes the contents of `nbuf` into chars using
;; `decoder`.
;;
;; Mutable state:
;;   pos     - offset into nbuf of the next region that has not yet been converted
;;             to a ByteBuffer.
;;   bbuf    - the ByteBuffer currently being decoded (nil before the first read).
;;   flushed - set once the decoder has been flushed at end of input; every later
;;             non-empty read returns -1.
;;
;; `bytesPerChar` is stored but not used by this type.
;;
;; doRead follows the IOReader/Reader.read signature: the third argument is the
;; maximum number of chars to produce (a length), not an end index.  It was
;; previously treated as an end index, which computed a wrong length whenever
;; `off` was non-zero.
(deftype ^:private MMapCharsetReader [^NativeBuffer nbuf
                                      ^CharsetDecoder decoder
                                      ^long bytesPerChar
                                      ^{:unsynchronized-mutable true
                                        :tag long} pos
                                      ^{:unsynchronized-mutable true
                                        :tag ByteBuffer} bbuf
                                      ^{:unsynchronized-mutable true} flushed]
  IOReader
  ;; Nothing is released here; closing is a no-op.
  (doClose [this])
  (doRead [this char-ary off len]
    (cond
      (== 0 len)
      0
      flushed
      -1
      :else
      ;; View of char-ary[off, off+len).  When off is non-zero the buffer is
      ;; sliced so that its position starts at 0 and therefore equals the number
      ;; of chars written so far.
      (let [char-buf (CharBuffer/wrap char-ary off len)
            ^CharBuffer char-buf (if (== 0 off)
                                   char-buf
                                   (.slice char-buf))]
        (loop [cur-offset (long (.position char-buf))]
          (if (== len cur-offset)
            ;; Return the number of chars written into the destination.
            (.position char-buf)
            (let [[^ByteBuffer local-bbuf eof]
                  (if (and bbuf (.hasRemaining bbuf))
                    ;; Still undecoded bytes in the current chunk.
                    [bbuf false]
                    ;; Current chunk exhausted (or first read): take the next
                    ;; chunk of at most read-len elements from nbuf.
                    (let [available (- (.n-elems nbuf) pos)
                          buf-size (min available read-len)
                          ;;This call is time consuming
                          local-bbuf (-> (dtype-base/sub-buffer nbuf pos buf-size)
                                         (nio-buffer/native-buf->nio-buf
                                          {:resource-type nil}))]
                      (set! pos (+ pos buf-size))
                      ;; eof when nothing was left in nbuf to take.
                      [local-bbuf (== 0 available)]))
                  eof (boolean eof)]
              (set! bbuf local-bbuf)
              (let [code-result (.decode decoder bbuf char-buf eof)
                    ;; At end of input with everything consumed, flush the
                    ;; decoder and remember that we did so.
                    ^CoderResult code-result
                    (if (and eof (.isUnderflow code-result))
                      (do
                        (set! flushed true)
                        (.flush decoder char-buf))
                      code-result)
                    _ (when (or (.isMalformed code-result)
                                (.isError code-result)
                                (.isUnmappable code-result))
                        (.throwException code-result))
                    ;; Setting cur-offset to len terminates the loop: either
                    ;; the destination is full (overflow) or input is finished.
                    cur-offset (long (if (or (.isOverflow code-result)
                                             (and (.isUnderflow code-result) eof))
                                       len
                                       (.position char-buf)))]
                (recur cur-offset))))))))


;; IOReader implementation that copies the bytes of `nbuf` directly into the
;; destination char array, one char per byte, with no charset decoding.
;; `pos` is the offset into nbuf of the next byte to read.
;;
;; doRead follows the IOReader/Reader.read signature: the third argument is the
;; maximum number of chars to read (a length), not an end index.  It was
;; previously treated as an end index, which computed a wrong length whenever
;; `off` was non-zero.
(deftype MMapReader [^NativeBuffer nbuf
                     ^{:unsynchronized-mutable true
                       :tag long} pos]
  IOReader
  ;; Nothing is released here; closing is a no-op.
  (doClose [this])
  (doRead [this char-ary off len]
    (let [available (- (.n-elems nbuf) pos)
          n-read (min available (long len))]
      (cond
        ;; Buffer exhausted: signal end of stream.
        (== available 0)
        -1
        (== n-read 0)
        0
        :else
        (let [unsafe (native-buffer/unsafe)]
          (dotimes [idx n-read]
            (aset char-ary (+ off idx)
                  ;; NOTE(review): unchecked-char on a signed byte sign-extends,
                  ;; so bytes >= 0x80 do not map to chars 0x80-0xFF -- confirm
                  ;; whether masking with 0xFF is intended.
                  (unchecked-char
                   (.getByte unsafe (+ (.address nbuf) pos idx)))))
          (set! pos (+ pos n-read))
          n-read)))))


(defn native-buf-reader
  "Create a `java.io.Reader` over the contents of `nbuf`.

  When `charset` is truthy it is coerced with `->charset` and the data is decoded
  through an `MMapCharsetReader`; otherwise an `MMapReader` is used.  In both
  cases `nbuf` itself is passed to `makeReader` as the reader's lock object."
  ^Reader [^NativeBuffer nbuf charset]
  (let [^IOReader io-reader
        (if-not charset
          (MMapReader. nbuf 0)
          (let [decoder (.newDecoder (->charset charset))
                chars-per-byte (.averageCharsPerByte decoder)
                bytesPerChar (max 1 (Math/ceil (/ 1.0 chars-per-byte)))]
            (MMapCharsetReader. nbuf decoder bytesPerChar 0 nil false)))]
    (.makeReader io-reader nbuf)))


(defn mmap-reader
  "Memory-map `fname` via `mmap/mmap-file` and return a `java.io.Reader` over it
  built by `native-buf-reader`.  `charset` is forwarded to `native-buf-reader`;
  `mmap-options` (nil in the two-argument arity) is forwarded to `mmap/mmap-file`."
  (^Reader [fname charset]
   (mmap-reader fname charset nil))
  (^Reader [fname charset mmap-options]
   (-> (mmap/mmap-file fname mmap-options)
       (native-buf-reader charset))))


;; Scratch benchmark, never evaluated on load: reads a whole file through three
;; different Reader implementations and records the observed wall-clock times.
(comment
;; Drain `rdr` in 4096-char chunks, closing it afterwards, and return the total
;; number of chars read.
(defn test-read-file
[^Reader rdr]
(with-open [rdr rdr]
(let [char-buf (char-array 4096)]
(loop [n-read (.read rdr char-buf)
total-read 0]
(if (== -1 n-read)
total-read
(recur (.read rdr char-buf) (+ total-read n-read)))))))

;; clojure.java.io/reader
(time (test-read-file (clojure.java.io/reader "/home/chrisn/Downloads/yellow_tripdata_2016-01.csv")))
;;3625.28ms
;; mmap-reader with a nil charset (the MMapReader path)
(time (test-read-file (mmap-reader "/home/chrisn/Downloads/yellow_tripdata_2016-01.csv" nil)))
;;4500ms
;; java.nio.file.Files/newBufferedReader
(time (test-read-file (Files/newBufferedReader (Paths/get "/home/chrisn/Downloads/yellow_tripdata_2016-01.csv" (into-array String [])))))
;;3692ms
)
44 changes: 43 additions & 1 deletion src/tech/v3/parallel/for.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
(ns tech.v3.parallel.for
"Serial and parallel iteration strategies across iterators and index spaces."
(:import [java.util.concurrent ForkJoinPool Callable Future ForkJoinTask]
[java.util ArrayDeque PriorityQueue Comparator Spliterator Iterator]
[java.util ArrayDeque PriorityQueue Comparator Spliterator Iterator
ArrayList List]
[java.util.stream Stream]
[java.util.function Consumer IntConsumer LongConsumer DoubleConsumer]
[clojure.lang IFn]))
Expand All @@ -22,6 +23,20 @@
(ForkJoinPool/getCommonPoolParallelism))


(defn cpu-pool-map-reduce
  "Execute `map-fn` in the separate threads of ForkJoinPool's common pool and
  return `(reduce-fn results)`, where `results` is always a sequence of the
  `map-fn` return values.

  * Called from outside a ForkJoinPool: `map-fn` is submitted to the common pool
    as a zero-argument `Callable` once per unit of common-pool parallelism, and
    `reduce-fn` receives the sequence of dereferenced results.
  * Called from inside a ForkJoinPool task: nothing is submitted; `map-fn` is
    invoked once in the calling thread as `(map-fn 0)` and `reduce-fn` receives
    a one-element sequence.  Previously the bare result was passed here, so
    `reduce-fn` saw a different shape depending on the calling thread.

  NOTE(review): `map-fn` is invoked with one argument on the inline path and
  with no arguments on the pooled path, so it must accept both arities --
  confirm which calling convention is intended."
  [map-fn reduce-fn]
  (if (ForkJoinTask/inForkJoinPool)
    (reduce-fn [(map-fn 0)])
    (let [parallelism (ForkJoinPool/getCommonPoolParallelism)
          pool (ForkJoinPool/commonPool)]
      (->> (repeatedly parallelism #(.submit pool ^Callable map-fn))
           ;;force all submissions to start
           (doall)
           (map deref)
           (reduce-fn)))))


(defn indexed-map-reduce
"Execute `indexed-map-fn` over `n-groups` subranges of `(range num-iters)`.
Then call `reduce-fn` passing in entire in order result value sequence.
Expand Down Expand Up @@ -120,6 +135,8 @@
"Convert a stream or an iterable into an iterator."
^Iterator [item]
(cond
(instance? Iterator item)
item
(instance? Iterable item)
(.iterator ^Iterable item)
(instance? Stream item)
Expand All @@ -129,6 +146,31 @@
(type item))))))


(defn rest-iter
  "Convert `item` to an iterator, mutably advance it past its first element (if it
  has one) by calling `next`, and return the iterator."
  ^java.util.Iterator [item]
  (let [iter (->iterator item)]
    (if (.hasNext iter)
      (do
        (.next iter)
        iter)
      iter)))


(defn batch-iter
  "Given an iterator (or anything `->iterator` accepts), batch it up into
  implementations of `java.util.List` that each contain at most `batch-size`
  elements.  Returns a sequence of batches, or nil when the iterator is empty.

  The first batch is filled eagerly; each following batch is filled only when the
  sequence is realized that far, consuming the underlying iterator as it goes.

  Throws `IllegalArgumentException` when `batch-size` is not positive, since a
  batch that can never hold an element would never drain the iterator."
  [^long batch-size item]
  (when-not (pos? batch-size)
    (throw (IllegalArgumentException.
            (str "batch-size must be positive: " batch-size))))
  (let [iter (->iterator item)]
    (when (.hasNext iter)
      (let [next-batch (ArrayList. batch-size)]
        ;; Re-check the size limit on every iteration; checking it only on entry
        ;; let the first batch swallow the entire iterator.
        (loop []
          (when (and (.hasNext iter)
                     (< (.size next-batch) batch-size))
            (.add next-batch (.next iter))
            (recur)))
        (cons next-batch (lazy-seq (batch-iter batch-size iter)))))))


(defmacro doiter
"Execute body for every item in the iterable. Expecting side effects, returns nil."
[varname iterable & body]
Expand Down

0 comments on commit 7867bbd

Please sign in to comment.