Skip to content

Commit

Permalink
Convert Delegate API to asynchronous
Browse files Browse the repository at this point in the history
In previous CLs the API was made synchronous, and it appears to be a
mistake, when implementing the production delegate.
Changing the API to the async now.

Low-Coverage-Reason: file_upload_impl.cc not ready yet.
Bug: b:264399295
Change-Id: Ide646fad9bb1469d0a25440ad1e073790b637c5d
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/4283601
Reviewed-by: Vignesh Shenvi <vshenvi@google.com>
Commit-Queue: Leonid Baraz <lbaraz@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1108602}
  • Loading branch information
Leonid Baraz authored and Chromium LUCI CQ committed Feb 22, 2023
1 parent 01a87fa commit 56ed952
Show file tree
Hide file tree
Showing 7 changed files with 458 additions and 396 deletions.
31 changes: 19 additions & 12 deletions chrome/browser/policy/messaging_layer/upload/file_upload_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,28 @@
namespace reporting {
FileUploadDelegate::FileUploadDelegate() = default;

Status FileUploadDelegate::DoInitiate(base::StringPiece origin_path,
base::StringPiece upload_parameters,
int64_t* total,
std::string* session_token) {
return Status(error::UNIMPLEMENTED, "Not yet implemented");
void FileUploadDelegate::DoInitiate(
base::StringPiece origin_path,
base::StringPiece upload_parameters,
base::OnceCallback<void(
StatusOr<std::pair<int64_t /*total*/, std::string /*session_token*/>>)>
cb) {
std::move(cb).Run(Status(error::UNIMPLEMENTED, "Not yet implemented"));
}

Status FileUploadDelegate::DoNextStep(int64_t total,
int64_t* uploaded,
std::string* session_token) {
return Status(error::UNIMPLEMENTED, "Not yet implemented");
void FileUploadDelegate::DoNextStep(
int64_t total,
int64_t uploaded,
base::StringPiece session_token,
base::OnceCallback<void(StatusOr<std::pair<int64_t /*uploaded*/,
std::string /*session_token*/>>)>
cb) {
std::move(cb).Run(Status(error::UNIMPLEMENTED, "Not yet implemented"));
}

Status FileUploadDelegate::DoFinalize(base::StringPiece session_token,
std::string* access_parameters) {
return Status(error::UNIMPLEMENTED, "Not yet implemented");
void FileUploadDelegate::DoFinalize(
base::StringPiece session_token,
base::OnceCallback<void(StatusOr<std::string /*access_parameters*/>)> cb) {
std::move(cb).Run(Status(error::UNIMPLEMENTED, "Not yet implemented"));
}
} // namespace reporting
41 changes: 27 additions & 14 deletions chrome/browser/policy/messaging_layer/upload/file_upload_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,33 @@ class FileUploadDelegate : public FileUploadJob::Delegate {

private:
// Delegate implementation.
Status DoInitiate(base::StringPiece origin_path, // IN
base::StringPiece upload_parameters, // IN
int64_t* total, // OUT
std::string* session_token // OUT
) override;

Status DoNextStep(int64_t total, // IN
int64_t* uploaded, // INOUT
std::string* session_token // INOUT
) override;

Status DoFinalize(base::StringPiece session_token, // IN
std::string* access_parameters // OUT
) override;
// Asynchronously initializes upload.
// Calls back with `total` and `session_token` are set, or Status in case
// of error.
void DoInitiate(
base::StringPiece origin_path,
base::StringPiece upload_parameters,
base::OnceCallback<void(
StatusOr<std::pair<int64_t /*total*/,
std::string /*session_token*/>>)> cb) override;

// Asynchronously uploads the next chunk.
// Calls back with new `uploaded` and `session_token` (could be the same),
// or Status in case of an error.
void DoNextStep(
int64_t total,
int64_t uploaded,
base::StringPiece session_token,
base::OnceCallback<void(
StatusOr<std::pair<int64_t /*uploaded*/,
std::string /*session_token*/>>)> cb) override;

// Asynchronously finalizes upload (once `uploaded` reached `total`).
// Calls back with `access_parameters`, or Status in case of error.
void DoFinalize(
base::StringPiece session_token,
base::OnceCallback<void(StatusOr<std::string /*access_parameters*/>)> cb)
override;
};
} // namespace reporting

Expand Down
114 changes: 47 additions & 67 deletions chrome/browser/policy/messaging_layer/upload/file_upload_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include <memory>
#include <string>
#include <tuple>
#include <utility>

#include "base/functional/bind.h"
#include "base/functional/callback_forward.h"
Expand Down Expand Up @@ -152,34 +154,26 @@ void FileUploadJob::Initiate(base::OnceClosure done_cb) {
}
base::ThreadPool::PostTask(
FROM_HERE, {base::TaskPriority::BEST_EFFORT, base::MayBlock()},
base::BindOnce(
[](Delegate* delegate, base::StringPiece origin_path,
base::StringPiece upload_parameters,
base::OnceCallback<void(Status status, int64_t total,
std::string session_token)> result_cb) {
int64_t total = 0;
std::string session_token;
auto status = delegate->DoInitiate(origin_path, upload_parameters,
&total, &session_token);
std::move(result_cb).Run(status, total, std::move(session_token));
},
base::Unretained(delegate_), settings_.origin_path(),
settings_.upload_parameters(),
base::BindPostTaskToCurrentDefault(base::BindOnce(
&FileUploadJob::DoneInitiate, weak_ptr_factory_.GetWeakPtr(),
std::move(done)))));
base::BindOnce(&Delegate::DoInitiate, base::Unretained(delegate_),
settings_.origin_path(), settings_.upload_parameters(),
base::BindPostTaskToCurrentDefault(base::BindOnce(
&FileUploadJob::DoneInitiate,
weak_ptr_factory_.GetWeakPtr(), std::move(done)))));
}

