refactor: switch to upstream object_store interface #14

Merged
merged 9 commits on Jun 7, 2022
256 changes: 70 additions & 186 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 8 additions & 11 deletions analytic_engine/src/instance/flush_compaction.rs
@@ -19,7 +19,7 @@ use futures::{
     stream, SinkExt, TryStreamExt,
 };
 use log::{error, info};
-use object_store::{path::ObjectStorePath, ObjectStore};
+use object_store::ObjectStore;
 use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
 use table_engine::{predicate::Predicate, table::Result as TableResult};
 use tokio::sync::oneshot;
@@ -526,8 +526,7 @@ where
         let (batch_record_sender, batch_record_receiver) =
             channel::<Result<RecordBatchWithKey>>(DEFAULT_CHANNEL_SIZE);
         let file_id = table_data.alloc_file_id();
-        let mut sst_file_path = self.space_store.store.new_path();
-        table_data.set_sst_file_path(file_id, &mut sst_file_path);
+        let sst_file_path = table_data.set_sst_file_path(file_id);

         // TODO: min_key max_key set in sst_builder build
         let mut sst_meta = SstMetaData {
@@ -567,7 +566,7 @@ where
                 Box::new(e) as _
             })
             .with_context(|| FailBuildSst {
-                path: sst_file_path.display(),
+                path: sst_file_path.to_string(),
             })?;

         // update sst metadata by built info.
@@ -645,8 +644,7 @@ where

         // Alloc file id for next sst file
         let file_id = table_data.alloc_file_id();
-        let mut sst_file_path = self.space_store.store.new_path();
-        table_data.set_sst_file_path(file_id, &mut sst_file_path);
+        let sst_file_path = table_data.set_sst_file_path(file_id);

         let sst_builder_options = SstBuilderOptions {
             sst_type: table_data.sst_type,
@@ -679,7 +677,7 @@ where
                 Box::new(e) as _
             })
             .with_context(|| FailBuildSst {
-                path: sst_file_path.display(),
+                path: sst_file_path.to_string(),
             })?;

         // update sst metadata by built info.
@@ -848,8 +846,7 @@ impl<Wal, Meta: Manifest, Store: ObjectStore, Fa: Factory> SpaceStore<Wal, Meta,

         // Alloc file id for the merged sst.
         let file_id = table_data.alloc_file_id();
-        let mut sst_file_path = self.store.new_path();
-        table_data.set_sst_file_path(file_id, &mut sst_file_path);
+        let sst_file_path = table_data.set_sst_file_path(file_id);

         let sst_builder_options = SstBuilderOptions {
             sst_type: table_data.sst_type,
@@ -868,7 +865,7 @@ impl<Wal, Meta: Manifest, Store: ObjectStore, Fa: Factory> SpaceStore<Wal, Meta,
             .await
             .map_err(|e| Box::new(e) as _)
             .with_context(|| FailBuildSst {
-                path: sst_file_path.display(),
+                path: sst_file_path.to_string(),
             })?;

         // update sst metadata by built info.
@@ -887,7 +884,7 @@ impl<Wal, Meta: Manifest, Store: ObjectStore, Fa: Factory> SpaceStore<Wal, Meta,
             table_data.name,
             table_data.id,
             request_id,
-            sst_file_path.display(),
+            sst_file_path.to_string(),
             input.files,
             sst_meta
         );
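The pattern repeated throughout this file (building a path via `store.new_path()` plus `set_sst_file_path(file_id, &mut path)` collapsing into a single `set_sst_file_path(file_id)` call, and `display()` becoming `to_string()`) follows from the upstream crate's single concrete path type. A minimal sketch of the new semantics, assuming the upstream `object_store` crate is used directly; the path layout shown is illustrative only:

```rust
use object_store::path::Path;

fn main() {
    // Upstream paths are plain values parsed from strings, so no store
    // handle is needed to construct one (the old interface required
    // `store.new_path()` because each store had its own path type).
    let sst_file_path = Path::from("1/42/100.sst");

    // `Path` implements `Display`; the old `path.display()` call sites
    // therefore become `path.to_string()`.
    assert_eq!(sst_file_path.to_string(), "1/42/100.sst");
}
```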
7 changes: 4 additions & 3 deletions analytic_engine/src/lib.rs
@@ -21,7 +21,7 @@ pub mod table_options;
 pub mod tests;

 use meta::details::{ManifestImpl, Options as ManifestOptions};
-use object_store::disk::File;
+use object_store::LocalFileSystem;
 use serde_derive::Deserialize;
 use wal::rocks_impl::manager::RocksImpl;

@@ -30,9 +30,10 @@ use crate::{engine::TableEngineImpl, instance::InstanceRef, sst::factory::Factor

 /// Analytic table engine
 pub type AnalyticTableEngine =
-    TableEngineImpl<RocksImpl, ManifestImpl<RocksImpl>, File, FactoryImpl>;
+    TableEngineImpl<RocksImpl, ManifestImpl<RocksImpl>, LocalFileSystem, FactoryImpl>;
 /// Default instance
-pub(crate) type EngineInstance = InstanceRef<RocksImpl, ManifestImpl<RocksImpl>, File, FactoryImpl>;
+pub(crate) type EngineInstance =
+    InstanceRef<RocksImpl, ManifestImpl<RocksImpl>, LocalFileSystem, FactoryImpl>;

 /// Config of analytic engine.
 #[derive(Debug, Clone, Deserialize)]
8 changes: 4 additions & 4 deletions analytic_engine/src/row_iter/record_batch_stream.rs
@@ -8,6 +8,7 @@ use common_types::{
 use common_util::define_result;
 use futures::stream::{self, Stream, StreamExt};
 use log::{error, trace};
+use object_store::ObjectStore;
 use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
 use table_engine::{
     predicate::{filter_record_batch::RecordBatchFilter, Predicate},
@@ -208,7 +209,7 @@ pub async fn filtered_stream_from_sst_file<Fa, S>(
 ) -> Result<SequencedRecordBatchStream>
 where
     Fa: sst::factory::Factory,
-    S: object_store::ObjectStore,
+    S: ObjectStore,
 {
     stream_from_sst_file(
         space_id,
@@ -233,11 +234,10 @@ pub async fn stream_from_sst_file<Fa, S>(
 ) -> Result<SequencedRecordBatchStream>
 where
     Fa: sst::factory::Factory,
-    S: object_store::ObjectStore,
+    S: ObjectStore,
 {
     sst_file.read_meter().mark();
-    let mut path = store.new_path();
-    sst_util::set_sst_file_path(space_id, table_id, sst_file.id(), &mut path);
+    let path = sst_util::new_sst_file_path(space_id, table_id, sst_file.id());
     let mut sst_reader = sst_factory
         .new_sst_reader(sst_reader_options, &path, store)
         .with_context(|| SstReaderNotFound {
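Here `sst_util::set_sst_file_path(.., &mut path)` becomes `sst_util::new_sst_file_path(..)`, returning the path by value. The helper body is not part of this diff; a hypothetical reconstruction, with the directory layout and integer types assumed for illustration:

```rust
use object_store::path::Path;

// Hypothetical reconstruction of `sst_util::new_sst_file_path`. The point is
// the shape: an owned `Path` is returned instead of filling a caller-supplied
// `&mut S::Path`.
pub fn new_sst_file_path(space_id: u32, table_id: u64, file_id: u64) -> Path {
    Path::from(format!("{}/{}/{}.sst", space_id, table_id, file_id))
}
```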
20 changes: 18 additions & 2 deletions analytic_engine/src/setup.rs
@@ -5,7 +5,7 @@
 use std::{path::Path, sync::Arc};

 use common_util::define_result;
-use object_store::disk::File;
+use object_store::LocalFileSystem;
 use parquet::{
     cache::{LruDataCache, LruMetaCache},
     DataCacheRef, MetaCacheRef,
@@ -34,6 +34,17 @@ pub enum Error {

     #[snafu(display("Failed to open manifest, err:{}", source))]
     OpenManifest { source: crate::meta::details::Error },
+
+    #[snafu(display("Failed to open object store, err:{}", source))]
+    OpenObjectStore {
+        source: object_store::ObjectStoreError,
+    },
+
+    #[snafu(display("Failed to create dir for {}, err:{}", path, source))]
+    CreateDir {
+        path: String,
+        source: std::io::Error,
+    },
 }

 define_result!(Error);
@@ -87,7 +98,12 @@ async fn open_instance(
     };

     let sst_path = data_path.join(STORE_DIR_NAME);
-    let store = File::new(sst_path);
+    tokio::fs::create_dir_all(&sst_path)
+        .await
+        .context(CreateDir {
+            path: sst_path.to_string_lossy().into_owned(),
+        })?;
+    let store = LocalFileSystem::new_with_prefix(sst_path).context(OpenObjectStore)?;
     let open_ctx = OpenContext {
         config,
         runtimes: engine_runtimes,
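Unlike the old `File::new`, upstream `LocalFileSystem::new_with_prefix` fails when the prefix directory is missing instead of creating it, hence the new `create_dir_all` call and the `CreateDir`/`OpenObjectStore` error pair. A condensed sketch of the sequence, with the snafu wrapping replaced by a boxed error for brevity:

```rust
use std::path::Path as StdPath;

use object_store::local::LocalFileSystem;

// Condensed sketch of the open sequence; the real code wraps both failure
// cases in snafu variants (`CreateDir`, `OpenObjectStore`) instead of boxing.
async fn open_store(
    sst_path: &StdPath,
) -> Result<LocalFileSystem, Box<dyn std::error::Error + Send + Sync>> {
    // The prefix directory must exist before the store is constructed.
    tokio::fs::create_dir_all(sst_path).await?;
    // Errors out (rather than creating the directory) if `sst_path` is absent.
    Ok(LocalFileSystem::new_with_prefix(sst_path)?)
}
```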
15 changes: 8 additions & 7 deletions analytic_engine/src/sst/builder.rs
@@ -15,20 +15,16 @@ pub mod error {
     #[derive(Debug, Snafu)]
     #[snafu(visibility(pub))]
     pub enum Error {
-        #[snafu(display("Failed to persist sst content, path:{}, err:{}", path, source))]
-        Persist {
-            path: String,
-            source: Box<dyn std::error::Error + Send + Sync>,
+        #[snafu(display("Failed to perform storage operation, err:{}", source))]
+        Storage {
+            source: object_store::ObjectStoreError,
         },

         #[snafu(display("Failed to encode meta data, err:{}", source))]
         EncodeMetaData {
             source: Box<dyn std::error::Error + Send + Sync>,
         },

-        #[snafu(display("Failed to get sst file size, path:{}", path))]
-        GetFileSize { path: String },
-
         #[snafu(display(
             "Failed to encode record batch into sst, err:{}.\nBacktrace:\n{}",
             source,
@@ -43,6 +39,11 @@ pub mod error {
         PollRecordBatch {
             source: Box<dyn std::error::Error + Send + Sync>,
         },
+
+        #[snafu(display("Failed to read data, err:{}", source))]
+        ReadData {
+            source: Box<dyn std::error::Error + Send + Sync>,
+        },
     }

     define_result!(Error);
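The path-carrying `Persist` and `GetFileSize` variants collapse into a generic `Storage` variant holding the store's native error (`object_store::ObjectStoreError` appears to be this repo's re-export of the upstream error type), since that error already reports the offending path. A sketch of a call site, assuming a snafu version whose context selectors share the variant name, matching the `.context(Storage)` usage elsewhere in this PR:

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Failed to perform storage operation, err:{}", source))]
    Storage { source: object_store::Error },
}

// `context(Storage)` wraps the store's native error; no separate `path: String`
// field is threaded through anymore because the source error names the path.
async fn persist(
    store: &dyn object_store::ObjectStore,
    path: &object_store::path::Path,
    bytes: bytes::Bytes,
) -> Result<(), Error> {
    store.put(path, bytes).await.context(Storage)?;
    Ok(())
}
```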
10 changes: 5 additions & 5 deletions analytic_engine/src/sst/factory.rs
@@ -6,7 +6,7 @@ use std::{fmt::Debug, sync::Arc};

 use common_types::projected_schema::ProjectedSchema;
 use common_util::runtime::Runtime;
-use object_store::ObjectStore;
+use object_store::{ObjectStore, Path};
 use parquet::{DataCacheRef, MetaCacheRef};
 use table_engine::predicate::PredicateRef;
@@ -23,14 +23,14 @@ pub trait Factory: Clone {
     fn new_sst_reader<'a, S: ObjectStore>(
         &self,
         options: &SstReaderOptions,
-        path: &'a S::Path,
+        path: &'a Path,
         storage: &'a S,
     ) -> Option<Box<dyn SstReader + Send + 'a>>;

     fn new_sst_builder<'a, S: ObjectStore>(
         &self,
         options: &SstBuilderOptions,
-        path: &'a S::Path,
+        path: &'a Path,
         storage: &'a S,
     ) -> Option<Box<dyn SstBuilder + Send + 'a>>;
 }
@@ -66,7 +66,7 @@ impl Factory for FactoryImpl {
     fn new_sst_reader<'a, S: ObjectStore>(
         &self,
         options: &SstReaderOptions,
-        path: &'a S::Path,
+        path: &'a Path,
         storage: &'a S,
     ) -> Option<Box<dyn SstReader + Send + 'a>> {
         match options.sst_type {
@@ -77,7 +77,7 @@ impl Factory for FactoryImpl {
     fn new_sst_builder<'a, S: ObjectStore>(
         &self,
         options: &SstBuilderOptions,
-        path: &'a S::Path,
+        path: &'a Path,
         storage: &'a S,
     ) -> Option<Box<dyn SstBuilder + Send + 'a>> {
         match options.sst_type {
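Both trait methods swap the per-store associated type `S::Path` for the one concrete `Path`. With a single path type, helpers stay generic over the store without any associated-type plumbing; a small sketch:

```rust
use object_store::{path::Path, ObjectStore};

// With upstream's single concrete `Path`, generic helpers take `&Path`
// directly instead of a per-store associated type `&S::Path`.
async fn sst_exists<S: ObjectStore>(store: &S, path: &Path) -> bool {
    // `head` fetches object metadata; `Ok` means the object is present.
    store.head(path).await.is_ok()
}
```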
10 changes: 4 additions & 6 deletions analytic_engine/src/sst/file.rs
@@ -28,7 +28,7 @@ use common_util::{
     runtime::{JoinHandle, Runtime},
 };
 use log::{debug, error, info};
-use object_store::{path::ObjectStorePath, ObjectStore};
+use object_store::ObjectStore;
 use proto::{common::TimeRange as TimeRangePb, sst::SstMetaData as SstMetaDataPb};
 use snafu::{ResultExt, Snafu};
 use table_engine::table::TableId;
@@ -579,24 +579,22 @@ impl FilePurger {
         while let Some(request) = receiver.recv().await {
             match request {
                 Request::Purge(purge_request) => {
-                    let mut sst_file_path = store.new_path();
-                    sst_util::set_sst_file_path(
+                    let sst_file_path = sst_util::new_sst_file_path(
                         purge_request.space_id,
                         purge_request.table_id,
                         purge_request.file_id,
-                        &mut sst_file_path,
                     );

                     info!(
                         "File purger delete file, purge_request:{:?}, sst_file_path:{}",
                         purge_request,
-                        sst_file_path.display()
+                        sst_file_path.to_string()
                     );

                     if let Err(e) = store.delete(&sst_file_path).await {
                         error!(
                             "File purger failed to delete file, sst_file_path:{}, err:{}",
-                            sst_file_path.display(),
+                            sst_file_path.to_string(),
                             e
                         );
                     }
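The purge loop now builds the path with the by-value helper and hands it straight to `ObjectStore::delete`. Reduced to its essentials (a sketch; the real loop receives purge requests over a channel):

```rust
use object_store::{path::Path, ObjectStore};

// Essentials of the purge branch: deletion failures are logged, not fatal,
// since a leaked SST file is preferable to crashing the purger task.
async fn purge_sst<S: ObjectStore>(store: &S, sst_file_path: Path) {
    if let Err(e) = store.delete(&sst_file_path).await {
        log::error!(
            "File purger failed to delete file, sst_file_path:{}, err:{}",
            sst_file_path,
            e
        );
    }
}
```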
53 changes: 22 additions & 31 deletions analytic_engine/src/sst/parquet/builder.rs
@@ -22,10 +22,10 @@ use arrow_deps::{
 };
 use async_trait::async_trait;
 use common_types::{bytes::BufMut, request_id::RequestId};
-use futures::AsyncRead;
+use futures::{AsyncRead, AsyncReadExt};
 use log::debug;
-use object_store::{path::ObjectStorePath, ObjectStore};
-use snafu::{ensure, ResultExt};
+use object_store::{ObjectStore, Path};
+use snafu::ResultExt;

 use crate::sst::{
     builder::{RecordBatchStream, SstBuilder, *},
@@ -38,7 +38,7 @@ use crate::sst::{
 #[derive(Debug)]
 pub struct ParquetSstBuilder<'a, S: ObjectStore> {
     /// The path where the data is persisted.
-    path: &'a S::Path,
+    path: &'a Path,
     /// The storage where the data is persist.
     storage: &'a S,
     /// Max row group size.
@@ -47,7 +47,7 @@ pub struct ParquetSstBuilder<'a, S: ObjectStore> {
 }

 impl<'a, S: ObjectStore> ParquetSstBuilder<'a, S> {
-    pub fn new(path: &'a S::Path, storage: &'a S, options: &SstBuilderOptions) -> Self {
+    pub fn new(path: &'a Path, storage: &'a S, options: &SstBuilderOptions) -> Self {
         Self {
             path,
             storage,
@@ -377,7 +377,7 @@ impl<'a, S: ObjectStore> SstBuilder for ParquetSstBuilder<'a, S> {
         );

         let total_row_num = Arc::new(AtomicUsize::new(0));
-        let reader = RecordBytesReader {
+        let mut reader = RecordBytesReader {
             request_id,
             record_stream,
             encoding_buffer: EncodingBuffer::default(),
@@ -391,33 +391,25 @@
             stream_finished: false,
             fetched_row_num: 0,
         };

-        self.storage
-            .put(self.path, reader, None)
-            .await
-            .map_err(|e| Box::new(e) as _)
-            .context(Persist {
-                path: self.path.display(),
-            })?;
-
-        let result = self
-            .storage
-            .list_with_delimiter(self.path)
-            .await
-            .map_err(|e| Box::new(e) as _)
-            .context(Persist {
-                path: self.path.display(),
-            })?;
-
-        ensure!(
-            result.objects.len() == 1,
-            GetFileSize {
-                path: self.path.display(),
-            }
-        );
+        // TODO(ruihang): `RecordBytesReader` support stream read. It could be improved
+        // if the storage supports streaming upload (multipart upload).
+        let mut bytes = vec![];
+        reader
+            .read_to_end(&mut bytes)
+            .await
+            .context(ReadData)?;
+        drop(reader);
+
+        self.storage
+            .put(self.path, bytes.into())
+            .await
+            .context(Storage)?;
+
+        let file_head = self.storage.head(self.path).await.context(Storage)?;

         Ok(SstInfo {
-            file_size: result.objects[0].size,
+            file_size: file_head.size,
             row_num: total_row_num.load(Ordering::Relaxed),
         })
     }
@@ -434,7 +426,7 @@
     };
     use common_util::runtime::{self, Runtime};
     use futures::stream;
-    use object_store::disk::File;
+    use object_store::LocalFileSystem;
     use table_engine::predicate::Predicate;
     use tempfile::tempdir;

@@ -476,9 +468,8 @@

         let dir = tempdir().unwrap();
         let root = dir.path();
-        let store = File::new(root);
-        let mut sst_file_path = store.new_path();
-        sst_file_path.set_file_name("data.par");
+        let store = LocalFileSystem::new_with_prefix(root).unwrap();
+        let sst_file_path = Path::from("data.par");

         let schema = build_schema();
         let projected_schema = ProjectedSchema::no_projection(schema.clone());
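The builder rewrite above is the heart of this PR: the old interface accepted an `AsyncRead` plus an optional length in `put` and needed `list_with_delimiter` to learn the written size, while the upstream interface takes a complete `Bytes` buffer and exposes `head` for metadata. The new write path, extracted into a standalone sketch with illustrative names:

```rust
use bytes::Bytes;
use futures::{AsyncRead, AsyncReadExt};
use object_store::{path::Path, ObjectStore};

// Buffer the encoded SST fully in memory, upload it in one `put`, then read
// the final size back via `head`. The TODO in the diff notes this could move
// to a streaming (multipart) upload if the store supports it.
async fn persist_sst<S, R>(
    store: &S,
    path: &Path,
    mut reader: R,
) -> Result<usize, Box<dyn std::error::Error + Send + Sync>>
where
    S: ObjectStore,
    R: AsyncRead + Unpin,
{
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await?;
    store.put(path, Bytes::from(buf)).await?;

    // `head` replaces the old `list_with_delimiter` plus the
    // "exactly one object" assertion for discovering the written size.
    Ok(store.head(path).await?.size)
}
```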