AK+LibHTTP: Support chunked transfer encoding in async HTTP client
This uses the AK::Generator/AK::AsyncStreamTransform added in the
previous two commits.
DanShaders committed May 21, 2024
1 parent 3a94508 commit e02d2cb
Showing 3 changed files with 232 additions and 6 deletions.
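The commit message notes that the new code builds on AK::Generator and AK::AsyncStreamTransform from the previous two commits. As orientation before the diffs, here is a condensed sketch of that pattern, inferred from the ChunkedBodyStream added below; the FirstNBytesStream class, its limit parameter, and the includes shown are illustrative assumptions, not part of the commit.

#include <AK/AsyncStreamBuffer.h>
#include <AK/AsyncStreamTransform.h>

// Sketch of the transform pattern: a subclass owns an output buffer and a
// generator coroutine that pulls from the wrapped stream, appends transformed
// bytes to the buffer, and co_yields whenever new data becomes readable.
class FirstNBytesStream final : public AsyncStreamTransform<AsyncInputStream> {
public:
    FirstNBytesStream(AsyncInputStream* stream, size_t limit)
        : AsyncStreamTransform(MaybeOwned { *stream }, generate(limit))
    {
    }

    ReadonlyBytes buffered_data_unchecked(Badge<AsyncInputStream>) const override { return m_buffer.data(); }
    void dequeue(Badge<AsyncInputStream>, size_t bytes) override { m_buffer.dequeue(bytes); }

private:
    Generator generate(size_t remaining)
    {
        while (remaining > 0) {
            // Copy whatever the inner stream currently has buffered, up to the limit.
            auto data = CO_TRY(co_await m_stream->peek());
            size_t to_copy = min(data.size(), remaining);
            m_buffer.append(must_sync(m_stream->read(to_copy)));
            remaining -= to_copy;
            co_yield {}; // Let readers observe the newly buffered bytes.
        }
        co_return {}; // The transformed stream ends here.
    }

    AsyncStreamBuffer m_buffer;
};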
131 changes: 131 additions & 0 deletions AK/AsyncStreamBuffer.h
@@ -0,0 +1,131 @@
/*
 * Copyright (c) 2024, Dan Klishch <danilklishch@gmail.com>
 *
 * SPDX-License-Identifier: BSD-2-Clause
 */

#pragma once

#include <AK/Coroutine.h>
#include <AK/Error.h>

namespace AK {

class AsyncStreamBuffer {
    AK_MAKE_NONCOPYABLE(AsyncStreamBuffer);

public:
    AsyncStreamBuffer()
    {
        m_capacity = min_capacity;
        m_data = (u8*)kmalloc(m_capacity);
    }

    AsyncStreamBuffer(AsyncStreamBuffer&& other)
        : m_read_head(exchange(other.m_read_head, 0))
        , m_peek_head(exchange(other.m_peek_head, 0))
        , m_capacity(exchange(other.m_capacity, 0))
        , m_data(exchange(other.m_data, nullptr))
    {
    }

    AsyncStreamBuffer& operator=(AsyncStreamBuffer&& buffer)
    {
        if (this != &buffer) {
            this->~AsyncStreamBuffer();
            new (this) AsyncStreamBuffer(move(buffer));
        }
        return *this;
    }

    ~AsyncStreamBuffer()
    {
        if (m_data)
            kfree_sized(m_data, m_capacity);
    }

    bool is_empty() const
    {
        return m_read_head == m_peek_head;
    }

    ReadonlyBytes data() const
    {
        return { m_data + m_read_head, m_peek_head - m_read_head };
    }

    void dequeue(size_t bytes)
    {
        m_read_head += bytes;
    }

    template<typename Func>
    Coroutine<ErrorOr<size_t>> enqueue(size_t preferred_capacity_for_writing, Func&& func)
    {
        allocate_enough_space_for(preferred_capacity_for_writing);
        size_t nread = CO_TRY(co_await func(Bytes { m_data + m_peek_head, m_capacity - m_peek_head }));
        m_peek_head += nread;
        co_return nread;
    }

    void append(ReadonlyBytes bytes)
    {
        if (m_peek_head + bytes.size() > m_capacity)
            allocate_enough_space_for(bytes.size());
        memcpy(m_data + m_peek_head, bytes.data(), bytes.size());
        m_peek_head += bytes.size();
    }

    void append(u8 byte)
    {
        if (m_peek_head == m_capacity)
            allocate_enough_space_for(1);
        m_data[m_peek_head++] = byte;
    }

    Bytes get_bytes_for_writing(size_t length)
    {
        if (m_peek_head + length > m_capacity)
            allocate_enough_space_for(length);
        m_peek_head += length;
        return { m_data + m_peek_head - length, length };
    }

private:
    static constexpr size_t min_capacity = 32;

    void allocate_enough_space_for(size_t length)
    {
        if (m_read_head != 0) {
            if (m_capacity - (m_peek_head - m_read_head) >= length) {
                memmove(m_data, m_data + m_read_head, m_peek_head - m_read_head);
                m_peek_head -= m_read_head;
                m_read_head = 0;
                return;
            }
        }

        VERIFY(m_capacity < NumericLimits<size_t>::max() / 3);
        size_t new_capacity = max(m_capacity * 3 / 2, m_capacity + length);

        u8* new_data = (u8*)kmalloc(new_capacity);
        memcpy(new_data, m_data + m_read_head, m_peek_head - m_read_head);
        kfree_sized(m_data, m_capacity);

        m_data = new_data;
        m_capacity = new_capacity;
        m_peek_head -= m_read_head;
        m_read_head = 0;
    }

    size_t m_read_head { 0 };
    size_t m_peek_head { 0 };
    size_t m_capacity { 0 };
    u8* m_data { nullptr };
};

}

#ifdef USING_AK_GLOBALLY
using AK::AsyncStreamBuffer;
#endif
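A minimal usage sketch of the new buffer follows; it is not part of the commit. The Producer type and its produce_into() coroutine are hypothetical stand-ins for any asynchronous source whose result shape matches what enqueue() expects.

#include <AK/AsyncStreamBuffer.h>

// Sketch: fill the buffer through enqueue(), then inspect and release the
// buffered bytes with data() and dequeue().
template<typename Producer>
Coroutine<ErrorOr<void>> pump_once(Producer& producer, AsyncStreamBuffer& buffer)
{
    // enqueue() hands the callback a writable window of at least the requested
    // size; the callback reports how many bytes it actually wrote.
    CO_TRY(co_await buffer.enqueue(4096, [&](Bytes window) {
        return producer.produce_into(window); // Coroutine<ErrorOr<size_t>>
    }));

    // Everything between the read head and the peek head is now visible here...
    ReadonlyBytes available = buffer.data();
    // ...and dequeue() releases it once the caller has consumed it.
    buffer.dequeue(available.size());
    co_return {};
}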
21 changes: 21 additions & 0 deletions Tests/LibHTTP/TestHttp11Connection.cpp
@@ -37,6 +37,27 @@ Vector<HTTPUnitTest> const http_unit_tests = {
                               "\r\n"sv,
        .body_expectation = "0123456789abcdef"sv,
    },
    {
        .name = "Chunked"sv,
        .method = HTTP::Method::GET,
        .url = "/"sv,
        .headers = {
            { "Host", "localhost" },
        },
        .response = "HTTP/1.1 200 OK\r\n"
                    "Transfer-Encoding: chunked\r\n"
                    "\r\n"
                    "18\r\n"
                    "0123456789abcdef\r\n\r\n"
                    "19\r\n"
                    "Well hello friends!\r\n"
                    "0\r\n"
                    "\r\n"sv,
        .request_expectation = "GET / HTTP/1.1\r\n"
                               "Host: localhost\r\n"
                               "\r\n"sv,
        .body_expectation = "0123456789abcdef\r\nWell hello friends!"sv,
    },
};

ASYNC_TEST_CASE(unit_tests_single)
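For reference, the chunked framing fed to the parser by the fixture above can be produced by a small helper like this sketch, which is not part of the commit. It writes chunk sizes in decimal, matching the "18"/"19" size lines above and the consume_decimal_integer() call in the parser below (standard HTTP chunk sizes are hexadecimal).

#include <AK/ByteString.h>
#include <AK/StringBuilder.h>
#include <AK/StringView.h>

// Hypothetical helper: frame a payload as one chunk followed by the
// terminating zero-length chunk, using decimal sizes as this parser expects.
static ByteString chunk_encode(StringView payload)
{
    StringBuilder builder;
    builder.appendff("{}\r\n", payload.length()); // size line
    builder.append(payload);                      // chunk data
    builder.append("\r\n"sv);                     // end of chunk
    builder.append("0\r\n\r\n"sv);                // last (empty) chunk
    return builder.to_byte_string();
}

// chunk_encode("Well hello friends!"sv) yields "19\r\nWell hello friends!\r\n0\r\n\r\n".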
86 changes: 80 additions & 6 deletions Userland/Libraries/LibHTTP/Http11Connection.cpp
@@ -4,7 +4,9 @@
 * SPDX-License-Identifier: BSD-2-Clause
 */

#include <AK/AsyncStreamBuffer.h>
#include <AK/AsyncStreamHelpers.h>
#include <AK/AsyncStreamTransform.h>
#include <AK/GenericLexer.h>
#include <LibHTTP/Http11Connection.h>

@@ -89,6 +91,67 @@ Coroutine<ErrorOr<StatusCodeAndHeaders>> receive_response_headers(AsyncStream* s
        .headers = headers,
    };
}

class ChunkedBodyStream final : public AsyncStreamTransform<AsyncInputStream> {
public:
    ChunkedBodyStream(AsyncInputStream* stream)
        : AsyncStreamTransform(MaybeOwned { *stream }, generate())
    {
    }

    ReadonlyBytes buffered_data_unchecked(Badge<AsyncInputStream>) const override
    {
        return m_buffer.data();
    }

    void dequeue(Badge<AsyncInputStream>, size_t bytes) override
    {
        m_buffer.dequeue(bytes);
    }

private:
    Generator generate()
    {
        while (true) {
            auto line = CO_TRY(co_await AsyncStreamHelpers::consume_until(m_stream.ptr(), "\r\n"sv));

            auto lexer = GenericLexer { line };
            auto length_or_error = lexer.consume_decimal_integer<size_t>();
            if (length_or_error.is_error()) {
                m_stream->reset();
                co_return Error::from_string_literal("Invalid chunk length");
            }
            if (!lexer.consume_specific("\r\n")) {
                m_stream->reset();
                co_return Error::from_string_literal("Expected \\r\\n after chunk length");
            }
            VERIFY(lexer.is_eof());
            size_t chunk_length = length_or_error.release_value();
            bool is_last_chunk = chunk_length == 0;

            while (chunk_length > 0) {
                auto data = CO_TRY(co_await m_stream->peek());
                size_t to_copy = min(data.size(), chunk_length);
                // FIXME: We can reuse the buffer of the underlying stream if our reading frame doesn't span
                //        multiple chunks.
                m_buffer.append(must_sync(m_stream->read(to_copy)));
                chunk_length -= to_copy;
                co_yield {};
            }

            if (CO_TRY(co_await m_stream->read(2)) != "\r\n"sv.bytes()) {
                m_stream->reset();
                co_return Error::from_string_literal("Expected \\r\\n after a chunk");
            }

            if (is_last_chunk)
                co_return {};
        }
    }

    AsyncStreamBuffer m_buffer;
};

}

Coroutine<ErrorOr<NonnullOwnPtr<Http11Response>>> Http11Response::create(Badge<Http11Connection>, RequestData&& data, AsyncStream* stream)
@@ -107,20 +170,31 @@ Coroutine<ErrorOr<NonnullOwnPtr<Http11Response>>> Http11Response::create(Badge<H
     auto [status_code, headers] = CO_TRY(co_await receive_response_headers(stream));
 
     Optional<size_t> content_length;
+    Optional<StringView> transfer_encoding;
     for (auto const& header : headers) {
         if (header.header.equals_ignoring_ascii_case("Content-Length"sv)) {
             content_length = header.value.to_number<size_t>();
+        } else if (header.header.equals_ignoring_ascii_case("Transfer-Encoding"sv)) {
+            transfer_encoding = header.value;
         }
     }
 
-    if (!content_length.has_value()) {
-        stream->reset();
-        co_return Error::from_string_literal("'Content-Length' must be provided");
+    OwnPtr<AsyncInputStream> body;
+    if (transfer_encoding.has_value()) {
+        if (transfer_encoding.value() != "chunked"sv) {
+            stream->reset();
+            co_return Error::from_string_literal("Unsupported 'Transfer-Encoding'");
+        }
+        body = make<ChunkedBodyStream>(stream);
+    } else {
+        if (!content_length.has_value()) {
+            stream->reset();
+            co_return Error::from_string_literal("'Content-Length' must be provided");
+        }
+        body = make<AsyncInputStreamSlice>(stream, content_length.value());
     }
 
-    auto body = make<AsyncInputStreamSlice>(stream, content_length.value());
-
-    co_return adopt_own(*new (nothrow) Http11Response(move(body), status_code, move(headers)));
+    co_return adopt_own(*new (nothrow) Http11Response(body.release_nonnull(), status_code, move(headers)));
 }
 
 }
