In [1]:
from utils import create_or_load_pickle, ignore_warnings, ProgressReporter
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
ignore_warnings()

# まとめ
ビッグデータとは，コンピュータが速度やメモリ，データの大きすぎといった問題で処理しきれないデータ  
- jug
    - 長時間に及ぶ計算において，一時的な結果をキャッシュして置いたり，並列計算させたりできる
    - タスク: キャッシュを取られ，並列計算される関数．TaskGeneratorデコレータで指定．依存関係のあるタスクは自動的に解決される
    - jug executeで実行され，キャッシュディレクトリが生成される
- AWS
    - 処理能力の高い環境を一時間単位でレンタルできる
    - 大規模なマルチコア，GPUなどを使ってjugなどを利用できる
- starcluster
    - Amazonのマシン作成作業をスクリプトで自動化するためのパッケージ
    - Python2.7までしか対応していない．おそらくPython3ではboto3が推奨
    - マスターノードと複数のスレーブノードからなるクラスタを作成できる．
    - クラスタ内ではjugなどのジョブをキューイングエンジンによって分担して実行できる．

現代は，コンピュータの処理速度のペースよりもデータが増えるペースの方が早くなっており，いずれ追いつけなくなる．  
ビッグデータの定義は，データが大きすぎるため処理を終えることができないようなデータのこととする．  
本章の前半では，jugというパッケージを使って中規模のデータを対象にシステムを作成し，以下のようなことを行う．  
- 一連の処理をタスクに分割する  
- 中間結果をキャッシュする
- マルチコアを利用する(グリッド上の複数台コンピュータを含む)

本章の後半では，AWSクラウドによるビッグデータの利用方法を見ていく．  
また，pythonのパッケージであるstarclusterを使って，AWS上のクラスタを操作する．

# ビッグデータ入門
ビッグデータとは，データの増加するスピードが速すぎたりメモリに入りきらなかったりして処理しきれないデータを指す．  
本章の前半では，マルチコア(1台もしくは複数台のマシン)を使って，計算を高速化する方法を学ぶ．  

# jugを使って，一連の処理をタスクに分割する
jugはLuis Pedro Coelho(著者の一人)によって開発されているパッケージで，一斉に問題を解くことができる．  
- 一時的な結果をディスクやデータベースに保存し，あとから再計算させることができる
- マルチコアもしくはクラスタ上の複数台のコンピュータを利用できる．
    - キューイングシステムを使うバッチ処理を行う環境で性能を発揮
        - Portable Batch System(PBS), Load Sharing Facility(LSF), Oracle Grid Engine(OGE, Sun Grid Engine)などがある．

## タスクについて
タスクとは，jugで行う処理の構成要素となる関数をさす．  
例えば次のような関数がタスクになる．

In [1]:
def double(x):
    return 2 * x

jugを用いることで，これらのタスクは次のように書くことができる．

In [2]:
from jug import Task
t1 = Task(double, 3)
t2 = Task(double, 642.34)

上記のコードをjugfile.pyという名前で保存し，コマンドライン上でjug executeを実行する．  
するとdoubleというタスクが2つ実行された旨が表示され，キャッシュデータの入ったjugfile.jugdataディレクトリが作成される．  
再度jug executeを実行すると，すでに計算は行われたのでこれ以上計算する必要はなく，計算が行われない．  
<br>
ここで純粋関数と一般的な関数に気を付ける．  
純粋関数とは単に入力された値を受け取り，結果を返すだけの服地作用のない関数．  
一般的な関数はファイルの読み書き，グローバル変数の読み込み，引数の書き換えなど，言語の許可する操作を行う関数．  
Haskellなどの関数型言語では，純粋関数と一般関数を区別するシンタックスが用意されている．  
jugのタスクは純粋関数である必要はないが，並列処理を行うため実行順序がランダムなので，グローバル変数の読み書きをうまく行えない．  
こういったミスを発見するため，jug execute --debug が用意されている．  
<br>
より自然に，効率よくタスクを生成，実行したい．

