From 2ead86ab9dcbfe9217af01670c7d93ede73381c9 Mon Sep 17 00:00:00 2001 From: Brian Quinlan Date: Tue, 13 Sep 2022 21:16:28 +0000 Subject: [PATCH] Implement anonymous pipes and the ability to transmit them between processes using Unix Domain Sockets. Change-Id: I9c9f4ec0e99075a29c6f4d97c503e759134eb094 TESTED=Unit tests Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/257804 Commit-Queue: Brian Quinlan Reviewed-by: Lasse Nielsen Reviewed-by: Alexander Aprelev --- runtime/bin/file.cc | 44 ++++++ runtime/bin/file.h | 5 +- runtime/bin/file_android.cc | 11 ++ runtime/bin/file_fuchsia.cc | 11 ++ runtime/bin/file_linux.cc | 11 ++ runtime/bin/file_macos.cc | 11 ++ runtime/bin/file_win.cc | 21 ++- runtime/bin/io_natives.cc | 1 + runtime/bin/io_service.h | 25 +-- .../tests/concurrency/stress_test_list.json | 2 + .../js_dev_runtime/patch/io_patch.dart | 15 ++ .../_internal/js_runtime/lib/io_patch.dart | 15 ++ sdk/lib/_internal/vm/bin/file_patch.dart | 3 + sdk/lib/_internal/vm/bin/socket_patch.dart | 21 ++- sdk/lib/_internal/wasm/lib/io_patch.dart | 15 ++ sdk/lib/io/file.dart | 64 ++++++++ sdk/lib/io/file_impl.dart | 71 ++++++++- sdk/lib/io/io_service.dart | 25 +-- sdk/lib/io/socket.dart | 23 ++- tests/standalone/io/file_pipe_test.dart | 48 ++++++ tests/standalone/io/file_stream_test.dart | 22 ++- tests/standalone/io/unix_socket_test.dart | 140 +++++++++++++++++ tests/standalone_2/io/file_pipe_test.dart | 50 ++++++ tests/standalone_2/io/file_stream_test.dart | 22 ++- tests/standalone_2/io/unix_socket_test.dart | 142 +++++++++++++++++- 25 files changed, 777 insertions(+), 41 deletions(-) create mode 100644 tests/standalone/io/file_pipe_test.dart create mode 100644 tests/standalone_2/io/file_pipe_test.dart diff --git a/runtime/bin/file.cc b/runtime/bin/file.cc index e58636ea1c82..408539491837 100644 --- a/runtime/bin/file.cc +++ b/runtime/bin/file.cc @@ -550,6 +550,25 @@ void FUNCTION_NAME(File_CreateLink)(Dart_NativeArguments args) { } } +void FUNCTION_NAME(File_CreatePipe)(Dart_NativeArguments args) { + Namespace* namespc = Namespace::GetNamespace(args, 0); + + File* readPipe; + File* writePipe; + if (File::CreatePipe(namespc, &readPipe, &writePipe)) { + Dart_Handle pipes = ThrowIfError(Dart_NewList(2)); + Dart_Handle readHandle = + ThrowIfError(Dart_NewInteger(reinterpret_cast(readPipe))); + Dart_Handle writeHandle = + ThrowIfError(Dart_NewInteger(reinterpret_cast(writePipe))); + ThrowIfError(Dart_ListSetAt(pipes, 0, readHandle)); + ThrowIfError(Dart_ListSetAt(pipes, 1, writeHandle)); + Dart_SetReturnValue(args, pipes); + } else { + Dart_SetReturnValue(args, DartUtils::NewDartOSError()); + } +} + void FUNCTION_NAME(File_LinkTarget)(Dart_NativeArguments args) { Namespace* namespc = Namespace::GetNamespace(args, 0); Dart_Handle path_handle = Dart_GetNativeArgument(args, 1); @@ -915,6 +934,31 @@ CObject* File::CreateRequest(const CObjectArray& request) { : CObject::NewOSError(); } +CObject* File::CreatePipeRequest(const CObjectArray& request) { + if ((request.Length() < 1) || !request[0]->IsIntptr()) { + return CObject::IllegalArgumentError(); + } + Namespace* namespc = CObjectToNamespacePointer(request[0]); + RefCntReleaseScope rs(namespc); + + File* readPipe; + File* writePipe; + if (!CreatePipe(namespc, &readPipe, &writePipe)) { + return CObject::NewOSError(); + } + + CObjectArray* pipes = new CObjectArray(CObject::NewArray(2)); + CObjectNativePointer* readHandle = new CObjectNativePointer( + CObject::NewNativePointer(reinterpret_cast(readPipe), + sizeof(*readPipe), ReleaseFile)); + CObjectNativePointer* writeHandle = new CObjectNativePointer( + CObject::NewNativePointer(reinterpret_cast(writePipe), + sizeof(*writePipe), ReleaseFile)); + pipes->SetAt(0, readHandle); + pipes->SetAt(1, writeHandle); + return pipes; +} + CObject* File::OpenRequest(const CObjectArray& request) { if ((request.Length() < 1) || !request[0]->IsIntptr()) { return CObject::IllegalArgumentError(); diff --git a/runtime/bin/file.h b/runtime/bin/file.h index 836feb4b2fb9..d58fbea7bca7 100644 --- a/runtime/bin/file.h +++ b/runtime/bin/file.h @@ -228,10 +228,7 @@ class File : public ReferenceCounted { // (stdin, stout or stderr). static File* OpenStdio(int fd); -#if defined(DART_HOST_OS_FUCHSIA) || defined(DART_HOST_OS_LINUX) || \ - defined(DART_HOST_OS_ANDROID) || defined(DART_HOST_OS_MACOS) static File* OpenFD(int fd); -#endif static bool Exists(Namespace* namespc, const char* path); static bool ExistsUri(Namespace* namespc, const char* uri); @@ -239,6 +236,7 @@ class File : public ReferenceCounted { static bool CreateLink(Namespace* namespc, const char* path, const char* target); + static bool CreatePipe(Namespace* namespc, File** readPipe, File** writePipe); static bool Delete(Namespace* namespc, const char* path); static bool DeleteLink(Namespace* namespc, const char* path); static bool Rename(Namespace* namespc, @@ -297,6 +295,7 @@ class File : public ReferenceCounted { static CObject* ExistsRequest(const CObjectArray& request); static CObject* CreateRequest(const CObjectArray& request); + static CObject* CreatePipeRequest(const CObjectArray& request); static CObject* DeleteRequest(const CObjectArray& request); static CObject* RenameRequest(const CObjectArray& request); static CObject* CopyRequest(const CObjectArray& request); diff --git a/runtime/bin/file_android.cc b/runtime/bin/file_android.cc index 02378b57b90b..b676fb59b8c1 100644 --- a/runtime/bin/file_android.cc +++ b/runtime/bin/file_android.cc @@ -334,6 +334,17 @@ bool File::CreateLink(Namespace* namespc, return NO_RETRY_EXPECTED(SymlinkAt(target, ns.fd(), ns.path())) == 0; } +bool File::CreatePipe(Namespace* namespc, File** readPipe, File** writePipe) { + int pipe_fds[2]; + int status = NO_RETRY_EXPECTED(pipe(pipe_fds)); + if (status != 0) { + return false; + } + *readPipe = OpenFD(pipe_fds[0]); + *writePipe = OpenFD(pipe_fds[1]); + return true; +} + File::Type File::GetType(Namespace* namespc, const char* name, bool follow_links) { diff --git a/runtime/bin/file_fuchsia.cc b/runtime/bin/file_fuchsia.cc index 0b5e0fd1d1ff..c99156cd66fb 100644 --- a/runtime/bin/file_fuchsia.cc +++ b/runtime/bin/file_fuchsia.cc @@ -332,6 +332,17 @@ bool File::CreateLink(Namespace* namespc, return NO_RETRY_EXPECTED(symlinkat(target, ns.fd(), ns.path())) == 0; } +bool File::CreatePipe(Namespace* namespc, File** readPipe, File** writePipe) { + int pipe_fds[2]; + int status = NO_RETRY_EXPECTED(pipe(pipe_fds)); + if (status != 0) { + return false; + } + *readPipe = OpenFD(pipe_fds[0]); + *writePipe = OpenFD(pipe_fds[1]); + return true; +} + File::Type File::GetType(Namespace* namespc, const char* name, bool follow_links) { diff --git a/runtime/bin/file_linux.cc b/runtime/bin/file_linux.cc index e2091584bced..6dd6e90f8fab 100644 --- a/runtime/bin/file_linux.cc +++ b/runtime/bin/file_linux.cc @@ -328,6 +328,17 @@ bool File::CreateLink(Namespace* namespc, return NO_RETRY_EXPECTED(symlinkat(target, ns.fd(), ns.path())) == 0; } +bool File::CreatePipe(Namespace* namespc, File** readPipe, File** writePipe) { + int pipe_fds[2]; + int status = NO_RETRY_EXPECTED(pipe(pipe_fds)); + if (status != 0) { + return false; + } + *readPipe = OpenFD(pipe_fds[0]); + *writePipe = OpenFD(pipe_fds[1]); + return true; +} + File::Type File::GetType(Namespace* namespc, const char* name, bool follow_links) { diff --git a/runtime/bin/file_macos.cc b/runtime/bin/file_macos.cc index 15e7ec2999dc..0ad4f1fb929e 100644 --- a/runtime/bin/file_macos.cc +++ b/runtime/bin/file_macos.cc @@ -371,6 +371,17 @@ bool File::CreateLink(Namespace* namespc, return (status == 0); } +bool File::CreatePipe(Namespace* namespc, File** readPipe, File** writePipe) { + int pipe_fds[2]; + int status = NO_RETRY_EXPECTED(pipe(pipe_fds)); + if (status != 0) { + return false; + } + *readPipe = OpenFD(pipe_fds[0]); + *writePipe = OpenFD(pipe_fds[1]); + return true; +} + File::Type File::GetType(Namespace* namespc, const char* pathname, bool follow_links) { diff --git a/runtime/bin/file_win.cc b/runtime/bin/file_win.cc index 22d57341e5e6..f7248d55ce73 100644 --- a/runtime/bin/file_win.cc +++ b/runtime/bin/file_win.cc @@ -314,14 +314,18 @@ File* File::FileOpenW(const wchar_t* system_name, FileOpenMode mode) { return NULL; } } + return OpenFD(fd); +} + +File* File::OpenFD(int fd) { return new File(new FileHandle(fd)); } class StringRAII { public: - explicit StringRAII(StringRAII& origin) { - own_ = origin.own_; - s_ = origin.release(); + explicit StringRAII(StringRAII* origin) { + own_ = origin->own_; + s_ = origin->release(); } explicit StringRAII(const char* s) : s_(s), own_(false) {} @@ -616,6 +620,17 @@ bool File::CreateLink(Namespace* namespc, return (create_status != 0); } +bool File::CreatePipe(Namespace* namespc, File** readPipe, File** writePipe) { + int pipe_fds[2]; + int status = _pipe(pipe_fds, 4096, _O_BINARY); + if (status != 0) { + return false; + } + *readPipe = OpenFD(pipe_fds[0]); + *writePipe = OpenFD(pipe_fds[1]); + return true; +} + bool File::Delete(Namespace* namespc, const char* name) { Utf8ToWideScope system_name(PrefixLongFilePath(name)); int status = _wremove(system_name.wide()); diff --git a/runtime/bin/io_natives.cc b/runtime/bin/io_natives.cc index 5258357b35ee..12e64805ce21 100644 --- a/runtime/bin/io_natives.cc +++ b/runtime/bin/io_natives.cc @@ -40,6 +40,7 @@ namespace bin { V(File_Copy, 3) \ V(File_Create, 3) \ V(File_CreateLink, 3) \ + V(File_CreatePipe, 1) \ V(File_Delete, 2) \ V(File_DeleteLink, 2) \ V(File_Exists, 2) \ diff --git a/runtime/bin/io_service.h b/runtime/bin/io_service.h index 17d020e2ce1d..e0577a8f6052 100644 --- a/runtime/bin/io_service.h +++ b/runtime/bin/io_service.h @@ -48,18 +48,19 @@ namespace bin { V(File, Identical, 28) \ V(File, Stat, 29) \ V(File, Lock, 30) \ - V(Socket, Lookup, 31) \ - V(Socket, ListInterfaces, 32) \ - V(Socket, ReverseLookup, 33) \ - V(Directory, Create, 34) \ - V(Directory, Delete, 35) \ - V(Directory, Exists, 36) \ - V(Directory, CreateTemp, 37) \ - V(Directory, ListStart, 38) \ - V(Directory, ListNext, 39) \ - V(Directory, ListStop, 40) \ - V(Directory, Rename, 41) \ - V(SSLFilter, ProcessFilter, 42) + V(File, CreatePipe, 31) \ + V(Socket, Lookup, 32) \ + V(Socket, ListInterfaces, 33) \ + V(Socket, ReverseLookup, 34) \ + V(Directory, Create, 35) \ + V(Directory, Delete, 36) \ + V(Directory, Exists, 37) \ + V(Directory, CreateTemp, 38) \ + V(Directory, ListStart, 39) \ + V(Directory, ListNext, 40) \ + V(Directory, ListStop, 41) \ + V(Directory, Rename, 42) \ + V(SSLFilter, ProcessFilter, 43) #define DECLARE_REQUEST(type, method, id) k##type##method##Request = id, diff --git a/runtime/tests/concurrency/stress_test_list.json b/runtime/tests/concurrency/stress_test_list.json index f0310dca16e3..368c35373460 100644 --- a/runtime/tests/concurrency/stress_test_list.json +++ b/runtime/tests/concurrency/stress_test_list.json @@ -3258,6 +3258,7 @@ "../../../tests/standalone/io/file_non_ascii_sync_test.dart", "../../../tests/standalone/io/file_non_ascii_test.dart", "../../../tests/standalone/io/file_output_stream_test.dart", + "../../../tests/standalone/io/file_pipe_test.dart", "../../../tests/standalone/io/file_read_encoded_test.dart", "../../../tests/standalone/io/file_stat_test.dart", "../../../tests/standalone/io/file_stream_test.dart", @@ -6580,6 +6581,7 @@ "../../../tests/standalone_2/io/file_non_ascii_sync_test.dart", "../../../tests/standalone_2/io/file_non_ascii_test.dart", "../../../tests/standalone_2/io/file_output_stream_test.dart", + "../../../tests/standalone_2/io/file_pipe_test.dart", "../../../tests/standalone_2/io/file_read_encoded_test.dart", "../../../tests/standalone_2/io/file_stat_test.dart", "../../../tests/standalone_2/io/file_stream_test.dart", diff --git a/sdk/lib/_internal/js_dev_runtime/patch/io_patch.dart b/sdk/lib/_internal/js_dev_runtime/patch/io_patch.dart index ac9561197939..31ba7d76e2e8 100644 --- a/sdk/lib/_internal/js_dev_runtime/patch/io_patch.dart +++ b/sdk/lib/_internal/js_dev_runtime/patch/io_patch.dart @@ -122,6 +122,11 @@ class _File { throw UnsupportedError("File._createLink"); } + @patch + static List _createPipe(_Namespace namespace) { + throw UnsupportedError("File._createPipe"); + } + @patch static _linkTarget(_Namespace namespace, Uint8List rawPath) { throw UnsupportedError("File._linkTarget"); @@ -541,6 +546,16 @@ class ResourceHandle { factory ResourceHandle.fromStdout(Stdout stdout) { throw UnsupportedError("ResourceHandle.fromStdout constructor"); } + + @patch + factory ResourceHandle.fromReadPipe(ReadPipe pipe) { + throw UnsupportedError("ResourceHandle.fromReadPipe constructor"); + } + + @patch + factory ResourceHandle.fromWritePipe(WritePipe pipe) { + throw UnsupportedError("ResourceHandle.fromWritePipe constructor"); + } } @patch diff --git a/sdk/lib/_internal/js_runtime/lib/io_patch.dart b/sdk/lib/_internal/js_runtime/lib/io_patch.dart index f94f37941d50..e7a6375a2f37 100644 --- a/sdk/lib/_internal/js_runtime/lib/io_patch.dart +++ b/sdk/lib/_internal/js_runtime/lib/io_patch.dart @@ -122,6 +122,11 @@ class _File { throw new UnsupportedError("File._createLink"); } + @patch + static List _createPipe(_Namespace namespace) { + throw UnsupportedError("File._createPipe"); + } + @patch static _linkTarget(_Namespace namespace, Uint8List path) { throw new UnsupportedError("File._linkTarget"); @@ -541,6 +546,16 @@ class ResourceHandle { factory ResourceHandle.fromStdout(Stdout stdout) { throw UnsupportedError("ResourceHandle.fromStdout constructor"); } + + @patch + factory ResourceHandle.fromReadPipe(ReadPipe pipe) { + throw UnsupportedError("ResourceHandle.fromReadPipe constructor"); + } + + @patch + factory ResourceHandle.fromWritePipe(WritePipe pipe) { + throw UnsupportedError("ResourceHandle.fromWritePipe constructor"); + } } @patch diff --git a/sdk/lib/_internal/vm/bin/file_patch.dart b/sdk/lib/_internal/vm/bin/file_patch.dart index a5ac789220fe..db8f000b8eea 100644 --- a/sdk/lib/_internal/vm/bin/file_patch.dart +++ b/sdk/lib/_internal/vm/bin/file_patch.dart @@ -18,6 +18,9 @@ class _File { external static _createLink( _Namespace namespace, Uint8List rawPath, String target); @patch + @pragma("vm:external-name", "File_CreatePipe") + external static List _createPipe(_Namespace namespace); + @patch @pragma("vm:external-name", "File_LinkTarget") external static _linkTarget(_Namespace namespace, Uint8List rawPath); @patch diff --git a/sdk/lib/_internal/vm/bin/socket_patch.dart b/sdk/lib/_internal/vm/bin/socket_patch.dart index bdab2feda51e..159aa8505947 100644 --- a/sdk/lib/_internal/vm/bin/socket_patch.dart +++ b/sdk/lib/_internal/vm/bin/socket_patch.dart @@ -756,7 +756,8 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject { "Address family not supported by protocol family, " // ...and then add some details. "sourceAddress.type must be ${InternetAddressType.unix} but was " - "${source.type}", address: address); + "${source.type}", + address: address); } connectionResult = socket.nativeCreateUnixDomainBindConnect( address.address, source.address, _Namespace._namespace); @@ -2592,6 +2593,16 @@ class ResourceHandle { factory ResourceHandle.fromStdout(Stdout stdout) { return _ResourceHandleImpl(stdout._fd); } + + factory ResourceHandle.fromReadPipe(ReadPipe pipe) { + _ReadPipe rp = pipe as _ReadPipe; + return ResourceHandle.fromFile(rp._openedFile!); + } + + factory ResourceHandle.fromWritePipe(WritePipe pipe) { + _WritePipe wp = pipe as _WritePipe; + return ResourceHandle.fromFile(wp._file); + } } @pragma("vm:entry-point") @@ -2622,6 +2633,14 @@ class _ResourceHandleImpl implements ResourceHandle { return _RawSocket(nativeSocket); } + _ReadPipe toReadPipe() { + return _ReadPipe(toFile()); + } + + _WritePipe toWritePipe() { + return _WritePipe(toFile()); + } + @pragma("vm:external-name", "ResourceHandleImpl_toRawDatagramSocket") external RawDatagramSocket toRawDatagramSocket(); diff --git a/sdk/lib/_internal/wasm/lib/io_patch.dart b/sdk/lib/_internal/wasm/lib/io_patch.dart index 2fa4e1827889..104219052840 100644 --- a/sdk/lib/_internal/wasm/lib/io_patch.dart +++ b/sdk/lib/_internal/wasm/lib/io_patch.dart @@ -121,6 +121,11 @@ class _File { throw new UnsupportedError("File._createLink"); } + @patch + static List _createPipe(_Namespace namespace) { + throw UnsupportedError("File._createPipe"); + } + @patch static _linkTarget(_Namespace namespace, Uint8List path) { throw new UnsupportedError("File._linkTarget"); @@ -540,6 +545,16 @@ class ResourceHandle { factory ResourceHandle.fromStdout(Stdout stdout) { throw UnsupportedError("ResourceHandle.fromStdout constructor"); } + + @patch + factory ResourceHandle.fromReadPipe(ReadPipe pipe) { + throw UnsupportedError("ResourceHandle.fromReadPipe constructor"); + } + + @patch + factory ResourceHandle.fromWritePipe(WritePipe pipe) { + throw UnsupportedError("ResourceHandle.fromWritePipe constructor"); + } } @patch diff --git a/sdk/lib/io/file.dart b/sdk/lib/io/file.dart index 41ced7ee0e2d..0b532a3d111e 100644 --- a/sdk/lib/io/file.dart +++ b/sdk/lib/io/file.dart @@ -442,6 +442,12 @@ abstract class File implements FileSystemEntity { /// In order to make sure that system resources are freed, the stream /// must be read to completion or the subscription on the stream must /// be cancelled. + /// + /// If [File] is a [named pipe](https://en.wikipedia.org/wiki/Named_pipe) + /// then the returned [Stream] will wait until the write side of the pipe + /// is closed before signaling "done". If there are no writers attached + /// to the pipe when it is opened, then [Stream.listen] will wait until + /// a writer opens the pipe. Stream> openRead([int? start, int? end]); /// Creates a new independent [IOSink] for the file. @@ -909,3 +915,61 @@ class FileSystemException implements IOException { return sb.toString(); } } + +/// The "read" end of an [Pipe] created by [Pipe.create]. +/// +/// The read stream will continue to listen until the "write" end of the +/// pipe (i.e. [Pipe.write]) is closed. +/// +/// ```dart +/// final pipe = await Pipe.create(); +/// pipe.read.transform(utf8.decoder).listen((data) { +/// print(data); +/// }, onDone: () => print('Done')); +/// ``` +abstract class ReadPipe implements Stream> {} + +/// The "write" end of an [Pipe] created by [Pipe.create]. +/// +/// ```dart +/// final pipe = await Pipe.create(); +/// pipe.write.add("Hello World!".codeUnits); +/// pipe.write.close(); +/// ``` +abstract class WritePipe implements IOSink {} + +/// An anonymous pipe that can be used to send data in a single direction i.e. +/// data written to [write] can be read using [read]. +/// +/// On macOS and Linux (excluding Android), either the [read] or [write] +/// portion of the pipe can be transmitted to another process and used for +/// interprocess communication. +/// +/// For example: +/// ```dart +/// final pipe = await Pipe.create(); +/// final socket = await RawSocket.connect(address, 0); +/// socket.sendMessage([ +/// SocketControlMessage.fromHandles( +/// [ResourceHandle.fromReadPipe(pipe.read)]) +/// ], 'Hello'.codeUnits); +/// pipe.write.add('Hello over pipe!'.codeUnits); +/// pipe.write.close(); +/// ``` +abstract class Pipe { + /// The read end of the [Pipe]. + ReadPipe get read; + + /// The write end of the [Pipe]. + WritePipe get write; + + // Create an anonymous pipe. + static Future create() { + return _Pipe.create(); + } + + /// Synchronously creates an anonymous pipe. + factory Pipe.createSync() { + return _Pipe.createSync(); + } +} diff --git a/sdk/lib/io/file_impl.dart b/sdk/lib/io/file_impl.dart index 8bb8f7332c93..688484e6873b 100644 --- a/sdk/lib/io/file_impl.dart +++ b/sdk/lib/io/file_impl.dart @@ -13,7 +13,7 @@ class _FileStream extends Stream> { // Information about the underlying file. String? _path; - late RandomAccessFile _openedFile; + RandomAccessFile? _openedFile; int _position; int? _end; final Completer _closeCompleter = new Completer(); @@ -31,6 +31,10 @@ class _FileStream extends Stream> { _FileStream.forStdin() : _position = 0; + _FileStream.forRandomAccessFile(RandomAccessFile f) + : _position = 0, + _openedFile = f; + StreamSubscription listen(void onData(Uint8List event)?, {Function? onError, void onDone()?, bool? cancelOnError}) { _controller = new StreamController( @@ -56,7 +60,7 @@ class _FileStream extends Stream> { _controller.close(); } - _openedFile.close().catchError(_controller.addError).whenComplete(done); + _openedFile!.close().catchError(_controller.addError).whenComplete(done); return _closeCompleter.future; } @@ -82,20 +86,27 @@ class _FileStream extends Stream> { return; } } - _openedFile.read(readBytes).then((block) { + _openedFile!.read(readBytes).then((block) { _readInProgress = false; if (_unsubscribed) { _closeFile(); return; } _position += block.length; - if (block.length < readBytes || (_end != null && _position == _end)) { + // read() may return less than `readBytes` if `_openFile` is a pipe or + // terminal or if a signal is received. Only a empty return indicates + // that the write side of the pipe is closed or that we are at the end + // of a file. + // See https://man7.org/linux/man-pages/man2/read.2.html + if (block.length == 0 || (_end != null && _position == _end)) { _atEnd = true; } if (!_atEnd && !_controller.isPaused) { _readBlock(); } - _controller.add(block); + if (block.length > 0) { + _controller.add(block); + } if (_atEnd) { _closeFile(); } @@ -141,7 +152,10 @@ class _FileStream extends Stream> { } final path = _path; - if (path != null) { + final openedFile = _openedFile; + if (openedFile != null) { + onOpenFile(openedFile); + } else if (path != null) { new File(path) .open(mode: FileMode.read) .then(onOpenFile, onError: openFailed); @@ -166,6 +180,9 @@ class _FileStreamConsumer extends StreamConsumer> { _FileStreamConsumer.fromStdio(int fd) : _openFuture = new Future.value(_File._openStdioSync(fd)); + _FileStreamConsumer.fromRandomAccessFile(RandomAccessFile f) + : _openFuture = Future.value(f); + Future addStream(Stream> stream) { Completer completer = new Completer.sync(); _openFuture.then((openedFile) { @@ -260,6 +277,8 @@ class _File extends FileSystemEntity implements File { external static _createLink( _Namespace namespace, Uint8List rawPath, String target); + external static List _createPipe(_Namespace namespace); + external static _linkTarget(_Namespace namespace, Uint8List rawPath); void createSync({bool recursive = false, bool exclusive = false}) { @@ -1045,3 +1064,43 @@ class _RandomAccessFile implements RandomAccessFile { } } } + +class _ReadPipe extends _FileStream implements ReadPipe { + _ReadPipe(RandomAccessFile file) : super.forRandomAccessFile(file); +} + +class _WritePipe extends _IOSinkImpl implements WritePipe { + RandomAccessFile _file; + _WritePipe(file) + : this._file = file, + super(_FileStreamConsumer.fromRandomAccessFile(file), utf8); +} + +class _Pipe implements Pipe { + final ReadPipe _readPipe; + final WritePipe _writePipe; + + ReadPipe get read => _readPipe; + WritePipe get write => _writePipe; + + _Pipe(this._readPipe, this._writePipe); + + static Future<_Pipe> create() { + final completer = Completer<_Pipe>.sync(); + + _File._dispatchWithNamespace(_IOService.fileCreatePipe, [null]) + .then((response) { + final filePointers = (response as List).cast(); + completer.complete(_Pipe( + _ReadPipe(_RandomAccessFile(filePointers[0], '')), + _WritePipe(_RandomAccessFile(filePointers[1], '')))); + }); + return completer.future; + } + + factory _Pipe.createSync() { + final filePointers = _File._createPipe(_Namespace._namespace); + return _Pipe(_ReadPipe(_RandomAccessFile(filePointers[0] as int, '')), + _WritePipe(_RandomAccessFile(filePointers[1] as int, ''))); + } +} diff --git a/sdk/lib/io/io_service.dart b/sdk/lib/io/io_service.dart index 01b9e673ee9b..707b9fa6056d 100644 --- a/sdk/lib/io/io_service.dart +++ b/sdk/lib/io/io_service.dart @@ -37,18 +37,19 @@ class _IOService { static const int fileIdentical = 28; static const int fileStat = 29; static const int fileLock = 30; - static const int socketLookup = 31; - static const int socketListInterfaces = 32; - static const int socketReverseLookup = 33; - static const int directoryCreate = 34; - static const int directoryDelete = 35; - static const int directoryExists = 36; - static const int directoryCreateTemp = 37; - static const int directoryListStart = 38; - static const int directoryListNext = 39; - static const int directoryListStop = 40; - static const int directoryRename = 41; - static const int sslProcessFilter = 42; + static const int fileCreatePipe = 31; + static const int socketLookup = 32; + static const int socketListInterfaces = 33; + static const int socketReverseLookup = 34; + static const int directoryCreate = 35; + static const int directoryDelete = 36; + static const int directoryExists = 37; + static const int directoryCreateTemp = 38; + static const int directoryListStart = 39; + static const int directoryListNext = 40; + static const int directoryListStop = 41; + static const int directoryRename = 42; + static const int sslProcessFilter = 43; external static Future _dispatch(int request, List data); } diff --git a/sdk/lib/io/socket.dart b/sdk/lib/io/socket.dart index 488441475521..a660e885d143 100644 --- a/sdk/lib/io/socket.dart +++ b/sdk/lib/io/socket.dart @@ -869,9 +869,16 @@ abstract class ResourceHandle { /// Creates wrapper around current stdout. external factory ResourceHandle.fromStdout(Stdout stdout); + // Creates wrapper around a readable pipe. + external factory ResourceHandle.fromReadPipe(ReadPipe pipe); + + // Creates wrapper around a writeable pipe. + external factory ResourceHandle.fromWritePipe(WritePipe pipe); + /// Extracts opened file from resource handle. /// - /// This can also be used when receiving stdin and stdout handles. + /// This can also be used when receiving stdin and stdout handles and read + /// and write pipes. /// /// If this resource handle is not a file or stdio handle, the behavior of the /// returned [RandomAccessFile] is completely unspecified. @@ -898,6 +905,20 @@ abstract class ResourceHandle { /// the returned [RawDatagramSocket] is completely unspecified. /// Be very careful to avoid using a handle incorrectly. RawDatagramSocket toRawDatagramSocket(); + + /// Extracts a read pipe from resource handle. + /// + /// If this resource handle is not a readable pipe, the behavior of the + /// returned [ReadPipe] is completely unspecified. + /// Be very careful to avoid using a handle incorrectly. + ReadPipe toReadPipe(); + + /// Extracts a write pipe from resource handle. + /// + /// If this resource handle is not a writeable pipe, the behavior of the + /// returned [ReadPipe] is completely unspecified. + /// Be very careful to avoid using a handle incorrectly. + WritePipe toWritePipe(); } /// Control message part of the [SocketMessage] received by a call to diff --git a/tests/standalone/io/file_pipe_test.dart b/tests/standalone/io/file_pipe_test.dart new file mode 100644 index 000000000000..e36f91f534c7 --- /dev/null +++ b/tests/standalone/io/file_pipe_test.dart @@ -0,0 +1,48 @@ +// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. +// +// Dart test program for testing the dart:io `Pipe` class + +import 'dart:io'; + +import "package:async_helper/async_helper.dart"; +import "package:expect/expect.dart"; + +testReadFromClosedPipe() async { + final pipe = await Pipe.create(); + pipe.write.close(); + Expect.isTrue(await pipe.read.isEmpty); +} + +testCreateSync() async { + final pipe = Pipe.createSync(); + pipe.write.close(); + Expect.isTrue(await pipe.read.isEmpty); +} + +testMultipleWritesAndReads() async { + final pipe = await Pipe.create(); + int count = 0; + pipe.write.add([count]); + await pipe.read.listen((event) { + Expect.listEquals([count], event); + ++count; + if (count < 10) { + pipe.write.add([count]); + } else { + pipe.write.close(); + } + }, onDone: () => Expect.equals(10, count)); +} + +main() async { + asyncStart(); + try { + await testReadFromClosedPipe(); + await testCreateSync(); + await testMultipleWritesAndReads(); + } finally { + asyncEnd(); + } +} diff --git a/tests/standalone/io/file_stream_test.dart b/tests/standalone/io/file_stream_test.dart index 2d3577e8cb77..5d9ddcb8a2e8 100644 --- a/tests/standalone/io/file_stream_test.dart +++ b/tests/standalone/io/file_stream_test.dart @@ -58,7 +58,27 @@ void testStreamIsEmpty() { }); } -void main() { +Future testStreamAppendedToAfterOpen() async { + asyncStart(); + + final pipe = Pipe.createSync(); + pipe.write.add("Hello World".codeUnits); + int i = 0; + await pipe.read.listen((event) { + Expect.listEquals("Hello World".codeUnits, event); + if (i < 10) { + pipe.write.add("Hello World".codeUnits); + ++i; + } else { + pipe.write.close(); + } + }).asFuture(); + + asyncEnd(); +} + +void main() async { testPauseResumeCancelStream(); testStreamIsEmpty(); + await testStreamAppendedToAfterOpen(); } diff --git a/tests/standalone/io/unix_socket_test.dart b/tests/standalone/io/unix_socket_test.dart index 1c208b0d79b3..f9764d831b65 100644 --- a/tests/standalone/io/unix_socket_test.dart +++ b/tests/standalone/io/unix_socket_test.dart @@ -828,6 +828,140 @@ Future testStdioMessage(String tempDirPath, {bool caller = false}) async { }); } +Future testReadPipeMessage(String uniqueName) async { + if (!Platform.isMacOS && !Platform.isLinux && !Platform.isAndroid) { + return; + } + final address = + InternetAddress('$uniqueName/sock', type: InternetAddressType.unix); + final server = await RawServerSocket.bind(address, 0, shared: false); + + server.listen((RawSocket socket) async { + socket.listen((e) async { + switch (e) { + case RawSocketEvent.read: + final SocketMessage? message = socket.readMessage(); + if (message == null) { + return; + } + Expect.equals('Hello', String.fromCharCodes(message.data)); + Expect.equals(1, message.controlMessages.length); + final SocketControlMessage controlMessage = + message.controlMessages[0]; + final handles = controlMessage.extractHandles(); + Expect.isNotNull(handles); + Expect.equals(1, handles.length); + final receivedPipe = handles[0].toReadPipe(); + Expect.equals('Hello over pipe!', + await receivedPipe.transform(utf8.decoder).join()); + socket.write('server replied'.codeUnits); + break; + case RawSocketEvent.readClosed: + socket.close(); + server.close(); + break; + } + }); + }); + + final RawServerSocket testServer = await createTestServer(); + final testPipe = await Pipe.create(); + + // Send a message containing an open pipe. + final socket = await RawSocket.connect(address, 0); + socket.listen((e) { + switch (e) { + case RawSocketEvent.write: + socket.sendMessage([ + SocketControlMessage.fromHandles( + [ResourceHandle.fromReadPipe(testPipe.read)]) + ], 'Hello'.codeUnits); + testPipe.write.add('Hello over pipe!'.codeUnits); + testPipe.write.close(); + break; + case RawSocketEvent.read: + final data = socket.read(); + if (data == null) { + return; + } + + final dataString = String.fromCharCodes(data); + Expect.equals('server replied', dataString); + socket.close(); + testPipe.write.close(); + testServer.close(); + } + }); +} + +Future testWritePipeMessage(String uniqueName) async { + if (!Platform.isMacOS && !Platform.isLinux && !Platform.isAndroid) { + return; + } + final address = + InternetAddress('$uniqueName/sock', type: InternetAddressType.unix); + final server = await RawServerSocket.bind(address, 0, shared: false); + + server.listen((RawSocket socket) async { + socket.listen((e) async { + switch (e) { + case RawSocketEvent.read: + final SocketMessage? message = socket.readMessage(); + if (message == null) { + return; + } + Expect.equals('Hello', String.fromCharCodes(message.data)); + Expect.equals(1, message.controlMessages.length); + final SocketControlMessage controlMessage = + message.controlMessages[0]; + final handles = controlMessage.extractHandles(); + Expect.isNotNull(handles); + Expect.equals(1, handles.length); + final receivedPipe = handles[0].toWritePipe(); + + receivedPipe.add('Hello over pipe!'.codeUnits); + receivedPipe.close(); + socket.write('server replied'.codeUnits); + break; + case RawSocketEvent.readClosed: + socket.close(); + server.close(); + break; + } + }); + }); + + final RawServerSocket testServer = await createTestServer(); + final testPipe = await Pipe.create(); + + // Send a message containing an open pipe. + final socket = await RawSocket.connect(address, 0); + socket.listen((e) async { + switch (e) { + case RawSocketEvent.write: + socket.sendMessage([ + SocketControlMessage.fromHandles( + [ResourceHandle.fromWritePipe(testPipe.write)]) + ], 'Hello'.codeUnits); + + Expect.equals('Hello over pipe!', + await testPipe.read.transform(utf8.decoder).join()); + break; + case RawSocketEvent.read: + final data = socket.read(); + if (data == null) { + return; + } + + final dataString = String.fromCharCodes(data); + Expect.equals('server replied', dataString); + socket.close(); + testPipe.write.close(); + testServer.close(); + } + }); +} + Future testDeleteFile(String tempDirPath) async { if (!Platform.isMacOS && !Platform.isLinux && !Platform.isAndroid) { return; @@ -954,6 +1088,12 @@ void main(List args) async { await withTempDir('unix_socket_test', (Directory dir) async { await testStdioMessage('${dir.path}', caller: true); }); + await withTempDir('unix_socket_test', (Directory dir) async { + await testReadPipeMessage('${dir.path}'); + }); + await withTempDir('unix_socket_test', (Directory dir) async { + await testWritePipeMessage('${dir.path}'); + }); await withTempDir('unix_socket_test', (Directory dir) async { await testDeleteFile('${dir.path}'); }); diff --git a/tests/standalone_2/io/file_pipe_test.dart b/tests/standalone_2/io/file_pipe_test.dart new file mode 100644 index 000000000000..7a97d620a924 --- /dev/null +++ b/tests/standalone_2/io/file_pipe_test.dart @@ -0,0 +1,50 @@ +// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. +// +// Dart test program for testing the dart:io `Pipe` class + +// @dart = 2.9 + +import 'dart:io'; + +import "package:async_helper/async_helper.dart"; +import "package:expect/expect.dart"; + +testReadFromClosedPipe() async { + final pipe = await Pipe.create(); + pipe.write.close(); + Expect.isTrue(await pipe.read.isEmpty); +} + +testCreateSync() async { + final pipe = Pipe.createSync(); + pipe.write.close(); + Expect.isTrue(await pipe.read.isEmpty); +} + +testMultipleWritesAndReads() async { + final pipe = await Pipe.create(); + int count = 0; + pipe.write.add([count]); + await pipe.read.listen((event) { + Expect.listEquals([count], event); + ++count; + if (count < 10) { + pipe.write.add([count]); + } else { + pipe.write.close(); + } + }, onDone: () => Expect.equals(10, count)); +} + +main() async { + asyncStart(); + try { + await testReadFromClosedPipe(); + await testCreateSync(); + await testMultipleWritesAndReads(); + } finally { + asyncEnd(); + } +} diff --git a/tests/standalone_2/io/file_stream_test.dart b/tests/standalone_2/io/file_stream_test.dart index 64c9d416856c..373d6e24d84e 100644 --- a/tests/standalone_2/io/file_stream_test.dart +++ b/tests/standalone_2/io/file_stream_test.dart @@ -60,7 +60,27 @@ void testStreamIsEmpty() { }); } -void main() { +Future testStreamAppendedToAfterOpen() async { + asyncStart(); + + final pipe = Pipe.createSync(); + pipe.write.add("Hello World".codeUnits); + int i = 0; + await pipe.read.listen((event) { + Expect.listEquals("Hello World".codeUnits, event); + if (i < 10) { + pipe.write.add("Hello World".codeUnits); + ++i; + } else { + pipe.write.close(); + } + }).asFuture(); + + asyncEnd(); +} + +void main() async { testPauseResumeCancelStream(); testStreamIsEmpty(); + await testStreamAppendedToAfterOpen(); } diff --git a/tests/standalone_2/io/unix_socket_test.dart b/tests/standalone_2/io/unix_socket_test.dart index 428a2194b359..c51d6122475a 100644 --- a/tests/standalone_2/io/unix_socket_test.dart +++ b/tests/standalone_2/io/unix_socket_test.dart @@ -738,7 +738,7 @@ Future testSocketMessage(String uniqueName) async { }); } -Future testStdioMessage(String tempDirPath, {bool caller: false}) async { +Future testStdioMessage(String tempDirPath, {bool caller = false}) async { if (!Platform.isMacOS && !Platform.isLinux && !Platform.isAndroid) { return; } @@ -831,6 +831,140 @@ Future testStdioMessage(String tempDirPath, {bool caller: false}) async { return completer.future; } +Future testReadPipeMessage(String uniqueName) async { + if (!Platform.isMacOS && !Platform.isLinux && !Platform.isAndroid) { + return; + } + final address = + InternetAddress('$uniqueName/sock', type: InternetAddressType.unix); + final server = await RawServerSocket.bind(address, 0, shared: false); + + server.listen((RawSocket socket) async { + socket.listen((e) async { + switch (e) { + case RawSocketEvent.read: + final SocketMessage message = socket.readMessage(); + if (message == null) { + return; + } + Expect.equals('Hello', String.fromCharCodes(message.data)); + Expect.equals(1, message.controlMessages.length); + final SocketControlMessage controlMessage = + message.controlMessages[0]; + final handles = controlMessage.extractHandles(); + Expect.isNotNull(handles); + Expect.equals(1, handles.length); + final receivedPipe = handles[0].toReadPipe(); + Expect.equals('Hello over pipe!', + await receivedPipe.transform(utf8.decoder).join()); + socket.write('server replied'.codeUnits); + break; + case RawSocketEvent.readClosed: + socket.close(); + server.close(); + break; + } + }); + }); + + final RawServerSocket testServer = await createTestServer(); + final testPipe = await Pipe.create(); + + // Send a message containing an open pipe. + final socket = await RawSocket.connect(address, 0); + socket.listen((e) { + switch (e) { + case RawSocketEvent.write: + socket.sendMessage([ + SocketControlMessage.fromHandles( + [ResourceHandle.fromReadPipe(testPipe.read)]) + ], 'Hello'.codeUnits); + testPipe.write.add('Hello over pipe!'.codeUnits); + testPipe.write.close(); + break; + case RawSocketEvent.read: + final data = socket.read(); + if (data == null) { + return; + } + + final dataString = String.fromCharCodes(data); + Expect.equals('server replied', dataString); + socket.close(); + testPipe.write.close(); + testServer.close(); + } + }); +} + +Future testWritePipeMessage(String uniqueName) async { + if (!Platform.isMacOS && !Platform.isLinux && !Platform.isAndroid) { + return; + } + final address = + InternetAddress('$uniqueName/sock', type: InternetAddressType.unix); + final server = await RawServerSocket.bind(address, 0, shared: false); + + server.listen((RawSocket socket) async { + socket.listen((e) async { + switch (e) { + case RawSocketEvent.read: + final SocketMessage message = socket.readMessage(); + if (message == null) { + return; + } + Expect.equals('Hello', String.fromCharCodes(message.data)); + Expect.equals(1, message.controlMessages.length); + final SocketControlMessage controlMessage = + message.controlMessages[0]; + final handles = controlMessage.extractHandles(); + Expect.isNotNull(handles); + Expect.equals(1, handles.length); + final receivedPipe = handles[0].toWritePipe(); + + receivedPipe.add('Hello over pipe!'.codeUnits); + receivedPipe.close(); + socket.write('server replied'.codeUnits); + break; + case RawSocketEvent.readClosed: + socket.close(); + server.close(); + break; + } + }); + }); + + final RawServerSocket testServer = await createTestServer(); + final testPipe = await Pipe.create(); + + // Send a message containing an open pipe. + final socket = await RawSocket.connect(address, 0); + socket.listen((e) async { + switch (e) { + case RawSocketEvent.write: + socket.sendMessage([ + SocketControlMessage.fromHandles( + [ResourceHandle.fromWritePipe(testPipe.write)]) + ], 'Hello'.codeUnits); + + Expect.equals('Hello over pipe!', + await testPipe.read.transform(utf8.decoder).join()); + break; + case RawSocketEvent.read: + final data = socket.read(); + if (data == null) { + return; + } + + final dataString = String.fromCharCodes(data); + Expect.equals('server replied', dataString); + socket.close(); + testPipe.write.close(); + testServer.close(); + } + }); +} + Future testDeleteFile(String tempDirPath) async { if (!Platform.isMacOS && !Platform.isLinux && !Platform.isAndroid) { return; @@ -955,6 +1089,12 @@ void main(List args) async { await withTempDir('unix_socket_test', (Directory dir) async { await testSocketMessage('${dir.path}'); }); + await withTempDir('unix_socket_test', (Directory dir) async { + await testReadPipeMessage('${dir.path}'); + }); + await withTempDir('unix_socket_test', (Directory dir) async { + await testWritePipeMessage('${dir.path}'); + }); await withTempDir('unix_socket_test', (Directory dir) async { await testStdioMessage('${dir.path}', caller: true); });