Cassandra sink #56

Merged: 13 commits, Nov 30, 2016
@@ -6,12 +6,13 @@ package akka.stream.alpakka.cassandra
import akka.stream._
import akka.stream.stage.{ AsyncCallback, GraphStage, GraphStageLogic, OutHandler }
import com.datastax.driver.core.{ ResultSet, Row, Session, Statement }
import com.google.common.util.concurrent.{ FutureCallback, Futures, ListenableFuture }

import scala.concurrent.{ Future, Promise }
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }

class CassandraSourceStage(futStmt: Future[Statement], session: Session) extends GraphStage[SourceShape[Row]] {
import akka.stream.alpakka.cassandra.GuavaFutureOpts._

final class CassandraSourceStage(futStmt: Future[Statement], session: Session) extends GraphStage[SourceShape[Row]] {
val out: Outlet[Row] = Outlet("CassandraSource.out")
override val shape: SourceShape[Row] = SourceShape(out)

@@ -25,7 +26,7 @@ class CassandraSourceStage(futStmt: Future[Statement], session: Session) extends

futFetchedCallback = getAsyncCallback[Try[ResultSet]](tryPushAfterFetch)

val futRs = futStmt.flatMap(stmt => guavaFutToScalaFut(session.executeAsync(stmt)))
val futRs = futStmt.flatMap(stmt => session.executeAsync(stmt).asScala())
futRs.onComplete(futFetchedCallback.invoke)
}

@@ -38,7 +39,7 @@ class CassandraSourceStage(futStmt: Future[Statement], session: Session) extends
case Some(rs) if rs.isExhausted => completeStage()
case Some(rs) =>
// fetch next page
val futRs = guavaFutToScalaFut(rs.fetchMoreResults())
val futRs = rs.fetchMoreResults().asScala()
futRs.onComplete(futFetchedCallback.invoke)
case None => () // doing nothing, waiting for futRs in preStart() to be completed
}
@@ -59,14 +60,4 @@ class CassandraSourceStage(futStmt: Future[Statement], session: Session) extends
case Failure(failure) => failStage(failure)
}
}

private def guavaFutToScalaFut[A](guavaFut: ListenableFuture[A]): Future[A] = {
val p = Promise[A]()
val callback = new FutureCallback[A] {
override def onSuccess(a: A): Unit = p.success(a)
override def onFailure(err: Throwable): Unit = p.failure(err)
}
Futures.addCallback(guavaFut, callback)
p.future
}
}
@@ -0,0 +1,25 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.cassandra

import com.google.common.util.concurrent.{ FutureCallback, Futures, ListenableFuture }

import scala.concurrent.{ Future, Promise }

private[cassandra] object GuavaFutureOpts {
final class GuavaFutureOpts[A](val guavaFut: ListenableFuture[A]) extends AnyVal {
def asScala(): Future[A] = {
val p = Promise[A]()
val callback = new FutureCallback[A] {
override def onSuccess(a: A): Unit = p.success(a)
override def onFailure(err: Throwable): Unit = p.failure(err)
}
Futures.addCallback(guavaFut, callback)
p.future
}
}

implicit def toGuavaFutureOpts[A](guavaFut: ListenableFuture[A]): GuavaFutureOpts[A] =
Review comment (Member): if you make the class implicit, you do not need this either.

new GuavaFutureOpts(guavaFut)
}
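
To make that suggestion concrete, here is a minimal sketch of the helper with the value class made implicit, so the separate `toGuavaFutureOpts` conversion method is no longer needed. The enclosing object is renamed to `GuavaFutures` purely for illustration, so the implicit class does not share its enclosing object's name; callers would then import `GuavaFutures._` instead. The exact shape adopted by the PR is an assumption, not taken from this diff.

```scala
// Sketch only: an implicit value class provides asScala() as an extension
// method directly, so the explicit toGuavaFutureOpts conversion can be dropped.
package akka.stream.alpakka.cassandra

import com.google.common.util.concurrent.{ FutureCallback, Futures, ListenableFuture }

import scala.concurrent.{ Future, Promise }

private[cassandra] object GuavaFutures {
  implicit final class GuavaFutureOpts[A](val guavaFut: ListenableFuture[A]) extends AnyVal {
    // Bridge the Guava ListenableFuture to a Scala Future via a Promise.
    def asScala(): Future[A] = {
      val p = Promise[A]()
      val callback = new FutureCallback[A] {
        override def onSuccess(a: A): Unit = p.success(a)
        override def onFailure(err: Throwable): Unit = p.failure(err)
      }
      Futures.addCallback(guavaFut, callback)
      p.future
    }
  }
}
```

With this in scope, `session.executeAsync(stmt).asScala()` reads exactly as in the source stage above.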
@@ -0,0 +1,30 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.cassandra.javadsl

import java.util.concurrent.CompletionStage
import java.util.function.BiFunction

import akka.Done
import akka.stream.javadsl.Sink
import com.datastax.driver.core.{ BoundStatement, PreparedStatement, Session }
import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSink => ScalaCSink }

import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContext

object CassandraSink {

def create[T](parallelism: Int,
statement: PreparedStatement,
statementBinder: BiFunction[T, PreparedStatement, BoundStatement],
session: Session,
executionContext: ExecutionContext): Sink[T, CompletionStage[Done]] = {
val sink =
ScalaCSink.apply[T](parallelism, statement, (t, p) => statementBinder.apply(t, p))(session, executionContext)

sink.mapMaterializedValue(_.toJava).asJava
}

}
@@ -0,0 +1,23 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.cassandra.scaladsl

import akka.Done
import akka.stream.scaladsl.{ Flow, Keep, Sink }
import com.datastax.driver.core.{ BoundStatement, PreparedStatement, Session }

import scala.concurrent.{ ExecutionContext, Future }

import akka.stream.alpakka.cassandra.GuavaFutureOpts._

object CassandraSink {
def apply[T](parallelism: Int,
statement: PreparedStatement,
statementBinder: (T, PreparedStatement) => BoundStatement)(
implicit session: Session,
ex: ExecutionContext): Sink[T, Future[Done]] =
Flow[T]
.mapAsyncUnordered(parallelism)(t ⇒ session.executeAsync(statementBinder(t, statement)).asScala())
.toMat(Sink.ignore)(Keep.right)
}
@@ -3,10 +3,10 @@
*/
package akka.stream.alpakka.cassandra.scaladsl

import akka.NotUsed
import akka.{ Done, NotUsed }
import akka.stream.alpakka.cassandra.CassandraSourceStage
import akka.stream.scaladsl.Source
import com.datastax.driver.core.{ Row, Session, Statement }
import akka.stream.scaladsl.{ Sink, Source }
import com.datastax.driver.core._

import scala.concurrent.Future

Expand Up @@ -3,7 +3,12 @@
*/
package akka.stream.alpakka.cassandra.javadsl;

import akka.Done;
import akka.NotUsed;
import akka.stream.alpakka.cassandra.CassandraSourceStage;
import akka.stream.javadsl.Source;
import com.datastax.driver.core.*;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -17,15 +22,12 @@
import akka.stream.javadsl.Sink;
import akka.testkit.*;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.Row;

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@@ -70,9 +72,6 @@ public static void setup() {
session.execute(
"CREATE TABLE IF NOT EXISTS akka_stream_java_test.test (id int PRIMARY KEY);"
);
for (Integer i = 1; i < 103; i++) {
session.execute("INSERT INTO akka_stream_java_test.test(id) VALUES (" + i + ")");
}
}

@AfterClass
@@ -83,8 +82,17 @@ public static void teardown() {
JavaTestKit.shutdownActorSystem(system);
}

@After
public void cleanUp() {
session.execute("truncate akka_stream_java_test.test");
}

@Test
public void streamStatementResult() throws Exception {
for (Integer i = 1; i < 103; i++) {
session.execute("INSERT INTO akka_stream_java_test.test(id) VALUES (" + i + ")");
}

//#statement
final Statement stmt = new SimpleStatement("SELECT * FROM akka_stream_java_test.test").setFetchSize(20);
//#statement
@@ -99,4 +107,32 @@ public void streamStatementResult() throws Exception {
rows.toCompletableFuture().get(3, TimeUnit.SECONDS).stream().map(r -> r.getInt("id")).collect(Collectors.toSet()));
}

@Test
public void sinkInputValues() throws Exception {

//#prepared-statement
final PreparedStatement preparedStatement = session.prepare("insert into akka_stream_java_test.test (id) values (?)");
//#prepared-statement

//#statement-binder
BiFunction<Integer, PreparedStatement,BoundStatement> statementBinder = (myInteger, statement) -> {
return statement.bind(myInteger);
};
//#statement-binder

Source<Integer, NotUsed> source = Source.from(IntStream.range(1, 10).boxed().collect(Collectors.toList()));


//#run-sink
final Sink<Integer, CompletionStage<Done>> sink = CassandraSink.create(2, preparedStatement, statementBinder, session, system.dispatcher());

CompletionStage<Done> result = source.runWith(sink, materializer);
//#run-sink

result.toCompletableFuture().get();

Set<Integer> found = session.execute("select * from akka_stream_java_test.test").all().stream().map(r -> r.getInt("id")).collect(Collectors.toSet());

assertEquals(found, IntStream.range(1, 10).boxed().collect(Collectors.toSet()));
}
}
@@ -5,13 +5,14 @@ package akka.stream.alpakka.cassandra.scaladsl

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.datastax.driver.core.{ Cluster, SimpleStatement }
import akka.stream.scaladsl.{ Sink, Source }
import com.datastax.driver.core.{ Cluster, PreparedStatement, SimpleStatement }
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent._
import scala.concurrent.duration._
import scala.collection.JavaConverters._

/**
* All the tests must be run with a local Cassandra running on default port 9042.
@@ -110,6 +111,30 @@ class CassandraSourceSpec
rows mustBe empty
}

}
"sink should write to the table" in {
import system.dispatcher

val source = Source(0 to 10).map(i => i: Integer)

//#prepared-statement
val preparedStatement = session.prepare("INSERT INTO akka_stream_scala_test.test(id) VALUES (?)")
//#prepared-statement

//#statement-binder
val statementBinder = (myInteger: Integer, statement: PreparedStatement) => statement.bind(myInteger)
//#statement-binder

//#run-sink
val sink = CassandraSink[Integer](parallelism = 2, preparedStatement, statementBinder)

val result = source.runWith(sink)
//#run-sink

result.futureValue

val found = session.execute("select id from akka_stream_scala_test.test").all().asScala.map(_.getInt("id"))

found.toSet mustBe (0 to 10).toSet
}
}
}
28 changes: 28 additions & 0 deletions docs/src/main/paradox/cassandra.md
@@ -51,6 +51,8 @@ Java

This is all preparation that we are going to need.

### Source Usage

Let's create a Cassandra statement with a query that we want to execute.

Scala
@@ -69,6 +71,32 @@ Java

Here we used a basic sink to complete the stream by collecting all of the stream elements to a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build @extref[more advanced stream topologies](akka-docs:scala/stream/stream-introduction).
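
As a hedged illustration of such a larger topology (not part of the connector's documentation or tests), one could transform and filter the rows before collecting them. The sketch assumes the scaladsl `CassandraSource` factory, an implicit `Session` and materializer already in scope as in the surrounding examples, and the `stmt` statement created above.

```scala
// Hypothetical extension of the example pipeline: keep only the even ids.
// Assumes an implicit Session and ActorMaterializer are in scope and that
// `stmt` is the Statement defined earlier in these docs.
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource
import akka.stream.scaladsl.Sink

import scala.concurrent.Future

val evenIds: Future[Seq[Int]] =
  CassandraSource(stmt)        // stream of Rows, paged asynchronously
    .map(_.getInt("id"))       // extract the id column from each Row
    .filter(_ % 2 == 0)        // downstream stages still backpressure upstream
    .runWith(Sink.seq)         // collect the results when the stream completes
```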

### Sink Usage

Let's create a Cassandra prepared statement with a query that we want to execute.

Scala
: @@snip (../../../../cassandra/src/test/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSourceSpec.scala) { #prepared-statement }

Java
: @@snip (../../../../cassandra/src/test/java/akka/stream/alpakka/cassandra/javadsl/CassandraSourceTest.java) { #prepared-statement }

Now we need to create a 'statement binder'; this is just a function that binds values to the prepared statement. It can take any type or data structure that fits your query values. Here we're using a single Integer, but it could just as easily be a (case) class, as in the sketch after the snippets below.

Scala
: @@snip (../../../../cassandra/src/test/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSourceSpec.scala) { #statement-binder }

Java
: @@snip (../../../../cassandra/src/test/java/akka/stream/alpakka/cassandra/javadsl/CassandraSourceTest.java) { #statement-binder }
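
For instance, a hypothetical binder for a small case class could look like the following; the `Person` class, its columns, and the two-parameter insert statement are invented for illustration and are not part of the connector or its tests.

```scala
// Hypothetical example: binding a case class instead of a single Integer.
// Person and the assumed two-column prepared statement are illustrative only,
// e.g. session.prepare("INSERT INTO my_keyspace.person(id, name) VALUES (?, ?)").
import com.datastax.driver.core.{ BoundStatement, PreparedStatement }

case class Person(id: Int, name: String)

val personBinder: (Person, PreparedStatement) => BoundStatement =
  (person, statement) => statement.bind(Int.box(person.id), person.name)
```

The binder's only contract is `(T, PreparedStatement) => BoundStatement`, so any input element shape works as long as it can be bound to the statement's parameters.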

Finally, we run the sink by attaching it to any source.

Scala
: @@snip (../../../../cassandra/src/test/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSourceSpec.scala) { #run-sink }

Java
: @@snip (../../../../cassandra/src/test/java/akka/stream/alpakka/cassandra/javadsl/CassandraSourceTest.java) { #run-sink }

### Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.