forked from DataJuggler/BlazorFileUpload
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRemoteFileListEntryStream.cs
133 lines (119 loc) · 6.07 KB
/
RemoteFileListEntryStream.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Components;
using Microsoft.JSInterop;
namespace BlazorInputFile
{
// This class streams data from JS within the existing API limits of IJSRuntime.
// To produce good throughput, it prefetches up to buffer_size data from JS
// even when the consumer isn't asking for that much data, and does so by making
// N parallel requests in parallel (N ~= buffer_size / max_message_size).
//
// This should be understood as a TEMPORARY way to achieve the desired API and
// reasonable performance. Longer term we can surely replace this with something
// simpler and cleaner, either:
//
// - Extending JS interop to allow streaming responses via SignalR's built-in
// binary streaming support. That should reduce all of this to triviality.
// - Or, failing that, at least use something like System.IO.Pipelines to manage
// the supply/consumption of byte data with less custom code.
internal class RemoteFileListEntryStream : FileListEntryStream
{
    // Maximum number of bytes the JS side will send in a single interop message.
    private readonly int _maxMessageSize;
    // Sequence of prefetched base64 blocks, fetched up to N at a time in parallel.
    private readonly PreFetchingSequence<Block> _blockSequence;
    // Block currently being drained into the consumer's buffer; null before the first read.
    private Block? _currentBlock;
    // Decoded bytes of _currentBlock; reused across blocks (sized to _maxMessageSize).
    private byte[] _currentBlockDecodingBuffer;
    // How many bytes of _currentBlockDecodingBuffer have already been handed to the consumer.
    private int _currentBlockDecodingBufferConsumedLength;

    /// <summary>
    /// Creates a stream that pulls file data from the browser via JS interop,
    /// prefetching multiple blocks in parallel to improve throughput.
    /// </summary>
    /// <param name="jsRuntime">JS runtime used to invoke the data-read function.</param>
    /// <param name="inputFileElement">The file input element that owns the file.</param>
    /// <param name="file">The file entry whose data is being streamed.</param>
    /// <param name="maxMessageSize">Maximum bytes per interop message (block size).</param>
    /// <param name="maxBufferSize">Total bytes to prefetch; determines fetch parallelism.</param>
    public RemoteFileListEntryStream(IJSRuntime jsRuntime, ElementReference inputFileElement, FileListEntryImpl file, int maxMessageSize, int maxBufferSize)
        : base(jsRuntime, inputFileElement, file)
    {
        _maxMessageSize = maxMessageSize;
        _blockSequence = new PreFetchingSequence<Block>(
            FetchBase64Block,
            (file.Size + _maxMessageSize - 1) / _maxMessageSize, // Total block count (ceiling division)
            Math.Max(1, maxBufferSize / _maxMessageSize)); // Degree of parallelism on fetch
        _currentBlockDecodingBuffer = new byte[_maxMessageSize];
    }

