-
Notifications
You must be signed in to change notification settings - Fork 467
/
metadata.go
62 lines (55 loc) · 1.61 KB
/
metadata.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package kubemq
import (
"fmt"
"strconv"
"strings"
"github.com/dapr/components-contrib/pubsub"
kitmd "github.com/dapr/kit/metadata"
)
// kubemqMetadata holds the settings of the KubeMQ pubsub component.
//
// Exported fields are decoded from the component's metadata properties using
// the key named in their mapstructure tag. The unexported fields are excluded
// from decoding (tag "-") and are derived from Address by createMetadata.
type kubemqMetadata struct {
	// Address is read from the "address" key; createMetadata requires it to
	// be non-empty and parseable by parseAddress.
	Address string `mapstructure:"address"`
	// ClientID is read from the "clientID" key.
	ClientID string `mapstructure:"clientID"`
	// AuthToken is read from the "authToken" key.
	AuthToken string `mapstructure:"authToken"`
	// Group is read from the "group" key.
	Group string `mapstructure:"group"`
	// IsStore is read from the "store" key; createMetadata presets it to true
	// before decoding, so it stays true when the key is absent.
	IsStore bool `mapstructure:"store"`
	// DisableReDelivery is read from the "disableReDelivery" key.
	DisableReDelivery bool `mapstructure:"disableReDelivery"`

	// internalHost is the host part of Address, filled in by createMetadata.
	internalHost string `mapstructure:"-"`
	// internalPort is the port part of Address, filled in by createMetadata.
	internalPort int `mapstructure:"-"`
}
// parseAddress splits a KubeMQ endpoint of the form "host:port" into its host
// and numeric port.
//
// The address must contain exactly one ':' separator, a non-empty host, and a
// decimal port in the range 1-65535. If any of these rules is violated the
// function returns an empty host, a zero port and a descriptive error.
//
// NOTE(review): because exactly one ':' is required, IPv6 literals such as
// "[::1]:50000" are rejected. Switching to net.SplitHostPort would lift that
// limit if IPv6 endpoints ever need to be supported.
func parseAddress(address string) (string, int, error) {
	hostPort := strings.Split(address, ":")
	if len(hostPort) != 2 {
		return "", 0, fmt.Errorf("invalid kubeMQ address, address format is invalid")
	}

	host := hostPort[0]
	if host == "" {
		return "", 0, fmt.Errorf("invalid kubeMQ address, host is empty")
	}

	port, err := strconv.Atoi(hostPort[1])
	// Atoi on its own accepts values such as "-1", "0" or "70000". Reject
	// anything that is not a usable TCP port so the misconfiguration surfaces
	// here rather than as an obscure failure at connection time.
	if err != nil || port < 1 || port > 65535 {
		return "", 0, fmt.Errorf("invalid kubeMQ address, port is invalid")
	}

	return host, port, nil
}
// createMetadata builds a kubemqMetadata from the component's pubsub metadata.
//
// The properties are decoded onto a struct whose IsStore field is preset to
// true, so that value is kept unless decoding overwrites it. The decoded
// Address must be non-empty and must be accepted by parseAddress; its host and
// port are then stored in internalHost and internalPort. A nil result is
// returned together with the error if decoding or address validation fails.
func createMetadata(pubSubMetadata pubsub.Metadata) (*kubemqMetadata, error) {
	meta := &kubemqMetadata{IsStore: true}

	if decodeErr := kitmd.DecodeMetadata(pubSubMetadata.Properties, meta); decodeErr != nil {
		return nil, decodeErr
	}

	// Guard clause: an address is mandatory.
	if meta.Address == "" {
		return nil, fmt.Errorf("invalid kubeMQ address, address is empty")
	}

	host, port, parseErr := parseAddress(meta.Address)
	if parseErr != nil {
		return nil, parseErr
	}
	meta.internalHost = host
	meta.internalPort = port

	return meta, nil
}