Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jms per message destination #1181

Merged
merged 12 commits into from
Sep 10, 2018

Conversation

andreas-schroeder
Copy link
Contributor

Extends the Jms message model with messages having destinations, and implement a flow (directedMessageFlow) that consumes messages with destinations only.

- introduced directed message types.
- introduced toQueue, toTopic, to, and withoutDestination methods to
  switch between directed and regular messages.
- remove parens of properties() member in JmsMessage.
- added missing type annotations, removed unnecessary parens.
- improved scaladocs of message types.
- Divided JmsConnector trait into producer and consumer connector traits
  to simplify logic of each connector
- Divided consumer and produce sessions for the same reason
- Destination objects are cached in a memory-aware way
- Added tests and documentation of new flow
@andreas-schroeder
Copy link
Contributor Author

There are multiple ways of implementing the interface, however, and I'm not 100% sure the solution I propose here is the best one.

There are two ways per-message destinations can be offered at the API level:

  • force the user to use separate message types (directed messages) and use those in the flow (this is this proposal).
  • force the user to provide a default destination and extend regular messages with an optional destination (that overrides the default if set). This would mean that a default destination must always be provided.

While I provide the first here, the latter now seems to be more lightweight, so I'm slightly in favor of reducing the message model options back to what it initially was.

- As the producer is passed from one thread to the next only when the
  first one has completed its send() operation, it is sufficient to
  make writes visible across threads. No real synchronization is
  necessary.
- Along the way, harmonized javax.jms imports in JmsConnector to
  consistently use the jmx. prefix.
Session creation was asynchronous for consumers, but all sessions were
created in one thread. Producer sessions were created synchronously.
To reduce latency, session creation is fanned out to multiple threads.
- Added a comment to explain why the stage might pull when a new session
  is added
- Removed unnecessary check from in.onPush: we know a producer is
  available, since we only pull if the producer pool is non-empty
@ennru ennru added the p:jms label Sep 3, 2018
@ennru
Copy link
Member

ennru commented Sep 3, 2018

Yes, I agree that an optional destination override per message would make this functionality much smaller.

@andreas-schroeder
Copy link
Contributor Author

Ok, I will boil this proposal down to a simpler model.

- Introduce optional Direction field on JmsMessage
- Fallback to default destination if no direction is defined on message

Fixes akka#1166
@andreas-schroeder
Copy link
Contributor Author

If there's something wrong with this PR that causes delay in merging, please let me know :)

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this implementation is much more reasonable - sorry for sending you down the rabbit hole...

Fits nicely without breaking any existing API, great job.


private def onConnection = getAsyncCallback[Connection] { c =>
private val onConnection: AsyncCallback[jms.Connection] = getAsyncCallback[jms.Connection] { c =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for cleaning these up.

future
val sessions: Seq[Future[S]] = openSessions()
sessions.foreach(_.foreach(onSession.invoke))
Future.sequence(sessions).failed.foreach(fail.invoke)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could fail before invoking onSession.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, like this?

    val allSessions = Future.sequence(sessions)
    allSessions.failed.foreach(fail.invoke)
    // wait for all sessions to successfully initialize before invoking the onSession callback.
    // reduces flakiness (start, consume, then crash) at the cost of increased latency of startup.
    allSessions.foreach(_.foreach(onSession.invoke))

import akka.stream.stage.{AsyncCallback, GraphStageLogic}
import akka.stream.{ActorAttributes, ActorMaterializer, Attributes}
import javax.jms
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great to keep the jms prefix to distinguish things.

yield
Future {
val session = connection.createSession(false, AcknowledgeMode.AutoAcknowledge.mode)
new JmsProducerSession(connection, session, createDestination(session))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put this in a private createSession() method to align with the consumer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure thing 👍

val session: jms.Session,
val jmsDestination: jms.Destination,
val settingsDestination: Destination) {
private[jms] trait JmsSession {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it sealed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@@ -15,8 +15,10 @@ object JmsProducer {
/**
* Scala API: Creates an [[JmsProducer]] for [[JmsMessage]]s
*/
def flow[T <: JmsMessage](settings: JmsProducerSettings): Flow[T, T, NotUsed] =
def flow[T <: JmsMessage](settings: JmsProducerSettings): Flow[T, T, NotUsed] = {
require(settings.destination.isDefined, "Producer destination must be defined in regular producer flow")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great to check early on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was also thinking of introducing an intermediate JmsIncompleteProducerSettings type that gets the destination methods toQueue, toTopic etc, and that those methods yield a JmsProducerSettings instance - similar to the JmsMessage and JmsDirectedMessage types that were previously in this PR. Would break compatibility, but maybe something worthwhile for the 1.0 release?

case Some(value) => value
case None =>
purgeCache() // facing a garbage collected soft reference, purge other entries.
update(key, default, lfence)
Copy link
Member

@jrudolph jrudolph Sep 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it still be possible to access the mutable.HashMap concurrently? This is not safe (and will lead to spinning threads because of broken internal invariants of the mutable HashMap).

The most simple solution would be to use regular synchronization if there is no need for utmost performance. Otherwise, the next best solution would be to use an immutable HashMap inside of an AtomicReference and the CAS pattern at least for writing to the cache.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, let's keep things simple 👍when re-thinking what's the whole flow, I think it might be possible to actually piggy-back synchronization on the futureCB

private val futureCB = getAsyncCallback[Holder[A]](
, and then all this delicate volatile-based memory fences would be a red herring anyway.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you think you can simplify it with that solution, please go ahead in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ennru I decided to go for a simple lock here:

def lookup(key: K, default: => V): V = lock.synchronized {
since there will be no concurrent access to the cache (only sequential by different threads), the cost of sychronized should be low. I'm not 100% sure about the synchronization guarantees of async callbacks, is there some docs?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I saw that before I LGTMed, but didn't see your comment before today...

The async callbacks are guaranteed to be allowed to access everything in the stage logic. See Using asynchronous side-channels

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getting that one test right was an interesting challenge, so here you go :) 050b3af

- reduced flakiness of session initialization
- aligned session creation between producer and consumer
- made JmsSession sealed
- simplified synchronization in SoftReferenceCache
Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Provided a test in SoftReferenceCacheSpec giving some evidence
(which of course is no proof) that synchronization is not needed for how
the cache is intended to be used.

SoftReferenceCache synchronization can piggyback on Akka's async
callback synchronization guarantess that is ultimately relying on
AtomicReference operations.
@ennru ennru merged commit 50c929f into akka:master Sep 10, 2018
@ennru
Copy link
Member

ennru commented Sep 10, 2018

Thank you for another great bit of functionality in the JMS connector!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants