-
Notifications
You must be signed in to change notification settings - Fork 64
/
resolver_transfers.go
106 lines (92 loc) · 2.54 KB
/
resolver_transfers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package gql
import (
"context"
"sort"
"time"
gqltypes "github.com/filecoin-project/boost/gql/types"
"github.com/filecoin-project/boost/storagemarket"
"github.com/google/uuid"
"github.com/graph-gophers/graphql-go"
)
type transferPoint struct {
At graphql.Time
Bytes gqltypes.Uint64
}
// query: transfers: [TransferPoint]
func (r *resolver) Transfers(_ context.Context) []*transferPoint {
return r.getTransferSamples(r.provider.Transfers(), nil)
}
type transferStats struct {
HttpMaxConcurrentDownloads int32
Stats []*hostTransferStats
}
type hostTransferStats struct {
Host string
Total int32
Started int32
Stalled int32
TransferSamples []*transferPoint
}
// query: transferStats: TransferStats
func (r *resolver) TransferStats(_ context.Context) *transferStats {
transfersByDeal := r.provider.Transfers()
stats := r.provider.TransferStats()
gqlStats := make([]*hostTransferStats, 0, len(stats))
for _, s := range stats {
gqlStats = append(gqlStats, &hostTransferStats{
Host: s.Host,
Total: int32(s.Total),
Started: int32(s.Started),
Stalled: int32(s.Stalled),
TransferSamples: r.getTransferSamples(transfersByDeal, s.DealUuids),
})
}
return &transferStats{
HttpMaxConcurrentDownloads: int32(r.cfg.Dealmaking.HttpTransferMaxConcurrentDownloads),
Stats: gqlStats,
}
}
func (r *resolver) getTransferSamples(deals map[uuid.UUID][]storagemarket.TransferPoint, filter []uuid.UUID) []*transferPoint {
// If filter is nil, include all deals
if filter == nil {
for dealUuid := range deals {
filter = append(filter, dealUuid)
}
}
// We have
// dealUUID -> [At: <time>, Transferred: <bytes>, At: <time>, Transferred: <bytes>, ...]
// Convert this to
// <time> -> <transferred per second>
totalAt := make(map[time.Time]uint64)
for _, dealUuid := range filter {
points, ok := deals[dealUuid]
if !ok {
continue
}
var prev uint64
first := true
for _, pt := range points {
if first {
first = false
prev = pt.Bytes
continue
}
transferredSincePrev := pt.Bytes - prev
totalAt[pt.At] += transferredSincePrev
prev = pt.Bytes
}
}
// Convert map into array of transferPoints
pts := make([]*transferPoint, 0, len(totalAt))
for at, total := range totalAt {
pts = append(pts, &transferPoint{
At: graphql.Time{Time: at},
Bytes: gqltypes.Uint64(total),
})
}
// Sort the array
sort.Slice(pts, func(i, j int) bool {
return pts[i].At.Before(pts[j].At.Time)
})
return pts
}