Skip to content

Commit

Permalink
[vm/io] Return data right away if it's ready.
Browse files Browse the repository at this point in the history
This fixes problem with directory watching on Windows where data might be accumulated before dart stream is setup.

Bug: dart-lang/sdk#37909

Change-Id: I601842522e76e4a6a4e6a22d6b376a49af200e6f
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/115801
Reviewed-by: Zichang Guo <zichangguo@google.com>
Commit-Queue: Alexander Aprelev <aam@google.com>
  • Loading branch information
aam authored and commit-bot@chromium.org committed Sep 9, 2019
1 parent 601d893 commit 95db62d
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 54 deletions.
28 changes: 22 additions & 6 deletions runtime/bin/eventhandler_win.cc
Original file line number Diff line number Diff line change
Expand Up @@ -403,15 +403,19 @@ bool DirectoryWatchHandle::IssueRead() {
return true;
}
OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize);
// Set up pending_read_ before ReadDirectoryChangesW because it might be
// needed in ReadComplete invoked on event loop thread right away if data is
// also ready right away.
pending_read_ = buffer;
ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
BOOL ok = ReadDirectoryChangesW(handle_, buffer->GetBufferStart(),
buffer->GetBufferSize(), recursive_, events_,
NULL, buffer->GetCleanOverlapped(), NULL);
if (ok || (GetLastError() == ERROR_IO_PENDING)) {
// Completing asynchronously.
pending_read_ = buffer;
return true;
}
pending_read_ = nullptr;
OverlappedBuffer::DisposeBuffer(buffer);
return false;
}
Expand Down Expand Up @@ -1103,7 +1107,8 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
// If out events (can write events) have been requested, and there
// are no pending writes, meaning any writes are already complete,
// post an out event immediately.
if ((events & (1 << kOutEvent)) != 0) {
intptr_t out_event_mask = 1 << kOutEvent;
if ((events & out_event_mask) != 0) {
if (!handle->HasPendingWrite()) {
if (handle->is_client_socket()) {
if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) {
Expand All @@ -1114,14 +1119,25 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
}
}
} else {
intptr_t event_mask = 1 << kOutEvent;
if ((handle->Mask() & event_mask) != 0) {
Dart_Port port = handle->NextNotifyDartPort(event_mask);
DartUtils::PostInt32(port, event_mask);
if ((handle->Mask() & out_event_mask) != 0) {
Dart_Port port = handle->NextNotifyDartPort(out_event_mask);
DartUtils::PostInt32(port, out_event_mask);
}
}
}
}
// Similarly, if in events (can read events) have been requested, and
// there is pending data available, post an in event immediately.
intptr_t in_event_mask = 1 << kInEvent;
if ((events & in_event_mask) != 0) {
if (handle->data_ready_ != nullptr &&
!handle->data_ready_->IsEmpty()) {
if ((handle->Mask() & in_event_mask) != 0) {
Dart_Port port = handle->NextNotifyDartPort(in_event_mask);
DartUtils::PostInt32(port, in_event_mask);
}
}
}
} else if (IS_COMMAND(msg->data, kShutdownReadCommand)) {
ASSERT(handle->is_client_socket());

Expand Down
4 changes: 4 additions & 0 deletions runtime/bin/file_system_watcher_win.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ intptr_t FileSystemWatcher::WatchPath(intptr_t id,

DirectoryWatchHandle* handle =
new DirectoryWatchHandle(dir, list_events, recursive);
// Issue a read directly, to be sure events are tracked from now on. This is
// okay, since in Dart, we create the socket and start reading immidiately.
handle->EnsureInitialized(EventHandler::delegate());
handle->IssueRead();
return reinterpret_cast<intptr_t>(handle);
}

Expand Down
113 changes: 65 additions & 48 deletions tests/standalone_2/io/file_system_watcher_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ void testWatchDeleteDir() {
var watcher = dir.watch(events: 0);

asyncStart();
var sub;
sub = watcher.listen((event) {
watcher.listen((event) {
if (event is FileSystemDeleteEvent) {
Expect.isTrue(event.path == dir.path);
}
Expand Down Expand Up @@ -298,9 +297,8 @@ void testWatchMoveSelf() {
var watcher = dir2.watch();

asyncStart();
var sub;
bool gotDelete = false;
sub = watcher.listen((event) {
watcher.listen((event) {
if (event is FileSystemDeleteEvent) {
Expect.isTrue(event.path.endsWith('dir'));
gotDelete = true;
Expand All @@ -319,68 +317,87 @@ testWatchConsistentModifiedFile() async {
// happens in a very short period of time the modifying event will be missed before the
// stream listen has been set up and the watcher will hang forever.
// Bug: https://github.com/dart-lang/sdk/issues/37233
// Bug: https://github.com/dart-lang/sdk/issues/37909
asyncStart();
ReceivePort receivePort = ReceivePort();
await Isolate.spawn(modifyFiles, receivePort.sendPort);

await for (var object in receivePort) {
Completer<bool> exiting = Completer<bool>();

Directory dir;
Completer<bool> modificationEventReceived = Completer<bool>();

StreamSubscription receiverSubscription;
SendPort workerSendPort;
receiverSubscription = receivePort.listen((object) async {
if (object == 'modification_started') {
var watcher = dir.watch();
var subscription;
// Wait for event and check the type
subscription = watcher.listen((data) async {
if (data is FileSystemModifyEvent) {
Expect.isTrue(data.path.endsWith('file'));
await subscription.cancel();
modificationEventReceived.complete(true);
}
});
return;
}
if (object == 'end') {
receivePort.close();
break;
await receiverSubscription.cancel();
exiting.complete(true);
return;
}
var sendPort = object[0];
var path = object[1];
var dir = new Directory(path);

sendPort.send('start');
// Delay some time to ensure that watcher is created when modification is running consistently.
await Future.delayed(Duration(milliseconds: 10));
var watcher = dir.watch();
var subscription;

// Wait for event and check the type
subscription = watcher.listen((data) {
if (data is FileSystemModifyEvent) {
Expect.isTrue(data.path.endsWith('file'));
subscription.cancel();
}
});
// init event
workerSendPort = object[0];
dir = new Directory(object[1]);
});

// Create a file to signal modifier isolate to stop modification and clean up temp directory.
var file = new File(join(dir.path, 'EventReceived'));
file.createSync();
sendPort.send('end');
}
Completer<bool> workerExitedCompleter = Completer();
RawReceivePort exitReceivePort = RawReceivePort((object) { workerExitedCompleter.complete(true); });
RawReceivePort errorReceivePort = RawReceivePort((object) { print('worker errored: $object'); });
await Isolate.spawn(modifyFiles, receivePort.sendPort, onExit: exitReceivePort.sendPort, onError: errorReceivePort.sendPort);

await modificationEventReceived.future;
workerSendPort.send('end');

await exiting.future;
await workerExitedCompleter.future;
exitReceivePort.close();
errorReceivePort.close();
asyncEnd();
}

void modifyFiles(SendPort sendPort) {
void modifyFiles(SendPort sendPort) async {
// Send sendPort back to listen for modification signal.
ReceivePort receivePort = ReceivePort();
var dir = Directory.systemTemp.createTempSync('dart_file_system_watcher');

// Create file within the directory and keep modifying.
var file = new File(join(dir.path, 'file'));
file.createSync();
bool done = false;
var subscription;
sendPort.send([receivePort.sendPort, dir.path]);
subscription = receivePort.listen((data) {
if (data == 'end') {
// Clean up the directory and files
dir.deleteSync(recursive: true);
sendPort.send('end');
subscription.cancel();
} else {
// This signal file is created once watcher isolate receives the event.
var signal = new File(join(dir.path, 'EventReceived'));
while (!signal.existsSync()) {
// Start modifying the file continuously before watcher start watching.
for (int i = 0; i < 100; i++) {
file.writeAsStringSync('a');
}
}
subscription = receivePort.listen((object) async {
if (object == 'end') {
await subscription.cancel();
done = true;
}
});
sendPort.send([receivePort.sendPort, dir.path]);
bool notificationSent = false;
while(!done) {
// Start modifying the file continuously before watcher start watching.
for (int i = 0; i < 100; i++) {
file.writeAsStringSync('a');
}
if (!notificationSent) {
sendPort.send('modification_started');
notificationSent = true;
}
await Future.delayed(Duration());
}
// Clean up the directory and files
dir.deleteSync(recursive: true);
sendPort.send('end');
}

void main() {
Expand Down

0 comments on commit 95db62d

Please sign in to comment.