Skip to content

Commit

Permalink
feat: support gcs storage
Browse files Browse the repository at this point in the history
  • Loading branch information
NiwakaDev committed Jun 15, 2023
1 parent f08f726 commit 104bd6d
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 9 deletions.
6 changes: 5 additions & 1 deletion .env.example
Expand Up @@ -14,4 +14,8 @@ GT_AZBLOB_CONTAINER=AZBLOB container
GT_AZBLOB_ACCOUNT_NAME=AZBLOB account name
GT_AZBLOB_ACCOUNT_KEY=AZBLOB account key
GT_AZBLOB_ENDPOINT=AZBLOB endpoint

# Settings for gcs test
GT_GCS_BUCKET = GCS bucket
GT_GCS_SCOPE = GCS scope
GT_GCS_CREDENTIAL_PATH = GCS credential path
GT_GCS_ENDPOINT = GCS end point
1 change: 1 addition & 0 deletions src/cmd/src/datanode.rs
Expand Up @@ -281,6 +281,7 @@ mod tests {
ObjectStoreConfig::S3 { .. } => unreachable!(),
ObjectStoreConfig::Oss { .. } => unreachable!(),
ObjectStoreConfig::Azblob { .. } => unreachable!(),
ObjectStoreConfig::Gcs { .. } => unreachable!(),
};