In [None]:
from jug import TaskGenerator
from time import sleep

@TaskGenerator
def double(x):
    sleep(4)
    return 2 * x

@TaskGenerator
def add(a, b):
    return a + b

@TaskGenerator
def print_final_result(oname, value):
    with open(oname, 'w') as output:
        print("Final result:", value, file=output) # ファイルに出力

y = double(2)
z = double(y)
y2 = double(7)
z2 = double(y2)
print_final_result('output.txt', add(z, z2))

jug execute 上のコードが書かれたファイル名 を行うと，先ほどのように実行される．  
TaskGeneratorを使うことで，一連のタスクを生成し，マルチコアCPUの恩恵を受けることができる．  
同時に，結果同士の依存関係を解決してくれる．  
jug execute jug_file_name.pyをバックグラウンドで実行しておけば，jug statusによって今どのタスクがRunningなのかが分かる．  
上記では4回doubleを実行しているため，普通なら16秒待つところだが，マルチコアで動作していれば大体8秒程度で完了する．  

## 部分的な結果を再利用する
新しい特徴量を追加したい場合を想定する．  
全ての特徴量を再計算するのは無駄である．  

In [None]:
@TaskGenerator
def hfeature(im):
    import mahotas as mh # グローバルモジュールではなく，ローカルに持つ．少しばかりの最適化になる．
    im = mh.imread(fname, as_gray=1)
    return np.mean(mh.features.haralick(image), 0) # Haralick特徴量

@TaskGenerator
def new_features(im):
    import mahotas as mh # グローバルモジュールではなく，ローカルに持つ
    im = mh.imread(fname, as_gray=1)
    es = mh.sobel(im, just_filter=1)
    return np.array([np.dot(es.ravel(), es.ravel())])

import glob
filenames = glob.glob("./data/ch10/SimpleImageDataset/*.jpg")
hfeatures = np.as_array([hfeature(f) for f in filenames])
efeatures = np.as_array([new_features(f) for f in filenames])

features = Task(np.hstack, [hfeatures, efeatures])

# ... 学習コード

一度jugでhfeaturesを求めて置いた後，efeaturesを追加してもう一度実行すると，hfeatureの値はキャッシュデータから読み込まれる．  

## 内部で行われていること
タスクは引数だけでなく，他のタスクを取ることもできる．  
あるタスクがほかのタスクを引数に取れば，その間には依存関係が作られ，子タスクの結果が出るまで親タスクの結果を計算しない．  
この依存関係に基づき，各タスクのためのハッシュを再帰的に計算する．  
ハッシュとは，そこにたどり着くまでに計算した全作業をエンコードして，数値化したもの．  
jug executeは次のような簡単なループ処理によって行われている．

In [None]:
for t in alltasks:
    if t.has_not_run() and not backend_has_value(t.hash()):
        value = t.execute()
        save_to_backend(value, key=t.hash())

実際のループ処理では，並列処理のロック問題のため，より複雑になる．  
ここで生成されるキャッシュデータがjugfile.jugdata/に書き込まれる．  
ファイルだけでなく，Redisデータベースに書き込むことも可能である．  
<br>
タスクを少しでも変更すれば，そのハッシュ値は変更され，再度計算される．  
そのタスクに依存するタスクも，同様にハッシュが計算されるため，再計算される．

## データ分析のためにjugを使用する
jugは中規模のデータ分析を行うのにとりわけ向いている．  
前処理部分は計算済みで，特徴量を計算する部分だけを変更した場合，前処理部分の計算を避けることができる．  
また，jugはnumpy配列に特に最適化されており，他のライブラリとうまく連携することができる．  
<br>
ここでは，もう一度10章でやった画像の特徴量取得に関する計算を例に挙げ，jugによる処理を行ってみる．

In [None]:
from jug import TaskGenerator
import glob
import numpy as np

@TaskGenerator
def hfeatures(fname):
    import mahotas as mh
    import numpy as np
    im = mh.imread(fname, as_grey=1)
    im = mh.stretch(im)
    h = mh.features.haralick(im)
    return np.hstack([h.ptp(0), h.mean()])

