diff --git a/Cargo.lock b/Cargo.lock index a08cb40471..f740adacab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -75,14 +75,14 @@ dependencies = [ "arena", "arrow_deps", "async-trait", - "base64 0.13.0", + "base64", "common_types", "common_util", "env_logger", "futures", "lazy_static", "log", - "object_store", + "object_store 0.1.0", "parquet 0.1.0", "prometheus 0.12.0", "proto", @@ -91,7 +91,7 @@ dependencies = [ "serde_derive", "skiplist", "smallvec", - "snafu", + "snafu 0.6.10", "table_engine", "tempfile", "tokio", @@ -185,7 +185,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "36cf490284453aad7556bbb0874b25a8bdd66ef2e9f43a0c5c42646374a44a38" dependencies = [ "arrow-format", - "base64 0.13.0", + "base64", "chrono", "futures", "hash_hasher", @@ -290,12 +290,6 @@ dependencies = [ "rustc-demangle", ] -[[package]] -name = "base64" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" - [[package]] name = "base64" version = "0.13.0" @@ -317,7 +311,7 @@ dependencies = [ "env_logger", "futures", "log", - "object_store", + "object_store 0.1.0", "parquet 0.1.0", "serde", "serde_derive", @@ -486,7 +480,7 @@ name = "bytes" version = "0.1.0" dependencies = [ "bytes 1.1.0", - "snafu", + "snafu 0.6.10", ] [[package]] @@ -522,7 +516,7 @@ dependencies = [ "async-trait", "common_types", "common_util", - "snafu", + "snafu 0.6.10", "table_engine", ] @@ -537,7 +531,7 @@ dependencies = [ "common_util", "log", "server", - "snafu", + "snafu 0.6.10", "system_catalog", "table_engine", "tokio", @@ -676,7 +670,7 @@ dependencies = [ "proto", "serde", "serde_derive", - "snafu", + "snafu 0.6.10", "sqlparser", ] @@ -702,7 +696,7 @@ dependencies = [ "serde_derive", "slog", "slog-global 0.1.0 (git+https://github.com/breezewish/slog-global.git?rev=0e23a5baff302a9d7bccd85f8f31e43339c2f2c1)", - "snafu", + "snafu 0.6.10", "tempfile", "time 0.1.43", "tokio", @@ -738,12 +732,6 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" -[[package]] -name = "convert_case" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" - [[package]] name = "core-foundation" version = "0.9.3" @@ -987,19 +975,6 @@ dependencies = [ "unicode-segmentation", ] -[[package]] -name = "derive_more" -version = "0.99.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" -dependencies = [ - "convert_case", - "proc-macro2", - "quote", - "rustc_version", - "syn", -] - [[package]] name = "digest" version = "0.9.0" @@ -1212,12 +1187,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" -[[package]] -name = "fuchsia-cprng" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" - [[package]] name = "futures" version = "0.3.21" @@ -1317,12 +1286,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "gcc" -version = "0.3.55" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" - [[package]] name = 
"generic-array" version = "0.14.5" @@ -1484,7 +1447,7 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4cff78e5788be1e0ab65b04d306b2ed5092c815ec97ec70f4ebd5aee158aa55d" dependencies = [ - "base64 0.13.0", + "base64", "bitflags", "bytes 1.1.0", "headers-core", @@ -1512,6 +1475,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1677,7 +1646,7 @@ dependencies = [ "common_util", "log", "query_engine", - "snafu", + "snafu 0.6.10", "sql", "table_engine", "tokio", @@ -2064,7 +2033,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "snafu", + "snafu 0.6.10", "table_engine", "tokio", "url", @@ -2312,18 +2281,26 @@ dependencies = [ [[package]] name = "object_store" version = "0.1.0" +dependencies = [ + "object_store 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "object_store" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8dcd74d8c1c0d2699d82b62c69cc5374260aa4e3a5f95330572f14822557529" dependencies = [ "async-trait", "bytes 1.1.0", - "common_util", + "chrono", "futures", "itertools", - "oss-rust-sdk", "percent-encoding", - "snafu", - "tempfile", + "snafu 0.7.1", "tokio", - "tokio-util 0.6.10", + "tracing 0.1.34", + "url", "walkdir", ] @@ -2408,23 +2385,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "oss-rust-sdk" -version = "0.3.1" -source = "git+https://github.com/CeresDB/oss-rust-sdk.git?branch=async-api#d4071e611fb85e5983744e4f56b3fb1b92abfcb9" -dependencies = [ - "async-trait", - "base64 0.12.3", - "bytes 1.1.0", - "chrono", - "derive_more", - "httpdate", - "log", - "quick-xml", - "reqwest", - "rust-crypto", -] - [[package]] name = "parking_lot" version = "0.11.2" @@ -2490,7 +2450,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c718575b34e488fa78d4f0286356abb8466573cb17ae8faa96ffd871ca6e8c6" dependencies = [ "arrow", - "base64 0.13.0", + "base64", "brotli", "byteorder", "chrono", @@ -2576,33 +2536,13 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" -[[package]] -name = "pin-project" -version = "0.4.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9615c18d31137579e9ff063499264ddc1278e7b1982757ebc111028c4d1dc909" -dependencies = [ - "pin-project-internal 0.4.29", -] - [[package]] name = "pin-project" version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" dependencies = [ - "pin-project-internal 1.0.10", -] - -[[package]] -name = "pin-project-internal" -version = "0.4.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "044964427019eed9d49d9d5bbce6047ef18f37100ea400912a9fa4a3523ab12a" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "pin-project-internal", ] [[package]] @@ -2832,7 +2772,7 @@ dependencies = [ "common_util", "futures", "log", - "snafu", + "snafu 0.6.10", "sql", "table_engine", "udf", @@ -2844,15 +2784,6 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" -[[package]] -name = "quick-xml" -version = "0.18.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cc440ee4802a86e357165021e3e255a9143724da31db1e2ea540214c96a0f82" -dependencies = [ - "memchr", -] - [[package]] name = "quote" version = "1.0.18" @@ -2862,29 +2793,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "rand" -version = "0.3.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64ac302d8f83c0c1974bf758f6b041c6c8ada916fbb44a609158ca8b064cc76c" -dependencies = [ - "libc", - "rand 0.4.6", -] - -[[package]] -name = "rand" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" -dependencies = [ - "fuchsia-cprng", - "libc", - "rand_core 0.3.1", - "rdrand", - "winapi", -] - [[package]] name = "rand" version = "0.7.3" @@ -2929,21 +2837,6 @@ dependencies = [ "rand_core 0.6.3", ] -[[package]] -name = "rand_core" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" -dependencies = [ - "rand_core 0.4.2", -] - -[[package]] -name = "rand_core" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" - [[package]] name = "rand_core" version = "0.5.1" @@ -2995,15 +2888,6 @@ dependencies = [ "num_cpus", ] -[[package]] -name = "rdrand" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" -dependencies = [ - "rand_core 0.3.1", -] - [[package]] name = "redox_syscall" version = "0.2.13" @@ -3065,7 +2949,7 @@ version = "0.11.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46a1f7aa4f35e5e8b4160449f51afc758f0ce6454315a9fa7d0d113e958c41eb" dependencies = [ - "base64 0.13.0", + "base64", "bytes 1.1.0", "encoding_rs", "futures-core", @@ -3110,19 +2994,6 @@ dependencies = [ "librocksdb_sys", ] -[[package]] -name = "rust-crypto" -version = "0.2.36" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f76d05d3993fd5f4af9434e8e436db163a12a9d40e1a58a726f27a01dfd12a2a" -dependencies = [ - "gcc", - "libc", - "rand 0.3.23", - "rustc-serialize", - "time 0.1.43", -] - [[package]] name = "rustc-demangle" version = "0.1.21" @@ -3135,12 +3006,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" -[[package]] -name = "rustc-serialize" -version = "0.3.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcf128d1287d2ea9d80910b5f1120d0b8eede3fbf1abe91c40d39ea7d51e6fda" - [[package]] name = "rustc_version" version = "0.4.0" @@ -3310,7 +3175,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "snafu", + "snafu 0.6.10", "sql", "system_catalog", "table_engine", @@ -3501,9 +3366,17 @@ checksum = "eab12d3c261b2308b0d80c26fffb58d17eba81a4be97890101f416b478c79ca7" dependencies = [ "backtrace", "doc-comment", - "futures-core", - "pin-project 0.4.29", - "snafu-derive", + "snafu-derive 0.6.10", +] + +[[package]] +name = "snafu" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"5177903bf45656592d9eb5c0e22f408fc023aae51dbe2088889b71633ba451f2" +dependencies = [ + "doc-comment", + "snafu-derive 0.7.1", ] [[package]] @@ -3517,6 +3390,18 @@ dependencies = [ "syn", ] +[[package]] +name = "snafu-derive" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "410b26ed97440d90ced3e2488c868d56a86e2064f5d7d6f417909b286afe25e5" +dependencies = [ + "heck 0.4.0", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "snap" version = "1.0.5" @@ -3555,7 +3440,7 @@ dependencies = [ "log", "paste 1.0.7", "regex", - "snafu", + "snafu 0.6.10", "sqlparser", "table_engine", "tokio", @@ -3610,7 +3495,7 @@ version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87c85aa3f8ea653bfd3ddf25f7ee357ee4d204731f6aa9ad04002306f6e2774c" dependencies = [ - "heck", + "heck 0.3.3", "proc-macro2", "quote", "syn", @@ -3622,7 +3507,7 @@ version = "0.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5bb0dc7ee9c15cea6199cde9a127fa16a4c5819af85395457ad72d68edc85a38" dependencies = [ - "heck", + "heck 0.3.3", "proc-macro2", "quote", "rustversion", @@ -3671,7 +3556,7 @@ dependencies = [ "log", "proto", "protobuf", - "snafu", + "snafu 0.6.10", "table_engine", "tokio", ] @@ -3691,7 +3576,7 @@ dependencies = [ "serde", "serde_derive", "smallvec", - "snafu", + "snafu 0.6.10", "tokio", ] @@ -3930,7 +3815,7 @@ checksum = "511de3f85caf1c98983545490c3d09685fa8eb634e57eec22bb4db271f46cbd8" dependencies = [ "futures-util", "log", - "pin-project 1.0.10", + "pin-project", "tokio", "tungstenite", ] @@ -3943,7 +3828,6 @@ checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" dependencies = [ "bytes 1.1.0", "futures-core", - "futures-io", "futures-sink", "log", "pin-project-lite", @@ -4104,7 +3988,7 @@ version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0b2d8558abd2e276b0a8df5c05a2ec762609344191e5fd23e292c910e9165b5" dependencies = [ - "base64 0.13.0", + "base64", "byteorder", "bytes 1.1.0", "http", @@ -4166,7 +4050,7 @@ dependencies = [ "common_util", "hyperloglog", "smallvec", - "snafu", + "snafu 0.6.10", ] [[package]] @@ -4303,7 +4187,7 @@ dependencies = [ "futures", "log", "rocksdb", - "snafu", + "snafu 0.6.10", "tempfile", "tokio", ] @@ -4346,7 +4230,7 @@ dependencies = [ "mime_guess", "multipart", "percent-encoding", - "pin-project 1.0.10", + "pin-project", "scoped-tls", "serde", "serde_json", diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 5a3a2fb94e..677f605b0c 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -19,7 +19,7 @@ use futures::{ stream, SinkExt, TryStreamExt, }; use log::{error, info}; -use object_store::{path::ObjectStorePath, ObjectStore}; +use object_store::ObjectStore; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::{predicate::Predicate, table::Result as TableResult}; use tokio::sync::oneshot; @@ -526,8 +526,7 @@ where let (batch_record_sender, batch_record_receiver) = channel::>(DEFAULT_CHANNEL_SIZE); let file_id = table_data.alloc_file_id(); - let mut sst_file_path = self.space_store.store.new_path(); - table_data.set_sst_file_path(file_id, &mut sst_file_path); + let sst_file_path = table_data.set_sst_file_path(file_id); // TODO: min_key max_key set in sst_builder build let mut sst_meta = SstMetaData { @@ -567,7 +566,7 @@ where Box::new(e) as _ }) 
.with_context(|| FailBuildSst { - path: sst_file_path.display(), + path: sst_file_path.to_string(), })?; // update sst metadata by built info. @@ -645,8 +644,7 @@ where // Alloc file id for next sst file let file_id = table_data.alloc_file_id(); - let mut sst_file_path = self.space_store.store.new_path(); - table_data.set_sst_file_path(file_id, &mut sst_file_path); + let sst_file_path = table_data.set_sst_file_path(file_id); let sst_builder_options = SstBuilderOptions { sst_type: table_data.sst_type, @@ -679,7 +677,7 @@ where Box::new(e) as _ }) .with_context(|| FailBuildSst { - path: sst_file_path.display(), + path: sst_file_path.to_string(), })?; // update sst metadata by built info. @@ -848,8 +846,7 @@ impl SpaceStore SpaceStore SpaceStore, File, FactoryImpl>; + TableEngineImpl, LocalFileSystem, FactoryImpl>; /// Default instance -pub(crate) type EngineInstance = InstanceRef, File, FactoryImpl>; +pub(crate) type EngineInstance = + InstanceRef, LocalFileSystem, FactoryImpl>; /// Config of analytic engine. #[derive(Debug, Clone, Deserialize)] diff --git a/analytic_engine/src/row_iter/record_batch_stream.rs b/analytic_engine/src/row_iter/record_batch_stream.rs index 13cf049b13..19261a7a80 100644 --- a/analytic_engine/src/row_iter/record_batch_stream.rs +++ b/analytic_engine/src/row_iter/record_batch_stream.rs @@ -8,6 +8,7 @@ use common_types::{ use common_util::define_result; use futures::stream::{self, Stream, StreamExt}; use log::{error, trace}; +use object_store::ObjectStore; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::{ predicate::{filter_record_batch::RecordBatchFilter, Predicate}, @@ -208,7 +209,7 @@ pub async fn filtered_stream_from_sst_file( ) -> Result where Fa: sst::factory::Factory, - S: object_store::ObjectStore, + S: ObjectStore, { stream_from_sst_file( space_id, @@ -233,11 +234,10 @@ pub async fn stream_from_sst_file( ) -> Result where Fa: sst::factory::Factory, - S: object_store::ObjectStore, + S: ObjectStore, { sst_file.read_meter().mark(); - let mut path = store.new_path(); - sst_util::set_sst_file_path(space_id, table_id, sst_file.id(), &mut path); + let path = sst_util::new_sst_file_path(space_id, table_id, sst_file.id()); let mut sst_reader = sst_factory .new_sst_reader(sst_reader_options, &path, store) .with_context(|| SstReaderNotFound { diff --git a/analytic_engine/src/setup.rs b/analytic_engine/src/setup.rs index 82345d88b8..db5a14f5f4 100644 --- a/analytic_engine/src/setup.rs +++ b/analytic_engine/src/setup.rs @@ -5,7 +5,7 @@ use std::{path::Path, sync::Arc}; use common_util::define_result; -use object_store::disk::File; +use object_store::LocalFileSystem; use parquet::{ cache::{LruDataCache, LruMetaCache}, DataCacheRef, MetaCacheRef, @@ -34,6 +34,17 @@ pub enum Error { #[snafu(display("Failed to open manifest, err:{}", source))] OpenManifest { source: crate::meta::details::Error }, + + #[snafu(display("Failed to open object store, err:{}", source))] + OpenObjectStore { + source: object_store::ObjectStoreError, + }, + + #[snafu(display("Failed to create dir for {}, err:{}", path, source))] + CreateDir { + path: String, + source: std::io::Error, + }, } define_result!(Error); @@ -87,7 +98,12 @@ async fn open_instance( }; let sst_path = data_path.join(STORE_DIR_NAME); - let store = File::new(sst_path); + tokio::fs::create_dir_all(&sst_path) + .await + .context(CreateDir { + path: sst_path.to_string_lossy().into_owned(), + })?; + let store = LocalFileSystem::new_with_prefix(sst_path).context(OpenObjectStore)?; let open_ctx = OpenContext { 
config, runtimes: engine_runtimes, diff --git a/analytic_engine/src/sst/builder.rs b/analytic_engine/src/sst/builder.rs index 3eecbcdf2a..36b6df796f 100644 --- a/analytic_engine/src/sst/builder.rs +++ b/analytic_engine/src/sst/builder.rs @@ -15,10 +15,9 @@ pub mod error { #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Failed to persist sst content, path:{}, err:{}", path, source))] - Persist { - path: String, - source: Box<dyn std::error::Error + Send + Sync>, + #[snafu(display("Failed to perform storage operation, err:{}", source))] + Storage { + source: object_store::ObjectStoreError, }, #[snafu(display("Failed to encode meta data, err:{}", source))] @@ -26,9 +25,6 @@ pub mod error { source: Box<dyn std::error::Error + Send + Sync>, }, - #[snafu(display("Failed to get sst file size, path:{}", path))] - GetFileSize { path: String }, - #[snafu(display( "Failed to encode record batch into sst, err:{}.\nBacktrace:\n{}", source, backtrace ))] @@ -43,6 +39,11 @@ pub mod error { PollRecordBatch { source: Box<dyn std::error::Error + Send + Sync>, }, + + #[snafu(display("Failed to read data, err:{}", source))] + ReadData { + source: Box<dyn std::error::Error + Send + Sync>, + }, } define_result!(Error); diff --git a/analytic_engine/src/sst/factory.rs b/analytic_engine/src/sst/factory.rs index f910468515..6240000f02 100644 --- a/analytic_engine/src/sst/factory.rs +++ b/analytic_engine/src/sst/factory.rs @@ -6,7 +6,7 @@ use std::{fmt::Debug, sync::Arc}; use common_types::projected_schema::ProjectedSchema; use common_util::runtime::Runtime; -use object_store::ObjectStore; +use object_store::{ObjectStore, Path}; use parquet::{DataCacheRef, MetaCacheRef}; use table_engine::predicate::PredicateRef; @@ -23,14 +23,14 @@ pub trait Factory: Clone { fn new_sst_reader<'a, S: ObjectStore>( &self, options: &SstReaderOptions, - path: &'a S::Path, + path: &'a Path, storage: &'a S, ) -> Option<Box<dyn SstReader + Send + 'a>>; fn new_sst_builder<'a, S: ObjectStore>( &self, options: &SstBuilderOptions, - path: &'a S::Path, + path: &'a Path, storage: &'a S, ) -> Option<Box<dyn SstBuilder + Send + 'a>>; } @@ -66,7 +66,7 @@ impl Factory for FactoryImpl { fn new_sst_reader<'a, S: ObjectStore>( &self, options: &SstReaderOptions, - path: &'a S::Path, + path: &'a Path, storage: &'a S, ) -> Option<Box<dyn SstReader + Send + 'a>> { match options.sst_type { @@ -77,7 +77,7 @@ impl Factory for FactoryImpl { fn new_sst_builder<'a, S: ObjectStore>( &self, options: &SstBuilderOptions, - path: &'a S::Path, + path: &'a Path, storage: &'a S, ) -> Option<Box<dyn SstBuilder + Send + 'a>> { match options.sst_type { diff --git a/analytic_engine/src/sst/file.rs b/analytic_engine/src/sst/file.rs index b1219adeaf..1f6c8d4cd9 100644 --- a/analytic_engine/src/sst/file.rs +++ b/analytic_engine/src/sst/file.rs @@ -28,7 +28,7 @@ use common_util::{ runtime::{JoinHandle, Runtime}, }; use log::{debug, error, info}; -use object_store::{path::ObjectStorePath, ObjectStore}; +use object_store::ObjectStore; use proto::{common::TimeRange as TimeRangePb, sst::SstMetaData as SstMetaDataPb}; use snafu::{ResultExt, Snafu}; use table_engine::table::TableId; @@ -579,24 +579,22 @@ impl FilePurger { while let Some(request) = receiver.recv().await { match request { Request::Purge(purge_request) => { - let mut sst_file_path = store.new_path(); - sst_util::set_sst_file_path( + let sst_file_path = sst_util::new_sst_file_path( purge_request.space_id, purge_request.table_id, purge_request.file_id, - &mut sst_file_path, ); info!( "File purger delete file, purge_request:{:?}, sst_file_path:{}", purge_request, - sst_file_path.display() + sst_file_path.to_string() ); if let Err(e) = store.delete(&sst_file_path).await { error!( "File purger failed to delete file, sst_file_path:{}, err:{}", - sst_file_path.display(),
+ sst_file_path.to_string(), e ); } diff --git a/analytic_engine/src/sst/parquet/builder.rs b/analytic_engine/src/sst/parquet/builder.rs index 8bba10cc79..0f2f8bbf85 100644 --- a/analytic_engine/src/sst/parquet/builder.rs +++ b/analytic_engine/src/sst/parquet/builder.rs @@ -22,10 +22,10 @@ use arrow_deps::{ }; use async_trait::async_trait; use common_types::{bytes::BufMut, request_id::RequestId}; -use futures::AsyncRead; +use futures::{AsyncRead, AsyncReadExt}; use log::debug; -use object_store::{path::ObjectStorePath, ObjectStore}; -use snafu::{ensure, ResultExt}; +use object_store::{ObjectStore, Path}; +use snafu::ResultExt; use crate::sst::{ builder::{RecordBatchStream, SstBuilder, *}, @@ -38,7 +38,7 @@ use crate::sst::{ #[derive(Debug)] pub struct ParquetSstBuilder<'a, S: ObjectStore> { /// The path where the data is persisted. - path: &'a S::Path, + path: &'a Path, /// The storage where the data is persisted. storage: &'a S, /// Max row group size. @@ -47,7 +47,7 @@ pub struct ParquetSstBuilder<'a, S: ObjectStore> { } impl<'a, S: ObjectStore> ParquetSstBuilder<'a, S> { - pub fn new(path: &'a S::Path, storage: &'a S, options: &SstBuilderOptions) -> Self { + pub fn new(path: &'a Path, storage: &'a S, options: &SstBuilderOptions) -> Self { Self { path, storage, @@ -377,7 +377,7 @@ impl<'a, S: ObjectStore> SstBuilder for ParquetSstBuilder<'a, S> { ); let total_row_num = Arc::new(AtomicUsize::new(0)); - let reader = RecordBytesReader { + let mut reader = RecordBytesReader { request_id, record_stream, encoding_buffer: EncodingBuffer::default(), @@ -391,33 +391,25 @@ impl<'a, S: ObjectStore> SstBuilder for ParquetSstBuilder<'a, S> { stream_finished: false, fetched_row_num: 0, }; - - self.storage - .put(self.path, reader, None) + // TODO(ruihang): make `RecordBytesReader` support streaming read. It could be + // improved if the storage supports streaming upload (multipart upload). + let mut bytes = vec![]; + reader + .read_to_end(&mut bytes) .await .map_err(|e| Box::new(e) as _) - .context(Persist { - path: self.path.display(), - })?; + .context(ReadData)?; + drop(reader); - let result = self - .storage - .list_with_delimiter(self.path) + self.storage + .put(self.path, bytes.into()) .await - .map_err(|e| Box::new(e) as _) - .context(Persist { - path: self.path.display(), - })?; - - ensure!( - result.objects.len() == 1, - GetFileSize { - path: self.path.display(), - } - ); + .context(Storage)?; + + let file_head = self.storage.head(self.path).await.context(Storage)?; Ok(SstInfo { - file_size: result.objects[0].size, + file_size: file_head.size, row_num: total_row_num.load(Ordering::Relaxed), }) } @@ -434,7 +426,7 @@ mod tests { }; use common_util::runtime::{self, Runtime}; use futures::stream; - use object_store::disk::File; + use object_store::LocalFileSystem; use table_engine::predicate::Predicate; use tempfile::tempdir; @@ -476,9 +468,8 @@ mod tests { let dir = tempdir().unwrap(); let root = dir.path(); - let store = File::new(root); - let mut sst_file_path = store.new_path(); - sst_file_path.set_file_name("data.par"); + let store = LocalFileSystem::new_with_prefix(root).unwrap(); + let sst_file_path = Path::from("data.par"); let schema = build_schema(); let projected_schema = ProjectedSchema::no_projection(schema.clone()); diff --git a/analytic_engine/src/sst/parquet/reader.rs b/analytic_engine/src/sst/parquet/reader.rs index f515855ff7..cf40850bb2 100644 --- a/analytic_engine/src/sst/parquet/reader.rs +++ b/analytic_engine/src/sst/parquet/reader.rs @@ -3,13 +3,13 @@ //! 
Sst reader implementation based on parquet. use std::{ - fs::File, pin::Pin, sync::Arc, task::{Context, Poll}, time::Instant, }; +pub use arrow_deps::parquet::util::cursor::SliceableCursor; use arrow_deps::{ arrow::{error::Result as ArrowResult, record_batch::RecordBatch}, parquet::{ @@ -26,7 +26,7 @@ use common_types::{ use common_util::runtime::Runtime; use futures::Stream; use log::{debug, error, trace}; -use object_store::{path::ObjectStorePath, ObjectStore}; +use object_store::{ObjectStore, Path}; use parquet::{ reverse_reader::Builder as ReverseRecordBatchReaderBuilder, CachableSerializedFileReader, DataCacheRef, MetaCacheRef, @@ -46,28 +46,42 @@ const DEFAULT_CHANNEL_CAP: usize = 1000; pub async fn read_sst_meta<S: ObjectStore>( storage: &S, - path: &S::Path, + path: &Path, meta_cache: &Option<MetaCacheRef>, data_cache: &Option<DataCacheRef>, -) -> Result<(CachableSerializedFileReader<File>, SstMetaData)> { +) -> Result<(CachableSerializedFileReader<SliceableCursor>, SstMetaData)> { let get_result = storage .get(path) .await .map_err(|e| Box::new(e) as _) .with_context(|| ReadPersist { - path: path.display(), + path: path.to_string(), })?; + // TODO: The `ChunkReader` (trait from parquet crate) doesn't support async + // read, so in this situation it would be better to pass a local file to + // it, avoiding consuming lots of memory. Once parquet supports streaming data + // sources, we can feed the `GetResult` to it directly. + let bytes = SliceableCursor::new(Arc::new( + get_result + .bytes() + .await + .map_err(|e| Box::new(e) as _) + .context(ReadPersist { + path: path.to_string(), + })? + .to_vec(), )); // generate the file reader let file_reader = CachableSerializedFileReader::new( - path.display(), - file, + path.to_string(), + bytes, meta_cache.clone(), data_cache.clone(), ) .map_err(|e| Box::new(e) as _) .with_context(|| ReadPersist { - path: path.display(), + path: path.to_string(), })?; // parse sst meta data @@ -92,13 +106,13 @@ pub async fn read_sst_meta<S: ObjectStore>( /// The implementation of sst based on parquet and object storage. pub struct ParquetSstReader<'a, S: ObjectStore> { /// The path where the data is persisted. - path: &'a S::Path, + path: &'a Path, /// The storage where the data is persisted. storage: &'a S, projected_schema: ProjectedSchema, predicate: PredicateRef, meta_data: Option<SstMetaData>, - file_reader: Option<CachableSerializedFileReader<File>>, + file_reader: Option<CachableSerializedFileReader<SliceableCursor>>, /// The batch of rows in one `record_batch`. batch_size: usize, /// Read the rows in reverse order. @@ -112,7 +126,7 @@ pub struct ParquetSstReader<'a, S: ObjectStore> { } impl<'a, S: ObjectStore> ParquetSstReader<'a, S> { - pub fn new(path: &'a S::Path, storage: &'a S, options: &SstReaderOptions) -> Self { + pub fn new(path: &'a Path, storage: &'a S, options: &SstReaderOptions) -> Self { Self { path, storage, @@ -146,7 +160,7 @@ impl<'a, S: ObjectStore> ParquetSstReader<'a, S> { } fn read_record_batches(&mut self, tx: Sender<ArrowResult<RecordBatch>>) -> Result<()> { - let path = self.path.display(); + let path = self.path.to_string(); ensure!(self.file_reader.is_some(), ReadAgain { path }); let file_reader = self.file_reader.take().unwrap(); @@ -228,7 +242,7 @@ impl<'a, S: ObjectStore> ParquetSstReader<'a, S> { /// A reader for projection and filter on the parquet file. 
struct ProjectAndFilterReader { file_path: String, - file_reader: Option<CachableSerializedFileReader<File>>, + file_reader: Option<CachableSerializedFileReader<SliceableCursor>>, schema: Schema, projected_schema: ProjectedSchema, row_projector: RowProjector, @@ -357,7 +371,7 @@ impl<'a, S: ObjectStore> SstReader for ParquetSstReader<'a, S> { ) -> Result<Box<dyn Stream<Item = Result<RecordBatchWithKey>> + Send + Unpin>> { debug!( "read sst:{}, projected_schema:{:?}, predicate:{:?}", - self.path.display(), + self.path.to_string(), self.projected_schema, self.predicate ); diff --git a/analytic_engine/src/table/data.rs b/analytic_engine/src/table/data.rs index b011ec6881..436bb5b0e1 100644 --- a/analytic_engine/src/table/data.rs +++ b/analytic_engine/src/table/data.rs @@ -23,7 +23,7 @@ use common_types::{ }; use common_util::define_result; use log::{debug, info}; -use object_store::path::ObjectStorePath; +use object_store::Path; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::{engine::CreateTableRequest, table::TableId}; use wal::manager::RegionId; @@ -444,8 +444,8 @@ impl TableData { } /// Set the sst file path into the object storage path. - pub fn set_sst_file_path(&self, file_id: FileId, path: &mut impl ObjectStorePath) { - sst_util::set_sst_file_path(self.space_id, self.id, file_id, path) + pub fn set_sst_file_path(&self, file_id: FileId) -> Path { + sst_util::new_sst_file_path(self.space_id, self.id, file_id) } /// Allocate next memtable id diff --git a/analytic_engine/src/table/sst_util.rs b/analytic_engine/src/table/sst_util.rs index b5d760a079..fbdc36cc24 100644 --- a/analytic_engine/src/table/sst_util.rs +++ b/analytic_engine/src/table/sst_util.rs @@ -2,7 +2,9 @@ //! utilities for sst. -use object_store::path::ObjectStorePath; +use std::iter::FromIterator; + +use object_store::Path; use table_engine::table::TableId; use crate::{space::SpaceId, sst::manager::FileId}; @@ -15,13 +17,10 @@ pub fn sst_file_name(id: FileId) -> String { format!("{}.{}", id, SST_FILE_SUFFIX) } -/// Set the sst file path. -pub fn set_sst_file_path<P: ObjectStorePath>( - space_id: SpaceId, - table_id: TableId, - file_id: FileId, - path: &mut P, -) { - path.push_all_dirs([space_id.to_string().as_str(), table_id.to_string().as_str()]); - path.set_file_name(sst_file_name(file_id)); +pub fn new_sst_file_path(space_id: SpaceId, table_id: TableId, file_id: FileId) -> Path { + Path::from_iter([ + space_id.to_string(), + table_id.to_string(), + sst_file_name(file_id), + ]) } diff --git a/benchmarks/bench.toml b/benchmarks/bench.toml index 1566ad0211..55c1e0c15f 100644 --- a/benchmarks/bench.toml +++ b/benchmarks/bench.toml @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +# Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. 
[sst_bench] store_path = "/tmp/ceresdb/1/1" diff --git a/benchmarks/src/merge_memtable_bench.rs b/benchmarks/src/merge_memtable_bench.rs index 7596576aa6..2d2d225013 100644 --- a/benchmarks/src/merge_memtable_bench.rs +++ b/benchmarks/src/merge_memtable_bench.rs @@ -27,14 +27,14 @@ use common_types::{ }; use common_util::runtime::Runtime; use log::info; -use object_store::{disk::File, ObjectStore}; +use object_store::LocalFileSystem; use parquet::{DataCacheRef, MetaCacheRef}; use table_engine::{predicate::Predicate, table::TableId}; use crate::{config::MergeMemTableBenchConfig, util}; pub struct MergeMemTableBench { - store: File, + store: LocalFileSystem, memtables: MemTableVec, max_projections: usize, schema: Schema, @@ -50,7 +50,7 @@ impl MergeMemTableBench { pub fn new(config: MergeMemTableBenchConfig) -> Self { assert!(!config.sst_file_ids.is_empty()); - let store = File::new(config.store_path); + let store = LocalFileSystem::new_with_prefix(config.store_path).unwrap(); let runtime = Arc::new(util::new_runtime(config.runtime_thread_num)); let space_id = config.space_id; let table_id = config.table_id; @@ -59,8 +59,7 @@ impl MergeMemTableBench { let data_cache: Option = None; // Use first sst's schema. - let mut sst_path = store.new_path(); - sst_util::set_sst_file_path(space_id, table_id, config.sst_file_ids[0], &mut sst_path); + let sst_path = sst_util::new_sst_file_path(space_id, table_id, config.sst_file_ids[0]); let schema = runtime.block_on(util::schema_from_sst( &store, &sst_path, @@ -73,8 +72,7 @@ impl MergeMemTableBench { let mut memtables = Vec::with_capacity(config.sst_file_ids.len()); for id in &config.sst_file_ids { - let mut sst_path = store.new_path(); - sst_util::set_sst_file_path(space_id, table_id, *id, &mut sst_path); + let sst_path = sst_util::new_sst_file_path(space_id, table_id, *id); let memtable_factory = SkiplistMemTableFactory; let memtable_opts = Options { diff --git a/benchmarks/src/merge_sst_bench.rs b/benchmarks/src/merge_sst_bench.rs index a0ccab50d5..cc967eb1a6 100644 --- a/benchmarks/src/merge_sst_bench.rs +++ b/benchmarks/src/merge_sst_bench.rs @@ -22,7 +22,7 @@ use analytic_engine::{ use common_types::{projected_schema::ProjectedSchema, request_id::RequestId, schema::Schema}; use common_util::runtime::Runtime; use log::info; -use object_store::{disk::File, ObjectStore}; +use object_store::LocalFileSystem; use parquet::{DataCacheRef, MetaCacheRef}; use table_engine::{predicate::Predicate, table::TableId}; use tokio::sync::mpsc::{self, UnboundedReceiver}; @@ -30,7 +30,7 @@ use tokio::sync::mpsc::{self, UnboundedReceiver}; use crate::{config::MergeSstBenchConfig, util}; pub struct MergeSstBench { - store: File, + store: LocalFileSystem, max_projections: usize, schema: Schema, sst_reader_options: SstReaderOptions, @@ -46,13 +46,12 @@ impl MergeSstBench { pub fn new(config: MergeSstBenchConfig) -> Self { assert!(!config.sst_file_ids.is_empty()); - let store = File::new(config.store_path); + let store = LocalFileSystem::new_with_prefix(config.store_path).unwrap(); let runtime = Arc::new(util::new_runtime(config.runtime_thread_num)); let space_id = config.space_id; let table_id = config.table_id; - let mut sst_path = store.new_path(); - sst_util::set_sst_file_path(space_id, table_id, config.sst_file_ids[0], &mut sst_path); + let sst_path = sst_util::new_sst_file_path(space_id, table_id, config.sst_file_ids[0]); let meta_cache: Option = None; let data_cache: Option = None; diff --git a/benchmarks/src/parquet_bench.rs b/benchmarks/src/parquet_bench.rs index 
b52c84f7e1..f49b7e5d74 100644 --- a/benchmarks/src/parquet_bench.rs +++ b/benchmarks/src/parquet_bench.rs @@ -9,11 +9,12 @@ use arrow_deps::parquet::{ file::{ metadata::RowGroupMetaData, reader::FileReader, serialized_reader::SerializedFileReader, }, + util::cursor::SliceableCursor, }; use common_types::schema::Schema; use common_util::runtime::Runtime; use log::info; -use object_store::{disk::File, path::ObjectStorePath, ObjectStore}; +use object_store::{LocalFileSystem, ObjectStore, Path}; use parquet::{DataCacheRef, MetaCacheRef}; use table_engine::predicate::PredicateRef; @@ -22,7 +23,7 @@ use crate::{config::SstBenchConfig, util}; type RowGroupPredicate = Box<dyn Fn(&RowGroupMetaData, usize) -> bool + 'static>; pub struct ParquetBench { - store: File, + store: LocalFileSystem, pub sst_file_name: String, max_projections: usize, projection: Vec, @@ -34,12 +35,11 @@ pub struct ParquetBench { impl ParquetBench { pub fn new(config: SstBenchConfig) -> Self { - let store = File::new(config.store_path); + let store = LocalFileSystem::new_with_prefix(config.store_path).unwrap(); let runtime = util::new_runtime(config.runtime_thread_num); - let mut sst_path = store.new_path(); - sst_path.set_file_name(&config.sst_file_name); + let sst_path = Path::from(config.sst_file_name.clone()); let meta_cache: Option<MetaCacheRef> = None; let data_cache: Option<DataCacheRef> = None; @@ -80,13 +80,13 @@ impl ParquetBench { } pub fn run_bench(&self) { - let mut sst_path = self.store.new_path(); - sst_path.set_file_name(&self.sst_file_name); + let sst_path = Path::from(self.sst_file_name.clone()); self.runtime.block_on(async { let open_instant = Instant::now(); - let file = self.store.get(&sst_path).await.unwrap(); - let mut file_reader = SerializedFileReader::new(file).unwrap(); + let get_result = self.store.get(&sst_path).await.unwrap(); + let cursor = SliceableCursor::new(Arc::new(get_result.bytes().await.unwrap().to_vec())); + let mut file_reader = SerializedFileReader::new(cursor).unwrap(); let open_cost = open_instant.elapsed(); let filter_begin_instant = Instant::now(); @@ -127,7 +127,7 @@ impl ParquetBench { fn build_row_group_predicate( &self, - file_reader: &SerializedFileReader<std::fs::File>, + file_reader: &SerializedFileReader<SliceableCursor>, ) -> RowGroupPredicate { let row_groups = file_reader.metadata().row_groups(); let filter_results = self.predicate.filter_row_groups(&self.schema, row_groups); diff --git a/benchmarks/src/scan_memtable_bench.rs b/benchmarks/src/scan_memtable_bench.rs index 424e1886e8..b0063aafbb 100644 --- a/benchmarks/src/scan_memtable_bench.rs +++ b/benchmarks/src/scan_memtable_bench.rs @@ -12,7 +12,7 @@ use analytic_engine::memtable::{ use arena::NoopCollector; use common_types::projected_schema::ProjectedSchema; use log::info; -use object_store::{disk::File, path::ObjectStorePath, ObjectStore}; +use object_store::{LocalFileSystem, Path}; use parquet::{DataCacheRef, MetaCacheRef}; use crate::{config::ScanMemTableBenchConfig, util}; @@ -25,13 +25,12 @@ pub struct ScanMemTableBench { impl ScanMemTableBench { pub fn new(config: ScanMemTableBenchConfig) -> Self { - let store = File::new(config.store_path); + let store = LocalFileSystem::new_with_prefix(config.store_path).unwrap(); let runtime = Arc::new(util::new_runtime(config.runtime_thread_num)); let meta_cache: Option<MetaCacheRef> = None; let data_cache: Option<DataCacheRef> = None; - let mut sst_path = store.new_path(); - sst_path.set_file_name(&config.sst_file_name); + let sst_path = Path::from(config.sst_file_name); let schema = runtime.block_on(util::schema_from_sst( &store, &sst_path, diff --git a/benchmarks/src/sst_bench.rs 
b/benchmarks/src/sst_bench.rs index 882e40b1fa..2835d301fd 100644 --- a/benchmarks/src/sst_bench.rs +++ b/benchmarks/src/sst_bench.rs @@ -9,7 +9,7 @@ use common_types::{projected_schema::ProjectedSchema, schema::Schema}; use common_util::runtime::Runtime; use futures::stream::StreamExt; use log::info; -use object_store::{disk::File, path::ObjectStorePath, ObjectStore}; +use object_store::{LocalFileSystem, Path}; use parquet::{ cache::{LruDataCache, LruMetaCache}, DataCacheRef, MetaCacheRef, @@ -18,7 +18,7 @@ use parquet::{ use crate::{config::SstBenchConfig, util}; pub struct SstBench { - store: File, + store: LocalFileSystem, pub sst_file_name: String, max_projections: usize, schema: Schema, @@ -28,12 +28,10 @@ pub struct SstBench { impl SstBench { pub fn new(config: SstBenchConfig) -> Self { - let store = File::new(config.store_path); - let runtime = Arc::new(util::new_runtime(config.runtime_thread_num)); - let mut sst_path = store.new_path(); - sst_path.set_file_name(&config.sst_file_name); + let store = LocalFileSystem::new_with_prefix(config.store_path).unwrap(); + let sst_path = Path::from(config.sst_file_name.clone()); let meta_cache: Option = if let Some(sst_meta_cache_cap) = config.sst_meta_cache_cap { Some(Arc::new(LruMetaCache::new(sst_meta_cache_cap))) @@ -92,8 +90,7 @@ impl SstBench { } pub fn run_bench(&self) { - let mut sst_path = self.store.new_path(); - sst_path.set_file_name(&self.sst_file_name); + let sst_path = Path::from(self.sst_file_name.clone()); let sst_factory = FactoryImpl; let mut sst_reader = sst_factory diff --git a/benchmarks/src/sst_tools.rs b/benchmarks/src/sst_tools.rs index 666722d91b..01c3bfd556 100644 --- a/benchmarks/src/sst_tools.rs +++ b/benchmarks/src/sst_tools.rs @@ -25,11 +25,7 @@ use common_types::{projected_schema::ProjectedSchema, request_id::RequestId}; use common_util::runtime::Runtime; use futures::TryStreamExt; use log::info; -use object_store::{ - disk::File, - path::{file::FilePath, ObjectStorePath}, - ObjectStore, -}; +use object_store::{LocalFileSystem, Path}; use serde_derive::Deserialize; use table_engine::{predicate::Predicate, table::TableId}; use tokio::sync::mpsc; @@ -58,9 +54,8 @@ async fn create_sst_from_stream(config: SstConfig, record_batch_stream: RecordBa config, sst_builder_options ); - let store = File::new(config.store_path); - let mut sst_file_path = store.new_path(); - sst_file_path.set_file_name(&config.sst_file_name); + let store = LocalFileSystem::new_with_prefix(config.store_path).unwrap(); + let sst_file_path = Path::from(config.sst_file_name); let mut builder = sst_factory .new_sst_builder(&sst_builder_options, &sst_file_path, &store) @@ -87,10 +82,8 @@ pub struct RebuildSstConfig { pub async fn rebuild_sst(config: RebuildSstConfig, runtime: Arc) { info!("Start rebuild sst, config:{:?}", config); - let store = File::new(config.store_path.clone()); - - let mut input_path = store.new_path(); - input_path.set_file_name(&config.input_file_name); + let store = LocalFileSystem::new_with_prefix(config.store_path.clone()).unwrap(); + let input_path = Path::from(config.input_file_name); let sst_meta = util::meta_from_sst(&store, &input_path, &None, &None).await; @@ -124,8 +117,8 @@ pub async fn rebuild_sst(config: RebuildSstConfig, runtime: Arc) { async fn sst_to_record_batch_stream( sst_reader_options: &SstReaderOptions, - input_path: &FilePath, - store: &File, + input_path: &Path, + store: &LocalFileSystem, ) -> RecordBatchStream { let sst_factory = FactoryImpl; let mut sst_reader = sst_factory @@ -164,7 +157,7 @@ pub 
async fn merge_sst(config: MergeSstConfig, runtime: Arc) { let space_id = config.space_id; let table_id = config.table_id; - let store = File::new(config.store_path.clone()); + let store = LocalFileSystem::new_with_prefix(config.store_path.clone()).unwrap(); let (tx, _rx) = mpsc::unbounded_channel(); let purge_queue = FilePurgeQueue::new(space_id, table_id, tx); @@ -184,13 +177,7 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc) { .max() .unwrap(); - let mut first_sst_path = store.new_path(); - sst_util::set_sst_file_path( - space_id, - table_id, - config.sst_file_ids[0], - &mut first_sst_path, - ); + let first_sst_path = sst_util::new_sst_file_path(space_id, table_id, config.sst_file_ids[0]); let schema = util::schema_from_sst(&store, &first_sst_path, &None, &None).await; let iter_options = IterOptions { batch_size: config.read_batch_row_num, diff --git a/benchmarks/src/util.rs b/benchmarks/src/util.rs index 639c3da19b..edf9cbe700 100644 --- a/benchmarks/src/util.rs +++ b/benchmarks/src/util.rs @@ -22,7 +22,7 @@ use common_types::{ }; use common_util::runtime::{self, Runtime}; use futures::stream::StreamExt; -use object_store::{disk::File, path::file::FilePath, ObjectStore}; +use object_store::{LocalFileSystem, Path}; use parquet::{DataCacheRef, MetaCacheRef}; use table_engine::{predicate::Predicate, table::TableId}; @@ -36,8 +36,8 @@ pub fn new_runtime(thread_num: usize) -> Runtime { } pub async fn meta_from_sst( - store: &File, - sst_path: &FilePath, + store: &LocalFileSystem, + sst_path: &Path, meta_cache: &Option, data_cache: &Option, ) -> SstMetaData { @@ -49,8 +49,8 @@ pub async fn meta_from_sst( } pub async fn schema_from_sst( - store: &File, - sst_path: &FilePath, + store: &LocalFileSystem, + sst_path: &Path, meta_cache: &Option, data_cache: &Option, ) -> Schema { @@ -74,8 +74,8 @@ pub fn projected_schema_by_number( } pub async fn load_sst_to_memtable( - store: &File, - sst_path: &FilePath, + store: &LocalFileSystem, + sst_path: &Path, schema: &Schema, memtable: &MemTableRef, runtime: Arc, @@ -117,7 +117,7 @@ pub async fn load_sst_to_memtable( } pub async fn file_handles_from_ssts( - store: &File, + store: &LocalFileSystem, space_id: SpaceId, table_id: TableId, sst_file_ids: &[FileId], @@ -128,8 +128,7 @@ pub async fn file_handles_from_ssts( let mut file_handles = Vec::with_capacity(sst_file_ids.len()); for file_id in sst_file_ids.iter() { - let mut path = store.new_path(); - sst_util::set_sst_file_path(space_id, table_id, *file_id, &mut path); + let path = sst_util::new_sst_file_path(space_id, table_id, *file_id); let sst_meta = meta_from_sst(store, &path, meta_cache, data_cache).await; let file_meta = FileMeta { diff --git a/components/object_store/Cargo.toml b/components/object_store/Cargo.toml index bea2abc65a..3ab3333905 100644 --- a/components/object_store/Cargo.toml +++ b/components/object_store/Cargo.toml @@ -4,20 +4,5 @@ version = "0.1.0" authors = ["CeresDB Authors "] edition = "2018" -[dependencies] # In alphabetical order -async-trait = "0.1.53" -bytes = "1.0" -common_util = { path = "../../common_util" } -futures = "0.3" -itertools = "0.10" -percent-encoding = "2.1" -snafu = { version = "0.6.10", features = ["futures", "backtraces"] } -tokio = { version = "1.0", features = ["macros", "fs"] } -# Filesystem integration -tokio-util = { version = "0.6.3", features = [ "io","compat" ] } -walkdir = "2.3.2" -# Aliyun OSS integration -oss-rust-sdk = { git = "https://github.com/CeresDB/oss-rust-sdk.git", branch = "async-api" } - -[dev-dependencies] -tempfile = 
"3.1.0" +[dependencies] +upstream = { package = "object_store", version = "0.1.0" } diff --git a/components/object_store/src/disk.rs b/components/object_store/src/disk.rs deleted file mode 100644 index 14cdbb9cc0..0000000000 --- a/components/object_store/src/disk.rs +++ /dev/null @@ -1,389 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. - -//! This module contains the IOx implementation for using local disk as the -//! object store. -use std::{collections::BTreeSet, convert::TryFrom, io, path::PathBuf}; - -use async_trait::async_trait; -use futures::{ - stream, - stream::{BoxStream, StreamExt}, - AsyncRead, -}; -use snafu::{Backtrace, GenerateBacktrace, OptionExt, ResultExt, Snafu}; -use tokio::fs; -use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt}; -use walkdir::WalkDir; - -use crate::{path::file::FilePath, ListResult, ObjectMeta, ObjectStore}; - -/// A specialized `Result` for filesystem object store-related errors -pub type Result = std::result::Result; - -/// A specialized `Error` for filesystem object store-related errors -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display( - "Expected streamed data to have length {}, got {}.\nBacktrace:\n{}", - expected, - actual, - backtrace - ))] - DataDoesNotMatchLength { - expected: usize, - actual: usize, - backtrace: Backtrace, - }, - - #[snafu(display("File size for {} did not fit in a usize: {}.\nBacktrace:\n{}", path.display(), source, backtrace))] - FileSizeOverflowedUsize { - path: PathBuf, - source: std::num::TryFromIntError, - backtrace: Backtrace, - }, - - #[snafu(display("Unable to walk dir: {}.\nBacktrace:\n{}", source, backtrace))] - UnableToWalkDir { - source: walkdir::Error, - backtrace: Backtrace, - }, - - #[snafu(display("Unable to access metadata for {}: {}.\nBacktrace:\n{}", path.display(), source, backtrace))] - UnableToAccessMetadata { - path: PathBuf, - source: walkdir::Error, - backtrace: Backtrace, - }, - - #[snafu(display("Unable to copy data to file: {}.\nBacktrace:\n{}", source, backtrace))] - UnableToCopyDataToFile { - source: io::Error, - backtrace: Backtrace, - }, - - #[snafu(display("Unable to create dir {}: {}.\nBacktrace:\n{}", path.display(), source, backtrace))] - UnableToCreateDir { - source: io::Error, - path: PathBuf, - backtrace: Backtrace, - }, - - #[snafu(display("Unable to create file {}: {}.\nBacktrace:\n{}", path.display(), err, backtrace))] - UnableToCreateFile { - path: PathBuf, - err: io::Error, - backtrace: Backtrace, - }, - - #[snafu(display("Unable to delete file {}: {}.\nBacktrace:\n{}", path.display(), source, backtrace))] - UnableToDeleteFile { - source: io::Error, - path: PathBuf, - backtrace: Backtrace, - }, - - #[snafu(display("Unable to open file {}: {}.\nBacktrace:\n{}", path.display(), source, backtrace))] - UnableToOpenFile { - source: io::Error, - path: PathBuf, - backtrace: Backtrace, - }, - - #[snafu(display("Unable to read data from file {}: {}.\nBacktrace:\n{}", path.display(), source, backtrace))] - UnableToReadBytes { - source: io::Error, - path: PathBuf, - backtrace: Backtrace, - }, - - #[snafu(display( - "Unable to stream data from the request into memory: {}.\nBacktrace:\n{}", - source, - backtrace - ))] - UnableToStreamDataIntoMemory { - source: std::io::Error, - backtrace: Backtrace, - }, -} - -/// Local filesystem storage suitable for testing or for opting out of using a -/// cloud storage provider. 
-#[derive(Debug)] -pub struct File { - root: FilePath, -} - -#[async_trait] -impl ObjectStore for File { - type Error = Error; - type Path = FilePath; - type Reader = Compat; - - fn new_path(&self) -> Self::Path { - FilePath::default() - } - - async fn put( - &self, - location: &Self::Path, - bytes: R, - _length: Option, - ) -> Result<(), Self::Error> - where - R: AsyncRead + Send + Unpin, - { - let path = self.path(location); - - let mut file = match fs::File::create(&path).await { - Ok(f) => f, - Err(err) if err.kind() == std::io::ErrorKind::NotFound => { - let parent = path - .parent() - .context(UnableToCreateFile { path: &path, err })?; - fs::create_dir_all(&parent) - .await - .context(UnableToCreateDir { path: parent })?; - - match fs::File::create(&path).await { - Ok(f) => f, - Err(err) => return UnableToCreateFile { path, err }.fail(), - } - } - Err(err) => return UnableToCreateFile { path, err }.fail(), - }; - - tokio::io::copy(&mut bytes.compat(), &mut file) - .await - .context(UnableToCopyDataToFile)?; - - Ok(()) - } - - async fn get(&self, location: &Self::Path) -> Result { - let path = self.path(location); - let file = fs::File::open(&path) - .await - .context(UnableToOpenFile { path: &path })?; - Ok(file.into_std().await) - } - - async fn delete(&self, location: &Self::Path) -> Result<(), Self::Error> { - let path = self.path(location); - fs::remove_file(&path) - .await - .context(UnableToDeleteFile { path })?; - Ok(()) - } - - async fn list<'a>( - &'a self, - prefix: Option<&'a Self::Path>, - ) -> Result, Self::Error>>, Self::Error> { - let root_path = self.root.to_raw(); - let walkdir = WalkDir::new(&root_path) - // Don't include the root directory itself - .min_depth(1); - - let s = - walkdir.into_iter().filter_map(move |result_dir_entry| { - match convert_walkdir_result(result_dir_entry) { - Err(e) => Some(Err(e)), - Ok(None) => None, - Ok(entry @ Some(_)) => entry - .filter(|dir_entry| dir_entry.file_type().is_file()) - .map(|file| { - let relative_path = file.path().strip_prefix(&root_path).expect( - "Must start with root path because this came from walking the root", - ); - FilePath::raw(relative_path, false) - }) - .filter(|name| prefix.map_or(true, |p| name.prefix_matches(p))) - .map(|name| Ok(vec![name])), - } - }); - - Ok(stream::iter(s).boxed()) - } - - async fn list_with_delimiter( - &self, - prefix: &Self::Path, - ) -> Result, Self::Error> { - // Always treat prefix as relative because the list operations don't know - // anything about where on disk the root of this object store is; they - // only care about what's within this object store's directory. See - // documentation for `push_path`: it deliberately does *not* behave as - // `PathBuf::push` does: there is no way to replace the root. So even if - // `prefix` isn't relative, we treat it as such here. - let mut resolved_prefix = self.root.clone(); - resolved_prefix.push_path(prefix); - - // It is valid to specify a prefix with directories `[foo, bar]` and filename - // `baz`, in which case we want to treat it like a glob for - // `foo/bar/baz*` and there may not actually be a file or directory - // named `foo/bar/baz`. We want to look at all the entries in - // `foo/bar/`, so remove the file name. 
- let mut search_path = resolved_prefix.clone(); - search_path.unset_file_name(); - - let walkdir = WalkDir::new(&search_path.to_raw()) - .min_depth(1) - .max_depth(1); - - let mut common_prefixes = BTreeSet::new(); - let mut objects = Vec::new(); - - let root_path = self.root.to_raw(); - for entry_res in walkdir.into_iter().map(convert_walkdir_result) { - if let Some(entry) = entry_res? { - let entry_location = FilePath::raw(entry.path(), false); - - if entry_location.prefix_matches(&resolved_prefix) { - let metadata = entry - .metadata() - .context(UnableToAccessMetadata { path: entry.path() })?; - - if metadata.is_dir() { - let parts = entry_location - .parts_after_prefix(&resolved_prefix) - .expect("must have prefix because of the if prefix_matches condition"); - - let mut relative_location = prefix.to_owned(); - relative_location.push_part_as_dir(&parts[0]); - common_prefixes.insert(relative_location); - } else { - let path = entry - .path() - .strip_prefix(&root_path) - .expect("must have prefix because of the if prefix_matches condition"); - let location = FilePath::raw(path, false); - - let last_modified = metadata - .modified() - .expect("Modified file time should be supported on this platform"); - let size = usize::try_from(metadata.len()) - .context(FileSizeOverflowedUsize { path: entry.path() })?; - - objects.push(ObjectMeta { - location, - last_modified, - size, - }); - } - } - } - } - - Ok(ListResult { - next_token: None, - common_prefixes: common_prefixes.into_iter().collect(), - objects, - }) - } -} - -impl File { - /// Create new filesystem storage. - pub fn new(root: impl Into) -> Self { - Self { - root: FilePath::raw(root, true), - } - } - - /// Return full path of the given location - pub fn path(&self, location: &FilePath) -> PathBuf { - let mut path = self.root.clone(); - path.push_path(location); - path.to_raw() - } -} - -/// Convert walkdir results and converts not-found errors into `None`. 
-fn convert_walkdir_result( - res: std::result::Result, -) -> Result> { - match res { - Ok(entry) => Ok(Some(entry)), - Err(walkdir_err) => match walkdir_err.io_error() { - Some(io_err) => match io_err.kind() { - io::ErrorKind::NotFound => Ok(None), - _ => Err(Error::UnableToWalkDir { - source: walkdir_err, - backtrace: Backtrace::generate(), - }), - }, - None => Err(Error::UnableToWalkDir { - source: walkdir_err, - backtrace: Backtrace::generate(), - }), - }, - } -} - -#[cfg(test)] -mod tests { - use std::io::Read; - - use bytes::Bytes; - use tempfile::TempDir; - - use super::*; - use crate::{ - path::ObjectStorePath, - tests::{list_with_delimiter, put_get_delete_list}, - ObjectStore, - }; - - #[tokio::test] - async fn file_test() { - let root = TempDir::new().unwrap(); - let file = File::new(root.path()); - - put_get_delete_list(&file).await.unwrap(); - list_with_delimiter(&file).await.unwrap(); - } - - #[tokio::test] - async fn creates_dir_if_not_present() { - let root = TempDir::new().unwrap(); - let file = File::new(root.path()); - - let data = Bytes::from("arbitrary data"); - let mut location = file.new_path(); - location.push_all_dirs(&["nested", "file", "test_file"]); - - file.put(&location, Box::new(data.as_ref()), Some(data.len())) - .await - .unwrap(); - - let mut read_data = Vec::with_capacity(data.len()); - file.get(&location) - .await - .unwrap() - .read_to_end(&mut read_data) - .unwrap(); - assert_eq!(&*read_data, data); - } - - #[tokio::test] - async fn unknown_length() { - let root = TempDir::new().unwrap(); - let file = File::new(root.path()); - - let data = Bytes::from("arbitrary data"); - - let mut location = file.new_path(); - location.set_file_name("some_file"); - file.put(&location, Box::new(data.as_ref()), None) - .await - .unwrap(); - let mut read_data = Vec::with_capacity(data.len()); - file.get(&location) - .await - .unwrap() - .read_to_end(&mut read_data) - .unwrap(); - assert_eq!(&*read_data, data); - } -} diff --git a/components/object_store/src/lib.rs b/components/object_store/src/lib.rs index 326a68459c..20b9446f12 100644 --- a/components/object_store/src/lib.rs +++ b/components/object_store/src/lib.rs @@ -1,329 +1,8 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -//! # object_store -//! -//! This crate provides APIs for interacting with object storage services. It -//! currently supports PUT, GET, DELETE, and list for in-memory and -//! local file storage. -//! -//! Future compatibility will include Aliyun OSS. -//! -//! Fork from https://github.com/influxdata/influxdb_iox/tree/main/object_store +//! Re-export of [object_store] crate. -use std::time::SystemTime; - -use async_trait::async_trait; -use futures::{stream::BoxStream, AsyncRead}; -use path::ObjectStorePath; - -pub mod disk; -pub mod path; - -/// Universal API to multiple object store services. -// TODO(xikai): ObjectStore -> FileStore -#[async_trait] -pub trait ObjectStore: std::fmt::Debug + Send + Sync + 'static { - /// The type of the locations used in interacting with this object store. - type Path: ObjectStorePath; - - /// The error returned from fallible methods - type Error: std::error::Error + Send + Sync + 'static; - - type Reader: AsyncRead + Send + Unpin; - - /// Return a new location path appropriate for this object storage - fn new_path(&self) -> Self::Path; - - /// Save the provided bytes to the specified location. 
- async fn put( - &self, - location: &Self::Path, - bytes: R, - length: Option, - ) -> Result<(), Self::Error> - where - R: AsyncRead + Send + Unpin; - - /// Return the bytes that are stored at the specified location. - async fn get(&self, location: &Self::Path) -> Result; - - /// Delete the object at the specified location. - async fn delete(&self, location: &Self::Path) -> Result<(), Self::Error>; - - /// List all the objects with the given prefix. - async fn list<'a>( - &'a self, - prefix: Option<&'a Self::Path>, - ) -> Result, Self::Error>>, Self::Error>; - - /// List objects with the given prefix and an implementation specific - /// delimiter. Returns common prefixes (directories) in addition to object - /// metadata. - async fn list_with_delimiter( - &self, - prefix: &Self::Path, - ) -> Result, Self::Error>; -} - -/// Result of a list call that includes objects, prefixes (directories) and a -/// token for the next set of results. Individual result sets may be limited to -/// 1,00 objects based on the underlying object storage's limitations. -#[derive(Debug)] -pub struct ListResult { - /// Token passed to the API for the next page of list results. - pub next_token: Option, - /// Prefixes that are common (like directories) - pub common_prefixes: Vec
, - /// Object metadata for the listing - pub objects: Vec>, -} - -/// The metadata that describes an object. -#[derive(Debug)] -pub struct ObjectMeta { - /// The full path to the object - pub location: P, - /// The last modified time - pub last_modified: SystemTime, - /// The size in bytes of the object - pub size: usize, -} - -#[cfg(test)] -mod tests { - use std::io::Read; - - use bytes::Bytes; - use futures::{stream, StreamExt, TryStreamExt}; - - use super::*; - use crate::path::{file::FilePath, parsed::DirsAndFileName}; - - type Error = Box; - type Result = std::result::Result; - - async fn flatten_list_stream< - P: path::ObjectStorePath, - E: std::error::Error + Send + Sync + 'static, - R: AsyncRead + Unpin, - >( - storage: &impl ObjectStore, - prefix: Option<&P>, - ) -> Result> { - storage - .list(prefix) - .await? - .map_ok(|v| stream::iter(v).map(Ok)) - .try_flatten() - .try_collect() - .await - } - - pub(crate) async fn put_get_delete_list< - P: path::ObjectStorePath, - E: std::error::Error + Send + Sync + 'static, - R: AsyncRead + Unpin, - >( - storage: &impl ObjectStore, - ) -> Result<()> { - delete_fixtures(storage).await; - - let content_list = flatten_list_stream(storage, None).await?; - assert!( - content_list.is_empty(), - "Expected list to be empty; found: {:?}", - content_list - ); - - let data = Bytes::from("arbitrary data"); - let mut location = storage.new_path(); - location.push_dir("test_dir"); - location.set_file_name("test_file.json"); - - storage - .put(&location, data.as_ref(), Some(data.len())) - .await?; - - // List everything - let content_list = flatten_list_stream(storage, None).await?; - assert_eq!(content_list, &[location.clone()]); - - // List everything starting with a prefix that should return results - let mut prefix = storage.new_path(); - prefix.push_dir("test_dir"); - let content_list = flatten_list_stream(storage, Some(&prefix)).await?; - assert_eq!(content_list, &[location.clone()]); - - // List everything starting with a prefix that shouldn't return results - let mut prefix = storage.new_path(); - prefix.push_dir("something"); - let content_list = flatten_list_stream(storage, Some(&prefix)).await?; - assert!(content_list.is_empty()); - - let mut read_data = Vec::with_capacity(data.len()); - - storage.get(&location).await?.read_to_end(&mut read_data)?; - assert_eq!(&*read_data, data); - - storage.delete(&location).await?; - - let content_list = flatten_list_stream(storage, None).await?; - assert!(content_list.is_empty()); - - Ok(()) - } - - pub(crate) async fn list_with_delimiter< - P: path::ObjectStorePath, - E: std::error::Error + Send + Sync + 'static, - R: AsyncRead + Unpin, - >( - storage: &impl ObjectStore, - ) -> Result<()> { - delete_fixtures(storage).await; - - // ==================== check: store is empty ==================== - let content_list = flatten_list_stream(storage, None).await?; - assert!(content_list.is_empty()); - - // ==================== do: create files ==================== - let data = Bytes::from("arbitrary data"); - - let files: Vec<_> = [ - "test_file", - "mydb/wb/000/000/000.segment", - "mydb/wb/000/000/001.segment", - "mydb/wb/000/000/002.segment", - "mydb/wb/001/001/000.segment", - "mydb/wb/foo.json", - "mydb/data/whatevs", - ] - .iter() - .map(|&s| str_to_path(storage, s)) - .collect(); - - for f in &files { - storage - .put(f, data.as_ref(), Some(data.len())) - .await - .unwrap(); - } - - // ==================== check: prefix-list `mydb/wb` (directory) - // ==================== - let mut prefix = 
storage.new_path(); - prefix.push_all_dirs(&["mydb", "wb"]); - - let mut expected_000 = prefix.clone(); - expected_000.push_dir("000"); - let mut expected_001 = prefix.clone(); - expected_001.push_dir("001"); - let mut expected_location = prefix.clone(); - expected_location.set_file_name("foo.json"); - - let result = storage.list_with_delimiter(&prefix).await.unwrap(); - - assert_eq!(result.common_prefixes, vec![expected_000, expected_001]); - assert_eq!(result.objects.len(), 1); - - let object = &result.objects[0]; - - assert_eq!(object.location, expected_location); - assert_eq!(object.size, data.len()); - - // ==================== check: prefix-list `mydb/wb/000/000/001` (partial - // filename) ==================== - let mut prefix = storage.new_path(); - prefix.push_all_dirs(&["mydb", "wb", "000", "000"]); - prefix.set_file_name("001"); - - let mut expected_location = storage.new_path(); - expected_location.push_all_dirs(&["mydb", "wb", "000", "000"]); - expected_location.set_file_name("001.segment"); - - let result = storage.list_with_delimiter(&prefix).await.unwrap(); - assert!(result.common_prefixes.is_empty()); - assert_eq!(result.objects.len(), 1); - - let object = &result.objects[0]; - - assert_eq!(object.location, expected_location); - - // ==================== check: prefix-list `not_there` (non-existing prefix) - // ==================== - let mut prefix = storage.new_path(); - prefix.push_all_dirs(&["not_there"]); - - let result = storage.list_with_delimiter(&prefix).await.unwrap(); - assert!(result.common_prefixes.is_empty()); - assert!(result.objects.is_empty()); - - // ==================== do: remove all files ==================== - for f in &files { - storage.delete(f).await.unwrap(); - } - - // ==================== check: store is empty ==================== - let content_list = flatten_list_stream(storage, None).await?; - assert!(content_list.is_empty()); - - Ok(()) - } - - /// Parse a str as a `CloudPath` into a `DirAndFileName`, even though the - /// associated storage might not be cloud storage, to reuse the cloud - /// path parsing logic. Then convert into the correct type of path for - /// the given storage. 
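The `mydb/wb` walkthrough above is the delimiter contract in miniature: deeper files fold into common prefixes, direct children come back as objects. A compressed sketch of the same expectation against the deleted trait (`wb_direct_children` is a hypothetical helper; any store implementation works):

```rust
use object_store::{path::ObjectStorePath, ObjectStore};

/// Count what a delimiter listing reports directly under `mydb/wb`.
async fn wb_direct_children<S: ObjectStore>(storage: &S) -> Result<(usize, usize), S::Error> {
    let mut prefix = storage.new_path();
    prefix.push_all_dirs(&["mydb", "wb"]);
    let listing = storage.list_with_delimiter(&prefix).await?;
    // With the fixture files above this yields two common prefixes
    // ("000" and "001") plus one direct object ("foo.json").
    Ok((listing.common_prefixes.len(), listing.objects.len()))
}
```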
- fn str_to_path< - P: path::ObjectStorePath, - E: std::error::Error + Send + Sync, - R: AsyncRead + Unpin, - >( - storage: &impl ObjectStore, - val: &str, - ) -> P { - let cloud_path = FilePath::raw(val, false); - let parsed: DirsAndFileName = cloud_path.into(); - - let mut new_path = storage.new_path(); - for part in parsed.directories { - new_path.push_dir(part.to_string()); - } - - if let Some(file_name) = parsed.file_name { - new_path.set_file_name(file_name.to_string()); - } - new_path - } - - async fn delete_fixtures< - P: path::ObjectStorePath, - E: std::error::Error + Send + Sync, - R: AsyncRead + Unpin, - >( - storage: &impl ObjectStore, - ) { - let files: Vec<_> = [ - "test_file", - "mydb/wb/000/000/000.segment", - "mydb/wb/000/000/001.segment", - "mydb/wb/000/000/002.segment", - "mydb/wb/001/001/000.segment", - "mydb/wb/foo.json", - "mydb/data/whatevs", - ] - .iter() - .map(|&s| str_to_path(storage, s)) - .collect(); - - for f in &files { - // don't care if it errors, should fail elsewhere - let _ = storage.delete(f).await; - } - } - - // Tests TODO: - // GET nonexisting location (in_memory/file) - // DELETE nonexisting location - // PUT overwriting -} +pub use upstream::{ + local::LocalFileSystem, path::Path, Error as ObjectStoreError, GetResult, ListResult, + ObjectMeta, ObjectStore, +}; diff --git a/components/object_store/src/path/file.rs b/components/object_store/src/path/file.rs deleted file mode 100644 index acdae35f69..0000000000 --- a/components/object_store/src/path/file.rs +++ /dev/null @@ -1,518 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. - -use std::{ - mem, - path::{is_separator, PathBuf}, -}; - -use crate::path::{parsed::DirsAndFileName, parts::PathPart, ObjectStorePath}; - -/// An object storage location suitable for passing to disk based object -/// storage. -#[derive(Debug, Clone, Default, PartialEq, Eq, Ord, PartialOrd)] -pub struct FilePath { - inner: FilePathRepresentation, -} - -impl ObjectStorePath for FilePath { - fn set_file_name(&mut self, part: impl Into) { - self.inner = mem::take(&mut self.inner).set_file_name(part); - } - - fn push_dir(&mut self, part: impl Into) { - self.inner = mem::take(&mut self.inner).push_dir(part); - } - - fn push_all_dirs<'a>(&mut self, parts: impl AsRef<[&'a str]>) { - self.inner = mem::take(&mut self.inner).push_all_dirs(parts); - } - - fn display(&self) -> String { - self.to_raw().display().to_string() - } -} - -impl FilePath { - /// Creates a file storage location from a `PathBuf` without parsing or - /// allocating unless other methods are called on this instance that - /// need it. - /// - /// The "nature" of path (i.e. if it is a directory or file) will be - /// guessed. So paths ending with a separator (e.g. `/foo/bar/` on - /// Linux) are treated as a directory. However for all other paths (like - /// `/foo/bar` on Linux) it is not clear if a directory or file is meant - /// w/o inspecting the underlying store. To workaround that there is the - /// `assume_directory` flag which will treat ambiguous paths as directories. - /// If set to `false`, these cases will be treated as files. - pub fn raw(path: impl Into, assume_directory: bool) -> Self { - let path = path.into(); - Self { - inner: FilePathRepresentation::Raw(path, assume_directory), - } - } - - /// Creates a filesystem `PathBuf` location by using the standard library's - /// `PathBuf` building implementation appropriate for the current - /// platform. 
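The `assume_directory` flag documented above only matters for paths without a trailing separator; the effect is easiest to see in the conversion to `DirsAndFileName`, mirroring the deleted `conversions` test further down. A condensed sketch using only the deleted public API:

```rust
use object_store::path::{file::FilePath, parsed::DirsAndFileName};

#[test]
fn assume_directory_semantics() {
    // "foo/bar" is ambiguous: the flag decides how the last part reads.
    let as_file: DirsAndFileName = FilePath::raw("foo/bar", false).into();
    assert_eq!(as_file.file_name.as_ref().map(|p| p.encoded()), Some("bar"));

    let as_dir: DirsAndFileName = FilePath::raw("foo/bar", true).into();
    assert!(as_dir.file_name.is_none());

    // A trailing separator removes the ambiguity regardless of the flag.
    let explicit_dir: DirsAndFileName = FilePath::raw("foo/bar/", false).into();
    assert!(explicit_dir.file_name.is_none());
}
```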
- pub fn to_raw(&self) -> PathBuf { - use FilePathRepresentation::*; - - match &self.inner { - Raw(path, _) => path.to_owned(), - Parsed(dirs_and_file_name) => { - let mut path: PathBuf = dirs_and_file_name - .directories - .iter() - .map(PathPart::encoded) - .collect(); - if let Some(file_name) = &dirs_and_file_name.file_name { - path.push(file_name.encoded()); - } - path - } - } - } - - /// Add the parts of `path` to the end of this path. Notably does - /// *not* behave as `PathBuf::push` does: there is no way to replace the - /// root. If `self` has a file name, that will be removed, then the - /// directories of `path` will be appended, then any file name of `path` - /// will be assigned to `self`. - pub fn push_path(&mut self, path: &Self) { - self.inner = mem::take(&mut self.inner).push_path(path) - } - - /// Add a `PathPart` to the end of the path's directories. - pub fn push_part_as_dir(&mut self, part: &PathPart) { - self.inner = mem::take(&mut self.inner).push_part_as_dir(part); - } - - /// Whether the prefix is the start of this path or not. - pub fn prefix_matches(&self, prefix: &Self) -> bool { - self.inner.prefix_matches(&prefix.inner) - } - - /// Returns all directory and file name `PathParts` in `self` after the - /// specified `prefix`. Ignores any `file_name` part of `prefix`. - /// Returns `None` if `self` dosen't start with `prefix`. - pub fn parts_after_prefix(&self, prefix: &Self) -> Option> { - self.inner.parts_after_prefix(&prefix.inner) - } - - /// Remove this path's file name, if there is one. - pub fn unset_file_name(&mut self) { - self.inner = mem::take(&mut self.inner).unset_file_name(); - } -} - -impl From for DirsAndFileName { - fn from(file_path: FilePath) -> Self { - file_path.inner.into() - } -} - -impl From for FilePath { - fn from(dirs_and_file_name: DirsAndFileName) -> Self { - Self { - inner: FilePathRepresentation::Parsed(dirs_and_file_name), - } - } -} - -#[derive(Debug, Clone, Eq)] -enum FilePathRepresentation { - // raw: native path representation and also remember if we always assume it is a directory - // assume_directory: bool - Raw(PathBuf, bool), - Parsed(DirsAndFileName), -} - -impl Default for FilePathRepresentation { - fn default() -> Self { - Self::Parsed(DirsAndFileName::default()) - } -} - -impl PartialEq for FilePathRepresentation { - fn eq(&self, other: &Self) -> bool { - matches!(self.cmp(other), std::cmp::Ordering::Equal) - } -} -impl PartialOrd for FilePathRepresentation { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for FilePathRepresentation { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - use FilePathRepresentation::*; - match (self, other) { - (Parsed(self_parts), Parsed(other_parts)) => self_parts.cmp(other_parts), - (Parsed(self_parts), _) => { - let other_parts: DirsAndFileName = other.to_owned().into(); - self_parts.cmp(&other_parts) - } - (_, Parsed(other_parts)) => { - let self_parts: DirsAndFileName = self.to_owned().into(); - self_parts.cmp(other_parts) - } - _ => { - let self_parts: DirsAndFileName = self.to_owned().into(); - let other_parts: DirsAndFileName = other.to_owned().into(); - self_parts.cmp(&other_parts) - } - } - } -} - -impl FilePathRepresentation { - fn push_dir(self, part: impl Into) -> Self { - let mut dirs_and_file_name: DirsAndFileName = self.into(); - - dirs_and_file_name.push_dir(part); - Self::Parsed(dirs_and_file_name) - } - - fn push_all_dirs<'a>(self, parts: impl AsRef<[&'a str]>) -> Self { - let mut dirs_and_file_name: DirsAndFileName = 
self.into(); - - dirs_and_file_name.push_all_dirs(parts); - Self::Parsed(dirs_and_file_name) - } - - fn set_file_name(self, part: impl Into) -> Self { - let mut dirs_and_file_name: DirsAndFileName = self.into(); - - dirs_and_file_name.set_file_name(part); - Self::Parsed(dirs_and_file_name) - } - - fn unset_file_name(self) -> Self { - let mut dirs_and_file_name: DirsAndFileName = self.into(); - - dirs_and_file_name.unset_file_name(); - Self::Parsed(dirs_and_file_name) - } - - /// Add the parts of `path` to the end of this path. Notably does - /// *not* behave as `PathBuf::push` does: there is no way to replace the - /// root. If `self` has a file name, that will be removed, then the - /// directories of `path` will be appended, then any file name of `path` - /// will be assigned to `self`. - fn push_path(self, path: &FilePath) -> Self { - let DirsAndFileName { - directories: path_dirs, - file_name: path_file_name, - } = path.inner.to_owned().into(); - let mut dirs_and_file_name: DirsAndFileName = self.into(); - - dirs_and_file_name.directories.extend(path_dirs); - dirs_and_file_name.file_name = path_file_name; - - Self::Parsed(dirs_and_file_name) - } - - /// Add a `PathPart` to the end of the path's directories. - fn push_part_as_dir(self, part: &PathPart) -> Self { - let mut dirs_and_file_name: DirsAndFileName = self.into(); - - dirs_and_file_name.push_part_as_dir(part); - - Self::Parsed(dirs_and_file_name) - } - - fn prefix_matches(&self, prefix: &Self) -> bool { - use FilePathRepresentation::*; - match (self, prefix) { - (Parsed(self_parts), Parsed(prefix_parts)) => self_parts.prefix_matches(prefix_parts), - (Parsed(self_parts), _) => { - let prefix_parts: DirsAndFileName = prefix.to_owned().into(); - self_parts.prefix_matches(&prefix_parts) - } - (_, Parsed(prefix_parts)) => { - let self_parts: DirsAndFileName = self.to_owned().into(); - self_parts.prefix_matches(prefix_parts) - } - _ => { - let self_parts: DirsAndFileName = self.to_owned().into(); - let prefix_parts: DirsAndFileName = prefix.to_owned().into(); - self_parts.prefix_matches(&prefix_parts) - } - } - } - - /// Returns all directory and file name `PathParts` in `self` after the - /// specified `prefix`. Ignores any `file_name` part of `prefix`. - /// Returns `None` if `self` dosen't start with `prefix`. 
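All of the dispatch arms in `prefix_matches` above normalize the `Raw` representation into the parsed form before comparing, so raw and parsed paths can be mixed freely. A sketch through the public `FilePath` surface (assertions follow the deleted tests, not new behavior):

```rust
use object_store::path::file::FilePath;

#[test]
fn prefix_matching_mixes_representations() {
    let full = FilePath::raw("/data/wal/000.segment", false);
    let prefix = FilePath::raw("/data/wal", true); // trailing part is a dir
    // Both sides are converted to DirsAndFileName before comparison.
    assert!(full.prefix_matches(&prefix));
    assert!(!prefix.prefix_matches(&full));
}
```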
- fn parts_after_prefix(&self, prefix: &Self) -> Option> { - use FilePathRepresentation::*; - match (self, prefix) { - (Parsed(self_parts), Parsed(prefix_parts)) => { - self_parts.parts_after_prefix(prefix_parts) - } - (Parsed(self_parts), _) => { - let prefix_parts: DirsAndFileName = prefix.to_owned().into(); - self_parts.parts_after_prefix(&prefix_parts) - } - (_, Parsed(prefix_parts)) => { - let self_parts: DirsAndFileName = self.to_owned().into(); - self_parts.parts_after_prefix(prefix_parts) - } - _ => { - let self_parts: DirsAndFileName = self.to_owned().into(); - let prefix_parts: DirsAndFileName = prefix.to_owned().into(); - self_parts.parts_after_prefix(&prefix_parts) - } - } - } -} - -impl From for DirsAndFileName { - fn from(file_path_rep: FilePathRepresentation) -> Self { - use FilePathRepresentation::*; - - match file_path_rep { - Raw(path, assume_directory) => { - let mut parts: Vec = path - .iter() - .flat_map(|s| s.to_os_string().into_string().map(PathPart)) - .collect(); - - if !assume_directory && !parts.is_empty() && !is_directory(&path) { - let file_name = Some(parts.pop().expect("cannot be empty")); - Self { - directories: parts, - file_name, - } - } else { - Self { - directories: parts, - file_name: None, - } - } - } - Parsed(dirs_and_file_name) => dirs_and_file_name, - } - } -} - -/// Checks if the path is for sure a directory (i.e. ends with a separator). -fn is_directory(path: &std::path::Path) -> bool { - if let Some(s) = path.to_str() { - if let Some(c) = s.chars().last() { - return is_separator(c); - } - } - false -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::parsed_path; - - #[test] - fn path_buf_to_dirs_and_file_name_conversion() { - // Last section ending in `.json` is a file name - let path_buf: PathBuf = "/one/two/blah.json".into(); - let file_path = FilePath::raw(path_buf, false); - let parts: DirsAndFileName = file_path.into(); - let mut expected_parts = parsed_path!(["/", "one", "two"], "blah.json"); - expected_parts.directories[0] = PathPart("/".to_string()); // not escaped - assert_eq!(parts, expected_parts); - - // Last section ending in `.segment` is a file name - let path_buf: PathBuf = "/one/two/blah.segment".into(); - let file_path = FilePath::raw(path_buf, false); - let parts: DirsAndFileName = file_path.into(); - let mut expected_parts = parsed_path!(["/", "one", "two"], "blah.segment"); - expected_parts.directories[0] = PathPart("/".to_string()); // not escaped - assert_eq!(parts, expected_parts); - - // Last section ending in `.parquet` is a file name - let path_buf: PathBuf = "/one/two/blah.parquet".into(); - let file_path = FilePath::raw(path_buf, false); - let parts: DirsAndFileName = file_path.into(); - let mut expected_parts = parsed_path!(["/", "one", "two"], "blah.parquet"); - expected_parts.directories[0] = PathPart("/".to_string()); // not escaped - assert_eq!(parts, expected_parts); - - // Last section ending in `.txt` is NOT a file name; we don't recognize that - // extension - let path_buf: PathBuf = "/one/two/blah.txt".into(); - let file_path = FilePath::raw(path_buf, false); - let parts: DirsAndFileName = file_path.into(); - let mut expected_parts = parsed_path!(["/", "one", "two"], "blah.txt"); - expected_parts.directories[0] = PathPart("/".to_string()); // not escaped - assert_eq!(parts, expected_parts); - - // Last section containing a `.` isn't a file name - let path_buf: PathBuf = "/one/two/blah.blah".into(); - let file_path = FilePath::raw(path_buf, false); - let parts: DirsAndFileName = file_path.into(); - 
let mut expected_parts = parsed_path!(["/", "one", "two"], "blah.blah"); - expected_parts.directories[0] = PathPart("/".to_string()); // not escaped - assert_eq!(parts, expected_parts); - - // Last section starting with a `.` isn't a file name (macos temp dirs do this) - let path_buf: PathBuf = "/one/two/.blah".into(); - let file_path = FilePath::raw(path_buf, false); - let parts: DirsAndFileName = file_path.into(); - let mut expected_parts = parsed_path!(["/", "one", "two"], ".blah"); - expected_parts.directories[0] = PathPart("/".to_string()); // not escaped - assert_eq!(parts, expected_parts); - - let path_buf: PathBuf = "/a/b/d".into(); - let file_path = FilePath::raw(path_buf, false); - let parts: DirsAndFileName = file_path.into(); - let mut expected_parts = parsed_path!(["/", "a", "b"], "d"); - expected_parts.directories[0] = PathPart("/".to_string()); // not escaped - assert_eq!(parts, expected_parts); - - let path_buf: PathBuf = "/a/b/c".into(); - let file_path = FilePath::raw(path_buf, true); - let parts: DirsAndFileName = file_path.into(); - let mut expected_parts = parsed_path!(["/", "a", "b", "c"]); - expected_parts.directories[0] = PathPart("/".to_string()); // not escaped - assert_eq!(parts, expected_parts); - } - - #[test] - fn conversions() { - // dir and file name - let path_buf: PathBuf = "foo/bar/blah.json".into(); - let file_path = FilePath::raw(path_buf, false); - let parts: DirsAndFileName = file_path.into(); - - let expected_parts = parsed_path!(["foo", "bar"], "blah.json"); - assert_eq!(parts, expected_parts); - - // dir, no file name - let path_buf: PathBuf = "foo/bar/".into(); - let file_path = FilePath::raw(path_buf, false); - let parts: DirsAndFileName = file_path.into(); - - let expected_parts = parsed_path!(["foo", "bar"]); - assert_eq!(parts, expected_parts); - - // same but w/o the final marker - let path_buf: PathBuf = "foo/bar".into(); - let file_path = FilePath::raw(path_buf, false); - let parts: DirsAndFileName = file_path.into(); - - let expected_parts = parsed_path!(["foo"], "bar"); - assert_eq!(parts, expected_parts); - - // same but w/o the final marker, but forced to be a directory - let path_buf: PathBuf = "foo/bar".into(); - let file_path = FilePath::raw(path_buf, true); - let parts: DirsAndFileName = file_path.into(); - - let expected_parts = parsed_path!(["foo", "bar"]); - assert_eq!(parts, expected_parts); - - // no dir, file name - let path_buf: PathBuf = "blah.json".into(); - let file_path = FilePath::raw(path_buf, false); - let parts: DirsAndFileName = file_path.into(); - - let expected_parts = parsed_path!([], "blah.json"); - assert_eq!(parts, expected_parts); - - // empty - let path_buf: PathBuf = "".into(); - let file_path = FilePath::raw(path_buf, false); - let parts: DirsAndFileName = file_path.into(); - - let expected_parts = parsed_path!(); - assert_eq!(parts, expected_parts); - - // weird file name - let path_buf: PathBuf = "blah.x".into(); - let file_path = FilePath::raw(path_buf, false); - let parts: DirsAndFileName = file_path.into(); - - let expected_parts = parsed_path!("blah.x"); - assert_eq!(parts, expected_parts); - } - - #[test] - fn equality() { - let path_buf: PathBuf = "foo/bar/blah.json".into(); - let file_path = FilePath::raw(path_buf, false); - let parts: DirsAndFileName = file_path.clone().into(); - let parsed: FilePath = parts.into(); - - assert_eq!(file_path, parsed); - } - - #[test] - fn ordering() { - let a_path_buf: PathBuf = "foo/bar/a.json".into(); - let a_file_path = FilePath::raw(&a_path_buf, false); - let 
a_parts: DirsAndFileName = a_file_path.into(); - let a_parsed: FilePath = a_parts.into(); - - let b_path_buf: PathBuf = "foo/bar/b.json".into(); - let b_file_path = FilePath::raw(&b_path_buf, false); - - assert!(a_path_buf < b_path_buf); - assert!( - a_parsed < b_file_path, - "a was not less than b: a = {:#?}\nb = {:#?}", - a_parsed, - b_file_path - ); - } - - #[test] - fn path_display() { - let a_path_buf: PathBuf = "foo/bar/a.json".into(); - let expected_display = a_path_buf.display().to_string(); - let a_file_path = FilePath::raw(&a_path_buf, false); - - assert_eq!(a_file_path.display(), expected_display); - - let a_parts: DirsAndFileName = a_file_path.into(); - let a_parsed: FilePath = a_parts.into(); - - assert_eq!(a_parsed.display(), expected_display); - } - - #[test] - fn test_file_path_represent_ord() { - let file1 = FilePathRepresentation::Raw(PathBuf::from("/aa/bb"), false); - let file1_bak = FilePathRepresentation::Raw(PathBuf::from("/aa/bb"), false); - let file2 = FilePathRepresentation::Raw(PathBuf::from("/zz/aa/bb"), false); - - assert!(file1 == file1_bak); - assert!(file1 < file2) - } - - #[test] - fn test_file_path_parts_after_prefix() { - let file = FilePath::raw("/a/b/c", false); - let file2 = FilePath::raw("/a/b", true); - let ret = file.parts_after_prefix(&file2); - assert_eq!(ret, Some(vec![PathPart("c".to_string())])); - - let file = FilePath::raw("/a/b/c", false); - let file2 = FilePath::raw("/a/b", false); - let ret = file.parts_after_prefix(&file2); - assert_eq!( - ret, - Some(vec![PathPart("b".to_string()), PathPart("c".to_string())]) - ); - - let file = FilePath::raw("/a/b/d", false); - let file2 = FilePath::raw("/a/b/c/dd", true); - let ret = file.parts_after_prefix(&file2); - assert_eq!(ret, None); - - let file = FilePath::raw("/a/b/d", true); - let file2 = FilePath::raw("/a/b/c", true); - let ret = file.parts_after_prefix(&file2); - assert_eq!(ret, None); - } -} diff --git a/components/object_store/src/path/mod.rs b/components/object_store/src/path/mod.rs deleted file mode 100644 index e5922d6df8..0000000000 --- a/components/object_store/src/path/mod.rs +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. - -//! This module contains code for abstracting object locations that work -//! across different backing implementations and platforms. - -pub mod file; -pub mod parsed; -pub mod parts; - -/// The delimiter to separate object namespaces, creating a directory structure. -pub const DELIMITER: &str = "/"; - -/// Universal interface for handling paths and locations for objects and -/// directories in the object store. -/// -/// -/// Deliberately does not implement `Display` or `ToString`! -pub trait ObjectStorePath: - std::fmt::Debug + Clone + PartialEq + Eq + Send + Sync + 'static -{ - /// Set the file name of this path - fn set_file_name(&mut self, part: impl Into); - - /// Add a part to the end of the path's directories, encoding any restricted - /// characters. - fn push_dir(&mut self, part: impl Into); - - /// Push a bunch of parts as directories in one go. - fn push_all_dirs<'a>(&mut self, parts: impl AsRef<[&'a str]>); - - /// Like `std::path::Path::display, converts an `ObjectStorePath` to a - /// `String` suitable for printing; not suitable for sending to - /// APIs. 
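Because the trait deliberately hides the delimiter, callers build locations only through these methods. A sketch of code generic over any path implementation (the extra `Default` bound is an assumption of this example, satisfied by both `FilePath` and `DirsAndFileName`):

```rust
use object_store::path::ObjectStorePath;

/// Build `wal/<shard>/<n>.segment` for any path implementation,
/// without ever spelling out a separator character.
fn segment_location<P: ObjectStorePath + Default>(shard: u32, n: u64) -> P {
    let shard = shard.to_string();
    let mut path = P::default();
    path.push_all_dirs(&["wal", shard.as_str()]);
    path.set_file_name(format!("{:020}.segment", n));
    path
}
```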
- fn display(&self) -> String; -} diff --git a/components/object_store/src/path/parsed.rs b/components/object_store/src/path/parsed.rs deleted file mode 100644 index 0c9781a9b6..0000000000 --- a/components/object_store/src/path/parsed.rs +++ /dev/null @@ -1,389 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. - -use itertools::Itertools; - -use crate::path::{parts::PathPart, ObjectStorePath, DELIMITER}; - -/// A path stored as a collection of 0 or more directories and 0 or 1 file name -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Default, Hash)] -pub struct DirsAndFileName { - /// Directory hierarchy. - pub directories: Vec, - - /// Filename, if set. - pub file_name: Option, -} - -impl ObjectStorePath for DirsAndFileName { - fn set_file_name(&mut self, part: impl Into) { - let part = part.into(); - self.file_name = Some((&*part).into()); - } - - fn push_dir(&mut self, part: impl Into) { - let part = part.into(); - self.directories.push((&*part).into()); - } - - fn push_all_dirs<'a>(&mut self, parts: impl AsRef<[&'a str]>) { - self.directories - .extend(parts.as_ref().iter().map(|&v| v.into())); - } - - fn display(&self) -> String { - let mut s = self - .directories - .iter() - .map(PathPart::encoded) - .join(DELIMITER); - - if !s.is_empty() { - s.push_str(DELIMITER); - } - if let Some(file_name) = &self.file_name { - s.push_str(file_name.encoded()); - } - s - } -} - -impl DirsAndFileName { - pub(crate) fn prefix_matches(&self, prefix: &Self) -> bool { - let diff = itertools::diff_with( - self.directories.iter(), - prefix.directories.iter(), - |a, b| a == b, - ); - - use itertools::Diff; - match diff { - None => match (self.file_name.as_ref(), prefix.file_name.as_ref()) { - (Some(self_file), Some(prefix_file)) => { - self_file.encoded().starts_with(prefix_file.encoded()) - } - (Some(_self_file), None) => true, - (None, Some(_prefix_file)) => false, - (None, None) => true, - }, - Some(Diff::Shorter(_, mut remaining_self)) => { - let next_dir = remaining_self - .next() - .expect("must have at least one mismatch to be in this case"); - match prefix.file_name.as_ref() { - Some(prefix_file) => next_dir.encoded().starts_with(prefix_file.encoded()), - None => true, - } - } - Some(Diff::FirstMismatch(_, mut remaining_self, mut remaining_prefix)) => { - let first_prefix = remaining_prefix - .next() - .expect("must have at least one mismatch to be in this case"); - - // There must not be any other remaining parts in the prefix - remaining_prefix.next().is_none() - // and the next item in self must start with the last item in the prefix - && remaining_self - .next() - .expect("must be at least one value") - .encoded() - .starts_with(first_prefix.encoded()) - } - _ => false, - } - } - - /// Returns all directory and file name `PathParts` in `self` after the - /// specified `prefix`. Ignores any `file_name` part of `prefix`. - /// Returns `None` if `self` dosen't start with `prefix`. 
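The subtle case in the `prefix_matches` implementation above is the final prefix component, which is allowed to be a partial name while every earlier component must match exactly. A sketch written as an in-crate test (`prefix_matches` is `pub(crate)`), reusing the `parsed_path!` macro defined below:

```rust
use crate::parsed_path;

#[test]
fn partial_last_component() {
    let haystack = parsed_path!(["mydb", "wal"], "000.segment");

    // The trailing prefix component only needs to be a string prefix
    // of the next part: "wa" matches "wal"...
    assert!(haystack.prefix_matches(&parsed_path!(["mydb", "wa"])));

    // ...but components before the last must match exactly.
    assert!(!haystack.prefix_matches(&parsed_path!(["my", "wal"])));
}
```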
- pub(crate) fn parts_after_prefix(&self, prefix: &Self) -> Option> { - if self.directories.len() < prefix.directories.len() { - return None; - } - - let mut dirs_iter = self.directories.iter(); - let mut prefix_dirs_iter = prefix.directories.iter(); - - let mut parts = vec![]; - - for dir in &mut dirs_iter { - let pre = prefix_dirs_iter.next(); - - match pre { - None => { - parts.push(dir.to_owned()); - break; - } - Some(p) if p == dir => continue, - Some(_) => return None, - } - } - - parts.extend(dirs_iter.cloned()); - - if let Some(file_name) = &self.file_name { - parts.push(file_name.to_owned()); - } - - Some(parts) - } - - /// Add a `PathPart` to the end of the path's directories. - pub(crate) fn push_part_as_dir(&mut self, part: &PathPart) { - self.directories.push(part.to_owned()); - } - - /// Remove the file name, if any. - pub(crate) fn unset_file_name(&mut self) { - self.file_name = None; - } -} - -/// Short-cut macro to create [`DirsAndFileName`] instances. -/// -/// # Example -/// ``` -/// use object_store::parsed_path; -/// -/// // empty path -/// parsed_path!(); -/// -/// // filename only -/// parsed_path!("test.txt"); -/// -/// // directories only -/// parsed_path!(["path", "to"]); -/// -/// // filename + directories -/// parsed_path!(["path", "to"], "test.txt"); -/// ``` -#[macro_export] -macro_rules! parsed_path { - ([$($dir:expr),*], $file:expr) => { - $crate::path::parsed::DirsAndFileName { - directories: vec![$($crate::path::parts::PathPart::from($dir)),*], - file_name: Some($crate::path::parts::PathPart::from($file)), - } - }; - ([$($dir:expr),*]) => { - $crate::path::parsed::DirsAndFileName { - directories: vec![$($crate::path::parts::PathPart::from($dir)),*], - file_name: None, - } - }; - ($file:expr) => { - parsed_path!([], $file) - }; - () => { - parsed_path!([]) - }; -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn parts_after_prefix_behavior() { - let mut existing_path = DirsAndFileName::default(); - existing_path.push_all_dirs(&["apple", "bear", "cow", "dog"]); - existing_path.file_name = Some("egg.json".into()); - - // Prefix with one directory - let mut prefix = DirsAndFileName::default(); - prefix.push_dir("apple"); - let expected_parts: Vec = vec!["bear", "cow", "dog", "egg.json"] - .into_iter() - .map(Into::into) - .collect(); - let parts = existing_path.parts_after_prefix(&prefix).unwrap(); - assert_eq!(parts, expected_parts); - - // Prefix with two directories - let mut prefix = DirsAndFileName::default(); - prefix.push_all_dirs(&["apple", "bear"]); - let expected_parts: Vec = vec!["cow", "dog", "egg.json"] - .into_iter() - .map(Into::into) - .collect(); - let parts = existing_path.parts_after_prefix(&prefix).unwrap(); - assert_eq!(parts, expected_parts); - - // Not a prefix - let mut prefix = DirsAndFileName::default(); - prefix.push_dir("cow"); - assert!(existing_path.parts_after_prefix(&prefix).is_none()); - - // Prefix with a partial directory - let mut prefix = DirsAndFileName::default(); - prefix.push_dir("ap"); - assert!(existing_path.parts_after_prefix(&prefix).is_none()); - - // Prefix matches but there aren't any parts after it - let mut existing_path = DirsAndFileName::default(); - existing_path.push_all_dirs(&["apple", "bear", "cow", "dog"]); - let prefix = existing_path.clone(); - let parts = existing_path.parts_after_prefix(&prefix).unwrap(); - assert!(parts.is_empty()); - } - - #[test] - fn prefix_matches() { - let mut haystack = DirsAndFileName::default(); - haystack.push_all_dirs(&["foo/bar", "baz%2Ftest", "something"]); 
- - // self starts with self - assert!( - haystack.prefix_matches(&haystack), - "{:?} should have started with {:?}", - haystack, - haystack - ); - - // a longer prefix doesn't match - let mut needle = haystack.clone(); - needle.push_dir("longer now"); - assert!( - !haystack.prefix_matches(&needle), - "{:?} shouldn't have started with {:?}", - haystack, - needle - ); - - // one dir prefix matches - let mut needle = DirsAndFileName::default(); - needle.push_dir("foo/bar"); - assert!( - haystack.prefix_matches(&needle), - "{:?} should have started with {:?}", - haystack, - needle - ); - - // two dir prefix matches - needle.push_dir("baz%2Ftest"); - assert!( - haystack.prefix_matches(&needle), - "{:?} should have started with {:?}", - haystack, - needle - ); - - // partial dir prefix matches - let mut needle = DirsAndFileName::default(); - needle.push_dir("f"); - assert!( - haystack.prefix_matches(&needle), - "{:?} should have started with {:?}", - haystack, - needle - ); - - // one dir and one partial dir matches - let mut needle = DirsAndFileName::default(); - needle.push_all_dirs(&["foo/bar", "baz"]); - assert!( - haystack.prefix_matches(&needle), - "{:?} should have started with {:?}", - haystack, - needle - ); - } - - #[test] - fn prefix_matches_with_file_name() { - let mut haystack = DirsAndFileName::default(); - haystack.push_all_dirs(&["foo/bar", "baz%2Ftest", "something"]); - - let mut needle = haystack.clone(); - - // All directories match and file name is a prefix - haystack.set_file_name("foo.segment"); - needle.set_file_name("foo"); - - assert!( - haystack.prefix_matches(&needle), - "{:?} should have started with {:?}", - haystack, - needle - ); - - // All directories match but file name is not a prefix - needle.set_file_name("e"); - - assert!( - !haystack.prefix_matches(&needle), - "{:?} should not have started with {:?}", - haystack, - needle - ); - - // Not all directories match; file name is a prefix of the next directory; this - // matches - let mut needle = DirsAndFileName::default(); - needle.push_all_dirs(&["foo/bar", "baz%2Ftest"]); - needle.set_file_name("s"); - - assert!( - haystack.prefix_matches(&needle), - "{:?} should have started with {:?}", - haystack, - needle - ); - - // Not all directories match; file name is NOT a prefix of the next directory; - // no match - needle.set_file_name("p"); - - assert!( - !haystack.prefix_matches(&needle), - "{:?} should not have started with {:?}", - haystack, - needle - ); - } - - #[test] - fn test_macro() { - let actual = parsed_path!(["foo", "bar"], "baz"); - let expected = DirsAndFileName { - directories: vec![PathPart::from("foo"), PathPart::from("bar")], - file_name: Some(PathPart::from("baz")), - }; - assert_eq!(actual, expected); - - let actual = parsed_path!([], "foo"); - let expected = DirsAndFileName { - directories: vec![], - file_name: Some(PathPart::from("foo")), - }; - assert_eq!(actual, expected); - - let actual = parsed_path!("foo"); - let expected = DirsAndFileName { - directories: vec![], - file_name: Some(PathPart::from("foo")), - }; - assert_eq!(actual, expected); - - let actual = parsed_path!(["foo", "bar"]); - let expected = DirsAndFileName { - directories: vec![PathPart::from("foo"), PathPart::from("bar")], - file_name: None, - }; - assert_eq!(actual, expected); - - let actual = parsed_path!([]); - let expected = DirsAndFileName { - directories: vec![], - file_name: None, - }; - assert_eq!(actual, expected); - - let actual = parsed_path!(); - let expected = DirsAndFileName { - directories: vec![], - 
file_name: None, - }; - assert_eq!(actual, expected); - } -} diff --git a/components/object_store/src/path/parts.rs b/components/object_store/src/path/parts.rs deleted file mode 100644 index b9e69becfb..0000000000 --- a/components/object_store/src/path/parts.rs +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. - -use percent_encoding::{percent_decode_str, percent_encode, AsciiSet, CONTROLS}; - -use super::DELIMITER; - -// percent_encode's API needs this as a byte -const DELIMITER_BYTE: u8 = DELIMITER.as_bytes()[0]; - -// special encoding of the empty string part. -// Using '%' is the safest character since it will always be used in the -// output of percent_encode no matter how we evolve the INVALID AsciiSet over -// time. -const EMPTY: &str = "%"; - -/// The PathPart type exists to validate the directory/file names that form part -/// of a path. -/// -/// A PathPart instance is guaranteed to be non-empty and to contain no `/` -/// characters as it can only be constructed by going through the `from` impl. -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Default, Hash)] -pub struct PathPart(pub(super) String); - -/// Characters we want to encode. -const INVALID: &AsciiSet = &CONTROLS - // The delimiter we are reserving for internal hierarchy - .add(DELIMITER_BYTE) - // Characters AWS recommends avoiding for object keys - // https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html - .add(b'\\') - .add(b'{') - // TODO: Non-printable ASCII characters (128–255 decimal characters) - .add(b'^') - .add(b'}') - .add(b'%') - .add(b'`') - .add(b']') - .add(b'"') // " <-- my editor is confused about double quotes within single quotes - .add(b'>') - .add(b'[') - .add(b'~') - .add(b'<') - .add(b'#') - .add(b'|') - // Characters Google Cloud Storage recommends avoiding for object names - // https://cloud.google.com/storage/docs/naming-objects - .add(b'\r') - .add(b'\n') - .add(b'*') - .add(b'?'); - -impl From<&str> for PathPart { - fn from(v: &str) -> Self { - match v { - // We don't want to encode `.` generally, but we do want to disallow parts of paths - // to be equal to `.` or `..` to prevent file system traversal shenanigans. - "." => Self(String::from("%2E")), - ".." => Self(String::from("%2E%2E")), - - // Every string except the empty string will be percent encoded. - // The empty string will be transformed into a sentinel value EMPTY - // which can safely be a prefix of an encoded value since it will be - // fully matched at decode time (see impl Display for PathPart). - "" => Self(String::from(EMPTY)), - other => Self(percent_encode(other.as_bytes(), INVALID).to_string()), - } - } -} - -impl std::fmt::Display for PathPart { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match &self.0[..] { - EMPTY => "".fmt(f), - _ => percent_decode_str(&self.0) - .decode_utf8() - .expect("Valid UTF-8 that came from String") - .fmt(f), - } - } -} - -impl PathPart { - /// Encode as string. 
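The `From<&str>` rules above are easiest to read off a few concrete parts; this condenses the deleted tests that follow into one place:

```rust
use object_store::path::parts::PathPart;

#[test]
fn encoding_rules() {
    // The delimiter is percent-encoded so a part can never split a path.
    assert_eq!(PathPart::from("foo/bar").encoded(), "foo%2Fbar");
    // "." and ".." are rewritten to block directory traversal.
    assert_eq!(PathPart::from(".").encoded(), "%2E");
    assert_eq!(PathPart::from("..").encoded(), "%2E%2E");
    // The empty string gets the "%" sentinel, and Display round-trips it.
    assert_eq!(PathPart::from("").encoded(), "%");
    assert_eq!(PathPart::from("").to_string(), "");
}
```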
- pub fn encoded(&self) -> &str { - &self.0 - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn path_part_delimiter_gets_encoded() { - let part: PathPart = "foo/bar".into(); - assert_eq!(part, PathPart(String::from("foo%2Fbar"))); - } - - #[test] - fn path_part_gets_decoded_for_display() { - let part: PathPart = "foo/bar".into(); - assert_eq!(part.to_string(), "foo/bar"); - } - - #[test] - fn path_part_given_already_encoded_string() { - let part: PathPart = "foo%2Fbar".into(); - assert_eq!(part, PathPart(String::from("foo%252Fbar"))); - assert_eq!(part.to_string(), "foo%2Fbar"); - } - - #[test] - fn path_part_cant_be_one_dot() { - let part: PathPart = ".".into(); - assert_eq!(part, PathPart(String::from("%2E"))); - assert_eq!(part.to_string(), "."); - } - - #[test] - fn path_part_cant_be_two_dots() { - let part: PathPart = "..".into(); - assert_eq!(part, PathPart(String::from("%2E%2E"))); - assert_eq!(part.to_string(), ".."); - } - - #[test] - fn path_part_cant_be_empty() { - let part: PathPart = "".into(); - assert_eq!(part, PathPart(String::from(EMPTY))); - assert_eq!(part.to_string(), ""); - } - - #[test] - fn empty_is_safely_encoded() { - let part: PathPart = EMPTY.into(); - assert_eq!( - part, - PathPart(percent_encode(EMPTY.as_bytes(), INVALID).to_string()) - ); - assert_eq!(part.to_string(), EMPTY); - } -}
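After this change, consumers go through the upstream crate's types re-exported from `lib.rs` above. A rough sketch of the new call shape (exact constructors and signatures depend on the pinned upstream `object_store` version, so treat this as indicative rather than definitive):

```rust
use bytes::Bytes;
use object_store::{ObjectStore, ObjectStoreError, Path};

/// Write then read back one object through the re-exported trait.
/// `&dyn ObjectStore` works because the upstream trait has no
/// associated types, unlike the deleted local one.
async fn roundtrip(store: &dyn ObjectStore, location: &Path) -> Result<Bytes, ObjectStoreError> {
    store.put(location, Bytes::from_static(b"arbitrary data")).await?;
    store.get(location).await?.bytes().await
}
```

The `put_get_delete_list`-style coverage removed in this diff now lives in the upstream crate's own test suite, which is the main payoff of the switch.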