サーバーもラップトップもまた電話ですら、今日ではプログラムが使うことができる制御の独立した複数のスレッドによるマルチコアのチップで作られている。
プログラムがそれらのチップのすべての長所を引き出すように設計する必要がある。
Clojureはこのマルチコアの世界のなかでそのために生まれた。

Javaのような言語における並列性の大部分の問題は共有する可変的な状態を管理する問題にある。
これまで見てきたように、Clojureは基本的に(複数のスレッド間で安全に使うことができる)不変的なデータに頼っている。
また、これも見てきたように、状態をもつコンテナ(`atom`、`ref`、`agent`、`var`)を使い、明白な状態を作ることができる。
すべてのコンテナは一般的な更新モデルを使い、それにより状態は純粋な関数によってひとつの不変的な値から別の値へいつでも変換される。
この多くの一般的なエラーのクラスを取り除くアプローチの組み合わせにより、それらすべてコアが何かしら働かせる方法の本当の問題に焦点を合せることが可能になる。

わたしたちが出会う最初の問題のひとつは、主なスレッドから作業を移すことと主なスレッドがその仕事をする間に非同期に実行する方法です。
一度あれをすれば、その非同期なタスクが仕事を終えたときにその結果を受け取る方法も必要とする。
Clojureの`future`と`promise`へと飛び込みます。

長寿命なタスク指向な並列性のために、一連のタスクをワーカー・スレッドのプールに振り分けて処理します。
JavaはClojureから直接呼び出せるキューとワーカーのための頑強な道具を持っている。
その道具によって全てのコアを使って仕事の流れを効果的に処理することが可能になる。

いくつかの場合において、並列に、複数のコレクションに渡り、同じ変換をすべての要素に行う、きめ細かい仕事を実装する必要がある。
それらの問題をコレクションとシーケンス関数によってアプローチする方法をすでに見てきたが、Clojureは`reducer`と呼ばれる他のオプションを持っている。
`reducer`によってまるでシーケンスであるかのように変換を構成するがその実行は並列であることが可能になる。

最後に、プログラムの全体的な構造を組織するためにどのようにスレッド(と軽量プロセス)を使うことができるか考えたい。
`core.async`ライブラリはその構成を助けるために`channel`と`go block`の概念を定義する。
アプリケーションの構造を定義する方法を見る。

# Push Waiting to the Background

大部分のプログラムは外の世界とファイルやソケットや標準終端ストリームを通してつながる。
それらの入力と出力をI/Oと呼ぶ。
現代のプロセッサは一秒に何十億もの命令を実行できるが、大部分のI/Oは比較的低速です。
たくさんのプログラムがファイルからデータを読みだしや外部サーバからの応答の受信やユーザがやりたいことを見付けるののを待つためのかなりの時間を過す。

この街を効果的に行うことが必要であり、プログラムは他の仕事を続けられる。
待っているあいだ、他の処理を実行することもできるし、同じことを並列で待つこともできる。
例えば、ウェブ・ブラウザは外部のウェブ・サーバがコンテンツを返すのを待つために時間を過すひとつのプログラムです。一方で同時に、現在のページを表示し、リンクのクリックへの応答をし、ページのスクロールをしたりする。

## Fire and Forget

はじめに、応答の必要がないバックグラウンドでなされる必要がある簡単な仕事の場合を考えよう。
アプリケーションを組み立てて、イベントが発生するたびに外部のメーター法のコレクタを呼びたいと想像してください。
外部のサービスを`inc-stat`という便利な関数に包むことができる。
状態を更新するためにそれを呼び出す。

```
(inc-stat :pageview)
```

この関数はネットワークごしに外部のウェブ・サーバを呼ぶことになる。
ページ・ビューを生成しているあいだにそれを呼んだとき、呼び出しをする時間は各ページを構築するのが遅くなる。下の図から分るように。

![fig 05-01](fig_05_01.jpg)

この仕事をバックグラウンドのスレッドに移動するには、Clojureに含まれる`future`関数を使う。

```
(future (inc-stat :pageview))
```

`future`関数はボディを1つ取り、Clojureが管理するバックグラウンド・スレッド・プールにそのボディを呼び出す。
その違いを図で見ることができる。

![fig 05-02](fig_05_02.jpg)

また、ボディを渡す代りに非同期に引数なしの関数を呼び出す`future-call`を使うこともできる。
どちらの関数も非同期な活動を制御し検査する`java.lang.Future`オブジェクトを返す。
`future-cancel`関数はその実行をキャンセルする、一方`future-done?`と`future-cancelled?`はその状態の情報を与える。

しかしながら、遠隔地のサービスへ独立した統計的増加メッセージの洪水を送ることは非効率的に見える。
送信する前にいくつかの増分メッセージをまとめておくほうが理にかなっています。(??)
そうするためには、非同期でかつ状態をもつことが必要です。

## Asynchronous and Stateful

「State, Identity and Change」において、Clojureにおけるいくつかの状態コンテナ(`var`、`atom`、`ref`)を調べた。
われわれはもうひとつの状態コンテナである`agent`の導入をここまで引き伸ばした。