assert_eq!(
Expand Down
28 changes: 28 additions & 0 deletions src/datanode/src/datanode.rs
Expand Up @@ -48,6 +48,7 @@ pub enum ObjectStoreConfig {
S3(S3Config),
Oss(OssConfig),
Azblob(AzblobConfig),
Gcs(GcsConfig),
}

/// Storage engine config
Expand Down Expand Up @@ -118,6 +119,19 @@ pub struct AzblobConfig {
pub cache_capacity: Option<ReadableSize>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GcsConfig {
pub root: String,
pub bucket: String,
pub scope: String,
#[serde(skip_serializing)]
pub credential_path: SecretString,
pub endpoint: String,
pub cache_path: Option<String>,
pub cache_capacity: Option<ReadableSize>,
}

impl Default for S3Config {
fn default() -> Self {
Self {
Expand Down Expand Up @@ -162,6 +176,20 @@ impl Default for AzblobConfig {
}
}

impl Default for GcsConfig {
fn default() -> Self {
Self {
root: String::default(),
bucket: String::default(),
scope: String::default(),
credential_path: SecretString::from(String::default()),
endpoint: String::default(),
cache_path: Option::default(),
cache_capacity: Option::default(),
}
}
}

impl Default for ObjectStoreConfig {
fn default() -> Self {
ObjectStoreConfig::File(FileConfig {
Expand Down
9 changes: 9 additions & 0 deletions src/datanode/src/store.rs
Expand Up @@ -16,6 +16,7 @@

mod azblob;
mod fs;
mod gcs;
mod oss;
mod s3;

Expand All @@ -40,6 +41,7 @@ pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result
ObjectStoreConfig::Azblob(azblob_config) => {
azblob::new_azblob_object_store(azblob_config).await
}
ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await,
}?;

// Enable retry layer and cache layer for non-fs object storages
Expand Down Expand Up @@ -88,6 +90,13 @@ async fn create_object_store_with_cache(
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
}
ObjectStoreConfig::Gcs(gcs_config) => {
let path = gcs_config.cache_path.as_ref();
let capacity = gcs_config
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
}
_ => (None, ReadableSize(0)),
};

Expand Down
42 changes: 42 additions & 0 deletions src/datanode/src/store/gcs.rs
@@ -0,0 +1,42 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_telemetry::logging::info;
use object_store::services::Gcs as GCSBuilder;
use object_store::{util, ObjectStore};
use secrecy::ExposeSecret;
use snafu::prelude::*;

use crate::datanode::GcsConfig;
use crate::error::{self, Result};

pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<ObjectStore> {
let root = util::normalize_dir(&gcs_config.root);
info!(
"The gcs storage bucket is: {}, root is: {}",
gcs_config.bucket, &root
);

let mut builder = GCSBuilder::default();
builder
.root(&root)
.bucket(&gcs_config.bucket)
.scope(&gcs_config.scope)
.credential_path(gcs_config.credential_path.expose_secret())
.endpoint(&gcs_config.endpoint);

Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish())
}
28 changes: 27 additions & 1 deletion src/object-store/tests/object_store_test.rs
Expand Up @@ -23,7 +23,7 @@ use object_store::services::{Fs, S3};
use object_store::test_util::TempFolder;
use object_store::{util, ObjectStore, ObjectStoreBuilder};
use opendal::raw::Accessor;
use opendal::services::{Azblob, Oss};
use opendal::services::{Azblob, Gcs, Oss};
use opendal::{EntryMode, Operator, OperatorBuilder};

async fn test_object_crud(store: &ObjectStore) -> Result<()> {
Expand Down Expand Up @@ -185,6 +185,32 @@ async fn test_azblob_backend() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_gcs_backend() -> Result<()> {
logging::init_default_ut_logging();
if let Ok(container) = env::var("GT_AZBLOB_CONTAINER") {
if !container.is_empty() {
logging::info!("Running azblob test.");

let mut builder = Gcs::default();
builder
.root(&uuid::Uuid::new_v4().to_string())
.bucket(&env::var("GT_GCS_BUCKET").unwrap())
.scope(&env::var("GT_GCS_SCOPE").unwrap())
.credential_path(&env::var("GT_GCS_CREDENTIAL_PATH").unwrap())
.endpoint(&env::var("GT_GCS_ENDPOINT").unwrap());

let store = ObjectStore::new(builder).unwrap().finish();

let guard = TempFolder::new(&store, "/");
test_object_crud(&store).await?;
test_object_list(&store).await?;
guard.remove_all().await?;
}
}
Ok(())
}

async fn assert_lru_cache<C: Accessor + Clone>(
cache_layer: &LruCacheLayer<C>,
file_names: &[&str],
Expand Down
43 changes: 38 additions & 5 deletions tests-integration/src/test_util.rs
Expand Up @@ -29,8 +29,8 @@ use common_runtime::Builder as RuntimeBuilder;
use common_test_util::ports;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datanode::datanode::{
AzblobConfig, DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, ProcedureConfig,
S3Config, StorageConfig, WalConfig,
AzblobConfig, DatanodeOptions, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig,
ProcedureConfig, S3Config, StorageConfig, WalConfig,
};
use datanode::error::{CreateTableSnafu, Result};
use datanode::instance::Instance;
Expand All @@ -43,7 +43,7 @@ use datatypes::vectors::{
};
use frontend::instance::Instance as FeInstance;
use frontend::service_config::{MysqlOptions, PostgresOptions};
use object_store::services::{Azblob, Oss, S3};
use object_store::services::{Azblob, Gcs, Oss, S3};
use object_store::test_util::TempFolder;
use object_store::ObjectStore;
use secrecy::ExposeSecret;
Expand All @@ -68,6 +68,7 @@ pub enum StorageType {
File,
Oss,
Azblob,
Gcs,
}

impl StorageType {
Expand Down Expand Up @@ -97,6 +98,13 @@ impl StorageType {
false
}
}
StorageType::Gcs => {
if let Ok(b) = env::var("GT_GCS_BUCKET") {
!b.is_empty()
} else {
false
}
}
}
}
}
Expand All @@ -119,6 +127,28 @@ pub fn get_test_store_config(
let _ = dotenv::dotenv();

match store_type {
StorageType::Gcs => {
let gcs_config = GcsConfig {
root: uuid::Uuid::new_v4().to_string(),
bucket: env::var("GT_GCS_BUCKET").unwrap(),
scope: env::var("GT_GCS_SCOPE").unwrap(),
credential_path: env::var("GT_GCS_CREDENTIAL_PATH").unwrap().into(),
endpoint: env::var("GT_GCS_ENDPOINT").unwrap(),
..Default::default()
};

let mut builder = Gcs::default();
builder
.root(&gcs_config.root)
.bucket(&gcs_config.bucket)
.scope(&gcs_config.scope)
.credential_path(gcs_config.credential_path.expose_secret())
.endpoint(&gcs_config.endpoint);

let config = ObjectStoreConfig::Gcs(gcs_config);
let store = ObjectStore::new(builder).unwrap().finish();
(config, TempDirGuard::Gcs(TempFolder::new(&store, "/")))
}
StorageType::Azblob => {
let azblob_config = AzblobConfig {
root: uuid::Uuid::new_v4().to_string(),
Expand Down Expand Up @@ -216,6 +246,7 @@ pub enum TempDirGuard {
S3(TempFolder),
Oss(TempFolder),
Azblob(TempFolder),
Gcs(TempFolder),
}

pub struct TestGuard {
Expand All @@ -229,8 +260,10 @@ pub struct StorageGuard(pub TempDirGuard);

impl TestGuard {
pub async fn remove_all(&mut self) {
if let TempDirGuard::S3(guard) | TempDirGuard::Oss(guard) | TempDirGuard::Azblob(guard) =
&mut self.storage_guard.0
if let TempDirGuard::S3(guard)
| TempDirGuard::Oss(guard)
| TempDirGuard::Azblob(guard)
| TempDirGuard::Gcs(guard) = &mut self.storage_guard.0
{
guard.remove_all().await.unwrap()
}
Expand Down
4 changes: 2 additions & 2 deletions tests-integration/tests/main.rs
Expand Up @@ -21,7 +21,7 @@ mod sql;
#[macro_use]
mod region_failover;

grpc_tests!(File, S3, S3WithCache, Oss);
http_tests!(File, S3, S3WithCache, Oss);
grpc_tests!(File, S3, S3WithCache, Oss, Gcs);
http_tests!(File, S3, S3WithCache, Oss, Gcs);
region_failover_tests!(File, S3, S3WithCache, Oss);
sql_tests!(File);

0 comments on commit 104bd6d

Please sign in to comment.