
Implement parallel uploads to a single backend #3684

Merged 23 commits on Mar 19, 2019

Commits (23)
4f4529a
Move upload related code from BackendHandler to BackendUploader
seantempleton Feb 10, 2019
05c3f48
Move encrypting, hashing, and creating index volumes out of BackendUp…
seantempleton Feb 23, 2019
1029d99
Simplify BackendUploader and remove unnecessary code
seantempleton Feb 23, 2019
3de04da
Run each upload on a separate thread
seantempleton Feb 23, 2019
0c122ff
Change IBackend and IStreamingBackend to return Task for Put()
seantempleton Feb 23, 2019
662627a
Update AlternativeFtp Backend for async Put
seantempleton Feb 17, 2019
b1c6e97
Update Amazon Cloud Drive backend for async Put
seantempleton Feb 17, 2019
5db326b
Update Azure Blob and Backblaze B2 backends for async Put
seantempleton Feb 23, 2019
7843f83
Update Google Cloud and Drive backends for async Put
seantempleton Feb 18, 2019
7f09c7a
Update Box and Rackspace Cloud Files backends for async Put
seantempleton Feb 23, 2019
4ca067a
Update Dropbox and File backends for async Put
seantempleton Feb 23, 2019
9a42eca
Update FTP and Jottacloud backends for async Put
seantempleton Feb 23, 2019
c347282
Update HubiC and OpenStack backends for async Put
seantempleton Feb 18, 2019
13f83a2
Update Mega, OneDrive, and Rclone backends for async Put
seantempleton Feb 23, 2019
c05f304
Update S3, Sharepoint, Tahoe, and WEBDAV backends for async Put
seantempleton Feb 23, 2019
12b15c0
Add asynchronous-concurrent-upload-limit option
seantempleton Feb 27, 2019
ac08aa2
Dynamically throttle and track the progress of concurrent uploads
seantempleton Feb 27, 2019
61f511c
Properly catch exceptions from an upload and cancel all other uploads
seantempleton Mar 4, 2019
8bd4b82
Fix several Codacy issues
seantempleton Mar 8, 2019
cc3e94f
Fix code review comments and other minor changes
seantempleton Mar 8, 2019
19c1c90
Check upload throttle speed every 2 seconds
seantempleton Mar 9, 2019
b02b19e
Change Put method name to PutAsync
seantempleton Mar 17, 2019
1f388f4
Fix MegaBackend null reference exception
seantempleton Mar 18, 2019
+282 −181
@@ -20,6 +20,12 @@
using Duplicati.Library.Main.Operation.Common;
using Duplicati.Library.Main.Volumes;
using System.Collections.Generic;
+using static Duplicati.Library.Main.Operation.Common.BackendHandler;
+using Duplicati.Library.Interface;
+using Newtonsoft.Json;
+using Duplicati.Library.Utility;
+using System.Linq;
+using System.IO;

namespace Duplicati.Library.Main.Operation.Backup
{
@@ -74,9 +80,32 @@ public VolumeUploadRequest(BlockVolumeWriter blockvolume, TemporaryIndexVolume i
/// This class encapsulates all requests to the backend
/// and ensures that the <code>AsynchronousUploadLimit</code> is honored
/// </summary>
-internal static class BackendUploader
+internal class BackendUploader
{
-public static Task Run(Common.BackendHandler backend, Options options, Common.DatabaseCommon database, BackupResults results, Common.ITaskReader taskreader, StatsCollector stats)
+private static readonly string LOGTAG = Logging.Log.LogTagFromType<BackendUploader>();
+
+private Options m_options;
+private IBackend m_backend;
+private BackendHandler m_backendHandler;
+private Common.ITaskReader m_taskreader;
+private StatsCollector m_stats;
+private DatabaseCommon m_database;
+private readonly BackupResults m_results;
+private string m_lastThrottleUploadValue;
+private string m_lastThrottleDownloadValue;
+
+public BackendUploader(Common.BackendHandler backendHandler, IBackend backend, Options options, Common.DatabaseCommon database, BackupResults results, Common.ITaskReader taskreader, StatsCollector stats)
+{
+    this.m_options = options;
+    this.m_backend = backend;
+    this.m_backendHandler = backendHandler;
+    this.m_taskreader = taskreader;
+    this.m_stats = stats;
+    this.m_database = database;
+    this.m_results = results;
+}
+
+public Task Run()
{
return AutomationExtensions.RunTask(new
{
@@ -86,18 +115,18 @@ public static Task Run(Common.BackendHandler backend, Options options, Common.Da
async self =>
{
var inProgress = new Queue<KeyValuePair<int, Task>>();
-var max_pending = options.AsynchronousUploadLimit == 0 ? long.MaxValue : options.AsynchronousUploadLimit;
-var noIndexFiles = options.IndexfilePolicy == Options.IndexFileStrategy.None;
+var max_pending = m_options.AsynchronousUploadLimit == 0 ? long.MaxValue : m_options.AsynchronousUploadLimit;
+var noIndexFiles = m_options.IndexfilePolicy == Options.IndexFileStrategy.None;
var active = 0;
var lastSize = -1L;

-while(!await self.Input.IsRetiredAsync && await taskreader.ProgressAsync)
+while(!await self.Input.IsRetiredAsync && await m_taskreader.ProgressAsync)
{
try
{
var req = await self.Input.ReadAsync();

-if (!await taskreader.ProgressAsync)
+if (!await m_taskreader.ProgressAsync)
This conversation was marked as resolved by seantempleton

@verhoek (Contributor) commented on Mar 8, 2019
I am not a fan of the m_* member variable "convention" and would like to phase it out. I know the code is mostly written in such a way that it's not clear which are local variables and which are member variables, mainly due to large functions. It's quite possible I'm the only one on the Duplicati team who dislikes the m_ notation.

@seantempleton (Author, Contributor) commented on Mar 9, 2019
I dislike the m_* style as well. Since this is my first PR for the project, I didn't want to step on any toes, so I kept to the current style. If there is an agreed-upon standard, I would be happy to switch to that. I'm partial to m* or to no prefix at all myself.

@warwickmm (Contributor) commented on Mar 9, 2019
If it were up to me, I would prefer to always use the this keyword when referring to instance members. At least for me, it is an immediate and clear indication that the variable refers to an instance member.

I have also tried to refrain from disturbing the local style too much, as in the end, consistency should perhaps be the ultimate goal.

@seantempleton (Author, Contributor) commented on Mar 9, 2019
@warwickmm, that's actually the other one I dislike. :) But I can get used to anything as long as it's consistent. If I really need to know a variable's scope, Visual Studio shows it in the IntelliSense tooltip. I realize not everyone has that information available, though. And I generally like to keep functions short enough that it's easy to tell anyway.

@warwickmm (Contributor) commented on Mar 9, 2019
For me, moving the cursor and hovering is too much effort (especially on a laptop). But yes, as long as we're consistent I am fine with any style.

@seantempleton (Author, Contributor) commented on Mar 9, 2019
I looked through a lot of files and most of them are using the m_ style. I'll plan on leaving this as-is for now. I think having a coding standard should be a separate conversation where all interested parties can help make the final decision.
If the community agrees on a different standard before this PR is merged I can always update it then.

@warwickmm (Contributor) commented on Mar 9, 2019
Sounds good. Thanks.

@verhoek (Contributor) commented on Mar 10, 2019
That's fine. I just noticed that you actively changed it to the m_* style, hence the remark. We should discuss coding styles elsewhere, and you shouldn't change it here. I hope I can find some time soon to read the actual code of this interesting feature you've implemented.

continue;

var task = default(KeyValuePair<int, Task>);
@@ -106,14 +135,14 @@ public static Task Run(Common.BackendHandler backend, Options options, Common.Da
lastSize = ((VolumeUploadRequest)req).BlockVolume.SourceSize;

if (noIndexFiles || ((VolumeUploadRequest)req).IndexVolume == null)
-task = new KeyValuePair<int, Task>(1, backend.UploadFileAsync(((VolumeUploadRequest)req).BlockVolume, null));
+task = new KeyValuePair<int, Task>(1, UploadFileAsync(((VolumeUploadRequest)req).BlockVolume, null));
else
-task = new KeyValuePair<int, Task>(2, backend.UploadFileAsync(((VolumeUploadRequest)req).BlockVolume, name => ((VolumeUploadRequest)req).IndexVolume.CreateVolume(name, options, database)));
+task = new KeyValuePair<int, Task>(2, UploadFileAsync(((VolumeUploadRequest)req).BlockVolume, name => ((VolumeUploadRequest)req).IndexVolume.CreateVolume(name, m_options, m_database)));
}
else if (req is FilesetUploadRequest)
-task = new KeyValuePair<int, Task>(1, backend.UploadFileAsync(((FilesetUploadRequest)req).Fileset));
+task = new KeyValuePair<int, Task>(1, UploadFileAsync(((FilesetUploadRequest)req).Fileset));
else if (req is IndexVolumeUploadRequest)
-task = new KeyValuePair<int, Task>(1, backend.UploadFileAsync(((IndexVolumeUploadRequest)req).IndexVolume));
+task = new KeyValuePair<int, Task>(1, UploadFileAsync(((IndexVolumeUploadRequest)req).IndexVolume));
else if (req is FlushRequest)
{
try
@@ -126,6 +155,7 @@ public static Task Run(Common.BackendHandler backend, Options options, Common.Da
{
((FlushRequest)req).SetFlushed(lastSize);
}
+break;
}

if (task.Value != null)
@@ -149,34 +179,258 @@ public static Task Run(Common.BackendHandler backend, Options options, Common.Da
{
try
{
-stats.SetBlocking(true);
+m_stats.SetBlocking(true);
await top.Value;
}
finally
{
-stats.SetBlocking(false);
+m_stats.SetBlocking(false);
}
}

active -= top.Key;
}
}

-results.OperationProgressUpdater.UpdatePhase(OperationPhase.Backup_WaitForUpload);
+m_results.OperationProgressUpdater.UpdatePhase(OperationPhase.Backup_WaitForUpload);

try
{
-stats.SetBlocking(true);
+m_stats.SetBlocking(true);
while (inProgress.Count > 0)
await inProgress.Dequeue().Value;
}
finally
{
-stats.SetBlocking(false);
+m_stats.SetBlocking(false);
}
});

}
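The loop above admits new uploads until the summed weight of in-flight tasks reaches the limit (a block volume paired with an index file counts as 2, everything else as 1), then awaits the oldest upload before admitting more. A minimal sketch of the same weighted-admission idea, written in Python with illustrative names rather than Duplicati's API:

```python
import asyncio

async def run_uploads(requests, max_pending=4):
    """Drain 'requests', a list of (weight, coroutine-factory) pairs.
    The summed weight of in-flight uploads is bounded by awaiting the
    oldest upload whenever the budget is exhausted."""
    in_progress = []  # FIFO of (weight, task), oldest first
    active = 0
    for weight, make_upload in requests:
        in_progress.append((weight, asyncio.ensure_future(make_upload())))
        active += weight
        while active >= max_pending:   # too much in flight: block on oldest
            w, t = in_progress.pop(0)
            await t
            active -= w
    for _, t in in_progress:           # drain whatever is still running
        await t
```

`run_uploads` and the pair encoding are stand-ins for the channel-driven loop in the diff; the FIFO-drain order mirrors `inProgress.Dequeue()`.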

private async Task UploadFileAsync(VolumeWriterBase item, Func<string, Task<IndexVolumeWriter>> createIndexFile = null)
{
var fe = new FileEntryItem(BackendActionType.Put, item.RemoteFilename);
fe.SetLocalfilename(item.LocalFilename);

var tcs = new TaskCompletionSource<bool>();

fe.Encrypt(m_options);
fe.UpdateHashAndSize(m_options);

try
{
await DoWithRetry(fe, async () =>
{
if (fe.IsRetry)
await RenameFileAfterErrorAsync(fe).ConfigureAwait(false);
return await DoPut(fe).ConfigureAwait(false);
}).ConfigureAwait(false);

if (createIndexFile != null)
{
var ix = await createIndexFile(fe.RemoteFilename).ConfigureAwait(false);
var indexFile = new FileEntryItem(BackendActionType.Put, ix.RemoteFilename);
indexFile.SetLocalfilename(ix.LocalFilename);

await m_database.UpdateRemoteVolumeAsync(indexFile.RemoteFilename, RemoteVolumeState.Uploading, -1, null);

await DoWithRetry(indexFile, async () =>
{
if (indexFile.IsRetry)
await RenameFileAfterErrorAsync(indexFile).ConfigureAwait(false);

var res = await DoPut(indexFile).ConfigureAwait(false);

// Register that the index file is tracking the block file
await m_database.AddIndexBlockLinkAsync(
ix.VolumeID,
await m_database.GetRemoteVolumeIDAsync(fe.RemoteFilename)
).ConfigureAwait(false);


return res;
}).ConfigureAwait(false);
}

tcs.TrySetResult(true);
}
catch (Exception ex)
{
if (ex is System.Threading.ThreadAbortException)
tcs.TrySetCanceled();
else
tcs.TrySetException(ex);
}

await tcs.Task.ConfigureAwait(false);
}
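UploadFileAsync above uploads the block volume first, then optionally creates and uploads the matching index file and records the index-to-block link in the database. A rough Python sketch of that ordering, where `put`, `db`, and the dict shapes are stand-ins rather than Duplicati types:

```python
import asyncio

async def upload_file(volume, create_index_file=None, *, put, db):
    """Upload a block volume; if an index-file factory is supplied,
    build and upload the index file too, then record which index
    file describes which block file."""
    await put(volume["remote_name"])
    if create_index_file is not None:
        index = await create_index_file(volume["remote_name"])
        await put(index["remote_name"])
        # Register that the index file tracks the block file.
        await db.add_index_block_link(index["id"], volume["id"])
```

The link is recorded only after both uploads succeed, matching the diff's call to `AddIndexBlockLinkAsync` inside the retry callback.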

private async Task<T> DoWithRetry<T>(FileEntryItem item, Func<Task<T>> method)
{
item.IsRetry = false;
Exception lastException = null;

if(!await m_taskreader.ProgressAsync)
throw new OperationCanceledException();

for (var i = 0; i < m_options.NumberOfRetries; i++)
{
if (m_options.RetryDelay.Ticks != 0 && i != 0)
await Task.Delay(m_options.RetryDelay).ConfigureAwait(false);

if (!await m_taskreader.ProgressAsync)
throw new OperationCanceledException();

try
{
return await method().ConfigureAwait(false);
}
catch (Exception ex)
{
item.IsRetry = true;
lastException = ex;
Logging.Log.WriteRetryMessage(LOGTAG, $"Retry{item.Operation}", ex, "Operation {0} with file {1} attempt {2} of {3} failed with message: {4}", item.Operation, item.RemoteFilename, i + 1, m_options.NumberOfRetries, ex.Message);
// If the thread is aborted, we exit here
if (ex is System.Threading.ThreadAbortException || ex is OperationCanceledException)
break;

await m_stats.SendEventAsync(item.Operation, i < m_options.NumberOfRetries ? BackendEventType.Retrying : BackendEventType.Failed, item.RemoteFilename, item.Size);

bool recovered = false;
if (ex is Duplicati.Library.Interface.FolderMissingException && m_options.AutocreateFolders)
{
try
{
// If we successfully create the folder, we can re-use the connection
m_backend.CreateFolder();
recovered = true;
}
catch (Exception dex)
{
Logging.Log.WriteWarningMessage(LOGTAG, "FolderCreateError", dex, "Failed to create folder: {0}", ex.Message);
}
}

if (!recovered)
ResetBackend(ex);
}
finally
{
if (m_options.NoConnectionReuse)
ResetBackend(null);
}
}

throw lastException;
}
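DoWithRetry wraps an upload attempt in a bounded retry loop with an optional delay between attempts, bailing out early on cancellation. The same shape, sketched in Python with hypothetical names:

```python
import asyncio

async def do_with_retry(operation, retries=5, delay=0.0, is_fatal=lambda e: False):
    """Run 'operation' up to 'retries' times, sleeping 'delay' seconds
    before each retry; errors classified as fatal (e.g. cancellation)
    stop the loop immediately. Re-raises the last error on exhaustion."""
    last = None
    for attempt in range(retries):
        if delay and attempt:
            await asyncio.sleep(delay)   # back off before a retry
        try:
            return await operation()
        except Exception as exc:
            last = exc
            if is_fatal(exc):
                break
    raise last
```

`is_fatal` plays the role of the `ThreadAbortException`/`OperationCanceledException` check in the diff; the recovery hooks (folder creation, backend reset) are omitted for brevity.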

private void ResetBackend(Exception ex)
{
try
{
m_backend?.Dispose();
}
catch (Exception dex)
{
Logging.Log.WriteWarningMessage(LOGTAG, "BackendDisposeError", dex, "Failed to dispose backend instance: {0}", ex?.Message);
}
m_backend = null;
}

private async Task RenameFileAfterErrorAsync(FileEntryItem item)
{
var p = VolumeBase.ParseFilename(item.RemoteFilename);
var guid = VolumeWriterBase.GenerateGuid();
var time = p.Time.Ticks == 0 ? p.Time : p.Time.AddSeconds(1);
var newname = VolumeBase.GenerateFilename(p.FileType, p.Prefix, guid, time, p.CompressionModule, p.EncryptionModule);
var oldname = item.RemoteFilename;

await m_stats.SendEventAsync(item.Operation, BackendEventType.Rename, oldname, item.Size);
await m_stats.SendEventAsync(item.Operation, BackendEventType.Rename, newname, item.Size);
Logging.Log.WriteInformationMessage(LOGTAG, "RenameRemoteTargetFile", "Renaming \"{0}\" to \"{1}\"", oldname, newname);
await m_database.RenameRemoteFileAsync(oldname, newname);
item.RemoteFilename = newname;
}
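Renaming after a failed attempt ensures a retry never collides with a partially uploaded remote file. A toy Python version of the idea, assuming a hypothetical "prefix-<unique>.ext" naming scheme rather than Duplicati's real volume filename format:

```python
import os
import uuid

def rename_after_error(remote_name):
    """Produce a fresh remote name for the retry so a half-uploaded
    file under the old name cannot clash with the new attempt."""
    stem, ext = os.path.splitext(remote_name)
    prefix = stem.rsplit("-", 1)[0]            # drop the old unique suffix
    return f"{prefix}-{uuid.uuid4().hex}{ext}"  # attach a new one
```

The real code also logs the rename and updates the database mapping (`RenameRemoteFileAsync`) so the volume is tracked under its new name.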

private async Task<bool> DoPut(FileEntryItem item, bool updatedHash = false)
{
// If this is not already encrypted, do it now
item.Encrypt(m_options);

updatedHash |= item.UpdateHashAndSize(m_options);

if (updatedHash && item.TrackedInDb)
await m_database.UpdateRemoteVolumeAsync(item.RemoteFilename, RemoteVolumeState.Uploading, item.Size, item.Hash);

if (m_options.Dryrun)
{
Logging.Log.WriteDryrunMessage(LOGTAG, "WouldUploadVolume", "Would upload volume: {0}, size: {1}", item.RemoteFilename, Library.Utility.Utility.FormatSizeString(new FileInfo(item.LocalFilename).Length));
item.DeleteLocalFile();
return true;
}

await m_database.LogRemoteOperationAsync("put", item.RemoteFilename, JsonConvert.SerializeObject(new { Size = item.Size, Hash = item.Hash }));
await m_stats.SendEventAsync(BackendActionType.Put, BackendEventType.Started, item.RemoteFilename, item.Size);

var begin = DateTime.Now;

if (m_backend is Library.Interface.IStreamingBackend && !m_options.DisableStreamingTransfers)
{
using (var fs = System.IO.File.OpenRead(item.LocalFilename))
using (var ts = new ThrottledStream(fs, m_options.MaxUploadPrSecond, m_options.MaxDownloadPrSecond))
using (var pgs = new Library.Utility.ProgressReportingStream(ts, pg => HandleProgress(ts, pg)))
((Library.Interface.IStreamingBackend)m_backend).Put(item.RemoteFilename, pgs);
}
else
m_backend.Put(item.RemoteFilename, item.LocalFilename);

var duration = DateTime.Now - begin;
Logging.Log.WriteProfilingMessage(LOGTAG, "UploadSpeed", "Uploaded {0} in {1}, {2}/s", Library.Utility.Utility.FormatSizeString(item.Size), duration, Library.Utility.Utility.FormatSizeString((long)(item.Size / duration.TotalSeconds)));

if (item.TrackedInDb)
await m_database.UpdateRemoteVolumeAsync(item.RemoteFilename, RemoteVolumeState.Uploaded, item.Size, item.Hash);

await m_stats.SendEventAsync(BackendActionType.Put, BackendEventType.Completed, item.RemoteFilename, item.Size);

if (m_options.ListVerifyUploads)
{
var f = m_backend.List().FirstOrDefault(n => n.Name.Equals(item.RemoteFilename, StringComparison.OrdinalIgnoreCase));
if (f == null)
throw new Exception(string.Format("List verify failed, file was not found after upload: {0}", item.RemoteFilename));
else if (f.Size != item.Size && f.Size >= 0)
throw new Exception(string.Format("List verify failed for file: {0}, size was {1} but expected to be {2}", f.Name, f.Size, item.Size));
}

item.DeleteLocalFile();
await m_database.CommitTransactionAsync("CommitAfterUpload");

return true;
}
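The streaming path wraps the file stream in a progress-reporting layer so HandleProgress sees a running byte count as the backend consumes the stream. A minimal Python equivalent of such a wrapper (illustrative, not Duplicati's ProgressReportingStream API):

```python
import io

class ProgressReportingStream:
    """Wrap a readable file-like object and invoke 'callback' with the
    cumulative number of bytes read after every read() call."""
    def __init__(self, inner, callback):
        self.inner = inner
        self.callback = callback
        self.total = 0
    def read(self, n=-1):
        chunk = self.inner.read(n)
        self.total += len(chunk)
        self.callback(self.total)   # report running byte count
        return chunk
```

Because the backend pulls from the wrapper, progress is reported at exactly the rate the upload actually proceeds, with no separate polling thread.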

private void HandleProgress(ThrottledStream ts, long pg)
{
if (!m_taskreader.TransferProgressAsync.WaitForTask().Result)
throw new OperationCanceledException();

// Update the throttle speeds if they have changed
string tmp;
m_options.RawOptions.TryGetValue("throttle-upload", out tmp);
if (tmp != m_lastThrottleUploadValue)
{
ts.WriteSpeed = m_options.MaxUploadPrSecond;
m_lastThrottleUploadValue = tmp;
}

m_options.RawOptions.TryGetValue("throttle-download", out tmp);
if (tmp != m_lastThrottleDownloadValue)
{
ts.ReadSpeed = m_options.MaxDownloadPrSecond;
m_lastThrottleDownloadValue = tmp;
}

m_stats.UpdateBackendProgress(pg);
}
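HandleProgress re-reads the throttle options on every callback and pushes any new speed into the ThrottledStream, so a user can change the limit mid-upload. A small Python sketch of a reader whose speed limit can be changed at runtime; the names and the pacing math are illustrative:

```python
import time

class ThrottledReader:
    """Pace read() calls so the average rate stays at or below
    max_bytes_per_sec; the limit may be reassigned at any time."""
    def __init__(self, inner, max_bytes_per_sec=0):
        self.inner = inner
        self.max_bytes_per_sec = max_bytes_per_sec  # 0 means unlimited
        self._start = time.monotonic()
        self._sent = 0
    def read(self, n=-1):
        chunk = self.inner.read(n)
        self._sent += len(chunk)
        if self.max_bytes_per_sec > 0:
            expected = self._sent / self.max_bytes_per_sec  # seconds the data "should" take
            elapsed = time.monotonic() - self._start
            if expected > elapsed:
                time.sleep(expected - elapsed)  # slow down to the target rate
        return chunk
```

Setting `max_bytes_per_sec` from outside between reads mirrors the diff's `ts.WriteSpeed = m_options.MaxUploadPrSecond` update when the throttle option changes.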
}
}
