Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,22 @@ class KafkaClient(object):
# socket timeout.
def __init__(self, hosts, client_id=CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
correlation_id=0):
correlation_id=0,
ca="/etc/ssl/certs/ca-bundle.crt",
certfile=None,
keyfile=None,
ssl=False,
verify_hostname=False):
# We need one connection to bootstrap
self.client_id = kafka_bytestring(client_id)
self.timeout = timeout
self.hosts = collect_hosts(hosts)
self.correlation_id = correlation_id
self.ca = ca
self.certfile = certfile
self.keyfile = keyfile
self.ssl = ssl
self.verify_hostname = verify_hostname

# create connections only when we need them
self.conns = {}
Expand All @@ -55,7 +65,12 @@ def _get_conn(self, host, port):
self.conns[host_key] = KafkaConnection(
host,
port,
timeout=self.timeout
timeout=self.timeout,
ca=self.ca,
certfile=self.certfile,
keyfile=self.keyfile,
ssl=self.ssl,
verify_hostname=self.verify_hostname
)

return self.conns[host_key]
Expand Down
46 changes: 41 additions & 5 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
from random import shuffle
import socket
import ssl
import struct
from threading import local

Expand Down Expand Up @@ -53,12 +54,23 @@ class KafkaConnection(local):
port: the port number the kafka broker is listening on
timeout: default 120. The socket timeout for sending and receiving data
in seconds. None means no timeout, so a request can block forever.
certfile: the SSL certificate file of the client (requires Kafka 0.9+)
keyfile: the SSL key file of the client (requires Kafka 0.9+)
ca: The SSL CA (requires Kafka 0.9+)
ssl: Use SSL when communicating with Kafka
verify_hostname: Whether or not to verify the server's hostname against its cert
"""
def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
ssl=False, ca=None, certfile=None, keyfile=None, verify_hostname=False):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
self.timeout = timeout
self.ca = ca
self.certfile = certfile
self.keyfile = keyfile
self.ssl = ssl
self.verify_hostname = verify_hostname
self._sock = None

self.reinit()
Expand Down Expand Up @@ -102,7 +114,7 @@ def _read_bytes(self, num_bytes):
if data == b'':
raise socket.error("Not enough data to read message -- did server kill socket?")

except socket.error:
except (socket.error, ssl.SSLError):
log.exception('Unable to receive data from Kafka')
self._raise_connection_error()

Expand Down Expand Up @@ -135,7 +147,7 @@ def send(self, request_id, payload):

try:
self._sock.sendall(payload)
except socket.error:
except (socket.error, ssl.SSLError):
log.exception('Unable to send payload to Kafka')
self._raise_connection_error()

Expand Down Expand Up @@ -172,6 +184,11 @@ def copy(self):
c.host = copy.copy(self.host)
c.port = copy.copy(self.port)
c.timeout = copy.copy(self.timeout)
c.ca = copy.copy(self.ca)
c.certfile = copy.copy(self.certfile)
c.keyfile = copy.copy(self.keyfile)
c.ssl = copy.copy(self.ssl)
c.verify_hostname = copy.copy(self.verify_hostname)
c._sock = None
return c

Expand All @@ -186,7 +203,7 @@ def close(self):
# closed by the server
try:
self._sock.shutdown(socket.SHUT_RDWR)
except socket.error:
except (socket.error, ssl.SSLError):
pass

# Closing the socket should always succeed
Expand All @@ -210,5 +227,24 @@ def reinit(self):
try:
self._sock = socket.create_connection((self.host, self.port), self.timeout)
except socket.error:
log.exception('Unable to connect to kafka broker at %s:%d' % (self.host, self.port))
log.exception('Unable to connect to Kafka broker at %s:%r', self.host, self.port)
self._raise_connection_error()

if self.ssl:
# Disallow use of SSLv2 and V3 (meaning we require TLSv1.0+)
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) # pylint: disable=no-member
context.options |= ssl.OP_NO_SSLv2 # pylint: disable=no-member
context.options |= ssl.OP_NO_SSLv3 # pylint: disable=no-member
context.verify_mode = ssl.CERT_OPTIONAL
if self.verify_hostname:
context.check_hostname = True
if self.ca:
context.load_verify_locations(self.ca)
context.verify_mode = ssl.CERT_REQUIRED
if self.certfile and self.keyfile:
context.load_cert_chain(certfile=self.certfile, keyfile=self.keyfile)
try:
self._sock = context.wrap_socket(self._sock, server_hostname=self.host)
except ssl.SSLError:
log.exception('Unable to connect to Kafka broker at %s:%r over SSL', self.host, self.port)
self._raise_connection_error()