@TaskGenerator
def label_for(f):
    return f[:-(3+1+2)] # 3はjpg, 1はドット，2は数字の文字数

@TaskGenerator
def perform_cross_validation(features, labels):
    from sklearn.model_selection import cross_val_score
    from sklearn.linear_model import LogisticRegression
    return cross_val_score(LogisticRegression(), features, labels, cv=5).mean()

@TaskGenerator
def write_result(ofname, value):
    with open(ofname, 'w') as out:
        print("Result is:", value, file=out)

filenames = glob.glob("../ch10/SimpleImageDataset/*.jpg")

# TaskGeneratorはどのような関数にも使用することができる．
as_array = TaskGenerator(np.array)

features = as_array([hfeatures(f) for f in filenames])

labels = list(map(label_for, filenames)) # こういうとこでつまづいてもそれまで計算していた値がキャッシュされているから大丈夫！
res = perform_cross_validation(features, labels)

write_result('output_images.txt', res)

以上のコードを.pyファイルで保存し，jug executeすると計算が行われ，結果がoutput_images.txtに出力される．  
TaskGeneratorによるわずかな手間がかかるが，より便利にjugを使うためには必要になる．

以下にjugのその他の興味深いコマンドについて示す． 
- jug invalidate function_name: function_nameの関数の結果が不適切であるとみなし，その結果に依存する関数を含めて再計算する
- jug status --cache: jug statusの高速化，さらに --clearを追記することでキャッシュを削除しjugfile.pyの変更判定を行う
- jug cleanup: 余分なキャッシュファイルをすべて削除

