/
left_over_data.go
60 lines (54 loc) · 1.93 KB
/
left_over_data.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// Copyright 2018-2022 Burak Sezer
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routingtable
import (
"github.com/buraksezer/olric/internal/cluster/partitions"
"github.com/buraksezer/olric/internal/discovery"
)
// processLeftOverDataReports walks the left-over data reports keyed by the
// reporting member and makes sure that each member is listed among the owners
// of every primary and backup partition it reported. A member that is missing
// from a partition's owner list is prepended to that list.
//
// The data structures touched here are guarded by routingMtx.
// NOTE(review): the lock is not acquired in this function, so the caller is
// presumably expected to hold it — confirm against the call sites.
func (r *RoutingTable) processLeftOverDataReports(reports map[discovery.Member]*leftOverDataReport) {
	// isOwner reports whether member already appears in owners, compared by ID.
	isOwner := func(member discovery.Member, owners []discovery.Member) bool {
		for _, owner := range owners {
			if member.CompareByID(owner) {
				return true
			}
		}
		return false
	}

	// ensureOwnership prepends member to the owner list of part unless it is
	// already there.
	ensureOwnership := func(member discovery.Member, partID uint64, part *partitions.Partition) {
		if part == nil {
			// NOTE(review): assumes PartitionByID yields nil for an unknown
			// partition ID — confirm. Skipping avoids a nil dereference on a
			// report that names a partition this table does not know about.
			r.log.V(2).Printf("[WARN] %s reported left-over data for an unknown PartID: %d", member, partID)
			return
		}

		owners := part.Owners()
		if isOwner(member, owners) {
			return
		}

		// This section is protected by routingMtx against parallel writers.
		//
		// Build a fresh slice instead of modifying the one returned by
		// Owners(): the member goes to the head, followed by the existing
		// owners in their original order. Sizing it up front keeps this to a
		// single allocation and a single copy.
		newOwners := make([]discovery.Member, 0, len(owners)+1)
		newOwners = append(newOwners, member)
		newOwners = append(newOwners, owners...)
		part.SetOwners(newOwners)

		r.log.V(2).Printf("[INFO] %s still have some data for PartID (kind: %s): %d", member, part.Kind(), partID)
	}

	// The data structures in this function are guarded by routingMtx.
	for member, report := range reports {
		if report == nil {
			// Nothing was reported for this member.
			continue
		}
		for _, partID := range report.Partitions {
			ensureOwnership(member, partID, r.primary.PartitionByID(partID))
		}
		for _, partID := range report.Backups {
			ensureOwnership(member, partID, r.backup.PartitionByID(partID))
		}
	}
}