Skip to content

Commit

Permalink
Support GCP Uploader (facebookresearch#274)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookresearch#274

Implementing Resumable Uploads for GCS as Multipart upload using a cpp library is currently not available using GCP.
Changed earlier implementation of using S3 AWS multipart to now using GCP's resumable upload

Differential Revision: D37804966

fbshipit-source-id: 23122dc6bb4845ef8e25e3d09ffadab88e12840a
  • Loading branch information
achyutFB authored and facebook-github-bot committed Jul 19, 2022
1 parent f5b3011 commit 0d0fda9
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 9 deletions.
26 changes: 17 additions & 9 deletions fbpcf/io/cloud_util/CloudFileUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,32 @@
*/

#include "fbpcf/io/cloud_util/CloudFileUtil.h"

#include <re2/re2.h>

#include "fbpcf/aws/S3Util.h"
#include "fbpcf/exception/PcfException.h"
#include "fbpcf/gcp/GCSUtil.h"
#include "fbpcf/io/cloud_util/GCSFileReader.h"
#include "fbpcf/io/cloud_util/GCSFileUploader.h"
#include "fbpcf/io/cloud_util/S3Client.h"
#include "fbpcf/io/cloud_util/S3FileReader.h"
#include "fbpcf/io/cloud_util/S3FileUploader.h"

namespace fbpcf::cloudio {

CloudFileType getCloudFileType(const std::string& filePath) {
// S3 file format:
// 1. https://bucket-name.s3.region.amazonaws.com/key-name
// 2. https://bucket-name.s3-region.amazonaws.com/key-name
// 3. s3://bucket-name/key-name
// GCS file format:
// 1. https://storage.cloud.google.com/bucket-name/key-name
// 2. https://bucket-name.storage.googleapis.com/key-name
// 3. https://storage.googleapis.com/bucket-name/key-name
// 4. gs://bucket-name/key-name
/*
* S3 file format:
* 1. https://bucket-name.s3.region.amazonaws.com/key-name
* 2. https://bucket-name.s3-region.amazonaws.com/key-name
* 3. s3://bucket-name/key-name
* GCS file format:
* 1. https://storage.cloud.google.com/bucket-name/key-name
* 2. https://bucket-name.storage.googleapis.com/key-name
* 3. https://storage.googleapis.com/bucket-name/key-name
* 4. gs://bucket-name/key-name
*/
static const re2::RE2 s3Regex1(
"https://[a-z0-9.-]+.s3.[a-z0-9-]+.amazonaws.com/.+");
static const re2::RE2 s3Regex2(
Expand Down Expand Up @@ -77,6 +82,9 @@ std::unique_ptr<IFileUploader> getCloudFileUploader(
fbpcf::aws::S3ClientOption{.region = ref.region})
.getS3Client(),
filePath);
} else if (fileType == CloudFileType::GCS) {
return std::make_unique<GCSFileUploader>(
fbpcf::gcp::createGCSClient(), filePath);
} else {
throw fbpcf::PcfException("Not supported yet.");
}
Expand Down
71 changes: 71 additions & 0 deletions fbpcf/io/cloud_util/GCSFileUploader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#include "fbpcf/io/cloud_util/GCSFileUploader.h"

#include <folly/logging/xlog.h>

#include "fbpcf/exception/GcpException.h"
#include "fbpcf/gcp/GCSUtil.h"

namespace fbpcf::cloudio {
namespace gcs = ::google::cloud::storage;
using ::google::cloud::StatusOr;
static const std::string FILE_TYPE = "text/csv";

void GCSFileUploader::init() {
XLOG(INFO) << "Starting resumable upload. ";
const auto& ref = fbpcf::gcp::uriToObjectReference(filePath_);
std::string bucket_ = ref.bucket;
std::string object_ = ref.key;
XLOG(INFO) << "Bucket: " << bucket_ << ", Key: " << object_;

stream_ =
gcsClient_->WriteObject(bucket_, object_, gcs::ContentType(FILE_TYPE));
sessionId_ = stream_.resumable_session_id();
}

int32_t GCSFileUploader::upload(std::vector<char>& buf) {
stream_.write(buf.data(), buf.size());
auto status = stream_.last_status();
if (!status.ok()) {
XLOG(ERR) << "Upload failed. Part number: " << partNumber_
<< ". Aborting...";
abortUpload(sessionId_);
return 0;
} else {
XLOG(INFO) << "Upload succeeded. Part number: " << partNumber_;
XLOG(INFO) << "Bytes written: " << buf.size();
partNumber_++;
return buf.size();
}
}

int GCSFileUploader::complete() {
stream_.Close();
StatusOr<gcs::ObjectMetadata> metadata = std::move(stream_).metadata();
if (!metadata) {
XLOG(ERR) << "Failed to close file " << filePath_;
XLOG(ERR) << "Status Message: " << metadata.status().message();
abortUpload(sessionId_);
return 1;
} else {
XLOG(INFO) << "File " << filePath_ << " uploaded successfully.";
return 0;
}
}

google::cloud::Status GCSFileUploader::abortUpload(std::string session_id) {
google::cloud::Status status = gcsClient_->DeleteResumableUpload(session_id);
if (status.ok()) {
XLOG(INFO) << "Aborted upload successfully. ";
} else {
XLOG(ERR) << "Abort upload failed. Message: " << status.message();
}
return status;
}
} // namespace fbpcf::cloudio
39 changes: 39 additions & 0 deletions fbpcf/io/cloud_util/GCSFileUploader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <memory>
#include <vector>

#include <google/cloud/storage/client.h>
#include "fbpcf/io/cloud_util/IFileUploader.h"

namespace fbpcf::cloudio {
class GCSFileUploader : public IFileUploader {
public:
explicit GCSFileUploader(
std::shared_ptr<google::cloud::storage::Client> gcsClient,
const std::string& filePath)
: gcsClient_{std::move(gcsClient)}, filePath_{filePath} {
init();
}
int upload(std::vector<char>& buf) override;
int complete() override;

private:
void init() override;
google::cloud::Status abortUpload(std::string session_id);
std::shared_ptr<google::cloud::storage::Client> gcsClient_;
const std::string filePath_;
std::size_t partNumber_ = 1;
std::string sessionId_;

google::cloud::storage::ObjectWriteStream stream_;
};

} // namespace fbpcf::cloudio

0 comments on commit 0d0fda9

Please sign in to comment.