/
distinct_global.go
119 lines (105 loc) · 2.44 KB
/
distinct_global.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package executor
import (
	"fmt"
	"io"
	"sync"

	"github.com/gotodb/gotodb/datatype"
	"github.com/gotodb/gotodb/metadata"
	"github.com/gotodb/gotodb/pb"
	"github.com/gotodb/gotodb/row"
	"github.com/gotodb/gotodb/stage"
	"github.com/gotodb/gotodb/util"
	"github.com/vmihailenco/msgpack"
)
// SetInstructionDistinctGlobal decodes the msgpack-encoded stage job carried in
// instruction.EncodedStageJobBytes into a stage.DistinctGlobalJob and stores a
// pointer to it in e.StageJob. If decoding fails the error is returned and
// e.StageJob is left unchanged.
func (e *Executor) SetInstructionDistinctGlobal(instruction *pb.Instruction) error {
	decoded := new(stage.DistinctGlobalJob)
	err := msgpack.Unmarshal(instruction.EncodedStageJobBytes, decoded)
	if err != nil {
		return err
	}
	e.StageJob = decoded
	return nil
}
// RunDistinctGlobal executes the DistinctGlobalJob stored in e.StageJob.
//
// It reads the input metadata from every reader, writes the job's output
// metadata to every writer, and then streams rows from reader i to writer i
// using one goroutine per reader. For each distinct expression, the first row
// seen with a given value keeps it; in any later row with the same value (from
// any reader) that column is set to nil before the row is written.
//
// Errors from setup (job type, reader/writer counts, metadata exchange,
// expression initialisation, column lookup) and from the final flush are
// returned. Read and write errors inside a worker goroutine are reported
// through e.AddLogInfo and stop only that worker; they are not returned.
func (e *Executor) RunDistinctGlobal() error {
	job, ok := e.StageJob.(*stage.DistinctGlobalJob)
	if !ok {
		return fmt.Errorf("distinct global: unexpected stage job type %T", e.StageJob)
	}
	// Worker i writes to rbWriters[i], so every reader needs a matching writer;
	// reject the mismatch here instead of panicking inside a goroutine.
	if len(e.Readers) > len(e.Writers) {
		return fmt.Errorf("distinct global: %d readers but only %d writers", len(e.Readers), len(e.Writers))
	}

	// Read the input metadata; every reader decodes into the same object.
	md := &metadata.Metadata{}
	for _, reader := range e.Readers {
		if err := util.ReadObject(reader, md); err != nil {
			return err
		}
	}

	// Send the output metadata ahead of the rows and set up one buffered row
	// writer per output.
	mdOutput := job.Metadata
	rbWriters := make([]*row.RowsBuffer, len(e.Writers))
	for i, writer := range e.Writers {
		if err := util.WriteObject(writer, mdOutput); err != nil {
			return err
		}
		rbWriters[i] = row.NewRowsBuffer(mdOutput, nil, writer)
	}

	// Initialise the distinct expressions against the input metadata.
	for _, expr := range job.Expressions {
		if err := expr.Init(md); err != nil {
			return err
		}
	}

	// seen[j] is the set of key strings already emitted for expression j;
	// indexes[j] is the input column that expression j refers to.
	seen := make([]map[string]struct{}, len(job.Expressions))
	for j := range seen {
		seen[j] = make(map[string]struct{})
	}
	indexes := make([]int, len(job.Expressions))
	for j, expr := range job.Expressions {
		index, err := md.GetIndexByName(expr.Name)
		if err != nil {
			return err
		}
		indexes[j] = index
	}

	// mutex guards the seen sets, which are shared by all workers.
	var (
		mutex sync.Mutex
		wg    sync.WaitGroup
	)
	for i := range e.Readers {
		wg.Add(1)
		go func(wi int) {
			defer wg.Done()

			rbReader := row.NewRowsBuffer(md, e.Readers[wi], nil)
			for {
				rg, err := rbReader.Read()
				if err == io.EOF {
					return
				}
				if err != nil {
					e.AddLogInfo(err, pb.LogLevel_ERR)
					return
				}

				// Process one row group under the lock. The closure with a
				// deferred Unlock guarantees the mutex is released on the
				// write-error path too; returning with it held would block
				// every other worker and hang wg.Wait forever.
				err = func() error {
					mutex.Lock()
					defer mutex.Unlock()

					for n := 0; n < rg.GetRowsNumber(); n++ {
						r := rg.GetRow(n)
						for j, index := range indexes {
							key := datatype.ToKeyString(r.Vals[index])
							if _, dup := seen[j][key]; dup {
								// Value already emitted for this expression.
								r.Vals[index] = nil
							} else {
								seen[j][key] = struct{}{}
							}
						}
						if err := rbWriters[wi].WriteRow(r); err != nil {
							return err
						}
						row.Pool.Put(r)
					}
					return nil
				}()
				if err != nil {
					e.AddLogInfo(err, pb.LogLevel_ERR)
					return
				}
			}
		}(i)
	}
	wg.Wait()

	// Push any rows still buffered in the writers.
	for _, rbWriter := range rbWriters {
		if err := rbWriter.Flush(); err != nil {
			return err
		}
	}
	return nil
}