Skip to content

Commit

Permalink
Split io.dart into io_{file, http, web}.dart, so we can change the im…
Browse files Browse the repository at this point in the history
…plementations on different platforms. Also started to tweak the tests to work across them.
  • Loading branch information
bramp committed Feb 1, 2024
1 parent 7b775be commit 4733bb7
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 152 deletions.
77 changes: 3 additions & 74 deletions packages/pmtiles/lib/src/io.dart
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import 'dart:collection';
import 'dart:io';

import 'package:http/http.dart';
import 'package:pool/pool.dart';

export 'io_file.dart' if (dart.library.js) 'io_web.dart';
export 'io_http.dart';

/// Simple interface so we can abstract reading from Files, or Http.
abstract interface class ReadAt {
Expand All @@ -13,77 +13,6 @@ abstract interface class ReadAt {
Future<void> close();
}

class FileAt implements ReadAt {
final File file;

/// RandomAccessFile only allows a single outstanding read at any time, so
/// we open a new RandomAccessFile for each read. To bound the number
/// we use a pool to cap us to 8 outstanding reads.
final _pool = Pool(8, timeout: Duration(seconds: 30));

FileAt(this.file);

@override
Future<ByteStream> readAt(final int offset, final int length) async {
return _pool.withResource(() async {
// TODO Consider caching the open files.
final file = await this.file.open(mode: FileMode.read);
try {
final f = await file.setPosition(offset);
final data = await f.read(length);

return ByteStream.fromBytes(data);
} finally {
await file.close();
}
});
}

@override
Future<void> close() {
return _pool.close();
}
}

class HttpAt implements ReadAt {
final Client client;
final Uri url;
final Map<String, String>? headers;

// We are assuming the remote server supports range reads.
HttpAt(this.client, this.url, {this.headers});

@override
Future<ByteStream> readAt(int offset, int length) async {
final request = Request("GET", url);

if (headers != null) request.headers.addAll(headers!);
request.headers[HttpHeaders.rangeHeader] =
'bytes=$offset-${offset + length - 1}';

final response = await client.send(request);

if (response.statusCode != 206) {
throw HttpException('Unexpected status code: ${response.statusCode}');
}

final responseLength = response.headers[HttpHeaders.contentLengthHeader];
if (responseLength != null && int.parse(responseLength) != length) {
throw HttpException(
'Unexpected Content-Length: $responseLength expected $length');
}

// TODO check Content-Range: bytes 0-1023/146515

return response.stream;
}

@override
Future<void> close() async {
return client.close();
}
}

/// An List<int> that is made up of a List of List<int>.
class CordBuffer {
final _buffers = Queue<List<int>>();
Expand Down
37 changes: 37 additions & 0 deletions packages/pmtiles/lib/src/io_file.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import 'dart:io';
import 'package:http/http.dart';
import 'package:pool/pool.dart';

import 'io.dart';

class FileAt implements ReadAt {
final File file;

/// RandomAccessFile only allows a single outstanding read at any time, so
/// we open a new RandomAccessFile for each read. To bound the number
/// we use a pool to cap us to 8 outstanding reads.
final _pool = Pool(8, timeout: Duration(seconds: 30));

FileAt(this.file);

@override
Future<ByteStream> readAt(final int offset, final int length) async {
return _pool.withResource(() async {
// TODO Consider caching the open files.
final file = await this.file.open(mode: FileMode.read);
try {
final f = await file.setPosition(offset);
final data = await f.read(length);

return ByteStream.fromBytes(data);
} finally {
await file.close();
}
});
}

@override
Future<void> close() {
return _pool.close();
}
}
43 changes: 43 additions & 0 deletions packages/pmtiles/lib/src/io_http.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import 'dart:io'; // TODO Remove
import 'package:http/http.dart';

import 'io.dart';

class HttpAt implements ReadAt {
final Client client;
final Uri url;
final Map<String, String>? headers;

// We are assuming the remote server supports range reads.
HttpAt(this.client, this.url, {this.headers});

@override
Future<ByteStream> readAt(int offset, int length) async {
final request = Request("GET", url);

if (headers != null) request.headers.addAll(headers!);
request.headers[HttpHeaders.rangeHeader] =
'bytes=$offset-${offset + length - 1}';

final response = await client.send(request);

if (response.statusCode != 206) {
throw HttpException('Unexpected status code: ${response.statusCode}');
}

final responseLength = response.headers[HttpHeaders.contentLengthHeader];
if (responseLength != null && int.parse(responseLength) != length) {
throw HttpException(
'Unexpected Content-Length: $responseLength expected $length');
}

// TODO check Content-Range: bytes 0-1023/146515

return response.stream;
}

@override
Future<void> close() async {
return client.close();
}
}
19 changes: 19 additions & 0 deletions packages/pmtiles/lib/src/io_web.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import 'package:http/http.dart';

import 'io.dart';

class FileAt implements ReadAt {
FileAt(Object file) {
throw UnimplementedError('On the web, File APIs are not implemented');
}

@override
Future<ByteStream> readAt(final int offset, final int length) async {
throw UnimplementedError('On the web, File APIs are not implemented');
}

@override
Future<void> close() {
throw UnimplementedError('On the web, File APIs are not implemented');
}
}
18 changes: 13 additions & 5 deletions packages/pmtiles/test/archive_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,28 @@ import 'package:test/test.dart';
/// package.
void main() {
group('http', () {
test('no server', () async {
test('Connection Refused', () async {
try {
final tiles =
await PmTilesArchive.fromUri(Uri.parse('http://localhost:1234'));
tiles.close();
} on ClientException catch (e) {
expect(e.message, equals('Connection refused'));
expect(
e.message,
anyOf(
contains('Connection refused'),

// In the browser, a "ClientException: XMLHttpRequest error" is thrown
// which doesn't tell us the error :(, so just skip this test.
contains('XMLHttpRequest error'),
));
return;
}

fail('Expected ClientException');
});
}, testOn: "!node");

test('not found', () async {
test('404 Not Found', () async {
var client = MockClient((request) async {
return Response("", 404);
});
Expand Down Expand Up @@ -54,5 +62,5 @@ void main() {

fail('Expected PathNotFoundException');
});
});
}, testOn: "!js");
}
79 changes: 79 additions & 0 deletions packages/pmtiles/test/io_file_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
@TestOn('!js') // Exclude from dart2js builds

import 'dart:io';
import 'dart:typed_data';

import 'package:pmtiles/src/io.dart';
import 'package:test/test.dart';

void main() {
group('FileAt', () {
late Directory directory;
late File tempFile;

setUp(() async {
directory = await Directory.systemTemp.createTemp();
tempFile = File("${directory.path}/test_file");

final f = await tempFile.create();
await f.writeAsBytes(
List.generate(100, (index) => index),
);
});

tearDown(() async {
await directory.delete(recursive: true);
});

final tests = <(int, int), List<int>>{
(0, 0): [],
(0, 1): [0],
(0, 5): [0, 1, 2, 3, 4],
(0, 10): [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
(1, 9): [1, 2, 3, 4, 5, 6, 7, 8, 9],
(2, 2): [2, 3],
};

test('readAt single', () async {
final f = FileAt(tempFile);

for (final t in tests.entries) {
final (offset, length) = t.key;
final expected = t.value;

final stream = await f.readAt(offset, length);
final bytes = await stream.toBytes();

expect(bytes, equals(expected));
}
});

test('readAt concurrent', () async {
final f = FileAt(tempFile);

Future<Uint8List> readAt(int offset, int length) async {
final stream = await f.readAt(offset, length);
return await stream.toBytes();
}

var count = 0;
final allReads = tests.map((key, value) {
final (offset, length) = key;
final expected = value;

return MapEntry(
key,
readAt(offset, length).then((actual) {
expect(actual, equals(expected));
count++;
}),
);
});

// Now we await all the reads at the same time
// The actual test is done a few lines up.
await Future.wait(allReads.values);
expect(count, equals(tests.length));
});
});
}
73 changes: 0 additions & 73 deletions packages/pmtiles/test/io_test.dart
Original file line number Diff line number Diff line change
@@ -1,80 +1,7 @@
import 'dart:io';
import 'dart:typed_data';

import 'package:pmtiles/src/io.dart';
import 'package:test/test.dart';

void main() {
group('FileAt', () {
late Directory directory;
late File tempFile;

setUp(() async {
directory = await Directory.systemTemp.createTemp();
tempFile = File("${directory.path}/test_file");

final f = await tempFile.create();
await f.writeAsBytes(
List.generate(100, (index) => index),
);
});

tearDown(() async {
await directory.delete(recursive: true);
});

final tests = <(int, int), List<int>>{
(0, 0): [],
(0, 1): [0],
(0, 5): [0, 1, 2, 3, 4],
(0, 10): [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
(1, 9): [1, 2, 3, 4, 5, 6, 7, 8, 9],
(2, 2): [2, 3],
};

test('readAt single', () async {
final f = FileAt(tempFile);

for (final t in tests.entries) {
final (offset, length) = t.key;
final expected = t.value;

final stream = await f.readAt(offset, length);
final bytes = await stream.toBytes();

expect(bytes, equals(expected));
}
});

test('readAt concurrent', () async {
final f = FileAt(tempFile);

Future<Uint8List> readAt(int offset, int length) async {
final stream = await f.readAt(offset, length);
return await stream.toBytes();
}

var count = 0;
final allReads = tests.map((key, value) {
final (offset, length) = key;
final expected = value;

return MapEntry(
key,
readAt(offset, length).then((actual) {
expect(actual, equals(expected));
count++;
}),
);
});

// Now we await all the reads at the same time
// The actual test is done a few lines up.
await Future.wait(allReads.values);
expect(count, equals(tests.length));
});
});

group('CordBuffer', () {
test('addAll', () {
final buffer = CordBuffer();
Expand Down

0 comments on commit 4733bb7

Please sign in to comment.