Skip to content

Commit

Permalink
disable isolates
Browse files Browse the repository at this point in the history
Disable isolates since they exacerbate the
memory growth problem. Add QueueExecutor
which adds deduplication to scheduled
jobs.

issue: #24
  • Loading branch information
greensopinion committed Jan 16, 2022
1 parent 1a7e211 commit 8a2ece5
Show file tree
Hide file tree
Showing 16 changed files with 327 additions and 47 deletions.
2 changes: 1 addition & 1 deletion lib/src/cache/image_tile_loading_cache.dart
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import 'dart:ui';

import '../executor/executor.dart';
import 'package:vector_tile_renderer/vector_tile_renderer.dart';

import '../executor/executor.dart';
import '../grid/renderer_pipeline.dart';
import '../tile_identity.dart';
import 'cache_stats.dart';
Expand Down
4 changes: 2 additions & 2 deletions lib/src/cache/vector_tile_loading_cache.dart
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class VectorTileLoadingCache {
Future<VectorTile> _loadTile(String source, String key, TileIdentity tile,
CancellationCallback cancelled) async {
final bytes = await _loadBytes(source, key, tile);
return _executor.submit(
Job('read bytes: $tile', _readTileBytes, bytes, cancelled: cancelled));
return _executor.submit(Job('read bytes: $tile', _readTileBytes, bytes,
cancelled: cancelled, deduplicationKey: 'decode bytes: $tile'));
}

Future<Uint8List> _loadBytes(
Expand Down
15 changes: 6 additions & 9 deletions lib/src/executor/executor.dart
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
import 'dart:async';
import 'dart:io';
import 'dart:math';

import 'package:flutter/foundation.dart';
import 'isolate_executor.dart';

import 'direct_executor.dart';
import 'pool_executor.dart';
import 'queue_executor.dart';

typedef CancellationCallback = bool Function();

class Job<Q, R> {
final String name;
final deduplicationKey;
final ComputeCallback<Q, R> computeFunction;
final Q value;
final CancellationCallback? cancelled;

Job(this.name, this.computeFunction, this.value, {this.cancelled});
Job(this.name, this.computeFunction, this.value,
{this.cancelled, required this.deduplicationKey});

bool get isCancelled => cancelled == null ? false : cancelled!();
}
Expand All @@ -35,6 +33,5 @@ class CancellationException implements Exception {
CancellationException();
}

Executor newExecutor() => kDebugMode
? IsolateExecutor()
: PoolExecutor(concurrency: max(Platform.numberOfProcessors - 2, 1));
Executor newExecutor() => QueueExecutor();
// kDebugMode ? IsolateExecutor() : PoolExecutor(concurrency: 2);
27 changes: 26 additions & 1 deletion lib/src/executor/isolate_executor.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ class IsolateExecutor extends Executor {
_stream = null;
}

bool hasJobWithDeduplicationKey(Job job) =>
job.deduplicationKey != null &&
_jobByKey.values.any(
(otherJob) => otherJob.job.deduplicationKey == job.deduplicationKey);

@override
bool get disposed => _disposed;

Expand All @@ -53,6 +58,7 @@ class IsolateExecutor extends Executor {
return await internalJob.completer.future;
} finally {
--_outstanding;
_jobByKey.remove(key);
}
}

Expand All @@ -64,6 +70,7 @@ class IsolateExecutor extends Executor {
void _submitOne() {
_queue.removeWhere((job) {
if (job.job.isCancelled) {
_jobByKey.remove(job.key);
job.completer.completeError(CancellationException());
return true;
}
Expand Down Expand Up @@ -117,7 +124,10 @@ class IsolateExecutor extends Executor {
} else if (result is _JobOutput) {
--_submitted;
final work = _jobByKey.remove(result.key);
work?.completer.complete(result.message);
if (work != null) {
work.completer.complete(result.message);
_completeWithDeduplication(work, result.message);
}
} else {
print('unexpected message: $result');
}
Expand All @@ -129,6 +139,21 @@ class IsolateExecutor extends Executor {
final thisKey = _keySeed++;
return '$thisKey';
}

void _completeWithDeduplication(_Job work, result) {
final deduplicationKey = work.job.deduplicationKey;
if (deduplicationKey != null) {
_queue.removeWhere((queued) {
if (!queued.job.isCancelled &&
queued.job.deduplicationKey == deduplicationKey) {
_jobByKey.remove(queued.key);
queued.completer.complete(result);
return true;
}
return false;
});
}
}
}

class _Job<Q, R> {
Expand Down
15 changes: 10 additions & 5 deletions lib/src/executor/pool_executor.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import 'package:flutter/foundation.dart';

import '../extensions.dart';
import 'executor.dart';
import 'isolate_executor.dart';

Expand Down Expand Up @@ -27,9 +26,15 @@ class PoolExecutor extends Executor {
_delegates.map((delegate) => delegate.submit(job)).toList();

@override
Future<R> submit<Q, R>(Job<Q, R> job) => _nextDelegate().submit(job);

Executor _nextDelegate() {
Future<R> submit<Q, R>(Job<Q, R> job) => _nextDelegate(job).submit(job);

Executor _nextDelegate(Job job) {
final affinityDelegate = _delegates
.where((delegate) => delegate.hasJobWithDeduplicationKey(job))
.firstOrNull;
if (affinityDelegate != null) {
return affinityDelegate;
}
for (int attempt = 0; attempt < _delegates.length; ++attempt) {
final delegate = _delegates[_nextIndex()];
if (delegate.outstanding == 0) {
Expand Down
82 changes: 82 additions & 0 deletions lib/src/executor/queue_executor.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import 'dart:async';

import 'package:vector_map_tiles/src/executor/executor.dart';

class QueueExecutor extends Executor {
bool _disposed = false;
final _queue = <_Job>[];
var _scheduled = false;

@override
void dispose() {
_disposed = true;
}

@override
bool get disposed => _disposed;

@override
Future<R> submit<Q, R>(Job<Q, R> job) {
final internalJob = _Job(job);
_queue.add(internalJob); //LIFO
_schedule();
return internalJob.completer.future;
}

@override
List<Future<R>> submitAll<Q, R>(Job<Q, R> job) => [submit(job)];

void _schedule() {
_completeCancelled();
if (!_scheduled && _queue.isNotEmpty) {
_scheduled = true;
scheduleMicrotask(_runOneAndReschedule);
}
}

void _runOneAndReschedule() async {
_scheduled = false;
if (_queue.isNotEmpty) {
final job = _queue.removeLast(); //LIFO
try {
if (_disposed) {
throw 'disposed';
}
if (job.request.isCancelled) {
throw CancellationException();
}
final result = await job.apply();
job.completer.complete(result);
_completeCancelled();
_completeDuplicates(job, result);
} catch (error, stack) {
job.completer.completeError(error, stack);
}
}
_schedule();
}

void _completeDuplicates(_Job job, result) {
final deduplicationKey = job.request.deduplicationKey;
if (deduplicationKey != null) {
_queue.removeWhere((queued) {
if (queued.request.deduplicationKey == deduplicationKey) {
queued.completer.complete(result);
return true;
}
return false;
});
}
}

void _completeCancelled() {}
}

class _Job<Q, R> {
final Job<Q, R> request;
final completer = Completer<R>();

_Job(this.request);

Future<R> apply() async => await request.computeFunction(request.value);
}
4 changes: 4 additions & 0 deletions lib/src/extensions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ extension ListExtension<T> on List<T> {
return copy;
}
}

extension IterableExtension<T> on Iterable<T> {
T? get firstOrNull => isEmpty ? null : first;
}
6 changes: 1 addition & 5 deletions lib/src/grid/grid_vector_tile.dart
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,11 @@ class _VectorTilePainter extends CustomPainter {
foreground: Paint()..color = Color.fromARGB(0xff, 0, 0, 0),
fontSize: 15);
final roundedScale = (scale * 1000).roundToDouble() / 1000;
final renderedOffset = Offset(
-tileSizer.translationDelta.dx, -tileSizer.translationDelta.dy);
final renderedBox = renderedOffset & size;
final tileBox = tileSizer.tileClip(size, scale);
final text = TextPainter(
text: TextSpan(
style: textStyle,
text:
'${model.tile}\nscale=$roundedScale\nsize=$size\ntranslation=${tileSizer.translationDelta}\nbox=${renderedBox.debugString()}\ntileBox=${tileBox.debugString()}\npaintCount=$_paintCount'),
'${model.tile}\nscale=$roundedScale\npaintCount=$_paintCount'),
textAlign: TextAlign.start,
textDirection: TextDirection.ltr)
..layout();
Expand Down
4 changes: 2 additions & 2 deletions lib/src/grid/tile_widgets.dart
Original file line number Diff line number Diff line change
Expand Up @@ -192,5 +192,5 @@ class TileWidgets extends ChangeNotifier {
// A larger zoom difference also results in higher memory consumption since
// off-screen painting onto a canvas consumes a lot of memory, and can
// result in app crashes.
final _maxSmallerZoomDifference = 3;
final _maxLargerZoomDifference = -2;
final _maxSmallerZoomDifference = 2;
final _maxLargerZoomDifference = -1;
10 changes: 6 additions & 4 deletions lib/src/stream/preprocessing_tile_provider.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ class PreprocessingTileProvider extends TileProvider {
}

void _initialize() async {
final futures = _executor.submitAll(
Job('setup preprocessor', _setupPreprocessor, _preprocessor));
final futures = _executor.submitAll(Job(
'setup preprocessor', _setupPreprocessor, _preprocessor,
deduplicationKey: null));
for (final future in futures) {
await future;
}
Expand All @@ -41,9 +42,10 @@ class PreprocessingTileProvider extends TileProvider {
if (!_ready) {
await _readyCompleter.future;
}
final deduplicationKey = 'preprocess: ${tile.identity}';
final preprocessed = await _executor.submit(Job(
'preprocess: ${tile.identity}', _preprocessTile, tile.tileset!,
cancelled: request.cancelled));
deduplicationKey, _preprocessTile, tile.tileset!,
cancelled: request.cancelled, deduplicationKey: deduplicationKey));
return Tile(
identity: tile.identity, format: tile.format, tileset: preprocessed);
}
Expand Down
2 changes: 1 addition & 1 deletion lib/src/stream/tile_supplier.dart
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import 'dart:async';
import 'dart:ui';

import '../executor/executor.dart';
import 'package:vector_tile_renderer/vector_tile_renderer.dart';

import '../../vector_map_tiles.dart';
import '../executor/executor.dart';

enum TileFormat { vector, raster }

Expand Down
1 change: 1 addition & 0 deletions lib/src/tile_viewport.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:math';

import 'package:flutter_map/plugin_api.dart';

import '../vector_map_tiles.dart';

class TileViewport {
Expand Down
11 changes: 7 additions & 4 deletions test/src/executor/direct_executor_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,23 @@ void main() {
});

test('runs a task', () async {
final result = await executor.submit(Job(_testJobName, _task, 3));
final result = await executor
.submit(Job(_testJobName, _task, 3, deduplicationKey: null));
expect(result, equals(4));
});

test('runs a submit all task', () async {
final result = executor.submitAll(Job(_testJobName, _task, 3));
final result =
executor.submitAll(Job(_testJobName, _task, 3, deduplicationKey: null));
expect(result.length, 1);
expect(await result[0], equals(4));
});

test('rejects tasks when disposed', () async {
executor.dispose();
try {
await executor.submit(Job(_testJobName, (message) => _task, 'a-message'));
await executor.submit(Job(_testJobName, (message) => _task, 'a-message',
deduplicationKey: null));
throw 'expected an error';
} catch (error) {
expect(error, 'disposed');
Expand All @@ -39,7 +42,7 @@ void main() {
test('rejects tasks when task is cancelled', () async {
try {
await executor.submit(Job(_testJobName, (message) => _task, 'a-message',
cancelled: () => true));
cancelled: () => true, deduplicationKey: null));
throw 'expected an error';
} on CancellationException {
// ignore
Expand Down
Loading

0 comments on commit 8a2ece5

Please sign in to comment.