-
Notifications
You must be signed in to change notification settings - Fork 7
/
partition.go
118 lines (107 loc) · 3.25 KB
/
partition.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// Copyright 2019-present Open Networking Foundation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gossip
import (
"context"
"github.com/atomix/atomix-api/go/atomix/primitive/meta"
"github.com/atomix/atomix-go-framework/pkg/atomix/cluster"
"github.com/atomix/atomix-go-framework/pkg/atomix/errors"
"sync"
)
// NewPartition creates a new proxy partition wrapping the given cluster
// partition. The partition ID is taken from p.ID(), the registry is stored for
// later service lookups, and the service and replica maps start out empty.
func NewPartition(p cluster.Partition, registry *Registry) *Partition {
	partition := new(Partition)
	partition.Partition = p
	partition.ID = PartitionID(p.ID())
	partition.registry = registry
	partition.services = map[ServiceId]Service{}
	partition.replicas = map[ServiceId]Replica{}
	return partition
}
// PartitionID is the integer identifier of a partition. NewPartition derives
// it from the ID() of the underlying cluster.Partition.
type PartitionID int
// Partition is a proxy partition. It embeds the underlying cluster.Partition
// and keeps per-partition maps of services and replicas, each guarded by its
// own read/write mutex.
type Partition struct {
// Embedded cluster partition supplied to NewPartition.
cluster.Partition
// registry is consulted by GetService to find the factory function for a
// service type.
registry *Registry
// ID is the partition identifier, set by NewPartition from the cluster
// partition's ID().
ID PartitionID
// services holds the services created by GetService, keyed by service ID.
services map[ServiceId]Service
// servicesMu guards services.
servicesMu sync.RWMutex
// replicas holds the replicas added through RegisterReplica, keyed by
// replica ID.
replicas map[ServiceId]Replica
// replicasMu guards replicas.
replicasMu sync.RWMutex
}
// GetService returns the service registered under serviceID, creating it on
// first use.
//
// An existing service is returned directly; timestamp is not consulted in that
// case and may be nil. Otherwise the factory registered for serviceID.Type is
// looked up in the registry and invoked with a clock derived from timestamp and
// the replication factor carried by ctx, and the result is cached for later
// calls.
//
// Errors from the registry lookup and from the factory are returned unchanged.
// An Invalid error is returned when a service has to be created but timestamp
// is nil.
func (p *Partition) GetService(ctx context.Context, serviceID ServiceId, timestamp *meta.Timestamp) (Service, error) {
	// Fast path: most calls find an existing service under the read lock.
	p.servicesMu.RLock()
	service, ok := p.services[serviceID]
	p.servicesMu.RUnlock()
	if ok {
		return service, nil
	}

	// Slow path: take the write lock and look again, since another goroutine
	// may have created the service between the two lock acquisitions.
	p.servicesMu.Lock()
	defer p.servicesMu.Unlock()
	if service, ok = p.services[serviceID]; ok {
		return service, nil
	}

	f, err := p.registry.GetServiceFunc(serviceID.Type)
	if err != nil {
		return nil, err
	}

	// The timestamp is dereferenced below to pick the clock; reject a missing
	// one with an error instead of panicking while holding the write lock.
	if timestamp == nil {
		return nil, errors.NewInvalid("timestamp is required to create service '%s'", serviceID)
	}

	// The factory runs while servicesMu is held for writing, so at most one
	// instance is ever created per service ID.
	service, err = f(ctx, serviceID, p, getClockFromTimestamp(*timestamp), getReplicationFactorFromContext(ctx))
	if err != nil {
		return nil, err
	}
	p.services[serviceID] = service
	return service, nil
}
// RegisterReplica adds replica to the partition under the ID it reports.
// It returns an AlreadyExists error, leaving the existing entry untouched,
// when a replica with the same ID has already been registered.
func (p *Partition) RegisterReplica(replica Replica) error {
	p.replicasMu.Lock()
	defer p.replicasMu.Unlock()

	id := replica.ID()
	_, exists := p.replicas[id]
	if exists {
		return errors.NewAlreadyExists("replica '%s' already exists", id)
	}

	p.replicas[id] = replica
	return nil
}
// getReplica returns the replica registered under serviceID. When none is
// registered yet it calls GetService for the same ID and then looks the replica
// up once more, returning a NotFound error if it is still absent. A Conflict
// error is returned when the replica's type differs from serviceID.Type.
func (p *Partition) getReplica(ctx context.Context, serviceID ServiceId, timestamp *meta.Timestamp) (Replica, error) {
	// lookup reads the replica map under the read lock.
	lookup := func() (Replica, bool) {
		p.replicasMu.RLock()
		defer p.replicasMu.RUnlock()
		r, found := p.replicas[serviceID]
		return r, found
	}

	replica, found := lookup()
	if !found {
		// No replica yet: make sure the service exists, then look again.
		if _, err := p.GetService(ctx, serviceID, timestamp); err != nil {
			return nil, err
		}
		if replica, found = lookup(); !found {
			return nil, errors.NewNotFound("replica '%s' not found", serviceID)
		}
	}

	if actual := replica.ID().Type; actual != serviceID.Type {
		return nil, errors.NewConflict("replica '%s' already exists with a different type '%s'", serviceID, actual)
	}
	return replica, nil
}
// deleteReplica removes the service stored under serviceID together with the
// replica registered under the same ID. It returns a NotFound error, and
// changes nothing, when no such service exists.
//
// Dropping the replica entry matters: a stale entry would keep being returned
// by getReplica after the service is gone, and would make a later
// RegisterReplica for the same ID fail with AlreadyExists.
func (p *Partition) deleteReplica(serviceID ServiceId) error {
	p.servicesMu.Lock()
	defer p.servicesMu.Unlock()

	if _, ok := p.services[serviceID]; !ok {
		return errors.NewNotFound("service '%s' not found", serviceID)
	}
	delete(p.services, serviceID)

	// replicasMu is taken while servicesMu is held. NOTE(review): this order
	// presumably matches service factories that call RegisterReplica from
	// inside GetService (which holds servicesMu) — confirm against the
	// registered factories.
	p.replicasMu.Lock()
	delete(p.replicas, serviceID)
	p.replicasMu.Unlock()
	return nil
}