Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove Store type param in Instance #101

Merged
merged 4 commits into from Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 4 additions & 7 deletions analytic_engine/src/compaction/scheduler.rs
Expand Up @@ -20,7 +20,6 @@ use common_util::{
runtime::{JoinHandle, Runtime},
};
use log::{debug, error, info, warn};
use object_store::ObjectStore;
use serde_derive::Deserialize;
use snafu::{ResultExt, Snafu};
use table_engine::table::TableId;
Expand Down Expand Up @@ -224,10 +223,9 @@ impl SchedulerImpl {
pub fn new<
Wal: Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Store: ObjectStore + Send + Sync + 'static,
Fa: Factory + Send + Sync + 'static,
>(
space_store: Arc<SpaceStore<Wal, Meta, Store, Fa>>,
space_store: Arc<SpaceStore<Wal, Meta, Fa>>,
runtime: Arc<Runtime>,
config: SchedulerConfig,
) -> Self {
Expand Down Expand Up @@ -300,10 +298,10 @@ impl OngoingTask {
}
}

struct ScheduleWorker<Wal, Meta, Store, Fa> {
struct ScheduleWorker<Wal, Meta, Fa> {
sender: Sender<ScheduleTask>,
receiver: Receiver<ScheduleTask>,
space_store: Arc<SpaceStore<Wal, Meta, Store, Fa>>,
space_store: Arc<SpaceStore<Wal, Meta, Fa>>,
runtime: Arc<Runtime>,
schedule_interval: Duration,
picker_manager: PickerManager,
Expand All @@ -324,9 +322,8 @@ async fn schedule_table_compaction(sender: Sender<ScheduleTask>, request: TableC
impl<
Wal: Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Store: ObjectStore + Send + Sync + 'static,
Fa: Factory + Send + Sync + 'static,
> ScheduleWorker<Wal, Meta, Store, Fa>
> ScheduleWorker<Wal, Meta, Fa>
{
async fn schedule_loop(&mut self) {
while self.running.load(Ordering::Relaxed) {
Expand Down
28 changes: 10 additions & 18 deletions analytic_engine/src/engine.rs
Expand Up @@ -6,7 +6,6 @@ use std::sync::Arc;

use async_trait::async_trait;
use log::info;
use object_store::ObjectStore;
use snafu::ResultExt;
use table_engine::{
engine::{
Expand All @@ -29,32 +28,28 @@ use crate::{
};

/// TableEngine implementation
pub struct TableEngineImpl<Wal, Meta, Store, Fa> {
pub struct TableEngineImpl<Wal, Meta, Fa> {
/// Instance of the table engine
instance: InstanceRef<Wal, Meta, Store, Fa>,
instance: InstanceRef<Wal, Meta, Fa>,
}

impl<Wal, Meta, Store, Fa> Clone for TableEngineImpl<Wal, Meta, Store, Fa> {
impl<Wal, Meta, Fa> Clone for TableEngineImpl<Wal, Meta, Fa> {
fn clone(&self) -> Self {
Self {
instance: self.instance.clone(),
}
}
}

impl<
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Store: ObjectStore,
Fa,
> TableEngineImpl<Wal, Meta, Store, Fa>
impl<Wal: WalManager + Send + Sync + 'static, Meta: Manifest + Send + Sync + 'static, Fa>
TableEngineImpl<Wal, Meta, Fa>
{
pub fn new(instance: InstanceRef<Wal, Meta, Store, Fa>) -> Self {
pub fn new(instance: InstanceRef<Wal, Meta, Fa>) -> Self {
Self { instance }
}
}

impl<Wal, Meta, Store, Fa> Drop for TableEngineImpl<Wal, Meta, Store, Fa> {
impl<Wal, Meta, Fa> Drop for TableEngineImpl<Wal, Meta, Fa> {
fn drop(&mut self) {
info!("Table engine dropped");
}
Expand All @@ -64,9 +59,8 @@ impl<Wal, Meta, Store, Fa> Drop for TableEngineImpl<Wal, Meta, Store, Fa> {
impl<
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Store: ObjectStore,
Fa: Factory + Send + Sync + 'static,
> TableEngine for TableEngineImpl<Wal, Meta, Store, Fa>
> TableEngine for TableEngineImpl<Wal, Meta, Fa>
{
fn engine_type(&self) -> &str {
ANALYTIC_ENGINE_TYPE
Expand Down Expand Up @@ -170,11 +164,9 @@ impl<
}

/// Reference to instance based on rocksdb wal.
pub(crate) type RocksInstanceRef<Store> =
InstanceRef<RocksImpl, ManifestImpl<RocksImpl>, Store, FactoryImpl>;
pub(crate) type RocksInstanceRef = InstanceRef<RocksImpl, ManifestImpl<RocksImpl>, FactoryImpl>;
/// Reference to instance replicating data by obkv wal.
pub(crate) type ReplicatedInstanceRef<Store> =
InstanceRef<ObkvWal, ManifestImpl<ObkvWal>, Store, FactoryImpl>;
pub(crate) type ReplicatedInstanceRef = InstanceRef<ObkvWal, ManifestImpl<ObkvWal>, FactoryImpl>;

/// Generate the space id from the schema id with assumption schema id is unique
/// globally.
Expand Down
4 changes: 1 addition & 3 deletions analytic_engine/src/instance/alter.rs
Expand Up @@ -5,7 +5,6 @@
use std::{collections::HashMap, sync::Arc};

use log::info;
use object_store::ObjectStore;
use snafu::{ensure, ResultExt};
use table_engine::table::AlterSchemaRequest;
use tokio::sync::oneshot;
Expand All @@ -32,11 +31,10 @@ use crate::{
table_options,
};

impl<Wal, Meta, Store, Fa> Instance<Wal, Meta, Store, Fa>
impl<Wal, Meta, Fa> Instance<Wal, Meta, Fa>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Store: ObjectStore,
Fa: Factory + Send + Sync + 'static,
{
// Alter schema need to be handled by write worker.
Expand Down
4 changes: 1 addition & 3 deletions analytic_engine/src/instance/close.rs
Expand Up @@ -5,7 +5,6 @@
use std::sync::Arc;

use log::{info, warn};
use object_store::ObjectStore;
use snafu::ResultExt;
use table_engine::engine::CloseTableRequest;
use tokio::sync::oneshot;
Expand All @@ -23,11 +22,10 @@ use crate::{
sst::factory::Factory,
};

impl<Wal, Meta, Store, Fa> Instance<Wal, Meta, Store, Fa>
impl<Wal, Meta, Fa> Instance<Wal, Meta, Fa>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Store: ObjectStore,
Fa: Factory + Send + Sync + 'static,
{
/// Close table need to be handled by write worker.
Expand Down
4 changes: 1 addition & 3 deletions analytic_engine/src/instance/create.rs
Expand Up @@ -5,7 +5,6 @@
use std::sync::Arc;

use log::info;
use object_store::ObjectStore;
use snafu::ResultExt;
use table_engine::engine::CreateTableRequest;
use tokio::sync::oneshot;
Expand All @@ -27,11 +26,10 @@ use crate::{
table_options,
};

impl<Wal, Meta, Store, Fa> Instance<Wal, Meta, Store, Fa>
impl<Wal, Meta, Fa> Instance<Wal, Meta, Fa>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Store: ObjectStore,
Fa: Factory + Send + Sync + 'static,
{
/// Create table need to be handled by write worker.
Expand Down
4 changes: 1 addition & 3 deletions analytic_engine/src/instance/drop.rs
Expand Up @@ -5,7 +5,6 @@
use std::sync::Arc;

use log::{info, warn};
use object_store::ObjectStore;
use snafu::ResultExt;
use table_engine::engine::DropTableRequest;
use tokio::sync::oneshot;
Expand All @@ -26,11 +25,10 @@ use crate::{
sst::factory::Factory,
};

impl<Wal, Meta, Store, Fa> Instance<Wal, Meta, Store, Fa>
impl<Wal, Meta, Fa> Instance<Wal, Meta, Fa>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Store: ObjectStore,
Fa: Factory + Send + Sync + 'static,
{
/// Drop a table under given space
Expand Down
4 changes: 1 addition & 3 deletions analytic_engine/src/instance/engine.rs
Expand Up @@ -6,7 +6,6 @@ use std::sync::Arc;

use common_types::schema::Version;
use common_util::define_result;
use object_store::ObjectStore;
use snafu::{Backtrace, OptionExt, Snafu};
use table_engine::{
engine::{CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest},
Expand Down Expand Up @@ -207,11 +206,10 @@ impl From<Error> for table_engine::engine::Error {
}
}

impl<Wal, Meta, Store, Fa> Instance<Wal, Meta, Store, Fa>
impl<Wal, Meta, Fa> Instance<Wal, Meta, Fa>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Store: ObjectStore,
Fa: Factory + Send + Sync + 'static,
{
/// Find space by name, create if the space is not exists
Expand Down
6 changes: 2 additions & 4 deletions analytic_engine/src/instance/flush_compaction.rs
Expand Up @@ -19,7 +19,6 @@ use futures::{
stream, SinkExt, TryStreamExt,
};
use log::{error, info};
use object_store::ObjectStore;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::{predicate::Predicate, table::Result as TableResult};
use tokio::sync::oneshot;
Expand Down Expand Up @@ -182,11 +181,10 @@ pub enum TableFlushPolicy {
Purge,
}

impl<Wal, Meta, Store, Fa> Instance<Wal, Meta, Store, Fa>
impl<Wal, Meta, Fa> Instance<Wal, Meta, Fa>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Store: ObjectStore,
Fa: Factory + Send + Sync + 'static,
{
/// Flush this table.
Expand Down Expand Up @@ -768,7 +766,7 @@ where
}
}

impl<Wal, Meta: Manifest, Store: ObjectStore, Fa: Factory> SpaceStore<Wal, Meta, Store, Fa> {
impl<Wal, Meta: Manifest, Fa: Factory> SpaceStore<Wal, Meta, Fa> {
pub(crate) async fn compact_table(
&self,
runtime: Arc<Runtime>,
Expand Down
28 changes: 13 additions & 15 deletions analytic_engine/src/instance/mod.rs
Expand Up @@ -25,7 +25,7 @@ use std::{
use common_util::{define_result, runtime::Runtime};
use log::info;
use mem_collector::MemUsageCollector;
use object_store::ObjectStore;
use object_store::ObjectStoreRef;
use parquet::{DataCacheRef, MetaCacheRef};
use snafu::{ResultExt, Snafu};
use table_engine::engine::EngineRuntimes;
Expand Down Expand Up @@ -85,29 +85,29 @@ impl Spaces {
}
}

pub struct SpaceStore<Wal, Meta, Store, Fa> {
pub struct SpaceStore<Wal, Meta, Fa> {
/// All spaces of the engine.
spaces: RwLock<Spaces>,
/// Manifest (or meta) stores meta data of the engine instance.
manifest: Meta,
/// Wal of all tables
wal_manager: Wal,
/// Sst storage.
store: Arc<Store>,
store: ObjectStoreRef,
/// Sst factory.
sst_factory: Fa,

meta_cache: Option<MetaCacheRef>,
data_cache: Option<DataCacheRef>,
}

impl<Wal, Meta, Store, Fa> Drop for SpaceStore<Wal, Meta, Store, Fa> {
impl<Wal, Meta, Fa> Drop for SpaceStore<Wal, Meta, Fa> {
fn drop(&mut self) {
info!("SpaceStore dropped");
}
}

impl<Wal, Meta, Store, Fa> SpaceStore<Wal, Meta, Store, Fa> {
impl<Wal, Meta, Fa> SpaceStore<Wal, Meta, Fa> {
async fn close(&self) -> Result<()> {
let spaces = self.spaces.read().unwrap().list_all_spaces();
for space in spaces {
Expand All @@ -119,9 +119,9 @@ impl<Wal, Meta, Store, Fa> SpaceStore<Wal, Meta, Store, Fa> {
}
}

impl<Wal, Meta, Store, Fa> SpaceStore<Wal, Meta, Store, Fa> {
fn store_ref(&self) -> &Store {
&*self.store
impl<Wal, Meta, Fa> SpaceStore<Wal, Meta, Fa> {
fn store_ref(&self) -> &ObjectStoreRef {
&self.store
}

/// List all tables of all spaces
Expand All @@ -142,9 +142,9 @@ impl<Wal, Meta, Store, Fa> SpaceStore<Wal, Meta, Store, Fa> {
///
/// Manages all spaces, also contains needed resources shared across all table
// TODO(yingwen): Track memory usage of all tables (or tables of space)
pub struct Instance<Wal, Meta, Store, Fa> {
pub struct Instance<Wal, Meta, Fa> {
/// Space storage
space_store: Arc<SpaceStore<Wal, Meta, Store, Fa>>,
space_store: Arc<SpaceStore<Wal, Meta, Fa>>,
/// Runtime to execute async tasks.
runtimes: Arc<EngineRuntimes>,
/// Global table options, overwrite mutable options in each table's
Expand All @@ -170,7 +170,7 @@ pub struct Instance<Wal, Meta, Store, Fa> {
pub(crate) replay_batch_size: usize,
}

impl<Wal, Meta, Store, Fa> Instance<Wal, Meta, Store, Fa> {
impl<Wal, Meta, Fa> Instance<Wal, Meta, Fa> {
/// Close the instance gracefully.
pub async fn close(&self) -> Result<()> {
self.file_purger.stop().await.context(StopFilePurger)?;
Expand All @@ -185,9 +185,7 @@ impl<Wal, Meta, Store, Fa> Instance<Wal, Meta, Store, Fa> {
}

// TODO(yingwen): Instance builder
impl<Wal: WalManager + Send + Sync, Meta: Manifest, Store: ObjectStore, Fa>
Instance<Wal, Meta, Store, Fa>
{
impl<Wal: WalManager + Send + Sync, Meta: Manifest, Fa> Instance<Wal, Meta, Fa> {
/// Find space using read lock
fn get_space_by_read_lock(&self, space: SpaceId) -> Option<SpaceRef> {
let spaces = self.space_store.spaces.read().unwrap();
Expand Down Expand Up @@ -224,4 +222,4 @@ impl<Wal: WalManager + Send + Sync, Meta: Manifest, Store: ObjectStore, Fa>
}

/// Instance reference
pub type InstanceRef<Wal, Meta, Store, Fa> = Arc<Instance<Wal, Meta, Store, Fa>>;
pub type InstanceRef<Wal, Meta, Fa> = Arc<Instance<Wal, Meta, Fa>>;
8 changes: 3 additions & 5 deletions analytic_engine/src/instance/open.rs
Expand Up @@ -9,7 +9,7 @@ use std::{

use common_types::schema::IndexInWriterSchema;
use log::{debug, error, info, trace, warn};
use object_store::ObjectStore;
use object_store::ObjectStoreRef;
use snafu::ResultExt;
use table_engine::table::TableId;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -39,22 +39,20 @@ use crate::{
table::data::{TableData, TableDataRef},
};

impl<Wal, Meta, Store, Fa> Instance<Wal, Meta, Store, Fa>
impl<Wal, Meta, Fa> Instance<Wal, Meta, Fa>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Store: ObjectStore,
Fa: Factory + Send + Sync + 'static,
{
/// Open a new instance
pub async fn open(
ctx: OpenContext,
manifest: Meta,
wal_manager: Wal,
store: Store,
store: ObjectStoreRef,
sst_factory: Fa,
) -> Result<Arc<Self>> {
let store = Arc::new(store);
let space_store = Arc::new(SpaceStore {
spaces: RwLock::new(Spaces::default()),
manifest,
Expand Down