Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions builtin-functions/kphp-light/stdlib/file-functions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ define('STREAM_CLIENT_CONNECT', 1);
define('DEFAULT_SOCKET_TIMEOUT', 60);

function stream_socket_client ($url ::: string, &$error_number ::: mixed = TODO, &$error_description ::: mixed = TODO, $timeout ::: float = DEFAULT_SOCKET_TIMEOUT, $flags ::: int = STREAM_CLIENT_CONNECT, $context ::: mixed = null) ::: mixed;
function stream_set_blocking ($stream, $mode ::: bool) ::: bool;

function fopen ($filename ::: string, $mode ::: string): mixed;
/** @kphp-extern-func-info interruptible */
Expand Down
1 change: 1 addition & 0 deletions runtime-light/k2-platform/k2-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ inline constexpr size_t DEFAULT_MEMORY_ALIGN = 16;

inline constexpr int32_t errno_ok = 0;
inline constexpr int32_t errno_e2big = E2BIG;
inline constexpr int32_t errno_ebadfd = EBADF;
inline constexpr int32_t errno_ebusy = EBUSY;
inline constexpr int32_t errno_enodev = ENODEV;
inline constexpr int32_t errno_einval = EINVAL;
Expand Down
10 changes: 10 additions & 0 deletions runtime-light/stdlib/file/file-system-functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,16 @@ inline resource f$stream_socket_client(const string& address, std::optional<std:
return make_instance<kphp::fs::socket>(*std::move(expected));
}

inline bool f$stream_set_blocking(const resource& stream, bool mode) noexcept {
if (auto socket{from_mixed<class_instance<kphp::fs::socket>>(stream, {})}; !socket.is_null()) {
socket.get()->set_blocking(mode);
return true;
}

kphp::log::warning("unexpected resource in stream_set_blocking -> {}", stream.to_string().c_str());
return false;
}

inline Optional<string> f$file_get_contents(const string& stream) noexcept {
if (auto sync_resource{from_mixed<class_instance<kphp::fs::sync_resource>>(f$fopen(stream, FileSystemImageState::get().READ_MODE), {})};
!sync_resource.is_null()) {
Expand Down
6 changes: 6 additions & 0 deletions runtime-light/stdlib/file/resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,8 @@ class socket : public async_resource {

static auto open(std::string_view scheme) noexcept -> std::expected<socket, int32_t>;

auto set_blocking(bool blocking) noexcept -> void;

auto write(std::span<const std::byte> buf) noexcept -> kphp::coro::task<std::expected<size_t, int32_t>> override;
auto read(std::span<std::byte> buf) noexcept -> kphp::coro::task<std::expected<size_t, int32_t>> override;
auto get_contents() noexcept -> kphp::coro::task<std::expected<string, int32_t>> override;
Expand Down Expand Up @@ -689,6 +691,10 @@ inline auto socket::open(std::string_view scheme) noexcept -> std::expected<sock
return expected;
}

inline auto socket::set_blocking(bool blocking) noexcept -> void {
m_stream.set_blocking(blocking);
}

inline auto socket::write(std::span<const std::byte> buf) noexcept -> kphp::coro::task<std::expected<size_t, int32_t>> {
if (!m_open) [[unlikely]] {
co_return std::unexpected{k2::errno_enodev};
Expand Down
39 changes: 39 additions & 0 deletions runtime-light/streams/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@
namespace kphp::component {

class stream {
bool m_non_blocking{false};
k2::descriptor m_descriptor{k2::INVALID_PLATFORM_DESCRIPTOR};
kphp::coro::io_scheduler& m_scheduler{kphp::coro::io_scheduler::get()};

explicit stream(k2::descriptor descriptor) noexcept
: m_descriptor(descriptor) {}

auto read_non_blocking(std::span<std::byte> buf) const noexcept -> std::expected<size_t, int32_t>;
auto write_non_blocking(std::span<const std::byte> buf) const noexcept -> std::expected<size_t, int32_t>;

public:
stream(stream&& other) noexcept
: m_descriptor(std::exchange(other.m_descriptor, k2::INVALID_PLATFORM_DESCRIPTOR)) {}
Expand Down Expand Up @@ -60,6 +64,7 @@ class stream {
auto descriptor() const noexcept -> k2::descriptor;
auto reset(k2::descriptor descriptor) noexcept -> void;
auto status() const noexcept -> k2::StreamStatus;
auto set_blocking(bool blocking) noexcept -> void;

auto read(std::span<std::byte> buf) const noexcept -> kphp::coro::task<std::expected<size_t, int32_t>>;
template<std::invocable<std::span<const std::byte>> F>
Expand All @@ -72,6 +77,28 @@ class stream {

// ================================================================================================

inline auto stream::read_non_blocking(std::span<std::byte> buf) const noexcept -> std::expected<size_t, int32_t> {
switch (status().read_status) {
case k2::IOStatus::IOBlocked:
return std::expected<size_t, int32_t>{0};
case k2::IOStatus::IOClosed:
return std::unexpected{k2::errno_ebadfd};
case k2::IOStatus::IOAvailable:
return k2::read(m_descriptor, buf);
}
}

inline auto stream::write_non_blocking(std::span<const std::byte> buf) const noexcept -> std::expected<size_t, int32_t> {
switch (status().write_status) {
case k2::IOStatus::IOBlocked:
return std::expected<size_t, int32_t>{0};
case k2::IOStatus::IOClosed:
return std::unexpected{k2::errno_ebadfd};
case k2::IOStatus::IOAvailable:
return k2::write(m_descriptor, buf);
}
}

inline auto stream::open(std::string_view target, k2::stream_kind stream_kind) noexcept -> std::expected<kphp::component::stream, int32_t> {
int32_t errc{};
k2::descriptor descriptor{k2::INVALID_PLATFORM_DESCRIPTOR};
Expand Down Expand Up @@ -125,7 +152,15 @@ inline auto stream::status() const noexcept -> k2::StreamStatus {
return stream_status;
}

inline auto stream::set_blocking(bool blocking) noexcept -> void {
m_non_blocking = !blocking;
}

inline auto stream::read(std::span<std::byte> buf) const noexcept -> kphp::coro::task<std::expected<size_t, int32_t>> {
if (m_non_blocking) {
co_return read_non_blocking(buf);
}

for (size_t read{}; read < buf.size();) {
switch (co_await m_scheduler.poll(m_descriptor, kphp::coro::poll_op::read)) {
case kphp::coro::poll_status::event:
Expand Down Expand Up @@ -158,6 +193,10 @@ auto stream::read_all(F f) const noexcept -> kphp::coro::task<std::expected<void
}

inline auto stream::write(std::span<const std::byte> buf) const noexcept -> kphp::coro::task<std::expected<size_t, int32_t>> {
if (m_non_blocking) {
co_return write_non_blocking(buf);
}

for (size_t written{}; written < buf.size();) {
switch (co_await m_scheduler.poll(m_descriptor, kphp::coro::poll_op::write)) {
case kphp::coro::poll_status::event:
Expand Down
Loading