Skip to content

Commit

Permalink
feat: Added MongoDb Listeners to MongoConfig (Command and Connection …
Browse files Browse the repository at this point in the history
…Pool)
  • Loading branch information
QuadStingray committed Jun 7, 2022
1 parent e8e3531 commit bbe77de
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 67 deletions.
19 changes: 14 additions & 5 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
version = 2.7.5
version = "3.0.0-RC5"

style = defaultWithAlign
preset = defaultWithAlign
project.git = true
maxColumn = 120
maxColumn = 160
unindentTopLevelOperators = true
danglingParentheses = true
danglingParentheses.preset = true
spaces.inImportCurlyBraces = true
newlines.alwaysBeforeElseAfterCurlyIf = true

rewrite.rules = [RedundantBraces, SortImports, PreferCurlyFors]
rewrite.rules = [AvoidInfix, PreferCurlyFors, SortImports, SortModifiers, RedundantBraces]

rewrite.redundantBraces.methodBodies = false
rewrite.redundantBraces.includeUnitMethods = true
rewrite.redundantBraces.parensForOneLineApply = true
rewrite.redundantBraces.maxLines = 1
rewrite.redundantBraces.ifElseExpressions = false
rewrite.redundantBraces.methodBodies = false
rewrite.redundantBraces.includeUnitMethods = false
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package dev.mongocamp.driver.mongodb.database

import dev.mongocamp.driver.mongodb._
import dev.mongocamp.driver.mongodb.bson.codecs.CustomCodecProvider
import org.bson.codecs.configuration.CodecRegistries.{fromProviders, fromRegistries}
import org.bson.codecs.configuration.CodecRegistries.{ fromProviders, fromRegistries }
import org.bson.codecs.configuration.CodecRegistry
import org.mongodb.scala.MongoClient.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala._
Expand Down Expand Up @@ -34,8 +34,7 @@ class DatabaseProvider(val config: MongoConfig, val registry: CodecRegistry) ext
cachedClient = None
}

def databases: ListDatabasesObservable[Document] =
client.listDatabases()
def databases: ListDatabasesObservable[Document] = client.listDatabases()

def databaseInfos: List[DatabaseInfo] = databases.resultList().map(doc => DatabaseInfo(doc)).sortBy(_.name)

Expand All @@ -44,30 +43,32 @@ class DatabaseProvider(val config: MongoConfig, val registry: CodecRegistry) ext
def dropDatabase(databaseName: String = DefaultDatabaseName): SingleObservable[Void] = database(databaseName).drop()

def database(databaseName: String = DefaultDatabaseName): MongoDatabase = {
if (!cachedDatabaseMap.contains(databaseName))
if (!cachedDatabaseMap.contains(databaseName)) {
cachedDatabaseMap.put(databaseName, client.getDatabase(databaseName).withCodecRegistry(registry))
}
cachedDatabaseMap(databaseName)
}

