Skip to content
This repository has been archived by the owner on Jan 13, 2021. It is now read-only.

Commit

Permalink
Merge pull request #280 from plucury/development
Browse files Browse the repository at this point in the history
try to avoid deadlock
  • Loading branch information
Lukasa committed Jun 10, 2017
2 parents 88da7c3 + 0e2fca8 commit 23a1555
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 44 deletions.
59 changes: 16 additions & 43 deletions hyper/http20/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,36 +139,9 @@ def __init__(self, host, port=None, secure=None, window_manager=None,

# Concurrency
#
# Use one lock (_lock) to synchronize any interaction with global
# connection state, e.g. stream creation/deletion.
#
# It's ok to use the same in lock all these cases as they occur at
# different/linked points in the connection's lifecycle.
#
# Use another 2 locks (_write_lock, _read_lock) to synchronize
# - _send_cb
# - _recv_cb
# respectively.
#
# I.e, send/recieve on the connection and its streams are serialized
# separately across the threads accessing the connection. This is a
# simple way of providing thread-safety.
#
# _write_lock and _read_lock synchronize all interactions between
# streams and the connnection. There is a third I/O callback,
# _close_stream, passed to a stream's constructor. It does not need to
# be synchronized, it uses _send_cb internally (which is serialized);
# its other activity (safe deletion of the stream from self.streams)
# does not require synchronization.
#
# _read_lock may be acquired when already holding the _write_lock,
# when they both held it is always by acquiring _write_lock first.
#
# Either _read_lock or _write_lock may be acquired whilst holding _lock
# which should always be acquired before either of the other two.
# Use one universal lock (_lock) to synchronize all interaction
# with global connection state, _send_cb and _recv_cb.
self._lock = threading.RLock()
self._write_lock = threading.RLock()
self._read_lock = threading.RLock()

# Create the mutable state.
self.__wm_class = window_manager or FlowControlManager
Expand Down Expand Up @@ -232,7 +205,7 @@ def ping(self, opaque_data):
:returns: Nothing
"""
self.connect()
with self._write_lock:
with self._lock:
with self._conn as conn:
conn.ping(to_bytestring(opaque_data))
self._send_outstanding_data()
Expand Down Expand Up @@ -271,7 +244,7 @@ def request(self, method, url, body=None, headers=None):
# If threads interleave these operations, it could result in messages
# being sent in the wrong order, which can lead to the out-of-order
# messages with lower stream IDs being closed prematurely.
with self._write_lock:
with self._lock:
stream_id = self.putrequest(method, url)

default_headers = (':method', ':scheme', ':authority', ':path')
Expand Down Expand Up @@ -464,10 +437,10 @@ def _send_outstanding_data(self, tolerate_peer_gone=False,
send_empty=True):
# Concurrency
#
# Hold _write_lock; getting and writing data from _conn is synchronized
# Hold _lock; getting and writing data from _conn is synchronized
#
# I/O occurs while the lock is held; waiting threads will see a delay.
with self._write_lock:
with self._lock:
with self._conn as conn:
data = conn.data_to_send()
if data or send_empty:
Expand Down Expand Up @@ -557,9 +530,9 @@ def endheaders(self, message_body=None, final=False, stream_id=None):

# Concurrency:
#
# Hold _write_lock: synchronize access to the connection's HPACK
# Hold _lock: synchronize access to the connection's HPACK
# encoder and decoder and the subsquent write to the connection
with self._write_lock:
with self._lock:
stream.send_headers(headers_only)

# Send whatever data we have.
Expand Down Expand Up @@ -622,10 +595,10 @@ def _send_cb(self, data, tolerate_peer_gone=False):
"""
# Concurrency
#
# Hold _write_lock: ensures only writer at a time
# Hold _lock: ensures only writer at a time
#
# I/O occurs while the lock is held; waiting threads will see a delay.
with self._write_lock:
with self._lock:
try:
self._sock.sendall(data)
except socket.error as e:
Expand All @@ -640,12 +613,12 @@ def _adjust_receive_window(self, frame_len):
"""
# Concurrency
#
# Hold _write_lock; synchronize the window manager update and the
# Hold _lock; synchronize the window manager update and the
# subsequent potential write to the connection
#
# I/O may occur while the lock is held; waiting threads may see a
# delay.
with self._write_lock:
with self._lock:
increment = self.window_manager._handle_frame(frame_len)

if increment:
Expand All @@ -667,7 +640,7 @@ def _single_read(self):
# Synchronizes reading the data
#
# I/O occurs while the lock is held; waiting threads will see a delay.
with self._read_lock:
with self._lock:
if self._sock is None:
raise ConnectionError('tried to read after connection close')
self._sock.fill()
Expand Down Expand Up @@ -761,7 +734,7 @@ def _recv_cb(self, stream_id=0):
# re-acquired in the calls to self._single_read.
#
# I/O occurs while the lock is held; waiting threads will see a delay.
with self._read_lock:
with self._lock:
log.debug('recv for stream %d with %s already present',
stream_id,
self.recent_recv_streams)
Expand Down Expand Up @@ -812,11 +785,11 @@ def _send_rst_frame(self, stream_id, error_code):
"""
# Concurrency
#
# Hold _write_lock; synchronize generating the reset frame and writing
# Hold _lock; synchronize generating the reset frame and writing
# it
#
# I/O occurs while the lock is held; waiting threads will see a delay.
with self._write_lock:
with self._lock:
with self._conn as conn:
conn.reset_stream(stream_id, error_code=error_code)
self._send_outstanding_data()
Expand Down
40 changes: 39 additions & 1 deletion test/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import pytest
from contextlib import contextmanager
from mock import patch
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from h2.frame_buffer import FrameBuffer
from hyper.compat import ssl
from hyper.contrib import HTTP20Adapter
Expand Down Expand Up @@ -1039,7 +1040,6 @@ def test_version_after_http_upgrade(self):

def socket_handler(listener):
sock = listener.accept()[0]

# We should get the initial request.
data = b''
while not data.endswith(b'\r\n\r\n'):
Expand Down Expand Up @@ -1089,6 +1089,44 @@ def socket_handler(listener):

self.tear_down()

def test_connection_and_send_simultaneously(self):
# Since deadlock occurs probabilistic,
# It still has deadlock probability
# even the testcase is passed.
self.set_up()

recv_event = threading.Event()

def socket_handler(listener):
sock = listener.accept()[0]

receive_preamble(sock)
sock.recv(65535)

recv_event.set()
sock.close()

def do_req(conn):
conn.request('GET', '/')
recv_event.wait()

def do_connect(conn):
conn.connect()

self._start_server(socket_handler)
conn = self.get_connection()

pool = ThreadPoolExecutor(max_workers=2)
pool.submit(do_connect, conn)
f = pool.submit(do_req, conn)

try:
f.result(timeout=10)
except TimeoutError:
assert False

self.tear_down()


@patch('hyper.http20.connection.H2_NPN_PROTOCOLS', PROTOCOLS)
class TestRequestsAdapter(SocketLevelTest):
Expand Down

0 comments on commit 23a1555

Please sign in to comment.