Skip to content

Commit

Permalink
feat: Engine::open_region code skeleton (#120)
Browse files Browse the repository at this point in the history
* refactor: Move fields in SharedData to EngineInner

Since `SharedData` isn't shared now, we move all its fields to
EngineInner, and remove the `SharedData` struct, also remove the
unused config field.

* feat: Store RegionSlot in engine's region map

A `RegionSlot` has three possible state:
- Opening
- Creating
- Ready (Holds the `RegionImpl`)

Also use the `RegionSlot` as a placeholder in the region map to indicate
the region is opening/creating, so another open/create request will
fail immediately. The `SlotGuard` is used to clean the slot if we failed
to create/open the region.

* feat: Add a blank method `RegionImpl::open`

* feat: Remove MetadataId from Manifest

Now metadata id of manifest is unused, also unnecessary as we have
manifest dir to build the manifest, but constructing the manifest
still needs a passing region id as argument, which is unavailable
during opening region. So we remove the metadata id from manifest so
`region_store_config()` don't need region id as input anymore

* feat: Remove region id from logstore::Namespace and Wal

This is necessary for implementing open, since we don't have region
id this time, but we need to build Wal and its logstore namespace. Now
this is ok as id is not actually used by logstore.

* feat: Setup `open_region` code skeleton
  • Loading branch information
evenyag committed Jul 29, 2022
1 parent 62cb649 commit f06968f
Show file tree
Hide file tree
Showing 19 changed files with 390 additions and 161 deletions.
13 changes: 2 additions & 11 deletions src/log-store/src/fs/namespace.rs
Expand Up @@ -9,21 +9,19 @@ pub struct LocalNamespace {

impl Default for LocalNamespace {
fn default() -> Self {
LocalNamespace::new("", 0)
LocalNamespace::new("")
}
}

#[derive(Debug)]
struct LocalNamespaceInner {
name: String,
id: u64,
}

impl Namespace for LocalNamespace {
fn new(name: &str, id: u64) -> Self {
fn new(name: &str) -> Self {
let inner = Arc::new(LocalNamespaceInner {
name: name.to_string(),
id,
});
Self { inner }
}
Expand All @@ -32,10 +30,3 @@ impl Namespace for LocalNamespace {
self.inner.name.as_str()
}
}

#[allow(dead_code)]
impl LocalNamespace {
fn id(&self) -> u64 {
self.inner.id
}
}
248 changes: 187 additions & 61 deletions src/storage/src/engine.rs
Expand Up @@ -9,7 +9,7 @@ use store_api::manifest::action::ProtocolAction;
use store_api::{
logstore::LogStore,
manifest::Manifest,
storage::{EngineContext, RegionDescriptor, StorageEngine},
storage::{EngineContext, OpenOptions, RegionDescriptor, StorageEngine},
};

use crate::background::JobPoolImpl;
Expand Down Expand Up @@ -41,8 +41,13 @@ impl<S: LogStore> StorageEngine for EngineImpl<S> {
type Error = Error;
type Region = RegionImpl<S>;

async fn open_region(&self, _ctx: &EngineContext, _name: &str) -> Result<Self::Region> {
unimplemented!()
async fn open_region(
&self,
_ctx: &EngineContext,
name: &str,
opts: &OpenOptions,
) -> Result<Self::Region> {
self.inner.open_region(name, opts).await
}

async fn close_region(&self, _ctx: &EngineContext, _region: Self::Region) -> Result<()> {
Expand Down Expand Up @@ -74,34 +79,19 @@ impl<S: LogStore> EngineImpl<S> {
}
}

/// Engine share data
/// TODO(dennis): merge to EngineInner?
#[derive(Clone, Debug)]
struct SharedData {
pub _config: EngineConfig,
pub object_store: ObjectStore,
}

impl SharedData {
async fn new(config: EngineConfig) -> Result<Self> {
// TODO(dennis): supports other backend
let store_dir = util::normalize_dir(match &config.store_config {
ObjectStoreConfig::File(file) => &file.store_dir,
});
async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
// TODO(dennis): supports other backend
let store_dir = util::normalize_dir(match store_config {
ObjectStoreConfig::File(file) => &file.store_dir,
});

let accessor = Backend::build()
.root(&store_dir)
.finish()
.await
.context(error::InitBackendSnafu { dir: &store_dir })?;
let accessor = Backend::build()
.root(&store_dir)
.finish()
.await
.context(error::InitBackendSnafu { dir: &store_dir })?;

let object_store = ObjectStore::new(accessor);

Ok(Self {
_config: config,
object_store,
})
}
Ok(ObjectStore::new(accessor))
}

#[inline]
Expand All @@ -114,12 +104,108 @@ pub fn region_manifest_dir(region_name: &str) -> String {
format!("{}/manifest/", region_name)
}

type RegionMap<S> = HashMap<String, RegionImpl<S>>;
/// A slot for region in the engine.
///
/// Also used as a placeholder in the region map when the region isn't ready, e.g. during
/// creating/opening.
#[derive(Debug)]
enum RegionSlot<S: LogStore> {
/// The region is during creation.
Creating,
/// The region is during opening.
Opening,
/// The region is ready for access.
Ready(RegionImpl<S>),
// TODO(yingwen): Closing state.
}

impl<S: LogStore> RegionSlot<S> {
/// Try to get a ready region.
fn try_get_ready_region(&self) -> Result<RegionImpl<S>> {
if let RegionSlot::Ready(region) = self {
Ok(region.clone())
} else {
error::InvalidRegionStateSnafu {
state: self.state_name(),
}
.fail()
}
}

/// Returns the ready region or `None`.
fn get_ready_region(&self) -> Option<RegionImpl<S>> {
if let RegionSlot::Ready(region) = self {
Some(region.clone())
} else {
None
}
}

fn state_name(&self) -> &'static str {
match self {
RegionSlot::Creating => "creating",
RegionSlot::Opening => "opening",
RegionSlot::Ready(_) => "ready",
}
}
}

impl<S: LogStore> Clone for RegionSlot<S> {
// Manually implement Clone due to [rust#26925](https://github.com/rust-lang/rust/issues/26925).
// Maybe we should require `LogStore` to be clonable to work around this.
fn clone(&self) -> RegionSlot<S> {
match self {
RegionSlot::Creating => RegionSlot::Creating,
RegionSlot::Opening => RegionSlot::Opening,
RegionSlot::Ready(region) => RegionSlot::Ready(region.clone()),
}
}
}

/// Used to update slot or clean the slot on failure.
struct SlotGuard<'a, S: LogStore> {
name: &'a str,
regions: &'a RwLock<RegionMap<S>>,
skip_clean: bool,
}

impl<'a, S: LogStore> SlotGuard<'a, S> {
fn new(name: &'a str, regions: &'a RwLock<RegionMap<S>>) -> SlotGuard<'a, S> {
SlotGuard {
name,
regions,
skip_clean: false,
}
}

/// Update the slot and skip cleaning on drop.
fn update(&mut self, slot: RegionSlot<S>) {
{
let mut regions = self.regions.write().unwrap();
if let Some(old) = regions.get_mut(self.name) {
*old = slot;
}
}

self.skip_clean = true;
}
}

impl<'a, S: LogStore> Drop for SlotGuard<'a, S> {
fn drop(&mut self) {
if !self.skip_clean {
let mut regions = self.regions.write().unwrap();
regions.remove(self.name);
}
}
}

type RegionMap<S> = HashMap<String, RegionSlot<S>>;

struct EngineInner<S: LogStore> {
object_store: ObjectStore,
log_store: Arc<S>,
regions: RwLock<RegionMap<S>>,
shared: SharedData,
memtable_builder: MemtableBuilderRef,
flush_scheduler: FlushSchedulerRef,
flush_strategy: FlushStrategyRef,
Expand All @@ -129,57 +215,87 @@ impl<S: LogStore> EngineInner<S> {
pub async fn new(config: EngineConfig, log_store: Arc<S>) -> Result<Self> {
let job_pool = Arc::new(JobPoolImpl {});
let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool));
let object_store = new_object_store(&config.store_config).await?;

Ok(Self {
object_store,
log_store,
regions: RwLock::new(Default::default()),
shared: SharedData::new(config).await?,
memtable_builder: Arc::new(DefaultMemtableBuilder {}),
flush_scheduler,
flush_strategy: Arc::new(SizeBasedStrategy::default()),
})
}

async fn create_region(&self, descriptor: RegionDescriptor) -> Result<RegionImpl<S>> {
/// Returns the `Some(slot)` if there is existing slot with given `name`, or insert
/// given `slot` and returns `None`.
fn get_or_occupy_slot(&self, name: &str, slot: RegionSlot<S>) -> Option<RegionSlot<S>> {
{
// Try to get the region under read lock.
let regions = self.regions.read().unwrap();
if let Some(region) = regions.get(&descriptor.name) {
return Ok(region.clone());
if let Some(slot) = regions.get(name) {
return Some(slot.clone());
}
}

// Get the region under write lock.
let mut regions = self.regions.write().unwrap();
if let Some(slot) = regions.get(name) {
return Some(slot.clone());
}

// No slot in map, we can insert the slot now.
regions.insert(name.to_string(), slot);

None
}

async fn open_region(&self, name: &str, opts: &OpenOptions) -> Result<RegionImpl<S>> {
// We can wait until the state of the slot has been changed to ready, but this will
// make the code more complicate, so we just return the error here.
if let Some(slot) = self.get_or_occupy_slot(name, RegionSlot::Opening) {
return slot.try_get_ready_region();
}

let mut guard = SlotGuard::new(name, &self.regions);

// FIXME(yingwen): Get region id or remove dependency of region id.
let store_config = self.region_store_config(name);
let region = RegionImpl::open(name.to_string(), store_config, opts).await?;

guard.update(RegionSlot::Ready(region.clone()));

info!("Storage engine open region {:?}", &region);

Ok(region)
}

async fn create_region(&self, descriptor: RegionDescriptor) -> Result<RegionImpl<S>> {
if let Some(slot) = self.get_or_occupy_slot(&descriptor.name, RegionSlot::Creating) {
return slot.try_get_ready_region();
}

// Now the region in under `Creating` state.
let region_id = descriptor.id;
let region_name = descriptor.name.clone();
let mut guard = SlotGuard::new(&region_name, &self.regions);

let metadata: RegionMetadata =
descriptor
.try_into()
.context(error::InvalidRegionDescSnafu {
region: &region_name,
})?;
let sst_dir = &region_sst_dir(&region_name);
let sst_layer = Arc::new(FsAccessLayer::new(
sst_dir,
self.shared.object_store.clone(),
));
let manifest_dir = region_manifest_dir(&region_name);
let manifest =
RegionManifest::new(region_id, &manifest_dir, self.shared.object_store.clone());

let store_config = StoreConfig {
log_store: self.log_store.clone(),
sst_layer,
manifest: manifest.clone(),
memtable_builder: self.memtable_builder.clone(),
flush_scheduler: self.flush_scheduler.clone(),
flush_strategy: self.flush_strategy.clone(),
};
let store_config = self.region_store_config(&region_name);
let manifest = store_config.manifest.clone();

let region = RegionImpl::new(
region_id,
region_name.clone(),
metadata.clone(),
store_config,
);

// Persist region metadata
manifest
.update(RegionMetaActionList::new(vec![
Expand All @@ -190,22 +306,32 @@ impl<S: LogStore> EngineInner<S> {
]))
.await?;

{
let mut regions = self.regions.write().unwrap();
if let Some(region) = regions.get(&region_name) {
return Ok(region.clone());
}

regions.insert(region_name.clone(), region.clone());
}
guard.update(RegionSlot::Ready(region.clone()));

info!("Storage engine create region {:?}", &region);

Ok(region)
}

fn get_region(&self, name: &str) -> Option<RegionImpl<S>> {
self.regions.read().unwrap().get(name).cloned()
let slot = self.regions.read().unwrap().get(name).cloned()?;
slot.get_ready_region()
}

fn region_store_config(&self, region_name: &str) -> StoreConfig<S> {
let sst_dir = &region_sst_dir(region_name);
let sst_layer = Arc::new(FsAccessLayer::new(sst_dir, self.object_store.clone()));
let manifest_dir = region_manifest_dir(region_name);
let manifest = RegionManifest::new(&manifest_dir, self.object_store.clone());

StoreConfig {
log_store: self.log_store.clone(),
sst_layer,
manifest,
memtable_builder: self.memtable_builder.clone(),
flush_scheduler: self.flush_scheduler.clone(),
flush_strategy: self.flush_strategy.clone(),
}
}
}

Expand Down

0 comments on commit f06968f

Please sign in to comment.