Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support GCP Uploader #274

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions fbpcf/io/cloud_util/CloudFileUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,32 @@

#include <string>

#include <google/cloud/storage/client.h>
#include <re2/re2.h>

#include "fbpcf/aws/S3Util.h"
#include "fbpcf/exception/PcfException.h"
#include "fbpcf/gcp/GCSUtil.h"
#include "fbpcf/io/cloud_util/GCSFileReader.h"
#include "fbpcf/io/cloud_util/GCSFileUploader.h"
#include "fbpcf/io/cloud_util/S3Client.h"
#include "fbpcf/io/cloud_util/S3FileReader.h"
#include "fbpcf/io/cloud_util/S3FileUploader.h"

namespace fbpcf::cloudio {

CloudFileType getCloudFileType(const std::string& filePath) {
// S3 file format:
// 1. https://bucket-name.s3.region.amazonaws.com/key-name
// 2. https://bucket-name.s3-region.amazonaws.com/key-name
// 3. s3://bucket-name/key-name
// GCS file format:
// 1. https://storage.cloud.google.com/bucket-name/key-name
// 2. https://bucket-name.storage.googleapis.com/key-name
// 3. https://storage.googleapis.com/bucket-name/key-name
// 4. gs://bucket-name/key-name
/*
* S3 file format:
* 1. https://bucket-name.s3.region.amazonaws.com/key-name
* 2. https://bucket-name.s3-region.amazonaws.com/key-name
* 3. s3://bucket-name/key-name
* GCS file format:
* 1. https://storage.cloud.google.com/bucket-name/key-name
* 2. https://bucket-name.storage.googleapis.com/key-name
* 3. https://storage.googleapis.com/bucket-name/key-name
* 4. gs://bucket-name/key-name
*/
static const re2::RE2 s3Regex1(
"https://[a-z0-9.-]+.s3.[a-z0-9-]+.amazonaws.com/.+");
static const re2::RE2 s3Regex2(
Expand Down Expand Up @@ -81,6 +85,9 @@ std::unique_ptr<IFileUploader> getCloudFileUploader(
fbpcf::aws::S3ClientOption{.region = ref.region})
.getS3Client(),
filePath);
} else if (fileType == CloudFileType::GCS) {
return std::make_unique<GCSFileUploader>(
fbpcf::gcp::createGCSClient(), filePath);
} else {
throw fbpcf::PcfException("Not supported yet.");
}
Expand Down
71 changes: 71 additions & 0 deletions fbpcf/io/cloud_util/GCSFileUploader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#include "fbpcf/io/cloud_util/GCSFileUploader.h"

#include <folly/logging/xlog.h>

#include "fbpcf/exception/GcpException.h"
#include "fbpcf/gcp/GCSUtil.h"

namespace fbpcf::cloudio {
// Short alias for the Google Cloud Storage C++ client namespace.
namespace gcs = ::google::cloud::storage;
// Result-or-error wrapper; used in complete() to hold the object metadata.
using ::google::cloud::StatusOr;
// Content type passed as gcs::ContentType when init() opens the write stream.
static const std::string FILE_TYPE = "text/csv";

/**
 * Opens the GCS write stream for the object that filePath_ refers to.
 *
 * Resolves filePath_ into a bucket/key pair, asks the client to start writing
 * that object with the FILE_TYPE content type, and stores the stream's
 * resumable session id in sessionId_ (later handed to abortUpload()).
 */
void GCSFileUploader::init() {
  XLOG(INFO) << "Starting resumable upload. ";

  const auto& objectRef = fbpcf::gcp::uriToObjectReference(filePath_);
  const std::string& bucketName = objectRef.bucket;
  const std::string& objectKey = objectRef.key;
  XLOG(INFO) << "Bucket: " << bucketName << ", Key: " << objectKey;

  stream_ = gcsClient_->WriteObject(
      bucketName, objectKey, gcs::ContentType(FILE_TYPE));
  sessionId_ = stream_.resumable_session_id();
}

/**
 * Appends one buffer to the open GCS write stream.
 *
 * @param buf bytes to write; the whole buffer is written in one call.
 * @return buf.size() on success, or 0 if the stream reports an error after the
 *         write. On error the resumable session is aborted via abortUpload().
 *
 * NOTE: an empty buffer also yields 0, so callers cannot tell an empty
 * successful write from a failure by the return value alone.
 */
int GCSFileUploader::upload(std::vector<char>& buf) {
  // Return type is `int` to match the `int upload(...) override` declaration
  // in the class; `int32_t` only agrees with it where the two are the same
  // type.
  stream_.write(buf.data(), buf.size());

  const auto& status = stream_.last_status();
  if (!status.ok()) {
    XLOG(ERR) << "Upload failed. Part number: " << partNumber_
              << ". Aborting...";
    // Surface the underlying cause, mirroring what complete() logs.
    XLOG(ERR) << "Status Message: " << status.message();
    abortUpload(sessionId_);
    return 0;
  }

  XLOG(INFO) << "Upload succeeded. Part number: " << partNumber_;
  XLOG(INFO) << "Bytes written: " << buf.size();
  ++partNumber_;
  // Explicit narrowing: the interface returns int while size() is size_t.
  return static_cast<int>(buf.size());
}

int GCSFileUploader::complete() {
stream_.Close();
StatusOr<gcs::ObjectMetadata> metadata = std::move(stream_).metadata();
if (!metadata) {
XLOG(ERR) << "Failed to close file " << filePath_;
XLOG(ERR) << "Status Message: " << metadata.status().message();
abortUpload(sessionId_);
return 1;
} else {
XLOG(INFO) << "File " << filePath_ << " uploaded successfully.";
return 0;
}
}

/**
 * Asks the GCS client to delete the resumable upload session.
 *
 * @param session_id id of the resumable session to delete.
 * @return the status returned by the client's DeleteResumableUpload call; the
 *         outcome is also logged.
 */
google::cloud::Status GCSFileUploader::abortUpload(std::string session_id) {
  google::cloud::Status outcome = gcsClient_->DeleteResumableUpload(session_id);

  if (!outcome.ok()) {
    XLOG(ERR) << "Abort upload failed. Message: " << outcome.message();
    return outcome;
  }

  XLOG(INFO) << "Aborted upload successfully. ";
  return outcome;
}
} // namespace fbpcf::cloudio
39 changes: 39 additions & 0 deletions fbpcf/io/cloud_util/GCSFileUploader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <memory>
#include <vector>

#include <google/cloud/storage/client.h>
#include "fbpcf/io/cloud_util/IFileUploader.h"

namespace fbpcf::cloudio {
/**
 * IFileUploader implementation that writes to a Google Cloud Storage object
 * through a google::cloud::storage::ObjectWriteStream.
 */
class GCSFileUploader : public IFileUploader {
public:
/**
 * Stores the client and target path, then calls init().
 *
 * @param gcsClient shared GCS client used for all requests.
 * @param filePath location of the destination object.
 *
 * NOTE: init() is invoked from the constructor, so the call resolves to this
 * class's override rather than one in a further-derived class.
 */
explicit GCSFileUploader(
std::shared_ptr<google::cloud::storage::Client> gcsClient,
const std::string& filePath)
: gcsClient_{std::move(gcsClient)}, filePath_{filePath} {
init();
}
// Writes buf to the stream; see the definition for the return convention.
int upload(std::vector<char>& buf) override;
// Closes the stream and finishes the upload.
int complete() override;

private:
// Sets up stream_ and sessionId_ for filePath_; called once by the constructor.
void init() override;
// Deletes the resumable upload identified by session_id and returns the status.
google::cloud::Status abortUpload(std::string session_id);
// Client used to open the write stream and to delete resumable uploads.
std::shared_ptr<google::cloud::storage::Client> gcsClient_;
// Destination path given at construction.
const std::string filePath_;
// Counter starting at 1; appears in upload log messages.
std::size_t partNumber_ = 1;
// Resumable session id of stream_, passed to abortUpload().
std::string sessionId_;

// Open write stream for the destination object.
google::cloud::storage::ObjectWriteStream stream_;
};

} // namespace fbpcf::cloudio