Cassandra sink #56

Merged · 13 commits · Nov 30, 2016

@@ -6,11 +6,12 @@ package akka.stream.alpakka.cassandra
import akka.stream._
import akka.stream.stage.{ AsyncCallback, GraphStage, GraphStageLogic, OutHandler }
import com.datastax.driver.core.{ ResultSet, Row, Session, Statement }
import com.google.common.util.concurrent.{ FutureCallback, Futures, ListenableFuture }

import scala.concurrent.{ Future, Promise }
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }

import akka.stream.alpakka.cassandra.cassandra._

class CassandraSourceStage(futStmt: Future[Statement], session: Session) extends GraphStage[SourceShape[Row]] {
Member:
final class (please boyscout this for us)
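A minimal sketch of the requested clean-up (hypothetical; only the class header changes, the body stays exactly as in the diff):

final class CassandraSourceStage(futStmt: Future[Statement], session: Session) extends GraphStage[SourceShape[Row]]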

val out: Outlet[Row] = Outlet("CassandraSource.out")
override val shape: SourceShape[Row] = SourceShape(out)
@@ -25,7 +26,7 @@ class CassandraSourceStage(futStmt: Future[Statement], session: Session) extends

futFetchedCallback = getAsyncCallback[Try[ResultSet]](tryPushAfterFetch)

val futRs = futStmt.flatMap(stmt => guavaFutToScalaFut(session.executeAsync(stmt)))
val futRs = futStmt.flatMap(stmt => session.executeAsync(stmt).asScala())
futRs.onComplete(futFetchedCallback.invoke)
}

@@ -38,7 +39,7 @@ class CassandraSourceStage(futStmt: Future[Statement], session: Session) extends
case Some(rs) if rs.isExhausted => completeStage()
case Some(rs) =>
// fetch next page
val futRs = guavaFutToScalaFut(rs.fetchMoreResults())
val futRs = rs.fetchMoreResults().asScala()
futRs.onComplete(futFetchedCallback.invoke)
case None => () // doing nothing, waiting for futRs in preStart() to be completed
}
@@ -59,14 +60,4 @@ class CassandraSourceStage(futStmt: Future[Statement], session: Session) extends
case Failure(failure) => failStage(failure)
}
}

private def guavaFutToScalaFut[A](guavaFut: ListenableFuture[A]): Future[A] = {
val p = Promise[A]()
val callback = new FutureCallback[A] {
override def onSuccess(a: A): Unit = p.success(a)
override def onFailure(err: Throwable): Unit = p.failure(err)
}
Futures.addCallback(guavaFut, callback)
p.future
}
}
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.cassandra.javadsl

import java.util.concurrent.CompletionStage
import java.util.function.BiFunction

import akka.Done
import akka.stream.javadsl.Sink
import com.datastax.driver.core.{ BoundStatement, PreparedStatement, Session }
import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSink => ScalaCSink }

import scala.compat.java8.FutureConverters._

object CassandraSink {

def create[T](parallelism: Int,
statement: PreparedStatement,
statementBinder: BiFunction[T, PreparedStatement, BoundStatement],
session: Session): Sink[T, CompletionStage[Done]] = {
val sink = ScalaCSink.apply[T](parallelism, statement, (t, p) => statementBinder.apply(t, p))(
session,
scala.concurrent.ExecutionContext.global)
Member:
Take the EC as a parameter to create instead, just like the Scala API does (if done like this the user has no control).
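As a sketch of that suggestion (hypothetical signature, not the code in this PR: it adds a java.util.concurrent.Executor parameter and the corresponding import), create could wrap the caller-supplied executor so the user decides which pool runs the conversions:

  def create[T](parallelism: Int,
                statement: PreparedStatement,
                statementBinder: BiFunction[T, PreparedStatement, BoundStatement],
                session: Session,
                executor: Executor): Sink[T, CompletionStage[Done]] = {
    // Build an ExecutionContext from the user-supplied Executor instead of hard-coding the global one
    val ec = scala.concurrent.ExecutionContext.fromExecutor(executor)
    val sink = ScalaCSink.apply[T](parallelism, statement, (t, p) => statementBinder.apply(t, p))(session, ec)
    sink.mapMaterializedValue(_.toJava).asJava
  }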


sink.mapMaterializedValue(_.toJava).asJava
}

}
@@ -0,0 +1,22 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.cassandra

import com.google.common.util.concurrent.{ FutureCallback, Futures, ListenableFuture }

import scala.concurrent.{ Future, Promise }

package object cassandra {
Member:
private[cassandra]

Contributor (author):
Hi Johan, where do you want me to put that? I can't get it to work. It's a compiler error on the object, and it can't be used on the implicit class.

Member:
Ah, you cannot put access restrictions on the package object itself, so it needs to go on the implicit decorator. If kept public, the risk is that library consumers start using it and then it cannot be moved/changed in the future, and this library is not here to provide Guava conversions.

Contributor (author):
Good point. Thanks.

I had two issues:

  • I had to change the object name from cassandra; I think the private[cassandra] declaration was referencing the object instead of the package, which stopped it working.
  • Then I had to add an implicit def to convert to the opts. It seems the implicit class wasn't getting picked up when it was private.

Is that right?
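A minimal sketch of the approach described above (hypothetical: the holder object name GuavaFutures and its exact shape are illustrative, not necessarily the code as merged; the still-public decorator from this commit follows in the diff below):

package akka.stream.alpakka.cassandra

import com.google.common.util.concurrent.{ FutureCallback, Futures, ListenableFuture }
import scala.concurrent.{ Future, Promise }

private[cassandra] object GuavaFutures {
  // Restricting the holder object keeps the Guava conversion out of the public API
  implicit final class GuavaFutureOpts[A](val guavaFut: ListenableFuture[A]) extends AnyVal {
    def asScala(): Future[A] = {
      val p = Promise[A]()
      Futures.addCallback(guavaFut, new FutureCallback[A] {
        override def onSuccess(a: A): Unit = p.success(a)
        override def onFailure(err: Throwable): Unit = p.failure(err)
      })
      p.future
    }
  }
}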

implicit class GuavaFutureOpts[A](val guavaFut: ListenableFuture[A]) extends AnyVal {
Member:
final?

def asScala(): Future[A] = {
val p = Promise[A]()
val callback = new FutureCallback[A] {
override def onSuccess(a: A): Unit = p.success(a)
override def onFailure(err: Throwable): Unit = p.failure(err)
}
Futures.addCallback(guavaFut, callback)
p.future
}
}
}
@@ -0,0 +1,23 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.cassandra.scaladsl

import akka.Done
import akka.stream.scaladsl.{ Flow, Keep, Sink }
import com.datastax.driver.core.{ BoundStatement, PreparedStatement, Session }

import scala.concurrent.{ ExecutionContext, Future }

import akka.stream.alpakka.cassandra.cassandra._

object CassandraSink {
def apply[T](parallelism: Int,
statement: PreparedStatement,
statementBinder: (T, PreparedStatement) => BoundStatement)(
implicit session: Session,
ex: ExecutionContext): Sink[T, Future[Done]] =
Flow[T]
.mapAsyncUnordered(parallelism)(t ⇒ session.executeAsync(statementBinder(t, statement)).asScala())
.toMat(Sink.ignore)(Keep.right)
}
@@ -3,10 +3,10 @@
*/
package akka.stream.alpakka.cassandra.scaladsl

import akka.NotUsed
import akka.{ Done, NotUsed }
import akka.stream.alpakka.cassandra.CassandraSourceStage
import akka.stream.scaladsl.Source
import com.datastax.driver.core.{ Row, Session, Statement }
import akka.stream.scaladsl.{ Sink, Source }
import com.datastax.driver.core._

import scala.concurrent.Future

@@ -3,7 +3,12 @@
*/
package akka.stream.alpakka.cassandra.javadsl;

import akka.Done;
import akka.NotUsed;
import akka.stream.alpakka.cassandra.CassandraSourceStage;
import akka.stream.javadsl.Source;
import com.datastax.driver.core.*;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -17,15 +22,12 @@
import akka.stream.javadsl.Sink;
import akka.testkit.*;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.Row;

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@@ -70,9 +72,6 @@ public static void setup() {
session.execute(
"CREATE TABLE IF NOT EXISTS akka_stream_java_test.test (id int PRIMARY KEY);"
);
for (Integer i = 1; i < 103; i++) {
session.execute("INSERT INTO akka_stream_java_test.test(id) VALUES (" + i + ")");
}
}

@AfterClass
@@ -83,8 +82,17 @@ public static void teardown() {
JavaTestKit.shutdownActorSystem(system);
}

@After
public void cleanUp() {
session.execute("truncate akka_stream_java_test.test");
}

@Test
public void streamStatementResult() throws Exception {
for (Integer i = 1; i < 103; i++) {
session.execute("INSERT INTO akka_stream_java_test.test(id) VALUES (" + i + ")");
}

//#statement
final Statement stmt = new SimpleStatement("SELECT * FROM akka_stream_java_test.test").setFetchSize(20);
//#statement
@@ -99,4 +107,32 @@ public void streamStatementResult() throws Exception {
rows.toCompletableFuture().get(3, TimeUnit.SECONDS).stream().map(r -> r.getInt("id")).collect(Collectors.toSet()));
}

@Test
public void sinkInputValues() throws Exception {

//#prepared-statement
final PreparedStatement preparedStatement = session.prepare("insert into akka_stream_java_test.test (id) values (?)");
//#prepared-statement

//#statement-binder
BiFunction<Integer, PreparedStatement,BoundStatement> statementBinder = (myInteger, statement) -> {
return statement.bind(myInteger);
};
//#statement-binder

Source<Integer, NotUsed> source = Source.from(IntStream.range(1, 10).boxed().collect(Collectors.toList()));


//#run-sink
final Sink<Integer, CompletionStage<Done>> sink = CassandraSink.create(2, preparedStatement, statementBinder, session);

CompletionStage<Done> result = source.runWith(sink, materializer);
//#run-sink

result.toCompletableFuture().get();

Set<Integer> found = session.execute("select * from akka_stream_java_test.test").all().stream().map(r -> r.getInt("id")).collect(Collectors.toSet());

assertEquals(found, IntStream.range(1, 10).boxed().collect(Collectors.toSet()));
}
}
@@ -5,13 +5,14 @@ package akka.stream.alpakka.cassandra.scaladsl

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.datastax.driver.core.{ Cluster, SimpleStatement }
import akka.stream.scaladsl.{Sink, Source}
import com.datastax.driver.core.{Cluster, PreparedStatement, SimpleStatement}
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent._
import scala.concurrent.duration._
import scala.collection.JavaConverters._

/**
* All the tests must be run with a local Cassandra running on default port 9042.
@@ -110,6 +111,30 @@ class CassandraSourceSpec
rows mustBe empty
}

}
"sink should write to the table" in {
import system.dispatcher

val source = Source(0 to 10).map(i => i: Integer)

//#prepared-statement
val preparedStatement = session.prepare("INSERT INTO akka_stream_scala_test.test(id) VALUES (?)")
//#prepared-statement

//#statement-binder
val statementBinder = (myInteger: Integer, statement: PreparedStatement) => statement.bind(myInteger)
//#statement-binder

//#run-sink
val sink = CassandraSink[Integer](parallelism = 2, preparedStatement, statementBinder)

val result = source.runWith(sink)
//#run-sink

result.futureValue

val found = session.execute("select id from akka_stream_scala_test.test").all().asScala.map(_.getInt("id"))

found.toSet mustBe (0 to 10).toSet
}
}
}
28 changes: 28 additions & 0 deletions docs/src/main/paradox/cassandra.md
@@ -51,6 +51,8 @@ Java

This is all preparation that we are going to need.

### Source Usage

Let's create a Cassandra statement with a query that we want to execute.

Scala
@@ -69,6 +71,32 @@ Java

Here we used a basic sink to complete the stream by collecting all of the stream elements to a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build @extref[more advanced stream topologies](akka-docs:scala/stream/stream-introduction).

### Sink Usage

Let's create a Cassandra prepared statement with a query that we want to execute.

Scala
: @@snip (../../../../cassandra/src/test/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSourceSpec.scala) { #prepared-statement }

Java
: @@snip (../../../../cassandra/src/test/java/akka/stream/alpakka/cassandra/javadsl/CassandraSourceTest.java) { #prepared-statement }

Now we need to create a 'statement binder'; this is simply a function that binds a stream element to the prepared statement. It can take any type or data structure to fit your query values. Here we're just using one Integer, but it could just as easily be a (case) class; a sketch of that variant follows the snippets below.

Scala
: @@snip (../../../../cassandra/src/test/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSourceSpec.scala) { #statement-binder }

Java
: @@snip (../../../../cassandra/src/test/java/akka/stream/alpakka/cassandra/javadsl/CassandraSourceTest.java) { #statement-binder }
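As a sketch of the case-class variant mentioned above (hypothetical: the Person class, table and column names are made up; it assumes a prepared statement with two bind parameters, and personPreparedStatement stands for the statement prepared from that query):

case class Person(id: Int, name: String)

// e.g. session.prepare("INSERT INTO my_keyspace.person(id, name) VALUES (?, ?)")
val personBinder = (person: Person, statement: PreparedStatement) =>
  statement.bind(Int.box(person.id), person.name)

// then: CassandraSink[Person](parallelism = 2, personPreparedStatement, personBinder)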

Finally we run the sink from any source.

Scala
: @@snip (../../../../cassandra/src/test/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSourceSpec.scala) { #run-sink }

Java
: @@snip (../../../../cassandra/src/test/java/akka/stream/alpakka/cassandra/javadsl/CassandraSourceTest.java) { #run-sink }
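Putting the pieces together, a minimal end-to-end sketch in Scala (it reuses the preparedStatement and statementBinder from the snippets above and assumes an implicit Session, ExecutionContext and Materializer are in scope, as in the test code):

val source = Source(1 to 100).map(i => i: Integer)

val sink = CassandraSink[Integer](parallelism = 2, preparedStatement, statementBinder)

// The materialized Future[Done] completes once every element has been written
val result: Future[Done] = source.runWith(sink)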

### Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.