Skip to content

Commit

Permalink
Merge pull request #549 from cchantep/fix/549
Browse files Browse the repository at this point in the history
Internal state on `ConnectionNotInitialized`
  • Loading branch information
cchantep committed Jul 26, 2016
2 parents b48faa4 + 315f7cd commit 5648df7
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 11 deletions.
20 changes: 11 additions & 9 deletions driver/src/main/scala/api/collections/genericcollection.scala
Expand Up @@ -249,16 +249,18 @@ trait GenericCollection[P <: SerializationPack with Singleton] extends Collectio
}

@inline private def defaultWriteConcern = db.connection.options.writeConcern
@inline private def MissingMetadata() =
ConnectionNotInitialized.MissingMetadata(db.connection.history())

def bulkInsert(ordered: Boolean)(documents: ImplicitlyDocumentProducer*)(implicit ec: ExecutionContext): Future[MultiBulkWriteResult] =
db.connection.metadata.map { metadata =>
bulkInsert(documents.toStream.map(_.produce), ordered, defaultWriteConcern, metadata.maxBulkSize, metadata.maxBsonSize)
}.getOrElse(Future.failed(ConnectionNotInitialized.MissingMetadata))
}.getOrElse(Future.failed(MissingMetadata()))

def bulkInsert(ordered: Boolean, writeConcern: WriteConcern)(documents: ImplicitlyDocumentProducer*)(implicit ec: ExecutionContext): Future[MultiBulkWriteResult] =
db.connection.metadata.map { metadata =>
bulkInsert(documents.toStream.map(_.produce), ordered, writeConcern, metadata.maxBulkSize, metadata.maxBsonSize)
}.getOrElse(Future.failed(ConnectionNotInitialized.MissingMetadata))
}.getOrElse(Future.failed(MissingMetadata()))

def bulkInsert(ordered: Boolean, writeConcern: WriteConcern, bulkSize: Int, bulkByteSize: Int)(documents: ImplicitlyDocumentProducer*)(implicit ec: ExecutionContext): Future[MultiBulkWriteResult] =
bulkInsert(documents.toStream.map(_.produce), ordered, writeConcern, bulkSize, bulkByteSize)
Expand All @@ -269,7 +271,7 @@ trait GenericCollection[P <: SerializationPack with Singleton] extends Collectio
def bulkInsert(documents: Stream[pack.Document], ordered: Boolean, writeConcern: WriteConcern)(implicit ec: ExecutionContext): Future[MultiBulkWriteResult] =
db.connection.metadata.map { metadata =>
bulkInsert(documents, ordered, writeConcern, metadata.maxBulkSize, metadata.maxBsonSize)
}.getOrElse(Future.failed(ConnectionNotInitialized.MissingMetadata))
}.getOrElse(Future.failed(MissingMetadata()))

def bulkInsert(documents: Stream[pack.Document], ordered: Boolean, writeConcern: WriteConcern = defaultWriteConcern, bulkSize: Int, bulkByteSize: Int)(implicit ec: ExecutionContext): Future[MultiBulkWriteResult] = watchFailure {
def createBulk[R, A <: BulkMaker[R, A]](docs: Stream[pack.Document], command: A with BulkMaker[R, A]): Future[List[R]] = {
Expand All @@ -285,7 +287,7 @@ trait GenericCollection[P <: SerializationPack with Singleton] extends Collectio
/* TODO: Remove
val metadata = db.connection.metadata
val havingMetadata = Failover2(db.connection, failoverStrategy) { () =>
metadata.map(Future.successful).getOrElse(Future.failed(ConnectionNotInitialized.MissingMetadata))
metadata.map(Future.successful).getOrElse(Future.failed(MissingMetadata()))
}.future
*/

Expand Down Expand Up @@ -366,8 +368,8 @@ trait GenericCollection[P <: SerializationPack with Singleton] extends Collectio
}.future
}

case None =>
Future.failed(ConnectionNotInitialized.MissingMetadata)
case _ =>
Future.failed(MissingMetadata())
}

/**
Expand Down Expand Up @@ -426,7 +428,7 @@ trait GenericCollection[P <: SerializationPack with Singleton] extends Collectio
}.future
}

case _ => Future.failed(ConnectionNotInitialized.MissingMetadata)
case _ => Future.failed(MissingMetadata())
}

/**
Expand Down Expand Up @@ -684,8 +686,8 @@ trait GenericCollection[P <: SerializationPack with Singleton] extends Collectio
}.future
}

case None =>
Future.failed(ConnectionNotInitialized.MissingMetadata)
case _ =>
Future.failed(MissingMetadata())
}

/**
Expand Down
46 changes: 44 additions & 2 deletions driver/src/main/scala/core/errors.scala
Expand Up @@ -70,9 +70,51 @@ case class GenericDriverException(
message: String
) extends DriverException with NoStackTrace

case class ConnectionNotInitialized(message: String) extends DriverException
sealed class ConnectionNotInitialized(
val message: String,
override val cause: Throwable
) extends DriverException
with Product with java.io.Serializable with Serializable with Equals {

@deprecated(message = "Use constructor with cause", since = "0.12-RC1")
def this(message: String) = this(message, null)

override val productPrefix = "ConnectionNotInitialized"

def productElement(i: Int): Any = i match {
case 0 => message
case 1 => cause
case _ => throw new NoSuchElementException
}

override def productIterator: Iterator[Any] = Iterator(message, cause)

val productArity = 2

def canEqual(other: Any): Boolean = other match {
case _: ConnectionNotInitialized => true
case _ => false
}

@deprecated(message = "Use constructor with cause", since = "0.12-RC1")
def copy(message: String): ConnectionNotInitialized =
new ConnectionNotInitialized(message, this.cause)
}

object ConnectionNotInitialized {
def MissingMetadata = ConnectionNotInitialized("Connection is missing metadata (like protocol version, etc.) The connection pool is probably being initialized.")
@deprecated(message = "Use constructor with cause", since = "0.12-RC1")
def apply(message: String): ConnectionNotInitialized =
new ConnectionNotInitialized(message, null)

@deprecated(message = "Use constructor with cause", since = "0.12-RC1")
def unapply(instance: ConnectionNotInitialized): Option[String] =
Some(instance.message)

def MissingMetadata(cause: Throwable): ConnectionNotInitialized =
new ConnectionNotInitialized("Connection is missing metadata (like protocol version, etc.) The connection pool is probably being initialized.", cause)

@deprecated(message = "Use constructor with cause", since = "0.12-RC1")
def MissingMetadata: ConnectionNotInitialized = MissingMetadata(null)
}

case class ConnectionException(message: String) extends DriverException
Expand Down

0 comments on commit 5648df7

Please sign in to comment.