Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cassandra sink #56

Merged
merged 13 commits into from
Nov 30, 2016
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ package akka.stream.alpakka.cassandra
import akka.stream._
import akka.stream.stage.{ AsyncCallback, GraphStage, GraphStageLogic, OutHandler }
import com.datastax.driver.core.{ ResultSet, Row, Session, Statement }
import com.google.common.util.concurrent.{ FutureCallback, Futures, ListenableFuture }

import scala.concurrent.{ Future, Promise }
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }

import akka.stream.alpakka.cassandra.cassandra._

class CassandraSourceStage(futStmt: Future[Statement], session: Session) extends GraphStage[SourceShape[Row]] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final class (please boyscout this for us)

val out: Outlet[Row] = Outlet("CassandraSource.out")
override val shape: SourceShape[Row] = SourceShape(out)
Expand All @@ -25,7 +26,7 @@ class CassandraSourceStage(futStmt: Future[Statement], session: Session) extends

futFetchedCallback = getAsyncCallback[Try[ResultSet]](tryPushAfterFetch)

val futRs = futStmt.flatMap(stmt => guavaFutToScalaFut(session.executeAsync(stmt)))
val futRs = futStmt.flatMap(stmt => session.executeAsync(stmt).asScala())
futRs.onComplete(futFetchedCallback.invoke)
}

Expand All @@ -38,7 +39,7 @@ class CassandraSourceStage(futStmt: Future[Statement], session: Session) extends
case Some(rs) if rs.isExhausted => completeStage()
case Some(rs) =>
// fetch next page
val futRs = guavaFutToScalaFut(rs.fetchMoreResults())
val futRs = rs.fetchMoreResults().asScala()
futRs.onComplete(futFetchedCallback.invoke)
case None => () // doing nothing, waiting for futRs in preStart() to be completed
}
Expand All @@ -59,15 +60,5 @@ class CassandraSourceStage(futStmt: Future[Statement], session: Session) extends
case Failure(failure) => failStage(failure)
}
}

private def guavaFutToScalaFut[A](guavaFut: ListenableFuture[A]): Future[A] = {
val p = Promise[A]()
val callback = new FutureCallback[A] {
override def onSuccess(a: A): Unit = p.success(a)
override def onFailure(err: Throwable): Unit = p.failure(err)
}
Futures.addCallback(guavaFut, callback)
p.future
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.cassandra

import com.google.common.util.concurrent.{ FutureCallback, Futures, ListenableFuture }

import scala.concurrent.{ Future, Promise }

package object cassandra {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private[cassandra]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Johan, where do you want me to put that. I cant get it to work. Its a compiler error on the object and cant be used on the implicit class. ???

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you cannot put access restrictions on the package object itself, so then it needs to go on the implicit decorator, if kept public the risk is that library consumers start using it and then it cannot be moved/changed in the future, and this library is not here to provide guice-conversions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Thanks.

I had two issues,

  • I had to change the object name from cassandara, I think the private[cassandra] declaration was referencing the object instead of the package, which stopped it working.
  • Then I had to add a implicit def to convert to the opts. It seems like the implicit class wasnt getting picked up with the private.

I that right,

implicit class GuavaFutureOpts[A](val guavaFut: ListenableFuture[A]) extends AnyVal {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final?

def asScala(): Future[A] = {
val p = Promise[A]()
val callback = new FutureCallback[A] {
override def onSuccess(a: A): Unit = p.success(a)
override def onFailure(err: Throwable): Unit = p.failure(err)
}
Futures.addCallback(guavaFut, callback)
p.future
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.cassandra.scaladsl

import akka.Done
import akka.stream.scaladsl.{ Flow, Keep, Sink }
import com.datastax.driver.core.{ BoundStatement, PreparedStatement, Session }

import scala.concurrent.{ ExecutionContext, Future }

import akka.stream.alpakka.cassandra.cassandra._

object CassandraSink {
def apply[T](parallelism: Int, statement: PreparedStatement, statementBinder: (T, PreparedStatement) => BoundStatement)(implicit session: Session, ex: ExecutionContext): Sink[T, Future[Done]] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some hints for the javadsl

  • statementBinder: BiFunction[T, PreparedStatement, BoundStatement]
  • delegate to scaladsl.CassandraSink.apply
  • mapMaterializedValue and scala.compat.java8.FutureConverters._ to convert the Future to CompletionStage
  • Sink.asJava to convert to akka.stream.javadsl.Sink

Flow[T].mapAsyncUnordered(parallelism)(t ⇒
session.executeAsync(statementBinder(t, statement)).asScala()).toMat(Sink.ignore)(Keep.right)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
*/
package akka.stream.alpakka.cassandra.scaladsl

import akka.NotUsed
import akka.{ Done, NotUsed }
import akka.stream.alpakka.cassandra.CassandraSourceStage
import akka.stream.scaladsl.Source
import com.datastax.driver.core.{ Row, Session, Statement }
import akka.stream.scaladsl.{ Sink, Source }
import com.datastax.driver.core._

import scala.concurrent.Future

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ package akka.stream.alpakka.cassandra.scaladsl

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.{ Sink, Source }
import com.datastax.driver.core.{ Cluster, SimpleStatement }
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent._
import scala.concurrent.duration._

import scala.collection.JavaConverters._

/**
* All the tests must be run with a local Cassandra running on default port 9042.
*/
Expand Down Expand Up @@ -114,6 +116,19 @@ class CassandraSourceSpec extends WordSpec with ScalaFutures with BeforeAndAfter
rows mustBe empty
}

}
"sink should write to the table" in {
import system.dispatcher

val source = Source(0 to 10)
val sink = CassandraSink[Integer](parallelism = 2, session.prepare("INSERT INTO akka_stream_scala_test.test(id) VALUES (?)"), (t, p) => p.bind(t))

val result = source.map(i => i: Integer).runWith(sink)

result.futureValue

val found = session.execute("select id from akka_stream_scala_test.test").all().asScala.map(_.getInt("id"))

found.toSet mustBe (0 to 10).toSet
}
}
}