Google PubSub: subscribe as a flow #2169

Merged
merged 1 commit into from
Mar 30, 2020

Conversation

btomala
Contributor

@btomala btomala commented Feb 25, 2020

I have a small improvement for the current PubSub API. Pulling from PubSub should be exposed as a flow, not only as a source. This gives the user more flexibility in stream composition.

The benefits are:

  • The user can set the pull frequency by configuring the upstream source, or attach whatever source fits their needs.
  • Network failures happen from time to time. When a backoff strategy is applied to the source, we lose access to the original source's materialized value, which means we can no longer cancel the source. When the backoff strategy is applied to the flow instead, the source's materialized value is still available. As mentioned above, this gives more flexibility in composition.

I left the old interface for backward compatibility.

@raboof
Member

raboof commented Feb 25, 2020

The user can set the pull frequency by configuring the upstream source, or attach whatever source fits their needs.

So this new API is intended to let you use it with a slower Source, for use cases where 'downstream' otherwise wouldn't be able to keep up with the rate at which messages are produced? I think that should instead be solved by having your downstream backpressure when it cannot keep up.

I agree the Source.tick(0.seconds, 1.second, Done) in the current implementation looks rather weird, but Source.tick should not fire when there is backpressure, so if 'downstream' correctly backpressures it looks like it might work. Do you see a situation where that doesn't work as intended? If you want to 'artificially' slow down your read rate, you might introduce a throttle.
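The throttle suggestion above can be sketched as follows. This is a minimal illustration, not code from the PR: the object name `ThrottleSketch` and the helper `slowDown` are hypothetical, and it assumes Akka Streams is on the classpath.

```scala
import akka.Done
import akka.actor.Cancellable
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

object ThrottleSketch {
  // Hypothetical helper: lets at most one element through per `per`,
  // backpressuring upstream the rest of the time.
  def slowDown[T, M](source: Source[T, M], per: FiniteDuration): Source[T, M] =
    source.throttle(1, per)

  // The tick source discussed in the thread, slowed down to one pull trigger
  // every 10 seconds. The Cancellable materialized value is preserved.
  val slowTicks: Source[Done, Cancellable] =
    slowDown(Source.tick(0.seconds, 1.second, Done), 10.seconds)
}
```

Because `throttle` backpressures rather than buffers, ticks that arrive while there is no demand are dropped by `Source.tick` instead of piling up.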

Network failures happen from time to time. When a backoff strategy is applied to the source, we lose access to the original source's materialized value, which means we can no longer cancel the source. When the backoff strategy is applied to the flow instead, the source's materialized value is still available. As mentioned above, this gives more flexibility in composition.

I'm not sure I understand this scenario, could you give a more concrete example?

@ennru ennru changed the title PubSub: subscribe as a flow Google PubSub: subscribe as a flow Mar 2, 2020
@btomala
Contributor Author

btomala commented Mar 3, 2020

In the example, I used a Source that is slower than the default. I understand that backpressure should work here. However, if there is not much data on PubSub, the user might prefer to let messages accumulate in PubSub, pull them less frequently, and process them in bigger batches. In any case, the source could be faster as well.

The original idea was to have a flow so that the Flow, rather than the Source, can be restarted in case of a network failure. This keeps the source intact and makes it possible to cancel it when needed.

I put the example below in the source code. It is impossible to get a Cancellable graph if you use RestartSource.withBackoff, because the return type of withBackoff is Source[T, NotUsed].

  val graph: RunnableGraph[Cancellable] = Source
    .tick(0.seconds, 10.seconds, Done)
    .via(
      RestartFlow.withBackoff(1.second, 30.seconds, randomFactor = 0.2)(
        () => GooglePubSub.subscribeFlow(subscription, config)
      )
    )
    .map { message =>
      // do something fun

      message.ackId
    }
    .groupedWithin(1000, 1.minute)
    .map(AcknowledgeRequest.apply)
    .to(ackSink)
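
The difference in materialized values can be seen in isolation with a stand-in flow. This is a sketch under assumptions: `pullFlow` is a hypothetical placeholder for `GooglePubSub.subscribeFlow`, the object name is made up, and the `withBackoff` overloads mirror the signature used in the example above (newer Akka versions may prefer a settings-based overload).

```scala
import akka.{Done, NotUsed}
import akka.actor.Cancellable
import akka.stream.scaladsl.{Flow, RestartFlow, RestartSource, RunnableGraph, Sink, Source}
import scala.concurrent.duration._

object MaterializedValueSketch {
  // Hypothetical stand-in for GooglePubSub.subscribeFlow: each tick "pulls" two messages.
  def pullFlow(): Flow[Done, String, NotUsed] =
    Flow[Done].mapConcat(_ => List("message-1", "message-2"))

  // Restarting the whole source: the Cancellable of Source.tick is hidden behind NotUsed.
  val restartedSource: Source[Done, NotUsed] =
    RestartSource.withBackoff(1.second, 30.seconds, randomFactor = 0.2)(
      () => Source.tick(0.seconds, 10.seconds, Done)
    )

  // Restarting only the flow: the graph still materializes the tick source's Cancellable.
  val restartedFlow: RunnableGraph[Cancellable] =
    Source
      .tick(0.seconds, 10.seconds, Done)
      .via(RestartFlow.withBackoff(1.second, 30.seconds, randomFactor = 0.2)(() => pullFlow()))
      .to(Sink.ignore)
}
```

Running `restartedFlow` returns the `Cancellable`, so calling `cancel()` on it stops new pull triggers at the source.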

@raboof
Member

raboof commented Mar 3, 2020

It is impossible to get a Cancellable graph if you use RestartSource.withBackoff, because the return type of withBackoff is Source[T, NotUsed].

I think you could use a KillSwitch to achieve this, see https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html#controlling-stream-completion-with-killswitch
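
A minimal sketch of the KillSwitch approach, assuming Akka Streams is available. The `messages` source is a hypothetical stand-in for the real Pub/Sub source, and the object name is made up for illustration.

```scala
import akka.{Done, NotUsed}
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, RestartSource, RunnableGraph, Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration._

object KillSwitchSketch {
  // Hypothetical stand-in for the Pub/Sub source.
  def messages(): Source[String, NotUsed] = Source.repeat("message")

  // The restarting source materializes NotUsed, so it cannot be cancelled directly.
  val restarting: Source[String, NotUsed] =
    RestartSource.withBackoff(1.second, 30.seconds, randomFactor = 0.2)(() => messages())

  // Placing a kill switch after it gives the caller a handle to complete the stream.
  val graph: RunnableGraph[(UniqueKillSwitch, Future[Done])] =
    restarting
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.ignore)(Keep.both)
}
```

Calling `shutdown()` on the materialized `UniqueKillSwitch` completes downstream and cancels upstream; `abort(ex)` fails the stream instead.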

@btomala
Contributor Author

btomala commented Mar 3, 2020

Yes, you can achieve a similar result. However, when you cancel the source, it no longer produces any demand. If you shut down the stream during a request, the request still has to be handled and you have to drain the stream. This is how I see the difference, but I'm not familiar with the implementation of KillSwitch and I could be wrong.

Member

@ennru ennru left a comment

I can see how this can be useful. Especially for composition with restart flow.
LGTM.

@ennru ennru added this to the 2.0.0 milestone Mar 30, 2020
@ennru ennru merged commit ffe838c into akka:master Mar 30, 2020
@ennru
Member

ennru commented Mar 30, 2020

Thank you for suggesting this. Sorry for the delay.
