/
possum-provider.go
137 lines (124 loc) · 2.96 KB
/
possum-provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
//go:build !android
package possumTorrentStorage
import (
"cmp"
"fmt"
"io"
"sort"
"strconv"
"github.com/anacrolix/log"
possum "github.com/anacrolix/possum/go"
possumResource "github.com/anacrolix/possum/go/resource"
"github.com/anacrolix/torrent/storage"
)
// Extends possum resource.Provider with an efficient implementation of torrent
// storage.ConsecutiveChunkReader. TODO: This doesn't expose Capacity
type Provider struct {
possumResource.Provider
Logger log.Logger
}
var _ storage.ConsecutiveChunkReader = Provider{}
// Sorts by a precomputed key but swaps on another slice at the same time.
type keySorter[T any, K cmp.Ordered] struct {
orig []T
keys []K
}
func (o keySorter[T, K]) Len() int {
return len(o.keys)
}
func (o keySorter[T, K]) Less(i, j int) bool {
return o.keys[i] < o.keys[j]
}
func (o keySorter[T, K]) Swap(i, j int) {
o.keys[i], o.keys[j] = o.keys[j], o.keys[i]
o.orig[i], o.orig[j] = o.orig[j], o.orig[i]
}
// TODO: Should the parent ReadConsecutiveChunks method take the expected number of bytes to avoid
// trying to read discontinuous or incomplete sequences of chunks?
func (p Provider) ReadConsecutiveChunks(prefix string) (rc io.ReadCloser, err error) {
p.Logger.Levelf(log.Debug, "ReadConsecutiveChunks(%q)", prefix)
//debug.PrintStack()
pr, err := p.Handle.NewReader()
if err != nil {
return
}
defer func() {
if err != nil {
pr.End()
}
}()
items, err := pr.ListItems(prefix)
if err != nil {
return
}
keys := make([]int64, 0, len(items))
for _, item := range items {
var i int64
offsetStr := item.Key
i, err = strconv.ParseInt(offsetStr, 10, 64)
if err != nil {
err = fmt.Errorf("failed to parse offset %q: %w", offsetStr, err)
return
}
keys = append(keys, i)
}
sort.Sort(keySorter[possum.Item, int64]{items, keys})
offset := int64(0)
consValues := make([]consecutiveValue, 0, len(items))
for i, item := range items {
itemOffset := keys[i]
if itemOffset > offset {
// We can't provide a continuous read.
break
}
if itemOffset+item.Stat.Size() <= offset {
// This item isn't needed
continue
}
var v possum.Value
v, err = pr.Add(prefix + item.Key)
if err != nil {
return
}
consValues = append(consValues, consecutiveValue{
pv: v,
offset: itemOffset,
size: item.Stat.Size(),
})
offset += item.Stat.Size() - (offset - itemOffset)
}
err = pr.Begin()
if err != nil {
return
}
rc, pw := io.Pipe()
go func() {
defer pr.End()
err := p.writeConsecutiveValues(consValues, pw)
err = pw.CloseWithError(err)
if err != nil {
panic(err)
}
}()
return
}
type consecutiveValue struct {
pv possum.Value
offset int64
size int64
}
func (pp Provider) writeConsecutiveValues(
values []consecutiveValue, pw *io.PipeWriter,
) (err error) {
off := int64(0)
for _, v := range values {
var n int64
valueOff := off - v.offset
n, err = io.Copy(pw, io.NewSectionReader(v.pv, valueOff, v.size-valueOff))
if err != nil {
return
}
off += n
}
return nil
}