Skip to content

Commit

Permalink
Merge ea3f2b8 into 27f939a
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwidman committed May 10, 2018
2 parents 27f939a + ea3f2b8 commit 53aadf2
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 31 deletions.
12 changes: 6 additions & 6 deletions kafka/client.py
Expand Up @@ -11,7 +11,7 @@
from kafka.vendor import six

import kafka.errors
from kafka.errors import (UnknownError, ConnectionError, FailedPayloadsError,
from kafka.errors import (UnknownError, KafkaConnectionError, FailedPayloadsError,
KafkaTimeoutError, KafkaUnavailableError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
NotLeaderForPartitionError, ReplicaNotAvailableError)
Expand Down Expand Up @@ -73,7 +73,7 @@ def _get_conn(self, host, port, afi):
conn = self._conns[host_key]
if not conn.connect_blocking(self.timeout):
conn.close()
raise ConnectionError("%s:%s (%s)" % (host, port, afi))
raise KafkaConnectionError("%s:%s (%s)" % (host, port, afi))
return conn

def _get_leader_for_partition(self, topic, partition):
Expand Down Expand Up @@ -156,7 +156,7 @@ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
for (host, port, afi) in hosts:
try:
conn = self._get_conn(host, port, afi)
except ConnectionError:
except KafkaConnectionError:
log.warning("Skipping unconnected connection: %s:%s (AFI %s)",
host, port, afi)
continue
Expand Down Expand Up @@ -242,7 +242,7 @@ def failed_payloads(payloads):
host, port, afi = get_ip_port_afi(broker.host)
try:
conn = self._get_conn(host, broker.port, afi)
except ConnectionError:
except KafkaConnectionError:
refresh_metadata = True
failed_payloads(broker_payloads)
continue
Expand Down Expand Up @@ -344,8 +344,8 @@ def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn):
try:
host, port, afi = get_ip_port_afi(broker.host)
conn = self._get_conn(host, broker.port, afi)
except ConnectionError as e:
log.warning('ConnectionError attempting to send request %s '
except KafkaConnectionError as e:
log.warning('KafkaConnectionError attempting to send request %s '
'to server %s: %s', request_id, broker, e)

for payload in payloads:
Expand Down
2 changes: 1 addition & 1 deletion kafka/client_async.py
Expand Up @@ -602,7 +602,7 @@ def _poll(self, timeout):
log.warning('Protocol out of sync on %r, closing', conn)
except socket.error:
pass
conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests'))
conn.close(Errors.KafkaConnectionError('Socket EVENT_READ without in-flight-requests'))
continue

self._idle_expiry_manager.update(conn.node_id)
Expand Down
28 changes: 14 additions & 14 deletions kafka/conn.py
Expand Up @@ -327,7 +327,7 @@ def connect(self):
self.last_attempt = time.time()
next_lookup = self._next_afi_sockaddr()
if not next_lookup:
self.close(Errors.ConnectionError('DNS failure'))
self.close(Errors.KafkaConnectionError('DNS failure'))
return
else:
log.debug('%s: creating new socket', self)
Expand Down Expand Up @@ -381,12 +381,12 @@ def connect(self):
log.error('Connect attempt to %s returned error %s.'
' Disconnecting.', self, ret)
errstr = errno.errorcode.get(ret, 'UNKNOWN')
self.close(Errors.ConnectionError('{} {}'.format(ret, errstr)))
self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr)))

# Connection timed out
elif time.time() > request_timeout + self.last_attempt:
log.error('Connection attempt to %s timed out', self)
self.close(Errors.ConnectionError('timeout'))
self.close(Errors.KafkaConnectionError('timeout'))

# Needs retry
else:
Expand Down Expand Up @@ -463,7 +463,7 @@ def _try_handshake(self):
pass
except (SSLZeroReturnError, ConnectionError, SSLEOFError):
log.warning('SSL connection closed by server during handshake.')
self.close(Errors.ConnectionError('SSL connection closed by server during handshake'))
self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))
# Other SSLErrors will be raised to user

return False
Expand All @@ -488,7 +488,7 @@ def _try_authenticate(self):
return False
elif self._sasl_auth_future.failed():
ex = self._sasl_auth_future.exception
if not isinstance(ex, Errors.ConnectionError):
if not isinstance(ex, Errors.KafkaConnectionError):
raise ex # pylint: disable-msg=raising-bad-type
return self._sasl_auth_future.succeeded()

Expand Down Expand Up @@ -558,8 +558,8 @@ def _try_authenticate_plain(self, future):
data = self._recv_bytes_blocking(4)

except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
log.exception("%s: Error receiving reply from server", self)
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
return future.failure(error)

Expand Down Expand Up @@ -621,7 +621,7 @@ def _try_authenticate_gssapi(self, future):

