Skip to content

Commit

Permalink
Allows setting the consistency level at query level (#96)
Browse files Browse the repository at this point in the history
* Adds the new method for setting the consistency level at query level

* Upgrades plugins

* Replaces overloaded methods by adding args with default values
  • Loading branch information
fedefernandez committed Oct 20, 2017
1 parent e68ac2c commit 4bd7631
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 37 deletions.
7 changes: 5 additions & 2 deletions core/src/main/scala/api/SessionAPI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ trait SessionAPI {

def executeStatement(statement: Statement): FS[ResultSet]

def executeWithByteBuffer(query: String, values: List[SerializableValueBy[Int]]): FS[ResultSet]
def executeWithByteBuffer(
query: String,
values: List[SerializableValueBy[Int]],
consistencyLevel: Option[ConsistencyLevel] = None): FS[ResultSet]

}
}
7 changes: 5 additions & 2 deletions core/src/main/scala/handlers/implicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,13 @@ object implicits {

def executeWithByteBuffer(
query: String,
values: List[SerializableValueBy[Int]]): SessionAPIOps[M, ResultSet] =
values: List[SerializableValueBy[Int]],
consistencyLevel: Option[ConsistencyLevel] = None): SessionAPIOps[M, ResultSet] =
Kleisli { session =>
values.traverse(_.serializableValue.serialize[M]).flatMap { values =>
H(session.executeAsync(ByteBufferSimpleStatement(query, values.toArray)))
val st = ByteBufferSimpleStatement(query, values.toArray)
consistencyLevel.foreach(st.setConsistencyLevel)
H(session.executeAsync(st))
}
}

Expand Down
11 changes: 6 additions & 5 deletions core/src/main/scala/query/interpolator/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package freestyle.cassandra
package query

import cats.{~>, MonadError}
import com.datastax.driver.core.{ResultSet, Session}
import com.datastax.driver.core.{ConsistencyLevel, ResultSet, Session}
import contextual.Context
import freestyle._
import freestyle.async.AsyncContext
Expand All @@ -41,15 +41,16 @@ package object interpolator {
E: MonadError[M, Throwable]): SessionAPI.Op ~> M =
sessionAPIHandler andThen apiInterpreter[M, Session](S)

def asResultSet[M[_]](implicit API: SessionAPI[M]): FreeS[M, ResultSet] =
API.executeWithByteBuffer(tuple._1, tuple._2)
def asResultSet[M[_]](consistencyLevel: Option[ConsistencyLevel] = None)(
implicit API: SessionAPI[M]): FreeS[M, ResultSet] =
API.executeWithByteBuffer(tuple._1, tuple._2, consistencyLevel)

def attemptResultSet[M[_]](
def attemptResultSet[M[_]](consistencyLevel: Option[ConsistencyLevel] = None)(
implicit API: SessionAPI[SessionAPI.Op],
S: Session,
AC: AsyncContext[M],
E: MonadError[M, Throwable]): M[ResultSet] =
API.executeWithByteBuffer(tuple._1, tuple._2).interpret[M]
asResultSet[SessionAPI.Op](consistencyLevel).interpret[M]

}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/query/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object model {
def serialize[M[_]](implicit E: MonadError[M, Throwable]): M[ByteBuffer]
}

