From 1c1aded123000a9025458b636054ecabac5de71d Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Fri, 8 Oct 2021 11:42:44 +0200 Subject: [PATCH] DBZ-4083 Add filter to MongoDB cloud converter --- .../converters/MongoDbRecordParser.java | 2 +- .../mongodb/CloudEventsConverterIT.java | 82 +++++++++++++++++++ .../converters/CloudEventsConverterTest.java | 6 +- 3 files changed, 88 insertions(+), 2 deletions(-) create mode 100644 debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/CloudEventsConverterIT.java diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/converters/MongoDbRecordParser.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/converters/MongoDbRecordParser.java index 613a23559cf..3aa78811774 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/converters/MongoDbRecordParser.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/converters/MongoDbRecordParser.java @@ -34,7 +34,7 @@ public class MongoDbRecordParser extends RecordParser { COLLECTION); public MongoDbRecordParser(Schema schema, Struct record) { - super(schema, record, Envelope.FieldName.AFTER, "patch"); + super(schema, record, Envelope.FieldName.AFTER, "patch", "filter"); } @Override diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/CloudEventsConverterIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/CloudEventsConverterIT.java new file mode 100644 index 00000000000..46bf0270dcf --- /dev/null +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/CloudEventsConverterIT.java @@ -0,0 +1,82 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mongodb; + +import static org.fest.assertions.Assertions.assertThat; + +import java.util.List; + +import org.apache.kafka.connect.source.SourceRecord; +import org.bson.Document; +import org.junit.Test; + +import io.debezium.connector.mongodb.MongoDbConnectorConfig.SnapshotMode; +import io.debezium.converters.CloudEventsConverterTest; +import io.debezium.util.Testing; + +/** + * Test to verify MongoDB connector behaviour with CloudEvents converter for all streaming events. + * + * @author Jiri Pechanec + */ +public class CloudEventsConverterIT extends AbstractMongoConnectorIT { + + @Test + public void testCorrectFormat() throws Exception { + Testing.Print.enable(); + config = TestHelper.getConfiguration() + .edit() + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.c1") + .with(MongoDbConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .build(); + + context = new MongoDbTaskContext(config); + + TestHelper.cleanDatabase(primary(), "dbA"); + + start(MongoDbConnector.class, config); + assertConnectorIsRunning(); + + // Wait for snapshot completion + waitForSnapshotToBeCompleted("mongodb", "mongo1"); + + List documentsToInsert = loadTestDocuments("restaurants1.json"); + insertDocuments("dbA", "c1", documentsToInsert.toArray(new Document[0])); + Document updateObj = new Document() + .append("$set", new Document() + .append("name", "Closed")); + updateDocument("dbA", "c1", Document.parse("{\"restaurant_id\": \"30075445\"}"), updateObj); + deleteDocuments("dbA", "c1", Document.parse("{\"restaurant_id\": \"30075445\"}")); + + // 6 INSERTs + 1 UPDATE + 1 DELETE + final int recCount = 8; + final SourceRecords records = consumeRecordsByTopic(recCount); + final List c1s = records.recordsForTopic("mongo1.dbA.c1"); + + assertThat(c1s).hasSize(recCount); + + final List insertRecords = c1s.subList(0, 6); + final SourceRecord updateRecord = c1s.get(6); + final SourceRecord deleteRecord = c1s.get(7); + + for (SourceRecord record : insertRecords) { + CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record, false); + CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record, false); + CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mongodb", "mongo1", false); + } + + CloudEventsConverterTest.shouldConvertToCloudEventsInJson(deleteRecord, false); + CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(deleteRecord, "filter", false); + CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(deleteRecord, "mongodb", "mongo1", false); + + CloudEventsConverterTest.shouldConvertToCloudEventsInJson(updateRecord, false); + CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(updateRecord, "filter", false); + CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(updateRecord, "patch", false); + CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(updateRecord, "mongodb", "mongo1", false); + + stopConnector(); + } +} diff --git a/debezium-core/src/test/java/io/debezium/converters/CloudEventsConverterTest.java b/debezium-core/src/test/java/io/debezium/converters/CloudEventsConverterTest.java index 01126df27fe..d2470b54cd7 100644 --- a/debezium-core/src/test/java/io/debezium/converters/CloudEventsConverterTest.java +++ b/debezium-core/src/test/java/io/debezium/converters/CloudEventsConverterTest.java @@ -129,6 +129,10 @@ public static void shouldConvertToCloudEventsInJson(SourceRecord record, boolean } public static void shouldConvertToCloudEventsInJsonWithDataAsAvro(SourceRecord record, boolean hasTransaction) { + shouldConvertToCloudEventsInJsonWithDataAsAvro(record, "after", hasTransaction); + } + + public static void shouldConvertToCloudEventsInJsonWithDataAsAvro(SourceRecord record, String fieldName, boolean hasTransaction) { Map config = new HashMap<>(); config.put("serializer.type", "json"); config.put("data.serializer.type", "avro"); @@ -205,7 +209,7 @@ public static void shouldConvertToCloudEventsInJsonWithDataAsAvro(SourceRecord r avroConverter.configure(Collections.singletonMap("schema.registry.url", "http://fake-url"), false); SchemaAndValue data = avroConverter.toConnectData(record.topic(), Base64.getDecoder().decode(dataJson.asText())); assertThat(data.value()).isInstanceOf(Struct.class); - assertThat(((Struct) data.value()).get("after")).isNotNull(); + assertThat(((Struct) data.value()).get(fieldName)).isNotNull(); } catch (Throwable t) { Testing.Print.enable();