他の状態コンテナのように、`agent`は不変的な値を保持し、同じ更新モデルを使って修正される。
他のコンテナと違うのは、`agent`は非同期に更新されるということです。

メーター法コレクタを考えてください。
特定の統計のためのカウンタを`agent`に保持しましょう。

In [1]:
(def pageview-stat (agent 0))

#'user/pageview-stat

すべての`agent`の更新ごとに遠隔サーバを呼ぶのでなく、10回ごとにだけ呼ぶとします(`agent`の状態が10で割り切れる数に逹したとき)。
これは監視をしやすい(すべての状態コンテナで働く)。

In [3]:
(defn remote-send [key new-val]
  ,,, )
(add-watch
    pageview-stat
    :pageview
    (fn [key agent old new]
        (when (zero? (mod new 10))
            (remote-send key new))))

#agent[{:status :ready, :val 0} 0x515d3428]

ここで、状態へのなにかしらの変更により発火する`pageview-stat``agent`へ監視を加えた。
今、新しい`agent`の状態が10の倍数のときだけ外部サービスの要求の発火、それはわれわれにいくつかのバッチ処理を与える。(???)

そして、ちょうどその`agent`上で非同期に実行される関数が送るのようにアプリケーションが使う増加関数を定義できる。(??)


In [1]:
(defn inc-stat [stat]
    (send-off stat inc))

#'user/inc-stat

Clojureは`agent`上にて非同期に更新処理を呼び出す2つの関数を提供する。`send`と`send-off`です。
計算量が多く、入出力をブロックしない`agent`の更新には`send`を使用してください。(??)
基礎になるスレッド・プールはスレッドの固定された組を使い、ちょうど間に合った方法の中でこれらの更新が完了することに依存する。
任意の時間、ブロックする更新のためには`send-off`を使用してください。
基礎になるスレッド・プール(将来のためにも使う)は必要に応じて増えるので、ブロッキングは問題になりません。
`inc-stat`において、外部のサービスを(`agent`のスレッドにて実行する監視を経由し)潜在的に呼び出しているので、`send-off`を使います。

`agent`のひとつの追加的な特徴は、STMトランザクションの内部`agent`上または`agent`の活動自体の内部で呼び出される`send`や`send-off`はトランザクションが完了するまで遅らされる。
こうして、副作用を生産するために、(成功するために再試行が必要かもしれない)STMトランザクションの内側またはその他の`agent`の更新活動の内側で`agent`を安全に呼び出すことができる。

    #### Shutting Down
    
    Javaにおいて、スレッドはダエモン・スレッドとしてマークされる。
    JVMはすべてのダエモンでない・スレッドが仕事を終えたとき(典型的にこれは主要な開始するスレッドが終了したときに発生する)優雅にシャット・ダウンする
    スレッドをダエモンスレッドとしてマークすることはそれはバックグラウンドに仕事し、シャットダウンを妨げるべきでないことを意味する。

    `future`と`agent`の活動を処理するスレッドはダエモン・スレッドではない。
    もしアプリケーションが想定するとおりに終了する代りにハングするようならば、アプリケーションが終了するまでの間、`shutdown-agent`の呼び出しを追加することが必要だろう。

これまで、なにも応答せずに、バックグラウンドにブロックする活動をしてきた。
バックグラウンド・スレッドで行われた仕事から、どのように応答を受け取ることができるのか見ましょう。

## Waiting for a Response

ときどき、仕事をバックグラウンドに移して後でその結果を取りに戻りたいと思う。
例えば、オンライン・ストアにて、いくつかの製品を取り、それらの価格を比べたいというアプリケーションを考えてください。
それぞれの店に順番にシングルスレッドで問い合せると、次のようになる。


In [6]:
(defn query [store product] ,,,)

(defn query-stores [product stores]
    (for [store stores]
        (query store product)))

#'user/query-stores

これを実行する時間は各店に問い合わせる時間の合計であり、図示すると次のようになる。

![fig-05-03](fig_05_03.jpg)

もっと上手くできます!
仕事をバックグラウンド・スレッドに移す魔法のような可能性により、すべての店へ同時に問い合わせることができる。
非同期な問い合わせのために`future`を使う。


In [5]:
(defn query-stores [product stores]
    (for [store stores]
        (future (query store product))))

#'user/query-stores

しかしながら、すでに述べたとおり、`futre`は`java.lang.Future`を返すので、`query-store`は今、それらのシーケンスを返す。実際の結果ではない。
非同期な呼び出しの結果をブロックして待つために、`future`を`deref`または短縮形の`@`により間接参照する必要がある。

ここで`query-store`関数を2段階で実装した。
(すでに)第一段階で、すべての問い合わせに着手し、`future`オブジェクトのシーケンスを生産した。
必要とする唯一の変更は、`doall`を呼ぶことでその実行を(lazyでなく)強制することです。
もし`doall`を呼ばないならば、その問い合わせは後でシーケンスが実体化されるまで着手されない。
第二段階で各`future`を間接参照し、それぞれが完了するまでブロックする。

