diff --git a/kafka/client.py b/kafka/client.py index 10b1724e4..789d4da3d 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -11,7 +11,7 @@ from kafka.vendor import six import kafka.errors -from kafka.errors import (UnknownError, ConnectionError, FailedPayloadsError, +from kafka.errors import (UnknownError, KafkaConnectionError, FailedPayloadsError, KafkaTimeoutError, KafkaUnavailableError, LeaderNotAvailableError, UnknownTopicOrPartitionError, NotLeaderForPartitionError, ReplicaNotAvailableError) @@ -73,7 +73,7 @@ def _get_conn(self, host, port, afi): conn = self._conns[host_key] if not conn.connect_blocking(self.timeout): conn.close() - raise ConnectionError("%s:%s (%s)" % (host, port, afi)) + raise KafkaConnectionError("%s:%s (%s)" % (host, port, afi)) return conn def _get_leader_for_partition(self, topic, partition): @@ -156,7 +156,7 @@ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn): for (host, port, afi) in hosts: try: conn = self._get_conn(host, port, afi) - except ConnectionError: + except KafkaConnectionError: log.warning("Skipping unconnected connection: %s:%s (AFI %s)", host, port, afi) continue @@ -242,7 +242,7 @@ def failed_payloads(payloads): host, port, afi = get_ip_port_afi(broker.host) try: conn = self._get_conn(host, broker.port, afi) - except ConnectionError: + except KafkaConnectionError: refresh_metadata = True failed_payloads(broker_payloads) continue @@ -344,8 +344,8 @@ def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn): try: host, port, afi = get_ip_port_afi(broker.host) conn = self._get_conn(host, broker.port, afi) - except ConnectionError as e: - log.warning('ConnectionError attempting to send request %s ' + except KafkaConnectionError as e: + log.warning('KafkaConnectionError attempting to send request %s ' 'to server %s: %s', request_id, broker, e) for payload in payloads: diff --git a/kafka/client_async.py b/kafka/client_async.py index 9556eca12..a9704fafd 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -602,7 +602,7 @@ def _poll(self, timeout): log.warning('Protocol out of sync on %r, closing', conn) except socket.error: pass - conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests')) + conn.close(Errors.KafkaConnectionError('Socket EVENT_READ without in-flight-requests')) continue self._idle_expiry_manager.update(conn.node_id) diff --git a/kafka/conn.py b/kafka/conn.py index daaa234d5..f67edfbc9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -327,7 +327,7 @@ def connect(self): self.last_attempt = time.time() next_lookup = self._next_afi_sockaddr() if not next_lookup: - self.close(Errors.ConnectionError('DNS failure')) + self.close(Errors.KafkaConnectionError('DNS failure')) return else: log.debug('%s: creating new socket', self) @@ -381,12 +381,12 @@ def connect(self): log.error('Connect attempt to %s returned error %s.' ' Disconnecting.', self, ret) errstr = errno.errorcode.get(ret, 'UNKNOWN') - self.close(Errors.ConnectionError('{} {}'.format(ret, errstr))) + self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr))) # Connection timed out elif time.time() > request_timeout + self.last_attempt: log.error('Connection attempt to %s timed out', self) - self.close(Errors.ConnectionError('timeout')) + self.close(Errors.KafkaConnectionError('timeout')) # Needs retry else: @@ -463,7 +463,7 @@ def _try_handshake(self): pass except (SSLZeroReturnError, ConnectionError, SSLEOFError): log.warning('SSL connection closed by server during handshake.') - self.close(Errors.ConnectionError('SSL connection closed by server during handshake')) + self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake')) # Other SSLErrors will be raised to user return False @@ -488,7 +488,7 @@ def _try_authenticate(self): return False elif self._sasl_auth_future.failed(): ex = self._sasl_auth_future.exception - if not isinstance(ex, Errors.ConnectionError): + if not isinstance(ex, Errors.KafkaConnectionError): raise ex # pylint: disable-msg=raising-bad-type return self._sasl_auth_future.succeeded() @@ -558,8 +558,8 @@ def _try_authenticate_plain(self, future): data = self._recv_bytes_blocking(4) except ConnectionError as e: - log.exception("%s: Error receiving reply from server", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) + log.exception("%s: Error receiving reply from server", self) + error = Errors.KafkaConnectionError("%s: %s" % (self, e)) self.close(error=error) return future.failure(error) @@ -621,7 +621,7 @@ def _try_authenticate_gssapi(self, future): except ConnectionError as e: log.exception("%s: Error receiving reply from server", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) + error = Errors.KafkaConnectionError("%s: %s" % (self, e)) self.close(error=error) return future.failure(error) except Exception as e: @@ -701,7 +701,7 @@ def close(self, error=None): Arguments: error (Exception, optional): pending in-flight-requests will be failed with this exception. - Default: kafka.errors.ConnectionError. + Default: kafka.errors.KafkaConnectionError. """ if self.state is ConnectionStates.DISCONNECTED: if error is not None: @@ -733,7 +733,7 @@ def send(self, request): if self.connecting(): return future.failure(Errors.NodeNotReadyError(str(self))) elif not self.connected(): - return future.failure(Errors.ConnectionError(str(self))) + return future.failure(Errors.KafkaConnectionError(str(self))) elif not self.can_send_more(): return future.failure(Errors.TooManyInFlightRequests(str(self))) return self._send(request) @@ -753,7 +753,7 @@ def _send(self, request): self._sensors.bytes_sent.record(total_bytes) except ConnectionError as e: log.exception("Error sending %s to %s", request, self) - error = Errors.ConnectionError("%s: %s" % (self, e)) + error = Errors.KafkaConnectionError("%s: %s" % (self, e)) self.close(error=error) return future.failure(error) log.debug('%s Request %d: %s', self, correlation_id, request) @@ -781,7 +781,7 @@ def recv(self): # If requests are pending, we should close the socket and # fail all the pending request futures if self.in_flight_requests: - self.close(Errors.ConnectionError('Socket not connected during recv with in-flight-requests')) + self.close(Errors.KafkaConnectionError('Socket not connected during recv with in-flight-requests')) return () elif not self.in_flight_requests: @@ -821,7 +821,7 @@ def _recv(self): # without an exception raised if not data: log.error('%s: socket disconnected', self) - self.close(error=Errors.ConnectionError('socket disconnected')) + self.close(error=Errors.KafkaConnectionError('socket disconnected')) return [] else: recvd.append(data) @@ -833,7 +833,7 @@ def _recv(self): break log.exception('%s: Error receiving network data' ' closing socket', self) - self.close(error=Errors.ConnectionError(e)) + self.close(error=Errors.KafkaConnectionError(e)) return [] except BlockingIOError: if six.PY3: diff --git a/kafka/errors.py b/kafka/errors.py index c70853c69..f4c87407d 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -447,7 +447,7 @@ def __init__(self, payload, *args): self.payload = payload -class ConnectionError(KafkaError): +class KafkaConnectionError(KafkaError): retriable = True invalid_metadata = True @@ -517,13 +517,13 @@ def check_error(response): RETRY_BACKOFF_ERROR_TYPES = ( KafkaUnavailableError, LeaderNotAvailableError, - ConnectionError, FailedPayloadsError + KafkaConnectionError, FailedPayloadsError ) RETRY_REFRESH_ERROR_TYPES = ( NotLeaderForPartitionError, UnknownTopicOrPartitionError, - LeaderNotAvailableError, ConnectionError + LeaderNotAvailableError, KafkaConnectionError ) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index e8d6c3d27..c9dd6c3a1 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -372,7 +372,6 @@ def send_messages(self, topic, partition, *msg): Raises: FailedPayloadsError: low-level connection error, can be caused by networking failures, or a malformed request. - ConnectionError: KafkaUnavailableError: all known brokers are down when attempting to refresh metadata. LeaderNotAvailableError: topic or partition is initializing or diff --git a/test/test_client.py b/test/test_client.py index d02c621a2..c53983c94 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -8,7 +8,7 @@ from kafka import SimpleClient from kafka.errors import ( KafkaUnavailableError, LeaderNotAvailableError, KafkaTimeoutError, - UnknownTopicOrPartitionError, ConnectionError, FailedPayloadsError) + UnknownTopicOrPartitionError, FailedPayloadsError) from kafka.future import Future from kafka.protocol import KafkaProtocol, create_message from kafka.protocol.metadata import MetadataResponse diff --git a/test/test_conn.py b/test/test_conn.py index 12a32efb2..fbdeeb9e7 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -99,7 +99,7 @@ def test_send_disconnected(conn): conn.state = ConnectionStates.DISCONNECTED f = conn.send('foobar') assert f.failed() is True - assert isinstance(f.exception, Errors.ConnectionError) + assert isinstance(f.exception, Errors.KafkaConnectionError) def test_send_connecting(conn): @@ -162,7 +162,7 @@ def test_send_error(_socket, conn): _socket.send.side_effect = socket.error f = conn.send(req) assert f.failed() is True - assert isinstance(f.exception, Errors.ConnectionError) + assert isinstance(f.exception, Errors.KafkaConnectionError) assert _socket.close.call_count == 1 assert conn.state is ConnectionStates.DISCONNECTED diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 797e1c8ea..ad7dcb98b 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -4,7 +4,7 @@ from kafka import SimpleClient, SimpleConsumer, KeyedProducer from kafka.errors import ( - FailedPayloadsError, ConnectionError, RequestTimedOutError, + FailedPayloadsError, KafkaConnectionError, RequestTimedOutError, NotLeaderForPartitionError) from kafka.producer.base import Producer from kafka.structs import TopicPartition @@ -79,7 +79,7 @@ def test_switch_leader(self): producer.send_messages(topic, partition, b'success') log.debug("success!") recovered = True - except (FailedPayloadsError, ConnectionError, RequestTimedOutError, + except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError, NotLeaderForPartitionError): log.debug("caught exception sending message -- will retry") continue @@ -167,7 +167,7 @@ def test_switch_leader_keyed_producer(self): producer.send_messages(topic, key, msg) if producer.partitioners[topic].partition(key) == 0: recovered = True - except (FailedPayloadsError, ConnectionError, RequestTimedOutError, + except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError, NotLeaderForPartitionError): log.debug("caught exception sending message -- will retry") continue