Skip to content

Commit

Permalink
Fix for dbref handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Krasnov (nikr0614) authored and Nikita Krasnov (nikr0614) committed Sep 19, 2017
1 parent 79fefa3 commit 8dd12d7
Showing 1 changed file with 30 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
*/
package io.debezium.connector.mongodb;

import static java.util.Arrays.asList;

import com.mongodb.DBObjectCodecProvider;
import com.mongodb.DBRefCodecProvider;
import com.mongodb.client.gridfs.codecs.GridFSFileCodecProvider;
import com.mongodb.client.model.geojson.codecs.GeoJsonCodecProvider;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
Expand All @@ -14,6 +21,13 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.Document;
import org.bson.codecs.BsonTypeClassMap;
import org.bson.codecs.BsonValueCodecProvider;
import org.bson.codecs.DocumentCodec;
import org.bson.codecs.DocumentCodecProvider;
import org.bson.codecs.ValueCodecProvider;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;
import org.bson.types.ObjectId;
Expand Down Expand Up @@ -92,6 +106,17 @@ public static final class RecordsForCollection {
private final Function<Document, String> valueTransformer;
private final BlockingConsumer<SourceRecord> recorder;

private final CodecRegistry DEFAULT_REGISTRY = CodecRegistries.fromProviders(
asList(new ValueCodecProvider(),
new BsonValueCodecProvider(),
new DocumentCodecProvider(),
new DBRefCodecProvider(),
new DBObjectCodecProvider(),
new BsonValueCodecProvider(),
new GeoJsonCodecProvider(),
new GridFSFileCodecProvider()));
private final BsonTypeClassMap DEFAULT_BSON_TYPE_CLASS_MAP = new BsonTypeClassMap();

protected RecordsForCollection(CollectionId collectionId, SourceInfo source, String topicName, AvroValidator validator,
Function<Document, String> valueTransformer, BlockingConsumer<SourceRecord> recorder) {
this.sourcePartition = source.partition(collectionId.replicaSetName());
Expand All @@ -112,7 +137,11 @@ protected RecordsForCollection(CollectionId collectionId, SourceInfo source, Str
.field(FieldName.TIMESTAMP, Schema.OPTIONAL_INT64_SCHEMA)
.build();
JsonWriterSettings writerSettings = new JsonWriterSettings(JsonMode.STRICT, "", ""); // most compact JSON
this.valueTransformer = (doc) -> doc.toJson(writerSettings);
DocumentCodec documentCodec = new DocumentCodec(
DEFAULT_REGISTRY,
DEFAULT_BSON_TYPE_CLASS_MAP
);
this.valueTransformer = (doc) -> doc.toJson(writerSettings, documentCodec);
this.recorder = recorder;
}

Expand Down

0 comments on commit 8dd12d7

Please sign in to comment.