sealed trait SerializableValueBy[T] {
trait SerializableValueBy[T] {
def position: T
def serializableValue: SerializableValue
}
Expand Down
22 changes: 13 additions & 9 deletions core/src/test/scala/api/SessionAPISpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ class SessionAPISpec extends WordSpec with Matchers with OneInstancePerTest with

implicit val sessionAPIHandler: SessionAPI.Op ~> Id = new (SessionAPI.Op ~> Id) {
override def apply[A](fa: SessionAPI.Op[A]): Id[A] = fa match {
case SessionAPI.InitOp() => sessionMock
case SessionAPI.CloseOp() => unit
case SessionAPI.PrepareOp(_) => prepSt
case SessionAPI.PrepareStatementOp(_) => prepSt
case SessionAPI.ExecuteOp(_) => resultSet
case SessionAPI.ExecuteWithValuesOp(_, _) => resultSet
case SessionAPI.ExecuteWithMapOp(_, _) => resultSet
case SessionAPI.ExecuteStatementOp(_) => resultSet
case SessionAPI.InitOp() => sessionMock
case SessionAPI.CloseOp() => unit
case SessionAPI.PrepareOp(_) => prepSt
case SessionAPI.PrepareStatementOp(_) => prepSt
case SessionAPI.ExecuteOp(_) => resultSet
case SessionAPI.ExecuteWithValuesOp(_, _) => resultSet
case SessionAPI.ExecuteWithMapOp(_, _) => resultSet
case SessionAPI.ExecuteStatementOp(_) => resultSet
case SessionAPI.ExecuteWithByteBufferOp(_, _, _) => resultSet
}
}

Expand All @@ -57,6 +58,7 @@ class SessionAPISpec extends WordSpec with Matchers with OneInstancePerTest with
ResultSet,
ResultSet,
ResultSet,
ResultSet,
ResultSet)

def program[F[_]](implicit sessionAPI: SessionAPI[F]): FreeS[F, ReturnResult] = {
Expand All @@ -69,7 +71,8 @@ class SessionAPISpec extends WordSpec with Matchers with OneInstancePerTest with
v6 <- sessionAPI.executeWithValues("", Null[Any])
v7 <- sessionAPI.executeWithMap("", Null[Map[String, AnyRef]])
v8 <- sessionAPI.executeStatement(Null[Statement])
} yield (v1, v2, v3, v4, v5, v6, v7, v8)
v9 <- sessionAPI.executeWithByteBuffer("", Nil, None)
} yield (v1, v2, v3, v4, v5, v6, v7, v8, v9)
}

val result = program[SessionAPI.Op].interpret[Id]
Expand All @@ -82,6 +85,7 @@ class SessionAPISpec extends WordSpec with Matchers with OneInstancePerTest with
resultSet,
resultSet,
resultSet,
resultSet,
resultSet))
}

Expand Down
74 changes: 67 additions & 7 deletions core/src/test/scala/handlers/SessionAPIHandlerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@
package freestyle.cassandra
package handlers

import java.nio.ByteBuffer

import cats.MonadError
import com.datastax.driver.core._
import freestyle.cassandra.api.SessionAPIOps
import freestyle.cassandra.query.model.{SerializableValue, SerializableValueBy}
import org.scalamock.scalatest.MockFactory
import org.scalatest.{Matchers, OneInstancePerTest, WordSpec}

Expand All @@ -32,13 +36,32 @@ class SessionAPIHandlerSpec
with OneInstancePerTest
with MockFactory {

val sessionMock: Session = mock[Session]
val regStMock: RegularStatement = stub[RegularStatement]
val prepStMock: PreparedStatement = stub[PreparedStatement]
val rsMock: ResultSet = stub[ResultSet]
val queryString: String = "SELECT * FROM table;"
val mapValues: Map[String, AnyRef] = Map("param1" -> "value1", "param2" -> "value2")
val values: Seq[Any] = Seq("value1", "value2")
val sessionMock: Session = mock[Session]
val regStMock: RegularStatement = stub[RegularStatement]
val prepStMock: PreparedStatement = stub[PreparedStatement]
val rsMock: ResultSet = stub[ResultSet]
val queryString: String = "SELECT * FROM table;"
val mapValues: Map[String, AnyRef] = Map("param1" -> "value1", "param2" -> "value2")
val values: Seq[Any] = Seq("value1", "value2")
val consistencyLevel: ConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM

val valueSerializedA: ByteBuffer = TypeCodec.ascii().serialize("Hello World!", ProtocolVersion.V3)
val serializableValueByIntMockA: SerializableValueBy[Int] = new SerializableValueBy[Int] {
override def position = 0
override def serializableValue: SerializableValue = new SerializableValue {
override def serialize[M[_]](implicit E: MonadError[M, Throwable]): M[ByteBuffer] =
E.pure(valueSerializedA)
}
}

val valueSerializedB: ByteBuffer = TypeCodec.bigint().serialize(99l, ProtocolVersion.V3)
val serializableValueByIntMockB: SerializableValueBy[Int] = new SerializableValueBy[Int] {
override def position = 1
override def serializableValue: SerializableValue = new SerializableValue {
override def serialize[M[_]](implicit E: MonadError[M, Throwable]): M[ByteBuffer] =
E.pure(valueSerializedB)
}
}

import cats.instances.future._
import freestyle.async.implicits._
Expand Down Expand Up @@ -103,6 +126,43 @@ class SessionAPIHandlerSpec
run(handler.executeStatement(regStMock)) shouldBe rsMock
}

