Permalink
Browse files

http: Use a buffer for WebSocket output.

This way we can more safely write as much as we want.
  • Loading branch information...
unknownbrackets committed Apr 12, 2018
1 parent 556a46f commit 0fb15fc0d089ab8288fdd43fe489fd60f7922f71
Showing with 54 additions and 48 deletions.
  1. +51 −48 ext/native/net/websocket_server.cpp
  2. +3 −0 ext/native/net/websocket_server.h
@@ -130,24 +130,14 @@ void WebSocketServer::Send(const std::string &str) {
assert(open_);
assert(fragmentOpcode_ == -1);
SendHeader(true, (int)Opcode::TEXT, str.size());
// TODO: For long strings, this will block. Possibly not ideal.
if (!out_->Push(str.c_str(), str.size())) {
assert(false);
open_ = false;
closeReason_ = WebSocketClose::ABNORMAL;
}
SendBytes(str.c_str(), str.size());
}
void WebSocketServer::Send(const std::vector<uint8_t> &payload) {
assert(open_);
assert(fragmentOpcode_ == -1);
SendHeader(true, (int)Opcode::BINARY, payload.size());
// TODO: For long strings, this will block. Possibly not ideal.
if (!out_->Push((const char *)&payload[0], payload.size())) {
assert(false);
open_ = false;
closeReason_ = WebSocketClose::ABNORMAL;
}
SendBytes((const char *)&payload[0], payload.size());
}
void WebSocketServer::AddFragment(bool finish, const std::string &str) {
@@ -160,12 +150,7 @@ void WebSocketServer::AddFragment(bool finish, const std::string &str) {
} else {
assert(fragmentOpcode_ == (int)Opcode::TEXT || fragmentOpcode_ == -1);
}
// TODO: For long strings, this will block. Possibly not ideal.
if (!out_->Push(str.c_str(), str.size())) {
assert(false);
open_ = false;
closeReason_ = WebSocketClose::ABNORMAL;
}
SendBytes(str.c_str(), str.size());
if (finish) {
fragmentOpcode_ = -1;
}
@@ -181,12 +166,7 @@ void WebSocketServer::AddFragment(bool finish, const std::vector<uint8_t> &paylo
} else {
assert(fragmentOpcode_ == (int)Opcode::BINARY || fragmentOpcode_ == -1);
}
// TODO: For long strings, this will block. Possibly not ideal.
if (!out_->Push((const char *)&payload[0], payload.size())) {
assert(false);
open_ = false;
closeReason_ = WebSocketClose::ABNORMAL;
}
SendBytes((const char *)&payload[0], payload.size());
if (finish) {
fragmentOpcode_ = -1;
}
@@ -196,22 +176,14 @@ void WebSocketServer::Ping(const std::vector<uint8_t> &payload) {
assert(open_);
assert(payload.size() <= 125);
SendHeader(true, (int)Opcode::PING, payload.size());
if (!out_->Push((const char *)&payload[0], payload.size())) {
assert(false);
open_ = false;
closeReason_ = WebSocketClose::ABNORMAL;
}
SendBytes((const char *)&payload[0], payload.size());
}
void WebSocketServer::Pong(const std::vector<uint8_t> &payload) {
assert(open_);
assert(payload.size() <= 125);
SendHeader(true, (int)Opcode::PONG, payload.size());
if (!out_->Push((const char *)&payload[0], payload.size())) {
assert(false);
open_ = false;
closeReason_ = WebSocketClose::ABNORMAL;
}
SendBytes((const char *)&payload[0], payload.size());
}
void WebSocketServer::Close(WebSocketClose reason) {
@@ -223,11 +195,7 @@ void WebSocketServer::Close(WebSocketClose reason) {
(uint8_t)((r >> 8) & 0xFF),
(uint8_t)((r >> 0) & 0xFF),
};
if (!out_->Push((const char *)reasonData, sizeof(reasonData))) {
assert(false);
open_ = false;
closeReason_ = WebSocketClose::ABNORMAL;
}
SendBytes((const char *)reasonData, sizeof(reasonData));
sentClose_ = true;
}
@@ -237,9 +205,9 @@ bool WebSocketServer::Process(float timeout) {
return false;
}
out_->Flush(false);
SendFlush();
if (out_->Empty() && sentClose_) {
if (outBuf_.empty() && out_->Empty() && sentClose_) {
// Okay, we've sent the close. Don't wait for anything else (whether we got a close or not.)
open_ = false;
return false;
@@ -258,7 +226,7 @@ bool WebSocketServer::Process(float timeout) {
fd_set write;
FD_ZERO(&write);
if (!out_->Empty()) {
if (!outBuf_.empty() || !out_->Empty()) {
FD_SET(fd_, &write);
}
@@ -276,7 +244,7 @@ bool WebSocketServer::Process(float timeout) {
}
if (FD_ISSET(fd_, &write)) {
out_->Flush(false);
SendFlush();
}
if (FD_ISSET(fd_, &read)) {
if (in_->Empty() && !in_->TryFill()) {
@@ -462,7 +430,7 @@ bool WebSocketServer::ReadControlFrame(int opcode, size_t sz) {
if (opcode == (int)Opcode::PING) {
Pong(payload);
// Try to send immediately if possible, but don't block.
out_->Flush(false);
SendFlush();
if (ping_) {
ping_(payload);
@@ -491,19 +459,19 @@ bool WebSocketServer::ReadControlFrame(int opcode, size_t sz) {
void WebSocketServer::SendHeader(bool fin, int opcode, size_t sz) {
assert((opcode & 0x0F) == opcode);
uint8_t frameHeader = (fin ? 0x80 : 0x00) | opcode;
out_->Push((const char *)&frameHeader, 1);
SendBytes(&frameHeader, 1);
// We never mask from the server.
if (sz <= 125) {
uint8_t frameSize = (int8_t)sz;
out_->Push((const char *)&frameSize, 1);
SendBytes(&frameSize, 1);
} else if (sz <= 0xFFFF) {
uint8_t frameSize[] = {
126,
(uint8_t)((sz >> 8) & 0xFF),
(uint8_t)((sz >> 0) & 0xFF),
};
out_->Push((const char *)frameSize, sizeof(frameSize));
SendBytes(frameSize, sizeof(frameSize));
} else {
uint64_t sz64 = sz;
assert((sz64 & 0x8000000000000000ULL) == 0);
@@ -518,7 +486,42 @@ void WebSocketServer::SendHeader(bool fin, int opcode, size_t sz) {
(uint8_t)((sz64 >> 8) & 0xFF),
(uint8_t)((sz64 >> 0) & 0xFF),
};
out_->Push((const char *)frameSize, sizeof(frameSize));
SendBytes(frameSize, sizeof(frameSize));
}
}
void WebSocketServer::SendBytes(const void *p, size_t sz) {
const char *data = (const char *)p;
if (outBuf_.empty()) {
size_t pushed = out_->PushAtMost(data, sz);
data += pushed;
sz -= pushed;
}
if (sz != 0) {
size_t pos = outBuf_.size();
outBuf_.resize(pos + sz);
memcpy(&outBuf_[pos], data, sz);
}
}
void WebSocketServer::SendFlush() {
out_->Flush(false);
// Drain out as much of our buffer as possible.
size_t totalPushed = 0;
while (!outBuf_.empty()) {
size_t pushed = out_->PushAtMost((const char *)&outBuf_[totalPushed], outBuf_.size() - totalPushed);
if (pushed == 0)
break;
totalPushed += pushed;
out_->Flush(false);
}
if (totalPushed != 0) {
// Hopefully this is usually the entire buffer.
outBuf_.erase(outBuf_.begin(), outBuf_.begin() + totalPushed);
}
}
@@ -71,6 +71,8 @@ class WebSocketServer {
}
void SendHeader(bool fin, int opcode, size_t sz);
void SendBytes(const void *p, size_t sz);
void SendFlush();
bool ReadFrames();
bool ReadFrame();
bool ReadPending();
@@ -83,6 +85,7 @@ class WebSocketServer {
InputSink *in_ = nullptr;
OutputSink *out_ = nullptr;
WebSocketClose closeReason_ = WebSocketClose::NO_STATUS;
std::vector<uint8_t> outBuf_;
std::vector<uint8_t> pendingBuf_;
uint8_t pendingMask_[4]{};

0 comments on commit 0fb15fc

Please sign in to comment.