Skip to content

Commit

Permalink
Support GCP Uploader (facebookresearch#274)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookresearch#274

Implementing Resumable Uploads for GCS as Multipart upload using a cpp library is currently not available using GCP.
Changed earlier implementation of using S3 AWS multipart to now using GCP's resumable upload

Differential Revision: D37804966

fbshipit-source-id: 3a9af0f03bb9ac38b859bd05d6e169158dbaaf0d
  • Loading branch information
achyutFB authored and facebook-github-bot committed Jul 19, 2022
1 parent 84fb595 commit 01b63b8
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 9 deletions.
26 changes: 17 additions & 9 deletions fbpcf/io/cloud_util/CloudFileUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,32 @@
*/

#include "fbpcf/io/cloud_util/CloudFileUtil.h"

#include <re2/re2.h>

#include "fbpcf/aws/S3Util.h"
#include "fbpcf/exception/PcfException.h"
#include "fbpcf/gcp/GCSUtil.h"
#include "fbpcf/io/cloud_util/GCSFileReader.h"
#include "fbpcf/io/cloud_util/GCSFileUploader.h"
#include "fbpcf/io/cloud_util/S3Client.h"
#include "fbpcf/io/cloud_util/S3FileReader.h"
#include "fbpcf/io/cloud_util/S3FileUploader.h"

namespace fbpcf::cloudio {

CloudFileType getCloudFileType(const std::string& filePath) {
// S3 file format:
// 1. https://bucket-name.s3.region.amazonaws.com/key-name
// 2. https://bucket-name.s3-region.amazonaws.com/key-name
// 3. s3://bucket-name/key-name
// GCS file format:
// 1. https://storage.cloud.google.com/bucket-name/key-name
// 2. https://bucket-name.storage.googleapis.com/key-name
// 3. https://storage.googleapis.com/bucket-name/key-name
// 4. gs://bucket-name/key-name
/*
* S3 file format:
* 1. https://bucket-name.s3.region.amazonaws.com/key-name
* 2. https://bucket-name.s3-region.amazonaws.com/key-name
* 3. s3://bucket-name/key-name
* GCS file format:
* 1. https://storage.cloud.google.com/bucket-name/key-name
* 2. https://bucket-name.storage.googleapis.com/key-name
* 3. https://storage.googleapis.com/bucket-name/key-name
* 4. gs://bucket-name/key-name
*/
static const re2::RE2 s3Regex1(
"https://[a-z0-9.-]+.s3.[a-z0-9-]+.amazonaws.com/.+");
static const re2::RE2 s3Regex2(
Expand Down Expand Up @@ -77,6 +82,9 @@ std::unique_ptr<IFileUploader> getCloudFileUploader(
fbpcf::aws::S3ClientOption{.region = ref.region})
.getS3Client(),
filePath);
} else if (fileType == CloudFileType::GCS) {
return std::make_unique<GCSFileUploader>(
fbpcf::gcp::createGCSClient(), filePath);
} else {
throw fbpcf::PcfException("Not supported yet.");
}
Expand Down
51 changes: 51 additions & 0 deletions fbpcf/io/cloud_util/GCSFileUploader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#include "fbpcf/io/cloud_util/GCSFileUploader.h"

#include <folly/logging/xlog.h>

#include "fbpcf/exception/GcpException.h"
#include "fbpcf/gcp/GCSUtil.h"
#include "gen/aab7ed39/third-party/google-cloud-cpp/google/cloud/storage/google_cloud_cpp_storage#header-mode-symlink-tree-with-header-map,headers/google/cloud/storage/upload_options.h"

namespace fbpcf::cloudio {
static const std::string FILE_TYPE = "text/csv";

void GCSFileUploader::init() {
XLOG(INFO) << "Start resumable upload. ";
const auto& ref = fbpcf::gcp::uriToObjectReference(filePath_);
std::string bucket_ = ref.bucket;
std::string object_ = ref.key;
XLOG(INFO) << "Bucket: " << bucket_ << ", Object Name: " << object_;
namespace gcs = ::google::cloud::storage;
using ::google::cloud::StatusOr;
stream_ =
gcsClient_->WriteObject(bucket_, object_, gcs::ContentType(FILE_TYPE));
}

int32_t GCSFileUploader::upload(std::vector<char>& buf) {
stream_.write(buf.data(), buf.size());

XLOG(INFO) << " Resumable upload chunk successful. ";
XLOG(INFO) << " Bytes written: " << buf.size();

return buf.size();
}

int GCSFileUploader::complete() {
stream_.Close();
namespace gcs = ::google::cloud::storage;
using ::google::cloud::StatusOr;
StatusOr<gcs::ObjectMetadata> metadata = std::move(stream_).metadata();
if (!metadata) {
XLOG(ERR) << "Failed to close. Message: " << metadata.status().message();
return 1;
}
return 0;
}
} // namespace fbpcf::cloudio
36 changes: 36 additions & 0 deletions fbpcf/io/cloud_util/GCSFileUploader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <memory>
#include <vector>

#include <google/cloud/storage/client.h>
#include "fbpcf/io/cloud_util/IFileUploader.h"

namespace fbpcf::cloudio {
class GCSFileUploader : public IFileUploader {
public:
explicit GCSFileUploader(
std::shared_ptr<google::cloud::storage::Client> gcsClient,
const std::string& filePath)
: gcsClient_{std::move(gcsClient)}, filePath_{filePath} {
init();
}
int upload(std::vector<char>& buf) override;
int complete() override;

private:
void init() override;

std::shared_ptr<google::cloud::storage::Client> gcsClient_;
const std::string filePath_;
google::cloud::storage::ObjectWriteStream stream_;
};

} // namespace fbpcf::cloudio

0 comments on commit 01b63b8

Please sign in to comment.