File tree Expand file tree Collapse file tree 2 files changed +20
-1
lines changed Expand file tree Collapse file tree 2 files changed +20
-1
lines changed Original file line number Diff line number Diff line change @@ -47,6 +47,9 @@ def is_valid(self):
4747 def match (self , batch ):
4848 return self .producer_id == batch .producer_id and self .epoch == batch .producer_epoch
4949
50+ def __eq__ (self , other ):
51+ return isinstance (other , ProducerIdAndEpoch ) and self .producer_id == other .producer_id and self .epoch == other .epoch
52+
5053 def __str__ (self ):
5154 return "ProducerIdAndEpoch(producer_id={}, epoch={})" .format (self .producer_id , self .epoch )
5255
@@ -304,7 +307,7 @@ def reset_producer_id(self):
304307 it's best to return the produce error to the user and let them abort the transaction and close the producer explicitly.
305308 """
306309 with self ._lock :
307- if self .is_transactional :
310+ if self .is_transactional () :
308311 raise Errors .IllegalStateError (
309312 "Cannot reset producer state for a transactional producer."
310313 " You must either abort the ongoing transaction or"
Original file line number Diff line number Diff line change 77import pytest
88
99from kafka import KafkaProducer
10+ from kafka .cluster import ClusterMetadata
11+ from kafka .producer .transaction_manager import TransactionManager , ProducerIdAndEpoch
12+
1013
1114@pytest .mark .skipif (platform .python_implementation () != 'CPython' ,
1215 reason = 'Test relies on CPython-specific gc policies' )
@@ -20,4 +23,17 @@ def test_kafka_producer_gc_cleanup():
2023 assert threading .active_count () == threads
2124
2225
26+ def test_idempotent_producer_reset_producer_id ():
27+ transaction_manager = TransactionManager (
28+ transactional_id = None ,
29+ transaction_timeout_ms = 1000 ,
30+ retry_backoff_ms = 100 ,
31+ api_version = (0 , 11 ),
32+ metadata = ClusterMetadata (),
33+ )
2334
35+ test_producer_id_and_epoch = ProducerIdAndEpoch (123 , 456 )
36+ transaction_manager .set_producer_id_and_epoch (test_producer_id_and_epoch )
37+ assert transaction_manager .producer_id_and_epoch == test_producer_id_and_epoch
38+ transaction_manager .reset_producer_id ()
39+ assert transaction_manager .producer_id_and_epoch == ProducerIdAndEpoch (- 1 , - 1 )
You can’t perform that action at this time.
0 commit comments