Skip to content

Commit

Permalink
🐛 Source Mongo Internal POC: Use sampling for schema discovery (#29607)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev committed Aug 21, 2023
1 parent 26bc2a8 commit 5393943
Showing 1 changed file with 8 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

package io.airbyte.integrations.source.mongodb.internal;

import static io.airbyte.integrations.source.mongodb.internal.MongoCatalogHelper.DEFAULT_CURSOR_FIELD;

import com.mongodb.MongoCommandException;
import com.mongodb.MongoException;
import com.mongodb.MongoSecurityException;
Expand All @@ -23,7 +21,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.bson.Document;
Expand All @@ -32,10 +29,12 @@
public class MongoUtil {

/**
* The maximum number of documents to retrieve when attempting to discover the unique keys/types for
* a collection.
* The maximum number of documents to sample when attempting to discover the unique keys/types for a
* collection. Inspired by the
* <a href="https://www.mongodb.com/docs/compass/current/sampling/#sampling-method">sampling method
* utilized by the MongoDB Compass client</a>.
*/
private static final Integer DISCOVERY_LIMIT = 100;
private static final Integer DISCOVERY_SAMPLE_SIZE = 1000;

/**
* Set of collection prefixes that should be ignored when performing operations, such as discover to
Expand Down Expand Up @@ -97,8 +96,7 @@ public static List<AirbyteStream> getAirbyteStreams(final MongoClient mongoClien
*/
final Set<Field> discoveredFields = new HashSet<>();
final MongoCollection<Document> mongoCollection = mongoClient.getDatabase(databaseName).getCollection(collectionName);
discoveredFields.addAll(getFieldsInCollection(mongoCollection, Optional.empty()));
discoveredFields.addAll(getFieldsInCollection(mongoCollection, Optional.of(DEFAULT_CURSOR_FIELD)));
discoveredFields.addAll(getFieldsInCollection(mongoCollection));
return createAirbyteStream(collectionName, databaseName, new ArrayList<>(discoveredFields));
}).collect(Collectors.toList());
}
Expand All @@ -107,7 +105,7 @@ private static AirbyteStream createAirbyteStream(final String collectionName, fi
return MongoCatalogHelper.buildAirbyteStream(collectionName, databaseName, fields);
}

private static Set<Field> getFieldsInCollection(final MongoCollection collection, final Optional<String> sortField) {
private static Set<Field> getFieldsInCollection(final MongoCollection collection) {
final Set<Field> discoveredFields = new HashSet<>();
final Map<String, Object> fieldsMap = Map.of("input", Map.of("$objectToArray", "$$ROOT"),
"as", "each",
Expand All @@ -121,8 +119,7 @@ private static Set<Field> getFieldsInCollection(final MongoCollection collection
groupMap.put("fields", Map.of("$addToSet", "$fields"));

final List<Bson> aggregateList = new ArrayList<>();
aggregateList.add(Aggregates.limit(DISCOVERY_LIMIT));
sortField.ifPresent(s -> aggregateList.add(Aggregates.sort(new Document(s, -1))));
aggregateList.add(Aggregates.sample(DISCOVERY_SAMPLE_SIZE));
aggregateList.add(Aggregates.project(new Document("fields", arrayToObjectAggregation)));
aggregateList.add(Aggregates.unwind("$fields"));
aggregateList.add(new Document("$group", groupMap));
Expand Down

0 comments on commit 5393943

Please sign in to comment.