forked from apache/pulsar-client-go
/
message_metadata.go
122 lines (106 loc) · 3.43 KB
/
message_metadata.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package crypto
import (
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)
// MessageMetadataSupplier is a wrapper abstraction around message metadata
// that exposes only the encryption-related fields: the list of encryption
// keys and the encryption parameter.
type MessageMetadataSupplier interface {
	// EncryptionKeys reads all the encryption keys from the MessageMetadata.
	EncryptionKeys() []EncryptionKeyInfo
	// UpsertEncryptionKey adds a new encryption key to the MessageMetadata,
	// or updates the existing one.
	UpsertEncryptionKey(EncryptionKeyInfo)
	// EncryptionParam reads the encryption parameter from the MessageMetadata.
	EncryptionParam() []byte
	// SetEncryptionParam sets the encryption parameter in the MessageMetadata.
	SetEncryptionParam([]byte)
}
// MessageMetadata wraps a protobuf MessageMetadata and is the type returned
// by NewMessageMetadataSupplier as a MessageMetadataSupplier.
type MessageMetadata struct {
	// messageMetadata is the wrapped protobuf message. It may be nil; the
	// exported methods on MessageMetadata check for that before using it.
	messageMetadata *pb.MessageMetadata
}
// NewMessageMetadataSupplier wraps messageMetadata in a MessageMetadataSupplier.
// The pointer is stored as-is (no copy is made), so changes applied through
// the returned supplier are made directly on messageMetadata.
func NewMessageMetadataSupplier(messageMetadata *pb.MessageMetadata) MessageMetadataSupplier {
	supplier := new(MessageMetadata)
	supplier.messageMetadata = messageMetadata
	return supplier
}
// EncryptionKeys converts every encryption key held by the wrapped metadata
// into an EncryptionKeyInfo. It returns nil when no metadata is wrapped, and
// a non-nil (possibly empty) slice otherwise.
func (m *MessageMetadata) EncryptionKeys() []EncryptionKeyInfo {
	if m.messageMetadata == nil {
		return nil
	}

	pbKeys := m.messageMetadata.EncryptionKeys
	infos := make([]EncryptionKeyInfo, 0, len(pbKeys))
	for i := range pbKeys {
		pbKey := pbKeys[i]
		info := NewEncryptionKeyInfo(
			pbKey.GetKey(),
			pbKey.GetValue(),
			getKeyMetaMap(pbKey.GetMetadata()),
		)
		// The constructor hands back a pointer; the result stores values.
		infos = append(infos, *info)
	}
	return infos
}
// UpsertEncryptionKey stores keyInfo in the wrapped metadata. If an entry
// with the same key name already exists it is replaced in place; otherwise
// the new entry is appended. It does nothing when no metadata is wrapped.
func (m *MessageMetadata) UpsertEncryptionKey(keyInfo EncryptionKeyInfo) {
	if m.messageMetadata == nil {
		return
	}

	pos := m.encryptionKeyPresent(keyInfo)
	entry := &pb.EncryptionKeys{
		Key:      &keyInfo.name,
		Value:    keyInfo.Key(),
		Metadata: getKeyMeta(keyInfo.Metadata()),
	}

	// A negative position means no entry with this name exists yet.
	if pos < 0 {
		m.messageMetadata.EncryptionKeys = append(m.messageMetadata.EncryptionKeys, entry)
		return
	}
	m.messageMetadata.EncryptionKeys[pos] = entry
}
// EncryptionParam returns the encryption parameter stored in the wrapped
// metadata, or nil when no metadata is wrapped.
func (m *MessageMetadata) EncryptionParam() []byte {
	md := m.messageMetadata
	if md == nil {
		return nil
	}
	return md.EncryptionParam
}
// SetEncryptionParam stores param as the encryption parameter of the wrapped
// metadata. The slice is assigned directly, not copied. It does nothing when
// no metadata is wrapped.
func (m *MessageMetadata) SetEncryptionParam(param []byte) {
	md := m.messageMetadata
	if md == nil {
		return
	}
	md.EncryptionParam = param
}
// encryptionKeyPresent returns the index of the first encryption key in the
// wrapped metadata whose name equals keyInfo.Name(), or -1 if there is none.
//
// It dereferences m.messageMetadata without a nil check, so callers must
// verify that it is non-nil first.
func (m *MessageMetadata) encryptionKeyPresent(keyInfo EncryptionKeyInfo) int {
	keys := m.messageMetadata.EncryptionKeys
	for i := 0; i < len(keys); i++ {
		if keys[i].GetKey() == keyInfo.Name() {
			return i
		}
	}
	return -1
}
// getKeyMeta converts a metadata map into a slice of protobuf KeyValue pairs,
// one per map entry. It returns nil when metaMap is nil or empty. The order of
// the returned pairs follows Go's map iteration order and is therefore not
// deterministic.
func getKeyMeta(metaMap map[string]string) []*pb.KeyValue {
	if len(metaMap) == 0 {
		return nil
	}

	meta := make([]*pb.KeyValue, 0, len(metaMap))
	for k, v := range metaMap {
		// Take addresses of per-iteration copies, not of the range variables:
		// before Go 1.22 the range variables are shared by all iterations, so
		// every KeyValue would otherwise point at the same (last) key/value.
		key, value := k, v
		meta = append(meta, &pb.KeyValue{Key: &key, Value: &value})
	}
	return meta
}
// getKeyMetaMap converts protobuf KeyValue pairs into a map. It returns nil
// for a nil slice and a non-nil (possibly empty) map otherwise. When the same
// key occurs more than once, the last occurrence wins.
func getKeyMetaMap(keyValues []*pb.KeyValue) map[string]string {
	if keyValues == nil {
		return nil
	}

	result := make(map[string]string, len(keyValues))
	for i := range keyValues {
		result[keyValues[i].GetKey()] = keyValues[i].GetValue()
	}
	return result
}