Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement parallel uploads to a single backend #3684

Merged
merged 23 commits into from Mar 19, 2019
Merged
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
4f4529a
Move upload related code from BackendHandler to BackendUploader
seantempleton Feb 10, 2019
05c3f48
Move encrypting, hashing, and creating index volumes out of BackendUp…
seantempleton Feb 23, 2019
1029d99
Simplify BackendUploader and remove unnecessary code
seantempleton Feb 23, 2019
3de04da
Run each upload on a separate thread
seantempleton Feb 23, 2019
0c122ff
Change IBackend and IStreamingBackend to return Task for Put()
seantempleton Feb 23, 2019
662627a
Update AlternativeFtp Backend for async Put
seantempleton Feb 17, 2019
b1c6e97
Update Amazon Cloud Drive backend for async Put
seantempleton Feb 17, 2019
5db326b
Update Azure Blob and Backblaze B2 backends for async Put
seantempleton Feb 23, 2019
7843f83
Update Google Cloud and Drive backends for async Put
seantempleton Feb 18, 2019
7f09c7a
Update Box and Rackspace Cloud Files backends for async Put
seantempleton Feb 23, 2019
4ca067a
Update Dropbox and File backends for async Put
seantempleton Feb 23, 2019
9a42eca
Update FTP and Jottacloud backends for async Put
seantempleton Feb 23, 2019
c347282
Update HubiC and OpenStack backends for async Put
seantempleton Feb 18, 2019
13f83a2
Update Mega, OneDrive, and Rclone backends for async Put
seantempleton Feb 23, 2019
c05f304
Update S3, Sharepoint, Tahoe, and WEBDAV backends for async Put
seantempleton Feb 23, 2019
12b15c0
Add asynchronous-concurrent-upload-limit option
seantempleton Feb 27, 2019
ac08aa2
Dynamically throttle and track the progress of concurrent uploads
seantempleton Feb 27, 2019
61f511c
Properly catch exceptions from an upload and cancel all other uploads
seantempleton Mar 4, 2019
8bd4b82
Fix several Codacy issues
seantempleton Mar 8, 2019
cc3e94f
Fix code review comments and other minor changes
seantempleton Mar 8, 2019
19c1c90
Check upload throttle speed every 2 seconds
seantempleton Mar 9, 2019
b02b19e
Change Put method name to PutAsync
seantempleton Mar 17, 2019
1f388f4
Fix MegaBackend null reference exception
seantempleton Mar 18, 2019
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.
+28 −31
Diff settings

Always

Just for now

Update Dropbox and File backends for async Put

  • Loading branch information...
seantempleton committed Feb 23, 2019
commit 4ca067a8f293e523a3cb0ba298cd6b3c1b25e10b
@@ -1,10 +1,10 @@
using System;
using Duplicati.Library.Common.IO;
using Duplicati.Library.Interface;
using System;
using System.Collections.Generic;
using System.IO;
using Duplicati.Library.Interface;
using Duplicati.Library.Common.IO;
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace Duplicati.Library.Backend
{
@@ -164,20 +164,18 @@ public void CreateFolder()
}
}

public Task Put(string remotename, Stream stream, CancellationToken cancelToken)
public async Task Put(string remotename, Stream stream, CancellationToken cancelToken)
{
try
{
string path = string.Format("{0}/{1}", m_path, remotename);
dbx.UploadFile(path, stream);
string path = $"{m_path}/{remotename}";
await dbx.UploadFileAsync(path, stream, cancelToken);
}
catch (DropboxException)
{
// we can catch some events here and convert them to Duplicati exceptions
throw;
}

return Task.FromResult(true);
}

public void Get(string remotename, Stream stream)
@@ -1,10 +1,11 @@
using System;
using System.IO;
using System.Net;

using Duplicati.Library.Utility;
using Duplicati.Library.Utility;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace Duplicati.Library.Backend
{
@@ -68,7 +69,7 @@ public FolderMetadata CreateFolder(string path)
}
}

