Add an option to force reading from Mongo with only one processor #24198

Merged
@@ -50,6 +50,7 @@

import java.util.ArrayList;

import static com.hazelcast.internal.nio.IOUtil.closeResource;
import static com.hazelcast.jet.mongodb.impl.Mappers.defaultCodecRegistry;
import static com.hazelcast.test.DockerTestUtil.assumeDockerEnabled;
import static org.assertj.core.api.Assertions.assertThat;
@@ -101,8 +102,8 @@ public static void beforeClass() {

@AfterClass
public static void afterClass() {
mongoClient.close();
mongoContainer.stop();
closeResource(mongoClient);
closeResource(mongoContainer);
}

@Before
@@ -95,10 +95,23 @@ public class ReadMongoP<I> extends AbstractProcessor {
private boolean snapshotInProgress;
private final MongoChunkedReader reader;
private final MongoConnection connection;
/**
* Means that the user requested the query to be executed in a non-distributed way.
* Setting this property to {@code true} should mean that
* {@link com.hazelcast.jet.core.ProcessorMetaSupplier#forceTotalParallelismOne} was used.
*/
private final boolean nonDistributed;

private Traverser<?> traverser;
private Traverser<Entry<BroadcastKey<Integer>, Object>> snapshotTraverser;

/**
* Parallelization of reading is possible only when the user
* did not mark the processor as {@code nonDistributed}
* and the total parallelism is greater than 1.
*/
private boolean canParallelize;

public ReadMongoP(ReadMongoParams<I> params) {
if (params.isStream()) {
EventTimeMapper<I> eventTimeMapper = new EventTimeMapper<>(params.eventTimePolicy);
@@ -111,13 +124,15 @@ public ReadMongoP(ReadMongoParams<I> params) {
}
this.connection = new MongoConnection(params.clientSupplier, params.dataLinkRef, client -> reader.connect(client,
snapshotsEnabled));
this.nonDistributed = params.isNonDistributed();
this.throwOnNonExisting = params.isThrowOnNonExisting();
}

@Override
protected void init(@Nonnull Context context) {
logger = context.logger();
totalParallelism = context.totalParallelism();
canParallelize = !nonDistributed && totalParallelism > 1;
processorIndex = context.globalProcessorIndex();
this.snapshotsEnabled = context.snapshottingEnabled();

@@ -309,7 +324,7 @@ void onConnect(MongoClient mongoClient, boolean supportsSnapshots) {
if (supportsSnapshots && lastKey != null) {
aggregateList.add(match(gt("_id", lastKey)).toBsonDocument());
}
if (totalParallelism > 1) {
if (canParallelize) {
aggregateList.addAll(0, partitionAggregate(totalParallelism, processorIndex, false));
}
if (collection != null) {
@@ -412,7 +427,7 @@ private StreamMongoReader(
@Override
public void onConnect(MongoClient mongoClient, boolean snapshotsEnabled) {
List<Bson> aggregateList = new ArrayList<>(aggregates);
if (totalParallelism > 1) {
if (canParallelize) {
aggregateList.addAll(0, partitionAggregate(totalParallelism, processorIndex, true));
}
ChangeStreamIterable<Document> changeStream;
@@ -49,6 +49,7 @@ public class ReadMongoParams<I> implements Serializable {
Long startAtTimestamp;
EventTimePolicy<? super I> eventTimePolicy;
BiFunctionEx<ChangeStreamDocument<Document>, Long, I> mapStreamFn;
boolean nonDistributed;
boolean throwOnNonExisting = true;
private List<Document> aggregates = new ArrayList<>();

@@ -180,4 +181,13 @@ public ReadMongoParams<I> setThrowOnNonExisting(boolean throwOnNonExisting) {
this.throwOnNonExisting = throwOnNonExisting;
return this;
}

public ReadMongoParams<I> setNonDistributed(boolean nonDistributed) {
this.nonDistributed = nonDistributed;
return this;
}

public boolean isNonDistributed() {
return nonDistributed;
}
}
@@ -99,19 +99,21 @@ public void createDefaultCollection() {

@After
public void clear() {
try (MongoClient mongoClient = MongoClients.create(mongoContainer.getConnectionString())) {
for (String databaseName : mongoClient.listDatabaseNames()) {
if (databaseName.startsWith("test")) {
MongoDatabase database = mongoClient.getDatabase(databaseName);
database.drop();
if (mongoContainer != null) {
try (MongoClient mongoClient = MongoClients.create(mongoContainer.getConnectionString())) {
for (String databaseName : mongoClient.listDatabaseNames()) {
if (databaseName.startsWith("test")) {
MongoDatabase database = mongoClient.getDatabase(databaseName);
database.drop();
}
}
List<String> allowedDatabasesLeft = asList("admin", "local", "config", "tech");
assertTrueEventually(() -> {
ArrayList<String> databasesLeft = mongoClient.listDatabaseNames().into(new ArrayList<>());
assertEquals(allowedDatabasesLeft.size(), databasesLeft.size());
assertContainsAll(databasesLeft, allowedDatabasesLeft);
});
}
List<String> allowedDatabasesLeft = asList("admin", "local", "config", "tech");
assertTrueEventually(() -> {
ArrayList<String> databasesLeft = mongoClient.listDatabaseNames().into(new ArrayList<>());
assertEquals(allowedDatabasesLeft.size(), databasesLeft.size());
assertContainsAll(databasesLeft, allowedDatabasesLeft);
});
}
}

@@ -18,6 +18,8 @@
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.sql.impl.connector.HazelcastRexNode;
import com.hazelcast.jet.sql.impl.connector.SqlConnector;
@@ -147,13 +149,13 @@ public Vertex fullScanReader(
: allFieldsExternalNames(table);
boolean needTwoSteps = !filter.allProceeded || !projections.allProceeded;

SelectProcessorSupplier supplier;
ProcessorMetaSupplier supplier;
if (isStream()) {
BsonTimestamp startAt = Options.startAt(table.getOptions());
supplier = new SelectProcessorSupplier(table, filter.result, projectionList, startAt,
eventTimePolicyProvider);
supplier = wrap(context, new SelectProcessorSupplier(table, filter.result, projectionList, startAt,
eventTimePolicyProvider));
} else {
supplier = new SelectProcessorSupplier(table, filter.result, projectionList);
supplier = wrap(context, new SelectProcessorSupplier(table, filter.result, projectionList));
}

DAG dag = context.getDag();
@@ -180,6 +182,13 @@
return sourceVertex;
}

protected static ProcessorMetaSupplier wrap(DagBuildContext ctx, ProcessorSupplier supplier) {
MongoTable table = ctx.getTable();
return table.isForceMongoParallelismOne()
? ProcessorMetaSupplier.forceTotalParallelismOne(supplier)
: ProcessorMetaSupplier.of(supplier);
}

private static TranslationResult<Document> translateFilter(HazelcastRexNode filterNode, RexToMongoVisitor visitor) {
try {
if (filterNode == null) {
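The wrap helper above picks between the two ProcessorMetaSupplier factories depending on the table option. As a rough sketch (not part of this PR), the same choice applied when building a DAG directly would look like the snippet below; the vertex name and the readProcessorSupplier parameter are illustrative only.

```java
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;

class ParallelismWrapSketch {
    // Illustrative helper mirroring wrap(...) from the connector: choose between
    // single-processor and default distributed execution for a source vertex.
    static Vertex newMongoSourceVertex(DAG dag, ProcessorSupplier readProcessorSupplier,
                                       boolean forceParallelismOne) {
        ProcessorMetaSupplier metaSupplier = forceParallelismOne
                // exactly one processor instance in the whole cluster
                ? ProcessorMetaSupplier.forceTotalParallelismOne(readProcessorSupplier)
                // default: processors on every member, count driven by local parallelism
                : ProcessorMetaSupplier.of(readProcessorSupplier);
        return dag.newVertex("mongo-source", metaSupplier);
    }
}
```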
@@ -33,6 +33,8 @@
import java.util.Objects;

import static com.google.common.base.Preconditions.checkState;
import static com.hazelcast.jet.sql.impl.connector.mongodb.Options.FORCE_PARALLELISM_ONE;
import static java.lang.Boolean.parseBoolean;
import static java.util.stream.Collectors.toList;

class MongoTable extends JetTable {
@@ -49,6 +51,7 @@ class MongoTable extends JetTable {
private final String[] externalNames;
private final QueryDataType[] fieldTypes;
private final BsonType[] fieldExternalTypes;
private final boolean forceMongoParallelismOne;

MongoTable(
@Nonnull String schemaName,
@@ -77,6 +80,8 @@ class MongoTable extends JetTable {
this.fieldExternalTypes = getFields().stream()
.map(field -> ((MongoTableField) field).externalType)
.toArray(BsonType[]::new);

this.forceMongoParallelismOne = parseBoolean(options.getOrDefault(FORCE_PARALLELISM_ONE, "false"));
}

public MongoTableField getField(String name) {
@@ -172,6 +177,10 @@ QueryDataType[] resolveColumnTypes(@Nonnull List<String> requestedProjection) {
return types;
}

public boolean isForceMongoParallelismOne() {
return forceMongoParallelismOne;
}

@Override
public String toString() {
return "MongoTable{" +
@@ -180,6 +189,7 @@ public String toString() {
", connectionString='" + connectionString + '\'' +
", options=" + options +
", streaming=" + streaming +
", forceMongoParallelismOne=" + forceMongoParallelismOne +
'}';
}

@@ -31,11 +31,58 @@

final class Options {

/**
* Option holding the data link's name. If set, it must be non-empty and contain
* the name of a previously configured {@link MongoDataLink}.
*
* <p>Not mandatory if {@link #CONNECTION_STRING_OPTION} is set.</p>
*/
static final String DATA_LINK_REF_OPTION = "data-link-name";
/**
* A valid MongoDB connection string. If set, it must be non-empty.
* <p>Not mandatory if {@link #DATA_LINK_REF_OPTION} is set.</p>
*/
static final String CONNECTION_STRING_OPTION = "connectionString";

/**
* The name of the database from which the collection should be read.
*/
static final String DATABASE_NAME_OPTION = "database";

/**
* Option for streaming sources only, and mandatory for them.
* <p>
* Indicates the moment from which the oplog (change stream) will be read. Possible values:
* <ul>
* <li>the string 'now' - uses the time of submitting the mapping as the start time</li>
* <li>a numeric value interpreted as milliseconds of the Unix epoch</li>
* <li>an ISO-formatted instant in the UTC timezone, like '2023-03-24T15:31:00Z'</li>
* </ul>
*/
static final String START_AT_OPTION = "startAt";

/**
* The name of the column that will be used as the primary key.
* Note that it is the name in Hazelcast, not the external name.
* <p>Setting this option allows the user to avoid using the {@code _id} column as the primary key,
* e.g. when a natural key is preferred over an artificial key added at project start.
*
* <p>Setting this property is not mandatory; by default the connector picks the column that maps
* to Mongo's {@code _id} column.</p>
*/
static final String PK_COLUMN = "idColumn";

/**
* If set to {@code true}, reading from MongoDB will be done by a single processor instance.
* <p>
* Normally the user wants to distribute the work, but the {@code $function} aggregate stage is not
* available on e.g. Atlas Serverless instances. In such cases setting this property to {@code true}
* still lets the user query Atlas Serverless - with one processor only, which is better than nothing.
* MongoDB may lift this restriction some day.
*/
static final String FORCE_PARALLELISM_ONE = "forceMongoReadParallelismOne";

private static final String POSSIBLE_VALUES = "This property should " +
" have value of: a) 'now' b) time in epoch milliseconds or c) " +
" ISO-formatted instant in UTC timezone, like '2023-03-24T15:31:00Z'.";
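From the SQL user's side, the new option is passed in the mapping's OPTIONS clause under the key defined by FORCE_PARALLELISM_ONE above. The following is a hedged sketch only: the mapping name, column list, external name, connector type name and connection string value are placeholder assumptions; the two option keys ('connectionString', 'forceMongoReadParallelismOne') come from this connector.

```java
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class ForceParallelismOneMappingSketch {
    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
        // Illustrative mapping: columns, names and the TYPE keyword value are assumptions,
        // not taken from this PR; the relevant part is the forceMongoReadParallelismOne option.
        hz.getSql().execute(
                "CREATE MAPPING people "
                + "EXTERNAL NAME \"testDb\".\"people\" "
                + "(id INT, name VARCHAR) "
                + "TYPE Mongo "
                + "OPTIONS ("
                + "  'connectionString' = 'mongodb://localhost:27017',"
                + "  'forceMongoReadParallelismOne' = 'true'"
                + ")");
    }
}
```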
@@ -53,7 +53,6 @@
* ProcessorSupplier that creates {@linkplain com.hazelcast.jet.mongodb.impl.ReadMongoP} processors on each instance.
*/
public class SelectProcessorSupplier implements ProcessorSupplier {

private transient SupplierEx<? extends MongoClient> clientSupplier;
private final String databaseName;
private final String collectionName;
@@ -67,6 +66,8 @@ public class SelectProcessorSupplier implements ProcessorSupplier {
private transient ExpressionEvalContext evalContext;
private final QueryDataType[] types;

private final boolean forceMongoParallelismOne;

SelectProcessorSupplier(MongoTable table, Document predicate, List<String> projection, BsonTimestamp startAt, boolean stream,
FunctionEx<ExpressionEvalContext, EventTimePolicy<JetSqlRow>> eventTimePolicyProvider) {
checkArgument(projection != null && !projection.isEmpty(), "projection cannot be empty");
@@ -81,6 +82,7 @@ public class SelectProcessorSupplier implements ProcessorSupplier {
this.stream = stream;
this.eventTimePolicyProvider = eventTimePolicyProvider;
this.types = table.resolveColumnTypes(projection);
this.forceMongoParallelismOne = table.isForceMongoParallelismOne();
}


@@ -135,6 +137,7 @@ public Collection<? extends Processor> get(int count) {
.setMapStreamFn(this::convertStreamDocToRow)
.setStartAtTimestamp(startAt == null ? null : new BsonTimestamp(startAt))
.setEventTimePolicy(eventTimePolicy)
.setNonDistributed(forceMongoParallelismOne)
);

processors.add(processor);