Skip to content

Commit

Permalink
Add QUICStreamStateListener
Browse files Browse the repository at this point in the history
  • Loading branch information
maskit committed May 11, 2021
1 parent f66646c commit f90e8dd
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 76 deletions.
36 changes: 27 additions & 9 deletions iocore/net/quic/QUICBidirectionalStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ QUICBidirectionalStream::recv(const QUICStreamFrame &frame)

this->_adapter->write(stream_frame->offset(), reinterpret_cast<uint8_t *>(stream_frame->data()->start()),
stream_frame->data_length(), stream_frame->has_fin_flag());
this->_state.update_with_receiving_frame(*new_frame);
if (this->_state.update_with_receiving_frame(*new_frame)) {
this->_notify_state_change();
}

delete new_frame;
new_frame = this->_received_stream_frame_buffer.pop();
Expand Down Expand Up @@ -152,7 +154,9 @@ QUICBidirectionalStream::recv(const QUICStreamDataBlockedFrame &frame)
QUICConnectionErrorUPtr
QUICBidirectionalStream::recv(const QUICStopSendingFrame &frame)
{
this->_state.update_with_receiving_frame(frame);
if (this->_state.update_with_receiving_frame(frame)) {
this->_notify_state_change();
}
this->_reset_reason = QUICStreamErrorUPtr(new QUICStreamError(this, QUIC_APP_ERROR_CODE_STOPPING));
// We received and processed STOP_SENDING frame, so return NO_ERROR here
return nullptr;
Expand All @@ -161,7 +165,9 @@ QUICBidirectionalStream::recv(const QUICStopSendingFrame &frame)
QUICConnectionErrorUPtr
QUICBidirectionalStream::recv(const QUICRstStreamFrame &frame)
{
this->_state.update_with_receiving_frame(frame);
if (this->_state.update_with_receiving_frame(frame)) {
this->_notify_state_change();
}
this->_adapter->notify_eos();
return nullptr;
}
Expand Down Expand Up @@ -201,7 +207,9 @@ QUICBidirectionalStream::generate_frame(uint8_t *buf, QUICEncryptionLevel level,
return nullptr;
}
this->_records_rst_stream_frame(level, *static_cast<QUICRstStreamFrame *>(frame));
this->_state.update_with_sending_frame(*frame);
if (this->_state.update_with_sending_frame(*frame)) {
this->_notify_state_change();
}
this->_is_reset_sent = true;
return frame;
}
Expand All @@ -215,7 +223,9 @@ QUICBidirectionalStream::generate_frame(uint8_t *buf, QUICEncryptionLevel level,
return nullptr;
}
this->_records_stop_sending_frame(level, *static_cast<QUICStopSendingFrame *>(frame));
this->_state.update_with_sending_frame(*frame);
if (this->_state.update_with_sending_frame(*frame)) {
this->_notify_state_change();
}
this->_is_stop_sending_sent = true;
return frame;
}
Expand Down Expand Up @@ -306,7 +316,9 @@ QUICBidirectionalStream::generate_frame(uint8_t *buf, QUICEncryptionLevel level,
this->_records_stream_frame(level, *static_cast<QUICStreamFrame *>(frame));

this->_adapter->encourge_write();
this->_state.update_with_sending_frame(*frame);
if (this->_state.update_with_sending_frame(*frame)) {
this->_notify_state_change();
}

return frame;
}
Expand All @@ -331,7 +343,9 @@ QUICBidirectionalStream::_on_frame_acked(QUICFrameInformationUPtr &info)
break;
}

this->_state.update_on_ack();
if (this->_state.update_on_ack()) {
this->_notify_state_change();
}
}

void
Expand Down Expand Up @@ -373,13 +387,17 @@ QUICBidirectionalStream::reset(QUICStreamErrorUPtr error)
void
QUICBidirectionalStream::on_read()
{
this->_state.update_on_read();
if (this->_state.update_on_read()) {
this->_notify_state_change();
}
}

void
QUICBidirectionalStream::on_eos()
{
this->_state.update_on_eos();
if (this->_state.update_on_eos()) {
this->_notify_state_change();
}
}

QUICOffset
Expand Down
14 changes: 14 additions & 0 deletions iocore/net/quic/QUICStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,20 @@ QUICStream::_records_crypto_frame(QUICEncryptionLevel level, const QUICCryptoFra
this->_records_frame(frame.id(), std::move(info));
}

void
QUICStream::set_state_listener(QUICStreamStateListener *listener)
{
this->_state_listener = listener;
}

void
QUICStream::_notify_state_change()
{
if (this->_state_listener) {
// TODO Check own state and call an appropriate callback function
}
}

