Cassandra: rewrite on top of Java driver 4.5.0 #2182
Very nice and complete implementation. I left a few suggestions and questions.
 * The `init` hook is called before the underlying session is used by other methods,
 * so it can be used for things like creating the keyspace and tables.
Great idea. This would be helpful in some of the other connectors, like JMS.
 * A flow writing to Cassandra for every stream element, passing context along.
 * The element and context are emitted unchanged.
It might be worth mentioning that the context in this case is the element being persisted.
I like this use of `withContext`. Is there any reason why we can't always use it instead of providing a non-context factory method too?
> It might be worth mentioning that the context in this case is the element being persisted.
I'm not sure what you mean. The context can be anything.
> Is there any reason why we can't always use it instead of providing a non-context factory method too?
That would require having tuples as elements.
> I'm not sure what you mean. The context can be anything.
It wasn't immediately clear to me whether the context would be the input element or something else (e.g. Cassandra persistence metadata).
> That would require having tuples as elements.
I was suggesting only providing the flow as a `FlowWithContext`. If users wanted a normal flow they could drop down to it with `asSource`. Maybe that's too much though.
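To make the point about context concrete, here is a minimal sketch that models the documented behaviour with plain Scala collections rather than Akka Streams. Everything in it (`WithContextSketch`, `RecordingWriter`, `writeWithContext`) is a hypothetical stand-in and not part of the Alpakka API. It only illustrates that the element is what gets written, that the context can be any value travelling alongside it, and that a non-context variant would have to carry `(element, context)` tuples explicitly.

```scala
object WithContextSketch {
  // Hypothetical stand-in for a Cassandra write: it only records what was written.
  final class RecordingWriter[T] {
    private val buffer = scala.collection.mutable.ListBuffer.empty[T]
    def write(element: T): Unit = buffer += element
    def written: List[T] = buffer.toList
  }

  // Models "write every element, pass the context along, emit both unchanged".
  // The context type Ctx is arbitrary; it is never inspected or written.
  def writeWithContext[T, Ctx](writer: RecordingWriter[T])(in: Seq[(T, Ctx)]): Seq[(T, Ctx)] =
    in.map { case (element, ctx) =>
      writer.write(element)
      (element, ctx)
    }

  def main(args: Array[String]): Unit = {
    val writer = new RecordingWriter[String]
    // Here the context is an assumed upstream offset, not the persisted element.
    val input = List("alice" -> 1L, "bob" -> 2L)
    val output = writeWithContext(writer)(input)
    println(s"written = ${writer.written}, emitted = $output")
  }
}
```

In this model the tuple type is visible in the signature, which is the trade-off raised above: offering only the context-aware variant pushes tuples (or a conversion step) onto users who have no context to carry.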
cassandra/src/main/scala/akka/stream/alpakka/cassandra/AkkaDiscoverySessionProvider.scala
def connect(system: ActorSystem, config: Config)(implicit ec: ExecutionContext): Future[CqlSession] = {
  readNodes(config)(system, ec).flatMap { contactPoints =>
    val driverConfigWithContactPoints = ConfigFactory.parseString(s"""
      basic.contact-points = [${contactPoints.mkString("\"", "\", \"", "\"")}]
This threw me for a bit of a loop.
- basic.contact-points = [${contactPoints.mkString("\"", "\", \"", "\"")}]
+ basic.contact-points = [${contactPoints.mkString(start = "\"", sep = "\", \"", end = "\"")}]
What kind of loop? I find the compact notation better.
A cognitive loop ;) I couldn't figure out if I was looking at one string or 3 until I made the params explicit.
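For readers weighing the two notations, the following sketch shows that the positional and named forms of the three-argument `mkString` produce the same string. The object name, the helper names, and the sample addresses are made up for illustration; only `mkString(start, sep, end)` comes from the Scala standard library.

```scala
object ContactPointsExample {
  // Positional form, as in the code under review: start, separator, end.
  def positional(contactPoints: Seq[String]): String =
    contactPoints.mkString("\"", "\", \"", "\"")

  // Named form, as in the suggestion: the same three arguments, labeled.
  def named(contactPoints: Seq[String]): String =
    contactPoints.mkString(start = "\"", sep = "\", \"", end = "\"")

  def main(args: Array[String]): Unit = {
    // Sample addresses are assumptions for the sake of the example.
    val nodes = List("10.0.0.1:9042", "10.0.0.2:9042")
    // prints: basic.contact-points = ["10.0.0.1:9042", "10.0.0.2:9042"]
    println(s"basic.contact-points = [${positional(nodes)}]")
    assert(positional(nodes) == named(nodes))
  }
}
```

One detail worth knowing with either form: for an empty sequence the result is `""`, so the generated list would contain a single empty string rather than being empty.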
LGTM. I have reviewed and contributed to this in APC (Akka Persistence Cassandra).
Replace the Alpakka Cassandra connector with a rewrite based on the DataStax Java Driver 4.6.0.
This new implementation is the basis for Cassandra connectivity in Akka Persistence Cassandra 1.0.
This work has been prepared in the Akka Persistence Cassandra repository.
This implementation is compatible with Akka 2.5 and 2.6, and published for Scala versions 2.12 and 2.13.
References #1213, #1647