add Core consumer impl; capable of processing messages from a consumer client. #113

Open
gauravAshok opened this issue Mar 15, 2024 · 1 comment

@gauravAshok
Collaborator

gauravAshok commented Mar 15, 2024

  • Netty executor group.
  • Is a consumer tied to a context? Is a consumer a "task" to be run on an event-loop?
  • Consumer initialization:
    • blocking init() / connect()
    • startOn(EventLoop)

Consumer task? (ctx(event_loop))

  • MessageSrc::replenish : Launch a task to fetch new messages from the outside world, then enqueue a task on ctx to replenish this messageSrc.
    • async MessageSrc::nextMessages(out array) : async method to fetch messages from the messageSrc. It is a wrapper
      that hides the particular consumer impl and the grouping semantics. It also lets us reorder (if required) messages
      received from the consumer instance.
  • process LOOP :
    loop_begin:
    • async nextMessages() : fetch messages from the messageSrc.
    • then, check for group state
    • and, split(messages) into (failed, ok) messages
    • and, enqueue concurrency_control.deliver(ok_message)
    • and, enqueue produce_quota_q.acquire(size(failed_msgs)) // simplified: each message can go to a separate q, so there may be multiple quota objects
    • then enqueue loop_begin.
  • ConcurrencyControl (a Thread safe DS)
    • deliver(msgs)
      • requests_to_make = min(free_slots, msgs.size)
      • add_backlog(rest of the messages)
      • free_slots -= requests_to_make
      • msgs.slice(requests_to_make).forEach(msg -> enqueue_delivery(msg)), then
        • free_slots += 1
        • and if there are requests in the backlog, add a task onto the event_loop to initiate delivery for them, unless such a task has already been started.
        • and, enqueue post_process(msg, response) on ctx
  • Post_process(msg, response): This is running back on ctx / event-loop.
    • If it failed, then enqueue error_throttler.acquire(produce_to_failure_q)
      • then, q.produce()
      • then, GS state update.
      • msg.commit()
    • else, advance GS state and msg.commit()
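
The ConcurrencyControl steps above could be sketched roughly as follows. This is only an illustration, not the planned implementation: the class name ConcurrencyControlSketch, the deliverFn and postProcess parameters, and the two accessor methods are all hypothetical. It also simplifies one step: a completed request hands its slot directly to the next backlogged message instead of incrementing free_slots and scheduling a separate drain task on the event_loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical sketch: caps in-flight deliveries and backlogs the overflow. */
final class ConcurrencyControlSketch<M, R> {
    // Assumed async delivery call (e.g. a remote request); not part of the issue text.
    private final Function<M, CompletableFuture<R>> deliverFn;
    // Stand-in for "enqueue post_process(msg, response) on ctx".
    private final BiConsumer<M, R> postProcess;
    private final Queue<M> backlog = new ArrayDeque<>();
    private int freeSlots;

    ConcurrencyControlSketch(int maxInFlight,
                             Function<M, CompletableFuture<R>> deliverFn,
                             BiConsumer<M, R> postProcess) {
        this.freeSlots = maxInFlight;
        this.deliverFn = deliverFn;
        this.postProcess = postProcess;
    }

    void deliver(List<M> msgs) {
        List<M> toSend;
        synchronized (this) {
            int requestsToMake = Math.min(freeSlots, msgs.size());
            toSend = new ArrayList<>(msgs.subList(0, requestsToMake));
            backlog.addAll(msgs.subList(requestsToMake, msgs.size()));
            freeSlots -= requestsToMake;
        }
        // Start the requests outside the lock so callbacks never run while it is held.
        toSend.forEach(this::send);
    }

    private void send(M msg) {
        deliverFn.apply(msg).whenComplete((response, error) -> {
            M next;
            synchronized (this) {
                next = backlog.poll();
                if (next == null) {
                    freeSlots++; // nothing waiting: release the slot
                }
            }
            // response is null when the delivery failed; error is ignored in this sketch.
            postProcess.accept(msg, response);
            if (next != null) {
                // Reuse the slot. With futures that complete synchronously this recurses,
                // which is one reason the design schedules a task on the event_loop instead.
                send(next);
            }
        });
    }

    synchronized int freeSlots() { return freeSlots; }

    synchronized int backlogSize() { return backlog.size(); }
}
```

With maxInFlight = 2 and three messages, two requests start immediately and the third waits in the backlog until one of the first two completes.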

todo: explore whether post_process needs to be batched. Everything before post_process is batched up to some extent; post_process is the only step that runs per msg. Enqueueing from outside onto the ctx is also a candidate for contention. Should the event-loop be responsible for http/remote requests? Probably not, as one is highly IO bound and the other is compute bound. IO threads need to be responsive.
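
One possible shape for a batched post_process, as a sketch only: IO threads append completed items to a concurrent queue, and at most one drain task at a time is enqueued onto the ctx. That would turn many per-msg enqueues into one task per burst. Everything here is an assumption for illustration: the class name PostProcessBatcherSketch, modelling ctx as a plain java.util.concurrent.Executor, and the batch handler signature.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical sketch: coalesces per-msg completions into batches run on ctx. */
final class PostProcessBatcherSketch<T> {
    private final Queue<T> completed = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean drainScheduled = new AtomicBoolean(false);
    private final Executor ctx;                        // assumed stand-in for the event-loop
    private final Consumer<List<T>> postProcessBatch;  // runs on ctx

    PostProcessBatcherSketch(Executor ctx, Consumer<List<T>> postProcessBatch) {
        this.ctx = ctx;
        this.postProcessBatch = postProcessBatch;
    }

    /** Called from IO threads once a delivery has completed. */
    void offer(T item) {
        completed.add(item);
        scheduleDrainIfNeeded();
    }

    private void scheduleDrainIfNeeded() {
        // Only the caller that flips the flag enqueues a task, so ctx sees one task per burst.
        if (drainScheduled.compareAndSet(false, true)) {
            ctx.execute(this::drain);
        }
    }

    private void drain() {
        List<T> batch = new ArrayList<>();
        for (T item; (item = completed.poll()) != null; ) {
            batch.add(item);
        }
        drainScheduled.set(false);
        if (!batch.isEmpty()) {
            postProcessBatch.accept(batch);
        }
        // Items offered after the poll loop but before the flag reset were not scheduled
        // by their producers, so check again here.
        if (!completed.isEmpty()) {
            scheduleDrainIfNeeded();
        }
    }
}
```

Whether this helps depends on how bursty completions are; with a steady trickle, each batch would still hold a single msg.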

@gauravAshok gauravAshok self-assigned this Mar 15, 2024
@gauravAshok gauravAshok added this to the Consumer milestone Mar 15, 2024
@aayustark007-fk
Collaborator

How do we handle Consumer failure scenarios and prevent duplicate delivery during recovery?
E.g., the Consumer task fails after ConcurrencyControl::deliver for some of the messages, so the offset is not committed to the messaging stack. When the consumer comes back up, or the workload is assigned to another consumer, it will try to deliver those messages again. Are we fine with that?

Labels
None yet
Projects
Status: In Progress
Development

No branches or pull requests

2 participants