Skip to content

Commit

Permalink
Add stream_await() for async stream "blocking"
Browse files Browse the repository at this point in the history
Summary: As with cURL, use stream_await() to drop the fd
into the asio event loop and get notified of read/write events.

Reviewed By: @JoelMarcey

Differential Revision: D1720052

Signature: t1:1720052:1418353349:c7c252f30e901d40e303e3e445ddf518aaf98d6d
  • Loading branch information
sgolemon authored and hhvm-bot committed Dec 12, 2014
1 parent b76766c commit b2559b6
Show file tree
Hide file tree
Showing 14 changed files with 342 additions and 16 deletions.
108 changes: 108 additions & 0 deletions hphp/runtime/base/file-await.cpp
@@ -0,0 +1,108 @@
#include "hphp/runtime/base/file-await.h"
#include "hphp/runtime/base/file.h"
#include "hphp/runtime/ext/asio/static_wait_handle.h"

namespace HPHP {
/////////////////////////////////////////////////////////////////////////////

void FileTimeoutHandler::timeoutExpired() noexcept {
if (m_fileAwait) {
m_fileAwait->setFinished(FileAwait::TIMEOUT);
}
}

void FileEventHandler::handlerReady(uint16_t events) noexcept {
if (m_fileAwait) {
m_fileAwait->setFinished(events ? FileAwait::READY : FileAwait::CLOSED);
}
}

/////////////////////////////////////////////////////////////////////////////

FileAwait::FileAwait(int fd, uint16_t events, double timeout) {
assert(fd >= 0);
assert(events & FileEventHandler::READ_WRITE);

m_file = std::make_shared<FileEventHandler>
(s_asio_event_base.get(), fd, this);
m_file->registerHandler(events);

int64_t timeout_ms = timeout * 1000.0;
if (timeout_ms > 0) {
m_timeout = std::make_shared<FileTimeoutHandler>
(s_asio_event_base.get(), this);
s_asio_event_base->runInEventBaseThread([this,timeout_ms]{
if (m_timeout) {
m_timeout->scheduleTimeout(timeout_ms);
}
});
}
}

FileAwait::~FileAwait() {
if (m_file) {
// Avoid possible race condition
m_file->m_fileAwait = nullptr;

m_file->unregisterHandler();
m_file.reset();
}
if (m_timeout) {
// Avoid race condition, we may (likely) finish destructing
// before the timeout cancels
m_timeout->m_fileAwait = nullptr;

std::shared_ptr<FileTimeoutHandler> to = m_timeout;
s_asio_event_base->runInEventBaseThread([to]{
to.get()->cancelTimeout();
});
m_timeout.reset();
}
}

void FileAwait::unserialize(Cell& c) {
c.m_type = KindOfInt64;
c.m_data.num = m_result;
}

void FileAwait::setFinished(int64_t status) {
if (status > m_result) {
m_result = status;
}
if (!m_finished) {
markAsFinished();
m_finished = true;
}
}

/////////////////////////////////////////////////////////////////////////////

Object File::await(uint16_t events, double timeout) {
if (isClosed()) {
Cell closedResult;
closedResult.m_type = KindOfInt64;
closedResult.m_data.num = FileAwait::CLOSED;
return c_StaticWaitHandle::CreateSucceeded(closedResult);
}
if (fd() < 0) {
throw Object(SystemLib::AllocExceptionObject(
"Unable to await on stream, invalid file descriptor"));
}
events = events & FileEventHandler::READ_WRITE;
if (!events) {
throw Object(SystemLib::AllocExceptionObject(
"Must await for reading, writing, or both."));
}

auto ev = new FileAwait(fd(), events, timeout);
try {
return ev->getWaitHandle();
} catch (...) {
assert(false);
ev->abandon();
throw;
}
}

/////////////////////////////////////////////////////////////////////////////
} // namespace HPHP
57 changes: 57 additions & 0 deletions hphp/runtime/base/file-await.h
@@ -0,0 +1,57 @@
#ifndef incl_HPHP_FILE_AWAIT_H
#define incl_HPHP_FILE_AWAIT_H

#include "hphp/runtime/base/base-includes.h"
#include "hphp/runtime/ext/asio/asio_external_thread_event.h"
#include "hphp/runtime/ext/asio/socket-event.h"

namespace HPHP {
/////////////////////////////////////////////////////////////////////////////

class FileAwait;

class FileTimeoutHandler : public AsioTimeoutHandler {
friend class FileAwait;
public:
FileTimeoutHandler(AsioEventBase* base, FileAwait* fa):
AsioTimeoutHandler(base), m_fileAwait(fa) {}

void timeoutExpired() noexcept override;
private:
FileAwait* m_fileAwait;
};

class FileEventHandler : public AsioEventHandler {
friend class FileAwait;
public:
FileEventHandler(AsioEventBase* base, int fd, FileAwait* fa):
AsioEventHandler(base, fd), m_fileAwait(fa) {}

void handlerReady(uint16_t events) noexcept override;
private:
FileAwait* m_fileAwait;
};

class FileAwait : public AsioExternalThreadEvent {
public:
enum Status {
ERROR = -1,
TIMEOUT = 0,
READY,
CLOSED,
};

FileAwait(int fd, uint16_t events, double timeout);
~FileAwait();
void unserialize(Cell& c) override;
void setFinished(int64_t status);
private:
std::shared_ptr<FileEventHandler> m_file;
std::shared_ptr<FileTimeoutHandler> m_timeout;
int m_result{-1};
bool m_finished{false};
};

/////////////////////////////////////////////////////////////////////////////
} // namespace HPHP
#endif // incl_HPHP_FILE_AWAIT_H
2 changes: 2 additions & 0 deletions hphp/runtime/base/file.h
Expand Up @@ -149,6 +149,8 @@ struct File : SweepableResourceData {
virtual bool lock(int operation, bool &wouldblock);
virtual bool stat(struct stat *sb);

virtual Object await(uint16_t events, double timeout);

virtual Array getMetaData();
virtual Variant getWrapperMetaData() { return Variant(); }
String getWrapperType() const;
Expand Down
9 changes: 7 additions & 2 deletions hphp/runtime/base/temp-file.h
Expand Up @@ -39,8 +39,13 @@ class TempFile : public PlainFile {
const String& o_getClassNameHook() const { return classnameof(); }

// implementing File
virtual bool open(const String& filename, const String& mode);
virtual bool close();
bool open(const String& filename, const String& mode) override;
bool close() override;

Object await(uint16_t events, double timeout) override {
throw Object(SystemLib::AllocExceptionObject(
"Temporary stream does not support awaiting"));
}

private:
bool m_autoDelete;
Expand Down
33 changes: 19 additions & 14 deletions hphp/runtime/base/user-file.h
Expand Up @@ -39,24 +39,29 @@ class UserFile : public File, public UserFSNode {
return openImpl(filename, mode, 0);
}
bool openImpl(const String& filename, const String& mode, int options);
virtual bool close();
virtual int64_t readImpl(char *buffer, int64_t length);
virtual int64_t writeImpl(const char *buffer, int64_t length);
virtual bool seekable() { return m_StreamSeek || m_Call; }
virtual bool seek(int64_t offset, int whence = SEEK_SET);
virtual int64_t tell();
virtual bool eof();
virtual bool rewind() { return seek(0, SEEK_SET); }
virtual bool flush();
virtual bool truncate(int64_t size);
virtual bool lock(int operation) {
bool close() override;
int64_t readImpl(char *buffer, int64_t length) override;
int64_t writeImpl(const char *buffer, int64_t length) override;
bool seekable() override { return m_StreamSeek || m_Call; }
bool seek(int64_t offset, int whence = SEEK_SET) override;
int64_t tell() override;
bool eof() override;
bool rewind() override { return seek(0, SEEK_SET); }
bool flush() override;
bool truncate(int64_t size) override;
bool lock(int operation) override {
bool wouldBlock = false;
return lock(operation, wouldBlock);
}
virtual bool lock(int operation, bool &wouldBlock);
virtual bool stat(struct stat* buf);
bool lock(int operation, bool &wouldBlock) override;
bool stat(struct stat* buf) override;

virtual Variant getWrapperMetaData() { return Variant(m_obj); }
Object await(uint16_t events, double timeout) override {
throw Object(SystemLib::AllocExceptionObject(
"Userstreams do not support awaiting"));
}

Variant getWrapperMetaData() override { return Variant(m_obj); }

int access(const String& path, int mode);
int lstat(const String& path, struct stat* buf);
Expand Down
19 changes: 19 additions & 0 deletions hphp/runtime/ext/stream/ext_stream.cpp
Expand Up @@ -26,6 +26,7 @@
#include "hphp/runtime/base/zend-printf.h"
#include "hphp/runtime/server/server-stats.h"
#include "hphp/runtime/base/file.h"
#include "hphp/runtime/base/file-await.h"
#include "hphp/runtime/base/stream-wrapper.h"
#include "hphp/runtime/base/stream-wrapper-registry.h"
#include "hphp/runtime/base/user-stream-wrapper.h"
Expand Down Expand Up @@ -149,6 +150,15 @@ static class StreamExtension : public Extension {
REGISTER_CONSTANT(STREAM_SOCK_STREAM, k_STREAM_SOCK_STREAM);
REGISTER_CONSTANT(STREAM_USE_PATH, k_STREAM_USE_PATH);

REGISTER_CONSTANT(STREAM_AWAIT_READ, FileEventHandler::READ);
REGISTER_CONSTANT(STREAM_AWAIT_WRITE, FileEventHandler::WRITE);
REGISTER_CONSTANT(STREAM_AWAIT_READ_WRITE, FileEventHandler::READ_WRITE);

REGISTER_CONSTANT(STREAM_AWAIT_ERROR, FileAwait::ERROR);
REGISTER_CONSTANT(STREAM_AWAIT_TIMEOUT, FileAwait::TIMEOUT);
REGISTER_CONSTANT(STREAM_AWAIT_READY, FileAwait::READY);
REGISTER_CONSTANT(STREAM_AWAIT_CLOSED, FileAwait::CLOSED);

HHVM_FE(stream_context_create);
HHVM_FE(stream_context_get_options);
HHVM_FE(stream_context_set_option);
Expand All @@ -168,6 +178,7 @@ static class StreamExtension : public Extension {
HHVM_FE(stream_wrapper_unregister);
HHVM_FE(stream_resolve_include_path);
HHVM_FE(stream_select);
HHVM_FE(stream_await);
HHVM_FE(stream_set_blocking);
HHVM_FE(stream_set_timeout);
HHVM_FE(stream_set_write_buffer);
Expand Down Expand Up @@ -416,6 +427,14 @@ Variant HHVM_FUNCTION(stream_select,
vtv_sec, tv_usec);
}

Object HHVM_FUNCTION(stream_await,
const Resource& stream,
uint16_t events,
double timeout /*= 0.0 */) {
auto f = stream.getTyped<File>();
return f->await(events, timeout);
}

bool HHVM_FUNCTION(stream_set_blocking,
const Resource& stream,
int mode) {
Expand Down
5 changes: 5 additions & 0 deletions hphp/runtime/ext/stream/ext_stream.h
Expand Up @@ -195,6 +195,11 @@ Variant HHVM_FUNCTION(stream_select,
const Variant& vtv_sec,
int tv_usec = 0);

Object HHVM_FUNCTION(stream_await,
const Resource& stream,
uint16_t events,
double timeout = 0.0);

bool HHVM_FUNCTION(stream_set_blocking,
const Resource& stream,
int mode);
Expand Down
20 changes: 20 additions & 0 deletions hphp/runtime/ext/stream/ext_stream.php
Expand Up @@ -320,6 +320,26 @@ function stream_select(mixed &$read,
mixed $vtv_sec,
int $tv_usec = 0): mixed;

/**
* Awaitable version of stream_select()
*
* @param resource $fp - Stream resource, must be backed by a file descriptor
* such as a normal file, socket, tempfile, or stdio.
* Does not work with memory streams or user streams.
* @param int $events - Mix of STREAM_AWAIT_READ and/or STREAM_EVENT_WRITE
* @param float $timeout - Timeout in seconds
*
* @return int - Result code
* STREAM_AWAIT_CLOSED: Stream is closed
* STREAM_AWAIT_READY: Activity on the provided stream
* STREAM_AWAIT_TIMEOUT: No activity (timeout occured)
* STREAM_AWAIT_ERROR: Error
*/
<<__Native>>
async function stream_await(resource $fp,
int $events,
float $timeout = 0.0): Awaitable<int>;

/**
* Sets blocking or non-blocking mode on a stream. This function works for
* any stream that supports non-blocking mode (currently, regular files and
Expand Down
9 changes: 9 additions & 0 deletions hphp/test/slow/async/streams/memory-stream.php
@@ -0,0 +1,9 @@
<?hh

try {
$wh = stream_await(fopen('php://memory', 'rw'),
STREAM_AWAIT_READ | STREAM_AWAIT_WRITE, 0.0);
var_dump($wh->join());
} catch (Exception $e) {
echo "Exception: ", $e->getMessage(), "\n";
}
1 change: 1 addition & 0 deletions hphp/test/slow/async/streams/memory-stream.php.expect
@@ -0,0 +1 @@
Exception: Temporary stream does not support awaiting

0 comments on commit b2559b6

Please sign in to comment.