/
sync_state.go
217 lines (178 loc) · 5.73 KB
/
sync_state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
// Copyright (c) 2021 Proton Technologies AG
//
// This file is part of ProtonMail Bridge.
//
// ProtonMail Bridge is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// ProtonMail Bridge is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with ProtonMail Bridge. If not, see <https://www.gnu.org/licenses/>.
package store
import (
"sync"
"time"
"github.com/pkg/errors"
)
// syncState holds the bookkeeping of a message sync: when the sync last
// finished, how the message ID space is split between workers, and which
// locally stored messages are still candidates for deletion.
type syncState struct {
	// lock is taken by the methods of syncState before they touch the
	// fields below.
	lock *sync.RWMutex
	// store is used to persist the state and to read and delete messages.
	store storeSynchronizer

	// finishTime is the time at which the sync finished for the last time.
	// A zero value means that the sync has never finished or that it is
	// currently ongoing.
	finishTime int64

	// idRanges are the ID ranges used to split the work between several
	// workers. At the beginning of the sync, split IDs are looked up and
	// used to create these ranges. With 10000 messages and five workers,
	// the split IDs are around 2000, 4000, 6000 and 8000; the first worker
	// then syncs IDs 0-2000, the second one 2000-4000, and so on.
	idRanges []*syncIDRange

	// idsToBeDeletedMap is a set of message IDs (the IDs are the keys).
	// At the beginning of the sync it is filled with all message IDs in
	// the database. During the sync, every message that was synced is
	// removed from the map. Whatever remains at the end of the sync is
	// deleted, because those messages were not synced again. Messages are
	// not removed up front so that the client stays in sync meanwhile.
	idsToBeDeletedMap map[string]bool
}
// newSyncState builds a syncState from the given values and points every
// range in idRanges back to the new state.
//
// idsToBeDeleted is converted into the internal set representation. The
// idRanges slice is stored as given (it is not copied) and each of its
// elements is modified to reference the returned state.
func newSyncState(store storeSynchronizer, finishTime int64, idRanges []*syncIDRange, idsToBeDeleted []string) *syncState {
	// The number of entries is known up front, so size the map once
	// instead of letting it grow while it is filled.
	idsToBeDeletedMap := make(map[string]bool, len(idsToBeDeleted))
	for _, id := range idsToBeDeleted {
		idsToBeDeletedMap[id] = true
	}

	// The local is named state so that it does not shadow the syncState type.
	state := &syncState{
		lock:              &sync.RWMutex{},
		store:             store,
		finishTime:        finishTime,
		idRanges:          idRanges,
		idsToBeDeletedMap: idsToBeDeletedMap,
	}

	for _, idRange := range idRanges {
		idRange.syncState = state
	}

	return state
}
// save hands the current finish time, ID ranges and the IDs still marked
// for deletion over to the store's saveSyncState. The lock is held for the
// whole call, so save must not be called while the lock is already held.
func (s *syncState) save() {
	s.lock.Lock()
	defer s.lock.Unlock()

	pendingDeletion := s.getIDsToBeDeleted()
	s.store.saveSyncState(s.finishTime, s.idRanges, pendingDeletion)
}
// isIncomplete returns whether the sync is in progress (no matter whether
// the sync is running or just not finished by info from database), i.e.
// the finish time is zero and at least one ID range exists.
func (s *syncState) isIncomplete() bool {
	// This only reads the state, so the shared read lock is sufficient
	// and does not block other readers.
	s.lock.RLock()
	defer s.lock.RUnlock()

	return s.finishTime == 0 && len(s.idRanges) != 0
}
// isFinished returns whether the sync was finished, i.e. the finish time
// is not zero.
func (s *syncState) isFinished() bool {
	// This only reads the state, so the shared read lock is sufficient
	// and does not block other readers.
	s.lock.RLock()
	defer s.lock.RUnlock()

	return s.finishTime != 0
}
// clearFinishTime resets the finish time to zero and then saves the state.
func (s *syncState) clearFinishTime() {
	s.lock.Lock()
	s.finishTime = 0
	s.lock.Unlock()

	// save acquires the lock itself, so it has to run after the unlock.
	s.save()
}
// setFinishTime stores the current time (Unix nanoseconds) as the finish
// time and then saves the state.
func (s *syncState) setFinishTime() {
	s.lock.Lock()
	s.finishTime = time.Now().UnixNano()
	s.lock.Unlock()

	// save acquires the lock itself, so it has to run after the unlock.
	s.save()
}
// initIDRanges replaces all ranges with the single main full range, which
// has both StartID and StopID empty. Further ranges are then created by
// calling `addIDRange`. The state is not saved here.
func (s *syncState) initIDRanges() {
	fullRange := &syncIDRange{syncState: s}

	s.lock.Lock()
	s.idRanges = []*syncIDRange{fullRange}
	s.lock.Unlock()
}
// addIDRange sets `splitID` as StopID of the last range and appends a new
// range which starts with `splitID` and has an empty StopID.
//
// It is meant to be called after `initIDRanges`. If no range exists yet,
// the full range is created first, so the result is the same as if
// `initIDRanges` had been called. The state is not saved here.
func (s *syncState) addIDRange(splitID string) {
	s.lock.Lock()
	defer s.lock.Unlock()

	// Without any range there is no last range to close; indexing with
	// len-1 would panic, so start from the full range instead.
	if len(s.idRanges) == 0 {
		s.idRanges = []*syncIDRange{{syncState: s}}
	}

	last := s.idRanges[len(s.idRanges)-1]
	last.StopID = splitID

	s.idRanges = append(s.idRanges, &syncIDRange{
		syncState: s,
		StartID:   splitID,
		StopID:    "",
	})
}
// loadMessageIDsToBeDeleted loads all message IDs from the database and
// marks every one of them for deletion. During the sync,
// `doNotDeleteMessageID` has to be called for each ID that should be kept,
// so that it is not removed by `deleteMessagesToBeDeleted`.
//
// On success the previous set is replaced and the state is saved. If the
// IDs cannot be loaded, the error from the store is returned unchanged and
// the state is neither modified nor saved.
func (s *syncState) loadMessageIDsToBeDeleted() error {
	ids, err := s.store.getAllMessageIDs()
	if err != nil {
		return err
	}

	// Allocate only once loading succeeded, sized for all IDs up front.
	idsToBeDeletedMap := make(map[string]bool, len(ids))
	for _, id := range ids {
		idsToBeDeletedMap[id] = true
	}

	s.lock.Lock()
	s.idsToBeDeletedMap = idsToBeDeletedMap
	s.lock.Unlock()

	// save acquires the lock itself, so it has to run after the unlock.
	s.save()

	return nil
}
// doNotDeleteMessageID removes id from the set of messages to be deleted.
// It does nothing when id is not in the set. The state is not saved here.
func (s *syncState) doNotDeleteMessageID(id string) {
	s.lock.Lock()
	delete(s.idsToBeDeletedMap, id)
	s.lock.Unlock()
}
// deleteMessagesToBeDeleted passes all IDs still present in the
// to-be-deleted set to the store's deleteMessagesEvent and logs how many
// there are. The lock is held for the whole call. An error from the store
// is returned wrapped; the set itself is not modified.
func (s *syncState) deleteMessagesToBeDeleted() error {
	s.lock.Lock()
	defer s.lock.Unlock()

	staleIDs := s.getIDsToBeDeleted()
	log.Infof("Deleting %v messages after sync", len(staleIDs))

	err := s.store.deleteMessagesEvent(staleIDs)
	if err == nil {
		return nil
	}
	return errors.Wrap(err, "failed to delete messages")
}
// getIDsToBeDeleted is a helper which converts the internal map into a
// slice of its keys for easier manipulation.
//
// The returned slice is never nil (it is empty when there are no IDs) and
// its order is unspecified because it follows map iteration. The method
// does not take the lock itself; save and deleteMessagesToBeDeleted call
// it while holding the lock.
func (s *syncState) getIDsToBeDeleted() []string {
	// The final length is known, so allocate the backing array once.
	keys := make([]string, 0, len(s.idsToBeDeletedMap))
	for key := range s.idsToBeDeletedMap {
		keys = append(keys, key)
	}
	return keys
}
// syncIDRange describes one range of message IDs which need to be synced.
type syncIDRange struct {
	// syncState is the state this range belongs to; it is saved whenever
	// the range is changed through setStartID or setStopID.
	syncState *syncState

	// StartID is the ID the range starts with. It is empty for the first
	// range created by initIDRanges.
	StartID string
	// StopID is the ID the range ends with. It is empty for the last range.
	StopID string
}
// setStartID sets the start of the range to startID and saves the whole
// sync state.
func (r *syncIDRange) setStartID(startID string) {
	// save reads every range while holding the state's lock, and it can be
	// triggered through any other range of the same state. Write under the
	// same lock to avoid a data race, and release it before save because
	// save acquires the lock itself.
	r.syncState.lock.Lock()
	r.StartID = startID
	r.syncState.lock.Unlock()

	r.syncState.save()
}
// setStopID sets the end of the range to stopID and saves the whole sync
// state.
func (r *syncIDRange) setStopID(stopID string) {
	// save reads every range while holding the state's lock, and it can be
	// triggered through any other range of the same state. Write under the
	// same lock to avoid a data race, and release it before save because
	// save acquires the lock itself.
	r.syncState.lock.Lock()
	r.StopID = stopID
	r.syncState.lock.Unlock()

	r.syncState.save()
}
// isFinished reports whether the range is finished, which is the case
// when StartID and StopID are equal and not empty. A range with both IDs
// empty is the full range and is never reported as finished here; whether
// the full range is done cannot be determined other than by asking the API.
func (r *syncIDRange) isFinished() bool {
	if r.StartID == "" {
		return false
	}
	return r.StartID == r.StopID
}