except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
return future.failure(error)
except Exception as e:
Expand Down Expand Up @@ -701,7 +701,7 @@ def close(self, error=None):
Arguments:
error (Exception, optional): pending in-flight-requests
will be failed with this exception.
Default: kafka.errors.ConnectionError.
Default: kafka.errors.KafkaConnectionError.
"""
if self.state is ConnectionStates.DISCONNECTED:
if error is not None:
Expand Down Expand Up @@ -733,7 +733,7 @@ def send(self, request):
if self.connecting():
return future.failure(Errors.NodeNotReadyError(str(self)))
elif not self.connected():
return future.failure(Errors.ConnectionError(str(self)))
return future.failure(Errors.KafkaConnectionError(str(self)))
elif not self.can_send_more():
return future.failure(Errors.TooManyInFlightRequests(str(self)))
return self._send(request)
Expand All @@ -753,7 +753,7 @@ def _send(self, request):
self._sensors.bytes_sent.record(total_bytes)
except ConnectionError as e:
log.exception("Error sending %s to %s", request, self)
error = Errors.ConnectionError("%s: %s" % (self, e))
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
return future.failure(error)
log.debug('%s Request %d: %s', self, correlation_id, request)
Expand Down Expand Up @@ -781,7 +781,7 @@ def recv(self):
# If requests are pending, we should close the socket and
# fail all the pending request futures
if self.in_flight_requests:
self.close(Errors.ConnectionError('Socket not connected during recv with in-flight-requests'))
self.close(Errors.KafkaConnectionError('Socket not connected during recv with in-flight-requests'))
return ()

elif not self.in_flight_requests:
Expand Down Expand Up @@ -821,7 +821,7 @@ def _recv(self):
# without an exception raised
if not data:
log.error('%s: socket disconnected', self)
self.close(error=Errors.ConnectionError('socket disconnected'))
self.close(error=Errors.KafkaConnectionError('socket disconnected'))
return []
else:
recvd.append(data)
Expand All @@ -833,7 +833,7 @@ def _recv(self):
break
log.exception('%s: Error receiving network data'
' closing socket', self)
self.close(error=Errors.ConnectionError(e))
self.close(error=Errors.KafkaConnectionError(e))
return []
except BlockingIOError:
if six.PY3:
Expand Down
6 changes: 3 additions & 3 deletions kafka/errors.py
Expand Up @@ -447,7 +447,7 @@ def __init__(self, payload, *args):
self.payload = payload


class ConnectionError(KafkaError):
class KafkaConnectionError(KafkaError):
retriable = True
invalid_metadata = True

Expand Down Expand Up @@ -517,13 +517,13 @@ def check_error(response):

RETRY_BACKOFF_ERROR_TYPES = (
KafkaUnavailableError, LeaderNotAvailableError,
ConnectionError, FailedPayloadsError
KafkaConnectionError, FailedPayloadsError
)


RETRY_REFRESH_ERROR_TYPES = (
NotLeaderForPartitionError, UnknownTopicOrPartitionError,
LeaderNotAvailableError, ConnectionError
LeaderNotAvailableError, KafkaConnectionError
)


Expand Down
1 change: 0 additions & 1 deletion kafka/producer/base.py
Expand Up @@ -372,7 +372,6 @@ def send_messages(self, topic, partition, *msg):
Raises:
FailedPayloadsError: low-level connection error, can be caused by
networking failures, or a malformed request.
ConnectionError:
KafkaUnavailableError: all known brokers are down when attempting
to refresh metadata.
LeaderNotAvailableError: topic or partition is initializing or
Expand Down
2 changes: 1 addition & 1 deletion test/test_client.py
Expand Up @@ -8,7 +8,7 @@
from kafka import SimpleClient
from kafka.errors import (
KafkaUnavailableError, LeaderNotAvailableError, KafkaTimeoutError,
UnknownTopicOrPartitionError, ConnectionError, FailedPayloadsError)
UnknownTopicOrPartitionError, FailedPayloadsError)
from kafka.future import Future
from kafka.protocol import KafkaProtocol, create_message
from kafka.protocol.metadata import MetadataResponse
Expand Down
4 changes: 2 additions & 2 deletions test/test_conn.py
Expand Up @@ -99,7 +99,7 @@ def test_send_disconnected(conn):
conn.state = ConnectionStates.DISCONNECTED
f = conn.send('foobar')
assert f.failed() is True
assert isinstance(f.exception, Errors.ConnectionError)
assert isinstance(f.exception, Errors.KafkaConnectionError)


def test_send_connecting(conn):
Expand Down Expand Up @@ -162,7 +162,7 @@ def test_send_error(_socket, conn):
_socket.send.side_effect = socket.error
f = conn.send(req)
assert f.failed() is True
assert isinstance(f.exception, Errors.ConnectionError)
assert isinstance(f.exception, Errors.KafkaConnectionError)
assert _socket.close.call_count == 1
assert conn.state is ConnectionStates.DISCONNECTED

Expand Down
6 changes: 3 additions & 3 deletions test/test_failover_integration.py
Expand Up @@ -4,7 +4,7 @@

from kafka import SimpleClient, SimpleConsumer, KeyedProducer
from kafka.errors import (
FailedPayloadsError, ConnectionError, RequestTimedOutError,
FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
NotLeaderForPartitionError)
from kafka.producer.base import Producer
from kafka.structs import TopicPartition
Expand Down Expand Up @@ -79,7 +79,7 @@ def test_switch_leader(self):
producer.send_messages(topic, partition, b'success')
log.debug("success!")
recovered = True
except (FailedPayloadsError, ConnectionError, RequestTimedOutError,
except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
NotLeaderForPartitionError):
log.debug("caught exception sending message -- will retry")
continue
Expand Down Expand Up @@ -167,7 +167,7 @@ def test_switch_leader_keyed_producer(self):
producer.send_messages(topic, key, msg)
if producer.partitioners[topic].partition(key) == 0:
recovered = True
except (FailedPayloadsError, ConnectionError, RequestTimedOutError,
except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
NotLeaderForPartitionError):
log.debug("caught exception sending message -- will retry")
continue
Expand Down

0 comments on commit 53aadf2

Please sign in to comment.