他にも拡張機能はたくさんある．内部で計算されたデータを確認するような機能もある．  
jugのドキュメントの[barriers](https://readthedocs.org/)のページを参照．

# AWSを使用する
AWSでは処理能力の高い環境を一時間単位でレンタルできる．  
ここでは，仮想マシンとディスクスペースを提供しているEC2(Elastic Compute Cluster)を利用する．  
EC2には3つのモードがある．  
- リザーブドモード: 前払い制で単位時間のコストが安い
- 単位時間のレートを固定するモード
- 全体の状況に応じてレートを変動するモード

テスト用には「Free tier(無料版)」でシングルマシンを使える．  
CPUは遅いが，インタフェースの使い方に慣れたり，色んな実験を行うのに適している．  
<br>
EC2にはコストに応じて様々なマシンがある．  
シングルコアから大容量RAMを備えたマルチコア，GPUも．  
LinuxサーバかWindowsサーバを選べるが，Linuxサーバの方が若干安い．  
<br>
操作や設定スクリプトの記述はブラウザのUIコンソールから行えるが，プログラムのインタフェースの方が比較的変更が少ない．  
<br>
AWSでは，ユーザ名のことを「アクセスキー」，パスワードのことを「シークレットキー」と呼び，これらでログインを行う．  
アクセスキー，シークレットキーのペアはいつでもいくつでも?作ることができ，それぞれに異なる権限を与えることができる．  
<br>
サーバのリージョンは物理的に近いリージョンを選んだ方がデータの送受信が早い．  
また，海外のリージョンを使うときには個人情報に気を付ける．  

## 初めてのマシンを作成する
無理に有料マシンを使うことはない．  
まずは[AWS](https://us-east-2.console.aws.amazon.com/console/home?nc2=h_ct&region=us-east-2&src=header-signin#)にアクセスし，サインアップもしくはログインを済ませる．  

- EC2を選択
- 右上メニューからまずリージョンを選択(ここではアジアパシフィック(東京))
- 適当なLinuxサーバを選択．Pythonなどがすでに入っているバージョンがあるが，ない方を選択
- 無料枠，最小構成のt2.microを選択
- 「確認」まで進み，「起動」をクリック
- 「新しいキーペアの作成」を行う．ここでは「awskeys」というキーペア名にし，キーペアをダウンロードする．
    - ダウンロードされるpemファイルは大切に保存すること！
- 「インスタンスを作成」する．(AWSのサーバはインスタンスと呼ばれる)
- インスタンスリストページでインスタンスのステータスが緑色の「runnning」になるまで待つ
- 作成したインスタンスを選択した状態で，「接続」ボタンを押すと，ssh接続コマンドの例が出てくる．
  - Mac
    - awskeys.pemのパーミッションを400に変える．そうでないとBad Permissionでエラーになる．
    - コマンドの例をawskeys.pemがあるディレクトリで実行すると接続
    - ec2-userのところは，メインページの右上に表示されている名前
  - Windows(https://dev.classmethod.jp/cloud/aws/first-login-to-ec2-linux/)
    - Windowsではpemのパーミッションの都合でログインできないので，TeratermやPuTTYを使ってログインする．
    - Teratermでは，ホスト名にパブリックDNSをコピペしてOK
    - ユーザ名には初期ユーザ名(多分ec2-user)，パスフレーズは空欄
    - RSA/DSA/ECDSA...鍵を使うのところを選択し，秘密鍵を選択．すべてのファイルを表示で見える．  
    - OKでログイン

### PythonパッケージをAmazon Linuxにインストールする
デフォルトだとPython2系なのでPython3.7を入れ，他にも必要な物をインストールする  
``` bash
> sudo yum -y install python3* gcc-c++ git libpng* libtiff*
> python3
> sudo pip3 install numpy scipy matplotlib scikit-learn jug mahotas imread
```

### クラウド上でjugを実行する
gitで画像ファイルを含むプロジェクトフォルダを好きなところに置く
```
git clone https://github.com/luispedro/BuildingMachineLearningSystemsWithPython
```
SimpleImateDatasetにアクセスできるよう先ほどのコードを変更し，viでコピペ．  
後はjug execute & でバックグラウンド動作させ，jug statusで見守る
<br>
結果が出るまでそこそこ待つ．  
より強力なインスタンスを使うには
- インスタンスを一旦停止
- change instance type(例えばextra largeインスタンスは当時0.58$/h かかっていた)
- マシンを起動
- 接続情報が変更されているので「接続」を押してアドレスを確認
    - Elastic IPによって固定アドレスが手に入る．そんなに高くない．

## starclusterでクラスタの生成を自動化する
starcluster: Amazonのマシン作成作業をスクリプトで自動化するためのパッケージ  
しかしPython2.7までしか対応していないので詳細は書かない．  
恐らくPython3ではboto3を利用するが，一応どんなことができるかだけ書いていく．

``` bash
> pip install starcluster # 失敗する
```

- starcluster help: config ファイルを作成
    - AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEYを設定
    - 16個のマシン(ノード)からなるクラスタ，smallclusterを定義．さらにマスターノードが作成されるので17台のマシンになる
    - イメージ(どのOSか，どのソフトウェアが入っているか，といった情報が入っている初期ディスク)を指定することもできる
- starcluster createkey mykey -o .ssh/mykey.rsa: sshキーを作成
- starcluster start mycluster: クラスタを生成

ジョブ用のキューイングエンジンが準備されているので，これらのノードを使ってバッチ処理ができる．  
- マスターノードにログイン
    - starcluster sshmaster mycluster
- マスターノードでスクリプトを準備
    - #!/usr/bin/env bash
    - jug execute jugfile.py
    - というスクリプトファイルrun-jugfile.shと共に用意しておく．このshファイルには実行権限を与える．
- ジョブをキューに送る．jugはもちろん，どんなUnixコマンドでも使える．
    - For c in 'seq 16'; do qsub run-jugfile.sh; done
    - 16個のジョブを作成し，jugを実行
- ジョブの完了を待つ
    - マスターノードではjug statusを使える
- マスターノードで結果を読む．
    - この時点ですべてのスレーブノードを停止させることもできる．
- やることが終わったら全ノードを停止
    - starcluster terminate mycluster