2323from kafka .serializer import Serializer
2424from kafka .structs import TopicPartition
2525
26+
2627log = logging .getLogger (__name__ )
2728PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger ()
2829
@@ -375,13 +376,13 @@ def __init__(self, **configs):
375376 reporters = [reporter () for reporter in self .config ['metric_reporters' ]]
376377 self ._metrics = Metrics (metric_config , reporters )
377378
378- self . _client = KafkaClient (metrics = self ._metrics , metric_group_prefix = 'producer' ,
379- wakeup_timeout_ms = self .config ['max_block_ms' ],
380- ** self .config )
379+ client = KafkaClient (metrics = self ._metrics , metric_group_prefix = 'producer' ,
380+ wakeup_timeout_ms = self .config ['max_block_ms' ],
381+ ** self .config )
381382
382383 # Get auto-discovered version from client if necessary
383384 if self .config ['api_version' ] is None :
384- self .config ['api_version' ] = self . _client .config ['api_version' ]
385+ self .config ['api_version' ] = client .config ['api_version' ]
385386
386387 if self .config ['compression_type' ] == 'lz4' :
387388 assert self .config ['api_version' ] >= (0 , 8 , 2 ), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
@@ -397,9 +398,9 @@ def __init__(self, **configs):
397398
398399 message_version = self ._max_usable_produce_magic ()
399400 self ._accumulator = RecordAccumulator (message_version = message_version , metrics = self ._metrics , ** self .config )
400- self ._metadata = self . _client .cluster
401+ self ._metadata = client .cluster
401402 guarantee_message_order = bool (self .config ['max_in_flight_requests_per_connection' ] == 1 )
402- self ._sender = Sender (self . _client , self ._metadata ,
403+ self ._sender = Sender (client , self ._metadata ,
403404 self ._accumulator , self ._metrics ,
404405 guarantee_message_order = guarantee_message_order ,
405406 ** self .config )
@@ -413,20 +414,16 @@ def __init__(self, **configs):
413414
414415 def bootstrap_connected (self ):
415416 """Return True if the bootstrap is connected."""
416- if self ._client ._bootstrap_fails > 0 :
417- return False
418- return True
417+ return self ._sender .bootstrap_connected ()
419418
420419 def _cleanup_factory (self ):
421420 """Build a cleanup clojure that doesn't increase our ref count"""
422421 _self = weakref .proxy (self )
423-
424422 def wrapper ():
425423 try :
426424 _self .close (timeout = 0 )
427425 except (ReferenceError , AttributeError ):
428426 pass
429-
430427 return wrapper
431428
432429 def _unregister_cleanup (self ):
0 commit comments