-
Notifications
You must be signed in to change notification settings - Fork 8
/
store.go
140 lines (118 loc) · 3.6 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package transfer
import (
"sync"
)
// OnStoreChange is a callback that the store invokes after a transfer has
// been added to it or updated in it.
//
// The argument i is the position in the store of the transfer that changed.
type OnStoreChange func(i int)
// TransferStore is a mutex-guarded, index-addressed store of transfers that
// can notify a callback when its content changes.
//
// Transfers are identified by their position in the store. The store is meant
// to be safe for concurrent use.
//
// NOTE(review): Size reads data without taking mu, so it is not synchronized
// with concurrent writers on its own.
type TransferStore struct {
// OnStoreChange is the embedded notification callback. When it is non-nil,
// Add and Update call it with the position of the affected transfer.
OnStoreChange
// mu is the mutex that Get, Add and Update lock while they touch data.
mu sync.Mutex
// data holds the stored transfers; a transfer's index in this slice is the
// position used by the store's methods.
data []*Transfer
}
// NewStore returns a new, empty TransferStore.
//
// The backing slice starts with room for three transfers and no change
// callback is installed.
func NewStore() *TransferStore {
	store := new(TransferStore)
	store.data = make([]*Transfer, 0, 3)
	return store
}
// Get returns a copy of the transfer stored at position i, or nil when i is
// out of range.
//
// The result is a struct copy of the stored transfer, so assigning to its
// fields does not affect the stored instance. The copy is shallow:
// reference-typed fields such as channels still refer to the same underlying
// objects as the stored transfer.
func (s *TransferStore) Get(i int) *Transfer {
	s.mu.Lock()
	// Deferring the unlock releases the mutex on every return path, including
	// the out-of-range one, which previously returned with the lock held.
	defer s.mu.Unlock()

	if i < 0 || i >= len(s.data) {
		return nil
	}

	// Copy the stored value so callers cannot mutate the store's instance.
	cp := new(Transfer)
	*cp = *s.data[i]
	return cp
}
// Add appends t to the store and returns its position. If t is nil nothing
// is stored and -1 is returned.
//
// After the transfer has been stored and the lock released, OnStoreChange
// (when set) is called with the new position.
func (s *TransferStore) Add(t *Transfer) int {
	if t == nil {
		return -1
	}

	s.mu.Lock()
	// The new element lands at the current length of the slice.
	pos := len(s.data)
	s.data = append(s.data, t)
	s.mu.Unlock()

	if notify := s.OnStoreChange; notify != nil {
		notify(pos)
	}
	return pos
}
// Update copies the status, local file path and error of t into the transfer
// stored at position i. It does nothing when t is nil or i is out of range.
//
// If the new status is not Waiting, the transfer's waiting channel is closed.
// If the resulting status is final, its progress channel is closed.
//
// After a successful update, and once the lock has been released,
// OnStoreChange (when set) is called with i.
func (s *TransferStore) Update(i int, t *Transfer) {
	if t == nil {
		return
	}

	s.mu.Lock()
	// The range check happens under the lock so that it and the write below
	// see the same slice.
	if i < 0 || i >= len(s.data) {
		s.mu.Unlock()
		return
	}
	s.update(i, t)
	s.mu.Unlock()

	// Notify outside the critical section so the callback can use the store.
	if s.OnStoreChange != nil {
		s.OnStoreChange(i)
	}
}
// update copies Status, LocalFilePath and err from t into the transfer stored
// at position i and closes the channels that the new status makes obsolete.
//
// It performs no locking and no range checking itself.
func (s *TransferStore) update(i int, t *Transfer) {
	stored := s.data[i]

	stored.Status = t.Status
	stored.LocalFilePath = t.LocalFilePath
	stored.err = t.err

	// Leaving the Waiting status releases whoever is blocked on the waiting
	// channel; the field is cleared so the channel is closed only once.
	if stored.wait != nil && t.Status != Waiting {
		close(stored.wait)
		stored.wait = nil
	}

	// A final status ends the progress stream; the field is cleared for the
	// same reason as above.
	if stored.Status.IsFinal() && stored.prog != nil {
		close(stored.prog)
		stored.prog = nil
	}
}
// Size reports how many transfers are currently held by the store.
//
// It reads the slice length directly and does not take the store's mutex
// itself.
func (s *TransferStore) Size() int {
	count := len(s.data)
	return count
}
// AddToWait adds t to the store and returns its position together with the
// channel obtained from the stored transfer's waitDecision method, which
// callers can use to wait until the transfer leaves the waiting status.
//
// If t is nil nothing is stored and (-1, nil) is returned.
func (s *TransferStore) AddToWait(t *Transfer) (int, <-chan interface{}) {
	id := s.Add(t)
	if id == -1 {
		return id, nil
	}

	// Read the slice and obtain the channel under the lock: Add appends to
	// data and update touches the waiting channel while holding the same
	// mutex.
	s.mu.Lock()
	defer s.mu.Unlock()
	return id, s.data[id].waitDecision()
}
// FollowProgress returns the receive side of the progress channel of the
// transfer at position i, or nil when i is out of range.
//
// Values passed to UpdateProgress for the same position are sent on this
// channel.
func (s *TransferStore) FollowProgress(i int) <-chan float64 {
	// The range check and the element access share one critical section so
	// they cannot race with Add or with update, which run under this mutex.
	s.mu.Lock()
	defer s.mu.Unlock()

	if i < 0 || i >= len(s.data) {
		return nil
	}
	return s.data[i].progress()
}
// UpdateProgress sends progress on the progress channel of the transfer at
// position id, which is the channel handed out by FollowProgress.
//
// It does nothing when id is out of range. The send itself can block until a
// receiver is ready, depending on how the transfer's progress channel was
// created.
func (s *TransferStore) UpdateProgress(id int, progress float64) {
	s.mu.Lock()
	if id < 0 || id >= len(s.data) {
		// Ignore unknown positions, as Update and FollowProgress do, instead
		// of panicking on the slice index.
		s.mu.Unlock()
		return
	}
	ch := s.data[id].progress()
	s.mu.Unlock()

	// Send after releasing the lock so that a slow or absent receiver cannot
	// block every other store operation.
	ch <- progress
}