Skip to content

Commit

Permalink
Fix KeyError on solitary abort marker. (#782)
Browse files Browse the repository at this point in the history
* Fix KeyError on solitary abort marker.

* Add regression test for solitary abort marker.

* Fix regression test.

* Use non-extension builder for test setup.

* Add comments to test.
  • Loading branch information
pikulmar committed Sep 14, 2021
1 parent dbd490f commit 95bafb9
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 2 deletions.
5 changes: 4 additions & 1 deletion aiokafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@ def _unpack_records(self):

if next_batch.is_control_batch:
if self._contains_abort_marker(next_batch):
self._aborted_producers.remove(next_batch.producer_id)
# Using `discard` instead of `remove`, because Kafka
# may return an abort marker for an otherwise empty
# topic-partition.
self._aborted_producers.discard(next_batch.producer_id)

if next_batch.is_transactional and \
next_batch.producer_id in self._aborted_producers:
Expand Down
49 changes: 48 additions & 1 deletion tests/test_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

from kafka.protocol.offset import OffsetResponse
from aiokafka.record.legacy_records import LegacyRecordBatchBuilder
from aiokafka.record.default_records import (
# NB: test_solitary_abort_marker relies on implementation details
_DefaultRecordBatchBuilderPy as DefaultRecordBatchBuilder)
from aiokafka.record.memory_records import MemoryRecords

from aiokafka.protocol.fetch import (
FetchRequest_v0 as FetchRequest, FetchResponse_v0 as FetchResponse)
Expand All @@ -18,7 +22,7 @@
from aiokafka.client import AIOKafkaClient
from aiokafka.consumer.fetcher import (
Fetcher, FetchResult, FetchError, ConsumerRecord, OffsetResetStrategy,
PartitionRecords, READ_UNCOMMITTED
PartitionRecords, READ_COMMITTED, READ_UNCOMMITTED
)
from aiokafka.consumer.subscription_state import SubscriptionState
from aiokafka.util import create_future, create_task, get_running_loop
Expand Down Expand Up @@ -534,3 +538,46 @@ async def mock_send(node_id, request):
if cm is not None:
self.assertIn(
"Received unknown topic or partition error", cm.output[0])

@run_until_complete
async def test_solitary_abort_marker(self):
# An abort marker may not be preceded by any aborted messages

# Setup: Create a record batch (control batch) containing
# a single transaction abort marker.
builder = DefaultRecordBatchBuilder(
magic=2, compression_type=0, is_transactional=True,
producer_id=3, producer_epoch=1, base_sequence=-1,
batch_size=999)
orig_get_attributes = builder._get_attributes
builder._get_attributes = lambda *args, **kwargs: (
# Make batch a control batch
orig_get_attributes(*args, **kwargs)
| DefaultRecordBatchBuilder.CONTROL_MASK)
builder.append(
offset=0, timestamp=1631276519572,
# transaction abort marker
key=b'\x00\x00\x00\x00', value=b'\x00\x00\x00\x00\x00\x00',
headers=[])
buffer = builder.build()
records = MemoryRecords(bytes(buffer))

# Test: In aiokafka>=0.7.2, the following line would result in a an
# exception, because the implementation assumed that any transaction
# abort marker would be preceded by at least one aborted message
# originating from the same producer_id. However, this appears to
# not always be the case, as reported in
# https://github.com/aio-libs/aiokafka/issues/781 .
partition_recs = PartitionRecords(
tp=TopicPartition('test-topic', 0),
records=records,
aborted_transactions=[],
fetch_offset=0,
key_deserializer=None,
value_deserializer=None,
check_crcs=True,
isolation_level=READ_COMMITTED)

# Since isolation_level is READ_COMMITTED, no consumer records are
# expected to be returned here.
self.assertEqual(len(list(partition_recs)), 0)

0 comments on commit 95bafb9

Please sign in to comment.