public FileMetaData UploadFile(String path, Stream stream)
public async Task<FileMetaData> UploadFileAsync(String path, Stream stream, CancellationToken cancelToken)
{
// start a session
var ussa = new UploadSessionStartArg();
@@ -84,22 +85,22 @@ public FileMetaData UploadFile(String path, Stream stream)
var areq = new AsyncHttpRequest(req);

byte[] buffer = new byte[Utility.Utility.DEFAULT_BUFFER_SIZE];
int sizeToRead = Math.Min((int)Utility.Utility.DEFAULT_BUFFER_SIZE, chunksize);

ulong globalBytesRead = 0;
using (var rs = areq.GetRequestStream())
{
int bytesRead = 0;
do
{
bytesRead = stream.Read(buffer, 0, Math.Min((int)Utility.Utility.DEFAULT_BUFFER_SIZE, chunksize));
bytesRead = await stream.ReadAsync(buffer, 0, sizeToRead, cancelToken).ConfigureAwait(false);
globalBytesRead += (ulong)bytesRead;
rs.Write(buffer, 0, bytesRead);

await rs.WriteAsync(buffer, 0, bytesRead, cancelToken).ConfigureAwait(false);
}
while (bytesRead > 0 && globalBytesRead < (ulong)chunksize);
while (bytesRead > 0 && globalBytesRead < (ulong)chunksize);
}

var ussr = ReadJSONResponse<UploadSessionStartResult>(areq); // pun intended
var ussr = await ReadJSONResponseAsync<UploadSessionStartResult>(areq, cancelToken); // pun intended

// keep appending until finished
// 1) read into buffer
@@ -124,23 +125,24 @@ public FileMetaData UploadFile(String path, Stream stream)
areq = new AsyncHttpRequest(req);

int bytesReadInRequest = 0;
sizeToRead = Math.Min(chunksize, (int)Utility.Utility.DEFAULT_BUFFER_SIZE);
using (var rs = areq.GetRequestStream())
{
int bytesRead = 0;
do
{
bytesRead = stream.Read(buffer, 0, Math.Min(chunksize, (int)Utility.Utility.DEFAULT_BUFFER_SIZE));
bytesRead = await stream.ReadAsync(buffer, 0, sizeToRead, cancelToken).ConfigureAwait(false);
bytesReadInRequest += bytesRead;
globalBytesRead += (ulong)bytesRead;
rs.Write(buffer, 0, bytesRead);
await rs.WriteAsync(buffer, 0, bytesRead, cancelToken).ConfigureAwait(false);

}
while (bytesRead > 0 && bytesReadInRequest < chunksize);
}

using (var response = GetResponse(areq))
using (var sr = new StreamReader(response.GetResponseStream()))
sr.ReadToEnd();
await sr.ReadToEndAsync();
}

// finish session and commit
@@ -188,23 +188,20 @@ public IEnumerable<IFileEntry> List()

#if DEBUG_RETRY
private static Random random = new Random();
public Task Put(string remotename, System.IO.Stream stream, CancellationToken cancelToken)
public async Task Put(string remotename, System.IO.Stream stream, CancellationToken cancelToken)
{
using(System.IO.FileStream writestream = systemIO.FileOpenWrite(GetRemoteName(remotename)))
{
if (random.NextDouble() > 0.6666)
throw new Exception("Random upload failure");
Utility.Utility.CopyStream(stream, writestream);
await Utility.Utility.CopyStreamAsync(stream, writestream, cancelToken);
}

return Task.FromResult(true);
}
#else
public Task Put(string remotename, System.IO.Stream stream, CancellationToken cancelToken)
public async Task Put(string remotename, System.IO.Stream stream, CancellationToken cancelToken)
{
using(System.IO.FileStream writestream = systemIO.FileOpenWrite(GetRemoteName(remotename)))
Utility.Utility.CopyStream(stream, writestream, true, m_copybuffer);
return Task.FromResult(true);
using (System.IO.FileStream writestream = systemIO.FileOpenWrite(GetRemoteName(remotename)))
await Utility.Utility.CopyStreamAsync(stream, writestream, true, cancelToken, m_copybuffer);
}
#endif

ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.