Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add STOMP connectors: StompClientSource and StompClientSink #514 #856

Closed
wants to merge 4 commits into from

Conversation

nachinius
Copy link

Add connectors to a STOMP server as a stream client or source.

Ref #514

Before finishing the documentation and test for java, I wish this to be reviewed.

As stated in #514 (comment) here we have
the

  • StompClientSource
  • StompClientSink
    and both support backpropagation

@nachinius nachinius changed the title Add new connectors StompClientSource and StompClientSink #514 Add STOMP connectors: StompClientSource and StompClientSink #514 Mar 21, 2018
Copy link
Contributor

@huntc huntc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good! Thanks for the contribution!

val Stomp = Seq(
libraryDependencies ++= Seq(
// https://mvnrepository.com/artifact/io.vertx/vertx-stomp
"io.vertx" % "vertx-stomp" % "3.5.1" // ApacheV2/EPL1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a sense of the dependencies here ie is it bringing in many?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What’s the thread usage of this dependency in terms of the number of threads required?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ve implemented STOMP before - the protocol is easy. Should we even need this dependency?

Copy link
Author

@nachinius nachinius Mar 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your comments @huntc ,

Do we have a sense of the dependencies here ie is it bringing in many?

It brings stuff from vertx like vertx-core

What’s the thread usage of this dependency in terms of the number of threads required?

How would you check it?

I’ve implemented STOMP before - the protocol is easy. Should we even need this dependency?

Good! Congrats! is it in scala or java? will you open source it?
This alpakka connector basically sits on top of the particular stomp protocol. Our main objective is able to expose Stomp streams. Nevertheless, we could implement the stomp protocol in following versions of this connector to drop the depencency on vertx-stomp. The advantage of vertx-stomp is that is complete, and can handle out of the box several versions of stomp and all features.

In short: eventually we don't need this dependency, but for the moment works.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no code for you unfortunately.

Profiling this would be great to get a sense of resource usage along with thread usage.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Profiling this is not in my tool belt yet. For the sake of awareness, could you point me in the direction on where to start (blog, book, etc)? thks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps start with VisualVM.

* Connects to a STOMP server upon materialization and sends incoming messages to the server.
* Each materialized sink will create one connection to the server.
*/
final class SinkStage(settings: ConnectorSettings)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At a min, we should have a Source and Flow. Sink is a convenience on Flow. Please see the guidelines for more info.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flows should also permit an opaque element to be carried through them. I think this is also in the guidelines.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@huntc we do have a Source in SourceStage.scala.
Regarding the Flow, at this stage, it's not clear to me what do you mean. Could you point me the place in the guidelines that talk about it, since I'm unable to find it. Furthermore, and an example of a flow with the opaque element. Neither I was able to find it in the guidelines. Thanks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I had read those. I can't find what you mention. Nevertheless, the basic blocks to construct anything, AFAIU is a Source and a Sink. Later we could add flows, that connects those sources and sink, depending in the most common use cases.

Anyways, what kind of Flow are you imaging? A flow in a stream, that has a side effect of sending the whatever is being passed to a stomp server? That is what you meant by opaque element?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully, these links will help:

https://github.com/akka/alpakka/blob/master/contributor-advice.md#public-factory-methods
https://github.com/akka/alpakka/blob/master/contributor-advice.md#flows

You generally construct sinks from flows:

https://github.com/akka/alpakka/blob/master/contributor-advice.md#keep-the-code-dry

Some examples, also illustrating the passing through of an element:

  /**
    * Append a command request to a queue
    * @tparam A The type of data to carry through
    * @return a flow that takes a command and returns with an acknowledgement
    */
  def flow[A]: Flow[CommandRequest[A], CommandReply[A], NotUsed]

  /**
    * Append a command request to a queue via a sink for convenience
    * @tparam A The type of data to carry through
    * @return a sink that takes a command and returns with an acknowledgement
    */
  def sink[A]: Sink[CommandRequest[A], Future[Done]] =
    flow.toMat(Sink.ignore)(Keep.right)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see what you say. For me, the example is key. I had already read the guidelines links a month ago and didn't get this idea. The example is crystal, can it be added to the contributor-advice? (is there any copyright restriction on the example you wrote?)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also see the approach you propose is cleaner and more powerful. I would love to have hear this when I outlined the shape and plan for this contribution on feb 26th ! A month ago! (#514 (comment))

For the moment I'm reluctant to switch approaches. Since this works for several use cases that I find useful.

@ennru
Copy link
Member

ennru commented Mar 22, 2018

Thank you for this effort.
Just as @huntc I'm a bit reluctant to pull in Vert.x as a dependency to implement the STOMP protocol.

@ennru
Copy link
Member

ennru commented Apr 25, 2018

As Vert.x supports the Reactive Streams specification just as Akka Streams do, we should be able to connect the two and make use of connectors implemented for Vert.x.
I'll give it a try and hopefully get things together for a blog post about it.

@ennru
Copy link
Member

ennru commented May 10, 2018

I learned a bit more about vert.x and understand now that only a few data structures support Reactive Streams, not vert.x as such. This makes integrating with their connectors harder than I hoped it to be.
What we'd need is to connect to the vert.x event bus to a bi-directional flow. I'll give it some more time in a while.

@huntc
Copy link
Contributor

huntc commented May 10, 2018

...or just avoid vertx for stomp?

@huntc
Copy link
Contributor

huntc commented Nov 19, 2018

Any further thoughts on this PR @ennru ?

@ennru
Copy link
Member

ennru commented Nov 20, 2018

Not really. While it would be great to have support for STOMP, we don't want to pull in Vert.x to accomplish it. The best would be to find a more (thread-)lightweight client for STOMP.

@ennru
Copy link
Member

ennru commented Jan 11, 2019

Sorry, @nachinius if we didn't communicate well enough before you put the effort in implementing a STOMP connector based on Vertx. As it seems they have a good implementation of the protocol itself and there are few other implementations available so it would be great to re-use that part.
As said earlier, I don't like the idea to pull in a lot of Vertx and run the Vertx event loop threads within an Akka Streams connector.

Closing this now.

@ennru ennru closed this Jan 11, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants