Skip to content

Commit

Permalink
Implement anonymous pipes and the ability to transmit them between pr…
Browse files Browse the repository at this point in the history
…ocesses using Unix Domain Sockets.

Change-Id: I9c9f4ec0e99075a29c6f4d97c503e759134eb094
TESTED=Unit tests
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/257804
Commit-Queue: Brian Quinlan <bquinlan@google.com>
Reviewed-by: Lasse Nielsen <lrn@google.com>
Reviewed-by: Alexander Aprelev <aam@google.com>
  • Loading branch information
brianquinlan authored and Commit Bot committed Sep 13, 2022
1 parent e324ada commit 2ead86a
Show file tree
Hide file tree
Showing 25 changed files with 777 additions and 41 deletions.
44 changes: 44 additions & 0 deletions runtime/bin/file.cc
Expand Up @@ -550,6 +550,25 @@ void FUNCTION_NAME(File_CreateLink)(Dart_NativeArguments args) {
}
}

void FUNCTION_NAME(File_CreatePipe)(Dart_NativeArguments args) {
Namespace* namespc = Namespace::GetNamespace(args, 0);

File* readPipe;
File* writePipe;
if (File::CreatePipe(namespc, &readPipe, &writePipe)) {
Dart_Handle pipes = ThrowIfError(Dart_NewList(2));
Dart_Handle readHandle =
ThrowIfError(Dart_NewInteger(reinterpret_cast<intptr_t>(readPipe)));
Dart_Handle writeHandle =
ThrowIfError(Dart_NewInteger(reinterpret_cast<intptr_t>(writePipe)));
ThrowIfError(Dart_ListSetAt(pipes, 0, readHandle));
ThrowIfError(Dart_ListSetAt(pipes, 1, writeHandle));
Dart_SetReturnValue(args, pipes);
} else {
Dart_SetReturnValue(args, DartUtils::NewDartOSError());
}
}