def addChangeObserver(
observer: ChangeObserver[Document],
databaseName: String = DefaultDatabaseName
): ChangeObserver[Document] = {
def addChangeObserver(observer: ChangeObserver[Document], databaseName: String = DefaultDatabaseName): ChangeObserver[Document] = {
database(databaseName).watch().subscribe(observer)
observer
}

def collections(databaseName: String = DefaultDatabaseName): ListCollectionsObservable[Document] =
def collections(databaseName: String = DefaultDatabaseName): ListCollectionsObservable[Document] = {
database(databaseName).listCollections()
}

def collectionInfos(databaseName: String = DefaultDatabaseName): List[CollectionInfo] =
def collectionInfos(databaseName: String = DefaultDatabaseName): List[CollectionInfo] = {
collections(databaseName).resultList().map(doc => CollectionInfo(doc)).sortBy(_.name)
}

def collectionNames(databaseName: String = DefaultDatabaseName): List[String] =
def collectionNames(databaseName: String = DefaultDatabaseName): List[String] = {
collectionInfos(databaseName).map(info => info.name)
}

def runCommand(document: Document, databaseName: String = DefaultDatabaseName): SingleObservable[Document] =
def runCommand(document: Document, databaseName: String = DefaultDatabaseName): SingleObservable[Document] = {
database(databaseName).runCommand(document)
}

def collectionStatus(
collectionName: String,
Expand All @@ -81,42 +82,51 @@ class DatabaseProvider(val config: MongoConfig, val registry: CodecRegistry) ext
val newCollectionName: String = guessName(collectionName)
database(newDatabaseName).getCollection[A](newCollectionName)
}
else
else {
database().getCollection[A](collectionName)
}

def guessDatabaseName(maybeSeparatedName: String): String =
if (maybeSeparatedName.contains(DatabaseProvider.CollectionSeparator))
def guessDatabaseName(maybeSeparatedName: String): String = {
if (maybeSeparatedName.contains(DatabaseProvider.CollectionSeparator)) {
maybeSeparatedName.substring(0, maybeSeparatedName.indexOf(DatabaseProvider.CollectionSeparator))
else
}
else {
DefaultDatabaseName
}
}

def guessName(maybeSeparatedName: String): String =
if (maybeSeparatedName.contains(DatabaseProvider.CollectionSeparator))
def guessName(maybeSeparatedName: String): String = {
if (maybeSeparatedName.contains(DatabaseProvider.CollectionSeparator)) {
maybeSeparatedName.substring(maybeSeparatedName.indexOf(DatabaseProvider.CollectionSeparator) + 1)
else
}
else {
maybeSeparatedName
}
}

def bucket(bucketName: String): GridFSBucket =
def bucket(bucketName: String): GridFSBucket = {
if (bucketName.contains(DatabaseProvider.CollectionSeparator)) {
val newDatabaseName = guessDatabaseName(bucketName)
val newBucketName = guessName(bucketName)
GridFSBucket(database(newDatabaseName), newBucketName)
}
else
else {
GridFSBucket(database(), bucketName)
}
}

def dao(collectionName: String): MongoDAO[Document] = {
if (!cachedMongoDAOMap.contains(collectionName))
if (!cachedMongoDAOMap.contains(collectionName)) {
cachedMongoDAOMap.put(collectionName, DocumentDao(this, collectionName))
}
cachedMongoDAOMap(collectionName)
}

def cachedDatabaseNames(): List[String] = cachedDatabaseMap.keys.toList

def cachedCollectionNames(): List[String] = cachedMongoDAOMap.keys.toList

case class DocumentDao(provider: DatabaseProvider, collectionName: String)
extends MongoDAO[Document](this, collectionName)
case class DocumentDao(provider: DatabaseProvider, collectionName: String) extends MongoDAO[Document](this, collectionName)

}

