# Transducer: 一种强大的函数组合模式

## map / filter

map的语义是"映射"，即对集合中所有的元素都执行一次变换。

In [22]:
const list = [1, 2, 3, 4, 5]

list.map(x => x + 1)

[ 2, 3, 4, 5, 6 ]

In [23]:
function map(f, xs) {
  const ret = []
  for (let i = 0; i < xs.length; i++) {
    ret.push(f(xs[i]))
  }
  return ret
}

In [24]:
map(x => x + 1, [1, 2, 3, 4, 5])

[ 2, 3, 4, 5, 6 ]

上述有意地使用for语句实现，以比较清晰地表达出：map的实现依赖于集合类型。

- 顺利执行
- 即时求值的，不是lazy的。

再看 `filter`：

In [80]:
function filter(f, xs) {
  const ret = []
  for (let i = 0; i < xs.length; i++) {
    if (f(xs[i])) {
      ret.push(xs[i])
    }
  }
  return ret
}

In [50]:
var range = n => [...Array(n).keys()]
filter(x => x % 2 === 1, range(10))

[ 1, 3, 5, 7, 9 ]

同样，filter 的实现也依赖于具体的集合类型，当前实现是要求 `xs` 是一个数组。

那如何让 map支持多种数据类型呢？ 比如 `Set`、`Map` 或者自定义数据类型。

有一种常规的方式：是依赖于集合的接口（协议）。

不同的语言有不同的实现，`JS`在这方面的原生支持比较弱， 不过也可行：

- 使用 `Symbol.iterator` 迭代。
- 使用 `Object#constractor` 来获得构造函数。

那如何抽象地支持不同数据类型的 `push` 呢？ 

仿造`ramdajs`库，可依赖于自定义的 `@@transducer/step` 函数。

In [81]:
function map(f, xs) {
  const ret = new xs.constructor()  // 1. 构造
  for (const x of xs) { // 2. 迭代
    ret['@@transducer/step'](f(x))  // 3. 收集
  }
  return ret
}

In [82]:
Array.prototype['@@transducer/step'] = Array.prototype.push

[Function: push]

In [83]:
map(x => x + 1, [1, 2, 3, 4, 5])

[ 2, 3, 4, 5, 6 ]

In [84]:
Set.prototype['@@transducer/step'] = Set.prototype.add

[Function: add]

In [85]:
map(x => x + 1, new Set([1, 2, 3, 4, 5]))

Set (5) {2, 3, 4, 5, 6}

使用这种方式，我们就可以实现较轴象版本的 `map`, `filter` 等函数，

关键是将

- 构造
- 迭代
- 收集

三个操作代理给具体的集合类。

In [49]:
function filter(f, xs) {
  const ret = new xs.constructor()
  for (const x of xs) {
    if (f(x)) {
      ret['@@transducer/step'](x)
    }
  }
  return ret
}

In [51]:
filter(x => x % 2 === 1, range(10))

[ 1, 3, 5, 7, 9 ]

In [52]:
filter(x => x > 3, new Set(range(10)))

Set (6) {4, 5, 6, 7, 8, 9}

## compose

上述 `map` 和 `filter` 在组合使用时，会带来一些问题。

In [55]:
range(10)
  .map(x => x + 1)
  .filter(x => x % 2 === 1)
  .slice(0, 3)

[ 1, 3, 5 ]

虽然只用到其中的5个元素，但是会遍历集合中的全部元素。

每一个步骤，都会产生一个中间集合对象。

我们用`compose` 再实现一遍这个逻辑

In [72]:
function compose(...fns) {
  return fns.reduceRight((acc, fn) => x => fn(acc(x)), x => x)
}

为了能组合，将map、filter等函数实现成curry形式

In [73]:
function curry(f) {
  return (...args) => data => f(...args, data)
}

In [86]:
var rmap = curry(map)
var rfilter = curry(filter)