void
QUICStream::reset(QUICStreamErrorUPtr error)
{
Expand Down
14 changes: 13 additions & 1 deletion iocore/net/quic/QUICStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "QUICDebugNames.h"

class QUICStreamAdapter;
class QUICStreamStateListener;

/**
* @brief QUIC Stream
Expand Down Expand Up @@ -84,6 +85,8 @@ class QUICStream : public QUICFrameGenerator, public QUICFrameRetransmitter
virtual void stop_sending(QUICStreamErrorUPtr error);
virtual void reset(QUICStreamErrorUPtr error);

void set_state_listener(QUICStreamStateListener *listener);

LINK(QUICStream, link);

protected:
Expand All @@ -92,16 +95,25 @@ class QUICStream : public QUICFrameGenerator, public QUICFrameRetransmitter
QUICOffset _send_offset = 0;
QUICOffset _reordered_bytes = 0;

QUICStreamAdapter *_adapter = nullptr;
QUICStreamAdapter *_adapter = nullptr;
QUICStreamStateListener *_state_listener = nullptr;

virtual void _on_adapter_updated(){};

void _notify_state_change();

void _records_rst_stream_frame(QUICEncryptionLevel level, const QUICRstStreamFrame &frame);
void _records_stream_frame(QUICEncryptionLevel level, const QUICStreamFrame &frame);
void _records_stop_sending_frame(QUICEncryptionLevel level, const QUICStopSendingFrame &frame);
void _records_crypto_frame(QUICEncryptionLevel level, const QUICCryptoFrame &frame);
};

class QUICStreamStateListener
{
public:
virtual void on_stream_state_close(const QUICStream *stream) = 0;
};

#define QUICStreamDebug(fmt, ...) \
Debug("quic_stream", "[%s] [%" PRIu64 "] [%s] " fmt, this->_connection_info->cids().data(), this->_id, \
QUICDebugNames::stream_state(this->_state.get()), ##__VA_ARGS__)
Expand Down
29 changes: 29 additions & 0 deletions iocore/net/quic/QUICStreamManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ QUICStreamManager::_find_or_create_stream(QUICStreamId stream_id)

stream = this->_stream_factory.create(stream_id, local_max_stream_data, remote_max_stream_data);
ink_assert(stream != nullptr);
stream->set_state_listener(this);
this->stream_list.push(stream);

QUICApplication *application = this->_app_map->get(stream_id);
Expand Down Expand Up @@ -455,6 +456,34 @@ QUICStreamManager::generate_frame(uint8_t *buf, QUICEncryptionLevel level, uint6
return frame;
}

void
QUICStreamManager::on_stream_state_close(const QUICStream *stream)
{
auto direction = this->_context->connection_info()->direction();
switch (QUICTypeUtil::detect_stream_type(stream->id())) {
case QUICStreamType::SERVER_BIDI:
if (direction == NET_VCONNECTION_OUT) {
this->_local_max_streams_bidi += 1;
}
break;
case QUICStreamType::SERVER_UNI:
if (direction == NET_VCONNECTION_OUT) {
this->_local_max_streams_uni += 1;
}
break;
case QUICStreamType::CLIENT_BIDI:
if (direction == NET_VCONNECTION_IN) {
this->_local_max_streams_bidi += 1;
}
break;
case QUICStreamType::CLIENT_UNI:
if (direction == NET_VCONNECTION_IN) {
this->_local_max_streams_uni += 1;
}
break;
}
}

bool
QUICStreamManager::_is_level_matched(QUICEncryptionLevel level)
{
Expand Down
5 changes: 4 additions & 1 deletion iocore/net/quic/QUICStreamManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

class QUICTransportParameters;

class QUICStreamManager : public QUICFrameHandler, public QUICFrameGenerator
class QUICStreamManager : public QUICFrameHandler, public QUICFrameGenerator, public QUICStreamStateListener
{
public:
QUICStreamManager(QUICContext *context, QUICApplicationMap *app_map);
Expand Down Expand Up @@ -68,6 +68,9 @@ class QUICStreamManager : public QUICFrameHandler, public QUICFrameGenerator
QUICFrame *generate_frame(uint8_t *buf, QUICEncryptionLevel level, uint64_t connection_credit, uint16_t maximum_frame_size,
size_t current_packet_size, uint32_t timestamp) override;

// QUICStreamStateListener
void on_stream_state_close(const QUICStream *stream) override;

protected:
virtual bool _is_level_matched(QUICEncryptionLevel level) override;

Expand Down

0 comments on commit f90e8dd

Please sign in to comment.