Skip to content

Commit

Permalink
Renaming to pmap, upmap and adding check for if already in forkjointask.
Browse files Browse the repository at this point in the history
  • Loading branch information
cnuernber committed Mar 3, 2022
1 parent d9b7441 commit 448843d
Showing 1 changed file with 28 additions and 16 deletions.
44 changes: 28 additions & 16 deletions src/tech/v3/parallel/for.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
[java.util.stream Stream]
[java.util.function Consumer IntConsumer LongConsumer DoubleConsumer]
[clojure.lang IFn]
[java.util.concurrent ForkJoinPool]))
[java.util.concurrent ForkJoinPool])
(:refer-clojure :exclude [pmap]))


(set! *unchecked-math* :warn-on-boxed)
Expand All @@ -19,6 +20,11 @@
default-max-batch-size 64000)


(defn in-fork-join-task?
[]
(ForkJoinTask/inForkJoinPool))


(defn common-pool-parallelism
"Integer number of threads used in the ForkJoinPool's common pool"
^long []
Expand All @@ -29,7 +35,7 @@
"Execute map-fn in the separate threads of ForkJoinPool's common pool.
Map-fn takes a single long which is it's task index."
[map-fn reduce-fn fork-join-pool]
(if (ForkJoinTask/inForkJoinPool)
(if (in-fork-join-task?)
(reduce-fn (map-fn 0))
(let [^ForkJoinPool pool (or fork-join-pool (ForkJoinPool/commonPool))
parallelism (.getParallelism pool)]
Expand All @@ -41,7 +47,9 @@
(reduce-fn)))))


(defn- unchunk [s]
(defn unchunk
"Given a possibly chunked sequence, return an unchunked sequence."
[s]
(when (seq s)
(lazy-seq
(cons (first s)
Expand Down Expand Up @@ -90,17 +98,17 @@
;;Get pairs of (start-idx, len) to launch callables
common-pool pool
;;submit index groups
submissions (->> (range n-groups)
(unchunk)
(map (fn [^long callable-idx]
(let [group-start (* callable-idx group-size)
group-end (min (+ group-start group-size) num-iters)
group-len (- group-end group-start)
callable #(indexed-map-fn group-start group-len)]
(.submit common-pool ^Callable callable)))))
submissions (sequence
(map (fn [^long callable-idx]
(let [group-start (* callable-idx group-size)
group-end (min (+ group-start group-size) num-iters)
group-len (- group-end group-start)
callable #(indexed-map-fn group-start group-len)]
(.submit common-pool ^Callable callable))))
(range n-groups))
next-submissions (drop parallelism submissions)]
;;make a true lazy sequence that will block on the futures and will submit
;;new tasps t
;;new tasks as current onces get processed.
(->> (sequence (map (fn [future submission] (.get ^Future future)))
submissions (concat next-submissions (repeat parallelism nil)))
(reduce-fn))))))
Expand Down Expand Up @@ -136,18 +144,22 @@
~@body)))))))


(defn cpu-pmap
(defn pmap
"pmap using the commonPool. This is useful for interacting with other primitives, namely
[[indexed-map-reduce]] which are also based on this pool."
[map-fn & sequences]
(apply claypoole/pmap (ForkJoinPool/commonPool) map-fn sequences))
(if (in-fork-join-task?)
(apply sequence (map map-fn) sequences)
(apply claypoole/pmap (ForkJoinPool/commonPool) map-fn sequences)))


(defn cpu-upmap
(defn upmap
"Unordered pmap using the commonPool. This is useful for interacting with other primitives,
namely [[indexed-map-reduce]] which are also based on this pool."
[map-fn & sequences]
(apply claypoole/upmap (ForkJoinPool/commonPool) map-fn sequences))
(if (in-fork-join-task?)
(apply sequence (map map-fn) sequences)
(apply claypoole/upmap (ForkJoinPool/commonPool) map-fn sequences)))


(defn as-spliterator
Expand Down

0 comments on commit 448843d

Please sign in to comment.