# データ分析基盤を関数型言語の特性を活かして構築してみる

# なんで関数型言語でデータ分析か?
データ分析と言えば、pythonやRなどの言語が主流です。理由としては、実装が簡単、ライブラリが豊富というのが主な理由であると思います。 
実際にpythonやRで機械学習のライブラリをす多少は叩いてみたこともありますが、初心者が軽く触ってもなんとなく結果がでるのはとてもおもしろかったです。ただし、なんとなく以上に進めない歯痒さや、ライブラリの中身がブラックボックスになっているもどかしさ、そして何より、コードの再利用、テスタビリティー、可読性の観点で、自分の至らなさが原因で、どうしても難しいところがありました。そこで、 **オブジェクト&関数型言語により出来る限りスクラッチで圏と関数、オブジェクト指向の概念を基に実装するチャレンジをすることで、より見晴らしの良いデータ分析の基盤を構築してみようと思いました。**  
そのため、この基盤を構築するにあたって、3つの概念をうまく組み合わせる必要があります。
- 圏や関数の数学的な概念
- データ分析で行われるデータ処理、機械学習、モデルの検証などの概念
- オブジェクト指向などの、プログラミングの概念

それぞれ3つの全く異なる分野ですが、数学とデータ分析はそれぞれの分野が互いに補完し合い、プログラミングにおいて有益なはたらきをします。
圏や関数（正確に言うと、関数は圏の概念を構築しますが、わかりやすさのため、あえて分けています。）は「数学」という強固なフレームワークをもち、そのフレームワークのおかげで、自分で一部だけ実装すれば、**定理に裏付けされたロジックにより**、あとはよしなにやってくれるための基盤をつくることができます。(デザインパターンで言うところの、Templeteパターン、Factoryパターンの使いどころです。)  
データ分析は、データの変換と処理により構築されるので、そのような数学的基盤はとても有用です。数学のフレームワークをバックグラウンドに、**具体的に何をするかを**アルゴリズムにより肉付けてきます。    
数学（フレームワーク）とデータ分析（アルゴリズム）を仲介役として、関数型プログラミングによって実装する事で、**この２つの概念を結びつけ、再利用性と可読性に秀でたデータ分析基盤を構築するのが、この記事の目的です。**  
なお、プログラミング言語としては、Scala(ver2.11.7 ~)を採用します。この言語は関数型としての特性と、オブジェクト指向としての両方の性質をもつので、より、可読性、再利用性の高いプログラミングを行う事ができると期待できます。

# データ分析と圏論、関数との相互作用
データ分析について、簡単にまとめ、改めて圏論、関数との相性について考えます。

## データ分析の区分
### 分類 (Classification)
履歴データから、分類するためのパターンを抽出する(ex xx度以上の体温の場合、風邪と分類する)

### 予測(Prediction)
履歴データを参照し、現在の状態から、別の状態を予測（推測）する（ex 大規模の患者データを参考に、現在の健康状態から疾病リスクを予測する）

### 最適化(Optimization)
全体最適を大規模のデータから構築する

### 回帰 (Regression)
データに沿った関数を提供する（連続的な分類モデルと同一視出来る）

上記のどのようなデータ分析も、大まかには以下のプロセスを通して構築されます

## データ分析の作業フロー

1. 問題の定義
2. データの入手
3. データのクリーニング（ノイズの除去、規格化など）
4. データのパターン分け（必要に応じて）
5. 特徴量の選択と、モデルの選択
6. モデルの学習と検証　
7. モデルの性能の向上


結局のところ、データ分析とは、 **何らかの観測データ[T]を最終的な分析結果[V]に変換しているにすぎないです。** この考え方は、 **圏論のフレームワークを基に実装する可能性を示唆してます**

# 構造と圏
Wikiでの圏の定義は以下です。
- 対象の類 ob(C)
- 対象の間の射の類 hom(C)
- 各射 f ∈ hom(C) には始域と呼ばれる対象 a ∈ Ob(C) および終域と呼ばれる対象 b ∈ ob(C) が付随して、"f は a から b への射である" と言い、f: a → b と書き表す。
- このとき、任意の三対象 a, b, c ∈ ob(C) に対し、射の合成と呼ばれる二項演算 hom(a, b) × hom(b, c) → hom(a, c); (f, g) ↦ g ∘ f が存在して以下の公理を満足する:
    - 結合律: f: a → b, g: b → c, h: c → d ならば h ∘ (g ∘ f) = (h ∘ g) ∘ f が成り立つ。
    - 単位律: 各対象 x ∈ ob(C) に対して x の恒等射と呼ばれる自己射 idx = 1x: x → x が存在して、任意の射 f: a → x および g: x → b に対して 1x ∘ f = f and g ∘ 1x = g を満たす。