Expand All @@ -126,16 +136,14 @@ object DatabaseProvider {

private val CustomRegistry = fromProviders(CustomCodecProvider())

private val codecRegistry: CodecRegistry =
fromRegistries(CustomRegistry, DEFAULT_CODEC_REGISTRY)
private val codecRegistry: CodecRegistry = fromRegistries(CustomRegistry, DEFAULT_CODEC_REGISTRY)

def apply(config: MongoConfig, registry: CodecRegistry = codecRegistry): DatabaseProvider =
def apply(config: MongoConfig, registry: CodecRegistry = codecRegistry): DatabaseProvider = {
new DatabaseProvider(config, fromRegistries(registry, CustomRegistry, DEFAULT_CODEC_REGISTRY))
}

def fromPath(
configPath: String = MongoConfig.DefaultConfigPathPrefix,
registry: CodecRegistry = codecRegistry
): DatabaseProvider =
def fromPath(configPath: String = MongoConfig.DefaultConfigPathPrefix, registry: CodecRegistry = codecRegistry): DatabaseProvider = {
apply(MongoConfig.fromPath(configPath), fromRegistries(registry, CustomRegistry, DEFAULT_CODEC_REGISTRY))
}

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package dev.mongocamp.driver.mongodb.database

import java.util.concurrent.TimeUnit

import com.mongodb.MongoCompressor
import com.mongodb.MongoCredential.createCredential
import com.mongodb.event.{ CommandListener, ConnectionPoolListener }
import dev.mongocamp.driver.mongodb.database.MongoConfig._
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.config.{ Config, ConfigFactory }
import org.mongodb.scala.connection._
import org.mongodb.scala.{MongoClientSettings, MongoCredential, ServerAddress}
import org.mongodb.scala.{ MongoClientSettings, MongoCredential, ServerAddress }

import scala.jdk.CollectionConverters._
import scala.collection.mutable.ArrayBuffer
Expand All @@ -22,50 +22,58 @@ case class MongoConfig(
authDatabase: String = DefaultAuthenticationDatabaseName,
poolOptions: MongoPoolOptions = MongoPoolOptions(),
compressors: List[String] = List(),
connectionPoolListener: List[ConnectionPoolListener] = List(),
commandListener: List[CommandListener] = List(),
customClientSettings: Option[MongoClientSettings] = None
) {

val clientSettings: MongoClientSettings = {
if (customClientSettings.isDefined)
if (customClientSettings.isDefined) {
customClientSettings.get
}
else {
val clusterSettings: ClusterSettings =
ClusterSettings.builder().hosts(List(new ServerAddress(host, port)).asJava).build()
val clusterSettings: ClusterSettings = ClusterSettings.builder().hosts(List(new ServerAddress(host, port)).asJava).build()

val connectionPoolSettings = ConnectionPoolSettings
val connectionPoolSettingsBuilder = ConnectionPoolSettings
.builder()
.maxConnectionIdleTime(poolOptions.maxConnectionIdleTime, TimeUnit.SECONDS)
.maxSize(poolOptions.maxSize)
.minSize(poolOptions.minSize)
.maintenanceInitialDelay(poolOptions.maintenanceInitialDelay, TimeUnit.SECONDS)
.build()

connectionPoolListener.foreach(listener => connectionPoolSettingsBuilder.addConnectionPoolListener(listener))

val connectionPoolSettings = connectionPoolSettingsBuilder.build()

val compressorList = new ArrayBuffer[MongoCompressor]()
compressors.foreach { compression =>
if (ComressionSnappy.equalsIgnoreCase(compression))
compressors.foreach(compression => {
if (ComressionSnappy.equalsIgnoreCase(compression)) {
compressorList.+=(MongoCompressor.createSnappyCompressor())
else if (ComressionZlib.equalsIgnoreCase(compression))
}
else if (ComressionZlib.equalsIgnoreCase(compression)) {
compressorList.+=(MongoCompressor.createZlibCompressor())
else if (ComressionZstd.equalsIgnoreCase(compression))
}
else if (ComressionZstd.equalsIgnoreCase(compression)) {
compressorList.+=(MongoCompressor.createZstdCompressor())
}
}
})

val builder = MongoClientSettings
.builder()
.applicationName(applicationName)
.applyToConnectionPoolSettings((b: com.mongodb.connection.ConnectionPoolSettings.Builder) =>
b.applySettings(connectionPoolSettings)
)
.applyToConnectionPoolSettings((b: com.mongodb.connection.ConnectionPoolSettings.Builder) => b.applySettings(connectionPoolSettings))
.applyToClusterSettings((b: com.mongodb.connection.ClusterSettings.Builder) => b.applySettings(clusterSettings))
.compressorList(compressorList.asJava)

commandListener.foreach(listener => builder.addCommandListener(listener))

if (userName.isDefined && password.isDefined) {
val credential: MongoCredential = createCredential(userName.get, authDatabase, password.get.toCharArray)

builder.credential(credential).build()
}
else
else {
builder.build()
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,20 @@ case class LocalServer(serverConfig: ServerConfig = ServerConfig()) {
private val server: MongoServer = {
if (ServerBackend.H2 == serverConfig.backend)
if (serverConfig.h2BackendConfig.isDefined && !serverConfig.h2BackendConfig.get.inMemory) {
if (serverConfig.h2BackendConfig.get.path.isDefined)
if (serverConfig.h2BackendConfig.get.path.isDefined) {
h2Path = serverConfig.h2BackendConfig.get.path.get
else
}
else {
h2Path = File.temporaryFile().get().path.toString
}
createH2Server(h2Path)
}
else
else {
createH2InMemoryServer
else
}
else {
createInMemoryServer
}
}

server.bind(serverConfig.host, serverConfig.port)
Expand All @@ -32,19 +36,18 @@ case class LocalServer(serverConfig: ServerConfig = ServerConfig()) {

def shutdown(): Unit = server.shutdown()

private def createInMemoryServer: MongoServer =
new MongoServer(new MemoryBackend())
private def createInMemoryServer: MongoServer = new MongoServer(new MemoryBackend())

private def createH2InMemoryServer: MongoServer =
new MongoServer(H2Backend.inMemory())
private def createH2InMemoryServer: MongoServer = new MongoServer(H2Backend.inMemory())

private def createH2Server(path: String): MongoServer =
new MongoServer(new H2Backend(path))
private def createH2Server(path: String): MongoServer = new MongoServer(new H2Backend(path))

}

object LocalServer {

def fromPath(configPath: String = DefaultServerConfigPathPrefix): LocalServer =
LocalServer(ServerConfig.fromPath(configPath))
def fromPath(configPath: String = DefaultServerConfigPathPrefix): LocalServer = LocalServer(
ServerConfig.fromPath(configPath)
)

}
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "2.4.3-SNAPSHOT"
ThisBuild / version := "2.4.3"

0 comments on commit bbe77de

Please sign in to comment.