void FileUploadJob::DoneInitiate(base::ScopedClosureRunner done,
Status status,
int64_t total,
std::string session_token) {
void FileUploadJob::DoneInitiate(
base::ScopedClosureRunner done,
StatusOr<std::pair<int64_t /*total*/, std::string /*session_token*/>>
result) {
DCHECK_CALLED_ON_VALID_SEQUENCE(job_sequence_checker_);
in_action_ = false;
if (!status.ok()) {
status.SaveTo(tracker_.mutable_status());
if (!result.ok()) {
result.status().SaveTo(tracker_.mutable_status());
return;
}
int64_t total = 0L;
base::StringPiece session_token;
std::tie(total, session_token) = result.ValueOrDie();
if (total <= 0L) {
Status{error::FAILED_PRECONDITION, "Empty upload"}.SaveTo(
tracker_.mutable_status());
Expand All @@ -192,7 +186,7 @@ void FileUploadJob::DoneInitiate(base::ScopedClosureRunner done,
}
tracker_.set_total(total);
tracker_.set_uploaded(0L);
tracker_.set_session_token(std::move(session_token));
tracker_.set_session_token(session_token.data(), session_token.size());
}

void FileUploadJob::NextStep(base::OnceClosure done_cb) {
Expand Down Expand Up @@ -228,37 +222,27 @@ void FileUploadJob::NextStep(base::OnceClosure done_cb) {
}
base::ThreadPool::PostTask(
FROM_HERE, {base::TaskPriority::BEST_EFFORT, base::MayBlock()},
base::BindOnce(
[](Delegate* delegate, int64_t total, int64_t uploaded,
base::StringPiece session_token,
base::OnceCallback<void(Status status, int64_t uploaded,
std::string session_token)> result_cb) {
int64_t uploaded_to_update = uploaded;
std::string session_token_to_optionally_update(session_token);
const auto status =
delegate->DoNextStep(total, &uploaded_to_update,
&session_token_to_optionally_update);
std::move(result_cb).Run(
status, uploaded_to_update,
std::move(session_token_to_optionally_update));
},
base::Unretained(delegate_), tracker_.total(), tracker_.uploaded(),
tracker_.session_token(),
base::BindPostTaskToCurrentDefault(base::BindOnce(
&FileUploadJob::DoneNextStep, weak_ptr_factory_.GetWeakPtr(),
std::move(done)))));
base::BindOnce(&Delegate::DoNextStep, base::Unretained(delegate_),
tracker_.total(), tracker_.uploaded(),
tracker_.session_token(),
base::BindPostTaskToCurrentDefault(base::BindOnce(
&FileUploadJob::DoneNextStep,
weak_ptr_factory_.GetWeakPtr(), std::move(done)))));
}

void FileUploadJob::DoneNextStep(base::ScopedClosureRunner done,
Status status,
int64_t uploaded,
std::string session_token) {
void FileUploadJob::DoneNextStep(
base::ScopedClosureRunner done,
StatusOr<std::pair<int64_t /*uploaded*/, std::string /*session_token*/>>
result) {
DCHECK_CALLED_ON_VALID_SEQUENCE(job_sequence_checker_);
in_action_ = false;
if (!status.ok()) {
status.SaveTo(tracker_.mutable_status());
if (!result.ok()) {
result.status().SaveTo(tracker_.mutable_status());
return;
}
int64_t uploaded = 0L;
base::StringPiece session_token;
std::tie(uploaded, session_token) = result.ValueOrDie();
if (session_token.empty()) {
Status{error::DATA_LOSS, "Job has lost session_token"}.SaveTo(
tracker_.mutable_status());
Expand All @@ -273,7 +257,7 @@ void FileUploadJob::DoneNextStep(base::ScopedClosureRunner done,
return;
}
tracker_.set_uploaded(uploaded);
tracker_.set_session_token(std::move(session_token));
tracker_.set_session_token(session_token.data(), session_token.size());
}

void FileUploadJob::Finalize(base::OnceClosure done_cb) {
Expand Down Expand Up @@ -304,38 +288,34 @@ void FileUploadJob::Finalize(base::OnceClosure done_cb) {
if (timer_.IsRunning()) {
timer_.Reset();
}

base::ThreadPool::PostTask(
FROM_HERE, {base::TaskPriority::BEST_EFFORT, base::MayBlock()},
base::BindOnce(
[](Delegate* delegate, base::StringPiece session_token,
base::OnceCallback<void(Status, std::string)> result_cb) {
std::string access_parameters;
const auto status =
delegate->DoFinalize(session_token, &access_parameters);
std::move(result_cb).Run(status, std::move(access_parameters));
},
base::Unretained(delegate_), tracker_.session_token(),
base::BindPostTaskToCurrentDefault(base::BindOnce(
&FileUploadJob::DoneFinalize, weak_ptr_factory_.GetWeakPtr(),
std::move(done)))));
base::BindOnce(&Delegate::DoFinalize, base::Unretained(delegate_),
tracker_.session_token(),
base::BindPostTaskToCurrentDefault(base::BindOnce(
&FileUploadJob::DoneFinalize,
weak_ptr_factory_.GetWeakPtr(), std::move(done)))));
}

void FileUploadJob::DoneFinalize(base::ScopedClosureRunner done,
Status status,
std::string access_parameters) {
void FileUploadJob::DoneFinalize(
base::ScopedClosureRunner done,
StatusOr<std::string /*access_parameters*/> result) {
DCHECK_CALLED_ON_VALID_SEQUENCE(job_sequence_checker_);
in_action_ = false;
if (!status.ok()) {
status.SaveTo(tracker_.mutable_status());
if (!result.ok()) {
result.status().SaveTo(tracker_.mutable_status());
return;
}
base::StringPiece access_parameters = result.ValueOrDie();
if (access_parameters.empty()) {
Status{error::FAILED_PRECONDITION, "Access parameters not set"}.SaveTo(
tracker_.mutable_status());
return;
}
tracker_.clear_session_token();
tracker_.set_access_parameters(std::move(access_parameters));
tracker_.set_access_parameters(access_parameters.data(),
access_parameters.size());
}

const UploadSettings& FileUploadJob::settings() const {
Expand Down
71 changes: 38 additions & 33 deletions chrome/browser/policy/messaging_layer/upload/file_upload_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define CHROME_BROWSER_POLICY_MESSAGING_LAYER_UPLOAD_FILE_UPLOAD_JOB_H_

#include <string>
#include <utility>

#include "base/containers/flat_map.h"
#include "base/functional/callback_forward.h"
Expand Down Expand Up @@ -43,28 +44,33 @@ class FileUploadJob {
public:
virtual ~Delegate() = default;

// Initializes upload.
// Populates `total` and `session_token`, sets `uploaded` to 0.
virtual Status DoInitiate(base::StringPiece origin_path, // IN
base::StringPiece upload_parameters, // IN
int64_t* total, // OUT
std::string* session_token // OUT
) = 0;

// Performs upload of the next chunk.
// Updates `uploaded` and optionally `session_token`.
// Returns status in case of an error.
virtual Status DoNextStep(int64_t total, // IN
int64_t* uploaded, // INOUT
std::string* session_token // INOUT
) = 0;

// Finalizes upload (once uploaded reached total).
// Populates `access_parameters`.
// Returns status in case of an error.
virtual Status DoFinalize(base::StringPiece session_token, // IN
std::string* access_parameters // OUT
) = 0;
// Asynchronously initializes upload.
// Calls back with `total` and `session_token` are set, or Status in case
// of error.
virtual void DoInitiate(
base::StringPiece origin_path,
base::StringPiece upload_parameters,
base::OnceCallback<
void(StatusOr<std::pair<int64_t /*total*/,
std::string /*session_token*/>>)> cb) = 0;

// Asynchronously uploads the next chunk.
// Calls back with new `uploaded` and `session_token` (could be the same),
// or Status in case of an error.
virtual void DoNextStep(
int64_t total,
int64_t uploaded,
base::StringPiece session_token,
base::OnceCallback<
void(StatusOr<std::pair<int64_t /*uploaded*/,
std::string /*session_token*/>>)> cb) = 0;

// Asynchronously finalizes upload (once `uploaded` reached `total`).
// Calls back with `access_parameters`, or Status in case of error.
virtual void DoFinalize(
base::StringPiece session_token,
base::OnceCallback<void(StatusOr<std::string /*access_parameters*/>)>
cb) = 0;

protected:
Delegate() = default;
Expand Down Expand Up @@ -154,19 +160,18 @@ class FileUploadJob {

private:
// The next three methods complement `Initiate`, `NextStep` and `Finalize` -
// they are called after delegate calls are executed on a thread pool, and
// they are invoked after delegate calls are executed on a thread pool, and
// resume execution on the Job's default task runner.
void DoneInitiate(base::ScopedClosureRunner done,
Status status,
int64_t total,
std::string session_token);
void DoneNextStep(base::ScopedClosureRunner done,
Status status,
int64_t uploaded,
std::string session_token);
void DoneInitiate(
base::ScopedClosureRunner done,
StatusOr<std::pair<int64_t /*total*/, std::string /*session_token*/>>
result);
void DoneNextStep(
base::ScopedClosureRunner done,
StatusOr<std::pair<int64_t /*uploaded*/, std::string /*session_token*/>>
result);
void DoneFinalize(base::ScopedClosureRunner done,
Status status,
std::string access_parameters);
StatusOr<std::string /*access_parameters*/> result);

// Unowned delegate that performs actual actions.
// It must outlive the job (the same delegate may be used by multiple jobs).
Expand Down

0 comments on commit 56ed952

Please sign in to comment.