Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debounce the proxied connection over proxied devices. #124540

Merged
merged 3 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/flutter_tools/lib/src/commands/daemon.dart
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import '../emulator.dart';
import '../features.dart';
import '../globals.dart' as globals;
import '../project.dart';
import '../proxied_devices/debounce_data_stream.dart';
import '../proxied_devices/file_transfer.dart';
import '../resident_runner.dart';
import '../run_cold.dart';
Expand Down Expand Up @@ -1416,7 +1417,7 @@ class ProxyDomain extends Domain {
}

_forwardedConnections[id] = socket;
socket.listen((List<int> data) {
debounceDataStream(socket).listen((List<int> data) {
sendEvent('proxy.data.$id', null, data);
}, onError: (Object error, StackTrace stackTrace) {
// Socket error, probably disconnected.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';
import 'dart:typed_data';

/// Merges the values in a stream that are sent less than [duration] apart.
///
/// To minimize latency, the merged stream will always emit the first value that
/// is sent after a pause of at least [duration] long. After the first message,
/// all values that are sent within [duration] will be merged into one.
Stream<Uint8List> debounceDataStream(Stream<Uint8List> stream, [Duration duration = const Duration(milliseconds: 100)]) {
final StreamController<Uint8List> controller = StreamController<Uint8List>();
final BytesBuilder buffer = BytesBuilder(copy: false);

bool isDone = false;
Timer? timer;

// Called when timer triggers, sends out the buffered messages.
void onTimer() {
if (buffer.isNotEmpty) {
controller.add(buffer.toBytes());
buffer.clear();
if (isDone) {
controller.close();
} else {
// Start another timer even if we have nothing to send right now, so
// that outgoing messages are at least [duration] apart.
timer = Timer(duration, onTimer);
}
} else {
timer = null;
}
}

controller.onListen = () {
final StreamSubscription<Uint8List> subscription = stream.listen((Uint8List data) {
if (timer == null) {
controller.add(data);
// Start the timer to make sure that the next message is at least [duration] apart.
timer = Timer(duration, onTimer);
} else {
buffer.add(data);
}
}, onError: (Object error, StackTrace stackTrace) {
// Forward the error.
controller.addError(error, stackTrace);
}, onDone: () {
isDone = true;
// Delay closing the channel if we still have buffered data.
if (timer == null) {
controller.close();
}
});

controller.onCancel = () {
subscription.cancel();
};
};

return controller.stream;
}
3 changes: 2 additions & 1 deletion packages/flutter_tools/lib/src/proxied_devices/devices.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import '../daemon.dart';
import '../device.dart';
import '../device_port_forwarder.dart';
import '../project.dart';
import 'debounce_data_stream.dart';
import 'file_transfer.dart';

bool _isNullable<T>() => null is T;
Expand Down Expand Up @@ -536,7 +537,7 @@ class ProxiedPortForwarder extends DevicePortForwarder {
// Do nothing here.
},
));
socket.listen((Uint8List data) {
debounceDataStream(socket).listen((Uint8List data) {
unawaited(connection.sendRequest('proxy.write', <String, Object>{
'id': id,
}, data).then(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';
import 'dart:typed_data';

import 'package:fake_async/fake_async.dart';
import 'package:flutter_tools/src/proxied_devices/debounce_data_stream.dart';

import '../../src/common.dart';

void main() {
group('debounceDataStreams', () {
late FakeAsync fakeAsync;
late StreamController<Uint8List> source;
late Stream<Uint8List> output;
const Duration debounceDuration = Duration(seconds: 10);
const Duration smallDuration = Duration(milliseconds: 10);

void addToSource(int value) {
source.add(Uint8List.fromList(<int>[value]));
}

setUp(() {
fakeAsync = FakeAsync();
fakeAsync.run((FakeAsync time) {
source = StreamController<Uint8List>();
output = debounceDataStream(source.stream, debounceDuration);
});
});

testWithoutContext('does not listen if returned stream is not listened to', () {
expect(source.hasListener, false);
output.listen(dummy);
expect(source.hasListener, true);
});

testWithoutContext('forwards data normally is all data if longer than duration apart', () {
fakeAsync.run((FakeAsync time) {
final List<Uint8List> outputItems = <Uint8List>[];
output.listen(outputItems.add);

addToSource(1);
time.elapse(debounceDuration + smallDuration);
addToSource(2);
time.elapse(debounceDuration + smallDuration);
addToSource(3);
time.elapse(debounceDuration + smallDuration);

expect(outputItems, <List<int>>[
<int>[1],
<int>[2],
<int>[3],
]);
});
});

testWithoutContext('merge data after the first if sent within duration', () {
fakeAsync.run((FakeAsync time) {
final List<Uint8List> outputItems = <Uint8List>[];
output.listen(outputItems.add);

addToSource(1);
time.elapse(smallDuration);
addToSource(2);
time.elapse(smallDuration);
addToSource(3);
time.elapse(debounceDuration + smallDuration);

expect(outputItems, <List<int>>[
<int>[1],
<int>[2, 3],
]);
});
});

testWithoutContext('output data in separate chunks if time between them is longer than duration', () {
fakeAsync.run((FakeAsync time) {
final List<Uint8List> outputItems = <Uint8List>[];
output.listen(outputItems.add);

addToSource(1);
time.elapse(smallDuration);
addToSource(2);
time.elapse(smallDuration);
addToSource(3);
time.elapse(debounceDuration + smallDuration);
addToSource(4);
time.elapse(smallDuration);
addToSource(5);
time.elapse(debounceDuration + smallDuration);

expect(outputItems, <List<int>>[
<int>[1],
<int>[2, 3],
<int>[4, 5],
]);
});
});

testWithoutContext('sends the last chunk after debounce duration', () {
fakeAsync.run((FakeAsync time) {
final List<Uint8List> outputItems = <Uint8List>[];
output.listen(outputItems.add);

addToSource(1);
time.flushMicrotasks();
expect(outputItems, <List<int>>[<int>[1]]);

time.elapse(smallDuration);
addToSource(2);
time.elapse(smallDuration);
addToSource(3);
expect(outputItems, <List<int>>[<int>[1]]);

time.elapse(debounceDuration + smallDuration);
expect(outputItems, <List<int>>[
<int>[1],
<int>[2, 3],
]);
});
});

testWithoutContext('close if source stream is closed', () {
fakeAsync.run((FakeAsync time) {
bool isDone = false;
output.listen(dummy, onDone: () => isDone = true);
expect(isDone, false);
source.close();
time.flushMicrotasks();
expect(isDone, true);
});
});

testWithoutContext('delay close until after last chunk is sent', () {
fakeAsync.run((FakeAsync time) {
final List<Uint8List> outputItems = <Uint8List>[];
bool isDone = false;
output.listen(outputItems.add, onDone: () => isDone = true);

addToSource(1);
time.flushMicrotasks();
expect(outputItems, <List<int>>[<int>[1]]);

addToSource(2);
source.close();
time.elapse(smallDuration);
expect(isDone, false);
expect(outputItems, <List<int>>[<int>[1]]);

time.elapse(debounceDuration + smallDuration);
expect(outputItems, <List<int>>[
<int>[1],
<int>[2],
]);
expect(isDone, true);
});
});

testWithoutContext('close if returned stream is closed', () {
fakeAsync.run((FakeAsync time) {
bool isCancelled = false;
source.onCancel = () => isCancelled = true;
final StreamSubscription<Uint8List> subscription = output.listen(dummy);
expect(isCancelled, false);
subscription.cancel();
expect(isCancelled, true);
});
});
});
}

Uint8List dummy(Uint8List data) => data;