function take(n, xs) {
  const ret = new xs.constructor()
  for (const x of xs) {
    if (n <= 0) {
      break
    }
    n--
    ret['@@transducer/step'](x)
  }
  return ret
}
var rtake = curry(take)

In [87]:
take(3, range(10))

[ 0, 1, 2 ]

In [88]:
take(4, new Set(range(10)))

Set (4) {0, 1, 2, 3}

In [89]:
const takeFirst3Odd = compose(
  rtake(3),
  rfilter(x => x % 2 === 1),
  rmap(x => x + 1)
)

takeFirst3Odd(range(10))

[ 1, 3, 5 ]

到目前为止，我们的实现在表达上是清晰简洁的，在运行时上是浪费的。

## 函数的形状

### Transformer

`curry` 版本的 `map` 函数 是这样的：

```ts
const map = f => xs => ...
```

即 `map(x => ...)` 返回一个单参数的函数。

```ts
type Transformer = (xs: T) => R
```

单参数的函数可组合。

具体来说是，这些函数的入参是”数据“，出参是处理后的数据， 函数就是数据转换器(Transformer)。

```ts
data ->> map(...) ->> filter(...) ->> reduce(...) -> result
```

In [101]:
function pipe(...fns) {
  return x => fns.reduce((ac, f) => f(ac), x)
}

In [102]:
const reduce = (f, init) => xs => xs.reduce(f, init)

const f = pipe(
  rmap(x => x + 1),
  rfilter(x => x % 2 === 1),
  rtake(5),
  reduce((a, b) => a + b, 0)
)

f(range(100))

25

`Transformer` 是一个单参数函数，方便函数组合。

```ts
type Transformer = (x: T) => T
```

### Reducer

reducer是一个双参数的函数，可用于表达更复杂的逻辑。

```ts
type Reducer = (ac: R, x: T) => R
```

#### sum

In [106]:
// add is an reducer
const add = (a, b) => a + b
const sum = xs => xs.reduce(add, 0)

sum(range(11))

55

#### map

In [116]:
function concat(list, x) {
  list.push(x)
  return list
}

In [117]:
const map = f => xs => xs.reduce((ac, x) => concat(ac, f(x)), [])

map(x => x * 2)(range(10))

[ 0, 2, 4, 6, 8, 10, 12, 14, 16, 18 ]

#### filter

In [119]:
const filter = f => xs => xs.reduce((ac, x) => f(x) ? concat(ac, x) : ac, [])

filter(x => x > 3 && x < 10)(range(20))

[ 4, 5, 6, 7, 8, 9 ]

#### take

如何实现`take`？, 这需要`reduce`有类似于`break`的功能。

In [123]:
function reduced(x) {
  return x && x['@@transducer/reduced'] ? x : { '@@transducer/reduced': true, '@@transducer/value': x }
}

function reduce(f, init) {
  return xs => {
    let ac = init
    for (const x of xs) {
      const r = f(ac, x)
      if (r && r['@@transducer/reduced']) {
        return r['@@transducer/value']
      }
      ac = r
    }
    return ac
  }
}

In [126]:
function take(n) {
  return xs => {
    let i = 0
    return reduce((ac, x) => {
      if (i === n) {
        return reduced(ac)
      }
      i++
      return concat(ac, x)
    }, [])(xs)
  }
}

In [127]:
take(4)(range(10))

[ 0, 1, 2, 3 ]

### Transducer

终于见到我们的主角

先重新审视之前的 `map` 实现

```ts
function map(f, xs) {
  const ret = []
  for (let i = 0; i < xs.length; i++) {
    ret.push(f(xs[i]))
  }
  return ret
}
```

需要想办法把上述依赖于集合的逻辑剥离出去，将其抽象成一个`Reducer`。


In [130]:
function rmap(f) {
  return reducer => { // 由Reducer决定如何迭代和收集
    return (ac, x) => {
      return reducer(ac, f(x))  // map的本质
    }
  }
}

