Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-4083 Add filter to MongoDB cloud converter #2785

Merged
merged 1 commit into from Oct 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -34,7 +34,7 @@ public class MongoDbRecordParser extends RecordParser {
COLLECTION);

public MongoDbRecordParser(Schema schema, Struct record) {
super(schema, record, Envelope.FieldName.AFTER, "patch");
super(schema, record, Envelope.FieldName.AFTER, "patch", "filter");
}

@Override
Expand Down
@@ -0,0 +1,82 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;

import static org.fest.assertions.Assertions.assertThat;

import java.util.List;

import org.apache.kafka.connect.source.SourceRecord;
import org.bson.Document;
import org.junit.Test;

import io.debezium.connector.mongodb.MongoDbConnectorConfig.SnapshotMode;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.util.Testing;

/**
* Test to verify MongoDB connector behaviour with CloudEvents converter for all streaming events.
*
* @author Jiri Pechanec
*/
public class CloudEventsConverterIT extends AbstractMongoConnectorIT {

@Test
public void testCorrectFormat() throws Exception {
Testing.Print.enable();
config = TestHelper.getConfiguration()
.edit()
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.c1")
.with(MongoDbConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.build();

context = new MongoDbTaskContext(config);

TestHelper.cleanDatabase(primary(), "dbA");

start(MongoDbConnector.class, config);
assertConnectorIsRunning();

// Wait for snapshot completion
waitForSnapshotToBeCompleted("mongodb", "mongo1");

List<Document> documentsToInsert = loadTestDocuments("restaurants1.json");
insertDocuments("dbA", "c1", documentsToInsert.toArray(new Document[0]));
Document updateObj = new Document()
.append("$set", new Document()
.append("name", "Closed"));
updateDocument("dbA", "c1", Document.parse("{\"restaurant_id\": \"30075445\"}"), updateObj);
deleteDocuments("dbA", "c1", Document.parse("{\"restaurant_id\": \"30075445\"}"));

// 6 INSERTs + 1 UPDATE + 1 DELETE
final int recCount = 8;
final SourceRecords records = consumeRecordsByTopic(recCount);
final List<SourceRecord> c1s = records.recordsForTopic("mongo1.dbA.c1");

assertThat(c1s).hasSize(recCount);

final List<SourceRecord> insertRecords = c1s.subList(0, 6);
final SourceRecord updateRecord = c1s.get(6);
final SourceRecord deleteRecord = c1s.get(7);

for (SourceRecord record : insertRecords) {
CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mongodb", "mongo1", false);
}

CloudEventsConverterTest.shouldConvertToCloudEventsInJson(deleteRecord, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(deleteRecord, "filter", false);
CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(deleteRecord, "mongodb", "mongo1", false);

CloudEventsConverterTest.shouldConvertToCloudEventsInJson(updateRecord, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(updateRecord, "filter", false);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(updateRecord, "patch", false);
CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(updateRecord, "mongodb", "mongo1", false);

stopConnector();
}
}
Expand Up @@ -129,6 +129,10 @@ public static void shouldConvertToCloudEventsInJson(SourceRecord record, boolean
}

public static void shouldConvertToCloudEventsInJsonWithDataAsAvro(SourceRecord record, boolean hasTransaction) {
shouldConvertToCloudEventsInJsonWithDataAsAvro(record, "after", hasTransaction);
}

public static void shouldConvertToCloudEventsInJsonWithDataAsAvro(SourceRecord record, String fieldName, boolean hasTransaction) {
gunnarmorling marked this conversation as resolved.
Show resolved Hide resolved
Map<String, Object> config = new HashMap<>();
config.put("serializer.type", "json");
config.put("data.serializer.type", "avro");
Expand Down Expand Up @@ -205,7 +209,7 @@ public static void shouldConvertToCloudEventsInJsonWithDataAsAvro(SourceRecord r
avroConverter.configure(Collections.singletonMap("schema.registry.url", "http://fake-url"), false);
SchemaAndValue data = avroConverter.toConnectData(record.topic(), Base64.getDecoder().decode(dataJson.asText()));
assertThat(data.value()).isInstanceOf(Struct.class);
assertThat(((Struct) data.value()).get("after")).isNotNull();
assertThat(((Struct) data.value()).get(fieldName)).isNotNull();
}
catch (Throwable t) {
Testing.Print.enable();
Expand Down