Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement parallel uploads to a single backend #3684

Merged
merged 23 commits into from Mar 19, 2019
Merged
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
4f4529a
Move upload related code from BackendHandler to BackendUploader
seantempleton Feb 10, 2019
05c3f48
Move encrypting, hashing, and creating index volumes out of BackendUp…
seantempleton Feb 23, 2019
1029d99
Simplify BackendUploader and remove unnecessary code
seantempleton Feb 23, 2019
3de04da
Run each upload on a separate thread
seantempleton Feb 23, 2019
0c122ff
Change IBackend and IStreamingBackend to return Task for Put()
seantempleton Feb 23, 2019
662627a
Update AlternativeFtp Backend for async Put
seantempleton Feb 17, 2019
b1c6e97
Update Amazon Cloud Drive backend for async Put
seantempleton Feb 17, 2019
5db326b
Update Azure Blob and Backblaze B2 backends for async Put
seantempleton Feb 23, 2019
7843f83
Update Google Cloud and Drive backends for async Put
seantempleton Feb 18, 2019
7f09c7a
Update Box and Rackspace Cloud Files backends for async Put
seantempleton Feb 23, 2019
4ca067a
Update Dropbox and File backends for async Put
seantempleton Feb 23, 2019
9a42eca
Update FTP and Jottacloud backends for async Put
seantempleton Feb 23, 2019
c347282
Update HubiC and OpenStack backends for async Put
seantempleton Feb 18, 2019
13f83a2
Update Mega, OneDrive, and Rclone backends for async Put
seantempleton Feb 23, 2019
c05f304
Update S3, Sharepoint, Tahoe, and WEBDAV backends for async Put
seantempleton Feb 23, 2019
12b15c0
Add asynchronous-concurrent-upload-limit option
seantempleton Feb 27, 2019
ac08aa2
Dynamically throttle and track the progress of concurrent uploads
seantempleton Feb 27, 2019
61f511c
Properly catch exceptions from an upload and cancel all other uploads
seantempleton Mar 4, 2019
8bd4b82
Fix several Codacy issues
seantempleton Mar 8, 2019
cc3e94f
Fix code review comments and other minor changes
seantempleton Mar 8, 2019
19c1c90
Check upload throttle speed every 2 seconds
seantempleton Mar 9, 2019
b02b19e
Change Put method name to PutAsync
seantempleton Mar 17, 2019
1f388f4
Fix MegaBackend null reference exception
seantempleton Mar 18, 2019
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.
+48 −81
Diff settings

Always

Just for now

Simplify BackendUploader and remove unnecessary code

  • Loading branch information...
seantempleton committed Feb 23, 2019
commit 1029d99e102b01f447fd9b987d646e9b76f6b47d
@@ -14,18 +14,18 @@
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
using System;
using CoCoL;
using System.Threading.Tasks;
using Duplicati.Library.Interface;
using Duplicati.Library.Main.Operation.Common;
using Duplicati.Library.Main.Volumes;
using System.Collections.Generic;
using static Duplicati.Library.Main.Operation.Common.BackendHandler;
using Duplicati.Library.Interface;
using Newtonsoft.Json;
using Duplicati.Library.Utility;
using System.Linq;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using static Duplicati.Library.Main.Operation.Common.BackendHandler;

namespace Duplicati.Library.Main.Operation.Backup
{
@@ -41,7 +41,6 @@ public void SetFlushed(long size)
{
m_tcs.TrySetResult(size);
}

}

