Dart should support anonymous pipes #49917

Closed
brianquinlan opened this issue Sep 7, 2022 · 9 comments
Labels
area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries. library-io

Comments

@brianquinlan
Contributor

brianquinlan commented Sep 7, 2022

Dart should support anonymous pipes (e.g. as returned by the pipe system call).

Requested by @robert-ancell in #47310.

@brianquinlan added the area-core-library and library-io labels Sep 7, 2022
@brianquinlan self-assigned this Sep 7, 2022
@brianquinlan
Contributor Author

brianquinlan commented Sep 7, 2022

How does this look as an API?

/// An `AnonymousPipe` holds two [RandomAccessFile]s that each allow unidirectional data flow
/// i.e. data written to [writePipe] can be read from [readPipe]. Creating an [AnonymousPipe] is
/// possible on Windows but, since there is no way to transmit it to another process, probably not
/// useful.
///
/// Either the [readPipe] or [writePipe] can be transmitted to another process and used for interprocess
/// communication.
///
/// For example:
///
/// ```dart
/// import 'dart:async';
/// import 'dart:io';
///
/// void main() async {
///   final address =
///      InternetAddress('sock', type: InternetAddressType.unix);
///   final socket = await RawSocket.connect(address, 0);
///   final pipe = await AnonymousPipe.create();
///   
///   await socket.sendMessage(<SocketControlMessage>[
///        SocketControlMessage.fromHandles(
///            <ResourceHandle>[ResourceHandle.fromFile(pipe.readPipe)])
///      ], 'Hi'.codeUnits);
///   await pipe.writePipe.writeString("Message");
/// }
/// ```
abstract class AnonymousPipe {
  RandomAccessFile get readPipe;
  RandomAccessFile get writePipe;

  static Future<AnonymousPipe> create();
  static AnonymousPipe createSync();
}

I'd actually prefer not to create a new class for this. Does anyone have an alternative API suggestion?

@lrhn
Member

lrhn commented Sep 8, 2022

Wait for records and use ({RandomAccessFile in, RandomAccessFile out})? 😉

For naming, why is it Anonymous? Do we expect a NamedPipe too?
The Pipe suffix on the getters also seems unnecessary.

Consider:

abstract class Pipe {
  RandomAccessFile get read; // Would use "in" and "out", but "in" is a reserved word.
  RandomAccessFile get write;
  external factory Pipe.createSync();
  external static Pipe create();
}

Is RandomAccessFile the best abstraction? It seems to assume being backed by a single file system file, since you can ask for the length. I guess that can be simulated by providing "bytes written/received so far".
What happens if you read from the write pipe, or write to the read pipe?
Do you need the distinction between a synchronous and an asynchronous write?

Even more alternatively, could it be a single RandomAccessFile, where reading and writing acts like the different ends of the same pipe.
If you want to give the file to someone else who should only read or write, you can wrap it in a "read-only"/"write-only" wrapper first.

Also consider a (Sink<Uint8List>, Stream<Uint8List>) pair instead. That way the two ends are limited to the in or out behaviors. You can build abstractions on top of the Sink, and the Stream is directly supported by the language, but you lose the ability to do synchronous read operations.
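
To make the Sink/Stream pair idea concrete, here is a minimal in-memory sketch. It does not touch any OS pipe: `InMemoryPipe` and `roundTrip` are hypothetical names invented for illustration, and a `StreamController` stands in for the kernel buffer. It only shows how the two ends end up limited to "in" or "out" behavior.

```dart
import 'dart:async';
import 'dart:typed_data';

/// Hypothetical in-memory stand-in for a pipe: bytes added to [write]
/// are delivered to the listener of [read]. No OS pipe is involved.
class InMemoryPipe {
  final StreamController<Uint8List> _controller =
      StreamController<Uint8List>();

  /// The write end only exposes `add` and `close`.
  Sink<Uint8List> get write => _controller.sink;

  /// The read end only exposes stream operations.
  Stream<Uint8List> get read => _controller.stream;
}

/// Writes [chunks] into a fresh pipe and collects what the read end sees.
Future<List<int>> roundTrip(List<List<int>> chunks) async {
  final pipe = InMemoryPipe();
  // Start listening before writing; toList completes once the sink closes.
  final received = pipe.read.toList();
  for (final chunk in chunks) {
    pipe.write.add(Uint8List.fromList(chunk));
  }
  pipe.write.close();
  return [for (final chunk in await received) ...chunk];
}

Future<void> main() async {
  print(await roundTrip([[72, 105], [33]]));
}
```

As the comment above notes, this shape has no synchronous read: the consumer can only `listen` or `await` on the stream.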

@a-siva
Contributor

a-siva commented Sep 8, 2022

In the example above, socket.sendMessage is used to communicate one end of the pipe to another process, but this does not work on Windows. What would be the option on Windows?

Also, @rmacnak-google points out that anonymous pipes created on Windows do not support non-blocking IO (this may be an issue for dart:io).

@brianquinlan
Contributor Author

@lrhn

Thanks for the feedback!

Wait for records and use ({RandomAccessFile in, RandomAccessFile out})? 😉

I was thinking about it 😆

Is RandomAccessFile the best abstraction? It seems to assume being backed by a single file system file, since you can ask for the length.

I'm not sure what will happen on Windows but on Unix, it should return the number of bytes left to be read.

I guess that can be simulated by providing "bytes written/received so far". What happens if you read from the write pipe,
or write to the read pipe?

You get FileSystemException: writeFrom failed, path = '' (OS Error: Bad file descriptor, errno = 9) but that is consistent with doing something like:

final f = await File('something').open(); // read is the default mode
await f.writeString('Hi'); // Same exception as above.
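
For reference, a self-contained sketch of that comparison case. The helper name `writeToReadOnlyHandleFails` and the temporary directory prefix are made up for illustration; the sketch only demonstrates the regular-file behavior described above, not pipe behavior, and the exact OS error text varies by platform.

```dart
import 'dart:io';

/// Returns true if writing to a file opened in read mode throws a
/// [FileSystemException]. Uses a temporary file that is deleted afterwards.
Future<bool> writeToReadOnlyHandleFails() async {
  final dir = await Directory.systemTemp.createTemp('pipe_demo_');
  try {
    final file = File('${dir.path}/something');
    await file.writeAsString('existing content');
    final f = await file.open(); // FileMode.read is the default.
    try {
      await f.writeString('Hi');
      return false; // The write unexpectedly succeeded.
    } on FileSystemException {
      return true; // The handle was not opened for writing.
    } finally {
      await f.close();
    }
  } finally {
    await dir.delete(recursive: true);
  }
}

Future<void> main() async {
  print(await writeToReadOnlyHandleFails());
}
```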

Do you need the distinction between a synchronous and an asynchronous write?

You mean for the create methods? We have that for File so I thought that I'd be consistent.

Even more alternatively, could it be a single RandomAccessFile, where reading and writing acts like the different ends of the same pipe. If you want to give the file to someone else who should only read or write, you can wrap it in a "read-only"/"write-only" wrapper first.

Also consider a (Sink<Uint8List>, Stream<Uint8List>) pair instead. That way the two ends are limited to the in or out behaviors. You can build abstractions on top of the Sink, and the Stream is directly supported by the language, but you lose the ability to do synchronous read operations.

I'd rather not introduce a new Sink subclass and a new Stream subclass.

@brianquinlan
Contributor Author

In the example above, socket.sendMessage is used to communicate one end of the pipe to another process, but this does not work on Windows. What would be the option on Windows?

Yeah, I don't think that being able to create pipes would be useful on Windows.

Also, @rmacnak-google points out that anonymous pipes created on Windows do not support non-blocking IO (this may be an issue for dart:io).

Isn't dart:io file IO synchronous at the system call level?

@brianquinlan
Contributor Author

brianquinlan commented Sep 8, 2022

@aam and @rmacnak-google also seem to prefer the Stream<Uint8List>, Sink<Uint8List> pair approach. I think that would end up looking like this:

abstract class ReadPipe implements Stream<Uint8List> {
  Future<int> unreadBytes(); // The number of unread bytes.
}

abstract class WritePipe implements Sink<Uint8List> {
}

abstract class Pipe {
  ReadPipe get read;  // Can't just be a Stream<Uint8List> because that won't work with ResourceHandle
  WritePipe get write; // Can't just be a Sink<Uint8List> because that won't work with ResourceHandle
  factory Pipe.createSync();
  static Future<Pipe> create();
}

// ResourceHandle needs two new methods to extract file descriptors from read/write pipes:
ResourceHandle.fromReadPipe(ReadPipe pipe);
ResourceHandle.fromWritePipe(WritePipe pipe);

IMO: This seems pretty heavy. We need two new classes and two new methods in ResourceHandle.

But I'm happy to go with that if it is more idiomatic.

@robert-ancell
Contributor

robert-ancell commented Sep 8, 2022

Some of the proposed API is using the term 'pipe' for the pipe ends - this will be confusing. I suggest something like:

abstract class Pipe {
   RandomAccessFile readEnd;
   RandomAccessFile writeEnd;
}

or

abstract class Pipe {
   PipeInput input;
   PipeOutput output;
}

@brianquinlan
Contributor Author

brianquinlan commented Sep 9, 2022

@robert-ancell Ack. We won't do that.

@aam @rmacnak-google @lrhn: I implemented the proposal where the read end of the pipe is a Stream. The issue is that the Stream will be exhausted when there is no more data to read. That doesn't match my expectations of how people typically use pipes, i.e. pipes are infinite streams and there should be a way to wait until new data appears.

UPDATE: I think that I can fix this behavior without changing the interface (which is described below)

The exact interface looked like:

abstract class ReadPipe implements Stream<List<int>> {
//  Future<int> unreadBytes(); // The number of unread bytes.
}

abstract class WritePipe implements IOSink {
}

abstract class Pipe {
  ReadPipe get read;
  WritePipe get write;

  static Future<Pipe> create() {
    return _Pipe._create();
  }
}

My test code was:

void main() async {
  final pipe = await Pipe.create();
  pipe.read.listen((event) {
    print(event);
  }); // onDone is triggered immediately because no data was written to the pipe.
}

RandomAccessFile does not have this problem because you can just keep calling a read method until data arrives.

Options:

  1. switch back to read and write being RandomAccessFile instances.
  2. add a close method to ReadPipe and only trigger onDone when it is called.
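
Option 2 can be illustrated with a small in-memory sketch. Everything here is an assumption made for illustration: `ClosableReadEnd`, its `deliver` hook, and `observeLifecycle` are hypothetical names, and a `StreamController` stands in for the OS pipe. The point is only that the stream stays open while idle and that `onDone` fires after an explicit `close`.

```dart
import 'dart:async';

/// Hypothetical read end illustrating option 2: the stream stays open while
/// no data is available and only completes after [close] is called.
class ClosableReadEnd {
  final StreamController<List<int>> _controller =
      StreamController<List<int>>();

  Stream<List<int>> get stream => _controller.stream;

  /// Stand-in for data arriving from the OS (hypothetical hook).
  void deliver(List<int> bytes) => _controller.add(bytes);

  /// Ends the stream, which triggers `onDone` for the listener.
  Future<void> close() => _controller.close();
}

/// Records the order in which a listener observes events.
Future<List<String>> observeLifecycle() async {
  final log = <String>[];
  final readEnd = ClosableReadEnd();
  final done = Completer<void>();
  readEnd.stream.listen(
    (event) => log.add('data $event'),
    onDone: () {
      log.add('done');
      done.complete();
    },
  );
  // Give the event loop a turn: nothing was written, and onDone has not fired.
  await Future<void>.delayed(Duration.zero);
  log.add('idle');
  readEnd.deliver([1, 2, 3]);
  await readEnd.close();
  await done.future;
  return log;
}

Future<void> main() async {
  print(await observeLifecycle());
}
```

With this shape, a listener that subscribes before any data is written simply waits, which is closer to how the comment above expects pipes to be used.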

@brianquinlan
Contributor Author

Fixed in 2ead86a
