Discussion: package pubsub #295

Closed
peterbourgon opened this issue Jun 15, 2016 · 37 comments
@peterbourgon
Member

peterbourgon commented Jun 15, 2016

I'd like to start thinking about a design for package pubsub/msgbus/...

Important use cases:

  • An async transport for existing/typical RPC messaging
  • A foundation for event sourcing
  • A way to manage streaming
  • Others?

Would like support for:

  • Kafka
  • RabbitMQ
  • NATS
  • NSQ
  • AWS SQS
  • GCP PubSub
  • Weave Mesh
  • Others?

Prior art:

Open questions:

  • What are the core abstractions and interfaces?
  • Is streaming significantly different than event sourcing and transport use cases?
  • What is the lowest common denominator for serialization?
  • Others?
@basvanbeek
Member

Shall we start a document for this or are you going to edit the above message to keep track of state?

@intellidynamics

Peter,

Task-oriented message-passing architectures are great for flexibility: they wire together functional blocks or microservices through subscriptions. These tasks send and route messages of a negotiated format, with identified payloads, through a mesh of subscriptions, adding value as the information flows and routing it onward to downstream subscribers. We’d love to use Go kit to do this; otherwise we’ll write our own, which isn’t too hard really. We’ve been doing this successfully for 16 years. Each task or service has a collection of subscribers. A subscription object is relatively simple: events are raised on it containing the transmitted message with its envelope information and payload. Tasks, or a supervisory agent, can place their subscription object into another task’s collection and begin receiving. They can unsubscribe by removing the subscription object from the message sender’s collection.

I would avoid a “bus” concept (centralized pipe) but use independent synchronous / asynchronous connections between tasks or services.

I would seek to keep the pub/sub as a transport that is as independent as possible of what is being exchanged. What the message is, its meaning, and its format are the snarly part, so I’d avoid mingling them into the transport. I think it should support fan-out (multiple subscribers) at a minimum, and fan-in is sometimes needed too (use case: synchronizing data sets coming from different sources).

You may also need different pub/sub “channels” (may or may not be in the golang sense) as data may be flowing at extremely high rates through one channel and you’ll want to put “command and control” messages in another channel (Prepare, Execute, Pause, Stop/Halt, Do This, Do That :), etc.).
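To make the separate "command and control" channel concrete, here is a minimal Go sketch, assuming Go channels are used for both paths. The `worker` function and the command names "pause", "resume", and "stop" are invented for this illustration and are not part of any Go kit API.

```go
package main

import "fmt"

// worker forwards values from data to out while obeying commands from control.
// The command names are assumptions made for this sketch.
func worker(data <-chan int, control <-chan string, out chan<- int) {
	defer close(out)
	paused := false
	for {
		in := data
		if paused {
			in = nil // receiving from a nil channel blocks forever, so only control is served
		}
		select {
		case cmd := <-control:
			switch cmd {
			case "pause":
				paused = true
			case "resume":
				paused = false
			case "stop":
				return
			}
		case v, ok := <-in:
			if !ok {
				return // data channel closed by the producer
			}
			out <- v
		}
	}
}

// demo pauses the worker, queues one value, then resumes and stops it.
func demo() (queuedWhilePaused, received int, closed bool) {
	data := make(chan int, 1)
	control := make(chan string) // unbuffered: a send returns once the worker has taken the command
	out := make(chan int)
	go worker(data, control, out)

	control <- "pause"
	data <- 1
	queuedWhilePaused = len(data) // the value stays buffered while the worker is paused
	control <- "resume"
	received = <-out
	control <- "stop"
	_, ok := <-out
	return queuedWhilePaused, received, !ok
}

func main() {
	fmt.Println(demo())
}
```

Keeping control traffic on its own channel means a burst on the data path cannot delay a pause or stop command, which seems to be the point being made above.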

My 2 cents. Opinions will vary. :)

Thanks,

Carl

Carl Cook

President / CTO

IntelliDynamics

http://www.intellidynamics.net/


IntelliDynamics is a brand and division of BioComp Systems, Inc.


@jmank88

jmank88 commented Jun 15, 2016

RFC 7464 - JSON Text Sequences could be a good candidate for the JSON streaming transport. Some of the general streaming protocol concepts discussed here are relevant as well.
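For readers unfamiliar with RFC 7464: each JSON text in a sequence is preceded by an ASCII Record Separator (0x1E) and followed by a line feed. The following is a minimal Go sketch of that framing. The helper names `writeSeq`, `readSeq`, and `roundTrip` are made up for this example, and the reader simply returns an error on an invalid element instead of following the RFC's guidance on recovering from truncated ones.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

const rs = 0x1E // ASCII Record Separator, written before every JSON text

// writeSeq writes v as one sequence element: RS, JSON text, LF.
func writeSeq(w io.Writer, v interface{}) error {
	b, err := json.Marshal(v)
	if err != nil {
		return err
	}
	frame := append([]byte{rs}, b...)
	frame = append(frame, '\n')
	_, err = w.Write(frame)
	return err
}

// splitRS is a bufio.SplitFunc that cuts the input at every RS byte.
func splitRS(data []byte, atEOF bool) (int, []byte, error) {
	if i := bytes.IndexByte(data, rs); i >= 0 {
		return i + 1, data[:i], nil
	}
	if atEOF && len(data) > 0 {
		return len(data), data, nil
	}
	return 0, nil, nil // ask the scanner for more input
}

// readSeq returns the JSON texts found in r, in order.
func readSeq(r io.Reader) ([]string, error) {
	sc := bufio.NewScanner(r)
	sc.Split(splitRS)
	var out []string
	for sc.Scan() {
		text := bytes.TrimSpace(sc.Bytes())
		if len(text) == 0 {
			continue // the empty chunk in front of the first RS
		}
		if !json.Valid(text) {
			return out, fmt.Errorf("invalid JSON text in sequence: %q", text)
		}
		out = append(out, string(text))
	}
	return out, sc.Err()
}

// roundTrip encodes the values as a sequence and decodes them again.
func roundTrip(values ...interface{}) ([]string, error) {
	var buf bytes.Buffer
	for _, v := range values {
		if err := writeSeq(&buf, v); err != nil {
			return nil, err
		}
	}
	return readSeq(&buf)
}

func main() {
	texts, err := roundTrip(map[string]int{"n": 1}, "x")
	fmt.Println(texts, err)
}
```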

@ChrisHines
Member

Streaming RPC makes me think of WebSockets and gRPC streaming. Go examples for streaming gRPC are here: http://www.grpc.io/docs/tutorials/basic/go.html

@peterbourgon
Member Author

@basvanbeek

Shall we start a document for this or are you going to edit the above message to keep track of state?

I'd like to keep discussion in the issue for now. I'll update the OP as necessary.

@intellidynamics

I would avoid a “bus” concept (centralized pipe) but use independent synchronous / asynchronous connections between tasks or services.

For event sourcing (typical pub/sub) the logical domain language is absolutely in terms of abstract topics, publishers, and consumers. But I think the nature of the beast requires us to manage an actual bus, n.b. connections, state management, etc. Details varying based on implementation.

Otherwise thanks for the great feedback. I don't think we need to rule any of your ideas out at this stage.

@jmank88

RFC 7464 - JSON Text Sequences could be a good candidate for the JSON streaming transport.

Great, thanks for the link, I've added it to Prior Art (above) and will review. In general I'd expect details like JSON to be implemented by user-provided serialization modules; I wouldn't want to enforce anything at that level in Go kit's opinions. It's also reminded me of EventSourcing and server-sent-events, also added to Prior Art.

@ChrisHines

Streaming RPC makes me think of WebSockets and gRPC streaming.

Agreed; I think gRPC streaming is probably the benchmark here, and (at least at the moment) I'd expect any Go kit streaming API to very closely mirror the gRPC model. It's also very close to how Asim modeled things in https://github.com/micro/go-micro.

@peterbourgon
Member Author

Starting with pubsub, as it's easiest and we have a lot of prior art. Here's my first draft of a set of interfaces, basically lifted verbatim from basvanbeek/pubsub via gizmo/pubsub.

// Publisher is a minimal interface for publishing messages to a pool of
// subscribers. Publishers are probably (but not necessarily) sending to a
// message bus.
//
// Most parameterization of the publisher (topology restrictions like a topic,
// exchange, or specific message type; queue or buffer sizes; etc.) should be
// done in the concrete constructor.
type Publisher interface {
    // Publish a single message, described by an io.Reader, to the given key.
    Publish(key string, r io.Reader) error

    // Stop the publisher.
    Stop() error
}

// Subscriber is a minimal interface for subscribing to published messages.
// Subscribers are probably (but not necessarily) receiving from a message bus.
//
// Most parameterization of the subscriber (topology restrictions like a topic,
// exchange, or specific message type; queue or buffer sizes; etc.) should be
// done in the concrete constructor.
type Subscriber interface {
    // Start returns a channel of messages that the caller should consume.
    // Failure to keep up with the incoming messages will have different
    // consequences depending on the concrete implementation of the subscriber.
    //
    // The channel will be closed when the subscriber encounters an error, or
    // when the caller invokes Stop, whichever comes first.
    Start() <-chan Message

    // Err returns the error that was responsible for closing the channel of
    // incoming messages.
    Err() error

    // Stop the subscriber, closing the channel that was returned by Start.
    Stop() error
}

// Message is a minimal interface to describe payloads received by subscribers.
// Clients may type-assert to more concrete types (e.g. pubsub/kafka.Message) to
// get access to more specific behaviors.
type Message interface {
    // Messages implement io.Reader to access the payload data.
    io.Reader

    // Done indicates the client is finished with the message, and the
    // underlying implementation may free its resources. Clients should ensure
    // to call Done for every received message.
    Done() error
}

Questions on my side:

  • Is there a reason Publish can't take an io.Reader?
  • Is there a reason Message can't simply implement io.Reader?
  • For which use case(s) does Publish need its own key parameter?
  • I can imagine a few ways to build an async (RPC) transport on top of this set of interfaces. Before I get too deep, does anyone know of any prior art?

Streaming interfaces to come...

@basvanbeek
Member

I can only speak about Kafka, as I haven't seen other Go APIs.

  • yes it could take an io.Reader. The most commonly used Go API for Kafka uses []byte for the message payload and it is what protobuf messages serialize to, so that's why I used it in my pubsub take.
  • Key is not a broker generic item. In Kafka it is used as partitioning key for partitioned topics. I would like to be able to set a partitioning key for each message but it might make no sense for other broker back-ends.
  • Async RPC and Prior Art... often built on top of JMS, AMQP, Stomp and involving some messageID, correlationID and a return path/topic.
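The correlation-ID pattern mentioned in the last point can be sketched in a few lines of Go. Everything here is hypothetical and in-memory: `envelope`, `broker`, `client`, and the topic names stand in for whatever a real bus would provide, and there are no timeouts or cleanup, which a real implementation would need.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// envelope is a hypothetical wire format carrying the routing metadata.
type envelope struct {
	CorrelationID string
	ReplyTo       string
	Body          string
}

// broker is a toy topic-to-channel map standing in for a real message bus.
type broker struct {
	mu     sync.Mutex
	topics map[string]chan envelope
}

func newBroker() *broker { return &broker{topics: map[string]chan envelope{}} }

func (b *broker) topic(name string) chan envelope {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch, ok := b.topics[name]
	if !ok {
		ch = make(chan envelope, 16)
		b.topics[name] = ch
	}
	return ch
}

// serveUpper answers n requests by upper-casing the body and replying on the
// return topic named in the request, echoing the correlation ID.
func serveUpper(b *broker, n int) {
	for i := 0; i < n; i++ {
		req := <-b.topic("requests")
		b.topic(req.ReplyTo) <- envelope{CorrelationID: req.CorrelationID, Body: strings.ToUpper(req.Body)}
	}
}

// client owns one reply topic and matches replies to callers by correlation ID.
type client struct {
	b       *broker
	replyTo string
	mu      sync.Mutex
	pending map[string]chan string
}

func newClient(b *broker, replyTo string) *client {
	c := &client{b: b, replyTo: replyTo, pending: map[string]chan string{}}
	go func() { // dispatcher: routes each reply to whoever is waiting for it
		for rep := range b.topic(replyTo) {
			c.mu.Lock()
			ch, ok := c.pending[rep.CorrelationID]
			delete(c.pending, rep.CorrelationID)
			c.mu.Unlock()
			if ok {
				ch <- rep.Body
			}
		}
	}()
	return c
}

// call publishes a request and blocks until the matching reply arrives.
func (c *client) call(id, body string) string {
	ch := make(chan string, 1)
	c.mu.Lock()
	c.pending[id] = ch
	c.mu.Unlock()
	c.b.topic("requests") <- envelope{CorrelationID: id, ReplyTo: c.replyTo, Body: body}
	return <-ch
}

func demo() (string, string) {
	b := newBroker()
	go serveUpper(b, 2)
	c := newClient(b, "replies.client-1")
	return c.call("1", "ping"), c.call("2", "pong")
}

func main() {
	fmt.Println(demo())
}
```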

@Ayiga
Contributor

Ayiga commented Jul 4, 2016

I'm not sure about this current interface for Publisher. The way I've understood pubsub, the entity doing the publishing isn't necessarily sticking around after Publishing something. So I'm uncertain about the need to have a method called Stop on it, because what would it do? Stop everything you have published, stop future publishes? Is there some nuance with a potential implementation that may require this?

I'm going to try and mock this out with rabbitmq real quick to see how this might work. On the whole I'm having trouble visualizing the components that you've laid out, as you've laid them out. However, it's possible that I'm just missing something.

@Ayiga
Contributor

Ayiga commented Jul 4, 2016

Ok, I've created an example, with some example programs, just to see if I could adhere to the interface that is laid out as-is. It turns out I can, and I can see the purpose of every piece so far.

If you'd like to see the implementation, for review or reference, you can find it here:
https://github.com/Ayiga/kit/tree/pubsub-rabbitmq/pubsub/rabbitmq

@peterbourgon
Member Author

So I'm uncertain about the need to have a method called Stop on it, because what would it do?

Effectively every implementation will require a stateful connection to the broker. Stop will gracefully terminate that connection, including (potentially) any in-flight requests. But, you got it in your example. Nice! Thanks. That's some validation.
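As an illustration of what "gracefully terminate, including in-flight requests" could mean, here is a minimal sketch using `sync.WaitGroup`. The `publisher` type and its `send` hook are hypothetical; `send` stands in for a round trip to a broker, and the signature differs from the draft interface to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errStopped = errors.New("publisher stopped")

// publisher is a hypothetical stateful publisher; send stands in for a
// round trip over a broker connection.
type publisher struct {
	send     func(msg string)
	mu       sync.Mutex
	stopped  bool
	inflight sync.WaitGroup
}

func (p *publisher) Publish(msg string) error {
	p.mu.Lock()
	if p.stopped {
		p.mu.Unlock()
		return errStopped
	}
	p.inflight.Add(1) // registered under the lock, so Stop cannot miss it
	p.mu.Unlock()
	defer p.inflight.Done()

	p.send(msg)
	return nil
}

// Stop rejects new publishes, then waits for the in-flight ones to finish.
func (p *publisher) Stop() error {
	p.mu.Lock()
	p.stopped = true
	p.mu.Unlock()
	p.inflight.Wait()
	// A real implementation would close the broker connection here.
	return nil
}

// demo starts one slow publish, calls Stop while it is in flight, and then
// tries to publish again.
func demo() (delivered []string, lateErr error) {
	entered := make(chan struct{})
	release := make(chan struct{})
	p := &publisher{send: func(msg string) {
		close(entered)
		<-release // simulate a slow broker
		delivered = append(delivered, msg)
	}}
	go p.Publish("in-flight")
	<-entered // the publish is now in flight
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(release)
	}()
	p.Stop() // returns only after the in-flight publish has completed
	lateErr = p.Publish("late")
	return delivered, lateErr
}

func main() {
	fmt.Println(demo())
}
```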

@waigani

waigani commented Aug 2, 2016

This could potentially be a good base for an implementation: https://github.com/juju/pubsub.

@muratzorer

muratzorer commented Aug 11, 2016

I am really new to Go, but I have my 2 cents on the pub/sub topic in general. I'd like to mention a question that should be kept in mind:

  • Am I publishing an event or am I sending a command?

You can relate pub/sub thinking to multicast and unicast: multicast is publishing an event, and unicast is sending a command whose destination the publisher knows. This might have an impact on your designs at some point.

IMHO the first reason for pub/sub to depend on a bus is to persist messages on a queue (when there is an IPC error or similar) until the publisher or subscriber service is up and running again and can handle the pending messages.
The second reason may be to move the management of a per-event retry/compensation policy into the bus infrastructure.
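The event-versus-command distinction can be sketched as two different delivery paths. The `bus` type below is a hypothetical in-memory stand-in, not safe for concurrent use, and its method names are invented for this illustration.

```go
package main

import "fmt"

// bus is a hypothetical in-memory broker; it is not safe for concurrent use.
type bus struct {
	topics  map[string][]chan string // topic -> every subscriber's channel
	inboxes map[string]chan string   // service name -> its private command inbox
}

func newBus() *bus {
	return &bus{topics: map[string][]chan string{}, inboxes: map[string]chan string{}}
}

// subscribe registers interest in a topic; the publisher never learns who listens.
func (b *bus) subscribe(topic string) <-chan string {
	ch := make(chan string, 8)
	b.topics[topic] = append(b.topics[topic], ch)
	return ch
}

// register creates the inbox that commands for one named service are delivered to.
func (b *bus) register(service string) <-chan string {
	ch := make(chan string, 8)
	b.inboxes[service] = ch
	return ch
}

// publishEvent fans out to all current subscribers and reports how many there were.
// A send blocks if a subscriber's buffer is full; a real bus would need a policy here.
func (b *bus) publishEvent(topic, event string) int {
	for _, ch := range b.topics[topic] {
		ch <- event
	}
	return len(b.topics[topic])
}

// sendCommand delivers to exactly one known destination, or fails if it is unknown.
func (b *bus) sendCommand(service, command string) error {
	inbox, ok := b.inboxes[service]
	if !ok {
		return fmt.Errorf("unknown service %q", service)
	}
	inbox <- command
	return nil
}

func demo() (delivered int, cmdErr, unknownErr error) {
	b := newBus()
	b.subscribe("order.created")
	b.subscribe("order.created")
	b.register("billing")
	delivered = b.publishEvent("order.created", "order 42 created")
	cmdErr = b.sendCommand("billing", "charge order 42")
	unknownErr = b.sendCommand("shipping", "ship order 42")
	return delivered, cmdErr, unknownErr
}

func main() {
	fmt.Println(demo())
}
```

Publishing succeeds regardless of how many subscribers exist, whereas a command to an unregistered destination is an error the sender has to handle.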

@epsstan

epsstan commented Aug 15, 2016

In my experience, it's always been useful to model a message as a payload plus metadata in the form of standard or free-form headers/properties. E.g., a message ID property allows a generic implementation of duplicate detection, poison-message handling, etc. Here are some properties from Java/JMS: https://docs.oracle.com/cd/E19798-01/821-1841/bnces/index.html
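A minimal sketch of generic duplicate detection driven by a message ID header follows. The `message` struct, the header name "message-id", and the unbounded in-memory set are all assumptions for this example; a real implementation would bound or expire the set of seen IDs.

```go
package main

import (
	"fmt"
	"sync"
)

// message pairs a payload with free-form headers.
// The header name "message-id" is an assumption for this sketch.
type message struct {
	Headers map[string]string
	Payload string
}

// deduper remembers every ID it has seen. The set is unbounded here.
type deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newDeduper() *deduper { return &deduper{seen: map[string]struct{}{}} }

// firstDelivery reports whether m has not been seen before. Messages without
// an ID cannot be deduplicated, so they always pass.
func (d *deduper) firstDelivery(m message) bool {
	id, ok := m.Headers["message-id"]
	if !ok {
		return true
	}
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, dup := d.seen[id]; dup {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

// handleOnce returns the payloads that would reach the application.
func handleOnce(msgs []message) []string {
	d := newDeduper()
	var handled []string
	for _, m := range msgs {
		if d.firstDelivery(m) {
			handled = append(handled, m.Payload)
		}
	}
	return handled
}

func main() {
	msgs := []message{
		{Headers: map[string]string{"message-id": "a1"}, Payload: "create order"},
		{Headers: map[string]string{"message-id": "a1"}, Payload: "create order"}, // redelivery
		{Headers: map[string]string{"message-id": "b2"}, Payload: "cancel order"},
	}
	fmt.Println(handleOnce(msgs))
}
```

Because the check only looks at headers, it works without knowing anything about the payload format, which is the benefit being described.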

@joeblew99

I am using go-kit with NATS and it's very nice. My own code is not very well done, so I doubt I can contribute much.

But I was wondering whether this has moved on, in terms of a pub/sub abstraction being adapted for NATS?

All modules communicate north/south fully. Modules subscribe to each other via NATS to do east/west sharing of events.
Clients compose bits of modules together to make a full client.
This is definitely not the most efficient approach, as often the server does all the consolidation so that the client has just a single connection, but that would mean another layer of composition/decomposition and I just don't think it's worth it.
All clients hold all the data as a cache, and writes go through to NATS.
Hence the client is very fast anyway.

This is a basic CQRS system. It would be impossible to build cleanly without go-kit. Really thankful for this project.

Also, above NATS is all the RPC stuff.
Below NATS is everything else:
security, DB, logic, validation.
I am not sure this is very performant, but I feel it's more future-proof; besides, everything down there is transport- and binding-independent.

@stevenferrer

stevenferrer commented Jul 23, 2017

I've tried using @basvanbeek's pubsub and here's the result. This is patterned from the go-kit examples.

I'm not really sure if I understood everything in the pubsub package. So far, it works fine for me.

@blainsmith

I've helped author something similar in Node way back if it helps. https://github.com/oddnetworks/oddcast

@chrisprijic

chrisprijic commented Sep 10, 2017

I think the above implementation is pretty solid; however I have one addition I think would be good:

The Message could have an optional Metadata field. I'm partial to a context.Context object or anything else that best fits this idea. This metadata would be passed as an additional item to the Publisher and will be an optional Metadata field on the Messages returned from the Subscriber.

This is useful for tracing and/or having the context of the message; with the current interface, the PubSub implementation would need to provide these values via the io.Reader (extending the actual message being transferred). To me that isn't good practice, and taking into account implementations like these:

  • RabbitMQ has headers
  • Google Cloud PubSub and AWS SNS/SQS have message attributes
  • Kafka has record headers
  • I'm sure there are others; I'm only familiar with the above.

Moreover, the more I think about this pubsub stuff in relation to go-kit, I see it more as transport implementation details. The Publisher is just a "client" to this transport, and the Subscriber is just a "server"-like implementation that listens for messages. The rest (endpoints, hooking up to service implementations, etc) are all easy to do with existing go-kit methodology. Using this methodology, the above request (adding context to Message) is now handled by the context.Context object being passed around, and this solves that problem nicely!

Using this, we would pass a Publisher as a dependency to any services publishing messages, and we would have a Subscriber that routes messages to a group of endpoints that process them. We can also translate the messages into the objects that the service endpoints expect, so the pubsub transport takes in encode/decode methods on creation.


That leads me to my most recent idea. An interface similar to the one provided by @peterbourgon above, with a transport wrapper around it, can simplify implementations quite a bit. This thought would require more feedback and discussion in my eyes, but this could be a way to simplify creating transports for pubsub systems, meaning rather than building an entire implementation, you just have to provide the Publisher, Subscriber, and Message interfaces to the transport. This provides the following things:

  1. the publisher and subscriber interfaces as they stand
  2. a transport layer that can take
    • a publisher, with an encode function, and provide an endpoint.Endpoint
    • a subscriber, that works like a server, that takes a router (go-kit can have a DefaultRouter implementation)
      • this router takes SubscriptionHandlers, that are made of a decode function and an endpoint

I'm not sure about the subscription parts -- does go-kit want to enforce some sort of router/handler setup on this? I'm not entirely sure about this. The router implementation could look similar to Gorilla-mux, and could also be an interface that users can create their own implementations for if they see fit. They'd have the Subscriber interface, the DefaultRouter, and could also implement their own Router. These provide different levels of granularity to the user of go-kit. Again, not too sure about this part as I have just started thinking about this.

Everything go-kit has for it now (tracing, middlewares, metrics, etc) all can coincide with this implementation, and it keeps the pubsub implementation in the transport layer.

This might make it easier to introduce pubsub to current service implementations that don't have it, too.

Feedback is welcome. I'm happy to build an implementation like this and submit a PR after I've thought it through a bit more, if there's interest. I'll be looking to provide an implementation for my own use anyway.
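The publisher half of this idea, a publisher plus an encode function yielding an endpoint, might look roughly like the sketch below. The `Endpoint` type is redeclared locally with the same shape as go-kit's `endpoint.Endpoint` so the example has no dependencies; `EncodeFunc`, `NewPublisherEndpoint`, `recorder`, and `encodeJSON` are hypothetical names, not existing go-kit API.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
)

// Endpoint has the same shape as go-kit's endpoint.Endpoint, redeclared here
// so the sketch compiles on its own.
type Endpoint func(ctx context.Context, request interface{}) (response interface{}, err error)

// Publisher is the draft interface from earlier in the thread, minus Stop.
type Publisher interface {
	Publish(key string, r io.Reader) error
}

// EncodeFunc is a hypothetical encoder that turns a request into a payload.
type EncodeFunc func(ctx context.Context, request interface{}) (io.Reader, error)

// NewPublisherEndpoint wraps a Publisher so callers can treat it like any
// other endpoint. Publishing is fire-and-forget, so the response is nil.
func NewPublisherEndpoint(p Publisher, key string, enc EncodeFunc) Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		r, err := enc(ctx, request)
		if err != nil {
			return nil, err
		}
		return nil, p.Publish(key, r)
	}
}

// recorder is a fake Publisher that remembers what it was asked to send.
type recorder struct{ keys, bodies []string }

func (rec *recorder) Publish(key string, r io.Reader) error {
	b, err := io.ReadAll(r)
	if err != nil {
		return err
	}
	rec.keys = append(rec.keys, key)
	rec.bodies = append(rec.bodies, string(b))
	return nil
}

// encodeJSON is one possible EncodeFunc.
func encodeJSON(_ context.Context, request interface{}) (io.Reader, error) {
	b, err := json.Marshal(request)
	if err != nil {
		return nil, err
	}
	return bytes.NewReader(b), nil
}

func demo() (key, body string, err error) {
	rec := &recorder{}
	publish := NewPublisherEndpoint(rec, "orders", encodeJSON)
	if _, err = publish(context.Background(), map[string]int{"id": 7}); err != nil {
		return "", "", err
	}
	return rec.keys[0], rec.bodies[0], nil
}

func main() {
	fmt.Println(demo())
}
```

Because the result is an ordinary endpoint, existing endpoint middleware could wrap it unchanged, which is the appeal of keeping pub/sub in the transport layer.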

@peterbourgon
Member Author

rather than building an entire implementation, you just have to provide the Publisher, Subscriber, and Message interfaces to the transport. This provides the following things:

This seems like a reasonable plan of attack. The hard part is making it sufficiently general to be useful across different message busses. My intuition is that it's possible.

does go-kit want to enforce some sort of router/handler setup on this? I'm not entirely sure about this. The router implementation could look similar to Gorilla-mux, and could also be an interface that users can create their own implementations for if they see fit. They'd have the Subscriber interface, the DefaultRouter, and could also implement their own Router. These provide different levels of granularity to the user of go-kit. Again, not too sure about this part as I have just started thinking about this.

With the caveat that I've given this specific idea only superficial analysis, I think the introduction of a Router concept is probably a bad one. The goal here shouldn't be to build a general-purpose pipes-and-connectors framework for anything you can think to wire up. It should be to provide the smallest possible abstraction to get Go-kit-style RPC semantics across a handful of common message busses.

Anyway, +1 to continuing in this vein. Would be happy to review code.

@ghost

ghost commented Oct 6, 2017

@peterbourgon

If it can provide introspection, that allows others to build things on top.
For example, I only do choreography and never orchestration. Learnt my lesson...
If I can have an API for querying who subscribes to whom, that's hugely useful.
I have made it so I can see the matrix at design time and at runtime.
Design time uses the AST, and runtime uses basic heartbeating.

I have been using annotations on code and then walking the code with the AST to build a pub/sub model. Then I match it with what's happening on NATS, and I can get a picture of all the choreography.

From the choreography, I can then compute all the potential pipelines as a matrix. Hence I can see the orchestrations.
This is essentially a generative design pattern. A developer just subscribes to an event (using choreography) without ever thinking about the big picture unless they need to.
If I am a BA or in ops (a non-dev) and want to see all the flows, I can ask the system for all the orchestration flows. Clicking on one takes me to the events, and clicking on an event shows me all the other flows that use this event. It's a graph, basically.

I can then see everything going on, no matter how big the system gets.

Am I explaining this well enough? It took me ages to see the powerful difference between choreography and orchestration.

Just wanted to raise this for discussion about extensibility.
I'm not saying this is the best way, but introspection is so productive (as is code gen :) )

@karthikmuralidharan
Contributor

Start() <-chan Message

Does it make sense to use a callback method instead of exposing the messages channel directly?

https://github.com/golang/go/wiki/CodeReviewComments#synchronous-functions

@peterbourgon
Member Author

@karthikmuralidharan Given the domain is actually asynchronous here, the linked section probably doesn't apply. Given that, the decision to model it as a returned chan vs. a received chan vs. a callback function is mostly moot.
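One way to see why the choice is mostly moot: each style can be adapted to the other in a few lines. The helper names `toCallback` and `toChannel` are invented for this sketch, and `toChannel` assumes the callback-style `subscribe` function blocks until the source is exhausted.

```go
package main

import "fmt"

// toCallback consumes a channel-style subscription with a callback.
// It blocks until the channel is closed.
func toCallback(msgs <-chan string, handle func(string)) {
	for m := range msgs {
		handle(m)
	}
}

// toChannel exposes a callback-style subscription as a channel.
// Assumption: subscribe blocks until the source has no more messages.
func toChannel(subscribe func(handle func(string))) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		subscribe(func(m string) { out <- m })
	}()
	return out
}

// demo round-trips a callback source through a channel and back to a callback.
func demo() []string {
	source := func(handle func(string)) {
		for _, m := range []string{"a", "b", "c"} {
			handle(m)
		}
	}
	var got []string
	toCallback(toChannel(source), func(m string) { got = append(got, m) })
	return got
}

func main() {
	fmt.Println(demo())
}
```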

@andrey-moor

Is anyone still working on this?

@qneyrat

qneyrat commented Jan 29, 2018

up

@welemon

welemon commented Apr 12, 2018

any progress / update?

@jasonmacdonald

I really wish there was some headway on this. I'm trying to design microservices that are both gRPC servers and pub/sub participants, and this would seal the deal on using go-kit. Right now, my only real options are Gizmo, Micro, or rolling my own. :(

@ghost

ghost commented Apr 18, 2018 via email

@abdullah2993

+1 for this.

@ghost

ghost commented Jun 21, 2018

There are examples of go-kit integrated with NATS on the web that are pretty clear.

@Reasno
Contributor

Reasno commented Jun 24, 2018

Great discussions everyone. I want to add my two cents to the list.

I suggest we don't let the versatility of message queues get in the way. Before trying to abstract message queues, abstract their usage. For example, Go kit already has a beautiful abstraction of transport. It should be much easier to roll out a Kafka/RabbitMQ server and client in the transport package that merely use the message queue as an async transport, leaving out all the rest (event sourcing, replay, streaming, etc.). The newcomer NATS package is a good direction to follow.

If someday we build an event broadcasting library in Go kit, it will not be too late at that point to look for broker interfaces for the various message buses.

@chrisprijic

I thought I'd provide the current implementation we're using to abstract our different messaging transports with a managed pubsub go-kit-"like" package:

// Publisher is a minimal interface for publishing messages to a pool of
// subscribers. Publishers are probably (but not necessarily) sending to a
// message bus.
//
// Most parameterization of the publisher (topology restrictions like a topic,
// exchange, or specific message type; queue or buffer sizes; etc.) should be
// done in the concrete constructor.
type Publisher interface {
    // Publish a single message, described by an io.Reader, to the given key.
    // 
    // CHANGE(chrisprijic): added context, so transport can utilize it.
    Publish(ctx context.Context, key string, r io.Reader) error

    // Stop the publisher.
    Stop() error
}

// NOTE: This code only contains the parts that have been changed or added.

// Message is a minimal interface to describe payloads received by subscribers.
// Clients may type-assert to more concrete types (e.g. pubsub/kafka.Message) to
// get access to more specific behaviors.
type Message interface {
    // CHANGE(chrisprijic): Context can implement something similar to *http.Request, 
    // or can be made public on its own. I use the first since I like the immutability.
    // 
    //     Context() context.Context
    //     WithContext(context.Context) Message
    //
    // or
    //
    //     Context context.Context
    Context() context.Context
    WithContext(context.Context) Message

    // Messages implement io.Reader to access the payload data.
    io.Reader

    // Done indicates the client is finished with the message, and the
    // underlying implementation may free its resources. Clients should ensure
    // to call Done for every received message.
    // 
    // CHANGE(chrisprijic): ack was added so that we can specify cleanup AND 
    // whether or not we want to ack (in cases where we can). Ignore if not needed
    Done(ack bool) error
}

// NewManagedSubscriber can be used to wrap a subscriber into a managed
// transport.
//
// The function will subscribe to the given subscriber, use the decoder
// to decode the message, and send it to the given endpoint.
// 
// Message.Done() could be called on success of the endpoint running.
//
// This does bring up how to interpret the response/error from the endpoint...
// This also brings up how to interpret Message.Done(), meaning we're making 
// assumptions, should that be the case?
//
// -> I think this needs more discussion... 
func NewManagedSubscriber(
    e endpoint.Endpoint,
    subscriber Subscriber,
    decoder DecodeMessageFunc,
    options ...SubscriberOptions,
) *ManagedSubscriber {
    // implementation here
}

// Serve begins listening for messages and passing them to the endpoint:
func (s *ManagedSubscriber) Serve() error {
    // implementation here

    // something like the following:
    msgs := s.subscriber.Start()
    for msg := range msgs {
        // decode and send to endpoint, call msg.Done() when 
    }

    // check and return error if there were any
    return s.subscriber.Err()
}

// Stop can gracefully stop the subscriber:
func (s *ManagedSubscriber) Stop() error {
    return s.subscriber.Stop()
}

// NewManagedPublisher can be used to wrap a publisher into a managed
// transport.
// 
// The function returns an object that will publish messages provided to it 
// using the Publisher, to the given key. It can encode the message using the 
// EncodeMessageFunc.
//
// The returned transport can provide an endpoint with its Endpoint() call,
// which can be used like any other endpoint in go-kit.
func NewManagedPublisher(
    key string,
    publisher Publisher, 
    enc EncodeMessageFunc,
    options ...PublisherOptions,
) *ManagedPublisher {
    // implementation here
}

// Endpoint returns a useable endpoint for publishing messages on the publisher
// transport.
func (p *ManagedPublisher) Endpoint() endpoint.Endpoint {
    // implementation here
}
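For the subscriber side, the message loop described in the Serve comments could be filled in roughly as follows. This is a sketch under stated assumptions, not the author's implementation: it is a plain function rather than a ManagedSubscriber method, the Message interface drops the Context methods for brevity, and it assumes a message is acked only when both decoding and the endpoint succeed. `DecodeFunc`, `fakeMsg`, and `fakeSub` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strings"
)

// Endpoint has the same shape as go-kit's endpoint.Endpoint.
type Endpoint func(ctx context.Context, request interface{}) (response interface{}, err error)

// Message and Subscriber follow the listing above, without the Context methods.
type Message interface {
	io.Reader
	Done(ack bool) error
}

type Subscriber interface {
	Start() <-chan Message
	Err() error
	Stop() error
}

// DecodeFunc is a hypothetical decoder that turns a Message into a request.
type DecodeFunc func(ctx context.Context, m Message) (interface{}, error)

// Serve decodes each message, passes it to the endpoint, and then calls Done.
// Assumption: ack only when both decode and endpoint succeed.
func Serve(ctx context.Context, s Subscriber, dec DecodeFunc, e Endpoint) error {
	for msg := range s.Start() {
		req, err := dec(ctx, msg)
		if err == nil {
			_, err = e(ctx, req)
		}
		if derr := msg.Done(err == nil); derr != nil {
			return derr
		}
	}
	return s.Err() // the channel closed; report why
}

// fakeMsg records whether it was acked, keyed by its ID.
type fakeMsg struct {
	io.Reader
	id   string
	acks map[string]bool
}

func (m fakeMsg) Done(ack bool) error { m.acks[m.id] = ack; return nil }

// fakeSub serves a preloaded, already closed channel.
type fakeSub struct{ ch chan Message }

func (s fakeSub) Start() <-chan Message { return s.ch }
func (s fakeSub) Err() error            { return nil }
func (s fakeSub) Stop() error           { return nil }

func demo() map[string]bool {
	acks := map[string]bool{}
	ch := make(chan Message, 2)
	ch <- fakeMsg{strings.NewReader("ok"), "m1", acks}
	ch <- fakeMsg{strings.NewReader("boom"), "m2", acks}
	close(ch)

	dec := func(_ context.Context, m Message) (interface{}, error) {
		b, err := io.ReadAll(m)
		return string(b), err
	}
	e := func(_ context.Context, req interface{}) (interface{}, error) {
		if req == "boom" {
			return nil, errors.New("endpoint failed")
		}
		return nil, nil
	}
	Serve(context.Background(), fakeSub{ch}, dec, e)
	return acks
}

func main() {
	fmt.Println(demo())
}
```

Whether a failed endpoint should nack, retry, or dead-letter the message is exactly the open question raised in the NewManagedSubscriber comment; the sketch picks the simplest answer only to have something concrete.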

I think @Reasno makes a good point; however, since all of our implementations had endpoint creation and a subscription message loop that looked refactorable into the level above, we did so.

Since it's our implementation, we could make the assumptions this requires. How these assumptions fit with go-kit is another discussion. The "managed" code is secondary to the goal of this issue anyway. I think adding context availability throughout the package is the important part here.

Either way, I'm happy to help if I can. We've got our working implementation so hopefully the above can help others that might want to jump into it before go-kit has it.

@mattfung mentioned this issue Aug 7, 2018
@glerchundi

Cross-referencing go-cloud's pubsub package, as it has an abstraction and several implementations. Perhaps we can wrap those and make them go-kit-compatible endpoints?

@banh-gao

Interesting repo @glerchundi. I actually just started experimenting today with a go-kit-compatible implementation of Google pubsub at https://github.com/revas/kit. 😉

@stratg5

stratg5 commented Jul 2, 2019

Is something like this what you're looking to implement? It's far from complete, but Kafka is fleshed out and RabbitMQ is in progress. Starting to use this and would be nice to have it added to the go-kit standard lib, seems to be a popular request -

https://github.com/stratg5/pubsub

@peterbourgon
Member Author

We already have amqp, and it should be possible to generalize patterns from that. Closing this issue; if we want Kafka, I'm open to it.

@zhongliangong

Interesting repo @glerchundi. I actually just started experimenting today with a go-kit-compatible implementation of Google pubsub at https://github.com/revas/kit. 😉

Was watching this for 2 weeks before the close comment. Are there any plans to merge this into the mainline go-kit repository?

@banh-gao

banh-gao commented Sep 5, 2019

@zhongliangong yes soon. We are about to first test this implementation in our production before merging.

@Mistic92

Mistic92 commented Jun 3, 2022

So was the idea of supporting multiple other providers, like GCP PubSub, dropped? Or is there a plugin somewhere that supports that?
