diff --git a/fbpcf/io/cloud_util/CloudFileUtil.cpp b/fbpcf/io/cloud_util/CloudFileUtil.cpp
index ebe58daa..8097c751 100644
--- a/fbpcf/io/cloud_util/CloudFileUtil.cpp
+++ b/fbpcf/io/cloud_util/CloudFileUtil.cpp
@@ -6,11 +6,13 @@
  */
 
 #include "fbpcf/io/cloud_util/CloudFileUtil.h"
+#include
 #include
 #include "fbpcf/aws/S3Util.h"
 #include "fbpcf/exception/PcfException.h"
 #include "fbpcf/gcp/GCSUtil.h"
 #include "fbpcf/io/cloud_util/GCSFileReader.h"
+#include "fbpcf/io/cloud_util/GCSFileUploader.h"
 #include "fbpcf/io/cloud_util/S3Client.h"
 #include "fbpcf/io/cloud_util/S3FileReader.h"
 #include "fbpcf/io/cloud_util/S3FileUploader.h"
@@ -18,15 +20,17 @@
 namespace fbpcf::cloudio {
 
 CloudFileType getCloudFileType(const std::string& filePath) {
-  // S3 file format:
-  // 1. https://bucket-name.s3.region.amazonaws.com/key-name
-  // 2. https://bucket-name.s3-region.amazonaws.com/key-name
-  // 3. s3://bucket-name/key-name
-  // GCS file format:
-  // 1. https://storage.cloud.google.com/bucket-name/key-name
-  // 2. https://bucket-name.storage.googleapis.com/key-name
-  // 3. https://storage.googleapis.com/bucket-name/key-name
-  // 4. gs://bucket-name/key-name
+  /*
+   * S3 file format:
+   * 1. https://bucket-name.s3.region.amazonaws.com/key-name
+   * 2. https://bucket-name.s3-region.amazonaws.com/key-name
+   * 3. s3://bucket-name/key-name
+   * GCS file format:
+   * 1. https://storage.cloud.google.com/bucket-name/key-name
+   * 2. https://bucket-name.storage.googleapis.com/key-name
+   * 3. https://storage.googleapis.com/bucket-name/key-name
+   * 4. gs://bucket-name/key-name
+   */
   static const re2::RE2 s3Regex1(
       "https://[a-z0-9.-]+.s3.[a-z0-9-]+.amazonaws.com/.+");
   static const re2::RE2 s3Regex2(
@@ -77,6 +81,9 @@ std::unique_ptr getCloudFileUploader(
             fbpcf::aws::S3ClientOption{.region = ref.region})
             .getS3Client(),
         filePath);
+  } else if (fileType == CloudFileType::GCS) {
+    return std::make_unique(
+        fbpcf::gcp::createGCSClient(), filePath);
   } else {
     throw fbpcf::PcfException("Not supported yet.");
   }
diff --git a/fbpcf/io/cloud_util/GCSFileUploader.cpp b/fbpcf/io/cloud_util/GCSFileUploader.cpp
new file mode 100644
index 00000000..cb1ab287
--- /dev/null
+++ b/fbpcf/io/cloud_util/GCSFileUploader.cpp
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ *
+ * This source code is licensed under the MIT license found in the
+ * LICENSE file in the root directory of this source tree.
+ */
+
+#include
+
+#include "fbpcf/exception/GcpException.h"
+#include "fbpcf/gcp/GCSUtil.h"
+#include "fbpcf/io/cloud_util/GCSFileUploader.h"
+
+namespace fbpcf::cloudio {
+// Content type attached to every object written by GCSFileUploader::upload.
+static const std::string FILE_TYPE = "text/csv";
+
+// Setup hook invoked by the constructor; there is nothing to prepare.
+void GCSFileUploader::init() {}
+
+/**
+ * Writes the bytes in buf to the GCS object that filePath_ resolves to,
+ * tagged with content type FILE_TYPE.
+ *
+ * @param buf bytes to store; it is read but not modified here.
+ * @return number of bytes handed to InsertObject.
+ * @throws GcpException if InsertObject reports a non-OK status.
+ *
+ * NOTE(review): the log and error text say "resumable", but the data is sent
+ * with a single InsertObject call; each call presumably replaces the object
+ * rather than appending to it -- confirm against IFileUploader callers.
+ */
+int GCSFileUploader::upload(std::vector& buf) {
+  XLOG(INFO) << "Start resumable upload. ";
+  const auto& ref = fbpcf::gcp::uriToObjectReference(filePath_);
+
+  namespace gcs = ::google::cloud::storage;
+  const std::string content(buf.begin(), buf.end());
+
+  const auto objectMetadata = gcsClient_->InsertObject(
+      ref.bucket, ref.key, content, gcs::ContentType(FILE_TYPE));
+
+  if (!objectMetadata) {
+    throw GcpException{
+        "Resumable upload failed: " + objectMetadata.status().message()};
+  }
+  XLOG(INFO) << " Resumable upload successful ";
+  XLOG(INFO) << "Bucket: " << ref.bucket << ", Object Name: " << ref.key;
+  // The interface returns int, so make the size_t -> int conversion explicit.
+  return static_cast<int>(content.size());
+}
+
+// No finalisation step is performed; always reports 0.
+int GCSFileUploader::complete() {
+  return 0;
+}
+} // namespace fbpcf::cloudio
diff --git a/fbpcf/io/cloud_util/GCSFileUploader.h b/fbpcf/io/cloud_util/GCSFileUploader.h
new file mode 100644
index 00000000..c099ec00
--- /dev/null
+++ b/fbpcf/io/cloud_util/GCSFileUploader.h
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ *
+ * This source code is licensed under the MIT license found in the
+ * LICENSE file in the root directory of this source tree.
+ */
+
+#pragma once
+
+#include
+#include
+
+#include
+#include "fbpcf/io/cloud_util/IFileUploader.h"
+
+namespace fbpcf::cloudio {
+/**
+ * IFileUploader implementation that stores data in Google Cloud Storage
+ * through the client handed to the constructor.
+ */
+class GCSFileUploader : public IFileUploader {
+ public:
+  /**
+   * @param gcsClient client used to issue the upload request.
+   * @param filePath URI of the destination object; it is parsed with
+   *     fbpcf::gcp::uriToObjectReference when upload() runs.
+   */
+  explicit GCSFileUploader(
+      std::shared_ptr gcsClient,
+      const std::string& filePath)
+      : gcsClient_{std::move(gcsClient)}, filePath_{filePath} {
+    init();
+  }
+  /** Sends buf to the destination object and returns the byte count. */
+  int upload(std::vector& buf) override;
+  /** Returns 0; no finalisation work is done. */
+  int complete() override;
+
+ private:
+  void init() override;
+
+  std::shared_ptr gcsClient_;
+  const std::string filePath_;
+};
+
+} // namespace fbpcf::cloudio