諸々ややこしいですが、重要なのは、 **圏とは、集合（集合）と矢印（つながり）で構築される概念なんだな** ということです。
- 圏（構造）=集合（データの集合）+相互作用（データ同士のつながり）

いわばこれから作るデータの流れの**インフラ基盤を提供してくれる存在**という認識をもつことが出来ればと思います。

# Scalaでデータ分析を始める
上に書いたように
- データ分析は インプッットデータ=>処理1=>処理2=>...=>アウトプット　と**変換処理をつなげていくことである**
- 圏論&関数はデータを変換する流れを構築する**パイプライン**を構築する素晴らしい基盤となる

この二つを上手く組み合わせるために、Scalaによりデータ分析パイプラインを構築していきます



データ分析はどのような場合も、７つのワークフローに沿って、データの分析がおこなわれいきます。つまり、そのための、モデルを構築していきます

目標として、例えば、元のデータ[T]=>データのサンプル作成[U]=>データの規格化[V]=> データの抽出[W]までを行うワークフローを以下のように定義します

In [17]:
trait Workflow[T, U, V, W] extends UsesSampling[T, U] with UsesNormalization[U, V] with UsesAggregation[V, W] {

  def ||>(t: T): Try[W] =
    for {
      u <- sampler |> t   // サンプル作成
      v <- normalizer |> u // サンプルデータの規格化
      w <- aggregator |> v // 規格化したデータから一部を抽出
    } yield w
}

