Skip to content
This repository has been archived by the owner on Jul 22, 2019. It is now read-only.

Fix the bug where ConnectionPool cannot be used with multiprocessing #314

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

kevinjqiu
Copy link

As discussed in #313

When couchdb-python is used with multiprocessing, you get TypeError: 'ResponseBody' object is not iterable.

This happens in couchdb.http.Session:request method:

        # Read the full response for empty responses so that the connection is
        # in good state for the next request
        if method == 'HEAD' or resp.getheader('content-length') == '0' or \
                status < 200 or status in (204, 304):
            resp.read()
            self.connection_pool.release(url, conn)

        # Buffer small non-JSON response bodies
        elif int(resp.getheader('content-length', sys.maxsize)) < CHUNK_SIZE:
            data = resp.read()
            self.connection_pool.release(url, conn)

        # For large or chunked response bodies, do not buffer the full body,
        # and instead return a minimal file-like object
        else:
            data = ResponseBody(resp, self.connection_pool, url, conn)
            streamed = True

In this particular case, the resp object fails to match either condition and falls through to the else clause, which causes a raw ResponseBody object to be returned upstream to the client code, and when client code does response['row'], it fails b/c ResponseBody object does not support item indexing.

Adding a print on the resp object reveals that resp.getheader('content-length') is None, and hence the second elif is skipped.

The reason for content-length to be None: httplib.HTTPConnection.begin, line 470~475:

        if self.version == 9:                                                                                                                                                                  
            self.length = None                                                                                                                                                                 
            self.chunked = 0                                                                                                                                                                   
            self.will_close = 1                                                                                                                                                                
            self.msg = HTTPMessage(StringIO())                                                                                                                                                 
            return

so HTTPConnection thinks it's connecting to a HTTP/0.9 server, even though couchdb response was HTTP/1.1.

Tracing further, in order for self.version == 9, version returned by HTTPConnection._read_status must be 9:

    def _read_status(self):
        # Initialize with Simple-Response defaults
        line = self.fp.readline(_MAXLINE + 1)
        ...

Putting a print statement after the line is read, and rerun the bug script:

HTP/1.1 0 O

2re:CuD. Eln OTP17

T0KSevr ochB/1.61(rag/)ETag: "1-b188d355f013ee97662615a5b4a85577"

Traceback (most recent call last):
  File "bug.py", line 36, in <module>
    main()
  File "bug.py", line 31, in main
    docs = pool.map(query_id, ['1', '2', '3'])
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
TypeError: 'ResponseBody' object is not iterable

The process got a garbled status line, even though the response from couchdb is fine. With the garbled status line, _read_status method assumes it must be HTTP/0.9 so it returns version==9.

So basically what we have here is a race condition where all three processes are talking to the server over the same socket at the same time. This is because the couchdb-python's ConnectionPool is created in the parent process by the Session object, which is in turn only created once per Database object. So in essence, all three sub-processes are sharing the same session object and the same connection pool object. Because they all talk to the same host/port combination, they all checkout the same connection object from the pool and same underlying socket is being used across all three subprocesses, and hence the bug.

The fix here is to make ConnectionPool process aware in that the connections are keyed by the current pid in addition to scheme and netloc. This way, we make sure that sub-processes get their own separate connections.

TBH, I'm not sure this is a good implementation. Having the ConnectionPool knowing about the process it's running on seems to be violating its responsibility. Feel free to suggest another better solution.

@@ -19,6 +21,10 @@
from couchdb.tests import testutil


def _current_pid():
return multiprocessing.current_process().pid

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would os.getpid() make more sense here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@elistevens
Copy link

I reported this (or at least a very similar) issue back in 2011: #205

I ended up solving my issue with application level code like:

    @property
    def db(self):
        if self._db_pid != os.getpid():
            self.db = couchdb.Database(self.url)

        return self._db

    @db.setter
    def db(self, value):
        self._db_pid = os.getpid()
        self._db = value

Not pretty, but gets the job done. It would be great if this could get into the library proper.

@kevinjqiu
Copy link
Author

@djc Thoughts?

@djc
Copy link
Owner

djc commented Mar 16, 2017

Sorry, I've been very busy recently.

I think it looks okay. Can we do os.getpid() instead of all the multiprocessing stuff in couchdb.http? Also, would be nice if you can clean up your commits to squash the typo commit, and maybe separate the tests from the fix.

@kevinjqiu
Copy link
Author

@djc Done.

couchdb/http.py Outdated
self.lock = Lock()

def get(self, url):
@property
def _current_process_id(self):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what this property buys us? Just using os.getpid() is actually shorter to write, and it's used in just two places.

@djc
Copy link
Owner

djc commented Mar 19, 2017

It looks like this change breaks something on Python 3.4. Would you be able to investigate?

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants