Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ dist
MANIFEST
env
servers/*/kafka-bin*
servers/*/resources/ssl*
.coverage*
.noseids
docs/_build
Expand Down
39 changes: 37 additions & 2 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ class KafkaClient(object):
'send_buffer_bytes': None,
'retry_backoff_ms': 100,
'metadata_max_age_ms': 300000,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
'ssl_cafile': None,
'ssl_certfile': None,
'ssl_keyfile': None,
}

def __init__(self, **configs):
Expand Down Expand Up @@ -90,6 +96,21 @@ def __init__(self, **configs):
brokers or partitions. Default: 300000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
socket connections. If provided, all other ssl_* configurations
will be ignored. Default: None.
ssl_check_hostname (bool): flag to configure whether ssl handshake
should verify that the certificate matches the brokers hostname.
default: true.
ssl_cafile (str): optional filename of ca file to use in certificate
veriication. default: none.
ssl_certfile (str): optional filename of file in pem format containing
the client certificate, as well as any ca certificates needed to
establish the certificate's authenticity. default: none.
ssl_keyfile (str): optional filename containing the client private key.
default: none.
"""
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
Expand Down Expand Up @@ -168,8 +189,10 @@ def _can_connect(self, node_id):

def _conn_state_change(self, node_id, conn):
if conn.connecting():
self._connecting.add(node_id)
self._selector.register(conn._sock, selectors.EVENT_WRITE)
# SSL connections can enter this state 2x (second during Handshake)
if node_id not in self._connecting:
self._connecting.add(node_id)
self._selector.register(conn._sock, selectors.EVENT_WRITE)

elif conn.connected():
log.debug("Node %s connected", node_id)
Expand Down Expand Up @@ -412,14 +435,17 @@ def poll(self, timeout_ms=None, future=None, sleep=True):
def _poll(self, timeout, sleep=True):
# select on reads across all connected sockets, blocking up to timeout
assert self.in_flight_request_count() > 0 or self._connecting or sleep

responses = []
processed = set()
for key, events in self._selector.select(timeout):
if key.fileobj is self._wake_r:
self._clear_wake_fd()
continue
elif not (events & selectors.EVENT_READ):
continue
conn = key.data
processed.add(conn)
while conn.in_flight_requests:
response = conn.recv() # Note: conn.recv runs callbacks / errbacks

Expand All @@ -428,6 +454,15 @@ def _poll(self, timeout, sleep=True):
if not response:
break
responses.append(response)

# Check for additional pending SSL bytes
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
# TODO: optimize
for conn in self._conns.values():
if conn not in processed and conn.connected() and conn._sock.pending():
response = conn.recv()
if response:
responses.append(response)
return responses

def in_flight_request_count(self, node_id=None):
Expand Down
94 changes: 91 additions & 3 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io
from random import shuffle
import socket
import ssl
import struct
from threading import local
import time
Expand All @@ -29,11 +30,25 @@
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
DEFAULT_KAFKA_PORT = 9092

# support older ssl libraries
try:
assert ssl.SSLWantReadError
assert ssl.SSLWantWriteError
assert ssl.SSLZeroReturnError
except:
log.warning('old ssl module detected.'
' ssl error handling may not operate cleanly.'
' Consider upgrading to python 3.5 or 2.7')
ssl.SSLWantReadError = ssl.SSLError
ssl.SSLWantWriteError = ssl.SSLError
ssl.SSLZeroReturnError = ssl.SSLError


class ConnectionStates(object):
DISCONNECTING = '<disconnecting>'
DISCONNECTED = '<disconnected>'
CONNECTING = '<connecting>'
HANDSHAKE = '<handshake>'
CONNECTED = '<connected>'


Expand All @@ -49,6 +64,12 @@ class BrokerConnection(object):
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
'ssl_cafile': None,
'ssl_certfile': None,
'ssl_keyfile': None,
'api_version': (0, 8, 2), # default to most restrictive
'state_change_callback': lambda conn: True,
}
Expand All @@ -66,6 +87,9 @@ def __init__(self, host, port, afi, **configs):

self.state = ConnectionStates.DISCONNECTED
self._sock = None
self._ssl_context = None
if self.config['ssl_context'] is not None:
self._ssl_context = self.config['ssl_context']
self._rbuffer = io.BytesIO()
self._receiving = False
self._next_payload_bytes = 0
Expand All @@ -87,6 +111,8 @@ def connect(self):
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
self.config['send_buffer_bytes'])
self._sock.setblocking(False)
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
self._wrap_ssl()
self.state = ConnectionStates.CONNECTING
self.last_attempt = time.time()
self.config['state_change_callback'](self)
Expand All @@ -103,7 +129,11 @@ def connect(self):
# Connection succeeded
if not ret or ret == errno.EISCONN:
log.debug('%s: established TCP connection', str(self))
self.state = ConnectionStates.CONNECTED
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
log.debug('%s: initiating SSL handshake', str(self))
self.state = ConnectionStates.HANDSHAKE
else:
self.state = ConnectionStates.CONNECTED
self.config['state_change_callback'](self)

# Connection failed
Expand All @@ -122,8 +152,60 @@ def connect(self):
else:
pass

if self.state is ConnectionStates.HANDSHAKE:
if self._try_handshake():
log.debug('%s: completed SSL handshake.', str(self))
self.state = ConnectionStates.CONNECTED
self.config['state_change_callback'](self)

return self.state

def _wrap_ssl(self):
assert self.config['security_protocol'] in ('SSL', 'SASL_SSL')
if self._ssl_context is None:
log.debug('%s: configuring default SSL Context', str(self))
self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) # pylint: disable=no-member
self._ssl_context.options |= ssl.OP_NO_SSLv2 # pylint: disable=no-member
self._ssl_context.options |= ssl.OP_NO_SSLv3 # pylint: disable=no-member
self._ssl_context.verify_mode = ssl.CERT_OPTIONAL
if self.config['ssl_check_hostname']:
self._ssl_context.check_hostname = True
if self.config['ssl_cafile']:
log.info('%s: Loading SSL CA from %s', str(self), self.config['ssl_cafile'])
self._ssl_context.load_verify_locations(self.config['ssl_cafile'])
self._ssl_context.verify_mode = ssl.CERT_REQUIRED
if self.config['ssl_certfile'] and self.config['ssl_keyfile']:
log.info('%s: Loading SSL Cert from %s', str(self), self.config['ssl_certfile'])
log.info('%s: Loading SSL Key from %s', str(self), self.config['ssl_keyfile'])
self._ssl_context.load_cert_chain(
certfile=self.config['ssl_certfile'],
keyfile=self.config['ssl_keyfile'])
log.debug('%s: wrapping socket in ssl context', str(self))
try:
self._sock = self._ssl_context.wrap_socket(
self._sock,
server_hostname=self.host,
do_handshake_on_connect=False)
except ssl.SSLError:
log.exception('%s: Failed to wrap socket in SSLContext!', str(self))
self.close()
self.last_failure = time.time()

def _try_handshake(self):
assert self.config['security_protocol'] in ('SSL', 'SASL_SSL')
try:
self._sock.do_handshake()
return True
# old ssl in python2.6 will swallow all SSLErrors here...
except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
pass
except ssl.SSLZeroReturnError:
log.warning('SSL connection closed by server during handshake.')
self.close()
# Other SSLErrors will be raised to user

return False

def blacked_out(self):
"""
Return true if we are disconnected from the given node and can't
Expand All @@ -140,8 +222,10 @@ def connected(self):
return self.state is ConnectionStates.CONNECTED

def connecting(self):
"""Return True iff socket is in intermediate connecting state."""
return self.state is ConnectionStates.CONNECTING
"""Returns True if still connecting (this may encompass several
different states, such as SSL handshake, authorization, etc)."""
return self.state in (ConnectionStates.CONNECTING,
ConnectionStates.HANDSHAKE)

def disconnected(self):
"""Return True iff socket is closed"""
Expand Down Expand Up @@ -260,6 +344,8 @@ def recv(self):
# An extremely small, but non-zero, probability that there are
# more than 0 but not yet 4 bytes available to read
self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell()))
except ssl.SSLWantReadError:
return None
except ConnectionError as e:
if six.PY2 and e.errno == errno.EWOULDBLOCK:
return None
Expand All @@ -286,6 +372,8 @@ def recv(self):
staged_bytes = self._rbuffer.tell()
try:
self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes))
except ssl.SSLWantReadError:
return None
except ConnectionError as e:
# Extremely small chance that we have exactly 4 bytes for a
# header, but nothing to read in the body yet
Expand Down
21 changes: 21 additions & 0 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,21 @@ class KafkaConsumer(six.Iterator):
consumer_timeout_ms (int): number of millisecond to throw a timeout
exception to the consumer if no message is available for
consumption. Default: -1 (dont throw exception)
security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
socket connections. If provided, all other ssl_* configurations
will be ignored. Default: None.
ssl_check_hostname (bool): flag to configure whether ssl handshake
should verify that the certificate matches the brokers hostname.
default: true.
ssl_cafile (str): optional filename of ca file to use in certificate
veriication. default: none.
ssl_certfile (str): optional filename of file in pem format containing
the client certificate, as well as any ca certificates needed to
establish the certificate's authenticity. default: none.
ssl_keyfile (str): optional filename containing the client private key.
default: none.
api_version (str): specify which kafka API version to use.
0.9 enables full group coordination features; 0.8.2 enables
kafka-storage offset commits; 0.8.1 enables zookeeper-storage
Expand Down Expand Up @@ -158,6 +173,12 @@ class KafkaConsumer(six.Iterator):
'send_buffer_bytes': None,
'receive_buffer_bytes': None,
'consumer_timeout_ms': -1,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
'ssl_cafile': None,
'ssl_certfile': None,
'ssl_keyfile': None,
'api_version': 'auto',
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
#'metric_reporters': None,
Expand Down
21 changes: 21 additions & 0 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,21 @@ class KafkaProducer(object):
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per
broker connection. Default: 5.
security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
socket connections. If provided, all other ssl_* configurations
will be ignored. Default: None.
ssl_check_hostname (bool): flag to configure whether ssl handshake
should verify that the certificate matches the brokers hostname.
default: true.
ssl_cafile (str): optional filename of ca file to use in certificate
veriication. default: none.
ssl_certfile (str): optional filename of file in pem format containing
the client certificate, as well as any ca certificates needed to
establish the certificate's authenticity. default: none.
ssl_keyfile (str): optional filename containing the client private key.
default: none.
api_version (str): specify which kafka API version to use.
If set to 'auto', will attempt to infer the broker version by
probing various APIs. Default: auto
Expand Down Expand Up @@ -222,6 +237,12 @@ class KafkaProducer(object):
'send_buffer_bytes': None,
'reconnect_backoff_ms': 50,
'max_in_flight_requests_per_connection': 5,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
'ssl_cafile': None,
'ssl_certfile': None,
'ssl_keyfile': None,
'api_version': 'auto',
}

Expand Down
13 changes: 11 additions & 2 deletions servers/0.10.0.0/resources/kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,20 @@ broker.id={broker_id}

############################# Socket Server Settings #############################

listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}

ssl.keystore.location={ssl_dir}/server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/server.truststore.jks
ssl.truststore.password=foobar

# The port the socket server listens on
port={port}
#port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name={host}
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
Expand Down
13 changes: 11 additions & 2 deletions servers/0.9.0.0/resources/kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,20 @@ broker.id={broker_id}

############################# Socket Server Settings #############################

listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}

ssl.keystore.location={ssl_dir}/server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/server.truststore.jks
ssl.truststore.password=foobar

# The port the socket server listens on
port={port}
#port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name={host}
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
Expand Down
13 changes: 11 additions & 2 deletions servers/0.9.0.1/resources/kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,20 @@ broker.id={broker_id}

############################# Socket Server Settings #############################

listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}

ssl.keystore.location={ssl_dir}/server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/server.truststore.jks
ssl.truststore.password=foobar

# The port the socket server listens on
port={port}
#port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name={host}
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
Expand Down
Loading