
Consider moving commit method from CommitableOffset (and some others) to KafkaConsumer #462

Open
LMnet opened this issue Jan 14, 2021 · 13 comments
Labels
breaking requires changes that break binary compatibility enhancement New feature or request

@LMnet
Member

LMnet commented Jan 14, 2021

During the discussion of #409 I suggested that the commit method could be moved from CommittableOffset to KafkaConsumer.

CommittableOffset feels like pure data, but it contains def commit: F[Unit], which is not data but behavior. Maybe it would be better if CommittableOffset did not have the commit method? Maybe it would be more natural to have it on KafkaConsumer (like def commit(committableOffset: CommittableOffset): F[Unit])?

The presence of the commit method in CommittableOffset makes it difficult to create instances such as Eq[CommittableOffset].

All Committable... types are affected by this problem too.

As a complement to the KafkaConsumer#commit method, we could implement commit on CommittableOffset as an extension method. It would require a KafkaConsumer as an input argument, but overall I think it would help to preserve the current API as much as possible.
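To make the idea concrete, here is a minimal sketch of the extension-method variant. The types below are simplified stand-ins, not the real fs2-kafka API (which is effect-polymorphic in F[_]):

```scala
// Simplified stand-ins for the Kafka types (hypothetical, not the real API).
final case class TopicPartition(topic: String, partition: Int)
final case class OffsetAndMetadata(offset: Long)

// CommittableOffset as pure data: no embedded commit behavior.
final case class CommittableOffset(
  topicPartition: TopicPartition,
  offsetAndMetadata: OffsetAndMetadata
)

trait KafkaConsumer {
  // The proposed consumer-side commit method.
  def commit(committableOffset: CommittableOffset): Unit
}

// The complementary extension method: keeps the familiar `offset.commit`
// call shape, but now requires the consumer as an explicit argument.
implicit class CommittableOffsetOps(private val self: CommittableOffset) {
  def commit(consumer: KafkaConsumer): Unit = consumer.commit(self)
}
```

With this shape, an Eq instance for CommittableOffset becomes trivial, because the type is plain data with no captured effect.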

@bplommer
Member

We could consider adding the new methods to KafkaConsumer before 2.0 and deprecating the ones on CommittableX.

@bplommer bplommer modified the milestones: v2.0.0, v1.3.0 Jan 14, 2021
@bplommer
Member

Another thing we could consider is for a CommittableOffset etc. to expose a reference to the KafkaConsumer it belongs to. The commit method can then be derived from that reference, and we can implement equality as reference equality, without the extra syntactic overhead of having to explicitly invoke a method on the consumer.
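A rough sketch of how that could look, again with simplified placeholder types rather than the actual fs2-kafka API:

```scala
// Simplified stand-ins (hypothetical, not the real API).
final case class TopicPartition(topic: String, partition: Int)
final case class OffsetAndMetadata(offset: Long)

trait KafkaConsumer {
  def commitOffsets(offsets: Map[TopicPartition, OffsetAndMetadata]): Unit
}

// Each offset keeps a reference to the consumer it came from; commit is
// derived from that reference, and equality compares the consumer by
// reference and the offset data by value.
final class CommittableOffset(
  val topicPartition: TopicPartition,
  val offsetAndMetadata: OffsetAndMetadata,
  val consumer: KafkaConsumer
) {
  def commit(): Unit =
    consumer.commitOffsets(Map(topicPartition -> offsetAndMetadata))

  override def equals(other: Any): Boolean = other match {
    case that: CommittableOffset =>
      (consumer eq that.consumer) &&
        topicPartition == that.topicPartition &&
        offsetAndMetadata == that.offsetAndMetadata
    case _ => false
  }

  override def hashCode: Int =
    (System.identityHashCode(consumer), topicPartition, offsetAndMetadata).hashCode
}
```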

@LMnet
Member Author

LMnet commented Jan 15, 2021

Another thing we could consider is for a CommittableOffset etc. to expose a reference to the KafkaConsumer it belongs to. The commit method can then be derived from that reference, and we can implement equality as reference equality, without the extra syntactic overhead of having to explicitly invoke a method on the consumer.

@bplommer great idea! I think with this approach we could maintain source compatibility with the current version.

@bplommer
Member

bplommer commented Mar 8, 2021

@LMnet do you think this is something you'd like to pick up?

@LMnet
Member Author

LMnet commented Mar 9, 2021

@bplommer yes, I will try to find some time for that. If I haven't submitted a pull request by March 14, you can assume I didn't find the time.

@LMnet
Member Author

LMnet commented Mar 9, 2021

I started working on this and ran into an issue with CommittableOffsetBatch. I changed CommittableOffset as @bplommer suggested: I added the consumer (named committer: KafkaCommit[F]) as a field of CommittableOffset and moved the commit method out of the type definition into an extension method. Because of these changes I had to change CommittableOffsetBatch in the same way, but that turned out not to be straightforward. I tried to add a committer: KafkaCommit[F] field, as in CommittableOffset. But a CommittableOffsetBatch can be empty, and it can potentially contain offsets from multiple consumers. The situation is similar to the consumerGroupIds field: there should be exactly one, but there can be none or more than one. The problem with that design is that it is not safe: the checking logic lives in the commit method, but it could live in a constructor instead.

My thoughts on CommittableOffsetBatch overall:

  1. It should be a pure data type, because it's just a batch of CommittableOffsets, which are pure data too.
  2. It's strange that the current API provides a way to construct an invalid batch (with an empty consumer group id or with multiple consumer group ids).
  3. How should we introduce the concept of a committer to CommittableOffsetBatch? In the same unsafe manner as consumerGroupIds, or in some other way?
  4. Maybe CommittableOffsetBatch should always be non-empty and safe by construction?

Going further, I thought about the reasons why CommittableOffset and CommittableOffsetBatch have consumerGroupId at all, and it looks like the only reason is to avoid mixing CommittableOffsets from different consumers. So, if we have a consumer (committer) as a field on CommittableOffset, we will no longer need consumerGroupId on CommittableOffset or CommittableOffsetBatch.

Moreover, we could try to use path-dependent types for this. That would give us a compile-time guarantee that offsets from different consumers are not mixed in the same batch. But I'm concerned about this change: how would it affect library users?

@LMnet
Member Author

LMnet commented Mar 27, 2021

I played a bit with path-dependent types. It would work, but I don't think it would be practical.

I did something like this (simplified):

trait KafkaConsumer[F[_]] { outer =>
  
  // some methods here

  private[kafka] def commitInternal(offsets: Map[TopicPartition, OffsetAndMetadata]): F[Unit]

  case class CommittableOffset(topicPartition: TopicPartition, offsetAndMetadata: OffsetAndMetadata)
  object CommittableOffset {
    implicit class CommittableOffsetOps(val self: CommittableOffset) {
      def offsets: Map[TopicPartition, OffsetAndMetadata] = Map(self.topicPartition -> self.offsetAndMetadata)
      def commit: F[Unit] = outer.commitInternal(offsets)
    }

    implicit val eq: Eq[CommittableOffset] = ???
  }

  case class CommittableOffsetBatch(offsets: Map[TopicPartition, OffsetAndMetadata])
  object CommittableOffsetBatch {
    implicit class CommittableOffsetBatchOps(val self: CommittableOffsetBatch) {
      def commit: F[Unit] = outer.commitInternal(self.offsets)
    }

    implicit val eq: Eq[CommittableOffsetBatch] = ???
  }
}

This approach gives complete type safety: there is no way to mix up CommittableOffsets from different consumers, and there is no way to create an invalid CommittableOffsetBatch.

But there is also CommittableConsumerRecord, which is the main entity users deal with through the stream-like methods, and it contains a CommittableOffset inside. So it looks like CommittableConsumerRecord should be a path-dependent type too, right? That means that if we don't want to lose type safety, we must use the path-dependent notation in all user code dealing with any Committable... entities. That is, instead of this:

def foo(record: CommittableConsumerRecord[F, A, B]): CommittableConsumerRecord[F, A, B] = ??? // doing some stuff with record

Users will have to do something like this:

def foo(record: consumer.CommittableConsumerRecord[F, A, B]): consumer.CommittableConsumerRecord[F, A, B] = ???

This variant widens the type and we lose type safety:

def foo(record: KafkaConsumer#CommittableConsumerRecord[F, A, B]): KafkaConsumer#CommittableConsumerRecord[F, A, B] = ???

So, it looks like path-dependent types are not an option here.

I think the best approach would be to refactor CommittableOffsetBatch and make it:

  1. Always non-empty.
  2. Safe by construction, not by usage.
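As an illustration, a non-empty, safe-by-construction batch could look roughly like this. The types are simplified stand-ins (KafkaCommit models the committer interface mentioned above), and the smart-constructor name fromOffsets is hypothetical:

```scala
// Simplified stand-ins (hypothetical, not the real API).
final case class TopicPartition(topic: String, partition: Int)
final case class OffsetAndMetadata(offset: Long)

trait KafkaCommit {
  def commitOffsets(offsets: Map[TopicPartition, OffsetAndMetadata]): Unit
}

final case class CommittableOffset(
  topicPartition: TopicPartition,
  offsetAndMetadata: OffsetAndMetadata,
  committer: KafkaCommit
)

// The constructor is private: the only way to build a batch is fromOffsets,
// which demands at least one offset and a single committer. An invalid batch
// is unrepresentable, so commit needs no runtime checks.
final class CommittableOffsetBatch private (
  val committer: KafkaCommit,
  val offsets: Map[TopicPartition, OffsetAndMetadata]
) {
  def commit(): Unit = committer.commitOffsets(offsets)
}

object CommittableOffsetBatch {
  def fromOffsets(
    head: CommittableOffset,
    tail: List[CommittableOffset]
  ): Either[String, CommittableOffsetBatch] =
    if (tail.exists(_.committer ne head.committer))
      Left("offsets from different consumers cannot be mixed in one batch")
    else
      Right(new CommittableOffsetBatch(
        head.committer,
        (head :: tail).map(o => o.topicPartition -> o.offsetAndMetadata).toMap
      ))
}
```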

Also, there is a related issue about the safety of CommittableOffsetBatch: #347

@bplommer @vlovgr wdyt?

@vlovgr
Contributor

vlovgr commented Mar 27, 2021

I think the best approach would be to refactor CommittableOffsetBatch and make it:

  1. Always non-empty.
  2. Safe by construction, not by usage.

I agree this option is preferable, if possible.

@LMnet
Member Author

LMnet commented Mar 28, 2021

There is also another option: allow CommittableOffsetBatch to commit offsets from different consumers. That means that instead of this:

def offsets: Map[TopicPartition, OffsetAndMetadata]

CommittableOffsetBatch will contain this:

def offsets: Map[KafkaCommit[F], Map[TopicPartition, OffsetAndMetadata]]

This automatically saves us from problems like #347. Also, CommittableOffsetBatch becomes always safe to use and to construct, and the current public API doesn't change at all.
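A sketch of that variant with simplified stand-in types: the batch keys offsets by committer, so it is always safe to construct (including the empty case), and commit issues one call per consumer:

```scala
// Simplified stand-ins (hypothetical, not the real API).
final case class TopicPartition(topic: String, partition: Int)
final case class OffsetAndMetadata(offset: Long)

trait KafkaCommit {
  def commitOffsets(offsets: Map[TopicPartition, OffsetAndMetadata]): Unit
}

final case class CommittableOffsetBatch(
  offsets: Map[KafkaCommit, Map[TopicPartition, OffsetAndMetadata]]
) {
  // One commit per consumer; an empty batch is simply a no-op.
  def commit(): Unit =
    offsets.foreach { case (committer, offs) => committer.commitOffsets(offs) }

  // Combining batches never fails, even across consumers.
  def ++(other: CommittableOffsetBatch): CommittableOffsetBatch =
    CommittableOffsetBatch(
      other.offsets.foldLeft(offsets) { case (acc, (committer, offs)) =>
        acc.updated(committer, acc.getOrElse(committer, Map.empty[TopicPartition, OffsetAndMetadata]) ++ offs)
      }
    )
}
```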

@bplommer
Member

Could we improve the ergonomics by having a wrapper for the path-dependent type? So we'd have something like

sealed trait WrappedCommittableConsumerRecord[F[_], K, V] {
  val consumer: KafkaConsumer[F, K, V]
  val record: consumer.CommittableConsumerRecord
}

and then client code would be

def foo(record: WrappedCommittableConsumerRecord[F, A, B]): record.consumer.CommittableConsumerRecord = ???

It's very likely that there's something I'm missing, but could this work?

@LMnet
Member Author

LMnet commented Mar 30, 2021

Well, it would work, yes. But:

  1. It's a major change that forces users to rewrite their code.
  2. It still has the downsides of path-dependent types I described above.

The more I think about this issue, the more I lean towards my last solution (where CommittableOffsetBatch can commit offsets from different consumers).

@bplommer
Member

bplommer commented Apr 1, 2021

The more I think about this issue, the more I lean towards my last solution (where CommittableOffsetBatch can commit offsets from different consumers).

That sounds reasonable to me.

@bplommer bplommer modified the milestones: v1.4.0, v3.0.0 Apr 1, 2021
@LMnet
Member Author

LMnet commented Apr 20, 2021

I'm trying to implement the version where CommittableOffsetBatch can contain offsets from different consumers.

Overall, everything looks fine. CommittableOffset is simplified to this:

sealed abstract class CommittableOffset[F[_]] {
  def topicPartition: TopicPartition
  def offsetAndMetadata: OffsetAndMetadata
  def committer: KafkaCommit[F]
}

And CommittableOffsetBatch to this:

sealed abstract class CommittableOffsetBatch[F[_]] {
  def offsets: Map[KafkaCommit[F], Map[TopicPartition, OffsetAndMetadata]]
}

But I have some difficulties with the implementation of the commit method. Currently, the commit function is constructed inside KafkaConsumerActor and attached to CommittableOffset as a field. In the new API, CommittableOffset doesn't have a commit method; instead, it has a committer with a dedicated commit method. The problem is that KafkaCommit (which is, in essence, a KafkaConsumer) is not available inside KafkaConsumerActor, so CommittableOffset can't be constructed inside KafkaConsumerActor anymore. To fix this I see a few options, each with different drawbacks:

  1. Construct in KafkaConsumerActor some new entity that is like CommittableOffset but without the committer field. When these entities reach KafkaConsumer, it enriches them, turning them into CommittableOffsets with the committer field. It's doable, but a decent amount of code would have to be rewritten, and every batch would have to be traversed to create the CommittableOffsets. I'm not sure how bad that is in terms of performance.
  2. Pass a reference to the KafkaConsumer into KafkaConsumerActor. This can only be done via message passing, because KafkaConsumerActor is created before the KafkaConsumer. That means we would need a new actor message like SetKafkaConsumer, which sets a mutable field inside KafkaConsumerActor. It's doable too, but feels a bit dirty, because KafkaConsumerActor would need a mutable optional field which in practice is always filled.
  3. Create a special Committer entity for the committer field in CommittableOffset. This entity would be created before both KafkaConsumerActor and KafkaConsumer, inside the KafkaConsumer constructor. It could be passed to KafkaConsumerActor without dirty message passing, and it would provide the link between KafkaConsumerActor and KafkaConsumer. With this option we get an extra Committer entity in the public API, which is not great: its purpose would be exclusively technical, to work around our internal difficulties with sharing data between KafkaConsumer and KafkaConsumerActor.
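For illustration, option 3 could be sketched like this. The Committer name and the AtomicReference wiring are hypothetical, just to show the late binding between the actor and the consumer:

```scala
import java.util.concurrent.atomic.AtomicReference

// Simplified stand-ins (hypothetical, not the real API).
final case class TopicPartition(topic: String, partition: Int)
final case class OffsetAndMetadata(offset: Long)

trait KafkaCommit {
  def commitOffsets(offsets: Map[TopicPartition, OffsetAndMetadata]): Unit
}

// Created before both KafkaConsumerActor and KafkaConsumer; the actor only
// ever sees this Committer, and the real consumer is registered afterwards.
final class Committer extends KafkaCommit {
  private val underlying = new AtomicReference[KafkaCommit](null)

  def register(consumer: KafkaCommit): Unit = underlying.set(consumer)

  def commitOffsets(offsets: Map[TopicPartition, OffsetAndMetadata]): Unit = {
    val consumer = underlying.get()
    require(consumer != null, "consumer is not registered yet")
    consumer.commitOffsets(offsets)
  }
}
```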

@bplommer @vlovgr wdyt? Maybe you could offer some other options to solve this issue?

@LMnet LMnet mentioned this issue Mar 12, 2022
@aartigao aartigao modified the milestones: v3.0.0, v4.0.0 Oct 28, 2023