In [7]:
(defn query-stores [product stores]
    (let [futures (doall
                      (for [store stores]
                          (future (query store product))))]
        (map deref futures)))

#'user/query-stores

この`query-stores`関数は各店からの結果のlazyシーケンスを返す。
また、それらを間接参照しない`futures`のlazyシーケンスを返すだろう。(??)
そして、呼び元は各結果を解決するためにいつブロックするかを完全に制御できるだろう。(??)

今、問い合わせを並列に実行している。同時に、いくつかのサービスを待つことができる。もっとスレッドの影響力を行使することで全体の時間を削減する。次の図のように。
この図は`future`に呼び出され、各問い合わせの結果を間接参照することを待っている3つの`query-stores`を示している。

![fig-05-04](fig_05_04.jpg)

この方法で`future`を使うことはひとつのマイナス面をもつ。非同期な計算は元のコードへひとつの結果を返すだけです。
もし計算上の異なる点での複数の値を返して欲しいならば、`promise`を使うことができる。

## Making Promises

`promise`はひとつの値(だけ)をあるスレッドから別のスレッドへ移すために使われます。
従って、複数の`promise`は非同期な計算における異なる時点における値を返すために使われます。
例えば、ある非同期な計算に時間をかけて、その後その実行の開始と終了の時間を取得したいと思ってください。

In [1]:
(defn long-running-task []
    ,,,)

(defn launch-timed []
    (let [begin-promise (promise)
          end-promise (promise)]
        (future (deliver begin-promise (System/currentTimeMillis))
            (long-running-task)
            (deliver end-promise (System/currentTimeMillis)))
        (println "task begin at" @begin-promise)
        (println "task end at" @end-promise)))

#'user/launch-timed

この例において、`begin-promise`と`end-promise`の2つの`promise`を作る。
値は`promise`を経由して`deliver`関数により届けられる。
値はいちどだけ`promise`を経由して届けられ、最初の配達の後、その次の配達はなんの影響もない。

それらの`promise`は後でその値を取り戻すために間接参照できる。
この間接参照は値が利用可能である間ブロックする。そしてそれを返す。
`promise`の値が利用可能であるかブロッキングなしで見るためには、`realize?`関数を使う。
また、`deref`の変種が特定のタイムアウト期間を待つことに気を付けてください。

`future`と`agent`を仕事を非同期におこなうために使えて、待つことを押しやるかバックグラウンド・スレッド上にて仕事をすることが可能になる。
`future`と`promise`は同様に、非同期タスクから結果を返す方法を制御することを可能にしてくれる。
(それがあなたのためになるかもしれないものは何でも)実際の仕事をおこなうプログラムを構築する方法を見ることが必要である。

# Queues and Workers

多くのプログラムはタスク・プロセッサの全体または一部に見える。タスクとは普通外部の要求に対応する仕事の単位である。
ウェブ・アプリはウェブ・ページを構築する要求を受ける。
ウェブ・サーバはAPI呼び出しの処理をする要求を受ける。
バッチ・プログラムはディスクまたはデータベースからファイルを読み出して、それらを適切に処理する。
これら一般的なパターンのすべては、ワーカーのプールに養殖された仕事のキューとしてモデル化される。

キューはタスクを順序付けして保持し、仕事が到着した場所と処理された場所を分離します。
ワーカー・プールによって異る属性のワーカーのプールを作り、その仕事を管理し監視することに使用される並列性の量とポリシーを制御することが可能になります。(??)
あの制御により廃棄におけるハードウェアの使用をフルにすることが可能になる。(???その管理により、私たちは自分の処分でワードウェアを最大限に活用することができます。)

Clojureはキューとワーカーに対していくつかの道具を提供するが、また、Javaですでに利用可能な高品質な道具の再発明を避ける。
Clojureでこれまでに見てきた部品からキューとワーカー・プールを作る方法を考えよう。

## Some Assembly Required

「Model Your Domain」にて、FIFOデータのためにリストやベクタよりさらに効果的なアクセスを提供するためにClojureの永続的なキューを使った。
しかしながら、シングル・スレッドの文脈のなかでそうした。
永続的なキューにおいて、キューの各変形は更新された版を返す。
もし複数のスレッドがキューを共有すれば、それらはみな同じインスタンスを共有することを必要とする。
(`atom`や`ref`のような)状態管理の構造と状態をもったキューの実装の両方が必要である。

われわれの選択肢はClojureの`atom`か参照に永続的なキューを包むことであろう。それはキューの両端が安定したアイデンティティを維持する。
もしこれを`atom`で試みるならば、`atom`の`swap!`関数は`atom`(わたしたちにとってのキュー)の新しい値を返せるだけで、弾かれた値ではないと気づくだろう。(??)
これによりこの主のキューから状態を持った方法で項目を引き出すことが困難になる。(??)

`ref`の選択はより有望にみえる。
このように実装できた。

In [2]:
(defn queue
    "Create a new stateful queue"
    []
    (ref clojure.lang.PersistentQueue/EMPTY))