internal class IndexVolumeUploadRequest : IUploadRequest
@@ -88,21 +87,19 @@ internal class BackendUploader
{
private static readonly string LOGTAG = Logging.Log.LogTagFromType<BackendUploader>();

private Options m_options;
private IBackend m_backend;
private BackendHandler m_backendHandler;
private Common.ITaskReader m_taskreader;
private Options m_options;
private ITaskReader m_taskreader;
private StatsCollector m_stats;
private DatabaseCommon m_database;
private readonly BackupResults m_results;
private string m_lastThrottleUploadValue;
private string m_lastThrottleDownloadValue;

public BackendUploader(Common.BackendHandler backendHandler, IBackend backend, Options options, Common.DatabaseCommon database, BackupResults results, Common.ITaskReader taskreader, StatsCollector stats)
public BackendUploader(IBackend backend, Options options, DatabaseCommon database, BackupResults results, ITaskReader taskreader, StatsCollector stats)
{
this.m_options = options;
this.m_backend = backend;
this.m_backendHandler = backendHandler;
this.m_taskreader = taskreader;
this.m_stats = stats;
this.m_database = database;
@@ -118,80 +115,55 @@ public Task Run()

async self =>
{
var inProgress = new Queue<KeyValuePair<int, Task>>();
var inProgress = new List<Task>();
var max_pending = m_options.AsynchronousUploadLimit == 0 ? long.MaxValue : m_options.AsynchronousUploadLimit;
var noIndexFiles = m_options.IndexfilePolicy == Options.IndexFileStrategy.None;
var active = 0;
var lastSize = -1L;
while(!await self.Input.IsRetiredAsync && await m_taskreader.ProgressAsync)

while (!await self.Input.IsRetiredAsync && await m_taskreader.ProgressAsync)
{
try
{
var req = await self.Input.ReadAsync();

if (!await m_taskreader.ProgressAsync)
This conversation was marked as resolved by seantempleton

This comment has been minimized.

Copy link
@verhoek

verhoek Mar 8, 2019

Contributor

I am not a fan of the m_* member variable "convention" and would like to phase them out. I know the code is written mostly in such a way that it's not clear what local vars and member vars are -mainly due to large functions. It's quite possible I am the only one disliking the m_ style notation in duplicati team.

This comment has been minimized.

Copy link
@seantempleton

seantempleton Mar 9, 2019

Author Contributor

I dislike the m_* style as well. Being my first PR for the project I didn't want to step on any toes and kept with the current style. If there is an agreed upon standard I would be happy to switch to using that. I'm partial to m* or to no prefix at all myself.

This comment has been minimized.

Copy link
@warwickmm

warwickmm Mar 9, 2019

Contributor

If it were up to me, I would prefer to always use the this keyword when referring to instance members. At least for me, it is an immediate and clear indication that the variable refers to an instance member.

I have also tried to refrain from disturbing the local style too much, as in the end, consistency should perhaps be the ultimate goal.

This comment has been minimized.

Copy link
@seantempleton

seantempleton Mar 9, 2019

Author Contributor

warwickmm that's actually the other one I dislike. :) But I can get used to anything as long as it's consistent. If I really need to know the scope of a variable Visual Studio will show that in the Intellisense tooltip. I realize not everyone has that information available to them though. And I generally like to keep functions short enough that it is easy to tell anyway.

This comment has been minimized.

Copy link
@warwickmm

warwickmm Mar 9, 2019

Contributor

For me, moving the cursor and hovering is too much effort (especially on a laptop). But yes, as long as we're consistent I am fine with any style.

This comment has been minimized.

Copy link
@seantempleton

seantempleton Mar 9, 2019

Author Contributor

I looked through a lot of files and most of them are using the m_ style. I'll plan on leaving this as-is for now. I think having a coding standard should be a separate conversation where all interested parties can help make the final decision.
If the community agrees on a different standard before this PR is merged I can always update it then.

This comment has been minimized.

Copy link
@warwickmm

warwickmm Mar 9, 2019

Contributor

Sounds good. Thanks.

This comment has been minimized.

Copy link
@verhoek

verhoek Mar 10, 2019

Contributor

That's fine. I just noticed that you changed it to the m_* style actively, hence the remark. We should discuss coding styles in another place and you shouldn't change it here. I hope I can find some time soon to read the actual code of your implemented interesting feature.

continue;

var task = default(KeyValuePair<int, Task>);
if (req is VolumeUploadRequest)

if (req is VolumeUploadRequest volumeUpload)
{
lastSize = ((VolumeUploadRequest)req).BlockVolume.SourceSize;
if(((VolumeUploadRequest)req).IndexVolume == null)
task = new KeyValuePair<int, Task>(1, UploadFileAsync(((VolumeUploadRequest)req).BlockEntry, null));
lastSize = volumeUpload.BlockVolume.SourceSize;
if (volumeUpload.IndexVolume == null)
inProgress.Add(UploadFileAsync(volumeUpload.BlockEntry));
else
task = new KeyValuePair<int, Task>(2, UploadBlockAndIndexAsync((VolumeUploadRequest)req));
inProgress.Add(UploadBlockAndIndexAsync(volumeUpload));
}
else if (req is FilesetUploadRequest)
task = new KeyValuePair<int, Task>(1, UploadVolumeWriter(((FilesetUploadRequest)req).Fileset));
else if (req is IndexVolumeUploadRequest)
task = new KeyValuePair<int, Task>(1, UploadVolumeWriter(((IndexVolumeUploadRequest)req).IndexVolume));
else if (req is FlushRequest)
else if (req is FilesetUploadRequest filesetUpload)
inProgress.Add(UploadVolumeWriter(filesetUpload.Fileset));
else if (req is IndexVolumeUploadRequest indexUpload)
inProgress.Add(UploadVolumeWriter(indexUpload.IndexVolume));
else if (req is FlushRequest flush)
{
try
{
while(inProgress.Count > 0)
await inProgress.Dequeue().Value;
active = 0;
await Task.WhenAll(inProgress);
inProgress.Clear();
}
finally
{
((FlushRequest)req).SetFlushed(lastSize);
flush.SetFlushed(lastSize);
}
break;
}

if (task.Value != null)
{
inProgress.Enqueue(task);
active += task.Key;
}
}
catch(Exception ex)
catch (Exception ex)
{
if (!ex.IsRetiredException())
throw;
}

while(active >= max_pending)
while (inProgress.Count >= max_pending)
{
var top = inProgress.Dequeue();

// See if we are done
if (await Task.WhenAny(top.Value, Task.Delay(500)) != top.Value)
{
try
{
m_stats.SetBlocking(true);
await top.Value;
}
finally
{
m_stats.SetBlocking(false);
}
}

active -= top.Key;
var completedTask = await Task.WhenAny(inProgress);
inProgress.Remove(completedTask);
}
}

@@ -200,8 +172,7 @@ public Task Run()
try
{
m_stats.SetBlocking(true);
while (inProgress.Count > 0)
await inProgress.Dequeue().Value;
await Task.WhenAll(inProgress);
}
finally
{
@@ -214,10 +185,7 @@ private async Task UploadBlockAndIndexAsync(VolumeUploadRequest upload)
{
await UploadFileAsync(upload.BlockEntry).ConfigureAwait(false);
await UploadFileAsync(upload.IndexEntry).ConfigureAwait(false);

// Register that the index file is tracking the block file
var blockVolumeId = await m_database.GetRemoteVolumeIDAsync(upload.BlockVolume.RemoteFilename).ConfigureAwait(false);
await m_database.AddIndexBlockLinkAsync(upload.IndexVolume.VolumeID, blockVolumeId).ConfigureAwait(false);
await m_database.AddIndexBlockLinkAsync(upload.IndexVolume.VolumeID, upload.BlockVolume.VolumeID).ConfigureAwait(false);
}

private async Task UploadVolumeWriter(VolumeWriterBase volumeWriter)
@@ -230,23 +198,23 @@ private async Task UploadVolumeWriter(VolumeWriterBase volumeWriter)
await UploadFileAsync(fileEntry).ConfigureAwait(false);
}

private async Task<bool> UploadFileAsync(FileEntryItem item, Func<string, Task<IndexVolumeWriter>> createIndexFile = null)
private async Task UploadFileAsync(FileEntryItem item)
{
return await DoWithRetry(item, async () =>
await DoWithRetry(item, async () =>
{
if (item.IsRetry)
await RenameFileAfterErrorAsync(item).ConfigureAwait(false);

return await DoPut(item, false).ConfigureAwait(false);
await DoPut(item).ConfigureAwait(false);
}).ConfigureAwait(false);
}

private async Task<T> DoWithRetry<T>(FileEntryItem item, Func<Task<T>> method)
private async Task DoWithRetry(FileEntryItem item, Func<Task> method)
{
item.IsRetry = false;
Exception lastException = null;

if(!await m_taskreader.ProgressAsync)
if (!await m_taskreader.ProgressAsync)
throw new OperationCanceledException();

for (var i = 0; i < m_options.NumberOfRetries; i++)
@@ -259,7 +227,8 @@ private async Task<T> DoWithRetry<T>(FileEntryItem item, Func<Task<T>> method)

try
{
return await method().ConfigureAwait(false);
await method().ConfigureAwait(false);
return;
}
catch (Exception ex)
{
@@ -273,7 +242,7 @@ private async Task<T> DoWithRetry<T>(FileEntryItem item, Func<Task<T>> method)
await m_stats.SendEventAsync(item.Operation, i < m_options.NumberOfRetries ? BackendEventType.Retrying : BackendEventType.Failed, item.RemoteFilename, item.Size);

bool recovered = false;
if (ex is Duplicati.Library.Interface.FolderMissingException && m_options.AutocreateFolders)
if (m_options.AutocreateFolders && ex is FolderMissingException)
{
try
{
@@ -328,7 +297,7 @@ private async Task RenameFileAfterErrorAsync(FileEntryItem item)
item.RemoteFilename = newname;
}

private async Task<bool> DoPut(FileEntryItem item, bool updatedHash = false)
private async Task DoPut(FileEntryItem item)
{
if (item.TrackedInDb)
await m_database.UpdateRemoteVolumeAsync(item.RemoteFilename, RemoteVolumeState.Uploading, item.Size, item.Hash);
@@ -337,20 +306,20 @@ private async Task<bool> DoPut(FileEntryItem item, bool updatedHash = false)
{
Logging.Log.WriteDryrunMessage(LOGTAG, "WouldUploadVolume", "Would upload volume: {0}, size: {1}", item.RemoteFilename, Library.Utility.Utility.FormatSizeString(new FileInfo(item.LocalFilename).Length));
item.DeleteLocalFile();
return true;
return;
}

await m_database.LogRemoteOperationAsync("put", item.RemoteFilename, JsonConvert.SerializeObject(new { Size = item.Size, Hash = item.Hash }));
await m_stats.SendEventAsync(BackendActionType.Put, BackendEventType.Started, item.RemoteFilename, item.Size);

var begin = DateTime.Now;

if (m_backend is Library.Interface.IStreamingBackend && !m_options.DisableStreamingTransfers)
if (!m_options.DisableStreamingTransfers && m_backend is IStreamingBackend backend)
{
using (var fs = System.IO.File.OpenRead(item.LocalFilename))
using (var fs = File.OpenRead(item.LocalFilename))
using (var ts = new ThrottledStream(fs, m_options.MaxUploadPrSecond, m_options.MaxDownloadPrSecond))
using (var pgs = new Library.Utility.ProgressReportingStream(ts, pg => HandleProgress(ts, pg)))
((Library.Interface.IStreamingBackend)m_backend).Put(item.RemoteFilename, pgs);
using (var pgs = new ProgressReportingStream(ts, pg => HandleProgress(ts, pg)))
backend.Put(item.RemoteFilename, pgs);
}
else
m_backend.Put(item.RemoteFilename, item.LocalFilename);
@@ -374,8 +343,6 @@ private async Task<bool> DoPut(FileEntryItem item, bool updatedHash = false)

item.DeleteLocalFile();
await m_database.CommitTransactionAsync("CommitAfterUpload");

return true;
}

private void HandleProgress(ThrottledStream ts, long pg)
@@ -410,7 +410,7 @@ private async Task RunAsync(string[] sources, Library.Utility.IFilter filter)
long filesetid;
var counterToken = new CancellationTokenSource();
var backend = DynamicLoader.BackendLoader.GetBackend(m_backendurl, m_options.RawOptions);
var uploader = new Backup.BackendUploader(bk, backend, m_options, db, m_result, m_result.TaskReader, stats);
var uploader = new Backup.BackendUploader(backend, m_options, db, m_result, m_result.TaskReader, stats);
using (var snapshot = GetSnapshot(sources, m_options))
{
try
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.