-
Notifications
You must be signed in to change notification settings - Fork 3
/
command_registry.go
139 lines (118 loc) · 4.3 KB
/
command_registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// Copyright 2021 Monoskope Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package eventsourcing
import (
"context"
"sync"
"github.com/finleap-connect/monoskope/pkg/eventsourcing/errors"
"github.com/finleap-connect/monoskope/pkg/logger"
"github.com/google/uuid"
"google.golang.org/protobuf/types/known/anypb"
)
type CommandRegistry interface {
CommandHandler
RegisterCommand(func(uuid.UUID) Command)
CreateCommand(id uuid.UUID, commandType CommandType, data *anypb.Any) (Command, error)
GetRegisteredCommandTypes() []CommandType
SetHandler(handler CommandHandler, commandType CommandType)
}
type commandRegistry struct {
log logger.Logger
mutex sync.RWMutex
commands map[CommandType]func(uuid.UUID) Command
handlers map[CommandType]CommandHandler
}
var DefaultCommandRegistry CommandRegistry
func init() {
DefaultCommandRegistry = NewCommandRegistry()
}
// newCommandRegistry creates a new command registry
func NewCommandRegistry() CommandRegistry {
return &commandRegistry{
log: logger.WithName("command-registry"),
commands: make(map[CommandType]func(uuid.UUID) Command),
handlers: make(map[CommandType]CommandHandler),
}
}
// GetRegisteredCommandTypes returns a list with all registered command types.
func (r *commandRegistry) GetRegisteredCommandTypes() []CommandType {
keys := make([]CommandType, 0, len(r.commands))
for k := range r.commands {
keys = append(keys, k)
}
return keys
}
// RegisterCommand registers an command factory for a type. The factory is
// used to create concrete command types.
//
// An example would be:
// RegisterCommand(func() Command { return &MyCommand{} })
func (r *commandRegistry) RegisterCommand(factory func(uuid.UUID) Command) {
cmd := factory(uuid.Nil)
if cmd == nil {
r.log.Info("factory does not create commands")
panic(errors.ErrFactoryInvalid)
}
commandType := cmd.CommandType()
if commandType.String() == "" {
r.log.Info("attempt to register empty command type")
panic(errors.ErrEmptyCommandType)
}
r.mutex.Lock()
defer r.mutex.Unlock()
if _, ok := r.commands[commandType]; ok {
r.log.Info("attempt to register command already registered", "commandType", commandType)
panic(errors.ErrCommandTypeAlreadyRegistered)
}
r.commands[commandType] = factory
r.log.Info("command has been registered.", "commandType", commandType)
}
// CreateCommand creates an command of a type with an ID using the factory
// registered with RegisterCommand.
func (r *commandRegistry) CreateCommand(id uuid.UUID, commandType CommandType, data *anypb.Any) (Command, error) {
r.mutex.RLock()
defer r.mutex.RUnlock()
if factory, ok := r.commands[commandType]; ok {
cmd := factory(id)
if data != nil {
if err := cmd.SetData(data); err != nil {
return nil, err
}
}
return cmd, nil
}
r.log.Info("trying to create a command of non-registered type", "commandType", commandType)
return nil, errors.ErrCommandNotRegistered
}
// HandleCommand handles a command with a handler capable of handling it.
func (r *commandRegistry) HandleCommand(ctx context.Context, cmd Command) (*CommandReply, error) {
r.mutex.RLock()
defer r.mutex.RUnlock()
if handler, ok := r.handlers[cmd.CommandType()]; ok {
return handler.HandleCommand(ctx, cmd)
}
r.log.Info("trying to handle a command of non-registered type", "commandType", cmd.CommandType())
return nil, errors.ErrHandlerNotFound
}
// SetHandler adds a handler for a specific command.
func (r *commandRegistry) SetHandler(handler CommandHandler, commandType CommandType) {
r.mutex.Lock()
defer r.mutex.Unlock()
if _, ok := r.handlers[commandType]; ok {
r.log.Info("attempt to register command handler already registered", "commandType", commandType)
panic(errors.ErrHandlerAlreadySet)
}
r.handlers[commandType] = handler
r.log.Info("command handler has been registered.", "commandType", commandType)
}