-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
KeyValueStore.java
288 lines (250 loc) · 9.55 KB
/
KeyValueStore.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.data;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import io.debezium.data.Envelope.Operation;
import io.debezium.relational.TableId;
/**
* A test utility for accumulating the {@link SourceRecord}s that represent change events on rows. This store applies the
* changes and maintains the current state of the rows.
*
* @author Randall Hauch
*/
public class KeyValueStore {
/**
 * Build a function that derives a {@link TableId} from a topic name by stripping a fixed prefix.
 *
 * @param prefix the leading text expected on every relevant topic name; may not be null
 * @return a function that parses the remainder of the topic name into a table ID, or returns {@code null} when the
 *         topic name does not start with the prefix or has nothing after it
 */
protected static Function<String, TableId> prefixedWith(String prefix) {
    final int prefixLength = prefix.length();
    return topic -> {
        // The topic must carry the prefix AND have at least one character after it.
        boolean hasTableName = topic.startsWith(prefix) && topic.length() > prefixLength;
        return hasTableName ? TableId.parse(topic.substring(prefixLength)) : null;
    };
}
/**
 * Build a function that derives a {@link TableId} from a topic name using a capture group of a regular expression.
 * <p>
 * The whole topic name must match the expression. Topic names that do not match, or whose requested group did not
 * participate in the match, produce {@code null}.
 *
 * @param regex the regular expression that the topic name must match; may not be null
 * @param groupNumber the number of the capture group holding the table ID string
 * @return a function that returns the parsed table ID, or {@code null} when the topic name yields no table ID
 */
protected static Function<String, TableId> fromRegex(String regex, int groupNumber) {
    Pattern pattern = Pattern.compile(regex);
    return topicName -> {
        Matcher m = pattern.matcher(topicName);
        // A match has to be attempted before group() may be called; without it group() throws
        // IllegalStateException for every input.
        if (!m.matches()) {
            return null;
        }
        String fullyQualified = m.group(groupNumber);
        return fullyQualified != null ? TableId.parse(fullyQualified) : null;
    };
}
/**
 * Create a KeyValueStore that obtains the {@link TableId} of each record by applying the supplied regular
 * expression to the record's topic name and reading the given group.
 *
 * @param regex the regular expression that identifies the table ID within the topic name; may not be null
 * @param groupNumber the group number in the regex for the table ID string
 * @return the key value store
 */
public static KeyValueStore createForTopicsMatching(String regex, int groupNumber) {
    Function<String, TableId> tableIdExtractor = fromRegex(regex, groupNumber);
    return new KeyValueStore(tableIdExtractor);
}
/**
 * Create a KeyValueStore that obtains the {@link TableId} of each record by removing the supplied prefix from the
 * record's topic name.
 *
 * @param prefix the prefix after which all of the topic name forms the table ID; may not be null
 * @return the key value store
 */
public static KeyValueStore createForTopicsBeginningWith(String prefix) {
    Function<String, TableId> tableIdExtractor = prefixedWith(prefix);
    return new KeyValueStore(tableIdExtractor);
}
/** Every record accepted by {@link #add(SourceRecord)}, in the order it was added. */
private final List<SourceRecord> sourceRecords = new ArrayList<>();
/** The per-table collections, created on demand by {@link #getOrCreate(TableId)}. */
private final Map<TableId, Collection> collectionsByTableId = new HashMap<>();
/** Maps a record's topic name to a table ID; a {@code null} result makes {@link #add(SourceRecord)} ignore the record. */
private final Function<String, TableId> tableIdFromTopic;
/**
 * Create a store that uses the supplied function to work out which table each record belongs to.
 *
 * @param tableIdFromTopic the function that converts a topic name into a table ID
 */
protected KeyValueStore(Function<String, TableId> tableIdFromTopic) {
this.tableIdFromTopic = tableIdFromTopic;
}
/**
 * Add a change event to this store. The record's topic name is converted to a {@link TableId}; when that yields a
 * table ID the record is remembered and applied to that table's collection, otherwise the record is ignored.
 *
 * @param record the change event; may not be null
 */
public void add(SourceRecord record) {
    final TableId tableId = tableIdFromTopic.apply(record.topic());
    if (tableId == null) {
        // The topic does not map to a table, so there is nothing to track.
        return;
    }
    sourceRecords.add(record);
    getOrCreate(tableId).add(record);
}
/**
 * Get the records that were accepted by {@link #add(SourceRecord)}, in the order they were added.
 *
 * @return the store's own list of records (not a copy); never null
 */
public List<SourceRecord> sourceRecords() {
    return this.sourceRecords;
}
/**
 * Get the collection for the table with the given fully-qualified name.
 *
 * @param fullyQualifiedName the table name, in the form understood by {@link TableId#parse(String)}
 * @return the collection, or {@code null} if no collection exists for that table
 */
public Collection collection(String fullyQualifiedName) {
    TableId tableId = TableId.parse(fullyQualifiedName);
    return collection(tableId);
}
/**
 * Get the collection for the table identified by a catalog and table name, with no schema name.
 *
 * @param catalog the catalog name used in the table ID
 * @param tableName the table name used in the table ID
 * @return the collection, or {@code null} if no collection exists for that table
 */
public Collection collection(String catalog, String tableName) {
    TableId tableId = new TableId(catalog, null, tableName);
    return collection(tableId);
}
/**
 * Get the collection for the given table without creating it.
 *
 * @param tableId the identifier of the table
 * @return the collection, or {@code null} if no collection exists for that table
 */
public Collection collection(TableId tableId) {
    Collection existing = collectionsByTableId.get(tableId);
    return existing;
}
/**
 * Get the identifiers of all tables that currently have a collection in this store.
 *
 * @return an unmodifiable view of the table IDs; never null
 */
public Set<TableId> collections() {
    Set<TableId> tableIds = collectionsByTableId.keySet();
    return Collections.unmodifiableSet(tableIds);
}
/**
 * Get the distinct catalog names of all tables that currently have a collection in this store.
 *
 * @return a new set of catalog names; never null
 */
public Set<String> databases() {
    Set<TableId> tableIds = collectionsByTableId.keySet();
    return tableIds.stream()
            .map(tableId -> tableId.catalog())
            .collect(Collectors.toSet());
}
/**
 * Get the number of tables that currently have a collection in this store.
 *
 * @return the number of collections; never negative
 */
public int collectionCount() {
    final int count = collectionsByTableId.size();
    return count;
}
/**
 * Get the collection for the given table, creating and registering an empty one if none exists yet.
 *
 * @param tableId the identifier of the table
 * @return the existing or newly created collection
 */
public Collection getOrCreate(TableId tableId) {
    return collectionsByTableId.computeIfAbsent(tableId, id -> new Collection(id));
}
public static class Collection {
/** The identifier of the table whose rows this collection tracks. */
private final TableId id;
/** The key schema of the most recent record whose key schema was recorded; {@code null} until the first one. */
private Schema keySchema = null;
/** The value schema of the most recent record whose value schema was recorded; {@code null} until the first one. */
private Schema valueSchema = null;
/** One entry for each time an added record's key schema differed from {@link #keySchema}. */
private final List<Schema> keySchemas = new ArrayList<>();
/** One entry for each time an added record's value schema differed from {@link #valueSchema}. */
private final List<Schema> valueSchemas = new ArrayList<>();
/** The latest record retained for each key; entries are removed when the row is deleted. */
private final Map<Struct, SourceRecord> valuesByKey = new HashMap<>();
/** Receives every record that is applied to this collection and backs the {@code numberOf...} counters. */
private final SourceRecordStats stats = new SourceRecordStats();
/**
 * Create an empty collection for the given table.
 *
 * @param id the identifier of the table
 */
protected Collection(TableId id) {
this.id = id;
}
/**
 * Get the identifier of the table that this collection represents.
 *
 * @return the table ID supplied when this collection was created
 */
public TableId tableId() {
    return this.id;
}
/**
 * Get the number of changes to the key schema for events in this collection. The first key schema seen is
 * counted as a change.
 *
 * @return the count; never negative
 */
public long numberOfKeySchemaChanges() {
    final long changes = keySchemas.size();
    return changes;
}
/**
 * Get the number of changes to the value schema for events in this collection. The first value schema seen is
 * counted as a change.
 *
 * @return the count; never negative
 */
public long numberOfValueSchemaChanges() {
    return valueSchemas.size();
}
/**
 * Get how many {@link Operation#CREATE CREATE} records have been {@link #add(SourceRecord) added} to this
 * collection, as reported by the collection's statistics.
 *
 * @return the count; never negative
 */
public long numberOfCreates() {
    final long creates = stats.numberOfCreates();
    return creates;
}
/**
 * Get how many {@link Operation#DELETE DELETE} records have been {@link #add(SourceRecord) added} to this
 * collection, as reported by the collection's statistics.
 *
 * @return the count; never negative
 */
public long numberOfDeletes() {
    final long deletes = stats.numberOfDeletes();
    return deletes;
}
/**
 * Get how many {@link Operation#READ READ} records have been {@link #add(SourceRecord) added} to this
 * collection, as reported by the collection's statistics.
 *
 * @return the count; never negative
 */
public long numberOfReads() {
    final long reads = stats.numberOfReads();
    return reads;
}
/**
 * Get how many {@link Operation#UPDATE UPDATE} records have been {@link #add(SourceRecord) added} to this
 * collection, as reported by the collection's statistics.
 *
 * @return the count; never negative
 */
public long numberOfUpdates() {
    final long updates = stats.numberOfUpdates();
    return updates;
}
/**
 * Get how many tombstone records have been {@link #add(SourceRecord) added} to this collection, as reported by
 * the collection's statistics.
 *
 * @return the count; never negative
 */
public long numberOfTombstones() {
    final long tombstones = stats.numberOfTombstones();
    return tombstones;
}
/**
 * Get the number of keys for which this collection currently holds a record.
 *
 * @return the number of retained rows; never negative
 */
public int size() {
    final int rows = valuesByKey.size();
    return rows;
}
/**
 * Get the number of key schemas recorded for this collection, as an {@code int}.
 *
 * @return the number of recorded key schemas; never negative
 */
public int keySchemaChanges() {
    final int changes = keySchemas.size();
    return changes;
}
/**
 * Get the number of value schemas recorded for this collection, as an {@code int}.
 *
 * @return the number of recorded value schemas; never negative
 */
public int valueSchemaChanges() {
    final int changes = valueSchemas.size();
    return changes;
}
/**
 * Get the current row state for the given key, taken from the latest record retained for that key.
 *
 * @param key the key of the row
 * @return the value extracted from the retained record, or {@code null} if no record is retained for the key
 */
public Struct value(Struct key) {
    final SourceRecord latest = valuesByKey.get(key);
    if (latest == null) {
        return null;
    }
    return valueFor(latest);
}
/**
 * Pass every record currently retained by this collection to the supplied consumer.
 *
 * @param consumer the function to call with each retained record
 */
public void forEach(Consumer<SourceRecord> consumer) {
    Iterable<SourceRecord> retained = valuesByKey.values();
    retained.forEach(consumer);
}
/**
 * Apply a change event to this collection.
 * <p>
 * Records that are {@code null} or have no key are ignored entirely. Otherwise:
 * <ul>
 * <li>a record with an "after" state replaces the retained record for its key on READ, CREATE and UPDATE, and
 * removes it on DELETE;</li>
 * <li>a record without an "after" state, including a tombstone whose value is {@code null}, removes the retained
 * record for its key;</li>
 * <li>the record's key and value schemas are recorded whenever they differ from the previously recorded ones;</li>
 * <li>the record is passed to the collection's statistics.</li>
 * </ul>
 *
 * @param record the change event; may be null
 */
protected void add(SourceRecord record) {
    if (record == null) {
        return;
    }
    Struct key = keyFor(record);
    if (key == null) {
        // no PK information
        return;
    }
    // A true tombstone carries a null value, so only unwrap the envelope when there is one.
    Struct value = record.value() != null ? valueFor(record) : null;
    if (value != null) {
        Operation op = Envelope.operationFor(record);
        if (op != null) {
            switch (op) {
                case READ:
                case CREATE:
                case UPDATE:
                    // Replace the existing value ...
                    valuesByKey.put(key, record);
                    break;
                case DELETE:
                    valuesByKey.remove(key);
                    break;
                default:
                    // Other operations do not affect the retained row state.
                    break;
            }
        }
    }
    else {
        // This is a tombstone, or an event without an "after" state ...
        valuesByKey.remove(key);
    }
    // Add the schemas if they are different; a missing schema (e.g. the value schema of a tombstone)
    // is not treated as a schema change.
    Schema recordKeySchema = record.keySchema();
    if (recordKeySchema != null && !recordKeySchema.equals(keySchema)) {
        keySchemas.add(recordKeySchema);
        keySchema = recordKeySchema;
    }
    Schema recordValueSchema = record.valueSchema();
    if (recordValueSchema != null && !recordValueSchema.equals(valueSchema)) {
        valueSchemas.add(recordValueSchema);
        valueSchema = recordValueSchema;
    }
    stats.accept(record);
}
}
/**
 * Get the key of the supplied record as a {@link Struct}.
 *
 * @param record the change event; may not be null
 * @return the record's key cast to a struct; {@code null} if the record has no key
 */
protected static Struct keyFor(SourceRecord record) {
    final Object key = record.key();
    return (Struct) key;
}
/**
 * Get the "after" state from the envelope carried as the value of the supplied record.
 *
 * @param record the change event; may not be null
 * @return the struct stored in the envelope's {@link Envelope.FieldName#AFTER} field; {@code null} if the record
 *         has no value (a tombstone), the envelope schema has no such field, or the field is not set
 */
protected static Struct valueFor(SourceRecord record) {
    Struct envelope = (Struct) record.value();
    if (envelope == null) {
        // Tombstone: there is no envelope to look into.
        return null;
    }
    Field afterField = envelope.schema().field(Envelope.FieldName.AFTER);
    if (afterField != null) {
        return envelope.getStruct(afterField.name());
    }
    return null;
}
/**
 * Get the source information from the envelope carried as the value of the supplied record.
 *
 * @param record the change event; may not be null
 * @return the struct stored in the envelope's {@link Envelope.FieldName#SOURCE} field; {@code null} if the record
 *         has no value (a tombstone), the envelope schema has no such field, or the field is not set
 */
protected static Struct sourceFor(SourceRecord record) {
    Struct envelope = (Struct) record.value();
    if (envelope == null) {
        // Tombstone: there is no envelope to look into.
        return null;
    }
    Field field = envelope.schema().field(Envelope.FieldName.SOURCE);
    if (field != null) {
        return envelope.getStruct(field.name());
    }
    return null;
}
}