Skip to content

Commit

Permalink
feat: add download/upload progress callback from proposal #8
Browse files Browse the repository at this point in the history
  • Loading branch information
crifurch committed May 13, 2024
1 parent 425234b commit 9564505
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 22 deletions.
3 changes: 2 additions & 1 deletion lib/src/file_system/entries/ftp_file.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ class FtpFile extends FtpEntry {
var result = false;
try {
final downloadFileStream = _client.fs.downloadFileStream(this);
//todo search way to provide file size if necessary
final uploadStream =
secondClient.fs.uploadFileFromStream(fileTo, downloadFileStream);
secondClient.fs.uploadFileFromStream(fileTo, downloadFileStream, 0);
result = await uploadStream;
} finally {
try {
Expand Down
61 changes: 47 additions & 14 deletions lib/src/file_system/ftp_file_system.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// ignore_for_file: constant_identifier_names

import 'dart:async';

import 'package:pure_ftp/src/file_system/entries/ftp_directory.dart';
Expand All @@ -16,6 +14,9 @@ import 'package:pure_ftp/src/ftp/utils/data_parser_utils.dart';
import 'package:pure_ftp/src/ftp_client.dart';
import 'package:pure_ftp/utils/list_utils.dart';

export 'package:pure_ftp/src/file_system/ftp_transfer.dart'
show OnTransferProgress;

typedef DirInfoCache
= MapEntry<String, Iterable<MapEntry<FtpEntry, FtpEntryInfo>>>;

Expand Down Expand Up @@ -197,21 +198,42 @@ class FtpFileSystem {
.toList();
});

Stream<List<int>> downloadFileStream(FtpFile file) =>
_transfer.downloadFileStream(file);
Stream<List<int>> downloadFileStream(
FtpFile file, {
OnTransferProgress? onReceiveProgress,
}) =>
_transfer.downloadFileStream(
file,
onReceiveProgress: onReceiveProgress,
);

Future<List<int>> downloadFile(FtpFile file) async {
Future<List<int>> downloadFile(
FtpFile file, {
OnTransferProgress? onReceiveProgress,
}) async {
final result = <int>[];
await downloadFileStream(file).listen(result.addAll).asFuture();
await downloadFileStream(
file,
onReceiveProgress: onReceiveProgress,
).listen(result.addAll).asFuture();
return result;
}

Future<bool> uploadFile(FtpFile file, List<int> data,
[UploadChunkSize chunkSize = UploadChunkSize.kb4]) async {
Future<bool> uploadFile(
FtpFile file,
List<int> data, {
UploadChunkSize chunkSize = UploadChunkSize.kb4,
OnTransferProgress? onUploadProgress,
}) async {
final stream = StreamController<List<int>>();
var result = false;
try {
final future = uploadFileFromStream(file, stream.stream);
final future = uploadFileFromStream(
file,
stream.stream,
data.length,
onUploadProgress: onUploadProgress,
);
if (data.isEmpty) {
stream.add(data);
} else {
Expand All @@ -229,6 +251,20 @@ class FtpFileSystem {
return result;
}

Future<bool> uploadFileFromStream(
FtpFile file,
Stream<List<int>> stream,
int fileSize, {
OnTransferProgress? onUploadProgress,
}) async {
return _transfer.uploadFileStream(
file,
stream,
fileSize,
onUploadProgress: onUploadProgress,
);
}

Future<FtpEntryInfo?> getEntryInfo(FtpEntry entry,
{bool fromCache = true}) async {
assert(
Expand All @@ -248,11 +284,6 @@ class FtpFileSystem {
throw UnimplementedError();
}

Future<bool> uploadFileFromStream(
FtpFile file, Stream<List<int>> stream) async {
return _transfer.uploadFileStream(file, stream);
}

FtpFileSystem copy() {
final ftpFileSystem = FtpFileSystem(client: _client.clone());
ftpFileSystem._currentDirectory = _currentDirectory;
Expand All @@ -267,7 +298,9 @@ class FtpFileSystem {
}

enum ListType {
// ignore: constant_identifier_names
LIST(FtpCommand.LIST),
// ignore: constant_identifier_names
MLSD(FtpCommand.MLSD),
;

Expand Down
31 changes: 24 additions & 7 deletions lib/src/file_system/ftp_transfer.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:math';
import 'dart:typed_data';

import 'package:pure_ftp/src/file_system/entries/ftp_file.dart';
Expand All @@ -8,22 +9,27 @@ import 'package:pure_ftp/src/ftp/ftp_commands.dart';
import 'package:pure_ftp/src/ftp/ftp_socket.dart';
import 'package:pure_ftp/src/socket/common/client_raw_socket.dart';

typedef OnTransferProgress = Function(int bytes, int total, double percents);

class FtpTransfer {
final FtpSocket _socket;

FtpTransfer({
required FtpSocket socket,
}) : _socket = socket;

Stream<List<int>> downloadFileStream(FtpFile file) {
Stream<List<int>> downloadFileStream(
FtpFile file, {
OnTransferProgress? onReceiveProgress,
}) {
final stream = StreamController<List<int>>();
unawaited(_socket.openTransferChannel((socketFuture, log) async {
final fileSize = await file.size();
FtpCommand.RETR.write(
_socket,
[file.path],
);
//will be closed by the transfer channel
// ignore: close_sinks
final socket = await socketFuture;
final response = await _socket.read();

Expand All @@ -32,11 +38,14 @@ class FtpTransfer {
if (!transferCompleted) {
throw FtpException('Error while downloading file');
}
var total = 0;
var downloaded = 0;
await socket.listen(
(event) {
stream.add(event);
log?.call('Downloaded ${total += event.length} bytes');
downloaded += event.length;
final total = max(fileSize, downloaded);
onReceiveProgress?.call(downloaded, total, downloaded / total * 100);
log?.call('Downloaded ${downloaded} of ${total} bytes');
},
).asFuture();
await _socket.read();
Expand All @@ -48,7 +57,12 @@ class FtpTransfer {
return stream.stream;
}

Future<bool> uploadFileStream(FtpFile file, Stream<List<int>> data) =>
Future<bool> uploadFileStream(
FtpFile file,
Stream<List<int>> data,
int fileSize, {
OnTransferProgress? onUploadProgress,
}) =>
_socket.openTransferChannel((socketFuture, log) async {
FtpCommand.STOR.write(
_socket,
Expand All @@ -64,12 +78,15 @@ class FtpTransfer {
if (!transferCompleted) {
throw FtpException('Error while uploading file');
}
var total = 0;
var uploaded = 0;
final transform = data.transform<Uint8List>(
StreamTransformer.fromHandlers(
handleData: (event, sink) {
sink.add(Uint8List.fromList(event));
log?.call('Uploaded ${total += event.length} bytes');
uploaded += event.length;
final total = max(fileSize, uploaded);
onUploadProgress?.call(uploaded, total, uploaded / total * 100);
log?.call('Downloaded ${uploaded} of ${total} bytes');
},
),
);
Expand Down

0 comments on commit 9564505

Please sign in to comment.