feat(consumer): Support incremental cooperative rebalancing #53

Merged
10 commits merged on Apr 1, 2022
150 changes: 97 additions & 53 deletions arroyo/backends/kafka/consumer.py
@@ -143,9 +143,10 @@ class KafkaConsumer(Consumer[KafkaPayload]):

def __init__(
self,
configuration: Mapping[str, Any],
configuration: MutableMapping[str, Any],
*,
commit_retry_policy: Optional[RetryPolicy] = None,
incremental_cooperative: bool = False,
) -> None:
if commit_retry_policy is None:
commit_retry_policy = NoRetryPolicy()
@@ -182,6 +183,11 @@ def __init__(
"invalid value for 'enable.auto.offset.store' configuration"
)

self.__incremental_cooperative = incremental_cooperative

if self.__incremental_cooperative is True:
configuration["partition.assignment.strategy"] = "cooperative-sticky"
Contributor:

Why provide incremental_cooperative as a dedicated flag instead of passing it through the configuration mapping like all the other config parameters?

Member Author (@lynnagara), Mar 31, 2022:

I had it the other way originally, but figured it was easier to pass a boolean than to remember these strings and then check for them to decide whether to apply incremental_assign. Thinking about it again, I might switch back to avoid changing the interface of this class though.

Contributor:

> I might switch back to avoid changing the interface of this class though.

Also, we would not reintroduce two separate ways to provide Kafka config, which took us a long time to clean up.

Member Author (@lynnagara):

There was only one way. Passing the flag was the only way that would have worked.

Contributor:

I am talking about all those fields we were passing via CLI that we are slowly moving to settings.

Member Author (@lynnagara), Apr 1, 2022:

Unfortunately, passing via CLI is still the easiest option currently, as we don't yet have a mechanism to provide different settings for the consumers and producers of a topic via settings, and this is a consumer-only configuration.
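
For illustration, a minimal sketch of the two approaches being discussed, assuming the KafkaConsumer constructor from this diff (the import path, broker address, and group id are placeholders):

from arroyo.backends.kafka import KafkaConsumer  # import path assumed from this repository's layout

# Approach taken in this revision: a dedicated constructor flag. The consumer then sets
# "partition.assignment.strategy" to "cooperative-sticky" internally, as shown above.
consumer = KafkaConsumer(
    {
        "bootstrap.servers": "localhost:9092",  # placeholder broker
        "group.id": "my-group",                 # placeholder group id
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
        "enable.auto.offset.store": False,
    },
    incremental_cooperative=True,
)

# Alternative raised in this thread: put the strategy in the configuration mapping itself
# and have the consumer detect it before deciding whether to use incremental assignment.
configuration = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-group",
    "partition.assignment.strategy": "cooperative-sticky",
}
use_incremental = configuration.get("partition.assignment.strategy") == "cooperative-sticky"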


# NOTE: Offsets are explicitly managed as part of the assignment
# callback, so preemptively resetting offsets is not enabled.
self.__consumer = ConfluentConsumer(
@@ -246,40 +252,74 @@ def assignment_callback(
) -> None:
self.__state = KafkaConsumerState.ASSIGNING

try:
assignment: MutableSequence[ConfluentTopicPartition] = []

for partition in self.__consumer.committed(partitions):
if partition.offset >= 0:
assignment.append(partition)
elif partition.offset == OFFSET_INVALID:
assignment.append(
self.__resolve_partition_starting_offset(partition)
)
else:
raise ValueError("received unexpected offset")

offsets: MutableMapping[Partition, int] = {
Partition(Topic(i.topic), i.partition): i.offset for i in assignment
}
self.__seek(offsets)

# Ensure that all partitions are resumed on assignment to avoid
# carrying over state from a previous assignment.
self.__consumer.resume(
[
ConfluentTopicPartition(
partition.topic.name, partition.index, offset
)
for partition, offset in offsets.items()
]
)
if self.__incremental_cooperative is True:
Member Author (@lynnagara):

This diff got pretty weird because of indentation. The block from here to line 273 is new, though, and could use a close review. It's quite different from what happens with eager rebalancing (lines 276-305), which is essentially unchanged.

try:
incremental_assignment: MutableSequence[
ConfluentTopicPartition
] = []

for partition in partitions:
Contributor:

If I am getting this right, you are no longer asking the broker for the last committed offset.
That means you are invoking incremental_assign with the offsets you received in the assignment_callback call.
Is this intentional? In the stop-the-world rebalancing, fetching the last committed offset and seeking to it was meant to ensure that all partitions behaved the same way (whether or not they were previously assigned to the same consumer): all partitions would be reset to the last committed offset.
I would expect it to be impossible, in the incremental case, to receive a partition you already owned (unless the consumer restarted). Is this guaranteed, or can you still be assigned a partition you already owned before rebalancing?
If it is guaranteed and all partitions you receive are "new" to this consumer, are you not resetting the offset because the offsets you receive are supposed to already be the last committed ones?

Member Author (@lynnagara):

I don't think you can receive a partition you already owned, but in any case I don't quite get why we would need to check the offset, since the offset you are receiving should already be the correct one. The other issue I noticed with committed() is that it hangs forever (or times out if you pass it a timeout) with the cooperative-sticky strategy in the scenario where it's a new consumer group and no offset has been committed yet.
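
For reference, a sketch of bounding that committed() lookup with a timeout so it fails fast instead of hanging, using the confluent_kafka API directly (the five-second timeout is an arbitrary choice, and falling back to the callback-provided offsets is an assumption of this sketch, not what the PR does):

from confluent_kafka import Consumer, KafkaException

def committed_or_fallback(consumer: Consumer, partitions: list) -> list:
    # committed() blocks while it queries the broker; with cooperative-sticky and a
    # brand-new consumer group it was observed above to hang, so bound the call.
    try:
        return consumer.committed(partitions, timeout=5.0)
    except KafkaException:
        # On timeout or broker error, fall back to the offsets delivered
        # with the assignment callback.
        return partitions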

Contributor:

This will be important to verify.
The reason for getting the committed() offsets is so that all partitions start in a consistent way from the last committed offset after a rebalance (without doing so, the partitions that are reassigned to you would start from the last consumed offset).
This behavior is visible to the application, but if we commit during revoke we should not have any issue, as the two behaviors would be identical.
If we never receive a partition we already owned and we consistently commit on revoke, we are good. But if we fail to commit during revoke and we get the same partition we owned in the past, I don't know what would happen if we do not reset the offsets.
If committed() takes much longer, then it is probably a non-starter. Could you please verify that, even if we did receive the same partition again on subscriptions (where we are testing this), we will not risk missing messages?

Member Author (@lynnagara), Mar 31, 2022:

> The reason for getting the committed() offsets is so that all partitions start in a consistent way from the last committed offset after a rebalance (without doing so the partitions that are reassigned to you would start from the last consumed offset).

I don't think this is true. The offsets passed to the assignment callback are already the committed ones, not just the ones that have been consumed.

Kafka guarantees that:

  • only new partitions are passed in this callback, not previously owned ones;
  • the revoke callback is always triggered before the assignment one.

The reason for committing offsets during revoke is to avoid double-processing, so the new consumer doesn't get the same offset again. But we wouldn't be skipping it either way - only processing it once or twice.
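
As a rough illustration of the commit-on-revoke idea, against the raw confluent_kafka API rather than this class (broker, group, and topic names are placeholders; error handling and offset bookkeeping are omitted):

from confluent_kafka import Consumer

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-group",
        "partition.assignment.strategy": "cooperative-sticky",
        "enable.auto.commit": False,
    }
)

def on_revoke(consumer: Consumer, partitions: list) -> None:
    # Commit the current positions of the partitions being taken away so that their
    # next owner resumes from where this consumer stopped, avoiding double processing.
    consumer.commit(offsets=consumer.position(partitions), asynchronous=False)
    consumer.incremental_unassign(partitions)

consumer.subscribe(["events"], on_revoke=on_revoke)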

Contributor:

> The offsets passed to the assignment callback are already the committed ones not just the ones that are consumed.

Are you sure? I thought that was the reason for making this consumer behave differently from the standard Kafka consumer: https://github.com/getsentry/arroyo/blob/main/arroyo/backends/kafka/consumer.py#L109-L118.

Anyway, if Kafka guarantees that only new partitions are passed and not previously owned ones, we should be good.

Member Author (@lynnagara), Apr 1, 2022:

Yes, but this only applies to the partitions that are being newly assigned to the consumer. Previously, all of the partitions (including those already assigned to the consumer) would be provided in the callback; now only the incremental ones are. So we are no longer rewinding the partitions the consumer keeps across a rebalance like we used to; those just continue from the same place.
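
A small companion sketch of the guarantee described here: with cooperative-sticky, the rebalance callback receives only the partitions being added, and incremental_assign merges them into the existing assignment rather than replacing it (raw confluent_kafka API, placeholder names again):

from confluent_kafka import Consumer

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-group",
        "partition.assignment.strategy": "cooperative-sticky",
    }
)

def on_assign(consumer: Consumer, partitions: list) -> None:
    # `partitions` contains only the newly added partitions, never ones already owned.
    consumer.incremental_assign(partitions)
    print("added:", [(p.topic, p.partition) for p in partitions])
    print("now owned:", len(consumer.assignment()))

consumer.subscribe(["events"], on_assign=on_assign)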

if partition.offset >= 0:
incremental_assignment.append(partition)
elif partition.offset == OFFSET_INVALID:
incremental_assignment.append(
self.__resolve_partition_starting_offset(partition)
)
else:
raise ValueError("received unexpected offset")

offsets = {
Partition(Topic(i.topic), i.partition): i.offset
for i in incremental_assignment
}

self.__incremental_assign(offsets)

# Ensure that all partitions are resumed on assignment to avoid
# carrying over state from a previous assignment.
self.resume([p for p in offsets])

except Exception:
self.__state = KafkaConsumerState.ERROR
raise

for partition in offsets:
self.__paused.discard(partition)
except Exception:
self.__state = KafkaConsumerState.ERROR
raise
else:
try:
assignment: MutableSequence[ConfluentTopicPartition] = []

for partition in self.__consumer.committed(partitions):
if partition.offset >= 0:
assignment.append(partition)
elif partition.offset == OFFSET_INVALID:
assignment.append(
self.__resolve_partition_starting_offset(partition)
)
else:
raise ValueError("received unexpected offset")

offsets = {
Partition(Topic(i.topic), i.partition): i.offset
for i in assignment
}

self.__assign(offsets)

# Ensure that all partitions are resumed on assignment to avoid
# carrying over state from a previous assignment.
self.__consumer.resume(
[
ConfluentTopicPartition(
partition.topic.name, partition.index, offset
)
for partition, offset in offsets.items()
]
)

for partition in offsets:
self.__paused.discard(partition)
Contributor:

Any reason for relying on the resume method in the incremental path while reimplementing its logic here for the stop-the-world rebalancing?

Member Author (@lynnagara):

No reason, I just mostly avoided touching this implementation (it's just indented differently, so it shows up here). But yes, it's cleaner to reuse resume everywhere; I'll switch it over.

except Exception:
self.__state = KafkaConsumerState.ERROR
raise

try:
if on_assign is not None:
@@ -431,29 +471,33 @@ def __validate_offsets(self, offsets: Mapping[Partition, int]) -> None:
if invalid_offsets:
raise ConsumerError(f"invalid offsets: {invalid_offsets!r}")

def __assign(self, offsets: Mapping[Partition, int]) -> None:
self.__validate_offsets(offsets)
self.__consumer.assign(
[
ConfluentTopicPartition(partition.topic.name, partition.index, offset)
for partition, offset in offsets.items()
]
)
self.__offsets.update(offsets)

def __incremental_assign(self, offsets: Mapping[Partition, int]) -> None:
self.__validate_offsets(offsets)
self.__consumer.incremental_assign(
[
ConfluentTopicPartition(partition.topic.name, partition.index, offset)
for partition, offset in offsets.items()
]
)
self.__offsets.update(offsets)

def __seek(self, offsets: Mapping[Partition, int]) -> None:
self.__validate_offsets(offsets)

if self.__state is KafkaConsumerState.ASSIGNING:
# Calling ``seek`` on the Confluent consumer from an assignment
# callback will throw an "Erroneous state" error. Instead,
# partition offsets have to be initialized by calling ``assign``.
self.__consumer.assign(
[
ConfluentTopicPartition(
partition.topic.name, partition.index, offset
)
for partition, offset in offsets.items()
]
for partition, offset in offsets.items():
self.__consumer.seek(
ConfluentTopicPartition(partition.topic.name, partition.index, offset)
Contributor (@fpacifici), Mar 31, 2022:

Why this change? Are we never going to get to this method during assignment?

Member Author (@lynnagara):

I found it pretty unintuitive that the same __seek implementation was called from both seek() and the assignment callback, and then checked the KafkaConsumerState to effectively determine which one was making the call. Now the assign logic is separate and never calls this __seek function anymore.

Contributor:

Also, this method is now only called by the public seek method. Why not inline it there?

Member Author (@lynnagara):

I was avoiding touching a lot of code earlier, but yes, it is better inlined since nothing else calls it now. Updated.
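
For reference, the inlined public seek might look roughly like this (a sketch derived from the __seek body shown above, not necessarily the code as merged):

def seek(self, offsets: Mapping[Partition, int]) -> None:
    """Seek the given partitions to the given offsets."""
    self.__validate_offsets(offsets)

    for partition, offset in offsets.items():
        self.__consumer.seek(
            ConfluentTopicPartition(partition.topic.name, partition.index, offset)
        )

    self.__offsets.update(offsets)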

)
else:
for partition, offset in offsets.items():
self.__consumer.seek(
ConfluentTopicPartition(
partition.topic.name, partition.index, offset
)
)

self.__offsets.update(offsets)

def seek(self, offsets: Mapping[Partition, int]) -> None:
103 changes: 89 additions & 14 deletions tests/backends/test_kafka.py
@@ -6,7 +6,7 @@
from contextlib import closing
from datetime import datetime
from pickle import PickleBuffer
from typing import Iterator, MutableSequence, Optional
from typing import Any, Iterator, Mapping, MutableSequence, Optional
from unittest import TestCase

import pytest
@@ -46,6 +46,25 @@ def test_payload_pickle_out_of_band() -> None:
assert pickle.loads(data, buffers=[b.raw() for b in buffers]) == payload


@contextlib.contextmanager
def get_topic(
configuration: Mapping[str, Any], partitions_count: int
) -> Iterator[Topic]:
name = f"test-{uuid.uuid1().hex}"
client = AdminClient(configuration)
[[key, future]] = client.create_topics(
[NewTopic(name, num_partitions=partitions_count, replication_factor=1)]
).items()
assert key == name
assert future.result() is None
try:
yield Topic(name)
finally:
[[key, future]] = client.delete_topics([name]).items()
assert key == name
assert future.result() is None
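
Example usage of this new module-level helper outside the test class (the broker address is a placeholder):

configuration = {"bootstrap.servers": "localhost:9092"}

with get_topic(configuration, partitions_count=2) as topic:
    # The topic exists for the duration of the block and is deleted on exit.
    print(topic.name)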


class KafkaStreamsTestCase(StreamsTestMixin[KafkaPayload], TestCase):

configuration = build_kafka_configuration(
@@ -54,19 +73,11 @@ class KafkaStreamsTestCase(StreamsTestMixin[KafkaPayload], TestCase):

@contextlib.contextmanager
def get_topic(self, partitions: int = 1) -> Iterator[Topic]:
name = f"test-{uuid.uuid1().hex}"
client = AdminClient(self.configuration)
[[key, future]] = client.create_topics(
[NewTopic(name, num_partitions=partitions, replication_factor=1)]
).items()
assert key == name
assert future.result() is None
try:
yield Topic(name)
finally:
[[key, future]] = client.delete_topics([name]).items()
assert key == name
assert future.result() is None
with get_topic(self.configuration, partitions) as topic:
try:
yield topic
finally:
pass

def get_consumer(
self,
@@ -133,6 +144,70 @@ def test_auto_offset_reset_error(self) -> None:
consumer.poll(10.0) # XXX: getting the subscription is slow


def test_cooperative_rebalancing() -> None:
configuration = build_kafka_configuration(
{"bootstrap.servers": os.environ.get("DEFAULT_BROKERS", "localhost:9092")}
)

partitions_count = 2

group_id = uuid.uuid1().hex
producer = KafkaProducer(configuration)

consumer_a = KafkaConsumer(
{
**configuration,
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
"enable.auto.offset.store": False,
"group.id": group_id,
"session.timeout.ms": 10000,
},
incremental_cooperative=True,
)
consumer_b = KafkaConsumer(
{
**configuration,
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
"enable.auto.offset.store": False,
"group.id": group_id,
"session.timeout.ms": 10000,
},
incremental_cooperative=True,
)
Contributor:

Thanks.
By the way, what happens if some of the consumers are incremental and others are not? In other words, what will happen at the first deployment?

Member Author (@lynnagara):

We can't combine cooperative and non-cooperative consumers in the same group. I'm not sure of any other solution apart from stopping them all and restarting them all with the new configuration.

Contributor:

Ok, let's do it some day when I am on PTO.
Out of curiosity, what happens if you add a consumer that uses the incremental rebalancing system to a group that is using the standard system?


with get_topic(configuration, partitions_count) as topic, closing(
producer
), closing(consumer_a), closing(consumer_b):
for i in range(10):
for j in range(partitions_count):
producer.produce(
Partition(topic, j),
KafkaPayload(None, f"{j}-{i}".encode("utf8"), []),
)

consumer_a.subscribe([topic])
Contributor:

Why not provide callbacks so that you can assert that only one partition is transferred?

Member Author (@lynnagara), Mar 31, 2022:

I guess that could've worked too. Still, this method of checking the assigned partitions via consumer.tell() works well enough for checking the same thing, and it's copied from the eager consumer's test.

Contributor:

Right, but there is no way to tell whether all partitions were revoked and then reassigned, or whether the incremental assignment actually works.

Member Author (@lynnagara), Mar 31, 2022:

Yeah, this was not about testing the Kafka consumer internals, just that we end up with one partition assigned to each consumer and that consuming on both works OK.
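
A sketch of the callback-based assertion suggested above, slotted into the test after consumer_b.subscribe (the on_assign signature, a mapping of Partition to offset, is assumed from this file's assignment_callback; this is illustrative, not part of the merged test):

assignments_b = []

def record_assignment(offsets) -> None:
    # Record each assignment handed to consumer B during rebalancing.
    assignments_b.append(dict(offsets))

consumer_b.subscribe([topic], on_assign=record_assignment)

# ... after the poll loop ...
# With cooperative rebalancing, each callback invocation for consumer B should carry
# exactly one new partition rather than the full set.
assert all(len(offsets) == 1 for offsets in assignments_b)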


assert consumer_a.poll(10.0) is not None

# Consumer A has 2 partitions assigned, B has none
assert len(consumer_a.tell()) == 2
assert len(consumer_b.tell()) == 0

consumer_b.subscribe([topic])
consumer_a.pause([Partition(topic, 0), Partition(topic, 1)])

# At some point, 1 partition will move to consumer B
for i in range(10):
assert consumer_a.poll(0) is None # attempt to force session timeout
if consumer_b.poll(1.0) is not None:
break

assert len(consumer_a.tell()) == 1
assert len(consumer_b.tell()) == 1


def test_commit_codec() -> None:
commit = Commit("group", Partition(Topic("topic"), 0), 0, datetime.now())
assert commit_codec.decode(commit_codec.encode(commit)) == commit