API_Reference_CN

核心力量 edited this page Jan 29, 2019 · 177 revisions

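API Guide

Notation and Terminology

Upstream and Downstream

Canal works by letting data flow through zero or more intermediate transformations and finally into a terminal operation, which produces the returned result. Adjacent operations form an upstream/downstream relationship, for example:

[1, 2, 3] >A> {intermediate transformation 1} >B> {intermediate transformation 2} >C> {terminal operation} = result

The data source [1, 2, 3] flows into {intermediate transformation 1} as upstream A.
{intermediate transformation 1} transforms it into data stream B, which flows into the downstream {intermediate transformation 2}.
For {intermediate transformation 2}, data stream B is its upstream, and so on.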

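Lazy Evaluation

The example above describes how data flows logically. In terms of actual execution, however, no upstream intermediate transformation runs until a terminal operation is called.
Moreover, if the downstream explicitly needs only part of the upstream data, the rest of the upstream data is never received or processed by the downstream.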
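
The two properties described above can be illustrated without Canal itself. The following is a minimal sketch using plain JavaScript generators; lazyMap and takeFirst are hypothetical helpers written for this illustration and say nothing about how Canal is implemented internally.

```javascript
// Plain-JavaScript sketch of lazy evaluation using generators.
// lazyMap and takeFirst are hypothetical helpers, not part of Canal.
var calls = [];

function* lazyMap(source, mapper) {
  for (var item of source) {
    calls.push(item);                 // record which items were actually transformed
    yield mapper(item);
  }
}

function takeFirst(source, num) {
  var result = [];
  if (num <= 0) return result;
  for (var item of source) {
    result.push(item);
    if (result.length >= num) break;  // stop pulling from the upstream
  }
  return result;
}

var pipeline = lazyMap([1, 2, 3, 4, 5], function (d) { return d * 10; });
var callsBeforeTerminal = calls.length;   // nothing has run yet
var firstTwo = takeFirst(pipeline, 2);    // the "terminal operation"
console.log(callsBeforeTerminal, firstTwo, calls);
```

Only the two items that the downstream asked for pass through the mapper; items 3, 4 and 5 are never touched.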

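Pair, Key, and Value

In Canal, a Pair is specifically an array of length 2 whose first element is the Key and whose second element is the Value, abbreviated as [K, V].

["a", 1] // a Pair whose Key is "a" and whose Value is 1

Note: because JavaScript converts a value of any type to a String when storing it as a Key, Canal supports only String Keys.
For a non-String object, consider overriding its toString() method so that it returns the intended Key;
otherwise, forcing a non-String object to act as a Key may lead to unexpected results.
For example, by default ({}).toString() returns "[object Object]".

The default Key extractor is denoted Kop; it treats the first element of the data as the Key:

Kop = function(data) {
  return data[0];  // toString() is implicitly called on this return value to produce the Key
}

The default Value extractor is denoted Vop; it treats the second element of the array as the Value: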

Vop = function(data) {
  return data[1];
}
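
The note on String Keys above can be checked in plain JavaScript. This is a small sketch independent of Canal; the User class is hypothetical and exists only to show the effect of overriding toString().

```javascript
// Plain JavaScript, independent of Canal: object property keys are stored as strings.
var table = {};
table[1] = "number key";
table[{ id: 7 }] = "plain object key";     // becomes "[object Object]"

// A hypothetical class that overrides toString() to supply a meaningful Key.
function User(id) { this.id = id; }
User.prototype.toString = function () { return "user-" + this.id; };
table[new User(7)] = "custom key";

var storedKeys = Object.keys(table);
console.log(storedKeys);
```

Every plain object collapses to the same "[object Object]" Key, which is why distinct objects used as Keys would silently overwrite one another unless toString() is overridden.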

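Lambda Expressions

This document uses the following notation for Lambda expressions:

  • (in) => out: accepts one argument and returns a value;
  • (in1,in2) => Void: accepts two arguments and returns nothing;
  • () => out: accepts no arguments and returns a value.

Optional Parameters

For convenience, some functions allow certain arguments to be omitted when called; this document marks such parameters as optional with [].

(data[,index]) => Value means that the Lambda expression accepts at least one argument and that the second argument, index, is optional.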

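Immutability

All primitive data types in JavaScript are immutable, but some composite data structures can break this principle, for example:

The elements of an array can be modified even though the array itself is still the same array.

This is called mutability. Compared with immutability, mutability brings a slight performance advantage, but it often makes modifications to data hard to notice.
The immutable approach always produces new data instead of modifying the original. Formally, the new data is simply the output of a Lambda expression, and code written in this style is also easier to understand.
Therefore, unless you know exactly what your code is doing, the mutable style is generally not recommended.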
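
As a brief illustration of the two styles, here is a plain-JavaScript sketch that does not involve Canal; the variable names are chosen only for this example.

```javascript
// Mutable style: the same array object is changed in place,
// so every reference to it observes the change.
var original = [1, 2, 3];
var alias = original;
alias[0] = 100;

// Immutable style: build a new array and leave the input untouched.
var source = [1, 2, 3];
var doubled = source.map(function (d) { return d * 2; });

console.log(original, source, doubled);
```

After the mutable update, original reads [100, 2, 3] even though it was never assigned to directly, which is the kind of hard-to-notice change the text warns about.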

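Generics

JavaScript is generally regarded as a weakly typed language, and this document does not discuss concrete JavaScript types in depth;
in some places, however, the meaning of the data is well worth annotating, so this document borrows the generic notation <T> for that purpose, for example:

If the data currently carried by a Canal instance is Pair, it is annotated as Canal<[Key, Value]>.

Common Methods

Canal.of(source[,begin[,end]]): Canal<Value>

Constructs a Canal instance using an array as the data source.

source: Array<Value>, the array data source;
begin: Integer, the start index (inclusive), 0 by default;
end: Integer, the end index (exclusive), the array length by default.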

var a = Canal.of([ 0, 1, 2, 3 ], 1, 3);
a.collect();
[
  1,
  2
]

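Canal.of(source[,close]): Canal<Value>

Constructs a Canal instance using a function as an infinite data source.

source: ([index]) => Value, the generator function; returning Canal.eod() ends the iteration.

  • index: Integer, the sequence number of this generation call (counted from 0).

close: ([number]) => Void, the close function, executed when the iteration ends.

  • number: Integer, the number of data items iterated.

var a = Canal.of(function(i) {
  return i + 1;
});
a.take(3); // take the first 3 items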
[
  1,
  2,
  3
]

Canal.of(source[, keys]): Canal<[Key, Value]>

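Converts the K/V entries of an object into a series of Pairs and uses them as the data source to construct a Canal instance.

source: Object<Key, Value>, the Map data source;
keys: Array<String>, the sequence of Keys to map, all Keys in the Map by default.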

var a = Canal.of(
{
  "A": 0,
  "B": 1
});
a.collect();
[
  ["A", 0],
  ["B", 1]
]

Canal.on(cls[, func]): Void

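Attaches a Canal method to the prototype of the target class. Calling the method named func on an instance of that class then implicitly calls Canal.of() to obtain a Canal instance.

cls: Class, the target class;
func: String, the name of the method to call, "canal" by default.

Canal.on(Array);
[1, 2, 3, 4, 5].canal(1, 4).collect();  // 1 and 4 are implicitly passed to Canal.of() as additional arguments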
[
  2,
  3,
  4
]

Canal.off(cls[, func]): Void

Detaches the Canal method from the prototype of the target class.

Canal.on(Array);
[1, 2, 3].canal().count();
Canal.off(Array);
try{
    [1, 2, 3].canal().count();  // Exception occurs here
}catch(e){
    0
}
3
0

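General Terminal Operations

Canal defines a set of terminal operations. These operations compute over the data of the upstream Canal and return actual values such as an Array or a Map.

collect(): Array<Value>

Collects the data in the Canal into an array and returns that array.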

Canal.of([1, 2, 3]).collect();
[
  1,
  2,
  3
]

count(): Integer

Counts the data items in the Canal and returns the number.

Canal.of([1, 2, 3]).count();
3

countByValue([vop]): Map<value,count>

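Collects the mapping from each value to its number of occurrences into a map object and returns that object.

vop: (data) => Value, the Value extractor for the data, which defaults to:

function(d) {
  return d;
}

Canal.of([1, 1, 2]).countByValue();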
{
  "1": 2,
  "2": 1
}

first([pred]): Option<Value>

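Returns the first data item that satisfies the condition: Some<Value> if one exists, otherwise None.

pred: (data[,index]) => Boolean, the predicate; index is the sequence number of the data item (counted from 0). It returns true if data satisfies the condition and false otherwise, and defaults to () => true.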

Canal.of([1, 2, 3])
.first(function(d) {
  return d > 1;
}).get();
2

fold(zero, folder): R

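Folds the data and returns the final folded value.

zero: R, the initial value of the fold;
folder: (last, data) => R, the folding function.

  • last: R, the result of the previous fold step, which is zero for the first step;
  • data: Value, the data item to process in this fold step;
  • R, the value returned by the fold step; if nothing (or undefined) is returned, last is used as the return value.

Note: the types of R and Value may differ.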

Canal.of([1, 2, 3])
.fold(0, function(last, data) {
  return last + data;
});
6
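
The folding rules listed above can be mirrored in a few lines of plain JavaScript. The foldSketch function below is a hypothetical stand-alone helper written only to illustrate those rules (in particular the fallback to last when the folder returns undefined); it is not Canal's implementation.

```javascript
// Hypothetical stand-alone fold that mirrors the rules described above.
function foldSketch(items, zero, folder) {
  var last = zero;
  for (var i = 0; i < items.length; i++) {
    var next = folder(last, items[i]);
    // A folder that returns nothing keeps the previous result.
    last = next === undefined ? last : next;
  }
  return last;
}

// R (an array of strings) differs from Value (numbers).
var labels = foldSketch([1, 2, 3], [], function (last, data) {
  last.push("#" + data);      // updates the accumulator and returns undefined
});
var total = foldSketch([1, 2, 3], 0, function (last, data) {
  return last + data;
});
console.log(labels, total);
```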

foreach(action)

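Performs a specific action on each data item.

action: (data[,index]) => Void, the action function; it operates on the data item passed in and does not need to return a value.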

Canal.of([1, 2, 3])
.foreach(function(d){
  console.log("This is " + d);
});
This is 1
This is 2
This is 3

reduce(reducer): Option<Value>

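Reduces the data and returns the result: Some<Value> if the upstream data is non-empty, otherwise None.

reducer: (a, b) => c, the reducing function, which reduces a and b to c.

Note: a, b, and c must be of the same type (use fold if they need to differ), and the reduction must be commutative and associative.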

Canal.of([1, 2, 3, 4])
.reduce(function(a, b) {
  return a * b;
}).get();
24

take(num): Array<Value>

Collects the first num data items into an array and returns that array.

num: Integer, the number of data items to collect.

Canal.of([1, 2, 3]).take(2);
[
  1,
  2
]

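Pair Terminal Operations

collectAsMap([kop[,vop]]): Object<Key, Value>

Collects the data into a Map object.
Note: the result keeps each Key only once, even if Keys are duplicated among the upstream Pairs.

kop: (data) => Key, the Key extractor, Kop by default;
vop: (data) => Value, the Value extractor, Vop by default.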

Canal.of([
  ["a", 1],
  ["b", 2],
  ["c", 3]
]).collectAsMap();
{
  "a": 1,
  "b": 2,
  "c": 3
}

countByKey([kop]): Object<Key,Integer>

Counts the number of occurrences of each Key in the data.

kop: (data) => Key, the Key extractor, Kop by default.

Canal.of([
  ["a", 1],
  ["a", 2],
  ["b", 3]
]).countByKey();
{
  "a": 2,
  "b": 1
}

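General Intermediate Transformations

Canal defines a set of intermediate transformations. Unlike terminal operations, they only return a new Canal instance so that further operations can be chained, and they do not compute anything immediately; this is the lazy evaluation described earlier. Because triggering the actual computation requires a terminal operation, the examples below all use collect() as the terminal operation to show the result of each transformation.

cartesian(that)

Computes the Cartesian product with another Canal instance. Each result is a Pair made of one element from this Canal and one from the other Canal, of the form [ValueL, ValueR].

that: Canal<ValueR>, the Canal instance to take the Cartesian product with.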

Canal.of([1, 2, 3])
.cartesian(Canal.of(["A", "B", "C"]))
.collect();
[
  [1, "A"],
  [1, "B"],
  [1, "C"],
  [2, "A"],
  [2, "B"],
  [2, "C"],
  [3, "A"],
  [3, "B"],
  [3, "C"]
]

distinct([cmp])

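Removes duplicate items from the data.

cmp: (a,b) => Number, the comparator; it returns a negative number when a is less than b, a positive number when a is greater than b, and 0 when a equals b. If cmp is null or not specified, the default comparator (JavaScript's built-in comparison) is used.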

Canal.of([ 1, 2, 2, 3 ]).distinct().collect();
[
  1,
  2,
  3
]

filter(pred)

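Filters the data.

pred: (data[,index]) => Boolean, the predicate, which returns true if and only if data should be kept; index is the sequence number of the data item (counted from 0).

Canal.of([ 1, 2, 3 ])
.filter(function(d) {
  return d > 1; // keep every item greater than 1
}).collect();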
[
  2,
  3
]

flatMap([mapper])

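Maps each data item to a list whose elements are passed downstream.

mapper: (data[,index]) => Array, (data) => data by default.

  • data, the data item itself;
  • index, the sequence number of the data item (counted from 0);
  • Array, the resulting list of data.

Canal.of([1, 2, 3])
.flatMap(function(d) {
  return [d, d + 0.5]; // expand each item into itself plus one extra item
}).collect();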
[
  1,
  1.5,
  2,
  2.5,
  3,
  3.5
]

groupBy([kop[,vop]])

Groups the data by Key; items with the same Key are gathered into one array, of the form [Key, Array<Value>].

kop: (data) => Key, the Key extractor, Kop by default;
vop: (data) => Value, the Value extractor, (data) => data by default.

Canal.of([
  2,
  3,
  4
])
.groupBy(function(d){
  return d % 2;
})
.collect();
[
  ["0", [2, 4]],
  ["1", [3]]
]
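
Note that the Keys in the output above are the strings "0" and "1", not numbers. A minimal plain-JavaScript sketch of the same grouping shows why; groupBySketch is a hypothetical helper, and the assumption that groups are emitted in first-seen Key order is made only for this illustration.

```javascript
// Hypothetical stand-alone groupBy; Keys pass through String(), so they become strings.
function groupBySketch(items, kop) {
  var groups = Object.create(null);
  var order = [];                       // remember the first-seen order of Keys
  items.forEach(function (d) {
    var key = String(kop(d));
    if (!(key in groups)) {
      groups[key] = [];
      order.push(key);
    }
    groups[key].push(d);
  });
  return order.map(function (key) { return [key, groups[key]]; });
}

var grouped = groupBySketch([2, 3, 4], function (d) { return d % 2; });
console.log(JSON.stringify(grouped));
```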

intersection(that[,cmp])

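Takes the intersection with another Canal instance, passing downstream the data that belongs to both this Canal and the other Canal.
Note: the intersection contains no duplicate elements.

that: Canal, the other Canal instance;
cmp: (a,b) => Number, the comparator; it returns a negative number when a is less than b, a positive number when a is greater than b, and 0 when a equals b. If cmp is null or not specified, the default comparator (JavaScript's built-in comparison) is used.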

Canal.of([1, 2, 3])
.intersection(Canal.of([1, 2]))
.collect();
[
  1,
  2
]

keyBy(kop)

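Maps each data item to a Pair, using the Key extractor to produce the Key and the data item itself as the Value.

kop: (data[,index]) => Key, the Key extractor, which derives the Key from the data.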

Canal.of([1, 2, 3])
.keyBy(function(d){
  return d % 2;
}).collect();
[
  ["1", 1],
  ["0", 2],
  ["1", 3]
]

limit([num])

Passes at most num data items downstream.

num: Integer, the maximum number of data items to pass.

Canal.of([ 1, 2, 3 ]).limit(2).collect();
[
  1,
  2
]

map(mapper)

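Applies a mapping transformation to the data.

mapper: (data[,index]) => Value, the mapping function.

  • data, the data item itself;
  • index, the sequence number of the data item (counted from 0);
  • Value, the mapped data.

Canal.of([ 1, 2, 3 ])
.map(function(d, i) {
  return [i, d]; // turn each item into a Pair whose Key is the index and whose Value is the item itself
}).collect();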
}).collect();
[
  [0, 1],
  [1, 2],
  [2, 3]
]

peek(action)

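Performs a specific action on a data item after the downstream has received it.

action: (data[,index]) => Void, the action function; it operates on the data item passed in and does not need to return a value.
Note: action runs only after the downstream has actually accepted data.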

Canal.of(["a", "b", "c"])
.peek(function(d) {
  console.log("This is " + d);
}).take(2);
This is a
This is b

reverse()

Reverses the order of the upstream data before passing it downstream.

Canal.of([1, 2, 3]).reverse().collect();
[
  3,
  2,
  1
]

skip(num)

Skips the first num items of the upstream data and passes the remaining data downstream.

num: Integer, the number of data items to skip.

Canal.of([1, 2, 3, 4]).skip(2).collect();
[
  3,
  4
]

sortBy(kop1[[,asc1][,kop2[,asc2]...]] | orders)

Sorts the data by Key.

kop: (data) => Key, the Key extractor;
asc: Boolean, true sorts in ascending order and false in descending order, true by default;
orders: Array[kop1[[,asc1][,kop2[,asc2]...]]]

Canal.of([ {
  "id" : 3,
  "score" : 32
}, {
  "id" : 1,
  "score" : 12
}, {
  "id" : 2,
  "score" : 23
}, {
  "id" : 1,
  "score" : 22
} ]) //
.sortBy(function(d)
{
  return d.id;
}, function(d)
{
  return d.score;
}, false).collect();
[
  {
    "id" : 1,
    "score" : 22
  }, {
    "id" : 1,
    "score" : 12
  }, {
    "id" : 2,
    "score" : 23
  }, {
    "id" : 3,
    "score" : 32
  }
]
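
In the example above, id is sorted in ascending order (asc1 omitted, so it defaults to true) and score in descending order. The same multi-key ordering can be sketched in plain JavaScript; byKeys is a hypothetical helper that builds one comparator from [kop, asc] pairs and is not part of Canal.

```javascript
// Hypothetical helper: build one comparator from [keyFn, ascending] pairs.
function byKeys(orders) {
  return function (a, b) {
    for (var i = 0; i < orders.length; i++) {
      var ka = orders[i][0](a), kb = orders[i][0](b);
      if (ka !== kb) {
        var cmp = ka < kb ? -1 : 1;
        return orders[i][1] ? cmp : -cmp;   // flip the sign for descending Keys
      }
    }
    return 0;                               // equal on every Key
  };
}

var scoreRows = [
  { id: 3, score: 32 }, { id: 1, score: 12 },
  { id: 2, score: 23 }, { id: 1, score: 22 }
];
// slice() copies first so that sort() does not mutate the input array.
var sortedRows = scoreRows.slice().sort(byKeys([
  [function (d) { return d.id; }, true],      // id ascending
  [function (d) { return d.score; }, false]   // score descending
]));
console.log(JSON.stringify(sortedRows));
```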

sortWith([cmp[,asc]])

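Sorts the data.

cmp: (a,b) => Number, the comparator; it returns a negative number when a is less than b, a positive number when a is greater than b, and 0 when a equals b. If cmp is null or not specified, the default comparator (JavaScript's built-in comparison) is used;
asc: Boolean, true sorts in ascending order and false in descending order, true by default.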

Canal.of([3, 4, 2, 1]).sortWith().collect();
[
  1,
  2,
  3,
  4
]

stratifyBy(kop1[[,asc1][,kop2[,asc2]...]] | orders)

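Sorts the data by Key and stratifies it: items on the same level are gathered into one array.

kop: (data) => Key, the Key extractor;
asc: Boolean, true sorts in ascending order and false in descending order, true by default;
orders: Array[kop1[[,asc1][,kop2[,asc2]...]]]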

Canal.of([ {
  "id" : 4
}, {
  "id" : 3
}, {
  "id" : 1
}, {
  "id" : 3
} ]).stratifyBy(function(d)
{
  return d.id;
}, true).collect();
[ 
  [ {
    "id" : 1
  } ], 
  [ {
    "id" : 3
  }, {
    "id" : 3
  } ], 
  [ {
    "id" : 4
  } ] 
]
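
Stratification can be thought of as sorting followed by splitting wherever two neighbours compare as different. The sketch below expresses that idea in plain JavaScript; stratifySketch is a hypothetical helper and only an approximation of the behaviour shown in the example above.

```javascript
// Hypothetical stand-alone stratify: sort, then start a new level
// whenever the comparator reports a difference from the previous item.
function stratifySketch(items, cmp) {
  var sorted = items.slice().sort(cmp);
  var levels = [];
  sorted.forEach(function (d, i) {
    if (i === 0 || cmp(sorted[i - 1], d) !== 0) {
      levels.push([]);
    }
    levels[levels.length - 1].push(d);
  });
  return levels;
}

var strata = stratifySketch(
  [{ id: 4 }, { id: 3 }, { id: 1 }, { id: 3 }],
  function (a, b) { return a.id - b.id; }
);
console.log(JSON.stringify(strata));
```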

stratifyWith([cmp[,asc]])

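Sorts and stratifies the data: items on the same level are gathered into one array.

cmp: (a,b) => Number, the comparator; it returns a negative number when a is less than b, a positive number when a is greater than b, and 0 when a equals b. If cmp is null or not specified, the default comparator (JavaScript's built-in comparison) is used;
asc: Boolean, true sorts in ascending order and false in descending order, true by default.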

Canal.of([ {
  "id" : 4
}, {
  "id" : 3
}, {
  "id" : 1
}, {
  "id" : 3
} ]).stratifyWith(function(a, b)
{
  return a.id - b.id;
}, false).collect();
[ 
  [ {
    "id" : 4
  } ], 
  [ {
    "id" : 3
  }, {
    "id" : 3
  } ], 
  [ {
    "id" : 1
  } ] 
]

subtract(that[,cmp])

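Takes the difference with another Canal instance, passing downstream the data that belongs to this Canal but not to the other Canal.
Note: the difference may contain duplicate elements, which happens if and only if an item is duplicated in this Canal and does not belong to the other Canal.

that: Canal, the other Canal instance;
cmp: (a,b) => Number, the comparator; it returns a negative number when a is less than b, a positive number when a is greater than b, and 0 when a equals b. If cmp is null or not specified, the default comparator (JavaScript's built-in comparison) is used.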

Canal.of([1, 2, 3])
.subtract(Canal.of([1, 2]))
.collect();
[
  3
]

union(that)

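Takes the union with another Canal instance, passing the data of both sides downstream. As the example shows, duplicates are kept.

that: Canal, the other Canal instance.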

Canal.of([1, 2, 3])
.union(Canal.of([1, 2]))
.collect();
[
  1,
  2,
  3,
  1,
  2
]

zip(that)

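Zips this Canal with another Canal instance, forming a new Canal<[Key,Value]> instance in which each Key is an element of this Canal and each Value is the element at the corresponding position of the other Canal.

that: Canal<Value>, the Canal instance to zip with.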

Canal.of([ 1, 2, 3 ])
.zip(Canal.of([ "A", "B", "C" ]))
.collect();
[
  [1, "A"],
  [2, "B"],
  [3, "C"]
]

zipWithIndex()

Maps each upstream data item to a Pair whose second element is the index of the item.

Canal.of([ "one", "two", "three" ])
.zipWithIndex()
.collect();
[
  ["one", 0],
  ["two", 1],
  ["three", 2]
]

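Pair Intermediate Operations

In addition to the general intermediate operations, Canal defines Pair intermediate operations, which assume that the upstream provides Pair data.

cogroup(that,...)

Gathers the Values of items that share the same Key across a set of Canals; the result has the form [Key, [Array<Value1>,Array<Value2>,...]].

that: Canal<Pair>, the data to gather.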

Canal.of([
  ["a", 1],
  ["b", 2],
  ["b", 3],
  ["c", 4]
]).cogroup(Canal.of([
  ["b", 1],
  ["c", 2]
])).collect();
[
  ["a", [ [1],   []  ]],
  ["b", [ [2,3], [1] ]],
  ["c", [ [4],   [2] ]]
]
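
To make the shape of the cogroup result concrete, here is a plain-JavaScript sketch that works on two arrays of Pairs. cogroupSketch is a hypothetical helper limited to two inputs, and emitting Keys in first-seen order is an assumption of this sketch.

```javascript
// Hypothetical stand-alone cogroup over two arrays of [Key, Value] Pairs.
function cogroupSketch(left, right) {
  var table = Object.create(null);
  var order = [];
  [left, right].forEach(function (pairs, side) {
    pairs.forEach(function (pair) {
      var key = String(pair[0]);
      if (!(key in table)) {
        table[key] = [[], []];          // one bucket per input
        order.push(key);
      }
      table[key][side].push(pair[1]);
    });
  });
  return order.map(function (key) { return [key, table[key]]; });
}

var cogrouped = cogroupSketch(
  [["a", 1], ["b", 2], ["b", 3], ["c", 4]],
  [["b", 1], ["c", 2]]
);
console.log(JSON.stringify(cogrouped));
```

A Key that is missing on one side still appears in the result, with an empty array in that side's bucket.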

foldByKey(zero, folder[, kop[, vop]])

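Folds the data items that share the same Key.
Note: this operation is an intermediate transformation.

zero: ([key]) => R, builds the initial fold value for a group from that group's key;
folder: (last,data) => R, the folding function.

  • last: R, the result of the previous fold step, which is the initial value built by zero for the first step;
  • data: Value, the data item to process in this fold step;
  • R, the value returned by the fold step.

kop: (data) => Key, the Key extractor, Kop by default;
vop: (data) => Value, the Value extractor, Vop by default.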

Canal.of([
  ["a", 1],
  ["a", 2],
  ["b", 3]
]).foldByKey(function() {
  return 0;
}, function(a, b) {
  return a + b;
}).collect();
[
  ["a", 3],
  ["b", 3]
]

fullJoin(that[,keyL[,keyR[,valL[,valR]]]])

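Performs a full outer equi-join with another Canal instance; each join result has the form [Key, [Option<ValueL>, Option<ValueR>]].

that: Canal, the other Canal instance, which supplies the right-hand data;
keyL: (data) => Key, the Key extractor for the left-hand data (this Canal), Kop by default;
keyR: (data) => Key, the Key extractor for the right-hand data (the other Canal), Kop by default;
valL: (data) => Value, the Value extractor for the left-hand data (this Canal), Vop by default;
valR: (data) => Value, the Value extractor for the right-hand data (the other Canal), Vop by default.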

Canal.of([
  [0, 1],
  [1, 2]
]).fullJoin(Canal.of([
  [1, "a"],
  [2, "b"]
])).collect();
[
  ["0", [Some(1), None]],
  ["1", [Some(2), Some("a")]],
  ["2", [None, Some("b")]]
]

groupByKey([kop[,vop]])

Groups the data by Key; items with the same Key are gathered into one array, of the form [Key, Array<Value>].

kop: (data) => Key, the Key extractor, Kop by default;
vop: (data) => Value, the Value extractor, Vop by default.

Canal.of([
  [0, 1],
  [1, 2],
  [0, 3]
])
.groupByKey()
.collect();
[
  ["0", [1, 3]],
  ["1", [2]]
]

having(pred[, vop[, kop]])

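Filters the data by Value.

pred: (Value[,Key]) => Boolean, the predicate, which returns true if and only if the Value should be kept;
vop: (data) => Value, the Value extractor, Vop by default;
kop: (data) => Key, the Key extractor, Kop by default.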

Canal.of([ 1, 2, 3 ])
.groupBy(function(d)
{
  return d % 2;
})
.having(function(grp)
{
  return grp.length > 1;
})
.collect();
[
  ["1", [1, 3]]
]

join(that[,keyL[,keyR[,valL[,valR]]]])

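Performs an inner equi-join with another Canal instance; each join result has the form [Key, [ValueL, ValueR]].

that: Canal, the other Canal instance, which supplies the right-hand data;
keyL: (data) => Key, the Key extractor for the left-hand data (this Canal), Kop by default;
keyR: (data) => Key, the Key extractor for the right-hand data (the other Canal), Kop by default;
valL: (data) => Value, the Value extractor for the left-hand data (this Canal), Vop by default;
valR: (data) => Value, the Value extractor for the right-hand data (the other Canal), Vop by default.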

Canal.of([
  [0, 1],
  [1, 2],
  [0, 3],
  [2, 4]
]).join(Canal.of([
  [0, "a"],
  [1, "b"]
])).collect();
[
  ["0", [1, "a"]],
  ["0", [3, "a"]],
  ["1", [2, "b"]]
]

keys([kop])

Passes the Keys of the upstream data downstream.

kop: (data) => Key, the Key extractor, Kop by default.

Canal.of([
  [ "a", 1 ],
  [ "b", 2 ],
  [ "c", 3 ]
]).keys().collect();
[
  "a",
  "b",
  "c"
]

leftJoin(that[,keyL[,keyR[,valL[,valR]]]])

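Performs a left outer equi-join with another Canal instance; each join result has the form [Key, [ValueL, Option<ValueR>]].

that: Canal, the other Canal instance, which supplies the right-hand data;
keyL: (data) => Key, the Key extractor for the left-hand data (this Canal), Kop by default;
keyR: (data) => Key, the Key extractor for the right-hand data (the other Canal), Kop by default;
valL: (data) => Value, the Value extractor for the left-hand data (this Canal), Vop by default;
valR: (data) => Value, the Value extractor for the right-hand data (the other Canal), Vop by default.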

Canal.of([
  [0, 1],
  [1, 2],
  [0, 3],
  [2, 4]
]).leftJoin(Canal.of([
  [0, "a"],
  [1, "b"]
])).collect();
[
  ["0", [1, Some("a")]],
  ["0", [3, Some("a")]],
  ["1", [2, Some("b")]],
  ["2", [4, None]]
]
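
The left outer join semantics can be sketched in plain JavaScript as follows. leftJoinSketch is a hypothetical helper, not Canal's implementation: it uses null in place of None, puts a matched right-hand value directly in the result instead of wrapping it in Some, and keeps the left-hand input order rather than the Key order shown in the output above.

```javascript
// Hypothetical stand-alone left join over arrays of [Key, Value] Pairs.
// null stands in for Canal's None; matched values are not wrapped in Some.
function leftJoinSketch(left, right) {
  var index = Object.create(null);
  right.forEach(function (pair) {
    var key = String(pair[0]);
    (index[key] = index[key] || []).push(pair[1]);
  });
  var out = [];
  left.forEach(function (pair) {
    var key = String(pair[0]);
    var matches = index[key] || [null];   // unmatched left rows still appear once
    matches.forEach(function (r) { out.push([key, [pair[1], r]]); });
  });
  return out;
}

var leftJoined = leftJoinSketch(
  [[0, 1], [1, 2], [0, 3], [2, 4]],
  [[0, "a"], [1, "b"]]
);
console.log(JSON.stringify(leftJoined));
```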

mapJoint(mapper)

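Applies a mapping transformation to join results.

mapper: (valL[,valR[,key]]) => Value, the joint mapping function.

  • valL, the left-hand join value;
  • valR, the right-hand join value;
  • key, the join Key;
  • Value, the mapped data.
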
Canal.of([
  [0, 1],
  [1, 2],
  [0, 3],
  [2, 4]
]).leftJoin(Canal.of([
  [0, "a"],
  [1, "b"]
])).mapJoint(function(left, right) {
   return left + ":" + right.or("_"); // if the right-hand side has no matching value, fall back to "_"
}).collect();
[
  "1:a",
  "3:a",
  "2:b",
  "4:_"
]

mapValues(mapper[,vop[,kop]])

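Maps the value of each Pair; typically used to process grouped data.

mapper: (data[,key]) => Value, converts the value of the Pair into Value.

  • data, the Value in the Pair;
  • key, the Key in the Pair.

vop, the Value extractor for the upstream data, Vop by default;
kop, the Key extractor for the upstream data, Kop by default.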

Canal.of([
  ["a", 1],
  ["a", 2],
  ["b", 3]
])
.groupByKey()
.mapValues(function(arr){
  var sum = 0;
  for (var i = 0; i < arr.length; i++) { // declare i locally instead of leaking a global
    sum += arr[i];
  }
  return sum; // sum the data within the group
}).collect();
[
  ["a", 3],
  ["b", 3]
]

Note: the computation in this example can be simplified with reduceByKey.

reduceByKey(reducer[, kop[, vop]])

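Reduces the data items that share the same Key.
Note: this operation is an intermediate transformation.

reducer: (a, b) => c, the reducing function, which reduces a and b to c;
kop: (data) => Key, the Key extractor, Kop by default;
vop: (data) => Value, the Value extractor, Vop by default.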

Canal.of([
  ["a", 1],
  ["a", 2],
  ["b", 3]
]).reduceByKey(function(a, b){
  return a + b;
}).collect();
[
  ["a", 3],
  ["b", 3]
]

rightJoin(that[,keyL[,keyR[,valL[,valR]]]])

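Performs a right outer equi-join with another Canal instance; each join result has the form [Key, [Option<ValueL>, ValueR]].

that: Canal, the other Canal instance, which supplies the right-hand data;
keyL: (data) => Key, the Key extractor for the left-hand data (this Canal), Kop by default;
keyR: (data) => Key, the Key extractor for the right-hand data (the other Canal), Kop by default;
valL: (data) => Value, the Value extractor for the left-hand data (this Canal), Vop by default;
valR: (data) => Value, the Value extractor for the right-hand data (the other Canal), Vop by default.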

Canal.of([
  [0, 1],
  [2, 2]
]).rightJoin(Canal.of([
  [0, "a"],
  [1, "b"]
])).collect();
[
  ["0", [Some(1), "a"]],
  ["1", [None, "b"]]
]

values([vop])

Passes the Values of the upstream data downstream.

vop: (data) => Value, the Value extractor, Vop by default.

Canal.of([
  [ "a", 1 ],
  [ "b", 2 ],
  [ "c", 3 ]
]).values().collect();
[
  1,
  2,
  3
]

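Array Intermediate Operations

These operations assume that the upstream provides array data.

choose(pred)

Selects among array data items.

pred: ([a[,b[,...]]]) => Boolean, the selection predicate; the elements of the upstream array are passed in order as the arguments a, b, ..., and it returns true if and only if the array should be kept.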

Canal.of([
  [ 1, 2 ],
  [ 3, 2 ],
  [ 3, 1 ]
]).choose(function(a, b)
{
  return a > b;
}).collect();
[
  [ 3, 2 ],
  [ 3, 1 ]
]

flatten([level])

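Flattens nested array data from the upstream, removing up to level levels of nesting from each data item.

level: Integer, the maximum number of levels to flatten; null means unlimited; null by default.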

Canal.of([
  [ 1, [ 2, [ 3 ] ] ],
  [ 2, [ 3, [ 4 ] ] ],
  [ 3, [ 4, [ 5 ] ] ]
]).flatten(1)  // flatten only the first level
.collect();
[
  [ 1, 2, [3] ],
  [ 2, 3, [4] ],
  [ 3, 4, [5] ]
]

unpack(unpacker)

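Unpacks array data from the upstream and passes the processed result downstream.

unpacker: ([a[,b[,...]]]) => Value, the unpacking function; the elements of the upstream array are passed in order as the arguments a, b, ..., and the returned Value is the result of the unpacking.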

Canal.of([
  [ 1, 2 ],
  [ 2, 3 ],
  [ 3, 4 ]
]).unpack(function(a, b)
{
  return a + b;
}).collect();
[
  3,
  5,
  7
]

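Object Intermediate Operations

These operations assume that the upstream provides object data.

select(fieldsMap | Array | [field1[,field2,...]])

Selects several fields from each upstream data item and passes a newly created object downstream. If no argument is given, all properties of each data item are copied into the corresponding new object.

fieldsMap: Object, the field mapping, whose keys are the field aliases after mapping and whose values are the field names in the source data;
field: String|Vop, a field selector, which can be a Vop or a string naming both the field to select from the source data and the target field; creating it with the Canal.field method is recommended.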

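var f = Canal.field; // assumption: f is a shorthand for Canal.field; it is not defined elsewhere on this page
Canal.of([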
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).select("id",
          f("grp"),
          f("rnk", "rank"),
          f(function(d){return d.sal+1000;}, "salplus")
).collect();
[
  {"id":"1","grp":"1","rank":1,"salplus":2000.00},
  {"id":"2","grp":"1","rank":1,"salplus":2100.00},
  {"id":"3","grp":"1","rank":2,"salplus":2200.00},
  {"id":"4","grp":"1","rank":2,"salplus":2300.00},
  {"id":"5","grp":"1","rank":3,"salplus":2400.00},
  {"id":"6","grp":"2","rank":1,"salplus":2500.00},
  {"id":"7","grp":"2","rank":1,"salplus":2600.00},
  {"id":"8","grp":"2","rank":2,"salplus":2700.00}
]

window(item1[,item2,...])

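Computes window functions.

item: {
  aggr: (partLevels) => aggRes, the aggregation function, which aggregates over all levels of the whole partition and runs once per partition; by default it returns undefined. partLevels is the collection of levels [[Level1],[Level2],...], each of which contains a group of data items
  updt: (aggRes,partRows,winBegin,winEnd,lvlBegin,lvlEnd) => updtRes, the update function, which computes over the data in the window of the current level and runs once per window; by default it returns the aggregation result
  expr: (curntPos,updtRes,partRows,winBegin,winEnd,lvlBegin,lvlEnd) => Value, the expression function, which produces the result for one row of the current level and runs once per row; optional, by default it returns the update result
  alias: String, the result alias of this window item
  part: [kop1[[,asc1][,kop2[,asc2]...]]], the partitioning criteria
  order: [kop1[[,asc1][,kop2[,asc2]...]]], the ordering criteria, optional
  scope: Array<Number>, the window range, optional, the whole partition by default
  byRow: Boolean, whether the window range is defined by row index; false means it is defined by the value range of the first ordering column; true by default
}, the window item.
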
function sum(mapper)
{
  return function(agg, rows, begin, end)
  {
    return Canal.of(rows, begin, end)
    .map(mapper)
    .reduce(function(a, b)
    {
      return a + b;
    }).get();
  };
}
Canal.of([
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).window(
  Canal.item(sum(function(d){return d.sal;}))
    .partBy(function(d){return d.grp;})
    .orderBy(function(d){return d.rnk;})
    .rows().between(-1, 1)
    .as("sum_sal")
).collect();

In ES6 this can be simplified to:

Canal.of([
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).window(
  Canal.item(sum(d=>d.sal))
    .partBy(d=>d.grp)
    .orderBy(d=>d.rnk)
    .rows().between(-1, 1)
    .as("sum_sal")
).collect();

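In addition, Canal.wf provides some commonly used built-in window functions, such as Canal.wf.sum, which removes the need to define a separate sum function and simplifies the code further to: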

Canal.of([
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).window(
  Canal.wf.sum(d=>d.sal)
    .partBy(d=>d.grp)
    .orderBy(d=>d.rnk)
    .rows().between(-1, 1)
    .as("sum_sal")
).collect();
[
  {"id":"1","grp":"1","rnk":1,"sal":1000.00,"sum_sal":2100.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00,"sum_sal":3300.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00,"sum_sal":3600.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00,"sum_sal":3900.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00,"sum_sal":2700.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00,"sum_sal":3100.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00,"sum_sal":4800.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00,"sum_sal":3300.00}
]
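
The sum_sal values above follow from partitioning by grp, ordering by rnk, and summing sal over the rows from one before to one after the current row. The plain-JavaScript sketch below reproduces that arithmetic; windowSumSketch is a hypothetical helper and not how Canal computes windows internally, and it relies on Array.prototype.sort being stable (ES2019 and later) so that rows with equal rnk keep their input order.

```javascript
// Hypothetical stand-alone "sum over rows between preceding and following" per partition.
function windowSumSketch(rows, partKey, orderKey, valueOf, preceding, following) {
  var parts = Object.create(null);
  rows.forEach(function (row) {
    var key = String(partKey(row));
    (parts[key] = parts[key] || []).push(row);
  });
  var out = [];
  Object.keys(parts).forEach(function (key) {
    var part = parts[key].slice().sort(function (a, b) {
      return orderKey(a) - orderKey(b);
    });
    part.forEach(function (row, i) {
      var begin = Math.max(0, i + preceding);               // clamp to the partition start
      var end = Math.min(part.length, i + following + 1);   // exclusive end, clamped
      var sum = 0;
      for (var j = begin; j < end; j++) sum += valueOf(part[j]);
      out.push(sum);
    });
  });
  return out;
}

var salaryRows = [
  { grp: "1", rnk: 1, sal: 1000 }, { grp: "1", rnk: 1, sal: 1100 },
  { grp: "1", rnk: 2, sal: 1200 }, { grp: "1", rnk: 2, sal: 1300 },
  { grp: "1", rnk: 3, sal: 1400 }, { grp: "2", rnk: 1, sal: 1500 },
  { grp: "2", rnk: 1, sal: 1600 }, { grp: "2", rnk: 2, sal: 1700 }
];
var sumSal = windowSumSketch(salaryRows,
  function (d) { return d.grp; }, function (d) { return d.rnk; },
  function (d) { return d.sal; }, -1, 1);
console.log(sumSal);
```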

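Window Functions

Canal.wf defines commonly used window functions. The general calling form is:
Canal.wf.FUNC(...).partBy(FIELD,...).orderBy(FIELD,...).rows().between(PRCD,FOLW).as(ALIAS)

Canal.wf.count(vop[,distinct[,cmp]])

Counts the elements in the window.

vop: (data) => Value, the Value extractor;
distinct: Boolean, true counts only distinct elements, false by default;
cmp: (a, b) => Number, the comparator.

Note: when distinct is true, orderBy and between must not be specified for the window.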

Canal.of([
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).select()
.window(
Canal.wf.count(f("id"))
  .partBy(function(d){return d.grp;})
  .orderBy(function(d){return d.rnk;})
  .as("count")
).collect();
[
  {"id":"1","grp":"1","rnk":1,"sal":1000.00,"count":2},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00,"count":2},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00,"count":4},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00,"count":4},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00,"count":5},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00,"count":2},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00,"count":2},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00,"count":3}
]

Canal.wf.cume_dist()

Computes the cumulative distribution of each row. orderBy must be specified, and between must not be.

Canal.of([
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).select()
.window(
Canal.wf.cume_dist()
  .partBy(function(d){return d.grp;})
  .orderBy(function(d){return d.rnk;})
  .as("cum_dst")
).collect();
[
  {"id":"1","grp":"1","rnk":1,"sal":1000.00,"cum_dst":0.4},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00,"cum_dst":0.4},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00,"cum_dst":0.8},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00,"cum_dst":0.8},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00,"cum_dst":1.0},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00,"cum_dst":2/3},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00,"cum_dst":2/3},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00,"cum_dst":1.0}
]

Canal.wf.dense_rank()

Computes the dense rank of each row. orderBy must be specified, and between must not be.

Canal.of([
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).select()
.window(
Canal.wf.dense_rank()
  .partBy(function(d){return d.grp;})
  .orderBy(function(d){return d.rnk;})
  .as("rank")
).collect();
[
  {"id":"1","grp":"1","rnk":1,"sal":1000.00,"rank":1},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00,"rank":1},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00,"rank":2},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00,"rank":2},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00,"rank":3},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00,"rank":1},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00,"rank":1},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00,"rank":2}
]

Canal.wf.fold(init, folder[, vop])

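Folds the data in the window.

init: () => Value, produces the initial fold value for each window;
folder: (last, data) => R, the folding function, where data is the Value extracted by vop;
vop: (data) => Value, the Value extractor, (data) => data by default.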

Canal.of([
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).select()
.window(
Canal.wf.fold(function(){return [];},
              function(last,data){
                last.push(data.sal);
                return last;}
             )
  .partBy(function(d){return d.grp;})
  .orderBy(function(d){return d.rnk;})
  .as("fold_sal")
).collect();
[
  {"id":"1","grp":"1","rnk":1,"sal":1000.00,"fold_sal":[1000.00,1100.00]},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00,"fold_sal":[1000.00,1100.00]},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00,"fold_sal":[1000.00,1100.00,1200.00,1300.00]},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00,"fold_sal":[1000.00,1100.00,1200.00,1300.00]},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00,"fold_sal":[1000.00,1100.00,1200.00,1300.00,1400.00]},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00,"fold_sal":[1500.00,1600.00]},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00,"fold_sal":[1500.00,1600.00]},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00,"fold_sal":[1500.00,1600.00,1700.00]}
]

Canal.wf.lag(vop[, offset[, default]])

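Returns the value from the row that is offset rows before the current row. orderBy must be specified, and between must not be.

vop: (data) => Value, the Value extractor;
offset: Integer, the offset, at least 0, 1 by default;
default: Value, the default value, returned when the row offset rows before the current row falls outside the partition; undefined by default.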

Canal.of([
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).select()
.window(
Canal.wf.lag(function(d){return d.sal;}, 2, "N/A")
  .partBy(function(d){return d.grp;})
  .orderBy(function(d){return d.rnk;})
  .as("lg")
).collect();
[
  {"id":"1","grp":"1","rnk":1,"sal":1000.00,"lg":"N/A"},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00,"lg":"N/A"},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00,"lg":1000.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00,"lg":1100.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00,"lg":1200.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00,"lg":"N/A"},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00,"lg":"N/A"},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00,"lg":1500.00}
]

Canal.wf.lead(vop[, offset[, default]])

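Returns the value from the row that is offset rows after the current row. orderBy must be specified, and between must not be.

vop: (data) => Value, the Value extractor;
offset: Integer, the offset, at least 0, 1 by default;
default: Value, the default value, returned when the row offset rows after the current row falls outside the partition; undefined by default.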

Canal.of([
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).select()
.window(
Canal.wf.lead(function(d){return d.sal;}, 2, "N/A")
  .partBy(function(d){return d.grp;})
  .orderBy(function(d){return d.rnk;})
  .as("ld")
).collect();
[
  {"id":"1","grp":"1","rnk":1,"sal":1000.00,"ld":1200.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00,"ld":1300.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00,"ld":1400.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00,"ld":"N/A"},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00,"ld":"N/A"},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00,"ld":1700.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00,"ld":"N/A"},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00,"ld":"N/A"}
]
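
Within a single ordered partition, lag and lead amount to reading a value at a shifted row index and falling back to the default when that index is out of range. A plain-JavaScript sketch of this idea, using the sal values of grp "1" from the examples above; shiftSketch is a hypothetical helper and not part of Canal.

```javascript
// Hypothetical helper: read the value `offset` rows away (negative = before, positive = after).
function shiftSketch(values, offset, fallback) {
  return values.map(function (_, i) {
    var j = i + offset;
    return j >= 0 && j < values.length ? values[j] : fallback;
  });
}

var salGrp1 = [1000, 1100, 1200, 1300, 1400];   // one partition, already ordered
var lag2 = shiftSketch(salGrp1, -2, "N/A");     // like lag(..., 2, "N/A")
var lead2 = shiftSketch(salGrp1, 2, "N/A");     // like lead(..., 2, "N/A")
console.log(lag2, lead2);
```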

Canal.wf.max(vop[, cmp])

Returns the largest element in the window.

vop: (data) => Value, the Value extractor;
cmp: (a, b) => Number, the comparator.

Canal.of([
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).select()
.window(
Canal.wf.max(function(d){return d.sal;})
  .partBy(function(d){return d.grp;})
  .orderBy(function(d){return d.rnk;})
  .rows().between(0,1)
  .as("max_sal")
).collect();
[
  {"id":"1","grp":"1","rnk":1,"sal":1000.00,"max_sal":1100.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00,"max_sal":1200.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00,"max_sal":1300.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00,"max_sal":1400.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00,"max_sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00,"max_sal":1600.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00,"max_sal":1700.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00,"max_sal":1700.00}
]

Canal.wf.min(vop[, cmp])

Returns the smallest element in the window.

vop: (data) => Value, the Value extractor;
cmp: (a, b) => Number, the comparator.

Canal.of([
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).select()
.window(
Canal.wf.min(function(d){return d.sal;})
  .partBy(function(d){return d.grp;})
  .orderBy(function(d){return d.rnk;})
  .rows().between(-1,0)
  .as("min_sal")
).collect();
[
  {"id":"1","grp":"1","rnk":1,"sal":1000.00,"min_sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00,"min_sal":1000.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00,"min_sal":1100.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00,"min_sal":1200.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00,"min_sal":1300.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00,"min_sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00,"min_sal":1500.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00,"min_sal":1600.00}
]

Canal.wf.ntile(num)

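Assigns each row to one of num groups and returns the group number. orderBy must be specified, and between must not be.

num: Integer, the total number of groups.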

Canal.of([
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).select()
.window(
Canal.wf.ntile(4)
  .partBy(function(d){return d.grp;})
  .orderBy(function(d){return d.rnk;})
  .as("ntle")
).collect();
[
  {"id":"1","grp":"1","rnk":1,"sal":1000.00,"ntle":1},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00,"ntle":1},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00,"ntle":2},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00,"ntle":3},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00,"ntle":4},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00,"ntle":1},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00,"ntle":2},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00,"ntle":3}
]
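
The group numbers above are consistent with the usual SQL NTILE convention, in which rows are spread as evenly as possible and the earlier groups receive the extra rows. The following plain-JavaScript sketch works under that assumption for one already ordered partition; ntileSketch is a hypothetical helper, not Canal's implementation.

```javascript
// Hypothetical stand-alone ntile for one ordered partition containing `count` rows.
// Assumes the usual SQL NTILE convention: larger groups come first.
function ntileSketch(count, num) {
  var base = Math.floor(count / num);   // minimum rows per group
  var extra = count % num;              // the first `extra` groups get one more row
  var tiles = [];
  for (var group = 1; group <= num && tiles.length < count; group++) {
    var size = base + (group <= extra ? 1 : 0);
    for (var k = 0; k < size; k++) tiles.push(group);
  }
  return tiles;
}

var tilesOfFive = ntileSketch(5, 4);    // grp "1" has 5 rows
var tilesOfThree = ntileSketch(3, 4);   // grp "2" has 3 rows
console.log(tilesOfFive, tilesOfThree);
```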

Canal.wf.percent_rank()

Computes the percent rank of each row. orderBy must be specified, and between must not be.

Canal.of([
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).select()
.window(
Canal.wf.percent_rank()
  .partBy(function(d){return d.grp;})
  .orderBy(function(d){return d.rnk;})
  .as("pct_rnk")
).collect();
[
  {"id":"1","grp":"1","rnk":1,"sal":1000.00,"pct_rnk":0},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00,"pct_rnk":0},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00,"pct_rnk":0.5},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00,"pct_rnk":0.5},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00,"pct_rnk":1},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00,"pct_rnk":0},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00,"pct_rnk":0},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00,"pct_rnk":1}
]
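
The values above match the conventional definition of percent rank, (rank - 1) / (n - 1), where n is the number of rows in the partition. Below is a minimal plain-JavaScript sketch of that formula. `percentRankSketch` is a hypothetical helper, and returning 0 for a single-row partition is an assumption borrowed from SQL, not something this page states about Canal.

```javascript
// Hypothetical helper, not part of Canal: takes the order-by keys of ONE
// partition, already sorted, and returns the percent rank of each row.
function percentRankSketch(sortedKeys) {
  var n = sortedKeys.length;
  var result = [];
  var rank = 1;
  for (var i = 0; i < n; i++) {
    // A new key starts a new rank equal to the 1-based row position.
    if (i > 0 && sortedKeys[i] !== sortedKeys[i - 1]) {
      rank = i + 1;
    }
    // Assumption: a single-row partition yields 0 (avoids division by zero).
    result.push(n > 1 ? (rank - 1) / (n - 1) : 0);
  }
  return result;
}

console.log(JSON.stringify(percentRankSketch([1, 1, 2, 2, 3]))); // [0,0,0.5,0.5,1]
console.log(JSON.stringify(percentRankSketch([1, 1, 2])));       // [0,0,1]
```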

Canal.wf.rank()

Computes the rank of each row. orderBy must be specified, and between must not be specified.

Canal.of([
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).select()
.window(
Canal.wf.rank()
  .partBy(function(d){return d.grp;})
  .orderBy(function(d){return d.rnk;})
  .as("rank")
).collect();
[
  {"id":"1","grp":"1","rnk":1,"sal":1000.00,"rank":1},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00,"rank":1},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00,"rank":3},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00,"rank":3},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00,"rank":5},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00,"rank":1},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00,"rank":1},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00,"rank":3}
]

Canal.wf.row_number()

Computes the row number of each row. orderBy must be specified, and between must not be specified.

Canal.of([
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).select()
.window(
Canal.wf.row_number()
  .partBy(function(d){return d.grp;})
  .orderBy(function(d){return d.rnk;})
  .as("row_num")
).collect();
[
  {"id":"1","grp":"1","rnk":1,"sal":1000.00,"row_num":1},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00,"row_num":2},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00,"row_num":3},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00,"row_num":4},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00,"row_num":5},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00,"row_num":1},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00,"row_num":2},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00,"row_num":3}
]
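
The difference between rank and row_number shows up only when rows tie on the order-by key: tied rows share a rank and the next rank skips ahead, while row numbers keep increasing by one. The sketch below illustrates this in plain JavaScript. `rankAndRowNumberSketch` is a hypothetical helper, not Canal code, and it assumes the rows of one partition are already sorted.

```javascript
// Hypothetical helper, not part of Canal: computes both numbers for the
// sorted order-by keys of ONE partition.
function rankAndRowNumberSketch(sortedKeys) {
  var rows = [];
  var rank = 1;
  for (var i = 0; i < sortedKeys.length; i++) {
    // Ties keep the previous rank; a new key jumps to the row position.
    if (i > 0 && sortedKeys[i] !== sortedKeys[i - 1]) {
      rank = i + 1;
    }
    rows.push({ key: sortedKeys[i], rank: rank, row_num: i + 1 });
  }
  return rows;
}

var rows = rankAndRowNumberSketch([1, 1, 2, 2, 3]);
console.log(JSON.stringify(rows.map(function (r) { return r.rank; })));    // [1,1,3,3,5]
console.log(JSON.stringify(rows.map(function (r) { return r.row_num; }))); // [1,2,3,4,5]
```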

Canal.wf.sum(vop)

Computes the sum of the elements in the window.

vop: (data) => Value, the Value identification operator.

Canal.of([
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).select()
.window(
Canal.wf.sum(function(d){return d.sal;})
  .partBy(function(d){return d.grp;})
  .orderBy(function(d){return d.rnk;})
  .rows().between(-1, 1)
  .as("sum_sal")
).collect();
[
  {"id":"1","grp":"1","rnk":1,"sal":1000.00,"sum_sal":2100.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00,"sum_sal":3300.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00,"sum_sal":3600.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00,"sum_sal":3900.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00,"sum_sal":2700.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00,"sum_sal":3100.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00,"sum_sal":4800.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00,"sum_sal":3300.00}
]
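
In the example above, rows().between(-1, 1) restricts the window to the previous row, the current row, and the next row within the same partition, clipped at the partition boundaries. The following plain-JavaScript sketch reproduces that arithmetic. `slidingSumSketch` is a hypothetical helper for illustration only; it is not Canal's implementation.

```javascript
// Hypothetical helper, not part of Canal: sums values[i + from .. i + to]
// for each row i of ONE sorted partition, clipping the frame at the edges.
function slidingSumSketch(values, from, to) {
  var result = [];
  for (var i = 0; i < values.length; i++) {
    var lo = Math.max(0, i + from);
    var hi = Math.min(values.length - 1, i + to);
    var sum = 0;
    for (var j = lo; j <= hi; j++) {
      sum += values[j];
    }
    result.push(sum);
  }
  return result;
}

// Salaries of partition grp "1", then grp "2", in rnk order.
console.log(JSON.stringify(slidingSumSketch([1000, 1100, 1200, 1300, 1400], -1, 1)));
// [2100,3300,3600,3900,2700]
console.log(JSON.stringify(slidingSumSketch([1500, 1600, 1700], -1, 1)));
// [3100,4800,3300]
```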

Miscellaneous

Canal.field(name[, alias]): (data) => Value

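Converts a field name into a field extractor that returns the value of the field name from the given data.

name: String|Vop, the field name; if name is a Vop, it is used directly as the extractor;
alias: String, the field alias; optional.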

var f = Canal.field;
var result = Canal.of([
  {"id":"1","grp":"1","rnk":1,"sal":1000.00},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00}
]).window(
  Canal.wf.count(f("id"))  // equivalent to d=>d["id"]
    .partBy(function(d){return d.grp;})
    .orderBy(function(d){return d.rnk;})
    .as("count")
).collect();
[
  {"id":"1","grp":"1","rnk":1,"sal":1000.00,"count":2},
  {"id":"2","grp":"1","rnk":1,"sal":1100.00,"count":2},
  {"id":"3","grp":"1","rnk":2,"sal":1200.00,"count":4},
  {"id":"4","grp":"1","rnk":2,"sal":1300.00,"count":4},
  {"id":"5","grp":"1","rnk":3,"sal":1400.00,"count":5},
  {"id":"6","grp":"2","rnk":1,"sal":1500.00,"count":2},
  {"id":"7","grp":"2","rnk":1,"sal":1600.00,"count":2},
  {"id":"8","grp":"2","rnk":2,"sal":1700.00,"count":3}
]
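
To make the extractor idea concrete, here is a minimal plain-JavaScript sketch of what a field extractor does when given a String or a function. `fieldSketch` is a hypothetical stand-in, not Canal.field itself. It ignores the alias parameter, because this page does not describe how the alias is consumed.

```javascript
// Hypothetical stand-in for illustration, not Canal.field itself.
function fieldSketch(name) {
  // If a function (a Vop) is passed, use it directly as the extractor.
  if (typeof name === "function") {
    return name;
  }
  // Otherwise build an extractor that reads the named field from the data.
  return function (data) {
    return data[name];
  };
}

var row = { "id": "1", "grp": "1", "rnk": 1, "sal": 1000.00 };
console.log(fieldSketch("id")(row));                               // 1
console.log(fieldSketch(function (d) { return d.sal * 2; })(row)); // 2000
```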

Canal.pairsOfMap(map[, keys]): Array<[Key, Value]>

Converts a map object into an array of Pairs.

map: Object<Key,Value>, the map object;
keys: Array<String>, the sequence of Keys to convert; defaults to all Keys in the map.

Canal.pairsOfMap({
  "A": 0,
  "B": 1
});
[
  ["A", 0],
  ["B", 1]
]

Canal.mapOfPairs(pairs): Object<Key, Value>

Converts an array of Pairs into a map object.

pairs: Array<[Key, Value]>, an array whose elements are Pairs.

Canal.mapOfPairs([
  ["A", 0],
  ["B", 1]
]);
{
  "A": 0,
  "B": 1
}
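
The two conversions are inverses of each other for plain objects with String keys. The sketch below shows the equivalent logic in plain JavaScript, including the optional keys argument. `pairsOfMapSketch` and `mapOfPairsSketch` are hypothetical names, not Canal functions. How Canal treats a requested Key that is missing from the map is not specified on this page, so the sketch simply yields undefined for it.

```javascript
// Hypothetical helpers for illustration, not the Canal functions themselves.
function pairsOfMapSketch(map, keys) {
  var ks = keys || Object.keys(map); // default: every own Key of the map
  return ks.map(function (k) {
    return [k, map[k]];              // assumption: a missing Key yields undefined
  });
}

function mapOfPairsSketch(pairs) {
  var map = {};
  pairs.forEach(function (pair) {
    map[pair[0]] = pair[1];          // a later Pair overwrites an earlier one with the same Key
  });
  return map;
}

var pairs = pairsOfMapSketch({ "A": 0, "B": 1 });
console.log(JSON.stringify(pairs));                                       // [["A",0],["B",1]]
console.log(JSON.stringify(pairsOfMapSketch({ "A": 0, "B": 1 }, ["B"]))); // [["B",1]]
console.log(JSON.stringify(mapOfPairsSketch(pairs)));                     // {"A":0,"B":1}
```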

Canal.Some(value): Some<Value>

Creates a Some object. Some inherits from Option.

value: Value, the value.

var some = Canal.Some(1);
console.log(some.or("_"));
some.map(function(d){
  return d + 1;
}).foreach(function(d){
  console.log(d);
});
1
2

Canal.None(): None

Creates a None object. None inherits from Option.

var none = Canal.None();
console.log(none.or("_"));
console.log(none.count());
_
0

Canal.Option([value]): Option

Creates an Option object from the given value: returns a Some object if value is not null, otherwise returns a None object.

var opt1 = Canal.Option(1);
console.log(opt1.or("_"));
var opt2 = Canal.Option(null);
console.log(opt2.or("_"));
1
_
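
One way to picture Some and None is as a container holding exactly one value or no value at all, which is why or() falls back to its argument and count() returns 0 for None. The following is a rough plain-JavaScript model of that behavior. `optionSketch` is a hypothetical name, not Canal's Option. Treating undefined the same as null, and turning a null result of map() into an empty container, are assumptions of this sketch rather than documented behavior.

```javascript
// Hypothetical model for illustration, not Canal's Option implementation.
function optionSketch(value) {
  // Assumption: undefined is treated like null (an omitted argument gives "None").
  var items = value == null ? [] : [value];
  return {
    or: function (fallback) {
      return items.length ? items[0] : fallback;
    },
    count: function () {
      return items.length;
    },
    map: function (fn) {
      // Assumption: mapping an empty container stays empty.
      return optionSketch(items.length ? fn(items[0]) : null);
    },
    foreach: function (fn) {
      items.forEach(function (d) { fn(d); });
    }
  };
}

console.log(optionSketch(1).or("_"));       // 1
console.log(optionSketch(null).or("_"));    // _
console.log(optionSketch(null).count());    // 0
optionSketch(1).map(function (d) { return d + 1; }).foreach(function (d) {
  console.log(d);                           // 2
});
```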

Canal.unit: (data) => data

The identity mapping: maps data to itself.

console.log(Canal.unit(1));
1

Canal.kop: (array) => Key

The default Key identification operator; maps an array to its first element.

console.log(Canal.kop([1,"one"]));
1

Canal.vop: (array) => Value

The default Value identification operator; maps an array to its second element.

console.log(Canal.vop([1,"one"]));
one