-
Notifications
You must be signed in to change notification settings - Fork 10
/
irelate.go
298 lines (271 loc) · 8.55 KB
/
irelate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
// Streaming relation (overlap, distance, KNN) testing of (any number of) sorted files of intervals.
package irelate
import (
"container/heap"
"fmt"
"io"
"log"
"os"
. "github.com/brentp/irelate/interfaces"
)
// Set relativeTo so SelfRelations constant to allow reporting overlaps within a stream
const SelfRelations = -2
func relate(a Relatable, b Relatable, relativeTo int) {
if relativeTo == SelfRelations {
a.AddRelated(b)
b.AddRelated(a)
return
}
if a.Source() != b.Source() {
if relativeTo == -1 {
a.AddRelated(b)
b.AddRelated(a)
} else {
if uint32(relativeTo) == a.Source() {
a.AddRelated(b)
}
if uint32(relativeTo) == b.Source() {
b.AddRelated(a)
}
}
}
}
func Less(a Relatable, b Relatable) bool {
if a.Chrom() != b.Chrom() {
return a.Chrom() < b.Chrom()
}
return a.Start() < b.Start() // || (a.Start() == b.Start() && a.End() < b.End())
}
// 1, 2, 3 ... 9, 10, 11...
func NaturalLessPrefix(a Relatable, b Relatable) bool {
if !SameChrom(a.Chrom(), b.Chrom()) {
return NaturalLess(StripChr(a.Chrom()), StripChr(b.Chrom()))
}
return a.Start() < b.Start() || (a.Start() == b.Start() && a.End() < b.End())
}
// 1, 10, 11... 19, 2, 20, 21 ...
func LessPrefix(a Relatable, b Relatable) bool {
if !SameChrom(a.Chrom(), b.Chrom()) {
return StripChr(a.Chrom()) < StripChr(b.Chrom())
}
return a.Start() < b.Start() || (a.Start() == b.Start() && a.End() < b.End())
}
// CheckRelatedByOverlap returns true if Relatables overlap.
func CheckRelatedByOverlap(a Relatable, b Relatable) bool {
return (b.Start() < a.End()) && (b.Chrom() == a.Chrom())
// note with distance == 0 this just overlap.
//distance := uint32(0)
//return (b.Start()-distance < a.End()) && (b.Chrom() == a.Chrom())
}
// handles chromomomes like 'chr1' from one org and '1' from another.
func CheckOverlapPrefix(a Relatable, b Relatable) bool {
if b.Start() < a.End() {
return SameChrom(a.Chrom(), b.Chrom())
}
return false
}
// CheckKNN relates an interval to its k-nearest neighbors.
// The reporting function will have to do some filtering since this is only
// guaranteed to associate *at least* k neighbors, but it could be returning extra.
func CheckKNN(a Relatable, b Relatable) bool {
// the first n checked would be the n_closest, but need to consider ties
// the report function can decide what to do with them.
k := 4
r := a.Related()
if len(r) >= k {
// TODO: double-check this.
return r[len(r)-1].Start()-a.End() < b.Start()-a.End()
}
return true
}
// filter rewrites the input-slice to remove nils.
func filter(s []Relatable, nils int) []Relatable {
j := 0
if len(s) != nils {
for _, v := range s {
if v != nil {
s[j] = v
j++
}
}
}
for k := j; k < len(s); k++ {
s[k] = nil
}
return s[:j]
}
type irelate struct {
checkRelated func(a, b Relatable) bool
// relativeTo indicates which stream is the query stream. A value of -1 means
// all vs all. A value of -2 reports overlaps even within the same stream.
relativeTo int
less func(a, b Relatable) bool
// cache holds the set of Relatables we must test for overlap. A Relatable
// is ejected from the cache when it is not related to the interval that's
// about to be added.
cache []Relatable
// an item eject from the cache gets put on the sendQ if it's from the query
// stream.
sendQ *relatableQueue
// mergeStream creates a single (sorted) stream of all incoming intervals.
mergeStream RelatableIterator
//merger RelatableChannel
nils int
}
// IRelate provides the basis for flexible overlap/proximity/k-nearest neighbor
// testing. IRelate receives merged, ordered Relatables via stream and takes
// function that checks if they are related (see CheckRelatedByOverlap).
// It is guaranteed that !Less(b, a) is true (we can't guarantee that Less(a, b)
// is true since they may have the same start). Once checkRelated returns false,
// it is assumed that no other `b` Relatables could possibly be related to `a`
// and so `a` is sent to the returnQ.
// streams are a variable number of iterators that send intervals.
func IRelate(checkRelated func(a, b Relatable) bool,
relativeTo int,
less func(a, b Relatable) bool,
streams ...RelatableIterator) RelatableIterator {
mergeStream := newMerger(less, relativeTo, streams...)
ir := &irelate{checkRelated: checkRelated, relativeTo: relativeTo,
mergeStream: mergeStream,
cache: make([]Relatable, 0, 1024), sendQ: &relatableQueue{make([]Relatable, 0, 1024), less},
less: less}
return ir
}
func (ir *irelate) Close() error {
return nil
}
func (ir *irelate) Next() (Relatable, error) {
for {
interval, err := ir.mergeStream.Next()
if err == io.EOF {
break
}
// check the interval against everything in the cache.
for i, c := range ir.cache {
if c == nil {
continue
}
if ir.checkRelated(c, interval) {
relate(c, interval, ir.relativeTo)
} else {
// if it's not related, we remove it from the cache
// if it's a query interval, we push it onto the sendQ.
if ir.relativeTo < 0 || int(c.Source()) == ir.relativeTo {
heap.Push(ir.sendQ, c)
}
ir.cache[i] = nil
ir.nils++
}
}
// only do this when we have a lot of nils as it's expensive to create a new slice.
// nils are spaces that we've removed from the cache.
if ir.nils < 2 {
ir.cache = append(ir.cache, interval)
continue
}
// remove nils from the cache (must do this before sending)
ir.cache, ir.nils = filter(ir.cache, ir.nils), 0
var o Relatable
if len(ir.sendQ.rels) > 0 {
o = ir.sendQ.rels[0]
}
// if the first thing in the sendQ is less than the first thing in the cache
// then we can send Pop the lowest thing off the sendQ.
// otherwise, we continue to read from the stream.
if o != nil && (len(ir.cache) == 0 || ir.less(o, ir.cache[0])) {
ir.cache = append(ir.cache, interval)
return heap.Pop(ir.sendQ).(Relatable), nil
}
ir.cache = append(ir.cache, interval)
}
// stream is done so we empty the cache by pushing onto the sendQ
if len(ir.cache) > 0 {
ir.cache, ir.nils = filter(ir.cache, ir.nils), 0
for _, c := range ir.cache {
if ir.relativeTo < 0 || int(c.Source()) == ir.relativeTo {
heap.Push(ir.sendQ, c)
}
}
ir.cache = ir.cache[:0]
}
// ... then we clear the sendQ
if len(ir.sendQ.rels) > 0 {
return heap.Pop(ir.sendQ).(Relatable), nil
}
return nil, io.EOF
}
type merger struct {
less func(a, b Relatable) bool
relativeTo int
streams []RelatableIterator
q relatableQueue
seen map[string]struct{}
j int
lastChrom string
verbose bool
}
func newMerger(less func(a, b Relatable) bool, relativeTo int, streams ...RelatableIterator) *merger {
q := relatableQueue{make([]Relatable, 0, len(streams)), less}
verbose := os.Getenv("IRELATE_VERBOSE") == "TRUE"
for i, stream := range streams {
interval, err := stream.Next()
if err != nil && err != io.EOF {
panic(err)
}
if interval != nil {
interval.SetSource(uint32(i))
heap.Push(&q, interval)
}
if err == io.EOF {
stream.Close()
}
}
m := &merger{less: less, relativeTo: relativeTo, streams: streams, q: q, seen: make(map[string]struct{}), j: -1000, lastChrom: "", verbose: verbose}
return m
}
func (m *merger) Close() error {
return nil
}
func (m *merger) Next() (Relatable, error) {
if len(m.q.rels) == 0 {
return nil, io.EOF
}
interval := heap.Pop(&m.q).(Relatable)
source := interval.Source()
if !SameChrom(interval.Chrom(), m.lastChrom) {
if m.verbose && m.lastChrom != "" {
log.Printf("on chromosome: %s\n", m.lastChrom)
}
m.lastChrom = StripChr(interval.Chrom())
if _, ok := m.seen[m.lastChrom]; ok {
log.Println("warning: chromosomes must be in different order between files or the chromosome sort order is not as expected.")
log.Printf("warning: overlaps will likely be missed after this chrom: %s from source: %d\n", m.lastChrom, interval.Source())
}
m.seen[m.lastChrom] = struct{}{}
}
// pull the next interval from the same source.
next_interval, err := m.streams[source].Next()
if err == nil {
if next_interval.Start() < interval.Start() {
if SameChrom(next_interval.Chrom(), interval.Chrom()) {
panic(fmt.Sprintf("intervals out of order within file: starts at: %d and %d from source: %d", interval.Start(), next_interval.Start(), source))
}
}
next_interval.SetSource(source)
heap.Push(&m.q, next_interval)
m.j--
if m.j == 0 {
return nil, io.EOF
}
} else {
if int(source) == m.relativeTo {
// we pull in 200K more records and then stop. to make sure we get anything that might
// relate to last query
m.j = 200000
}
}
if err == io.EOF {
m.streams[source].Close()
}
return interval, nil
}