Skip to content

Commit

Permalink
Fixed issue #2 - too much memory being consumed when sending large files
Browse files Browse the repository at this point in the history
  • Loading branch information
Ben committed Apr 13, 2017
1 parent bc227df commit b47aed7
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 29 deletions.
44 changes: 29 additions & 15 deletions src/core/InterceptorSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ InterceptorSession::InterceptorSession(const Config::ServerConfig* config,
boost::asio::io_service& ioService)
: m_config(config),
m_ioService(ioService),
m_strand(ioService),
m_iostrand(ioService),
m_fsstrand(ioService),
m_readTimer(ioService),
m_writeTimer(ioService),
m_state(CanSend)
Expand Down Expand Up @@ -47,19 +48,22 @@ void InterceptorSession::postReply(HttpBufferPtr buffer)
{
LOG_DEBUG("InterceptorSession::postReply()");
m_ioService.post(
m_strand.wrap(
m_iostrand.wrap(
boost::bind(&InterceptorSession::sendNext, shared_from_this(), buffer)));
}

void InterceptorSession::sendNext(HttpBufferPtr buffer)
{
LOG_DEBUG("InterceptorSession::sendNext()");
m_buffers.push_back(buffer);
{
boost::mutex::scoped_lock lock(m_buffersMutex);
m_buffers.push_back(buffer);

if (m_state & CanSend) {
auto v = m_buffers.front();
m_buffers.pop_front();
sendReply(v);
if (m_state & CanSend) {
auto v = m_buffers.front();
m_buffers.pop_front();
sendReply(v);
}
}
}

Expand All @@ -70,7 +74,12 @@ void InterceptorSession::sendReply(HttpBufferPtr buffer)
if (m_connection) {
startWriteTimer();
m_state &= ~CanSend;
m_connection->asyncWrite(buffer->m_buffers, m_strand.wrap

if (buffer->flags() & Http::HttpBuffer::HasMore)
m_ioService.post(
m_fsstrand.wrap(buffer->nextCall()));

m_connection->asyncWrite(buffer->m_buffers, m_iostrand.wrap
(boost::bind
(&InterceptorSession::handleTransmissionCompleted,
shared_from_this(),
Expand All @@ -93,10 +102,15 @@ void InterceptorSession::handleTransmissionCompleted(
LOG_DEBUG("Response sent ");
m_state |= CanSend;

if (!m_buffers.empty()) {
auto v = m_buffers.front();
m_buffers.pop_front();
sendReply(v);
{
boost::mutex::scoped_lock lock(m_buffersMutex);

if (!m_buffers.empty()) {

auto v = m_buffers.front();
m_buffers.pop_front();
sendReply(v);
}
}

} else {
Expand All @@ -117,7 +131,7 @@ void InterceptorSession::closeConnection()
m_state &= ~CanSend;
m_buffers.clear();
m_ioService.post(
m_strand.wrap(
m_iostrand.wrap(
boost::bind(&InboundConnection::disconnect, m_connection)));
m_connection.reset();
m_request.reset();
Expand Down Expand Up @@ -181,7 +195,7 @@ void InterceptorSession::startReadTimer()
m_readTimer.expires_from_now(boost::posix_time::seconds(
m_config->m_clientTimeout));
m_readTimer.async_wait
(m_strand.wrap
(m_iostrand.wrap
(boost::bind(&InterceptorSession::handleTimeout,
shared_from_this(),
ReadTimer,
Expand All @@ -196,7 +210,7 @@ void InterceptorSession::startWriteTimer()
m_writeTimer.expires_from_now(boost::posix_time::seconds(
m_config->m_serverTimeout));
m_writeTimer.async_wait
(m_strand.wrap
(m_iostrand.wrap
(boost::bind(&InterceptorSession::handleTimeout,
shared_from_this(),
WriteTimer,
Expand Down
5 changes: 4 additions & 1 deletion src/core/InterceptorSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <boost/asio.hpp>
#include <deque>
#include <boost/thread/mutex.hpp>

class InboundConnection;

Expand Down Expand Up @@ -63,13 +64,15 @@ class InterceptorSession : public
private:
const Config::ServerConfig* m_config;
boost::asio::io_service& m_ioService;
boost::asio::strand m_strand;
boost::asio::strand m_iostrand;
boost::asio::strand m_fsstrand;
InboundConnectionPtr m_connection;
unsigned char m_requestBuffer[4096];
HttpRequestPtr m_request;
HttpReplyPtr m_reply;

std::deque<HttpBufferPtr> m_buffers;
boost::mutex m_buffersMutex;

// Timers
boost::asio::deadline_timer m_readTimer;
Expand Down
10 changes: 9 additions & 1 deletion src/http/HttpBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ namespace Http {
class HttpBuffer {
public:
enum State {
Closing = 0x01
Closing = 0x01,
HasMore = 0x02
};

HttpBuffer()
Expand All @@ -30,6 +31,11 @@ namespace Http {
return m_flags;
}

auto nextCall() const
{
return m_nextCall;
}

public:
std::vector<boost::asio::const_buffer> m_buffers;

Expand All @@ -38,6 +44,8 @@ namespace Http {
std::vector<char*> m_bufs2;
friend class HttpReply;
int m_flags;
std::function<bool()> m_nextCall;

};

}
Expand Down
32 changes: 20 additions & 12 deletions src/http/HttpReply.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace Http {
{
LOG_DEBUG("HttpReply::process()");
std::stringstream stream;
m_httpBuffer = std::make_shared<HttpBuffer>();

if (m_replyHeaders) {
delete m_replyHeaders;
Expand Down Expand Up @@ -221,11 +222,14 @@ namespace Http {
size_t totalBytes)
{
LOG_DEBUG("HttpReply::requestLargeFileContents()");
boost::mutex::scoped_lock lock(
m_mutex); //needed to be sure that previous call is completed
size_t bytes;
size_t to = std::min((size_t) from + MAX_CHUNK_SIZE, totalBytes - 1);
m_contentLength = totalBytes;
setFlag(LargeFileRequest, true);

m_httpBuffer = std::make_shared<HttpBuffer>();
std::stringstream stream;

if (FileUtils::readFile(page, from, to, stream, bytes) == Code::Ok) {
Expand All @@ -234,59 +238,63 @@ namespace Http {
post(stream);
return true;
} else {
from = to + 1;
m_httpBuffer->m_nextCall = std::bind(&HttpReply::requestLargeFileContents,
shared_from_this(), page, from , totalBytes);
m_httpBuffer->m_flags |= HttpBuffer::HasMore;
post(stream);
}

} else {
return false;
}

from = to + 1;
return requestLargeFileContents(page, from, totalBytes);
return true;
}

void HttpReply::post(std::stringstream& stream)
{
LOG_DEBUG("HttpReply::post()");
HttpBufferPtr httpBuffer = std::make_shared<HttpBuffer>();
std::vector<boost::asio::const_buffer> buffers;

if (!getFlag(HeadersSent))
httpBuffer->m_buffers.push_back({}); // emtpy place for headers
m_httpBuffer->m_buffers.push_back({}); // emtpy place for headers

buffers.push_back(buf(httpBuffer, std::string(stream.str())));
buffers.push_back(buf(m_httpBuffer, std::string(stream.str())));

#ifdef ENABLE_GZIP

if (canEncodeResponse()) {
encodeResponse(httpBuffer, buffers);
encodeResponse(m_httpBuffer, buffers);
}

#endif // ENABLE_GZIP

// We chunk only in the case that no header has been sent or if it's the last frame
if (canChunkResponse()) {
chunkResponse(httpBuffer, buffers);
chunkResponse(m_httpBuffer, buffers);
}

if (!getFlag(HeadersSent)) {
buildHeaders(httpBuffer);
buildHeaders(m_httpBuffer);
LOG_INFO(m_request->queryString() << " " << (int) m_status);
setFlag(HeadersSent, true);
}

httpBuffer->m_buffers.insert(httpBuffer->m_buffers.end(), buffers.begin(),
buffers.end());
m_httpBuffer->m_buffers.insert(m_httpBuffer->m_buffers.end(), buffers.begin(),
buffers.end());

if (getFlag(Closing)) {
httpBuffer->m_flags |= HttpBuffer::Closing;
m_httpBuffer->m_flags |= HttpBuffer::Closing;
}

auto session = m_request->session();

if (session) {
session->postReply(httpBuffer);
session->postReply(m_httpBuffer);
}

m_httpBuffer.reset();
}

bool HttpReply::chunkResponse(HttpBufferPtr httpBuffer,
Expand Down
5 changes: 5 additions & 0 deletions src/http/HttpReply.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <bitset>

#include <zlib.h>
#include <boost/thread/mutex.hpp>

namespace Http {
class HttpHeaders;
Expand Down Expand Up @@ -83,6 +84,10 @@ namespace Http {

z_stream m_gzip;
bool m_gzipBusy;

HttpBufferPtr m_httpBuffer;

boost::mutex m_mutex;
};

}
Expand Down

0 comments on commit b47aed7

Please sign in to comment.