(defn enq
    "Enqueue item in q"
    [q item]
    (dosync
        (alter q conj item)))

(defn deq
    "Dequeue item from q (nil if none)"
    [q]
    (dosync
        (let [item (peek @q)]
            (alter q pop)
            item)))

#'user/deq

しかしながら、このキューはブロックしない。
一般的にコンシューマーがキューが空のときにデータが届くのを待つために`deq`にてブロックすることを望むが、この実装はただ`nil`を返すだけで代りにコンシューマーが繰り返し確認することを要求する。
この理由により、Clojureの永続的なキューはスレッドを跨ぐ仕事のキューの管理としてはふつう良い道具でない。

代りに、キューとワーカーに対するJavaの支援に注目することを必要とする。
これはJavaライブラリが多種多様な振舞いに対して力強く支援してくれる領域です。

## Java Queues

キューとワーカーを支援するJavaクラスの大半は`java.util.concurrent`パッケージの中にある。
Javaは多くのブロッキング・キューの実装を提供する。(すべて`java.util.concurrent.BlockingQueue`の実装)そして、それらはClojureから簡単に使うことができる。

Javaキューの実装の中の主要な違いのひとつはデータをバッファする方法です。
例えば、`LinkedBlockingQueue`はオブションの縛られたバッファを提供し、`ArrayBlockingQueue`は縛られたバッファを提供し、`SynchronousQueue`はまったくバッファを提供しない。生産者と消費者は両方が一方から他方への値を手にする準備ができるまで待たねばならない。
`LinkedTransferQueue`は任意に縛られたバッファで`SynchronousQueue`の渡す能力を組み立てる。(??)

これまで言及してきたすべてのキューはFIFOの順番で値を提供するが、Javaも項目を並べ直した2つのキューを提供する。
`PriorityBlockingQueue`はアッという間にキューの前面の高い優先度の項目内にて泡立つ。(??)
`DelayQueue`は遅延のあるメッセージを取り、遅延が切れたときにだけ可能になる。(??)

縛られたバッファーのキューもしばしば、生産者が満杯のバッファに出会うときカスタム化のための機会を提供する。(??)
Javaのブロッキング・キューAPIによって、ブロッキング、時限ブロッキング、特別な値を返すこと、例外を投げることが可能になる。

普通のJava相互運用メソッドコールを通じて`put`、`take`または他の`BlockingQueue`メソッドを呼ぶことができる。
キューを経由していくつかのメッセージを押し出しましょう。


In [3]:
(ns ch5.jqueue
    (:import [java.util.concurrent LinkedBlockingQueue]))

(defn pusher [q n]
    (loop [i 0]
        (when (< i n)
            (.put q i)
            (recur (inc i))))
    (.put q :END))

(defn popper [q]
    (loop [items []]
        (let [item (.take q)]
            (if (= item :END)
                item
                (recur (conj items item))))))

(defn flow [n]
    (let [q (LinkedBlockingQueue.)
          consumer (future popper q)
          begin (System/currentTimeMillis)
          recieved @consumer
          end (System/currentTimeMillis)]
        (println "Recieved:" (count recieved) "in" (- end begin) "ms")))


#'ch5.jqueue/flow

`pusher`関数は`n`の数だけキューに置き、最後に`:END`を付けてメッセージの完了を通知する。
`popper`関数は同じキューから`:END`メッセージを受けとるまでメッセージを取り出す。
それらの関数をバックグラウンド・スレッド・プール上で実行されるであろう`future`の中で実行する。

しかし、われわれはどのスレッドがその`future`を実行するか制御しない。
Clojureの`future`と`agent`は非同期実行のための比較的簡潔なAPIを提供するが、監視と制御のいくらかの喪失がある。
代りに、多くのスレッドで仕事をするためのJavaのビルト・イン支援を使うことができる。

## Making Threads

Javaはスレッドの工場(`ThreadFactory`)とキューとワーカー・プールの組み合わせ(`ExecutorService`)を表すインターフェイスを提供する。

このように、プロセッサの数に合せた計算スレッドの固定されたプールを作ることができる。

