Skip to content

Commit

Permalink
Debounce the proxied connection over proxied devices. (flutter#124540)
Browse files Browse the repository at this point in the history
Debounce the proxied connection over proxied devices.
  • Loading branch information
chingjun authored and exaby73 committed Apr 17, 2023
1 parent fd1100c commit 470a7e3
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 2 deletions.
3 changes: 2 additions & 1 deletion packages/flutter_tools/lib/src/commands/daemon.dart
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import '../emulator.dart';
import '../features.dart';
import '../globals.dart' as globals;
import '../project.dart';
import '../proxied_devices/debounce_data_stream.dart';
import '../proxied_devices/file_transfer.dart';
import '../resident_runner.dart';
import '../run_cold.dart';
Expand Down Expand Up @@ -1463,7 +1464,7 @@ class ProxyDomain extends Domain {
}

_forwardedConnections[id] = socket;
socket.listen((List<int> data) {
debounceDataStream(socket).listen((List<int> data) {
sendEvent('proxy.data.$id', null, data);
}, onError: (Object error, StackTrace stackTrace) {
// Socket error, probably disconnected.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';
import 'dart:typed_data';

/// Merges the values in a stream that are sent less than [duration] apart.
///
/// To minimize latency, the merged stream will always emit the first value that
/// is sent after a pause of at least [duration] long. After the first message,
/// all values that are sent within [duration] will be merged into one.
Stream<Uint8List> debounceDataStream(Stream<Uint8List> stream, [Duration duration = const Duration(milliseconds: 100)]) {
final StreamController<Uint8List> controller = StreamController<Uint8List>();
final BytesBuilder buffer = BytesBuilder(copy: false);

bool isDone = false;
Timer? timer;

// Called when timer triggers, sends out the buffered messages.
void onTimer() {
if (buffer.isNotEmpty) {
controller.add(buffer.toBytes());
buffer.clear();
if (isDone) {
controller.close();
} else {
// Start another timer even if we have nothing to send right now, so
// that outgoing messages are at least [duration] apart.
timer = Timer(duration, onTimer);
}
} else {
timer = null;
}
}

controller.onListen = () {
final StreamSubscription<Uint8List> subscription = stream.listen((Uint8List data) {
if (timer == null) {
controller.add(data);
// Start the timer to make sure that the next message is at least [duration] apart.
timer = Timer(duration, onTimer);
} else {
buffer.add(data);
}
}, onError: (Object error, StackTrace stackTrace) {
// Forward the error.
controller.addError(error, stackTrace);
}, onDone: () {
isDone = true;
// Delay closing the channel if we still have buffered data.
if (timer == null) {
controller.close();
}
});

controller.onCancel = () {
subscription.cancel();
};
};

return controller.stream;
}
3 changes: 2 additions & 1 deletion packages/flutter_tools/lib/src/proxied_devices/devices.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import '../daemon.dart';
import '../device.dart';
import '../device_port_forwarder.dart';
import '../project.dart';
import 'debounce_data_stream.dart';
import 'file_transfer.dart';

bool _isNullable<T>() => null is T;
Expand Down Expand Up @@ -563,7 +564,7 @@ class ProxiedPortForwarder extends DevicePortForwarder {
// Do nothing here.
},
));
socket.listen((Uint8List data) {
debounceDataStream(socket).listen((Uint8List data) {
unawaited(connection.sendRequest('proxy.write', <String, Object>{
'id': id,
}, data).then(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'dart:async';
import 'dart:typed_data';

import 'package:fake_async/fake_async.dart';
import 'package:flutter_tools/src/proxied_devices/debounce_data_stream.dart';

import '../../src/common.dart';

void main() {
group('debounceDataStreams', () {
late FakeAsync fakeAsync;
late StreamController<Uint8List> source;
late Stream<Uint8List> output;
const Duration debounceDuration = Duration(seconds: 10);
const Duration smallDuration = Duration(milliseconds: 10);

void addToSource(int value) {
source.add(Uint8List.fromList(<int>[value]));
}

setUp(() {
fakeAsync = FakeAsync();
fakeAsync.run((FakeAsync time) {
source = StreamController<Uint8List>();
output = debounceDataStream(source.stream, debounceDuration);
});
});

testWithoutContext('does not listen if returned stream is not listened to', () {
expect(source.hasListener, false);
output.listen(dummy);
expect(source.hasListener, true);
});

testWithoutContext('forwards data normally is all data if longer than duration apart', () {
fakeAsync.run((FakeAsync time) {
final List<Uint8List> outputItems = <Uint8List>[];
output.listen(outputItems.add);

addToSource(1);
time.elapse(debounceDuration + smallDuration);
addToSource(2);
time.elapse(debounceDuration + smallDuration);
addToSource(3);
time.elapse(debounceDuration + smallDuration);

expect(outputItems, <List<int>>[
<int>[1],
<int>[2],
<int>[3],
]);
});
});

testWithoutContext('merge data after the first if sent within duration', () {
fakeAsync.run((FakeAsync time) {
final List<Uint8List> outputItems = <Uint8List>[];
output.listen(outputItems.add);

addToSource(1);
time.elapse(smallDuration);
addToSource(2);
time.elapse(smallDuration);
addToSource(3);
time.elapse(debounceDuration + smallDuration);

expect(outputItems, <List<int>>[
<int>[1],
<int>[2, 3],
]);
});
});

testWithoutContext('output data in separate chunks if time between them is longer than duration', () {
fakeAsync.run((FakeAsync time) {
final List<Uint8List> outputItems = <Uint8List>[];
output.listen(outputItems.add);

addToSource(1);
time.elapse(smallDuration);
addToSource(2);
time.elapse(smallDuration);
addToSource(3);
time.elapse(debounceDuration + smallDuration);
addToSource(4);
time.elapse(smallDuration);
addToSource(5);
time.elapse(debounceDuration + smallDuration);

expect(outputItems, <List<int>>[
<int>[1],
<int>[2, 3],
<int>[4, 5],
]);
});
});

testWithoutContext('sends the last chunk after debounce duration', () {
fakeAsync.run((FakeAsync time) {
final List<Uint8List> outputItems = <Uint8List>[];
output.listen(outputItems.add);

addToSource(1);
time.flushMicrotasks();
expect(outputItems, <List<int>>[<int>[1]]);

time.elapse(smallDuration);
addToSource(2);
time.elapse(smallDuration);
addToSource(3);
expect(outputItems, <List<int>>[<int>[1]]);

time.elapse(debounceDuration + smallDuration);
expect(outputItems, <List<int>>[
<int>[1],
<int>[2, 3],
]);
});
});

testWithoutContext('close if source stream is closed', () {
fakeAsync.run((FakeAsync time) {
bool isDone = false;
output.listen(dummy, onDone: () => isDone = true);
expect(isDone, false);
source.close();
time.flushMicrotasks();
expect(isDone, true);
});
});

testWithoutContext('delay close until after last chunk is sent', () {
fakeAsync.run((FakeAsync time) {
final List<Uint8List> outputItems = <Uint8List>[];
bool isDone = false;
output.listen(outputItems.add, onDone: () => isDone = true);

addToSource(1);
time.flushMicrotasks();
expect(outputItems, <List<int>>[<int>[1]]);

addToSource(2);
source.close();
time.elapse(smallDuration);
expect(isDone, false);
expect(outputItems, <List<int>>[<int>[1]]);

time.elapse(debounceDuration + smallDuration);
expect(outputItems, <List<int>>[
<int>[1],
<int>[2],
]);
expect(isDone, true);
});
});

testWithoutContext('close if returned stream is closed', () {
fakeAsync.run((FakeAsync time) {
bool isCancelled = false;
source.onCancel = () => isCancelled = true;
final StreamSubscription<Uint8List> subscription = output.listen(dummy);
expect(isCancelled, false);
subscription.cancel();
expect(isCancelled, true);
});
});
});
}

Uint8List dummy(Uint8List data) => data;

0 comments on commit 470a7e3

Please sign in to comment.