Skip to content

Commit

Permalink
refactor: Use MaxWait also Pagination and other calls
Browse files Browse the repository at this point in the history
  • Loading branch information
QuadStingray committed Nov 21, 2023
1 parent 63c3b46 commit 292355c
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 39 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name" : "mongodb-driver",
"organization" : "dev.mongocamp",
"version" : "2.6.8.snapshot",
"version" : "2.6.8",
"author" : "info@mongocamp.dev",
"license" : "Apache-2.0",
"repository" : {
Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/dev/mongocamp/driver/mongodb/MongoDAO.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package dev.mongocamp.driver.mongodb

import better.files.File
import dev.mongocamp.driver.mongodb.bson.{ BsonConverter, DocumentHelper }
import dev.mongocamp.driver.mongodb.database.{ ChangeObserver, CollectionStatus, CompactResult, DatabaseProvider }
import dev.mongocamp.driver.mongodb.bson.{BsonConverter, DocumentHelper}
import dev.mongocamp.driver.mongodb.database.{ChangeObserver, CollectionStatus, CompactResult, DatabaseProvider}
import dev.mongocamp.driver.mongodb.operation.Crud
import org.bson.json.JsonParseException
import org.mongodb.scala.model.Accumulators._
import org.mongodb.scala.model.Aggregates._
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.Projections
import org.mongodb.scala.{ BulkWriteResult, Document, MongoCollection, Observable, SingleObservable }
import org.mongodb.scala.{BulkWriteResult, Document, MongoCollection, Observable, SingleObservable}

import java.nio.charset.Charset
import java.util.Date
Expand Down Expand Up @@ -47,7 +47,7 @@ abstract class MongoDAO[A](provider: DatabaseProvider, collectionName: String)(i
* @return
* List of column names
*/
def columnNames(sampleSize: Int = 0): List[String] = {
def columnNames(sampleSize: Int = 0, maxWait: Int = DefaultMaxWait): List[String] = {
val projectStage = project(Projections.computed("tempArray", equal("$objectToArray", "$$ROOT")))
val unwindStage = unwind("$tempArray")
val groupStage = group("_id", addToSet("keySet", "$tempArray.k"))
Expand All @@ -60,7 +60,7 @@ abstract class MongoDAO[A](provider: DatabaseProvider, collectionName: String)(i
}
}

val aggregationResult: Document = Raw.findAggregated(pipeline).result()
val aggregationResult: Document = Raw.findAggregated(pipeline).result(maxWait)
BsonConverter.fromBson(aggregationResult.get("keySet").head).asInstanceOf[List[String]]
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package dev.mongocamp.driver.mongodb.operation

import java.util.concurrent.TimeUnit

import com.typesafe.scalalogging.LazyLogging
import org.mongodb.scala._

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future }
import scala.concurrent.{Await, Future}

object ObservableIncludes extends ObservableIncludes

Expand All @@ -25,17 +24,13 @@ trait ObservableIncludes {

def asFuture(): Future[Seq[C]] = observable.toFuture()

def result(maxWait: Int = DefaultMaxWait): C =
Await.result(observable.head(), Duration(maxWait, TimeUnit.SECONDS))
def result(maxWait: Int = DefaultMaxWait): C = Await.result(observable.head(), Duration(maxWait, TimeUnit.SECONDS))

def results(maxWait: Int = DefaultMaxWait): Seq[C] =
Await.result(asFuture(), Duration(maxWait, TimeUnit.SECONDS))
def results(maxWait: Int = DefaultMaxWait): Seq[C] = Await.result(asFuture(), Duration(maxWait, TimeUnit.SECONDS))

def resultList(maxWait: Int = DefaultMaxWait): List[C] =
Await.result(asFuture(), Duration(maxWait, TimeUnit.SECONDS)).toList
def resultList(maxWait: Int = DefaultMaxWait): List[C] = Await.result(asFuture(), Duration(maxWait, TimeUnit.SECONDS)).toList

def resultOption(maxWait: Int = DefaultMaxWait): Option[C] =
Await.result(observable.headOption(), Duration(maxWait, TimeUnit.SECONDS))
def resultOption(maxWait: Int = DefaultMaxWait): Option[C] = Await.result(observable.headOption(), Duration(maxWait, TimeUnit.SECONDS))

}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package dev.mongocamp.driver.mongodb.pagination

import dev.mongocamp.driver.mongodb.exception.MongoCampPaginationException
import dev.mongocamp.driver.mongodb.{ MongoDAO, _ }
import dev.mongocamp.driver.mongodb.{MongoDAO, _}
import org.mongodb.scala.bson.conversions.Bson

case class MongoPaginatedFilter[A <: Any](dao: MongoDAO[A], filter: Bson = Map(), sort: Bson = Map(), projection: Bson = Map()) extends MongoPagination[A] {
case class MongoPaginatedFilter[A <: Any](dao: MongoDAO[A], filter: Bson = Map(), sort: Bson = Map(), projection: Bson = Map(), maxWait: Int = DefaultMaxWait)
extends MongoPagination[A] {

def paginate(page: Long, rows: Long): PaginationResult[A] = {
val count = countResult
Expand All @@ -16,10 +17,10 @@ case class MongoPaginatedFilter[A <: Any](dao: MongoDAO[A], filter: Bson = Map()
}
val allPages = Math.ceil(count.toDouble / rows).toInt
val skip = (page - 1) * rows
val responseList = dao.find(filter, sort, projection, rows.toInt).skip(skip.toInt).resultList()
val responseList = dao.find(filter, sort, projection, rows.toInt).skip(skip.toInt).resultList(maxWait)
PaginationResult(responseList, PaginationInfo(count, rows, page, allPages))
}

def countResult: Long = dao.count(filter).result()
def countResult: Long = dao.count(filter).result(maxWait)

}
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package dev.mongocamp.driver.mongodb.sync

import java.util.Date

import com.typesafe.scalalogging.LazyLogging
import dev.mongocamp.driver.mongodb._
import dev.mongocamp.driver.mongodb.database.{ ConfigHelper, DatabaseProvider }
import dev.mongocamp.driver.mongodb.database.{ConfigHelper, DatabaseProvider}
import dev.mongocamp.driver.mongodb.sync.SyncDirection.SyncDirection
import dev.mongocamp.driver.mongodb.sync.SyncStrategy.SyncStrategy
import com.typesafe.scalalogging.LazyLogging
import org.mongodb.scala.Document
import org.mongodb.scala.bson.ObjectId
import org.mongodb.scala.model.Projections._
import org.mongodb.scala.model.Updates._

import java.util.Date

object SyncStrategy extends Enumeration {
type SyncStrategy = Value
val SyncAll = Value
Expand Down Expand Up @@ -66,23 +66,24 @@ case class MongoSyncOperation(
left: DatabaseProvider,
right: DatabaseProvider,
countBefore: Int,
documentsToSync: Seq[Document]
documentsToSync: Seq[Document],
maxWait: Int = DefaultMaxWait
): MongoSyncResult = {
val start = System.currentTimeMillis()
val syncDate = new Date()
var filteredDocumentsToSync = Seq[Document]()
if (documentsToSync.nonEmpty) {
val idSet: Set[ObjectId] = documentsToSync.map(doc => doc.getObjectId(idColumnName)).toSet
filteredDocumentsToSync = left.dao(collectionName).find(valueFilter(idColumnName, idSet)).results()
right.dao(collectionName).bulkWriteMany(filteredDocumentsToSync).result()
filteredDocumentsToSync = left.dao(collectionName).find(valueFilter(idColumnName, idSet)).results(maxWait)
right.dao(collectionName).bulkWriteMany(filteredDocumentsToSync).result(maxWait)
val update = combine(
set(MongoSyncOperation.SyncColumnLastSync, syncDate),
set(MongoSyncOperation.SyncColumnLastUpdate, syncDate)
)
left.dao(collectionName).updateMany(Map(), update).result()
right.dao(collectionName).updateMany(Map(), update).result()
left.dao(collectionName).updateMany(Map(), update).result(maxWait)
right.dao(collectionName).updateMany(Map(), update).result(maxWait)
}
val countAfter: Int = right.dao(collectionName).count().result().toInt
val countAfter: Int = right.dao(collectionName).count().result(maxWait).toInt
MongoSyncResult(
collectionName,
syncDate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package dev.mongocamp.driver.mongodb.gridfs

import better.files.File
import dev.mongocamp.driver.MongoImplicits
import dev.mongocamp.driver.mongodb._
import dev.mongocamp.driver.mongodb.test.TestDatabase.ImageFilesDAO
import org.bson.types.ObjectId
import org.mongodb.scala.bson.conversions.Bson
Expand Down Expand Up @@ -34,10 +33,7 @@ trait GridfsDatabaseFunctions extends MongoImplicits {
val start = System.currentTimeMillis()
val size: Long = ImageFilesDAO.downloadFileResult(id, file)

println(
"file: %s with size %s Bytes written in %s ms "
.format(file.pathAsString, size, System.currentTimeMillis() - start)
)
println("file: %s with size %s Bytes written in %s ms ".format(file.pathAsString, size, System.currentTimeMillis() - start))

size
}
Expand All @@ -48,9 +44,10 @@ trait GridfsDatabaseFunctions extends MongoImplicits {

def findImages(key: String, value: String): List[GridFSFile] = ImageFilesDAO.findByMetadataValue(key, value)

def updateMetadata(oid: ObjectId, value: Any): UpdateResult =
ImageFilesDAO.updateMetadata(oid, value).result()
def updateMetadata(oid: ObjectId, value: Any): UpdateResult = ImageFilesDAO.updateMetadata(oid, value).result()

def updateMetadataElements(filter: Bson, elements: Map[String, Any]): UpdateResult =
def updateMetadataElements(filter: Bson, elements: Map[String, Any]): UpdateResult = {
ImageFilesDAO.updateMetadataElements(filter, elements).result()
}

}

0 comments on commit 292355c

Please sign in to comment.