构造消失了、迭代消失了、元素的收集也消失了。 

通过一个`reducer`， 我们的map只包含了它的职责中的逻辑。

再看看`filter`

In [131]:
function rfilter(f) {
  return reducer => (ac, x) => {
    return f(x) ? reducer(ac, x) : ac
  }
}

注意到 `rfilter` 以及上面的 `rmap` 它的返回值形式：

` reducer => (acc, x) => ... `

它其实是一个 `Transfomer`, 参数和返回值都是 `Reducer`， 它就是 `Transducer`。

`Transformer` 是可以组合的。


In [134]:
function rtake(n) {
  return reducer => {
    let i = 0
    return (ac, x) => {
      if (i === n) {
        return reduced(ac)
      }
      i++
      return reducer(ac, x)
    }
  }
}

### into

如何使用 `transducer` 呢？

In [135]:
compose

[Function: compose]

In [142]:
var tf = compose(
  rmap(x => x + 1),
  rfilter(x => x % 2 === 1),
  rtake(5)
)
tf

[Function (anonymous)]

我们需要将 迭代和收集 使用reduer实现出来。

In [155]:
const collect = (ac, x) => {
  ac.push(x)
  return ac
}

const reducer = tf(collect)
reduce(reducer, [])(range(100))


[ 1, 3, 5, 7, 9 ]

可以工作了，我们还注意到迭代是"按需”的, 虽然集合元素有100个，但是只迭代了前面10个元素。

将上述逻辑封装到一个函数中。

In [157]:
const collect = (ac, x) => {
  ac.push(x)
  return ac
}

function into(init, tf) {
  const reducer = tf(collect)
  return reduce(reducer, init)
}

In [159]:
into([], compose(
  rmap(x => x + 1),
  rfilter(x => x % 2 === 1),
  rtake(8)
))  (range(100))

[ 1, 3, 5, 7, 9, 11, 13, 15 ]

写一个函数，用于判断是否为素数

## 流程

### fibs

假设我们有某种异步的数据集合，比如一个异步的无限斐波那契生成器。

In [182]:
function sleep(n) {
  return new Promise(r => setTimeout(r, n))
}

async function *fibs() {
  let [a, b] = [0, 1]
  while (true) {
    await sleep(10)
    yield a
    ;[a, b] = [b, a + b]
  }
}

In [183]:
const s = fibs()
async function start() {
  let i = 0
  for await (const item of s) {
    console.log(item)
    i++
    if (i > 10) {
      break
    }
  }
}

start()

Promise [Promise] {}

0
1
1
2
3
5
8
13
21
34
55


我们要实现支持以上数据结构的`into`

将数组版本的代码帖旁边作为参考：

```ts
const collect = (ac, x) => {
  ac.push(x)
  return ac
}

function into(init, tf) {
  const reducer = tf(collect)
  return reduce(reducer, init)
}
```

In [205]:
const collect = (ac, x) => {
  ac.push(x)
  return ac
}

const reduce = (reducer, init) => {
  return async iter => {
    let ac = init
    for await (const item of iter) {
      if (ac && ac['@@transducer/reduced']) {
        return ac['@@transducer/value']
      }
      ac = reducer(ac, item)
    }
    return ac
  }
}

function sinto(init, tf) {
  const reducer = tf(collect)
  return reduce(reducer, init)
}

收集操作一样， 迭代操作不一样。

In [204]:
const task = sinto([], compose(
  rmap(x => x + 1),
  rfilter(x => x % 2 === 1),
  rtake(8)
))
    
task(fibs()).then(res => {
  console.log(res)
})

Promise [Promise] {}

1,3,9,35,145,611,2585,10947


同样逻辑，应用于不同的数据结构。

## 参考

- [Transducers are Coming](https://clojure.org/news/2014/08/06/transducers-are-coming)
- [Transducers - Clojure Reference](https://clojure.org/reference/transducers)