/
dataset_merge.go
49 lines (43 loc) · 1.33 KB
/
dataset_merge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package flow
import (
"github.com/chrislusf/gleam/instruction"
)
// MergeSortedTo reduces d to partitionCount shards using a MergeSortedTo
// instruction. When d already has exactly partitionCount shards, d itself is
// returned and no step is added to the flow.
//
// Otherwise a new dataset with partitionCount shards is created and linked to
// d through an N-to-one step. N is len(d.Shards)/partitionCount rounded up, so
// N*partitionCount is never smaller than the input shard count. The new
// dataset inherits d's IsLocalSorted and IsPartitionedBy settings, and
// IsLocalSorted is also handed to the merge instruction.
//
// NOTE(review): a partitionCount of 0 panics with an integer divide by zero.
func (d *Dataset) MergeSortedTo(name string, partitionCount int) (ret *Dataset) {
	shardCount := len(d.Shards)
	if shardCount == partitionCount {
		return d
	}

	ret = d.Flow.NewNextDataset(partitionCount)

	// Ceiling division. shardCount is never negative, so the remainder is
	// never negative either and "!= 0" matches "> 0".
	groupSize := shardCount / partitionCount
	if shardCount%partitionCount != 0 {
		groupSize++
	}

	ret.IsLocalSorted = d.IsLocalSorted
	ret.IsPartitionedBy = d.IsPartitionedBy

	d.Flow.
		AddLinkedNToOneStep(d, groupSize, ret).
		SetInstruction(name, instruction.NewMergeSortedTo(d.IsLocalSorted))
	return ret
}
// TreeMergeSortedTo merges d down to partitionCount shards in one or more
// MergeSortedTo rounds, all named name. Each intermediate round divides the
// current shard count by factor (integer division), presumably to bound how
// many inputs a single merge task combines. If d already has partitionCount
// shards or fewer, d is returned unchanged.
//
// Intermediate rounds never go below partitionCount. Overshooting would leave
// the caller with fewer partitions than requested; for example, 10 shards with
// factor 3 and partitionCount 5 would otherwise end at 3 shards.
//
// A factor below 2 cannot shrink the shard count between rounds. Factor 0
// would divide by zero, and factor 1 would recurse forever because
// MergeSortedTo returns an unchanged dataset. In that case the shards are
// merged to partitionCount in a single step.
func (d *Dataset) TreeMergeSortedTo(name string, partitionCount int, factor int) (ret *Dataset) {
	shardCount := len(d.Shards)
	if shardCount <= partitionCount {
		return d
	}
	if factor < 2 || shardCount <= factor {
		// Either a tree round cannot make progress, or a single round is enough.
		return d.MergeSortedTo(name, partitionCount)
	}

	// shardCount > factor >= 2, so shardCount/factor is at least 1 and
	// strictly less than shardCount. partitionCount is also less than
	// shardCount here, so every round makes progress and the recursion ends.
	next := shardCount / factor
	if next < partitionCount {
		next = partitionCount
	}
	return d.MergeSortedTo(name, next).TreeMergeSortedTo(name, partitionCount, factor)
}
// MergeTo reduces d to partitionCount shards using a MergeTo instruction.
// When d already has exactly partitionCount shards, d itself is returned and
// no step is added to the flow.
//
// Otherwise a new dataset with partitionCount shards is created and linked to
// d through an N-to-one step. N is len(d.Shards)/partitionCount rounded up.
// Only IsPartitionedBy is carried over to the new dataset; unlike
// MergeSortedTo, IsLocalSorted is not copied.
//
// NOTE(review): a partitionCount of 0 panics with an integer divide by zero.
func (d *Dataset) MergeTo(name string, partitionCount int) (ret *Dataset) {
	n := len(d.Shards)
	if n == partitionCount {
		return d
	}

	ret = d.Flow.NewNextDataset(partitionCount)

	// Round n/partitionCount up. n is non-negative, so a non-zero remainder
	// is always positive.
	perGroup := n / partitionCount
	if n%partitionCount != 0 {
		perGroup++
	}

	ret.IsPartitionedBy = d.IsPartitionedBy
	d.Flow.AddLinkedNToOneStep(d, perGroup, ret).SetInstruction(name, instruction.NewMergeTo())
	return ret
}