In [4]:
(import '[java.util.concurrent Executors])
(def processors (.availableProcessors (Runtime/getRuntime)))
(defonce executor (Executors/newFixedThreadPool processors))

(defn submit-task [^Runnable task]
    (.submit executor task))

#'ch5.jqueue/submit-task

Javaは実行可能なタスクを`Runnable`または`Callable`インターフェイスを使って表す。
役立つことに、全てのClojureの引数がない関数はそのインターフェイスを実装している。
タスク(任意のClojureの関数)は呼び出しのために`ExecutorService`に渡される。
そして、要求のストリームをタップすることと、それらを実行のためのタスクとして提出することは容易である。(??)

Java5にて追加された、Javaの`Executor`は当時一般的だった4から8コアのマシンのための粗粒のタスクの並列性を支援するために設計されている。
マシンにコアが追加されるにつれて、ひとつのキューを待つために発生した競合がキューから項目を取得するワーカーに対するボトルネックを作った。

これに対処し、他の計算パターンを利用するために、Java7で`fork/join`と呼ばれる新しいフレームワークが導入された。
`fork/join`はより小さなきめ細かい計算タスクと再帰計算とより多くのコアを支援するために設計され、調整されている。
`fork/join`は多くのワーカー・キューを使い、ほかから仕事を盗むことが可能です。
つまり、キューがやるべき仕事がなくなったら、他のキューの後ろからタスクを取ってくる。キューの間で仕事が自動的に均衡します。

`java.util.concurrent.ForkJoinPool`クラスはJavaの`fork/join`の実装に対する主なエントリー・ポイントです。
一度`ForkJoinPool`を構築したら、それはまた`ExecutorService`でもあり、同じ方法でそれに対してタスクを提出できる。
しかしながら、Clojureはその開発者にとってもっと自然な方法で`fork/join`を活用するフレームワークを提供する。
次に、どのように、そしていつそのフレームワークを使うか見よう。

# Parallelism with Reducers

Clojureにおけるほとんどのデータ操作はシーケンスに関数を適用するというふうに特定される。(??)
シーケンス(かれらの定義によれば)はある順番に並んだ論理的な値のリストです。大部分のコア・ライブラリのシーケンス関数は`lazy`に適用される。順番に、かつシングル・スレッド上で。(??)
あなたが「Use Your Core」の章で推測するかもしれないように、最後の詳細が問題です。

`reducer`はシーケンシャルなデータ上の変換を表現する別の方法です。シーケンス関数の構成に似ている。(???)
しかしながら、`reducer`は`fork/join`を使ってその変換を並列に実行できる。

## From Sequences to Reducers

具体的な例を考えましょう。
運送会社は今すぐに配送するために必要な製品の全てについてのデータを持っている。
各製品は領域エンティティであり配送クラスや重さ(など他の属性も)へのキーを持つ。(??)


In [5]:
{:id "230984234"
 :class :ground
 :weight 10
 :volume 300}

{:id "230984234", :class :ground, :weight 10, :volume 300}

現在の陸送すべての合計の重さを計算するために、陸送だけを選択するためにシーケンス関数を使い、その重さを取り出して、合計することができる。

In [8]:
(ns shipping.domain)

(defn ground? [product]
    (= :ground (:class product)))

#'shipping.domain/ground?

In [9]:
(defn qround-weight [products]
    (->> products
         (filter ground?)
         (map :weight)
         (reduce +)))

#'shipping.domain/qround-weight

Clojureはプログラムを一連の構成可能なシーケンスの操作として表すのを容易にする。
lazyさ、それはひとかたまりで一時的な最適化により支援される、は大きな製品のリストに対してその操作を効果的に行うことを可能にする。
しかしながら、このコードは仕事をするのにシングル・スレッドを使うだけです。

Clojureは`pmap`という`map`の特別な並列版を提供する。それはシーケンスの要素を取り、`future`でバックグラウンド・スレッドに異なる要素を送ることで仕事を並列に行う。

しかしながら、ほとんどの場合において要素ごとのなすべきタスクは小さい(ここでは、ただマップからひとつの属性を抜き出している)。
`future`を呼ぶことはスレッドの境界を越えて仕事を渡し、その結果を引き出すための同期のオーバーヘッドを加えることになる。
このオーバーヘッドに比べてタスクが小さいとき、`pmap`は対向のシングル・スレッドよりより遅くなるだろう。
加えて、このユース・ケースにおいて、まだコードの`filter`と`reduce`の部分を並列化していない。

Clojureはこの問題への解決策を持っている。`reducer`です。
`reducer`は(ちょうどシーケンスを扱うように)一連の構成可能なきめ細かい操作としてデータの変換を組み立てる方法を提供するが、すべての変換を実行する間に並列性を達成する。(??)
ボーナスとして、`reducer`は(ガベージ・コレクションにより後でかならず取り戻される)シーケンスについて見るほとんどの中間結果を作ることを避ける。

`reducer`は削減関数により組み立てられた縮小可能なコレクションから成る。
縮小可能なコレクションとは削減操作をそれ自身に対してできるだけ効果的に行う方法を知るコレクション以外のなにものでもない。
削減関数とは(ちょうど普通に`reduce`に渡すような関数)削減の間、結果を積み重ねる方法を述べる関数です。

多くの削減操作が、すでにシーケンス・ライブラリで使った版を反映して提供される。(`map`、`filter`、`mapcat`など)(??)
その操作のおのおのは`reducer`を取り、また、返すが、変換を行わない。
代りに、その操作は新しい操作を考慮するために単に削減関数を修正するだけです。(??)

変換を行なうために、`fold`という新しい`reduce`のような関数を呼び出す。
図fig-05-05で示すように、`fold`は元のコレクションをグループに分割し、削減関数を使って各グループにおいて`reduce`を行う。
現在、永続的なベクタとマップだけが並列に`fold`されうる。他のコレクションはすべてひとつの連続した`reduce`に後退する。(?)
この連続した`reduce`でさえ、等しいシーケンスの版よりもっと効率的である。なぜならば、中間結果を避けるからである。

前の例に返って、`clojure.core.reducers`ライブラリを引き込み、関数の`reducer`版を使って陸送の重さの計算を書き直すことができる。

In [11]:
(ns shipping.reducer
    (:require [shipping.domain :refer (ground?)]
              [clojure.core.reducers :as r]))

(defn ground-weight [products]
    (->> products
         (r/filter ground?)
         (r/map :weight)
         (r/fold +)))

#'shipping.reducer/ground-weight

この実装は`clojure.core`ネームスペースの代りに`clojure.core.reducers`ネームスペースの関数を使うことを除いて、元の版に似ている。
他のアプローチに対しての`reducer`の主な利点のひとつは、操作の構成可能な形を保持することを可能にするところです。(??)

![fig-05-05](fig_05_05.jpg)

`map`と`filter`の`reducer`版は製品の元のベクタ上で変換を行わないことを思い出してください。
`fold`の最後の呼び出しまでなにもおきない。
この例では、`fold`のいちばん簡潔な版を使っている。それは`reduce`と`combine`段階の両方で同じ関数を使う。

再帰的に問題を分割するどんな並列計算でもはただ仕事をするだけもより高価である分割することと再結合するときを決めることを必要とする。
これに対する最良の答えはない。個別の計算のサイズに依存している。
`fold`関数によりパーティションのサイズを指定することが可能になる。デフォルトはグループごとに512要素(`+`のような簡潔な数学操作が上手く働くサイズ)です。
さらに複雑な変換ではより小さなパーティションのサイズが恩恵を受けるだろう。

マルチ・コア・マシンにおけるパフォーマンスの向上のために`reducer`を使っている。それでは、シーケンスと`reducer`の間のパフォーマンスを比較しよう。

## Reducer Performance

シーケンス版と`reducer`版をますます大きな製品のベクタの上で実行しよう。
詳細を理解するために、データを2つの尺度で見よう。
次の図は製品の数(N)が32、128、512、2048のときの結果を示している。
デフォルトのパーティションのサイズが512なので、Nが512より小さいとき、`fold`実際には並列ではない。ひとつのパーティションだから。
これらのテストは4つのハイパースレッド・コア(8コアと表示される)のMacBook Proにて実行された。

期待通り、シーケンス版と`reducer`版のパフォーマンスはNが512になるまでは同じです。
しかしながら、一度そのパーティションのサイズを越えて、シーケンス版がシングル・スレッドで、`reducer`版がデータを異なるスレッド上で並列に実行される複数のパーティションに分割する。
さあ、引き戻って、次の図にさらに3つのデータ地点(N=8192、32768、131072)を加えることで、さらにNの値が大きくなるときの衝撃を見よう。

Nが大きくなるにつれて、`reducer`の利点が明らかになる。`reducer`は仕事を4つのコアに分割している、一方で、シーケンス版はひとつのコアを使っている。
加えて、`reducer`版は出すゴミが少なく、ガベージ・コレクタのロードを削減している。

`reducer`の主な利点のひとつは、もっとコアが多いマシンに移ったとき、同じコードがより速く実行することです。
マシンのコアを切ることでこの違いを示すシミュレートができる。
Nを131072に固定して、代りにコアの数を変えて、シーケンス版と`reducer`版のテストに返ろう。
このテストにおいて、あらゆる高価を排除するためハイパースレッディングを切る。

シーケンス版のパフォーマンスがコアの数に関係なく効果的に同じであることが、前の図から見ることができる。なぜならばコードはとにかく一つのスレッドに縛られているからです。
シングル・コアの場合は特に悪い。なぜならば、ガベージ・コレクションとマシンにおける他の仕事がたったひとつのコアを使っているからです。
しかしながら、`reducer`版は追加のコアに対して明確に勝っており、同じコードは自動的により速くなる。
8か16コアのサーバに移動すればこのコードはまだもっと速くできることを予想できる。

`reducer`を使うとき、データのサイズと、パーティションに対する仕事のサイズと、利用できるコアの数について考えることは重要です。
要素の数がパーティションのサイズよりも少ないとき、シングル・スレッドです。(ただし、それでもある程度の利点は見える)。
一度パーティションのサイズを越えると、`reducer`はマルチ・スレッドになる。しかし、オーバーヘッドのためこの利点はコアの数に等しくはならない。

他のキーは並列な`fold`可能なコレクションは(今のところは。さらに追加されるだろう。)永続的なベクタと永続的なマップだけです。
ほかのコレクション(とシーケンス)すべてはひとつのパーティションを越えて`reduce`に戻ります。
`reduce`のほうが速い可能性が高いですがマルチ・スレッドではありません。(??)

`reducer`はシーケンス・ライブラリの使いやすさの利点とマルチ・スレッドの`fork/join`のパフォーマンスを組合せる。両方の世界の良い所を。
`reducer`のほとんどを作るためには、大量の処理が必要で、データが`fold`可能なベクタかマップに存在する場合に使用を制限する。

# Thinking in Processes

今まで一回の非同期(`future`と`promise`)のために複数のコアを使う方法を見てきた。粗挽きのタスクの並列性ときめ細かいデータの並列性。(??)
しかし、どきどき、並列性でなく並行性に興味を持つ。つまり、プログラムを並行なスレッドの実行の組として設計する可能性。
必然的に、それらの実行のスレッド間で値を伝えもしたいと思います。一度だけでなく継続的に一連の値として。

`core.async`ライブラリはこの必要性への答として作られました。
もともとは、Clojure自体の一部として着想されましたが、コア・ライブラリよりもっと速い進化を可能にするために、最終的に独立したライブラリとしてリリースされました。
`core.async`ライブラリは2つの中心となる概念を提供します。(独立したスレッドの実行である)`go`ブロックと(ある場所から他の場所へ値を渡す手段である)`channel`です。
次は、それらを使う方法を探ります。

## Channels

`channel`は、プログラムの2つ以上の部分の間で一連の値を時間の経過とともに伝達するキューのような手段です。
`channel`はスレッドの間に生成され、渡される。それらは状態を持っている。

`channel`はその内部に値を保持するためにバッファを使う。
デフォルトでは`channel`はバッファされない(長さ0)でJavaの`SynchronousQueue`のようなものです。
バッファされていない`channel`は生産者と消費者の両方が`channel`間で値を扱うことが可能になるまでブロックする。
`core.async`ライブラリも固定長のバッファとバッファを削除する(もしいっぱいならば新しいデータを捨てる)こととバッファをスライドさせる(もしいっぱいならば古いデータを捨てる)ことを提供する。

`core.async`で`channel`を生成するには、`chan`関数を使う。
ここに異なるバッファの型とサイズで`channel`を生成するいくつかの例がある。

In [2]:
(require '[clojure.core.async :refer (chan dropping-buffer sliding-buffer)])

(chan)    ;; unbufferd(length = 0)
(chan 10) ;; bufferd (length = 10)
(chan (dropping-buffer 10)) ;; drop new values when full
(chan (sliding-buffer 10))  ;; drop old values when full

#object[clojure.core.async.impl.channels.ManyToManyChannel 0x131f5cab "clojure.core.async.impl.channels.ManyToManyChannel@131f5cab"]

どんな一般的なClojureの値でも`channel`の中に置くことができ、他方に伝達できる。
ひとつの例外は`nil`で、これは`channel`が閉まっておりなにもデータが残っていないことを示すために使われる特別な値です。
`channel`は`close!`関数により閉められる。

`channel`における2つのもっとも重要な操作は`put`と`take`で、それらは文脈と使用法に依存するいくつかの形式を持つ。
普通のスレッドから`channel`を使うとき、`put`の操作は`>!!`で`take`の操作は`<!!`です。
ここに`channel`を生成し、そこに値を`put`してから`take`して値を取り戻す例がある。

In [3]:
(require '[clojure.core.async :refer (chan <!! >!!)])

(def c (chan 1))
(>!! c "hello")
(println (<!! c))

hello


前のコード例において、現在のスレッドから`put`と`take`の両方の操作を行なっているが、実際のプログラムでは、`channel`の2つの端は一般的に異なるスレッドかコンポーネントからそれらの間で値を送るために使われる。
バッファのサイズに1を指定して`channel`を生成したことに気づくかもしれません。
もしバッファなしの`channel`を使っていたら、この例にて待つために`put`はブロックしただろう。そしてこの例が完了することを妨げていただろう。

`core.async`における`channel`は必ず縛りがある。
これは、後で問題になるような設計上の問題を回避するための意図的な設計の制約です。(??)
システム内にて縛りがないキューは予期せぬロードが重なりあう所に置かれ、やがてリソースが尽きてシステムをクラッシュさせるだろう。

代わりに、`core.async`は固定のサイズを選ぶかいっぱいになったときに何を捨てるかについてのポリシーをインスタンス化することによってバッファの長さを縛ることを要求する。
固定サイズのバッファはいっぱいになったキューに追加を試みるとき生産者をブロックをする背圧を作る。
これは設計を前もって考えることを促進します、生産中ではなく。システムはロードを待つことによるブロックを明示的に扱うことをハッキリと設計されねばならない。受ける仕事を減らすかなにをやらないかを選択する。

完全に異なるサブシステム内の専用スレッドから`channel`を使うことは可能ではあるけれども、`goブロック`からそれを使うことがより一般的です。
`goブロック`によりスレッドのプールにより支援される軽量プロセス・ループを生成することが可能になる。

## Go Blocks

伝統的に、Java(またはClojure)プログラムはそのプログラムの各部分の実際の処理を含む(実際のOSのスレッドに対応する)スレッドを作る。
`core.async`ライブラリは異なる伝統に従っている。それはC.A.R.Hoareの古い作品「Communicating Sequential Processes」の遺産に基く。

ここではその作品の詳細には触れない。
重要なことはプログラムを構築する方法についての異なる方法の考え方を学ぶことです。
スレッドは少なくて高価なリソースです。
それらはスタック空間と他のリソースを消費し、それらは比較的開始するのが遅い。
そのスレッドがI/Oをブロックするとき、わたしたちはそのシステムのリソースを浪費する。

代りに、`core.async`はスレッド・プール対応づけられていて、仕事の準備が整ったときにのみ実行する軽量プロセスの観点から考えることを奨励する。
`channel`からのまたは`channel`へのメッセージを待つ間ブロックする代りに、プロセスが再度実行の準備ができるまでそれらのプロセスは駐車(parked)できる。(??)
これによって複数のI/O操作に渡って選択するための、また、最初のものが完了したときに進むいくつかの興味深い新しい振舞いを実装できる。

`core.async`において、そのプロセスを(Go言語の同様な概念にもとづき)`goブロック`を呼ぶ。
`goブロック`内では`channel`を使うが、`put`や`take`操作は`<!`と`>!`である。

ここにメッセージを受け取って表示処理する`goブロック`をつくる関数の例がある。

In [3]:
(require '[clojure.core.async :refer (go <!)])

(defn go-print
    "Pull messages from channel c and print them."
    [c]
    (go
        (loop []
            (when-some [val (<! c)]
                       (println "Recieved a message:" val)
                       (recur)))))

#'user/go-print

この例において、`goブロック`は軽量プロセスとして実行する。
`channel`操作(`<!`や`>!`など)に到着するときに、もし`channel`操作が実行されれば、実行は継続する。
もし`channel`操作が続けられなければ、`goブロック`は駐車(park)する。
駐車(parked)した`goブロック`はスレッドを消費せず、効果的に停止(suspended)した計算はデータを待つ。
`channel`操作が進めるようになったとき、`goブロック`は起き上がって続きの実行をする。

`goブロック`はプログラムを潜在的な並列プロセスに分割する偉大な道具です。
`core.async`が`goブロック`と`channel`を特によく支援する、あるユース・ケースはデータ変換の段階のパイプラインを構築することです。

## Pipelines

`core.async`ライブラリは並列な変換の段階にて2つの`channel`を接続する一連の関数を提供する。`pipeline`、`pipeline-blocking`、`pipeline-async`だ。
このpipeline関数は(より簡単な`pipe`と同様に)入力`channel`から出力`channel`へと値を移すが、重要なある特徴を提供する。並列な`transducer`の実行です。

この特徴によって`channel`で分割されたデータの変換の段階を作ることに対して`pipeline`が偉大になる。(??)
各`transducer`の段階は多くの変換を潜在的に組合せることができるので、並列性の程度、および`channel`の分離に価値がある場所についての多くの選択を提供する。

例えば、SNSのストリームを処理するシステムを考えでください。
`transducer`として定義した一連の変換を提供できる。


In [1]:
;; parse message into set of words
(def parse-words (map #(set (clojure.string/split % #"\s"))))

;; filter messages that contain a word of interest
(def interesting (filter #(contains? % "Clojure")))

;; detect sentiment based on different word lists
(defn match [search-words message-words]
    (count (clojure.set/intersection search-words message-words)))
(def happy (partial match #{"happy" "awsome" "rocks" "amazing"}))
(def sad (partial match #{"sad" "bug" "crash"}))
(def score (map #(hash-map :words %1
                           :happy (happy %1)
                           :sad (sad %1))))

#'user/score

これらの`transducer`は入力ストリームから出力ストリームへのひとつの段階のpipelineで一緒に構成することができます。


In [9]:
(defn sentiment-state
    [in out]
    (let [xf (comp parse-words interesting score)]
        (clojure.core.async/pipeline 4 out xf in)))

#'user/sentiment-state

これは、`in`から`out`へ4つの並列なスレッドで接続し、それぞれが組み合された`transducer`の変換を処理します。
しかし、アーカイブへログを取っているような感情の分析が進行中である間に異る`pipeline`の段階にて発生している他の分析があるかもしれない。(??)
その場合、この段階を2つに分割することができる。感情の分析の前に新しい中間`channel`を作ります。

In [4]:
(require '[clojure.core.async :as async])

(defn interesting-stage
    [in intermediate]
    (let [xf (comp parse-words interesting)]
        (async/pipeline 4 intermediate xf in)))

(defn score-stage
    [intermediate out]
    (async/pipeline 1 out score intermediate))

(defn assemble-stages
    [in out]
    (let [intermediate (async/chan 100)]
        (interesting-stage in intermediate)
        (score-stage intermediate out)))

#'user/assemble-stages

今、入力メッセージのすべてを取って興味のあるものだけを出力する(4つのスレッドまでを使う)最初の段階と興味のあるメッセージを取りそれらにスコアを付ける第2の段階を持つ。
第2の段階に対して、量が比較的少ないため、並列処理をひとつのスレッドに減らすことができる。
いちどそれらの段階を組み合せれば、`intermediate`メッセージ`channel`を他の目的に使う機会がまたある。



# Wrapping Up