defined [32mtrait [36mWorkflow[0m

## モデルとは


- モデルとは、 **特定のシステムから観測されるデータを説明するため**に使用される。対象のデータをを説明するためのパターンをモデルとして構築し、そこから、新たなパターンを構築する  

と定義します  
Scala(関数型言語)においては、関数と圏の概念により、数学的バックグラウンドに基づいたモデルの基盤をつくっていきます  
ここでは圏と関数のよさを活かしたモデリングをしていきます
- 圏: データ処理の基盤構造（フレームワーク）を提供
- 関数: 圏では運用上難しいところを小回りよく活用するための処理を提供


### 圏論の立場によるモデルの構築
数学的には、関数も圏の一部と見なせる、ため圏と関数を分けるのは良くないが、プログラミングの立場からは、その役割が異なる事から意図的に区別してまとめる。大まかには、**圏: 大まかな構造（フレームワーク）を提供、関数: 圏では運用上難しいところを小回りよく活用する**ように役割をかえてつかう
- データの操作および連鎖に関して、高度に抽象化されたフレームワークを提供する
- 非常に協力だが、反面、高度に抽象化されている故に、小回りが効かない


データ変換を行う以下のようなモデルを定義します

その前に、データ処理に何らかのコンテキスト（設定）を注入する必要がある場合を考え、それ用の`case class`を定義します

In [1]:
trait Config

case class ConfigInt(iParam: Int) extends Config

case class ConfigDouble(fParam: Double) extends Config

case class ConfigArrayDouble(fParams: Array[Double]) extends Config

case object ConfigNone extends Config

defined [32mtrait [36mConfig[0m
defined [32mclass [36mConfigInt[0m
defined [32mclass [36mConfigDouble[0m
defined [32mclass [36mConfigArrayDouble[0m
defined [32mobject [36mConfigNone[0m

次に、データ分析の基盤を構築する`Monad`として、`Transform`を定義します。これは、`|>`に特定の処理を記述することで、`[T:インプットデータ]|>[A:処理済みのデータ] `
として、構築します

In [3]:
abstract class Transform[T, A](config: Config) {
  // T:インプットする特徴量のデータ,A:アウトプットされるデータ
  self =>
  def |> : PartialFunction[T, Try[A]] // データ処理を行うメソッド

  def map[B](f: A => B): Transform[T, B] = new Transform[T, B](config) {
    override def |> = new PartialFunction[T, Try[B]] {
      override def isDefinedAt(t: T) =
        self.|>.isDefinedAt(t)

      override def apply(t: T): Try[B] = self.|>(t).map(f)
    }
  }

  def flatMap[B](f: A => Transform[T, B]): Transform[T, B] = new Transform[T, B](config) {
    override def |> = new PartialFunction[T, Try[B]] {
      override def isDefinedAt(t: T) =
        self.|>.isDefinedAt(t)

      override def apply(t: T): Try[B] = self.|>(t).flatMap(f(_).|>(t))
    }
  }

  def andThen[B](tr: Transform[A, B]): Transform[T, B] = new Transform[T, B](config) {
    override def |> = new PartialFunction[T, Try[B]] {
      override def isDefinedAt(t: T) =
        self.|>.isDefinedAt(t) && tr.|>.isDefinedAt(self.|>(t).get)

      override def apply(t: T) = tr.|>(self.|>(t).get)
    }
  }
}

defined [32mclass [36mTransform[0m

### 関数の立場によるモデルの構築
関数の特性により、小回りが効くモデルを提供します。関数を構築するプロセスは、以下の3つに分解できます

- 対象となる問題に対しての変数の宣言
- 変数から問題を解くための方程式（モデル）の構築
- モデルのインスタンス化と実行

#### 変数の宣言
まず、変数を宣言します。Fはベクトルをベクトルに変換する関数を変数に持ち、Gはベクトルを一つの値に変換する関数を変数に持ちます

In [4]:
type V=Vector[Double]
trait F{val f:V=>V} // R^n=>R^n
trait G{val g:V=>Double} // R^n=> R

defined [32mtype [36mV[0m
defined [32mtrait [36mF[0m
defined [32mtrait [36mG[0m

#### モデルの定義

この２つを関数合成した関数`h=f◦g`をモデルとして定義したい場合、、`class H`は以下のように定義できます

In [5]:
class H{self:F with G=>def apply(v:V):Double=g(f(v)) }

defined [32mclass [36mH[0m

#### モデルのインスタンス化
上記で抽象化したモデルをインスタンスとして、実現するために、実装をします。これでモデルを実行できました

In [14]:
import java.lang._
val h=new H with F with G{
    val f=(v:V)=>v.map(Math.exp) // f:x=>exp(x)
    val g = (v:V)=>v.sum// g:x=>Σxi
}

// モデルのベクトルを代入
h(Vector(1.0,2.0,3.0))

[32mimport [36mjava.lang._[0m
[36mh[0m: [32mH[0m with [32mF[0m with [32mG[0m = cmd13$$user$$anonfun$2$$anon$1@96fb872
[36mres13_2[0m: [32mscala[0m.[32mDouble[0m = [32m30.19287485057736[0m

# 圏と関数で実際のデータ処理を行う
ここから、上記の圏と関数の概念にオブジェクト指向の特性を活かして、実装していきます
Scalaでは、`trait`により、**ロジックを注入していく事で、データ分析のワークフローの過程に沿って、動的にモジュールを置換する事が可能になる**  
そのため、ここからは上記での圏の概念と関数の概念によるモデリングに加え、オブジェクト指向の考え方を活用するため[MinmalCakePattern](https://qiita.com/pab_tech/items/1c0bdbc8a61949891f1f)を使用する

具体例として、データ処理として、
- 連続的な値の順列を作成
- その順列を[0,1]の間に収まるように規格化
- さらにその中から最大の値を抽出する

の三つのプロセスを行う必要があるとします

上で定義したデータ処理プロセスを`Transform`により構築します  
まず、MInimakCakePatternによりそれぞれの処理を責務としてもつ変数を定義します

In [11]:
trait UsesSampling[T, A] {
  val sampler: Transform[T, A]
}

trait UsesNormalization[T, A] {
  val normalizer: Transform[T, A]
}

trait UsesAggregation[T, A] {
  val aggregator: Transform[T, A]
}

defined [32mtrait [36mUsesSampling[0m
defined [32mtrait [36mUsesNormalization[0m
defined [32mtrait [36mUsesAggregation[0m

データ処理をそれぞれのフェイズに分解した。これらを組み合わせる事で、ワークフローを構築する

In [12]:
// ワークフローの流れを構築
trait Workflow[T, U, V, W] extends UsesSampling[T, U] with UsesNormalization[U, V] with UsesAggregation[V, W] {

  def ||>(t: T): Try[W] =
    for {
      u <- sampler |> t
      v <- normalizer |> u
      w <- aggregator |> v
    } yield w
}

defined [32mtrait [36mWorkflow[0m

これで、処理の流れは構築できました。ここから、ロジックを注入していきます

In [20]:
// 必要な型とのAiliasと定数を定義
type DblF = Double => Double
type DblArray = Array[Double]
val (samples, normRatio,splits) = (100, 10, 4)

defined [32mtype [36mDblF[0m
defined [32mtype [36mDblArray[0m
[36msamples[0m: [32mInt[0m = [32m100[0m
[36mnormRatio[0m: [32mInt[0m = [32m10[0m
[36msplits[0m: [32mInt[0m = [32m4[0m

In [16]:
// サンプルとして、使用する値を定義
trait MixInSampling {
  val sampler: Transform[DblF, DblArray] = new Transform[DblF, DblArray](ConfigInt(samples)) {
    override def |> : PartialFunction[DblF, Try[DblArray]] = {
      case f: DblF => Try(Array.tabulate(samples)(n => f(1.0 * n / samples))) // 0,0.01,0.02,,,1.00までの値をfで変換した要素をもつベクトルを生成
    }
  }
}

defined [32mtrait [36mMixInSampling[0m

In [18]:
// 最大値の最小値で規格化するロジックを持つclass
case class MinMax(values:DblArray){

  val range = (0.0, 0.0)

  // valuesのなかで最小な値をと最大な値を抽出する
  protected val minMax = values.foldLeft(range) { (mM, x) => {
    val min = mM._1
    val max = mM._2
    (if (x < min) x else min, if (x > max) x else max)
  }
  }

  val min = minMax._1
  val max = minMax._2

  //[lox,high]の間で正規化する yi-low=(xi-x_low/(x_hig-x_low)(high-low))
  def normalize(low: Double = 0.0, high: Double = 1.0): DblArray = {
    val ratio = (high - low) / (max - min)
    values.map(x => (x - min) * ratio.toDouble + low)
  }
}

// 上で定義したMinMaxにより、値を[0,1]の間に規格化する
trait MixInNormalizer {
  val normalizer: Transform[DblArray, DblArray] = new Transform[DblArray, DblArray](ConfigDouble(normRatio)) {
    override def |> = {
      case x: DblArray if (x.nonEmpty) => Try(MinMax[Double](x).normalize())
    }
  }
}

: 

In [18]:
// 規格化した値の中から、一番大きな値のindexを抽出するロジックを定義
trait MixInAggregator {
  val aggregator = new Transform[DblArray, Int](ConfigInt(splits)) {
    override def |> : PartialFunction[DblArray, Try[Int]] = {
      case x: DblArray if x.nonEmpty => Try(x.indices.find(x(_) == 1.0).getOrElse(-1)) // 規格化したデータのうち最大の値（=1.0）の値のindexを取得する
    }
  }
}

defined [32mtrait [36mMixInAggregator[0m

In [None]:
これらのロジックをWorkflowに注入することで、データ処理を実行します

In [None]:
  val workflow= new Workflow[DblF,DblArray,DblArray,Int] with MixInSampling with MixInNormalizer with MixInAggregator

  val g=(x:Double)=>Math.log(x+1.0)+Random.nextDouble // サンプルデータをこの関数により変換する

  val out=workflow ||> g

  println(out)


結果

In [None]:
Success(96)

96番目の値が最大値でした

MInimalCakePatternにより、ロジックの変更、注入、分解が非常に簡単になっているので、例えば、「最後のAggregationを除いて、サンプルのロジックを少し変えたい」
と言う場合は、以下のように少し変えるだけで簡単にできます

In [19]:
// 例えば、サンプルのロジックを入れ替え、aggregationを外したい場合
// 入れ替えることも、使い回す事も、拡張する事も自由自在
// ワークフローの流れを構築
trait WorkflowWithoutAgr[T, U, V] extends UsesSampling[T, U] with UsesNormalization[U, V]{

  def ||>(t: T): Try[V] =
    for {
      u <- sampler |> t
      v <- normalizer |> u
    } yield v
}

defined [32mtrait [36mWorkflowWithoutAgr[0m

In [19]:
trait MixInSampling2 {
  val sampler: Transform[DblF, DblArray] = new Transform[DblF, DblArray](ConfigInt(samples)) {
    override def |> : PartialFunction[DblF, Try[DblArray]] = {
      case f: DblF => Try(Array.tabulate(samples)(n => f(Math.exp(1.0 * n / samples)))) // サンプルをexpにかけるように少し変更
    }
  }

: 

In [None]:
  val workflowWithoutAgr=new WorkflowWithoutAgr[DblF,DblArray,DblArray] with MixInSampling2 with MixInNormalizer

  val out2 = workflowWithoutAgr ||> g
  println(out2)

このように簡単に付け替え、と修正ができました。データ処理の部品化により、より簡単にデータ処理のパイプラインを構築することができます

# まとめ
- 圏と関数でデータの流れを制御するパイプラインのインフラ基盤を構築した
- そのインフラ基盤にデータ分析のアルゴリズムを注入した
- さらにオブジェクト指向により、そのパイプラインの構築やメンテナンスを容易にした

ことが、できました。とはいえ、まだデータの前処理の段階なので、次回は単純ベイズモデルを例に、データの前処理から、モデルの精度検証までまとめます

# リファレンス
- [Scala for MachineLeaning](https://www.amazon.co.jp/dp/B00R6585KO/ref=dp-kindle-redirect?_encoding=UTF8&btkr=1)
- [ScalaDesignPattern](https://www.amazon.co.jp/dp/B017XSFKW4/ref=dp-kindle-redirect?_encoding=UTF8&btkr=1)                             
- [Scalaにおける最適なDependency Injectionの方法を考察する](https://qiita.com/pab_tech/items/1c0bdbc8a61949891f1f)      
- [圏論勉強会@ワークスアプリケーション](http://nineties.github.io/category-seminar/#/)
                                    