This repository has been archived by the owner on Mar 3, 2020. It is now read-only.
/
iterator.go
106 lines (89 loc) · 1.96 KB
/
iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package streamsort
import (
"errors"
)
var errClosed = errors.New("streamsort: iterator is closed")
// Iterator allows to iterate over sorted outputs
type Iterator struct {
sources []*fileReader
stash sortedSlice
comparer Comparer
cur []byte
err error
}
func newIterator(fnames []string, comparer Comparer, compression Compression) (*Iterator, error) {
i := &Iterator{
sources: make([]*fileReader, 0, len(fnames)),
comparer: comparer,
}
for _, fname := range fnames {
src, srcID, err := i.openFile(fname, compression)
if err != nil {
_ = i.Close()
return nil, err
}
if src.Next() {
i.insert(srcID, src.Bytes())
}
if err := src.Err(); err != nil {
_ = i.Close()
return nil, err
}
}
return i, nil
}
func (i *Iterator) insert(srcID int, data []byte) {
i.stash = i.stash.Insert(srcID, data, i.comparer)
}
func (i *Iterator) openFile(fname string, comp Compression) (*fileReader, int, error) {
srcID := len(i.sources)
src, err := openFile(fname, comp)
if err != nil {
return nil, 0, err
}
i.sources = append(i.sources, src)
return src, srcID, nil
}
// Next advances the cursor to the next entry
func (i *Iterator) Next() bool {
if i.err != nil {
return false
}
last := len(i.stash) - 1
if last < 0 {
return false
}
item := i.stash[last]
i.stash = i.stash[:last]
i.cur = append(i.cur[:0], item.data...)
src := i.sources[item.srcID]
if src.Next() {
i.insert(item.srcID, src.Bytes())
}
if err := src.Err(); err != nil {
i.err = err
return false
}
return true
}
// Bytes returns the chunk at the current position
func (i *Iterator) Bytes() []byte {
if i.err != nil {
return nil
}
return i.cur
}
// Err returns the iterator error
func (i *Iterator) Err() error { return i.err }
// Close closes and releases the iterator
func (i *Iterator) Close() error {
var err error
for _, src := range i.sources {
if e := src.Close(); e != nil {
err = e
}
}
i.sources = i.sources[:0]
i.err = errClosed
return err
}