Skip to content

Commit

Permalink
Merge 8c0f227 into 13059be
Browse files Browse the repository at this point in the history
  • Loading branch information
snava10 committed Dec 24, 2020
2 parents 13059be + 8c0f227 commit 1c6760e
Show file tree
Hide file tree
Showing 38 changed files with 654 additions and 163 deletions.
1 change: 1 addition & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ dependencies {
implementation group: 'io.netty', name: 'netty-transport', version: 'latest.release'
implementation group: 'io.netty', name: 'netty-codec', version: 'latest.release'
implementation group: 'io.netty', name: 'netty-handler', version: 'latest.release'
api group: 'com.h2database', name: 'h2', version: 'latest.release'

testImplementation group: 'org.mongodb', name: 'mongo-java-driver', version: 'latest.release'
testImplementation "org.mockito:mockito-core:latest.release"
Expand Down
13 changes: 7 additions & 6 deletions core/gradle.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
# This file is expected to be part of source control.
ch.qos.logback:logback-classic:1.2.3=testRuntimeClasspath
ch.qos.logback:logback-core:1.2.3=testRuntimeClasspath
io.netty:netty-buffer:4.1.54.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.netty:netty-codec:4.1.54.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.netty:netty-common:4.1.54.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.netty:netty-handler:4.1.54.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.netty:netty-resolver:4.1.54.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.netty:netty-transport:4.1.54.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
com.h2database:h2:1.4.200=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.netty:netty-buffer:4.1.56.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.netty:netty-codec:4.1.56.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.netty:netty-common:4.1.56.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.netty:netty-handler:4.1.56.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.netty:netty-resolver:4.1.56.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.netty:netty-transport:4.1.56.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
net.bytebuddy:byte-buddy-agent:1.10.18=testCompileClasspath,testRuntimeClasspath
net.bytebuddy:byte-buddy:1.10.18=testCompileClasspath,testRuntimeClasspath
nl.jqno.equalsverifier:equalsverifier:3.5=testCompileClasspath,testRuntimeClasspath
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.bwaldvogel.mongo;

import java.io.Serializable;
import java.util.concurrent.CompletionStage;

import de.bwaldvogel.mongo.backend.QueryParameters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.concurrent.CompletionStage;

import de.bwaldvogel.mongo.backend.MongoSession;
import de.bwaldvogel.mongo.backend.QueryResult;
import de.bwaldvogel.mongo.bson.Document;
import de.bwaldvogel.mongo.oplog.Oplog;
Expand All @@ -13,7 +14,7 @@

public interface AsyncMongoDatabase {

CompletionStage<Document> handleCommandAsync(Channel channel, String command, Document query, Oplog oplog);
CompletionStage<Document> handleCommandAsync(Channel channel, String command, Document query, Oplog oplog, MongoSession mongoSession);

CompletionStage<QueryResult> handleQueryAsync(MongoQuery query);

Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/de/bwaldvogel/mongo/MongoBackend.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,6 @@ default CompletionStage<Void> closeAsync() {

MongoBackend version(ServerVersion version);

void setServerAddress(String serverAddress);

}
13 changes: 12 additions & 1 deletion core/src/main/java/de/bwaldvogel/mongo/MongoCollection.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import de.bwaldvogel.mongo.backend.ArrayFilters;
import de.bwaldvogel.mongo.backend.Index;
import de.bwaldvogel.mongo.backend.MongoSession;
import de.bwaldvogel.mongo.backend.QueryParameters;
import de.bwaldvogel.mongo.backend.QueryResult;
import de.bwaldvogel.mongo.bson.Document;
Expand Down Expand Up @@ -67,6 +68,8 @@ default QueryResult handleQuery(Document query, int numberToSkip, int limit) {

QueryResult handleQuery(QueryParameters queryParameters);

QueryResult handleQuery(QueryParameters queryParameters, MongoSession mongoSession);

@Override
default CompletionStage<QueryResult> handleQueryAsync(QueryParameters queryParameters) {
return FutureUtils.wrap(() -> handleQuery(queryParameters));
Expand All @@ -81,6 +84,11 @@ default List<Document> insertDocuments(List<Document> documents) {
Document updateDocuments(Document selector, Document update, ArrayFilters arrayFilters,
boolean isMulti, boolean isUpsert, Oplog oplog);

default Document updateDocuments(Document selector, Document update, ArrayFilters arrayFilters,
boolean isMulti, boolean isUpsert, Oplog oplog, MongoSession mongoSession) {
return updateDocuments(selector, update, arrayFilters, isMulti, isUpsert, oplog, mongoSession);
}

default int deleteDocuments(Document selector, int limit) {
return deleteDocuments(selector, limit, NoopOplog.get());
}
Expand All @@ -89,6 +97,8 @@ default int deleteDocuments(Document selector, int limit) {

Document handleDistinct(Document query);

Document handleDistinct(Document query, MongoSession mongoSession);

Document getStats();

Document validate();
Expand All @@ -97,6 +107,8 @@ default int deleteDocuments(Document selector, int limit) {

int count(Document query, int skip, int limit);

int count(Document query, int skip, int limit, MongoSession mongoSession);

default boolean isEmpty() {
return count() == 0;
}
Expand All @@ -112,5 +124,4 @@ default int getNumIndexes() {
void renameTo(MongoDatabase newDatabase, String newCollectionName);

void drop();

}
9 changes: 6 additions & 3 deletions core/src/main/java/de/bwaldvogel/mongo/MongoDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import de.bwaldvogel.mongo.backend.CollectionOptions;
import de.bwaldvogel.mongo.backend.QueryResult;
import de.bwaldvogel.mongo.backend.MongoSession;
import de.bwaldvogel.mongo.bson.Document;
import de.bwaldvogel.mongo.oplog.Oplog;
import de.bwaldvogel.mongo.util.FutureUtils;
Expand All @@ -19,11 +20,11 @@ public interface MongoDatabase extends AsyncMongoDatabase {

void handleClose(Channel channel);

Document handleCommand(Channel channel, String command, Document query, Oplog oplog);
Document handleCommand(Channel channel, String command, Document query, Oplog oplog, MongoSession mongoSession);

@Override
default CompletionStage<Document> handleCommandAsync(Channel channel, String command, Document query, Oplog oplog) {
return FutureUtils.wrap(() -> handleCommand(channel, command, query, oplog));
default CompletionStage<Document> handleCommandAsync(Channel channel, String command, Document query, Oplog oplog, MongoSession mongoSession) {
return FutureUtils.wrap(() -> handleCommand(channel, command, query, oplog, mongoSession));
}

QueryResult handleQuery(MongoQuery query);
Expand Down Expand Up @@ -55,6 +56,8 @@ default CompletionStage<Void> handleDeleteAsync(MongoDelete delete, Oplog oplog)

void handleUpdate(MongoUpdate update, Oplog oplog);

void handleUpdate(MongoUpdate update, Oplog oplog, MongoSession mongoSession);

@Override
default CompletionStage<Void> handleUpdateAsync(MongoUpdate update, Oplog oplog) {
return FutureUtils.wrap(() -> {
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/de/bwaldvogel/mongo/MongoServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ public void initChannel(SocketChannel ch) throws Exception {
});

channel = bootstrap.bind().syncUninterruptibly().channel();
InetSocketAddress localAddress = getLocalAddress();
if (localAddress != null) {
backend.setServerAddress(String.format("%s:%d", localAddress.getHostName(), localAddress.getPort()));
}

log.info("started {}", this);
} catch (RuntimeException e) {
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/de/bwaldvogel/mongo/ServerVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

public enum ServerVersion {
MONGO_3_0(Arrays.asList(3, 0, 0), 2),
MONGO_3_6(Arrays.asList(3, 6, 0), 6);
MONGO_3_6(Arrays.asList(3, 6, 0), 6),
MONGO_4_2(Arrays.asList(4, 2, 0), 8);

private final List<Integer> versionArray;
private final int wireVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.h2.mvstore.tx.Transaction;
import org.h2.mvstore.tx.TransactionStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -52,7 +55,7 @@ public abstract class AbstractMongoBackend implements MongoBackend {

protected static final String OPLOG_COLLECTION_NAME = "oplog.rs";

static final String ADMIN_DB_NAME = "admin";
public static final String ADMIN_DB_NAME = "admin";

private final Map<String, MongoDatabase> databases = new ConcurrentHashMap<>();

Expand All @@ -64,6 +67,10 @@ public abstract class AbstractMongoBackend implements MongoBackend {
private final CursorRegistry cursorRegistry = new CursorRegistry();

protected Oplog oplog = NoopOplog.get();
private String serverAddress;

protected final ConcurrentHashMap<UUID, MongoSession> sessions = new ConcurrentHashMap<>();
protected TransactionStore transactionStore;

protected AbstractMongoBackend() {
this(defaultClock());
Expand Down Expand Up @@ -150,7 +157,7 @@ private Document getLog(String argument) {
return response;
}

private Document handleAdminCommand(String command, Document query) {
protected Document handleAdminCommand(String command, Document query) {
if (command.equalsIgnoreCase("listdatabases")) {
List<Document> databases = listDatabaseNames().stream()
.sorted()
Expand Down Expand Up @@ -201,13 +208,21 @@ private Document handleAdminCommand(String command, Document query) {
} else if (command.equalsIgnoreCase("ping")) {
return successResponse();
} else if (command.equalsIgnoreCase("endSessions")) {
log.debug("endSessions on admin database");
handleEndSessions(query);
return successResponse();
} else {
throw new NoSuchCommandException(command);
}
}

private void handleEndSessions(Document query) {
log.debug("endSessions");
ArrayList<Document> endingSessions = (ArrayList<Document>)query.get("endSessions");
endingSessions.stream().map(s -> s.get("id"))
.filter(sessions::containsKey)
.forEach(sessions::remove);
}

private static Document successResponse() {
Document response = new Document();
Utils.markOkay(response);
Expand Down Expand Up @@ -314,6 +329,12 @@ private Document handleCommandSync(Channel channel, String databaseName, String
response.put("maxWireVersion", Integer.valueOf(version.getWireVersion()));
response.put("minWireVersion", Integer.valueOf(0));
response.put("localTime", Instant.now(clock));
response.put("setName", "rs0");
response.put("hosts", Collections.singleton(serverAddress));
response.put("me", serverAddress);
response.put("primary", serverAddress);
response.put("logicalSessionTimeoutMinutes", 100);
response.put("connectionId", 21210);
Utils.markOkay(response);
return response;
} else if (command.equalsIgnoreCase("buildinfo")) {
Expand All @@ -328,6 +349,12 @@ private Document handleCommandSync(Channel channel, String databaseName, String
return handleGetMore(databaseName, command, query);
} else if (command.equalsIgnoreCase("killCursors")) {
return handleKillCursors(query);
} else if (command.equalsIgnoreCase("commitTransaction")) {
UUID sessionId = Utils.getSessionId(query);
sessions.get(sessionId).commit();
Document response = new Document("lsid", sessionId);
Utils.markOkay(response);
return response;
}
return null;
}
Expand All @@ -342,8 +369,26 @@ public Document handleCommand(Channel channel, String databaseName, String comma
if (databaseName.equals(ADMIN_DB_NAME)) {
return handleAdminCommand(command, query);
}
MongoSession mongoSession = MongoSession.NoopSession();
if (query != null) {
if ((boolean)query.getOrDefault("autocommit", true)) {
return resolveDatabase(databaseName).handleCommand(channel, command, query, oplog, null);
}

return resolveDatabase(databaseName).handleCommand(channel, command, query, oplog);
UUID sessionId = Utils.getSessionId(query);
if (sessionId == null) {
throw new RuntimeException("SessionId cannot be null. Make sure you are using a mongo driver version that support sessions and transactions.");
}
if (sessions.containsKey(sessionId)) {
mongoSession = sessions.get(sessionId);
} else {
Transaction transaction = transactionStore.begin();
log.info(String.format("Starting new transaction with id %d: %s", transaction.getId(), transaction.getName()));
mongoSession = new MongoSession(sessionId, transactionStore.begin());
sessions.put(sessionId, mongoSession);
}
}
return resolveDatabase(databaseName).handleCommand(channel, command, query, oplog, mongoSession);
}

@Override
Expand Down Expand Up @@ -373,7 +418,7 @@ public CompletionStage<Document> handleCommandAsync(Channel channel, String data
return FutureUtils.wrap(() -> handleAdminCommand(command, query));
}

return resolveDatabase(database).handleCommandAsync(channel, command, query, oplog);
return resolveDatabase(database).handleCommandAsync(channel, command, query, oplog, null);
}

@Override
Expand Down Expand Up @@ -498,6 +543,11 @@ public void dropDatabase(String databaseName) {
}
}

@Override
public void setServerAddress(String serverAddress) {
this.serverAddress = serverAddress;
}

@Override
public void handleClose(Channel channel) {
for (MongoDatabase db : databases.values()) {
Expand Down

0 comments on commit 1c6760e

Please sign in to comment.