    /// <summary>
    /// Copies up to <paramref name="maxBytes"/> of file data into <paramref name="destination"/>,
    /// decoding prefetched base64 blocks as needed. Returns the number of bytes copied,
    /// which may be less than <paramref name="maxBytes"/> if the next block is still in flight
    /// (to avoid blocking when some data is already available) or the file is exhausted.
    /// </summary>
    protected override async Task<int> CopyFileDataIntoBuffer(long sourceOffset, byte[] destination, int destinationOffset, int maxBytes, CancellationToken cancellationToken)
    {
        var totalBytesCopied = 0;
        while (maxBytes > 0)
        {
            // If we don't yet have a block, or it's fully consumed, get the next one
            if (!_currentBlock.HasValue || _currentBlockDecodingBufferConsumedLength == _currentBlock.Value.LengthBytes)
            {
                // If we've already read some data, and the next block is still pending,
                // then just return now rather than awaiting
                if (totalBytesCopied > 0
                    && _blockSequence.TryPeekNext(out var nextBlock)
                    && !nextBlock.Base64.IsCompleted)
                {
                    break;
                }
                _currentBlock = _blockSequence.ReadNext(cancellationToken);
                var currentBlockBase64 = await _currentBlock.Value.Base64;
                // As a possible future optimisation, if we know the current block will fit entirely in
                // the remaining destination space, we could decode directly into the destination without
                // going via _currentBlockDecodingBuffer. However that complicates the logic a lot.
                var decodedLength = DecodeBase64ToBuffer(currentBlockBase64, _currentBlockDecodingBuffer, 0, _currentBlock.Value.LengthBytes);
                // Guard against a short payload from JS: without this check, a truncated block
                // would silently expose stale bytes from the reused decoding buffer.
                if (decodedLength != _currentBlock.Value.LengthBytes)
                {
                    throw new InvalidOperationException($"Decoded block was {decodedLength} bytes, but expected {_currentBlock.Value.LengthBytes}");
                }
                _currentBlockDecodingBufferConsumedLength = 0;
            }
            // How much of the current block can we fit into the destination?
            var numUnconsumedBytesInBlock = _currentBlock.Value.LengthBytes - _currentBlockDecodingBufferConsumedLength;
            var numBytesToTransfer = Math.Min(numUnconsumedBytesInBlock, maxBytes);
            if (numBytesToTransfer == 0)
            {
                break;
            }
            // Perform the copy
            Array.Copy(_currentBlockDecodingBuffer, _currentBlockDecodingBufferConsumedLength, destination, destinationOffset, numBytesToTransfer);
            maxBytes -= numBytesToTransfer;
            destinationOffset += numBytesToTransfer;
            _currentBlockDecodingBufferConsumedLength += numBytesToTransfer;
            totalBytesCopied += numBytesToTransfer;
        }
        return totalBytesCopied;
    }

    /// <summary>
    /// Starts fetching block <paramref name="index"/> of the file from JS as base64.
    /// The final block may be shorter than <c>_maxMessageSize</c>.
    /// </summary>
    private Block FetchBase64Block(long index, CancellationToken cancellationToken)
    {
        var sourceOffset = index * _maxMessageSize;
        var blockLength = (int)Math.Min(_maxMessageSize, _file.Size - sourceOffset);
        var task = _jsRuntime.InvokeAsync<string>(
            "BlazorInputFile.readFileData",
            cancellationToken,
            _inputFileElement,
            _file.Id,
            sourceOffset, // Reuse the offset computed above rather than recomputing index * _maxMessageSize
            blockLength).AsTask();
        return new Block(task, blockLength);
    }

    /// <summary>
    /// Decodes <paramref name="base64"/> into <paramref name="buffer"/> starting at
    /// <paramref name="offset"/>, writing at most <paramref name="maxBytesToRead"/> bytes.
    /// Returns the number of bytes actually decoded. Throws <see cref="InvalidOperationException"/>
    /// if the input is invalid or decodes to more than <paramref name="maxBytesToRead"/> bytes.
    /// </summary>
    private int DecodeBase64ToBuffer(string base64, byte[] buffer, int offset, int maxBytesToRead)
    {
#if NETSTANDARD2_1
        // Allocation-free path: decode directly into the target slice.
        var bufferWithOffset = new Span<byte>(buffer, offset, maxBytesToRead);
        return Convert.TryFromBase64String(base64, bufferWithOffset, out var actualBytesRead)
            ? actualBytesRead
            : throw new InvalidOperationException("Failed to decode base64 data");
#else
        // Fallback for older targets: decode to a temporary array, then copy.
        var bytes = Convert.FromBase64String(base64);
        if (bytes.Length > maxBytesToRead)
        {
            throw new InvalidOperationException($"Requested a maximum of {maxBytesToRead}, but received {bytes.Length}");
        }
        Array.Copy(bytes, 0, buffer, offset, bytes.Length);
        return bytes.Length;
#endif
    }

    // Pairs an in-flight base64 fetch with the exact number of decoded bytes it should yield.
    private readonly struct Block
    {
        public readonly Task<string> Base64;
        public readonly int LengthBytes;
        public Block(Task<string> base64, int lengthBytes)
        {
            Base64 = base64;
            LengthBytes = lengthBytes;
        }
    }
}
}