QueueSource should complete future returned by .offer() only when element is sent to downstream #26696

Open
unoexperto opened this issue Apr 7, 2019 · 8 comments
Labels
0 - new Ticket is unclear on it's purpose or if it is valid or not t:stream

unoexperto commented Apr 7, 2019

Hi guys!

I believe the current implementation of QueueSource in backpressure mode makes it unusable and contradicts what the documentation describes.

To quote, it says:

> Acknowledgement mechanism is available. akka.stream.scaladsl.SourceQueue.offer returns Future[QueueOfferResult] which completes with QueueOfferResult.Enqueued if element was added to buffer or sent downstream.

This statement, as I understand it, implies that I can rely on QueueOfferResult to know when to offer the next item. Currently the promise is completed as soon as the item is put into QueueSource.buffer. This means that if a call to QueueSource.offer() fails with IllegalStateException("You have to wait for previous offer to be resolved to send another request"), I can't rely on the previously returned futures to know when to call .offer again. In effect I have to implement my own buffer around Source.queue, which I tried hard to do and discussed a few times in your Gitter :)

I believe the right thing to do is to complete the promise AFTER the buffered item is sent downstream.

To reproduce the problem, call .offer from more threads than (buffer size + 1) (I'll come back to the plus-one later).

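import java.time.ZonedDateTime

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, OverflowStrategy}
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}

import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Note: SourceExt (used below) is not part of Akka and is not defined in this snippet;
// use Source.queue to run the reproducer against stock Akka.
object Main {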
  def log(text: String): Unit = {
    println(s"${ZonedDateTime.now()} [${Thread.currentThread().getId}]: $text")
  }

  private implicit val system: ActorSystem = ActorSystem("root")
  private implicit val executor: ExecutionContext = system.dispatcher
  private implicit val mat: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(system))

  def main(args: Array[String]): Unit = {

    val (inputSource, outputSource) = SourceExt.queue[Int](2, OverflowStrategy.backpressure)
      .toMat(BroadcastHub.sink(bufferSize = 2))(Keep.both)
      .run()

    val items = (0 until 100).toList
    val items2 = mutable.HashSet[Int]()
    items.foreach(items2.add)

    outputSource
      .throttle(1, 1.second)
      .runWith(Sink.foreachAsync(4) { v ⇒
        Future {
          items2.remove(v)
          log(s"====>>> $v [remaining count: ${items2.size}]")
        }
      })
      .onComplete {
        case Success(_) ⇒
          log(s"\n\n***** REMAINING ITEMS: $items2")
          system.terminate()
        case Failure(ex) ⇒
          ex.printStackTrace()
          system.terminate()
      }

    Source(items)
      .mapAsync(2) { v ⇒
        log(s"Pushing $v")
        inputSource.offer(v)
      }
      .runWith(Sink.ignore)
      .onComplete {
        case Success(_) ⇒
          log(s"\n***** COMPLETED PUSH")
          inputSource.complete()
        case Failure(ex) ⇒
          ex.printStackTrace()
      }

    Await.ready(system.whenTerminated, Duration.Inf)
  }
}

In the current version (2.5.22) it fails instantly, whereas a client developer expects QueueSource to respect the buffer limit and not complete offer futures until the items are really gone. As a result the buffer and QueueSource.pendingOffer fill up quickly.

I have a pull request that fixes this, but it breaks a few tests, in particular

  1. "buffer when needed"
  2. "fail offer future if user does not wait in backpressure mode"
  3. "wait when buffer is full and backpressure is on"

which are built around the faulty logic. I can fix the tests, but there is a risk that some clients are implemented around this behavior. On the other hand, with the proposed fix their code should not fail. It would just be correctly backpressured.

I think fixing this will also address #25798


With the proposed fix, promises of dropped items will never be completed in DropHead/DropTail/DropBuffer/Fail modes. I'm not sure whether I should create a separate ticket for this, but I believe it should be fixed too. Please advise (@jrudolph).


Finally, I don't understand the purpose of QueueSource.pendingOffer. It acts as a one-item buffer, which again is misleading given that the documentation says the maxBuffer parameter defines the number of buffered items. In reality the number of buffered items is (maxBuffer + 1). Perhaps the original author first implemented QueueSource without a buffer, only with pendingOffer, and later added backpressure mode. The test "fail offer future if user does not wait in backpressure mode" accounts for this, though. I believe pendingOffer should be dropped in favor of QueueSource.buffer if this PR is accepted. So far it looks like QueueSource.pendingOffer only serves as something for client code to wait on when there is a single producer calling .offer.

@johanandren
Member

I'm not sure; it sounds dangerous to change the semantics just to fit your use case, as there is very likely existing user code relying on the current behavior. I have seen people struggle with this before though, so maybe an imperative N-producer source would be useful (along with better documenting that Source.queue is tricky for that use case).

BTW, that sample isn't thread safe, you can't safely mutate shared global state like that hashset from inside streams or Futures and be sure the changes are visible from other threads.

@johanandren johanandren added 0 - new Ticket is unclear on it's purpose or if it is valid or not t:stream labels Apr 14, 2019
@raboof
Member

raboof commented Apr 17, 2019

> This statement, in my understanding, implies I can rely on QueueOfferResult to know when to offer next item.

I agree that is the desired semantics

> Currently it completes promise as soon as item is put to QueueSource.buffer.

I think that is the correct behaviour

> It means if call to QueueSource.offer() fails with IllegalStateException("You have to wait for previous offer to be resolved to send another request")

I don't think so: it only throws when pendingOffer is filled, and if you correctly waited for the Futures to resolve I don't think an offer should ever come in when pendingOffer is filled.
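For illustration, here is a minimal sketch of a single producer that waits for each returned Future before making the next offer. It assumes Akka 2.5.x (hence ActorMaterializer); the object name, element count and throttle rate are made up for the example and are not taken from the reproducer.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of the original reproducer.
object SequentialOffers {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sequential-offers")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Slow consumer, so the buffer fills up and offers get backpressured.
    val queue = Source.queue[Int](2, OverflowStrategy.backpressure)
      .throttle(1, 100.millis)
      .to(Sink.foreach(v => println(s"received $v")))
      .run()

    // Chain the offers: offer(v) is only called once the Future of the
    // previous offer has resolved, so at most one offer is ever pending.
    val allOffered: Future[Unit] =
      (0 until 10).foldLeft(Future.successful(())) { (previous, v) =>
        previous.flatMap(_ => queue.offer(v)).map(_ => ())
      }

    Await.ready(allOffered, 10.seconds)
    queue.complete()
    Await.ready(queue.watchCompletion(), 10.seconds)
    system.terminate()
  }
}
```

With this shape the producer is simply slowed down to the consumer's pace once the buffer is full, and the IllegalStateException should not occur.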

> I don't understand purpose of QueueSource.pendingOffer. It acts as one-item-buffer which again is misleading from what documentation says that maxBuffer parameter defines number of buffered items

Perhaps the documentation is slightly misleading there, but the logic looks correct to me so far. I hope to review your reproducer later.

> I believe right thing to do is to complete promise AFTER buffered item is sent downstream.

Even if there is a bug in the logic here, I think we should only fix the bug, and not change the semantics as you propose: if we'd complete the promise after sending the buffered item downstream, and the user is only allowed to offer a new element when the future is resolved, then the buffer would always be empty, right?

@raboof
Member

raboof commented Apr 17, 2019

Looking closer at the reproducer, .mapAsync(parallelism=2) means there can be 2 .offer() calls in parallel. This means:

  • When the buffer is full, all offers are acknowledged.
  • When you then send another offer(), it will not be acknowledged, but placed in pendingOffer. This is OK.
  • However, because of the parallelism you configured, the 'other' parallel offer() will cause the IllegalStateException.

I think this works as designed: the exception is caused by the fact that you broke the contract of waiting for the QueueOfferResult to resolve before offering the next element.
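The three cases above can be sketched with a small single-threaded model that uses only the Scala standard library. This is a simplified illustration of the behaviour described in this thread, not Akka's actual QueueSource code; ModelQueue, OfferResult and pull() are hypothetical names.

```scala
import scala.collection.mutable
import scala.concurrent.{Future, Promise}

sealed trait OfferResult
case object Enqueued extends OfferResult

// Simplified model (an assumption, not Akka's implementation):
// a bounded buffer plus a single parked "pendingOffer" slot.
final class ModelQueue[T](maxBuffer: Int) {
  require(maxBuffer > 0, "this model only covers a non-empty buffer")

  private val buffer = mutable.Queue.empty[T]
  private var pendingOffer: Option[(T, Promise[OfferResult])] = None

  def offer(elem: T): Future[OfferResult] =
    if (buffer.size < maxBuffer) {
      // Room in the buffer: acknowledge immediately.
      buffer.enqueue(elem)
      Future.successful(Enqueued)
    } else pendingOffer match {
      case None =>
        // Buffer full: park the offer, its Future stays incomplete.
        val promise = Promise[OfferResult]()
        pendingOffer = Some((elem, promise))
        promise.future
      case Some(_) =>
        // A second offer while one is parked breaks the contract.
        Future.failed(new IllegalStateException(
          "You have to wait for previous offer to be resolved to send another request"))
    }

  // Downstream demand: emit the head, then move the parked offer into
  // the freed slot and acknowledge it.
  def pull(): Option[T] = {
    val out = if (buffer.isEmpty) None else Some(buffer.dequeue())
    pendingOffer.foreach { case (elem, promise) =>
      buffer.enqueue(elem)
      promise.success(Enqueued)
    }
    pendingOffer = None
    out
  }
}

object ModelQueueDemo {
  def main(args: Array[String]): Unit = {
    val queue = new ModelQueue[Int](maxBuffer = 2)
    val first = queue.offer(1)  // buffered and acknowledged
    val second = queue.offer(2) // buffered and acknowledged
    val parked = queue.offer(3) // held in pendingOffer, not yet acknowledged
    val broken = queue.offer(4) // fails: previous offer still unresolved
    println(s"acknowledged: ${first.isCompleted && second.isCompleted}")
    println(s"parked acknowledged before pull: ${parked.isCompleted}")
    println(s"parallel offer failed: ${broken.value.exists(_.isFailure)}")
    queue.pull()
    println(s"parked acknowledged after pull: ${parked.isCompleted}")
  }
}
```

In this model the queue holds at most maxBuffer + 1 elements at a time (the buffer plus the parked offer), which matches the plus-one observation in the original report.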

Does that make sense?

@johanandren
Member

That's why I mentioned that an imperative N-producer operator could perhaps make sense.

@raboof
Member

raboof commented Apr 18, 2019

I must admit I didn't quite understand what you meant by that :D. Would that be a queue-like source that allows for up to N concurrent writers? Might be relatively easy to do by making pendingOffer a list of max size N?

@jrudolph
Member

Isn't a workaround for that use case to use one queue per writer and merge those afterwards?

It seems the current confusion also comes from the fact that the queue interface and semantics are quite overloaded by being able to specify OverflowStrategies. The queue interface is safe to use concurrently but as observed here it only makes sense if the strategy is not backpressure.
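A rough sketch of the one-queue-per-writer idea, assuming Akka 2.5.x and exactly two writers. The names QueuePerWriter and writeAll are made up for the example, and mergeMat is just one possible way to merge the queues.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical example object illustrating the workaround.
object QueuePerWriter {

  // Each writer offers sequentially on its OWN queue, waiting for the
  // previous offer to resolve before making the next one.
  def writeAll(queue: SourceQueueWithComplete[Int], items: Seq[Int])(
      implicit ec: ExecutionContext): Future[Unit] =
    items.foldLeft(Future.successful(())) { (previous, v) =>
      previous.flatMap(_ => queue.offer(v)).map(_ => ())
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("queue-per-writer")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    def writerQueue = Source.queue[Int](2, OverflowStrategy.backpressure)

    // Two independent queues merged into one stream; both queues and the
    // sink's completion Future are kept as materialized values.
    val ((queueA, queueB), done) = writerQueue
      .mergeMat(writerQueue)(Keep.both)
      .toMat(Sink.foreach(v => println(s"received $v")))(Keep.both)
      .run()

    // The two writers run concurrently, but never share a queue.
    val writes = Future.sequence(Seq(
      writeAll(queueA, 0 until 50),
      writeAll(queueB, 50 until 100)))

    Await.ready(writes, 10.seconds)
    queueA.complete()
    queueB.complete()
    Await.ready(done, 10.seconds)
    system.terminate()
  }
}
```

Each queue still has a single producer, so the wait-for-the-previous-offer contract holds per queue while the writers proceed in parallel.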

@johanandren
Member

Yeah, I didn't go so far as to think of a solution; I was thinking more of the problem that is reported over and over when users try to use Source.queue from multiple threads.

@ennru
Member

ennru commented Apr 29, 2020

#28272 was implemented for release in Akka 2.6.5 and allows pushing from several threads.

5 participants