Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cassandra sink #56

Merged
merged 13 commits into from
Nov 30, 2016
Merged

Cassandra sink #56

merged 13 commits into from
Nov 30, 2016

Conversation

stephennancekivell
Copy link
Contributor

Hey, should we provide a Cassandra sink like this.

It seems pretty simple, but I think this is what a user would want.

What do you think ?

+def apply[T](parallelism: Int, statement: PreparedStatement, statementBinder: (T, PreparedStatement) => BoundStatement)(implicit session: Session, ex: ExecutionContext): Sink[T, Future[Done]] =
+    Flow[T].mapAsyncUnordered(parallelism)(t ⇒
+      session.executeAsync(statementBinder(t, statement)).toFuture()
+    ).toMat(Sink.ignore)(Keep.right)
+}

@stephennancekivell stephennancekivell mentioned this pull request Nov 14, 2016
Copy link
Member

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this. Please add javadsl also.

object GuavaFutureOptsImplicits {
implicit def toGuavaFutureOpts[A](guavaFut: ListenableFuture[A]): GuavaFutureOpts[A] =
new GuavaFutureOpts[A](guavaFut)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import com.datastax.driver.core.{ Cluster, SimpleStatement }
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid this EC, also in tests, unless there is a good reason. Use import system.dispatcher inside the test instead.

import com.datastax.driver.core.{ Cluster, SimpleStatement }
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

import scala.collection.convert.decorateAsScala._
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's the same but I prefer scala.collection.JavaConverters._

@stephennancekivell
Copy link
Contributor Author

Hi @patriknw, I've implemented your feedback, thanks for that.

Im not sure how to do the java dsl at the moment and it might be a few weeks before I have time to look at it. I agree it will be good for this project.

So, I dont know if you want to merge this in without java or not. Up to you.

Cheers,

import akka.stream.alpakka.cassandra.cassandra._

object CassandraSink {
def apply[T](parallelism: Int, statement: PreparedStatement, statementBinder: (T, PreparedStatement) => BoundStatement)(implicit session: Session, ex: ExecutionContext): Sink[T, Future[Done]] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some hints for the javadsl

  • statementBinder: BiFunction[T, PreparedStatement, BoundStatement]
  • delegate to scaladsl.CassandraSink.apply
  • mapMaterializedValue and scala.compat.java8.FutureConverters._ to convert the Future to CompletionStage
  • Sink.asJava to convert to akka.stream.javadsl.Sink

@patriknw
Copy link
Member

@stephennancekivell We would like to see it completed with javadsl and some small usage example in the docs before merging.

@stephennancekivell
Copy link
Contributor Author

Awesome, Thanks for the tips. Will do.

On Thu, 17 Nov 2016, 06:47 Patrik Nordwall notifications@github.com wrote:

@stephennancekivell https://github.com/stephennancekivell We would like
to see it completed with javadsl and some small usage example in the docs
https://github.com/akka/alpakka/blob/master/docs/src/main/paradox/cassandra.md
before merging.


You are receiving this because you were mentioned.

Reply to this email directly, view it on GitHub
#56 (comment), or mute
the thread
https://github.com/notifications/unsubscribe-auth/AAsX5W0QGxKy0HdZfEscYAO63UCdMPcZks5q-0G8gaJpZM4KxJ_H
.

@stephennancekivell
Copy link
Contributor Author

Hi @patriknw I've completed the Java DSL and the logging. Do you or anyone else have any feedback?

Copy link
Member

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, some minor nitpicks though

session: Session): Sink[T, CompletionStage[Done]] = {
val sink = ScalaCSink.apply[T](parallelism, statement, (t, p) => statementBinder.apply(t, p))(
session,
scala.concurrent.ExecutionContext.global)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take the EC as a parameter to create instead, just like the Scala api does (if done like this the user has no control).


import scala.concurrent.{ Future, Promise }

package object cassandra {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private[cassandra]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Johan, where do you want me to put that. I cant get it to work. Its a compiler error on the object and cant be used on the implicit class. ???

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you cannot put access restrictions on the package object itself, so then it needs to go on the implicit decorator, if kept public the risk is that library consumers start using it and then it cannot be moved/changed in the future, and this library is not here to provide guice-conversions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Thanks.

I had two issues,

  • I had to change the object name from cassandara, I think the private[cassandra] declaration was referencing the object instead of the package, which stopped it working.
  • Then I had to add a implicit def to convert to the opts. It seems like the implicit class wasnt getting picked up with the private.

I that right,

import scala.concurrent.{ Future, Promise }

package object cassandra {
implicit class GuavaFutureOpts[A](val guavaFut: ListenableFuture[A]) extends AnyVal {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final?

import scala.util.{ Failure, Success, Try }

import akka.stream.alpakka.cassandra.cassandra._

class CassandraSourceStage(futStmt: Future[Statement], session: Session) extends GraphStage[SourceShape[Row]] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final class (please boyscout this for us)

@@ -7,8 +7,8 @@ import com.google.common.util.concurrent.{ FutureCallback, Futures, ListenableFu

import scala.concurrent.{ Future, Promise }

package object cassandra {
implicit final class GuavaFutureOpts[A](val guavaFut: ListenableFuture[A]) extends AnyVal {
package object GuavaFutureOpts {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this name it is no longer a package object, just a normal object, so remove that and make it package private for cassandra with private[cassandra] which will cover the class inside it as well. You will have to import the implicit class into the scope where it is used though, so that will be an import GuavaFutureOpts._ in the classes you call asScala

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, thanks. However i still need the implicit def the implicit class alone doesnt seem to get picked up...

@@ -20,6 +20,6 @@ package object GuavaFutureOpts {
}
}

private[cassandra] implicit def toGuavaFutureOpts[A](guavaFut: ListenableFuture[A]): GuavaFutureOpts[A] =
implicit def toGuavaFutureOpts[A](guavaFut: ListenableFuture[A]): GuavaFutureOpts[A] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you make the class implicit you do not need this either

@stephennancekivell
Copy link
Contributor Author

stephennancekivell commented Nov 22, 2016

@johanandren I've just pushed a commit with the changes like you suggested. However now Its a compiler error. So what do I do here. It seemed ok to me with the def. Is there a better way.

https://travis-ci.org/akka/alpakka/jobs/177938167#L773

Note the previous build failed, because of Amqp TimeoutException.

@johanandren
Copy link
Member

If you look at the docs for implicit classes (http://docs.scala-lang.org/overviews/core/implicit-classes.html) you can see that an implicit class cannot have the same name as something else that is already in scope, therefore GuavaFutureOpts inside the object GuavaFutureOpts which is in the same package as where it is used will not work.

Rename the object or the implicit class and that will solve the problem.

Copy link
Member

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@stephennancekivell
Copy link
Contributor Author

Thanks @patriknw

Thanks @johanandren I hadn't come across that rule before.

@stephennancekivell
Copy link
Contributor Author

hey @patriknw @johanandren,

Can this be merged in ?

Thanks

@patriknw patriknw merged commit 83dcecb into akka:master Nov 30, 2016
@patriknw
Copy link
Member

Refs #57

@patriknw
Copy link
Member

Excellent contribution @stephennancekivell !

@stephennancekivell stephennancekivell deleted the cassandra-sink branch December 2, 2016 08:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants