SSLWantReadError: The operation did not complete (read) #1535

Open
zhgjun opened this issue Jul 2, 2018 · 3 comments

zhgjun commented Jul 2, 2018

Hi, I am using kafka-python 1.4.2 to connect to Kafka 0.11.0.1 (kafka_2.11-0.11.0.1), with SASL_SSL as the security protocol.
I get the error below, but if I remove the eventlet.monkey_patch() call, everything works. Does anyone know why?

My environment:
Python version is 2.7.5
CentOS, Linux version 3.10.0-514.44.5.10_44.x86_64

My script is:
####################################

#-*- coding: utf-8 -*-
import eventlet
import os
from kafka import KafkaConsumer
import time
import ssl
from oslo_utils import eventletutils
eventlet.monkey_patch()
import logging
from kafka.client_async import selectors
if eventletutils.is_monkey_patched('select'):
    # monkeypatch the vendored SelectSelector._select like eventlet does
    # https://github.com/eventlet/eventlet/blob/master/eventlet/green/selectors.py#L32
    from eventlet.green import select
    selectors.SelectSelector._select = staticmethod(select.select)

    # Force to use the select selectors
    KAFKA_SELECTOR = selectors.SelectSelector
else:
    KAFKA_SELECTOR = selectors.DefaultSelector
# NOTE: the second assignment replaces the default context created on the first line.
context = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
def log_init(log_name, filemode='w'):
    logging.basicConfig(filename=os.path.join(os.getcwd(), log_name), level=logging.DEBUG, filemode=filemode,
                        format='%(asctime)s - %(levelname)s: %(message)s')

log_init('test')

consumer = KafkaConsumer('topic1',
                         group_id='my-group',
                         bootstrap_servers=['127.0.0.1:9092'],
                         ssl_context=context,
                         auto_offset_reset="latest",
                         api_version = (0,11),
                         sasl_mechanism="PLAIN",
                         sasl_plain_username="admin",
                         sasl_plain_password="pass",
                         selector=KAFKA_SELECTOR,
                         security_protocol='SASL_SSL')
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    logging.info(recv)

####################################

This is the error I get:

ERROR: <BrokerConnection node_id=bootstrap host=xxx:9092 <authenticating> [IPv4 ('xxx', 9092)]>: Error receiving reply from server
Traceback (most recent call last):
  File "xxxx/lib/python2.7/site-packages/kafka_python-1.4.2-py2.7.egg/kafka/conn.py", line 558, in _try_authenticate_plain
    data = self._recv_bytes_blocking(4)
  File "xxxx/lib/python2.7/site-packages/kafka_python-1.4.2-py2.7.egg/kafka/conn.py", line 535, in _recv_bytes_blocking
    fragment = self._sock.recv(n - len(data))
  File "xxxx/lib/python2.7/site-packages/eventlet/green/ssl.py", line 198, in recv
    read = self.read(buflen)
  File "xxxx/lib/python2.7/site-packages/eventlet/green/ssl.py", line 138, in read
    super(GreenSSLSocket, self).read, *args, **kwargs)
  File "xxxx/lib/python2.7/site-packages/eventlet/green/ssl.py", line 108, in _call_trampolining
    return func(*a, **kw)
  File "/usr/lib64/python2.7/ssl.py", line 634, in read
    v = self._sslobj.read(len or 1024)
SSLWantReadError: The operation did not complete (read) (_ssl.c:1936)
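
For context: Python's ssl module raises SSLWantReadError when a read on a non-blocking SSL socket cannot complete until more data arrives on the underlying TCP connection. The traceback above shows that error escaping from recv while the connection is still authenticating. A minimal sketch of a receive loop that tolerates it follows. It assumes Python 3 and the standard library only; the name recv_exactly and its timeout parameter are hypothetical, not part of kafka-python or eventlet, and SSLWantWriteError is not handled.

```python
import select
import socket
import ssl
import time


def recv_exactly(sock, n, timeout):
    """Read exactly n bytes from a non-blocking socket, waiting at most `timeout` seconds.

    Hypothetical helper for illustration; not a kafka-python or eventlet API.
    """
    deadline = time.time() + timeout
    data = b''
    while len(data) < n:
        try:
            fragment = sock.recv(n - len(data))
        except (ssl.SSLWantReadError, BlockingIOError):
            # No data is ready yet: wait until the socket is readable or the deadline passes.
            remaining = deadline - time.time()
            if remaining <= 0:
                raise socket.timeout('timed out waiting for %d bytes' % n)
            select.select([sock], [], [], remaining)
            continue
        if not fragment:
            raise ConnectionError('Connection reset during recv')
        data += fragment
    return data
```

The loop keeps the socket non-blocking throughout and bounds the total wait itself, so it does not depend on how the socket object interprets settimeout.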

zhgjun commented Jul 3, 2018

But if I use SASL only or SSL only, there is no problem. Only SASL_SSL produces the error. (Kerberos is not used in my environment.)


zhgjun commented Jul 6, 2018

Why does _recv_bytes_blocking (kafka/conn.py) use _sock.settimeout to make recv block?

    def _recv_bytes_blocking(self, n):
        self._sock.settimeout(self.config['request_timeout_ms'] / 1000)
        try:
            data = b''
            while len(data) < n:
                fragment = self._sock.recv(n - len(data))
                if not fragment:
                    raise ConnectionError('Connection reset during recv')
                data += fragment
            return data
        finally:
            self._sock.settimeout(0.0)

Can we use _sock.setblocking(True) instead, like this:

    def _recv_bytes_blocking(self, n):
        #self._sock.settimeout(self.config['request_timeout_ms'] / 1000)
        self._sock.setblocking(True)
        try:
            data = b''
            while len(data) < n:
                fragment = self._sock.recv(n - len(data))
                if not fragment:
                    raise ConnectionError('Connection reset during recv')
                data += fragment
            return data
        finally:
            #self._sock.settimeout(0.0)
            self._sock.setblocking(False)

When I use _sock.setblocking, my problem is solved. So maybe this is a bug: when _recv_bytes_blocking uses only _sock.settimeout, I get the error described above.
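
For reference, on a standard-library socket, settimeout(0.0) is equivalent to setblocking(False), settimeout(None) is equivalent to setblocking(True), and a positive timeout puts the socket in timeout mode. The sketch below shows this on a plain socket pair. describe_mode and observed_modes are hypothetical helpers, and nothing here establishes how eventlet's green SSL socket treats the same calls.

```python
import socket


def describe_mode(sock):
    """Classify a socket's mode from its current timeout value."""
    timeout = sock.gettimeout()
    if timeout is None:
        return 'blocking'
    if timeout == 0.0:
        return 'non-blocking'
    return 'timeout'


def observed_modes():
    """Apply the calls from both versions of _recv_bytes_blocking to a plain socket."""
    a, b = socket.socketpair()
    try:
        modes = []
        a.settimeout(0.0)      # what the original code restores in `finally`
        modes.append(describe_mode(a))
        a.settimeout(30.0)     # what the original code sets before recv
        modes.append(describe_mode(a))
        a.setblocking(True)    # what the proposed code sets before recv
        modes.append(describe_mode(a))
        a.setblocking(False)   # what the proposed code restores in `finally`
        modes.append(describe_mode(a))
        return modes
    finally:
        a.close()
        b.close()
```

Running the same sequence against the monkey-patched socket class would be one way to check whether its behaviour differs from the standard library's.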


zhgjun commented Jul 6, 2018

@dpkp, do you know why _recv_bytes_blocking (kafka/conn.py) uses _sock.settimeout to make recv block? Can we use _sock.setblocking instead?
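
One trade-off of the setblocking(True) variant: a blocking standard-library socket has no timeout, so request_timeout_ms would no longer bound the read. A sketch of a helper that keeps a timeout and restores whatever mode the socket had before, rather than hard-coding 0.0, follows. temporary_timeout is a hypothetical name, not a kafka-python API, and the sketch assumes plain standard-library sockets; its behaviour on eventlet's green SSL sockets is not verified here.

```python
import contextlib
import socket


@contextlib.contextmanager
def temporary_timeout(sock, seconds):
    """Set a timeout on `sock` for the duration of the block, then restore the old one.

    Hypothetical helper for illustration; not part of kafka-python.
    """
    previous = sock.gettimeout()
    sock.settimeout(seconds)
    try:
        yield sock
    finally:
        sock.settimeout(previous)


# Possible use inside a blocking read (names taken from the snippet above):
#     with temporary_timeout(self._sock, self.config['request_timeout_ms'] / 1000.0):
#         fragment = self._sock.recv(n - len(data))
```

Dividing by 1000.0 rather than 1000 avoids integer truncation on Python 2, where a request timeout below one second would otherwise become 0.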
