Skip to content

Commit

Permalink
Support GCP Uploader (#274)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #274

Multipart upload is currently not available for GCS through the GCP C++ client library, so GCS uploads are implemented as resumable uploads instead.
The earlier implementation, which used the AWS S3 multipart API, now uses GCP's resumable upload.

Differential Revision: D37804966

fbshipit-source-id: 294c43893ee4bfdebac02b0c23fefaa8a98f37aa
  • Loading branch information
achyutFB authored and facebook-github-bot committed Jul 13, 2022
1 parent bdd4ca7 commit 4074ffe
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 3 deletions.
13 changes: 10 additions & 3 deletions fbpcf/io/cloud_util/CloudFileUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
*/

#include "fbpcf/io/cloud_util/CloudFileUtil.h"
#include <aws/s3/S3Client.h>
#include <re2/re2.h>
#include "fbpcf/aws/S3Util.h"
#include "fbpcf/exception/PcfException.h"
#include "fbpcf/gcp/GCSUtil.h"
#include "fbpcf/io/cloud_util/GCSFileReader.h"
#include "fbpcf/io/cloud_util/GCSFileUploader.h"
#include "fbpcf/io/cloud_util/S3Client.h"
#include "fbpcf/io/cloud_util/S3FileReader.h"
#include "fbpcf/io/cloud_util/S3FileUploader.h"
Expand All @@ -19,12 +21,14 @@ namespace fbpcf::cloudio {

CloudFileType getCloudFileType(const std::string& filePath) {
// S3 file format:
// 1. https://bucket-name.s3.Region.amazonaws.com/key-name
// 2. https://bucket-name.s3-Region.amazonaws.com/key-name
// 1. https://bucket-name.s3.region.amazonaws.com/key-name
// 2. https://bucket-name.s3-region.amazonaws.com/key-name
// 3. s3://bucket-name/key-name
// GCS file format:
// 1. https://storage.cloud.google.com/bucket-name/key-name
// 2. gs://bucket-name/key-name
// 2. https://bucket-name.storage.googleapis.com/key-name
// 3. https://storage.googleapis.com/bucket-name/key-name
// 4. gs://bucket-name/key-name
static const re2::RE2 s3Regex1(
"https://[a-z0-9.-]+.s3.[a-z0-9-]+.amazonaws.com/.+");
static const re2::RE2 s3Regex2(
Expand Down Expand Up @@ -75,6 +79,9 @@ std::unique_ptr<IFileUploader> getCloudFileUploader(
fbpcf::aws::S3ClientOption{.region = ref.region})
.getS3Client(),
filePath);
} else if (fileType == CloudFileType::GCS) {
return std::make_unique<GCSFileUploader>(
fbpcf::gcp::createGCSClient(), filePath);
} else {
throw fbpcf::PcfException("Not supported yet.");
}
Expand Down
44 changes: 44 additions & 0 deletions fbpcf/io/cloud_util/GCSFileUploader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#include <fbpcf/exception/GcpException.h>
#include <fbpcf/gcp/GCSUtil.h>
#include <fbpcf/io/cloud_util/GCSFileUploader.h>
#include <folly/logging/xlog.h>

namespace fbpcf::cloudio {
// Content type passed to GCS (via gcs::ContentType) for every uploaded object.
static const std::string FILE_TYPE{"text/csv"};

// Called from the constructor. There is nothing to set up before the first
// upload() call, so this is intentionally a no-op.
void GCSFileUploader::init() {
  // intentionally empty
}

/**
 * Uploads the bytes in `buf` to the GCS object named by filePath_.
 *
 * The bucket and object key are parsed from filePath_ on every call, and the
 * whole buffer is handed to a single Client::InsertObject call with content
 * type FILE_TYPE.
 *
 * NOTE(review): the log/exception text says "resumable upload" although the
 * call made here is InsertObject; the wording is kept unchanged so existing
 * log consumers are unaffected -- confirm whether it should be updated.
 *
 * @param buf bytes to upload; the buffer is only read, never modified.
 * @return the number of bytes handed to GCS (i.e. buf.size()).
 * @throws GcpException if InsertObject returns a non-OK status.
 */
int32_t GCSFileUploader::upload(std::vector<char>& buf) {
  namespace gcs = ::google::cloud::storage;

  XLOG(INFO) << "Start resumable upload. ";
  const auto& ref = fbpcf::gcp::uriToObjectReference(filePath_);
  // Plain local names: a trailing underscore is reserved for data members.
  const std::string& bucket = ref.bucket;
  const std::string& object = ref.key;

  // InsertObject takes the payload as string data, so copy the buffer once.
  const std::string contents(buf.begin(), buf.end());

  const auto objectMetadata = gcsClient_->InsertObject(
      bucket, object, contents, gcs::ContentType(FILE_TYPE));

  if (!objectMetadata) {
    // Throwing leaves the function; no return statement is needed after it.
    throw GcpException{
        "Resumable upload failed: " + objectMetadata.status().message()};
  }

  XLOG(INFO) << " Resumable upload successful ";
  XLOG(INFO) << "Bucket: " << bucket << ", Object Name: " << object;
  // Make the size_t -> int32_t narrowing explicit instead of implicit.
  return static_cast<int32_t>(contents.size());
}

// Finishes the upload session. upload() already sends the whole buffer in
// one call, so no finalization work is done here.
//
// Returns 0 unconditionally.
int GCSFileUploader::complete() {
  constexpr int kSuccess = 0;
  return kSuccess;
}
} // namespace fbpcf::cloudio
34 changes: 34 additions & 0 deletions fbpcf/io/cloud_util/GCSFileUploader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <fbpcf/io/cloud_util/IFileUploader.h>
#include <google/cloud/storage/client.h>
#include <memory>
#include <vector>

namespace fbpcf::cloudio {
/**
 * IFileUploader implementation backed by a google::cloud::storage::Client.
 *
 * An instance holds a shared GCS client and the destination file path; the
 * path is fixed for the lifetime of the object.
 */
class GCSFileUploader : public IFileUploader {
 public:
  /**
   * @param gcsClient client used to talk to GCS; ownership is shared with
   *                  the caller.
   * @param filePath  destination path of the object; copied into the
   *                  uploader.
   */
  explicit GCSFileUploader(
      std::shared_ptr<google::cloud::storage::Client> gcsClient,
      const std::string& filePath)
      : gcsClient_(std::move(gcsClient)), filePath_(filePath) {
    // Qualified call: inside a constructor this is the function that would be
    // selected anyway, and spelling it out makes that explicit.
    GCSFileUploader::init();
  }

  /** Uploads the contents of `buf`; see the definition for details. */
  int upload(std::vector<char>& buf) override;

  /** Finishes the upload; see the definition for details. */
  int complete() override;

 private:
  /** Setup hook invoked by the constructor. */
  void init() override;

  // Declaration order is also the initialization order used by the
  // constructor's initializer list.
  std::shared_ptr<google::cloud::storage::Client> gcsClient_;
  const std::string filePath_;
};

} // namespace fbpcf::cloudio

0 comments on commit 4074ffe

Please sign in to comment.