Skip to content
This repository has been archived by the owner on Jan 13, 2021. It is now read-only.

Commit

Permalink
Merge branch 'fredthomsen-stream_conn_sync_iss_246' into development
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukasa committed Jun 12, 2016
2 parents 8950da0 + 3ac4dbd commit 329f1a4
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 23 deletions.
6 changes: 3 additions & 3 deletions hyper/http20/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,12 +572,12 @@ def _new_stream(self, stream_id=None, local_closed=False):
# self.next_stream_id in a consistent state
#
# No I/O occurs, the delay in waiting threads depends on their number.
with self._lock, self._conn as conn:
with self._lock:
s = Stream(
stream_id or self.next_stream_id,
self.__wm_class(DEFAULT_WINDOW_SIZE),
conn,
self._send_cb,
self._conn,
self._send_outstanding_data,
self._recv_cb,
self._stream_close_cb,
)
Expand Down
44 changes: 24 additions & 20 deletions hyper/http20/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(self,
stream_id,
window_manager,
connection,
send_cb,
send_outstanding_data,
recv_cb,
close_cb):
self.stream_id = stream_id
Expand Down Expand Up @@ -72,11 +72,11 @@ def __init__(self,
# one for data being sent to us.
self._in_window_manager = window_manager

# Save off a reference to the state machine.
# Save off a reference to the state machine wrapped with lock.
self._conn = connection

# Save off a data callback.
self._send_cb = send_cb
self._send_outstanding_data = send_outstanding_data
self._recv_cb = recv_cb
self._close_cb = close_cb

Expand All @@ -94,8 +94,9 @@ def send_headers(self, end_stream=False):
Sends the complete saved header block on the stream.
"""
headers = self.get_headers()
self._conn.send_headers(self.stream_id, headers, end_stream)
self._send_cb(self._conn.data_to_send())
with self._conn as conn:
conn.send_headers(self.stream_id, headers, end_stream)
self._send_outstanding_data()

if end_stream:
self.local_closed = True
Expand Down Expand Up @@ -186,10 +187,11 @@ def receive_data(self, event):
self.data.append(event.data)

if increment and not self.remote_closed:
self._conn.increment_flow_control_window(
increment, stream_id=self.stream_id
)
self._send_cb(self._conn.data_to_send())
with self._conn as conn:
conn.increment_flow_control_window(
increment, stream_id=self.stream_id
)
self._send_outstanding_data()

def receive_end_stream(self, event):
"""
Expand Down Expand Up @@ -278,15 +280,14 @@ def close(self, error_code=None):
# FIXME: I think this is overbroad, but for now it's probably ok.
if not (self.remote_closed and self.local_closed):
try:
self._conn.reset_stream(self.stream_id, error_code or 0)
with self._conn as conn:
conn.reset_stream(self.stream_id, error_code or 0)
except h2.exceptions.ProtocolError:
# If for any reason we can't reset the stream, just tolerate
# it.
# If for any reason we can't reset the stream, just
# tolerate it.
pass
else:
self._send_cb(
self._conn.data_to_send(), tolerate_peer_gone=True
)
self._send_outstanding_data(tolerate_peer_gone=True)
self.remote_closed = True
self.local_closed = True

Expand All @@ -297,7 +298,9 @@ def _out_flow_control_window(self):
"""
The size of our outbound flow control window.
"""
return self._conn.local_flow_control_window(self.stream_id)

with self._conn as conn:
return conn.local_flow_control_window(self.stream_id)

def _send_chunk(self, data, final):
"""
Expand All @@ -321,10 +324,11 @@ def _send_chunk(self, data, final):
end_stream = True

# Send the frame and decrement the flow control window.
self._conn.send_data(
stream_id=self.stream_id, data=data, end_stream=end_stream
)
self._send_cb(self._conn.data_to_send())
with self._conn as conn:
conn.send_data(
stream_id=self.stream_id, data=data, end_stream=end_stream
)
self._send_outstanding_data()

if end_stream:
self.local_closed = True

0 comments on commit 329f1a4

Please sign in to comment.