Kafka Connect: MongoDataConverter throws on arrays of TIMESTAMP/DATE_TIME values (array.encoding=array) #16603

@wombatu-kun

Problem

MongoDataConverter (used by the MongoDebeziumTransform SMT) reads BSON array elements with the wrong accessors for TIMESTAMP and DATE_TIME when array.encoding=array (the default). The faulty branches are in the private array-element convertFieldValue:

} else if (arrValue.getBsonType() == BsonType.DATE_TIME && valueType == BsonType.DATE_TIME) {
  Date temp = new Date(arrValue.asInt64().getValue());          // arrValue is a BsonDateTime, not BsonInt64
  ...
} else if (arrValue.getBsonType() == BsonType.TIMESTAMP && valueType == BsonType.TIMESTAMP) {
  Date temp = new Date(1000L * arrValue.asInt32().getValue());  // arrValue is a BsonTimestamp, not BsonInt32

BsonValue.asInt32()/asInt64() call throwIfInvalidType(...), which throws BsonInvalidOperationException when the actual BSON type is TIMESTAMP/DATE_TIME. The scalar (non-array) paths use the correct accessors (asTimestamp().getTime() and asDateTime().getValue()); only the array-element path is wrong.
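To illustrate why the wrong accessor throws rather than silently returning a number, here is a minimal sketch of a type-checked accessor. The classes below (CheckedAccessorSketch, Value, Kind) are hypothetical stand-ins written for this example, not the real org.bson types, and the exception type is a plain IllegalStateException rather than BsonInvalidOperationException.

```java
class CheckedAccessorSketch {
    // Hypothetical subset of BSON types, enough to show the mismatch.
    enum Kind { INT32, TIMESTAMP }

    // Hypothetical stand-in for a BSON value; NOT org.bson.BsonValue.
    static final class Value {
        final Kind kind;
        final int payload;

        Value(Kind kind, int payload) {
            this.kind = kind;
            this.payload = payload;
        }

        // Each accessor first checks the runtime type, then returns the payload.
        int asInt32() {
            requireKind(Kind.INT32);
            return payload;
        }

        int asTimestampSeconds() {
            requireKind(Kind.TIMESTAMP);
            return payload;
        }

        private void requireKind(Kind expected) {
            if (kind != expected) {
                throw new IllegalStateException(
                    "Value expected to be of type " + expected
                        + " is of unexpected type " + kind);
            }
        }
    }

    // Mirrors the buggy array-element branch: wrong accessor for a timestamp.
    static int readSecondsBuggy(Value v) {
        return v.asInt32();
    }

    // Mirrors the scalar path: accessor matches the runtime type.
    static int readSecondsFixed(Value v) {
        return v.asTimestampSeconds();
    }

    public static void main(String[] args) {
        Value ts = new Value(Kind.TIMESTAMP, 60);
        System.out.println(readSecondsFixed(ts));
        try {
            readSecondsBuggy(ts);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is only that the accessor is a checked downcast: the type test in the else-if condition already guarantees the element is a timestamp, so calling the INT32 accessor can never succeed on that branch.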

Impact

A MongoDB document containing an array of timestamps or date-times fails to convert with BsonInvalidOperationException (surfaced as a Connect DataException), so the record cannot be processed. array.encoding=array is the default mode in MongoDebeziumTransform, and arrays of dates are common in document data.

Reproduction

These tests, added to TestMongoArrayConverter, fail on current code:

@Test
@SuppressWarnings("JavaUtilDate")
public void shouldConvertArrayOfTimestamps() {
  final MongoDataConverter converter = new MongoDataConverter(ArrayEncoding.ARRAY);
  final BsonDocument val =
      new BsonDocument()
          .append("_id", new BsonInt32(1))
          .append(
              "ts",
              new BsonArray(Arrays.asList(new BsonTimestamp(60, 1), new BsonTimestamp(120, 1))));

  final SchemaBuilder schemaBuilder = SchemaBuilder.struct().name("array");
  for (Entry<String, BsonValue> entry : val.entrySet()) {
    converter.addFieldSchema(entry, schemaBuilder);
  }
  final Schema finalSchema = schemaBuilder.build();
  final Struct struct = new Struct(finalSchema);
  for (Entry<String, BsonValue> entry : val.entrySet()) {
    converter.convertRecord(entry, finalSchema, struct);
  }

  // BsonTimestamp.getTime() returns the seconds component; the scalar path multiplies by 1000
  List<?> tsValues = (List<?>) struct.get("ts");
  assertThat(tsValues).hasSize(2);
  assertThat(tsValues.get(0)).isEqualTo(new Date(60_000L));
  assertThat(tsValues.get(1)).isEqualTo(new Date(120_000L));
}

@Test
@SuppressWarnings("JavaUtilDate")
public void shouldConvertArrayOfDateTimes() {
  final MongoDataConverter converter = new MongoDataConverter(ArrayEncoding.ARRAY);
  final BsonDocument val =
      new BsonDocument()
          .append("_id", new BsonInt32(1))
          .append(
              "dt",
              new BsonArray(Arrays.asList(new BsonDateTime(1_000L), new BsonDateTime(2_000L))));

  final SchemaBuilder schemaBuilder = SchemaBuilder.struct().name("array");
  for (Entry<String, BsonValue> entry : val.entrySet()) {
    converter.addFieldSchema(entry, schemaBuilder);
  }
  final Schema finalSchema = schemaBuilder.build();
  final Struct struct = new Struct(finalSchema);
  for (Entry<String, BsonValue> entry : val.entrySet()) {
    converter.convertRecord(entry, finalSchema, struct);
  }

  // BsonDateTime.getValue() is epoch millis; the scalar path uses it directly
  List<?> dtValues = (List<?>) struct.get("dt");
  assertThat(dtValues).hasSize(2);
  assertThat(dtValues.get(0)).isEqualTo(new Date(1_000L));
  assertThat(dtValues.get(1)).isEqualTo(new Date(2_000L));
}

Failures on current code:

shouldConvertArrayOfTimestamps
  -> BsonInvalidOperationException: Value expected to be of type INT32 is of unexpected type TIMESTAMP
     at MongoDataConverter.java:245 (asInt32)

shouldConvertArrayOfDateTimes
  -> BsonInvalidOperationException: Value expected to be of type INT64 is of unexpected type DATE_TIME
     at MongoDataConverter.java:239 (asInt64)

Fix

Use the same accessors as the scalar paths:

  • TIMESTAMP: new Date(1000L * arrValue.asTimestamp().getTime())
  • DATE_TIME: new Date(arrValue.asDateTime().getValue())
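The unit handling in the two bullets above can be sketched without the BSON library. This is an illustration only: the class and method names (ArrayElementDates, fromTimestampSeconds, fromDateTimeMillis) are made up for this example, and the inputs stand in for what asTimestamp().getTime() (seconds, an int) and asDateTime().getValue() (epoch millis, a long) return.

```java
import java.util.Date;

class ArrayElementDates {
    // Input is in seconds, the unit reported by BsonTimestamp.getTime().
    // The 1000L literal forces long arithmetic before the multiplication.
    static Date fromTimestampSeconds(int seconds) {
        return new Date(1000L * seconds);
    }

    // Input is already epoch milliseconds, the unit reported by
    // BsonDateTime.getValue(), so no scaling is needed.
    static Date fromDateTimeMillis(long millis) {
        return new Date(millis);
    }

    public static void main(String[] args) {
        // Same values as the reproduction tests.
        System.out.println(fromTimestampSeconds(60).getTime());
        System.out.println(fromDateTimeMillis(1_000L).getTime());

        // With an int literal the product wraps around for realistic
        // epoch-second values; with 1000L it does not.
        int recentSeconds = 1_700_000_000;
        System.out.println(1000L * recentSeconds);
        System.out.println((long) (1000 * recentSeconds) == 1000L * recentSeconds);
    }
}
```

Keeping the 1000L literal matters because the seconds component is an int: multiplying by an int 1000 would overflow for any present-day epoch-second value before the result is widened to long.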

Note: this file lives under org.debezium.connector.mongodb.transforms and was adapted from Debezium; the fix intentionally diverges from the (buggy) upstream snapshot.
