/
bitmap.go
146 lines (126 loc) · 3.91 KB
/
bitmap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package pubsub
import (
"encoding/hex"
"fmt"
"strings"
"github.com/boljen/go-bitmap"
. "github.com/ForceCLI/force/lib"
)
// ErrUnexpectedType is the sentinel error wrapped by ErrCouldNotConvertType.
// Callers can detect it with errors.Is.
var ErrUnexpectedType = fmt.Errorf("unexpected type")
// ProcessBitMap takes the bitmap-encoded list of changed fields and returns a list of field names.
// See Salesforce pub-sub docs for more details:
// https://developer.salesforce.com/docs/platform/pub-sub-api/guide/event-deserialization-considerations.html
func ProcessBitMap(avroSchema map[string]any, bitmapFields []string) ([]string, error) {
changedFieldNames := []string{}
if len(bitmapFields) == 0 {
return changedFieldNames, nil
}
// Replace top field level bitmap with list of fields
if strings.HasPrefix(bitmapFields[0], "0x") {
// Convert hex to bytes
hexStr := bitmapFields[0][2:]
bytes, err := hex.DecodeString(hexStr)
if err != nil {
return nil, fmt.Errorf("decode hex string '%s': %w", hexStr, err)
}
// Reverse to little-endian
reversedBytes := []byte{}
for i := len(bytes) - 1; i >= 0; i-- {
reversedBytes = append(reversedBytes, bytes[i])
}
bitMap := bitmap.Bitmap(reversedBytes)
schemaFieldNames, err := getSchemaFieldNames(avroSchema)
if err != nil {
return nil, fmt.Errorf("get schema field names: %w", err)
}
changedFieldNames = append(
changedFieldNames,
getFieldNamesFromBitString(bitMap, schemaFieldNames)...,
)
bitmapFields = bitmapFields[1:] // shift off the front
}
// There can be other bitmaps present in the message of the form:
// parentPos-childBitMap
// If we end up needing that, we can implement it here.
// Check the example repository for Python or Java examples (no Go examples exist yet).
return changedFieldNames, nil
}
// getSchemaFieldNames returns the "name" of every entry in schema["fields"],
// in the order the entries appear.
//
// It returns an error built by ErrCouldNotConvertType when schema["fields"]
// is not a []any, when an entry is not a map[string]any, or when an entry's
// "name" is not a string.
func getSchemaFieldNames(schema map[string]any) ([]string, error) {
	rawFields, isSlice := schema["fields"].([]any)
	if !isSlice {
		return nil, ErrCouldNotConvertType(schema["fields"], []any{})
	}

	names := make([]string, 0, len(rawFields))
	for i := range rawFields {
		entry, isMap := rawFields[i].(map[string]any)
		if !isMap {
			return nil, ErrCouldNotConvertType(rawFields[i], map[string]any{})
		}

		name, isString := entry["name"].(string)
		if !isString {
			return nil, ErrCouldNotConvertType(entry["name"], "")
		}
		names = append(names, name)
	}
	return names, nil
}
// ErrCouldNotConvertType builds an error reporting that a value of from's
// dynamic type could not be converted to to's dynamic type. The returned
// error wraps ErrUnexpectedType.
func ErrCouldNotConvertType(from, to any) error {
	const format = "could not convert %T to type %T: %w"
	return fmt.Errorf(format, from, to, ErrUnexpectedType)
}
// getFieldNamesFromBitString returns the schema field names whose positions
// correspond to set bits in bitMap, in ascending bit order.
//
// A bitmap always spans whole bytes, so it can be longer than the list of
// schema fields. Bits at positions at or beyond len(schemaFieldNames) have no
// matching field and are ignored rather than causing an out-of-range panic.
//
// The result is always a non-nil slice (empty when no bit is set).
func getFieldNamesFromBitString(
	bitMap bitmap.Bitmap,
	// A list of all of the schema field names
	schemaFieldNames []string,
) []string {
	changedFieldNames := []string{}

	// Only scan positions that can be resolved to a field name.
	limit := bitMap.Len()
	if len(schemaFieldNames) < limit {
		limit = len(schemaFieldNames)
	}

	for i := 0; i < limit; i++ {
		if bitMap.Get(i) {
			changedFieldNames = append(changedFieldNames, schemaFieldNames[i])
		}
	}
	return changedFieldNames
}
// parseBody reduces a Change Data Capture event body to its ChangeEventHeader
// plus the fields flagged by the header's changedFields, diffFields and
// nulledFields bitmaps.
//
// The original body is returned unchanged when it has no ChangeEventHeader,
// when the header (or one of its bitmap lists) has an unexpected shape, or
// when a bitmap cannot be decoded against schema. A bitmap list that is absent
// from the header is skipped.
//
// For each flagged field:
//   - a nil value in body is copied as nil;
//   - a map value contributes one of its values (such maps are presumably
//     single-entry union wrappers produced by the decoder — confirm against
//     the caller), and an empty map contributes nothing;
//   - any other value is copied as-is.
func parseBody(body map[string]any, schema map[string]any) map[string]any {
	if body["ChangeEventHeader"] == nil {
		return body
	}
	changeEventHeader, ok := body["ChangeEventHeader"].(map[string]any)
	if !ok {
		Log.Info("Unexpected ChangeEventHeader is not map")
		return body
	}

	parsedBody := make(map[string]any)
	parsedBody["ChangeEventHeader"] = changeEventHeader

	for _, bf := range []string{"changedFields", "diffFields", "nulledFields"} {
		rawList := changeEventHeader[bf]
		if rawList == nil {
			// This header does not carry the list; nothing to decode.
			continue
		}
		bfs, ok := rawList.([]any)
		if !ok {
			Log.Info(fmt.Sprintf("Unexpected %s is not a list", bf))
			return body
		}

		bitmaps := make([]string, 0, len(bfs))
		for _, entry := range bfs {
			encoded, ok := entry.(string)
			if !ok {
				Log.Info(fmt.Sprintf("Unexpected non-string entry in %s", bf))
				return body
			}
			bitmaps = append(bitmaps, encoded)
		}

		fields, err := ProcessBitMap(schema, bitmaps)
		if err != nil {
			Log.Info(fmt.Sprintf("failed to process bitmap for %s", bf))
			return body
		}

		for _, f := range fields {
			if body[f] == nil {
				// Record the nil and keep going: the remaining flagged fields
				// still need to be copied.
				parsedBody[f] = nil
				continue
			}
			values, ok := body[f].(map[string]any)
			if !ok {
				// Not wrapped in a map; use the value directly.
				parsedBody[f] = body[f]
				continue
			}
			// Take a single value out of the wrapper map.
			for _, v := range values {
				parsedBody[f] = v
				break
			}
		}
	}
	return parsedBody
}