/
statustracker.go
126 lines (115 loc) · 2.55 KB
/
statustracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package pinwatcher
import (
"context"
"fmt"
"time"
"github.com/crossedbot/common/golang/logger"
clusterapi "github.com/ipfs-cluster/ipfs-cluster/api"
cluster "github.com/ipfs-cluster/ipfs-cluster/api/rest/client"
ipfscid "github.com/ipfs/go-cid"
pinsdb "github.com/crossedbot/axis/pkg/pins/database"
"github.com/crossedbot/axis/pkg/pins/models"
)
type StatusTracker interface {
Poll(target clusterapi.TrackerStatus, checkFreq time.Duration)
CheckStatus(target clusterapi.TrackerStatus) bool
Stop()
GetId() string
}
type statusTracker struct {
ctx context.Context
client cluster.Client
pin models.PinStatus
db pinsdb.Pins
quit chan struct{}
}
func NewStatusTracker(
ctx context.Context,
client cluster.Client,
db pinsdb.Pins,
pin models.PinStatus,
) StatusTracker {
return &statusTracker{
ctx: ctx,
client: client,
db: db,
pin: pin,
quit: make(chan struct{}),
}
}
func (t *statusTracker) Poll(
target clusterapi.TrackerStatus,
checkFreq time.Duration,
) {
ticker := time.NewTicker(checkFreq)
defer ticker.Stop()
for {
select {
case <-t.ctx.Done():
t.CheckStatus(target)
return
case <-t.quit:
t.CheckStatus(target)
return
case <-ticker.C:
if t.CheckStatus(target) {
return
}
}
}
}
func (t *statusTracker) CheckStatus(target clusterapi.TrackerStatus) bool {
// get the currently known status of the pin
var err error
t.pin, err = t.db.Get(t.pin.Id)
if err != nil {
logger.Error(fmt.Sprintf(
"Failed to get Pin: %s",
err,
))
return false
}
prevStatus := t.pin.Status
currStatus := prevStatus
// poll the cluster peer for the pin's status
cid, err := ipfscid.Decode(t.pin.Pin.Cid)
if err != nil {
logger.Error(fmt.Sprintf(
"Failed to decode pin's CID: %s",
err,
))
return false
}
gblPinInfo, err := t.client.Status(t.ctx, clusterapi.NewCid(cid), true)
if err != nil {
logger.Error(fmt.Sprintf(
"Failed to get status: %s",
err,
))
return false
}
targetReached := false
for _, pinInfo := range gblPinInfo.PeerMap {
// Assume there is one in the map due to local being true (for
// now!). In the future, we should check for consensus among the
// cluster.
currStatus = pinInfo.Status.String()
if pinInfo.Status == target {
targetReached = true
}
break
}
// Patch the pins status if status has changed
if prevStatus != currStatus {
t.db.Patch(t.pin.Id, map[string]interface{}{
"status": currStatus,
})
}
return targetReached
}
func (t *statusTracker) Stop() {
close(t.quit)
}
func (t *statusTracker) GetId() string {
return t.pin.Id
}