Skip to content

Commit

Permalink
Add support for binary _id field in source-mongodb (#38103)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich committed May 10, 2024
1 parent 3525225 commit 82961ad
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.3.12
dockerImageTag: 1.3.13
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.source.mongodb;

import static io.airbyte.integrations.source.mongodb.state.IdType.idToStringRepresenation;
import static io.airbyte.integrations.source.mongodb.state.IdType.parseBinaryIdString;
import static io.airbyte.integrations.source.mongodb.state.InitialSnapshotStatus.IN_PROGRESS;

import com.google.common.collect.AbstractIterator;
Expand All @@ -16,12 +18,7 @@
import io.airbyte.integrations.source.mongodb.state.IdType;
import io.airbyte.integrations.source.mongodb.state.MongoDbStreamState;
import java.util.Optional;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
import org.bson.BsonObjectId;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.*;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
Expand Down Expand Up @@ -85,7 +82,8 @@ protected Document computeNext() {
private Optional<MongoDbStreamState> getCurrentState(Object currentId) {
final var idType = IdType.findByJavaType(currentId.getClass().getSimpleName())
.orElseThrow(() -> new ConfigErrorException("Unsupported _id type " + currentId.getClass().getSimpleName()));
final var state = new MongoDbStreamState(currentId.toString(),

final var state = new MongoDbStreamState(idToStringRepresenation(currentId, idType),
IN_PROGRESS,
idType);
return Optional.of(state);
Expand Down Expand Up @@ -130,6 +128,7 @@ private Bson buildFilter() {
case OBJECT_ID -> new BsonObjectId(new ObjectId(state.id()));
case INT -> new BsonInt32(Integer.parseInt(state.id()));
case LONG -> new BsonInt64(Long.parseLong(state.id()));
case BINARY -> parseBinaryIdString(state.id());
}))
// if nothing was found, return a new BsonDocument
.orElseGet(BsonDocument::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,27 @@

package io.airbyte.integrations.source.mongodb.state;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import static java.util.Base64.getEncoder;

import java.util.*;
import java.util.stream.Collectors;
import org.bson.types.ObjectId;
import org.bson.BsonBinary;
import org.bson.BsonBinarySubType;
import org.bson.UuidRepresentation;
import org.bson.internal.UuidHelper;
import org.bson.types.Binary;

/**
* _id field types that are currently supported, potential types are defined <a href=
* "https://www.mongodb.com/docs/manual/reference/operator/query/type/#std-label-document-type-available-types">here</a>
*/
public enum IdType {

OBJECT_ID("objectId", "ObjectId", ObjectId::new),
STRING("string", "String", s -> s),
INT("int", "Integer", Integer::valueOf),
LONG("long", "Long", Long::valueOf);
OBJECT_ID("objectId", "ObjectId"),
STRING("string", "String"),
INT("int", "Integer"),
LONG("long", "Long"),
BINARY("binData", "Binary");

private static final Map<String, IdType> byBsonType = new HashMap<>();
static {
Expand Down Expand Up @@ -49,17 +52,10 @@ public enum IdType {
private final String bsonType;
/** Java class name type */
private final String javaType;
/** Converter for converting a string value into an appropriate MongoDb type. */
private final Function<String, Object> converter;

IdType(final String bsonType, final String javaType, final Function<String, Object> converter) {
IdType(final String bsonType, final String javaType) {
this.bsonType = bsonType;
this.javaType = javaType;
this.converter = converter;
}

public Object convert(final String t) {
return converter.apply(t);
}

public static Optional<IdType> findByBsonType(final String bsonType) {
Expand All @@ -76,4 +72,44 @@ public static Optional<IdType> findByJavaType(final String javaType) {
return Optional.ofNullable(byJavaType.get(javaType.toLowerCase()));
}

/**
* Convers a collection id to a string representation for use in a saved state. Most types will be
* converted to a string, except for Binary types which will be converted to a Base64 encoded
* string. and UUIDs which will be converted to a human-readable string.
*
* @param id an _id field value
* @param idType the type of the _id field
* @return a string representation of the _id field
*/
public static String idToStringRepresenation(final Object id, final IdType idType) {
final String strId;
if (idType == IdType.BINARY) {
final var binLastId = (Binary) id;
if (binLastId.getType() == BsonBinarySubType.UUID_STANDARD.getValue()) {
strId = UuidHelper.decodeBinaryToUuid(binLastId.getData(), binLastId.getType(), UuidRepresentation.STANDARD).toString();
} else {
strId = getEncoder().encodeToString(binLastId.getData());
}
} else {
strId = id.toString();
}

return strId;
}

/**
* Parse a string representation of a binary _id field into a BsonBinary object. The string can be a
* UUID or a Base64 encoded string.
*
* @param id a string representation of an _id field
* @return a BsonBinary object
*/
public static BsonBinary parseBinaryIdString(final String id) {
try {
return new BsonBinary(UUID.fromString(id));
} catch (final IllegalArgumentException ex) {
return new BsonBinary(Base64.getDecoder().decode(id));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.source.mongodb.state;

import static io.airbyte.integrations.source.mongodb.state.IdType.idToStringRepresenation;
import static io.airbyte.integrations.source.mongodb.state.InitialSnapshotStatus.FULL_REFRESH;
import static io.airbyte.integrations.source.mongodb.state.InitialSnapshotStatus.IN_PROGRESS;
import static io.airbyte.protocol.models.v0.SyncMode.INCREMENTAL;
Expand All @@ -23,11 +24,7 @@
import io.airbyte.protocol.models.v0.*;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;
import org.bson.Document;
import org.slf4j.Logger;
Expand Down Expand Up @@ -302,7 +299,7 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
final var finalStateStatus = InitialSnapshotStatus.COMPLETE;
final var idType = IdType.findByJavaType(lastId.getClass().getSimpleName())
.orElseThrow(() -> new ConfigErrorException("Unsupported _id type " + lastId.getClass().getSimpleName()));
final var state = new MongoDbStreamState(lastId.toString(), finalStateStatus, idType);
final var state = new MongoDbStreamState(idToStringRepresenation(lastId, idType), finalStateStatus, idType);

updateStreamState(stream.getStream().getName(), stream.getStream().getNamespace(), state);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,4 @@

package io.airbyte.integrations.source.mongodb.state;

public record MongoDbStreamState(String id, InitialSnapshotStatus status, IdType idType) {

/**
* Takes a value converting it to the appropriate MongoDb type based on the IdType of this record.
*
* @param value the value to convert
* @return a converted value.
*/
public Object idTypeAsMongoDbType(final String value) {
return idType.convert(value);
}

}
public record MongoDbStreamState(String id, InitialSnapshotStatus status, IdType idType) {}

0 comments on commit 82961ad

Please sign in to comment.