@@ -212,13 +212,12 @@ class BrokerConnection(object):
212212 SASL_MECHANISMS = ('PLAIN' , 'GSSAPI' )
213213
214214 def __init__ (self , host , port , afi , ** configs ):
215- self .hostname = host
216215 self .host = host
217216 self .port = port
218217 self .afi = afi
219- self ._init_host = host
220- self ._init_port = port
221- self ._init_afi = afi
218+ self ._sock_ip = host
219+ self ._sock_port = port
220+ self ._sock_afi = afi
222221 self .in_flight_requests = collections .deque ()
223222 self ._api_versions = None
224223
@@ -273,10 +272,10 @@ def __init__(self, host, port, afi, **configs):
273272 self .node_id )
274273
275274 def _dns_lookup (self ):
276- self ._gai = dns_lookup (self ._init_host , self ._init_port , self ._init_afi )
275+ self ._gai = dns_lookup (self .host , self .port , self .afi )
277276 if not self ._gai :
278277 log .error ('DNS lookup failed for %s:%i (%s)' ,
279- self ._init_host , self ._init_port , self ._init_afi )
278+ self .host , self .port , self .afi )
280279 return False
281280 return True
282281
@@ -334,8 +333,8 @@ def connect(self):
334333 return
335334 else :
336335 log .debug ('%s: creating new socket' , self )
337- self .afi , self .host , self .port = next_lookup
338- self ._sock = socket .socket (self .afi , socket .SOCK_STREAM )
336+ self ._sock_afi , self ._sock_ip , self ._sock_port = next_lookup
337+ self ._sock = socket .socket (self ._sock_afi , socket .SOCK_STREAM )
339338
340339 for option in self .config ['socket_options' ]:
341340 log .debug ('%s: setting socket option %s' , self , option )
@@ -349,15 +348,17 @@ def connect(self):
349348 # so we need to double check that we are still connecting before
350349 if self .connecting ():
351350 self .config ['state_change_callback' ](self )
352- log .info ('%s: connecting to %s:%d' , self , self .host , self .port )
351+ log .info ('%s: connecting to %s:%d [%s:%d %s]' , self , self .host ,
352+ self .port , self ._sock_ip , self ._sock_port ,
353+ AFI_NAMES [self ._sock_afi ])
353354
354355 if self .state is ConnectionStates .CONNECTING :
355356 # in non-blocking mode, use repeated calls to socket.connect_ex
356357 # to check connection status
357358 request_timeout = self .config ['request_timeout_ms' ] / 1000.0
358359 ret = None
359360 try :
360- ret = self ._sock .connect_ex ((self .host , self .port ))
361+ ret = self ._sock .connect_ex ((self ._sock_ip , self ._sock_port ))
361362 except socket .error as err :
362363 ret = err .errno
363364
@@ -449,7 +450,7 @@ def _wrap_ssl(self):
449450 try :
450451 self ._sock = self ._ssl_context .wrap_socket (
451452 self ._sock ,
452- server_hostname = self .hostname ,
453+ server_hostname = self .host ,
453454 do_handshake_on_connect = False )
454455 except ssl .SSLError as e :
455456 log .exception ('%s: Failed to wrap socket in SSLContext!' , self )
@@ -573,7 +574,7 @@ def _try_authenticate_plain(self, future):
573574 return future .success (True )
574575
575576 def _try_authenticate_gssapi (self , future ):
576- auth_id = self .config ['sasl_kerberos_service_name' ] + '@' + self .hostname
577+ auth_id = self .config ['sasl_kerberos_service_name' ] + '@' + self .host
577578 gssapi_name = gssapi .Name (
578579 auth_id ,
579580 name_type = gssapi .NameType .hostbased_service
@@ -1002,9 +1003,9 @@ def filter(self, record):
10021003 return version
10031004
10041005 def __str__ (self ):
1005- return "<BrokerConnection node_id=%s host=%s/%s port=%d afi=%s >" % (
1006- self .node_id , self .hostname , self .host , self .port ,
1007- AFI_NAMES [self .afi ])
1006+ return "<BrokerConnection node_id=%s host=%s:%d %s [%s:%d %s] >" % (
1007+ self .node_id , self .host , self .port , self .state ,
1008+ self . _sock_ip , self . _sock_port , AFI_NAMES [self ._sock_afi ])
10081009
10091010
10101011class BrokerConnectionMetrics (object ):
0 commit comments