/*
 * Copyright Debezium Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.debezium.connector.mongodb;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.Document;
import org.bson.codecs.Encoder;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.DBCollection;
import com.mongodb.MongoClient;
import com.mongodb.util.JSONSerializers;
import com.mongodb.util.ObjectSerializer;

import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.mongodb.FieldSelector.FieldFilter;
import io.debezium.data.Envelope.FieldName;
import io.debezium.data.Envelope.Operation;
import io.debezium.data.Json;
import io.debezium.function.BlockingConsumer;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;

/**
 * A component that makes {@link SourceRecord}s for {@link CollectionId collections} and submits them to a consumer.
 *
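 * <p>
 * A minimal usage sketch (the {@code filters}, {@code source}, {@code topicSelector}, {@code queue}, and
 * {@code oplogEvent} instances are assumed to be created elsewhere, and the replica set, database, and
 * collection names are illustrative):
 *
 * <pre>{@code
 * RecordMakers makers = new RecordMakers(filters, source, topicSelector, queue::put, true);
 * RecordsForCollection maker = makers.forCollection(new CollectionId("rs0", "inventory", "customers"));
 * int produced = maker.recordEvent(oplogEvent, System.currentTimeMillis());
 * }</pre>
 *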
 * @author Randall Hauch
 */
@ThreadSafe
public class RecordMakers {
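
    // Document identifiers are rendered with MongoDB's strict JSON serializer, and the oplog's
    // single-character operation codes ("i", "u", "d") are mapped onto Debezium envelope operations.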
    private static final ObjectSerializer jsonSerializer = JSONSerializers.getStrict();
    private static final Map<String, Operation> operationLiterals = new HashMap<>();

    static {
        operationLiterals.put("i", Operation.CREATE);
        operationLiterals.put("u", Operation.UPDATE);
        operationLiterals.put("d", Operation.DELETE);
    }

    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(logger);
    private final Filters filters;
    private final SourceInfo source;
    private final TopicSelector<CollectionId> topicSelector;
    private final Map<CollectionId, RecordsForCollection> recordMakerByCollectionId = new HashMap<>();
    private final Function<Document, String> valueTransformer;
    private final BlockingConsumer<SourceRecord> recorder;
    private final boolean emitTombstonesOnDelete;

    /**
     * Create the record makers using the supplied components.
     *
     * @param filters the filter configuration; may not be null
     * @param source the connector's source information; may not be null
     * @param topicSelector the selector for topic names; may not be null
     * @param recorder the potentially blocking consumer function to be called for each generated record; may not be null
     * @param emitTombstonesOnDelete whether a tombstone record should be emitted after each delete record
     */
    public RecordMakers(Filters filters, SourceInfo source, TopicSelector<CollectionId> topicSelector, BlockingConsumer<SourceRecord> recorder,
                        boolean emitTombstonesOnDelete) {
        this.filters = filters;
        this.source = source;
        this.topicSelector = topicSelector;
        JsonWriterSettings writerSettings = new JsonWriterSettings(JsonMode.STRICT, "", ""); // most compact JSON
        Encoder<Document> encoder = MongoClient.getDefaultCodecRegistry().get(Document.class);
        this.valueTransformer = (doc) -> doc.toJson(writerSettings, encoder);
        this.recorder = recorder;
        this.emitTombstonesOnDelete = emitTombstonesOnDelete;
    }

    /**
     * Obtain the record maker for the given collection, using the configured field filter and sending records
     * to the given consumer.
     *
     * @param collectionId the identifier of the collection for which records are to be produced; may not be null
     * @return the collection-specific record maker; never null
     */
    public RecordsForCollection forCollection(CollectionId collectionId) {
        return recordMakerByCollectionId.computeIfAbsent(collectionId, id -> {
            FieldFilter fieldFilter = filters.fieldFilterFor(collectionId);
            String topicName = topicSelector.topicNameFor(collectionId);
            return new RecordsForCollection(collectionId, fieldFilter, source, topicName, schemaNameAdjuster, valueTransformer, recorder, emitTombstonesOnDelete);
        });
    }

    /**
     * A record producer for a given collection.
     */
    public static final class RecordsForCollection {

        private final CollectionId collectionId;
        private final String replicaSetName;
        private final FieldFilter fieldFilter;
        private final SourceInfo source;
        private final Map<String, ?> sourcePartition;
        private final String topicName;
        private final Schema keySchema;
        private final Schema valueSchema;
        private final Function<Document, String> valueTransformer;
        private final BlockingConsumer<SourceRecord> recorder;
        private final boolean emitTombstonesOnDelete;

        protected RecordsForCollection(CollectionId collectionId, FieldFilter fieldFilter, SourceInfo source, String topicName,
                                       SchemaNameAdjuster adjuster, Function<Document, String> valueTransformer, BlockingConsumer<SourceRecord> recorder,
                                       boolean emitTombstonesOnDelete) {
            this.sourcePartition = source.partition(collectionId.replicaSetName());
            this.collectionId = collectionId;
            this.replicaSetName = this.collectionId.replicaSetName();
            this.fieldFilter = fieldFilter;
            this.source = source;
            this.topicName = topicName;
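            // The record key holds the document's _id, serialized to a JSON string by idObjToJson(...).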
            this.keySchema = SchemaBuilder.struct()
                    .name(adjuster.adjust(topicName + ".Key"))
                    .field("id", Schema.STRING_SCHEMA)
                    .build();
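            // The value schema is the change-event envelope: 'after' carries the full document as JSON for
            // reads and inserts, 'patch' carries the idempotent update document for updates, and both are
            // left null for deletes (see createRecords below).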
            this.valueSchema = SchemaBuilder.struct()
                    .name(adjuster.adjust(topicName + ".Envelope"))
                    .field(FieldName.AFTER, Json.builder().optional().build())
                    .field("patch", Json.builder().optional().build())
                    .field(FieldName.SOURCE, source.schema())
                    .field(FieldName.OPERATION, Schema.OPTIONAL_STRING_SCHEMA)
                    .field(FieldName.TIMESTAMP, Schema.OPTIONAL_INT64_SCHEMA)
                    .build();
            this.valueTransformer = valueTransformer;
            this.recorder = recorder;
            this.emitTombstonesOnDelete = emitTombstonesOnDelete;
        }

        /**
         * Get the identifier of the collection to which this producer applies.
         *
         * @return the collection ID; never null
         */
        public CollectionId collectionId() {
            return collectionId;
        }

        /**
         * Generate and record one or more source records to describe the given object.
         *
         * @param id the identifier of the collection in which the document exists; may not be null
         * @param object the document; may not be null
         * @param timestamp the timestamp at which this operation is occurring
         * @return the number of source records that were generated; will be 0 or more
         * @throws InterruptedException if the calling thread was interrupted while waiting to submit a record to
         *             the blocking consumer
         */
        public int recordObject(CollectionId id, Document object, long timestamp) throws InterruptedException {
            final Struct sourceValue = source.lastOffsetStruct(replicaSetName, id);
            final Map<String, ?> offset = source.lastOffset(replicaSetName);
            String objId = idObjToJson(object);
            assert objId != null;
            return createRecords(sourceValue, offset, Operation.READ, objId, object, timestamp);
        }

        /**
         * Generate and record one or more source records to describe the given event.
         *
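         * <p>
         * For reference, an oplog update entry has roughly this (illustrative) shape:
         * {@code {"op": "u", "ns": "db.coll", "o2": {"_id": 1}, "o": {"$set": {"name": "x"}}}}.
         *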
         * @param oplogEvent the event; may not be null
         * @param timestamp the timestamp at which this operation is occurring
         * @return the number of source records that were generated; will be 0 or more
         * @throws InterruptedException if the calling thread was interrupted while waiting to submit a record to
         *             the blocking consumer
         */
        public int recordEvent(Document oplogEvent, long timestamp) throws InterruptedException {
            final Struct sourceValue = source.offsetStructForEvent(replicaSetName, oplogEvent);
            final Map<String, ?> offset = source.lastOffset(replicaSetName);
            Document patchObj = oplogEvent.get("o", Document.class);
            // Updates have an 'o2' field, since the updated object in 'o' might not have the ObjectID ...
            Object o2 = oplogEvent.get("o2");
            String objId = o2 != null ? idObjToJson(o2) : idObjToJson(patchObj);
            assert objId != null;
            Operation operation = operationLiterals.get(oplogEvent.getString("op"));
            return createRecords(sourceValue, offset, operation, objId, patchObj, timestamp);
        }

        protected int createRecords(Struct source, Map<String, ?> offset, Operation operation, String objId, Document objectValue,
                                    long timestamp)
                throws InterruptedException {
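            // A null partition leaves partition assignment to Kafka, which will derive it from the record key.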
            Integer partition = null;
            Struct key = keyFor(objId);
            Struct value = new Struct(valueSchema);
            switch (operation) {
                case READ:
                case CREATE:
                    // The object is the new document ...
                    String jsonStr = valueTransformer.apply(fieldFilter.apply(objectValue));
                    value.put(FieldName.AFTER, jsonStr);
                    break;
                case UPDATE:
                    // The object is the idempotent patch document ...
                    String patchStr = valueTransformer.apply(fieldFilter.apply(objectValue));
                    value.put("patch", patchStr);
                    break;
                case DELETE:
                    // The delete event has nothing of any use, other than the _id which we already have in our key.
                    // So put nothing in the 'after' or 'patch' fields ...
                    break;
            }
            value.put(FieldName.SOURCE, source);
            value.put(FieldName.OPERATION, operation.code());
            value.put(FieldName.TIMESTAMP, timestamp);
            SourceRecord record = new SourceRecord(sourcePartition, offset, topicName, partition, keySchema, key, valueSchema, value);
            recorder.accept(record);
            if (operation == Operation.DELETE && emitTombstonesOnDelete) {
                // Also generate a tombstone event ...
                record = new SourceRecord(sourcePartition, offset, topicName, partition, keySchema, key, null, null);
                recorder.accept(record);
                return 2;
            }
            return 1;
        }
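
        /**
         * Render a document identifier as a JSON string. If the given object is a full document, its
         * {@code _id} field is serialized; otherwise the raw identifier value is serialized directly.
         *
         * @param idObj the document or the raw {@code _id} value; may be null
         * @return the JSON form of the identifier, or null if {@code idObj} is null
         */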
        protected String idObjToJson(Object idObj) {
            if (idObj == null) {
                return null;
            }
            if (!(idObj instanceof Document)) {
                return jsonSerializer.serialize(idObj);
            }
            return jsonSerializer.serialize(
                    ((Document) idObj).get(DBCollection.ID_FIELD_NAME));
        }
        protected Struct keyFor(String objId) {
            return new Struct(keySchema).put("id", objId);
        }
    }

    /**
     * Clear all of the cached record makers; they will be lazily recreated, with the current filters and
     * topic names, on the next call to {@link #forCollection(CollectionId)}.
     */
    public void clear() {
        logger.debug("Clearing record makers for collections");
        recordMakerByCollectionId.clear();
    }
}