Skip to content

Commit

Permalink
Allow database migration upon adding new database column family to sc…
Browse files Browse the repository at this point in the history
…hema (#1851)

Adds migration capabilities (allows adding new column family to existing
database) to RocksDB instance. This change makes sure RocksDB open is
always requested with the existing set of column families and all new
column families are later created once opened.

- [x] Breaking changes are clearly marked as such in the PR description
and changelog
- [x] New behavior is reflected in tests - Tests introduced by
#1853

### Before requesting review
- [x] I have reviewed the code myself

---------

Co-authored-by: Green Baneling <XgreenX9999@gmail.com>
  • Loading branch information
simrankedia and xgreenx authored Apr 29, 2024
1 parent e7755a6 commit 4165449
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
### Fixed

- [#1856](https://github.com/FuelLabs/fuel-core/pull/1856): Replaced instances of `Union` with `Enum` for GraphQL definitions of `ConsensusParametersVersion` and related types. This is needed because `Union` does not support multiple `Version`s inside discriminants or empty variants.
- [#1851](https://github.com/FuelLabs/fuel-core/pull/1851/): Provided migration capabilities (enabled addition of new column families) to RocksDB instance.

### Changed

Expand Down
80 changes: 46 additions & 34 deletions crates/fuel-core/src/state/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,6 @@ where
}
block_opts.set_bloom_filter(10.0, true);

let cf_descriptors = columns.clone().into_iter().map(|i| {
ColumnFamilyDescriptor::new(
Self::col_name(i.id()),
Self::cf_opts(i, &block_opts),
)
});

let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_compression_type(DBCompressionType::Lz4);
Expand All @@ -227,34 +220,53 @@ where
opts.set_row_cache(&cache);
}

let db = match DB::open_cf_descriptors(&opts, &path, cf_descriptors) {
Err(_) => {
// setup cfs
match DB::open_cf(&opts, &path, &[] as &[&str]) {
Ok(db) => {
for i in columns {
let opts = Self::cf_opts(i, &block_opts);
db.create_cf(Self::col_name(i.id()), &opts)
.map_err(|e| DatabaseError::Other(e.into()))?;
}
Ok(db)
}
Err(err) => {
tracing::error!("Couldn't open the database with an error: {}. \nTrying to repair the database", err);
DB::repair(&opts, &path)
.map_err(|e| DatabaseError::Other(e.into()))?;

let cf_descriptors = columns.clone().into_iter().map(|i| {
ColumnFamilyDescriptor::new(
Self::col_name(i.id()),
Self::cf_opts(i, &block_opts),
)
});
DB::open_cf_descriptors(&opts, &path, cf_descriptors)
}
}
let existing_column_families = match DB::list_cf(&opts, &path) {
Ok(cfs) => cfs,
Err(err) => {
tracing::error!(
"Couldn't get the list of cfs: {}. Returning an empty list",
err
);
vec![]
}
ok => ok,
};

let mut cf_descriptors_to_open = vec![];
let mut cf_descriptors_to_create = vec![];
for column in columns.clone() {
let column_name = Self::col_name(column.id());
if existing_column_families.contains(&column_name) {
cf_descriptors_to_open.push(ColumnFamilyDescriptor::new(
column_name,
Self::cf_opts(column, &block_opts),
));
} else {
let opts = Self::cf_opts(column, &block_opts);
cf_descriptors_to_create.push((column_name, opts));
}
}
let db = match DB::open_cf_descriptors(&opts, &path, cf_descriptors_to_open) {
Ok(db) => {
// Setup cfs
for (name, opt) in cf_descriptors_to_create {
db.create_cf(name, &opt)
.map_err(|e| DatabaseError::Other(e.into()))?;
}
Ok(db)
},
Err(err) => {
tracing::error!("Couldn't open the database with an error: {}. \nTrying to repair the database", err);
DB::repair(&opts, &path)
.map_err(|e| DatabaseError::Other(e.into()))?;

let cf_descriptors = columns.clone().into_iter().map(|i| {
ColumnFamilyDescriptor::new(
Self::col_name(i.id()),
Self::cf_opts(i, &block_opts),
)
});
DB::open_cf_descriptors(&opts, &path, cf_descriptors)
},
}
.map_err(|e| DatabaseError::Other(e.into()))?;
let rocks_db = RocksDb {
Expand Down

0 comments on commit 4165449

Please sign in to comment.