"call to serializableValue and executeAsync(Statement) when calling executeWithByteBuffer(String, List[SerializableValueBy[Int]], None) method" in {

val values = List(serializableValueByIntMockA, serializableValueByIntMockB)

(sessionMock
.executeAsync(_: Statement))
.expects(where { (st: Statement) =>
st.isInstanceOf[SimpleStatement] &&
(st
.asInstanceOf[SimpleStatement]
.getValues(Null[ProtocolVersion], Null[CodecRegistry]) sameElements Array(
valueSerializedA,
valueSerializedB))
})
.returns(ResultSetFutureTest(rsMock))
run(handler.executeWithByteBuffer(queryString, values)) shouldBe rsMock
}

"call to serializableValue and executeAsync(Statement) when calling executeWithByteBuffer(String, List[SerializableValueBy[Int]], Some(ConsistencyLevel)) method" in {

val values = List(serializableValueByIntMockA, serializableValueByIntMockB)

(sessionMock
.executeAsync(_: Statement))
.expects(where { (st: Statement) =>
st.isInstanceOf[SimpleStatement] &&
(st
.asInstanceOf[SimpleStatement]
.getValues(Null[ProtocolVersion], Null[CodecRegistry]) sameElements Array(
valueSerializedA,
valueSerializedB)) &&
(st.getConsistencyLevel == consistencyLevel)
})
.returns(ResultSetFutureTest(rsMock))
run(handler.executeWithByteBuffer(queryString, values, Some(consistencyLevel))) shouldBe rsMock
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,31 @@ class InterpolatorImplicitSpec
val rsMock: ResultSet = stub[ResultSet]
(sessionMock.executeAsync(_: Statement)).when(*).returns(ResultSetFutureTest(rsMock))

"InterpolatorImplicitDef attemptResultSet" should {
val consistencyLevel = ConsistencyLevel.EACH_QUORUM

"return a valid ResultSet" in {
val future: Future[ResultSet] = cql"SELECT * FROM users".attemptResultSet[Future]
"InterpolatorImplicitDef asResultSet" should {

implicit val interpreter = sessionAPIHandler[Future] andThen apiInterpreter[Future, Session](
sessionMock)

"return a valid ResultSet from a FreeS" in {
val future: Future[ResultSet] =
cql"SELECT * FROM users".asResultSet[SessionAPI.Op]().interpret[Future]
Await.result(future, Duration.Inf) shouldBe rsMock
}

"return a valid ResultSet from a FreeS when passing a ConsistencyLevel" in {
val future: Future[ResultSet] =
cql"SELECT * FROM users"
.asResultSet[SessionAPI.Op](Some(consistencyLevel))
.interpret[Future]
Await.result(future, Duration.Inf) shouldBe rsMock
(sessionMock
.executeAsync(_: Statement))
.verify(where { (st: Statement) => st.getConsistencyLevel == consistencyLevel
})
}

"return a failed future when the ByteBufferCodec returns a failure" in {
val serializeException = new RuntimeException("Error serializing")
implicit val stringByteBufferCodec: ByteBufferCodec[String] = new ByteBufferCodec[String] {
Expand All @@ -73,21 +91,80 @@ class InterpolatorImplicitSpec
}
val name: String = "UserName"
val future: Future[ResultSet] =
cql"SELECT * FROM users WHERE name=$name".attemptResultSet[Future]
cql"SELECT * FROM users WHERE name=$name".asResultSet[SessionAPI.Op]().interpret[Future]
Await.result(future.failed, Duration.Inf) shouldBe serializeException
}

"return a failed future when the ByteBufferCodec returns a failure when passing a ConsistencyLevel" in {
val serializeException = new RuntimeException("Error serializing")
implicit val stringByteBufferCodec: ByteBufferCodec[String] = new ByteBufferCodec[String] {
override def deserialize[M[_]](bytes: ByteBuffer)(
implicit E: MonadError[M, Throwable]): M[String] =
E.raiseError(new RuntimeException("Error deserializing"))

override def serialize[M[_]](value: String)(
implicit E: MonadError[M, Throwable]): M[ByteBuffer] =
E.raiseError(serializeException)
}
val name: String = "UserName"
val future: Future[ResultSet] =
cql"SELECT * FROM users WHERE name=$name"
.asResultSet[SessionAPI.Op](Some(consistencyLevel))
.interpret[Future]
Await.result(future.failed, Duration.Inf) shouldBe serializeException
}

}

"InterpolatorImplicitDef asResultSet" should {
"InterpolatorImplicitDef attemptResultSet()" should {

implicit val interpreter = sessionAPIHandler[Future] andThen apiInterpreter[Future, Session](
sessionMock)
"return a valid ResultSet" in {
val future: Future[ResultSet] = cql"SELECT * FROM users".attemptResultSet[Future]()
Await.result(future, Duration.Inf) shouldBe rsMock
}

"return a valid ResultSet from a FreeS" in {
"return a valid ResultSet when passing a ConsistencyLevel" in {
val future: Future[ResultSet] =
cql"SELECT * FROM users".asResultSet[SessionAPI.Op].interpret[Future]
cql"SELECT * FROM users".attemptResultSet[Future](Some(consistencyLevel))
Await.result(future, Duration.Inf) shouldBe rsMock
(sessionMock
.executeAsync(_: Statement))
.verify(where { (st: Statement) => st.getConsistencyLevel == consistencyLevel
})
}

"return a failed future when the ByteBufferCodec returns a failure" in {
val serializeException = new RuntimeException("Error serializing")
implicit val stringByteBufferCodec: ByteBufferCodec[String] = new ByteBufferCodec[String] {
override def deserialize[M[_]](bytes: ByteBuffer)(
implicit E: MonadError[M, Throwable]): M[String] =
E.raiseError(new RuntimeException("Error deserializing"))

override def serialize[M[_]](value: String)(
implicit E: MonadError[M, Throwable]): M[ByteBuffer] =
E.raiseError(serializeException)
}
val name: String = "UserName"
val future: Future[ResultSet] =
cql"SELECT * FROM users WHERE name=$name".attemptResultSet[Future]()
Await.result(future.failed, Duration.Inf) shouldBe serializeException
}

"return a failed future when the ByteBufferCodec returns a failure when passing a ConsistencyLevel" in {
val serializeException = new RuntimeException("Error serializing")
implicit val stringByteBufferCodec: ByteBufferCodec[String] = new ByteBufferCodec[String] {
override def deserialize[M[_]](bytes: ByteBuffer)(
implicit E: MonadError[M, Throwable]): M[String] =
E.raiseError(new RuntimeException("Error deserializing"))

override def serialize[M[_]](value: String)(
implicit E: MonadError[M, Throwable]): M[ByteBuffer] =
E.raiseError(serializeException)
}
val name: String = "UserName"
val future: Future[ResultSet] =
cql"SELECT * FROM users WHERE name=$name".attemptResultSet[Future](Some(consistencyLevel))
Await.result(future.failed, Duration.Inf) shouldBe serializeException
}

}
Expand Down
4 changes: 2 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
resolvers += Resolver.sonatypeRepo("releases")
addSbtPlugin("io.frees" % "sbt-freestyle" % "0.3.0")
addSbtPlugin("com.47deg" %% "sbt-embedded-cassandra" % "0.0.3")
addSbtPlugin("io.frees" % "sbt-freestyle" % "0.3.2")
addSbtPlugin("com.47deg" % "sbt-embedded-cassandra" % "0.0.4")

0 comments on commit 4bd7631

Please sign in to comment.