void FUNCTION_NAME(File_LinkTarget)(Dart_NativeArguments args) {
Namespace* namespc = Namespace::GetNamespace(args, 0);
Dart_Handle path_handle = Dart_GetNativeArgument(args, 1);
Expand Down Expand Up @@ -915,6 +934,31 @@ CObject* File::CreateRequest(const CObjectArray& request) {
: CObject::NewOSError();
}

CObject* File::CreatePipeRequest(const CObjectArray& request) {
if ((request.Length() < 1) || !request[0]->IsIntptr()) {
return CObject::IllegalArgumentError();
}
Namespace* namespc = CObjectToNamespacePointer(request[0]);
RefCntReleaseScope<Namespace> rs(namespc);

File* readPipe;
File* writePipe;
if (!CreatePipe(namespc, &readPipe, &writePipe)) {
return CObject::NewOSError();
}

CObjectArray* pipes = new CObjectArray(CObject::NewArray(2));
CObjectNativePointer* readHandle = new CObjectNativePointer(
CObject::NewNativePointer(reinterpret_cast<intptr_t>(readPipe),
sizeof(*readPipe), ReleaseFile));
CObjectNativePointer* writeHandle = new CObjectNativePointer(
CObject::NewNativePointer(reinterpret_cast<intptr_t>(writePipe),
sizeof(*writePipe), ReleaseFile));
pipes->SetAt(0, readHandle);
pipes->SetAt(1, writeHandle);
return pipes;
}

CObject* File::OpenRequest(const CObjectArray& request) {
if ((request.Length() < 1) || !request[0]->IsIntptr()) {
return CObject::IllegalArgumentError();
Expand Down
5 changes: 2 additions & 3 deletions runtime/bin/file.h
Expand Up @@ -228,17 +228,15 @@ class File : public ReferenceCounted<File> {
// (stdin, stout or stderr).
static File* OpenStdio(int fd);

#if defined(DART_HOST_OS_FUCHSIA) || defined(DART_HOST_OS_LINUX) || \
defined(DART_HOST_OS_ANDROID) || defined(DART_HOST_OS_MACOS)
static File* OpenFD(int fd);
#endif

static bool Exists(Namespace* namespc, const char* path);
static bool ExistsUri(Namespace* namespc, const char* uri);
static bool Create(Namespace* namespc, const char* path, bool exclusive);
static bool CreateLink(Namespace* namespc,
const char* path,
const char* target);
static bool CreatePipe(Namespace* namespc, File** readPipe, File** writePipe);
static bool Delete(Namespace* namespc, const char* path);
static bool DeleteLink(Namespace* namespc, const char* path);
static bool Rename(Namespace* namespc,
Expand Down Expand Up @@ -297,6 +295,7 @@ class File : public ReferenceCounted<File> {

static CObject* ExistsRequest(const CObjectArray& request);
static CObject* CreateRequest(const CObjectArray& request);
static CObject* CreatePipeRequest(const CObjectArray& request);
static CObject* DeleteRequest(const CObjectArray& request);
static CObject* RenameRequest(const CObjectArray& request);
static CObject* CopyRequest(const CObjectArray& request);
Expand Down
11 changes: 11 additions & 0 deletions runtime/bin/file_android.cc
Expand Up @@ -334,6 +334,17 @@ bool File::CreateLink(Namespace* namespc,
return NO_RETRY_EXPECTED(SymlinkAt(target, ns.fd(), ns.path())) == 0;
}

bool File::CreatePipe(Namespace* namespc, File** readPipe, File** writePipe) {
int pipe_fds[2];
int status = NO_RETRY_EXPECTED(pipe(pipe_fds));
if (status != 0) {
return false;
}
*readPipe = OpenFD(pipe_fds[0]);
*writePipe = OpenFD(pipe_fds[1]);
return true;
}

File::Type File::GetType(Namespace* namespc,
const char* name,
bool follow_links) {
Expand Down
11 changes: 11 additions & 0 deletions runtime/bin/file_fuchsia.cc
Expand Up @@ -332,6 +332,17 @@ bool File::CreateLink(Namespace* namespc,
return NO_RETRY_EXPECTED(symlinkat(target, ns.fd(), ns.path())) == 0;
}

bool File::CreatePipe(Namespace* namespc, File** readPipe, File** writePipe) {
int pipe_fds[2];
int status = NO_RETRY_EXPECTED(pipe(pipe_fds));
if (status != 0) {
return false;
}
*readPipe = OpenFD(pipe_fds[0]);
*writePipe = OpenFD(pipe_fds[1]);
return true;
}

File::Type File::GetType(Namespace* namespc,
const char* name,
bool follow_links) {
Expand Down
11 changes: 11 additions & 0 deletions runtime/bin/file_linux.cc
Expand Up @@ -328,6 +328,17 @@ bool File::CreateLink(Namespace* namespc,
return NO_RETRY_EXPECTED(symlinkat(target, ns.fd(), ns.path())) == 0;
}

bool File::CreatePipe(Namespace* namespc, File** readPipe, File** writePipe) {
int pipe_fds[2];
int status = NO_RETRY_EXPECTED(pipe(pipe_fds));
if (status != 0) {
return false;
}
*readPipe = OpenFD(pipe_fds[0]);
*writePipe = OpenFD(pipe_fds[1]);
return true;
}

File::Type File::GetType(Namespace* namespc,
const char* name,
bool follow_links) {
Expand Down
11 changes: 11 additions & 0 deletions runtime/bin/file_macos.cc
Expand Up @@ -371,6 +371,17 @@ bool File::CreateLink(Namespace* namespc,
return (status == 0);
}

bool File::CreatePipe(Namespace* namespc, File** readPipe, File** writePipe) {
int pipe_fds[2];
int status = NO_RETRY_EXPECTED(pipe(pipe_fds));
if (status != 0) {
return false;
}
*readPipe = OpenFD(pipe_fds[0]);
*writePipe = OpenFD(pipe_fds[1]);
return true;
}

File::Type File::GetType(Namespace* namespc,
const char* pathname,
bool follow_links) {
Expand Down
21 changes: 18 additions & 3 deletions runtime/bin/file_win.cc
Expand Up @@ -314,14 +314,18 @@ File* File::FileOpenW(const wchar_t* system_name, FileOpenMode mode) {
return NULL;
}
}
return OpenFD(fd);
}

File* File::OpenFD(int fd) {
return new File(new FileHandle(fd));
}

class StringRAII {
public:
explicit StringRAII(StringRAII& origin) {
own_ = origin.own_;
s_ = origin.release();
explicit StringRAII(StringRAII* origin) {
own_ = origin->own_;
s_ = origin->release();
}

explicit StringRAII(const char* s) : s_(s), own_(false) {}
Expand Down Expand Up @@ -616,6 +620,17 @@ bool File::CreateLink(Namespace* namespc,
return (create_status != 0);
}

bool File::CreatePipe(Namespace* namespc, File** readPipe, File** writePipe) {
int pipe_fds[2];
int status = _pipe(pipe_fds, 4096, _O_BINARY);
if (status != 0) {
return false;
}
*readPipe = OpenFD(pipe_fds[0]);
*writePipe = OpenFD(pipe_fds[1]);
return true;
}

bool File::Delete(Namespace* namespc, const char* name) {
Utf8ToWideScope system_name(PrefixLongFilePath(name));
int status = _wremove(system_name.wide());
Expand Down
1 change: 1 addition & 0 deletions runtime/bin/io_natives.cc
Expand Up @@ -40,6 +40,7 @@ namespace bin {
V(File_Copy, 3) \
V(File_Create, 3) \
V(File_CreateLink, 3) \
V(File_CreatePipe, 1) \
V(File_Delete, 2) \
V(File_DeleteLink, 2) \
V(File_Exists, 2) \
Expand Down
25 changes: 13 additions & 12 deletions runtime/bin/io_service.h
Expand Up @@ -48,18 +48,19 @@ namespace bin {
V(File, Identical, 28) \
V(File, Stat, 29) \
V(File, Lock, 30) \
V(Socket, Lookup, 31) \
V(Socket, ListInterfaces, 32) \
V(Socket, ReverseLookup, 33) \
V(Directory, Create, 34) \
V(Directory, Delete, 35) \
V(Directory, Exists, 36) \
V(Directory, CreateTemp, 37) \
V(Directory, ListStart, 38) \
V(Directory, ListNext, 39) \
V(Directory, ListStop, 40) \
V(Directory, Rename, 41) \
V(SSLFilter, ProcessFilter, 42)
V(File, CreatePipe, 31) \
V(Socket, Lookup, 32) \
V(Socket, ListInterfaces, 33) \
V(Socket, ReverseLookup, 34) \
V(Directory, Create, 35) \
V(Directory, Delete, 36) \
V(Directory, Exists, 37) \
V(Directory, CreateTemp, 38) \
V(Directory, ListStart, 39) \
V(Directory, ListNext, 40) \
V(Directory, ListStop, 41) \
V(Directory, Rename, 42) \
V(SSLFilter, ProcessFilter, 43)

#define DECLARE_REQUEST(type, method, id) k##type##method##Request = id,

Expand Down
2 changes: 2 additions & 0 deletions runtime/tests/concurrency/stress_test_list.json
Expand Up @@ -3258,6 +3258,7 @@
"../../../tests/standalone/io/file_non_ascii_sync_test.dart",
"../../../tests/standalone/io/file_non_ascii_test.dart",
"../../../tests/standalone/io/file_output_stream_test.dart",
"../../../tests/standalone/io/file_pipe_test.dart",
"../../../tests/standalone/io/file_read_encoded_test.dart",
"../../../tests/standalone/io/file_stat_test.dart",
"../../../tests/standalone/io/file_stream_test.dart",
Expand Down Expand Up @@ -6580,6 +6581,7 @@
"../../../tests/standalone_2/io/file_non_ascii_sync_test.dart",
"../../../tests/standalone_2/io/file_non_ascii_test.dart",
"../../../tests/standalone_2/io/file_output_stream_test.dart",
"../../../tests/standalone_2/io/file_pipe_test.dart",
"../../../tests/standalone_2/io/file_read_encoded_test.dart",
"../../../tests/standalone_2/io/file_stat_test.dart",
"../../../tests/standalone_2/io/file_stream_test.dart",
Expand Down
15 changes: 15 additions & 0 deletions sdk/lib/_internal/js_dev_runtime/patch/io_patch.dart
Expand Up @@ -122,6 +122,11 @@ class _File {
throw UnsupportedError("File._createLink");
}

@patch
static List<dynamic> _createPipe(_Namespace namespace) {
throw UnsupportedError("File._createPipe");
}

@patch
static _linkTarget(_Namespace namespace, Uint8List rawPath) {
throw UnsupportedError("File._linkTarget");
Expand Down Expand Up @@ -541,6 +546,16 @@ class ResourceHandle {
factory ResourceHandle.fromStdout(Stdout stdout) {
throw UnsupportedError("ResourceHandle.fromStdout constructor");
}

@patch
factory ResourceHandle.fromReadPipe(ReadPipe pipe) {
throw UnsupportedError("ResourceHandle.fromReadPipe constructor");
}

@patch
factory ResourceHandle.fromWritePipe(WritePipe pipe) {
throw UnsupportedError("ResourceHandle.fromWritePipe constructor");
}
}

@patch
Expand Down
15 changes: 15 additions & 0 deletions sdk/lib/_internal/js_runtime/lib/io_patch.dart
Expand Up @@ -122,6 +122,11 @@ class _File {
throw new UnsupportedError("File._createLink");
}

@patch
static List<dynamic> _createPipe(_Namespace namespace) {
throw UnsupportedError("File._createPipe");
}

@patch
static _linkTarget(_Namespace namespace, Uint8List path) {
throw new UnsupportedError("File._linkTarget");
Expand Down Expand Up @@ -541,6 +546,16 @@ class ResourceHandle {
factory ResourceHandle.fromStdout(Stdout stdout) {
throw UnsupportedError("ResourceHandle.fromStdout constructor");
}

@patch
factory ResourceHandle.fromReadPipe(ReadPipe pipe) {
throw UnsupportedError("ResourceHandle.fromReadPipe constructor");
}

@patch
factory ResourceHandle.fromWritePipe(WritePipe pipe) {
throw UnsupportedError("ResourceHandle.fromWritePipe constructor");
}
}

@patch
Expand Down
3 changes: 3 additions & 0 deletions sdk/lib/_internal/vm/bin/file_patch.dart
Expand Up @@ -18,6 +18,9 @@ class _File {
external static _createLink(
_Namespace namespace, Uint8List rawPath, String target);
@patch
@pragma("vm:external-name", "File_CreatePipe")
external static List<dynamic> _createPipe(_Namespace namespace);
@patch
@pragma("vm:external-name", "File_LinkTarget")
external static _linkTarget(_Namespace namespace, Uint8List rawPath);
@patch
Expand Down
21 changes: 20 additions & 1 deletion sdk/lib/_internal/vm/bin/socket_patch.dart
Expand Up @@ -756,7 +756,8 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
"Address family not supported by protocol family, "
// ...and then add some details.
"sourceAddress.type must be ${InternetAddressType.unix} but was "
"${source.type}", address: address);
"${source.type}",
address: address);
}
connectionResult = socket.nativeCreateUnixDomainBindConnect(
address.address, source.address, _Namespace._namespace);
Expand Down Expand Up @@ -2592,6 +2593,16 @@ class ResourceHandle {
factory ResourceHandle.fromStdout(Stdout stdout) {
return _ResourceHandleImpl(stdout._fd);
}

factory ResourceHandle.fromReadPipe(ReadPipe pipe) {
_ReadPipe rp = pipe as _ReadPipe;
return ResourceHandle.fromFile(rp._openedFile!);
}

factory ResourceHandle.fromWritePipe(WritePipe pipe) {
_WritePipe wp = pipe as _WritePipe;
return ResourceHandle.fromFile(wp._file);
}
}

@pragma("vm:entry-point")
Expand Down Expand Up @@ -2622,6 +2633,14 @@ class _ResourceHandleImpl implements ResourceHandle {
return _RawSocket(nativeSocket);
}

_ReadPipe toReadPipe() {
return _ReadPipe(toFile());
}

_WritePipe toWritePipe() {
return _WritePipe(toFile());
}

@pragma("vm:external-name", "ResourceHandleImpl_toRawDatagramSocket")
external RawDatagramSocket toRawDatagramSocket();

Expand Down
15 changes: 15 additions & 0 deletions sdk/lib/_internal/wasm/lib/io_patch.dart
Expand Up @@ -121,6 +121,11 @@ class _File {
throw new UnsupportedError("File._createLink");
}

@patch
static List<dynamic> _createPipe(_Namespace namespace) {
throw UnsupportedError("File._createPipe");
}

@patch
static _linkTarget(_Namespace namespace, Uint8List path) {
throw new UnsupportedError("File._linkTarget");
Expand Down Expand Up @@ -540,6 +545,16 @@ class ResourceHandle {
factory ResourceHandle.fromStdout(Stdout stdout) {
throw UnsupportedError("ResourceHandle.fromStdout constructor");
}

@patch
factory ResourceHandle.fromReadPipe(ReadPipe pipe) {
throw UnsupportedError("ResourceHandle.fromReadPipe constructor");
}

@patch
factory ResourceHandle.fromWritePipe(WritePipe pipe) {
throw UnsupportedError("ResourceHandle.fromWritePipe constructor");
}
}

@patch
Expand Down

0 comments on commit 2ead86a

Please sign in to comment.