Skip to content

Commit

Permalink
coord: enable select sys table persistence behind a flag (default off)
Browse files Browse the repository at this point in the history
Rename --persistent-tables to --persistent-user-tables and add
--persistent-system-tables. The latter currently defaults to off and,
when on, only opts in the system tables that have self-elected to be
persisted (mz_metrics and mz_metrics_histogram). Once we've stabilized
persistence, we'll default --persistent-system-tables to on, but keep
the flag as an escape hatch.

Along the way, fix a bug where turning on persisted system tables also
created an unnecessary persistent stream for all system views.

Also improve the persistent stream naming by including the catalog item's
name in the stream name, which would have helped debug what was going on
with this view bug.
  • Loading branch information
danhhz committed Aug 6, 2021
1 parent 02665c1 commit ef8152c
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 32 deletions.
42 changes: 34 additions & 8 deletions src/coord/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ impl Catalog {
);
let oid = catalog.allocate_oid()?;
let persist = if table.persistent {
catalog.persist_details(table.id)?
catalog.persist_details(table.id, &name)?
} else {
None
};
Expand Down Expand Up @@ -781,8 +781,9 @@ impl Catalog {
}

Builtin::View(view) if config.enable_logging || !view.needs_logs => {
let allow_persist = false;
let item = catalog
.parse_item(view.id, view.sql.into(), None)
.parse_item(view.id, &name, view.sql.into(), None, allow_persist)
.unwrap_or_else(|e| {
panic!(
"internal error: failed to load bootstrap view:\n\
Expand Down Expand Up @@ -896,7 +897,7 @@ impl Catalog {
static ref LOGGING_ERROR: Regex =
Regex::new("unknown catalog item 'mz_catalog.[^']*'").unwrap();
}
let item = match c.deserialize_item(id, def) {
let item = match c.deserialize_item(id, &name, def) {
Ok(item) => item,
Err(e) if LOGGING_ERROR.is_match(&e.to_string()) => {
return Err(Error::new(ErrorKind::UnsatisfiableLoggingDependency {
Expand Down Expand Up @@ -1876,23 +1877,44 @@ impl Catalog {
serde_json::to_vec(&item).expect("catalog serialization cannot fail")
}

fn deserialize_item(&self, id: GlobalId, bytes: Vec<u8>) -> Result<CatalogItem, anyhow::Error> {
fn deserialize_item(
&self,
id: GlobalId,
name: &FullName,
bytes: Vec<u8>,
) -> Result<CatalogItem, anyhow::Error> {
let SerializedCatalogItem::V1 {
create_sql,
eval_env: _,
} = serde_json::from_slice(&bytes)?;
self.parse_item(id, create_sql, Some(&PlanContext::zero()))
// TODO: Serialize the persisted stream name and pass it in here instead
// of re-deriving it from scratch. This will allow us to later change
// our persisted stream naming scheme, if necessary.
let allow_persist = true;
self.parse_item(
id,
&name,
create_sql,
Some(&PlanContext::zero()),
allow_persist,
)
}

fn parse_item(
&self,
id: GlobalId,
name: &FullName,
create_sql: String,
pcx: Option<&PlanContext>,
allow_persist: bool,
) -> Result<CatalogItem, anyhow::Error> {
let stmt = sql::parse::parse(&create_sql)?.into_element();
let plan = sql::plan::plan(pcx, &self.for_system_session(), stmt, &Params::empty())?;
let persist = self.persist_details(id)?;
let persist = if allow_persist {
self.persist_details(id, name)?
} else {
None
};
Ok(match plan {
Plan::CreateTable(CreateTablePlan {
table, depends_on, ..
Expand Down Expand Up @@ -2142,8 +2164,12 @@ impl Catalog {
relations.into_iter().collect()
}

pub fn persist_details(&self, id: GlobalId) -> Result<Option<PersistDetails>, PersistError> {
self.persist.details(id)
pub fn persist_details(
&self,
id: GlobalId,
name: &FullName,
) -> Result<Option<PersistDetails>, PersistError> {
self.persist.details(id, &name.to_string())
}

pub fn persist_multi_details(&self) -> Option<&PersistMultiDetails> {
Expand Down
14 changes: 8 additions & 6 deletions src/coord/src/catalog/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,9 +833,10 @@ lazy_static! {
.with_key(vec![0, 1, 2]),
id: GlobalId::System(4043),
index_id: GlobalId::System(4044),
// TODO: Enable this once persistence compaction is moved into its own
// thread.
persistent: false,
// Note that the `system_table_enabled` field of PersistConfig (hooked
// up to --persistent-system-tables) also has to be true for this to be
// persisted.
persistent: true,
};
pub static ref MZ_PROMETHEUS_METRICS: BuiltinTable = BuiltinTable {
name: "mz_metrics_meta",
Expand All @@ -861,9 +862,10 @@ lazy_static! {
.with_key(vec![0, 1, 2]),
id: GlobalId::System(4047),
index_id: GlobalId::System(4048),
// TODO: Enable this once persistence compaction is moved into its own
// thread.
persistent: false,
// Note that the `system_table_enabled` field of PersistConfig (hooked
// up to --persistent-system-tables) also has to be true for this to be
// persisted.
persistent: true,
};
}

Expand Down
2 changes: 1 addition & 1 deletion src/coord/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1657,7 +1657,7 @@ impl Coordinator {
index_depends_on.push(table_id);
let persist = self
.catalog
.persist_details(table_id)
.persist_details(table_id, &name)
.map_err(|err| anyhow!("{}", err))?;
let table = catalog::Table {
create_sql: table.create_sql,
Expand Down
31 changes: 19 additions & 12 deletions src/coord/src/persistcfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@ pub struct PersistConfig {
/// A directory under which larger batches of indexed data are stored. This
/// will eventually be S3 for Cloud.
pub blob_path: PathBuf,
/// Whether to persist user tables. This is extremely experimental and
/// Whether to persist all user tables. This is extremely experimental and
/// should not even be tried by users. It's initially here for end-to-end
/// testing.
pub user_table_enabled: bool,
/// Whether to persist certain system tables that have opted in. This is
/// extremely experimental and should not even be tried by users. It's
/// initially here for end-to-end testing.
pub system_table_enabled: bool,
/// Information stored in the "lock" files created by the buffer and blob to
/// ensure that they are exclusive writers to those locations. This should
/// contain whatever information might be useful to investigating an
Expand All @@ -46,6 +50,7 @@ impl PersistConfig {
buffer_path: Default::default(),
blob_path: Default::default(),
user_table_enabled: false,
system_table_enabled: false,
lock_info: Default::default(),
}
}
Expand All @@ -54,7 +59,7 @@ impl PersistConfig {
/// interacting with it. If all persistence features are disabled, the
/// runtime is not started and the returned `PersisterWithConfig` holds no
/// persister.
pub fn init(&self) -> Result<PersisterWithConfig, anyhow::Error> {
let persister = if self.user_table_enabled {
let persister = if self.user_table_enabled || self.system_table_enabled {
let buffer = FileBuffer::new(&self.buffer_path, &self.lock_info)
.map_err(|err| anyhow!("{}", err))?;
let blob = FileBlob::new(&self.blob_path, &self.lock_info)
Expand All @@ -78,28 +83,30 @@ pub struct PersisterWithConfig {
}

impl PersisterWithConfig {
fn stream_name(&self, id: GlobalId) -> Option<String> {
fn stream_name(&self, id: GlobalId, pretty: &str) -> Option<String> {
match id {
GlobalId::User(id) if self.config.user_table_enabled => {
// TODO: This needs to be written down somewhere in the catalog in case
// we need to change the naming at some point.
Some(format!("user-table-{:?}", id))
// TODO: This needs to be written down somewhere in the catalog
// in case we need to change the naming at some point. See
// related TODO in Catalog::deserialize_item.
Some(format!("user-table-{:?}-{}", id, pretty))
}
GlobalId::System(id) => {
// TODO: This needs to be written down somewhere in the catalog in case
// we need to change the naming at some point.
Some(format!("system-table-{:?}", id))
GlobalId::System(id) if self.config.system_table_enabled => {
// TODO: This needs to be written down somewhere in the catalog
// in case we need to change the naming at some point. See
// related TODO in Catalog::deserialize_item.
Some(format!("system-table-{:?}-{}", id, pretty))
}
_ => None,
}
}

pub fn details(&self, id: GlobalId) -> Result<Option<PersistDetails>, Error> {
pub fn details(&self, id: GlobalId, pretty: &str) -> Result<Option<PersistDetails>, Error> {
let persister = match self.persister.as_ref() {
Some(x) => x,
None => return Ok(None),
};
let stream_name = match self.stream_name(id) {
let stream_name = match self.stream_name(id, pretty) {
Some(x) => x,
None => return Ok(None),
};
Expand Down
22 changes: 17 additions & 5 deletions src/materialized/src/bin/materialized/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,13 @@ struct Args {
#[structopt(long, hidden = true)]
safe: bool,

/// Enable persistent tables. Has to be used with --experimental.
/// Enable persistent user tables. Has to be used with --experimental.
#[structopt(long, hidden = true)]
persistent_tables: bool,
persistent_user_tables: bool,

/// Enable persistent system tables. Has to be used with --experimental.
#[structopt(long, hidden = true)]
persistent_system_tables: bool,

// === Timely worker configuration. ===
/// Number of dataflow worker threads.
Expand Down Expand Up @@ -585,10 +589,17 @@ swap: {swap_total}KB total, {swap_used}KB used{swap_limit}",

// Configure persistence core.
let persist_config = {
let user_table_enabled = if args.experimental && args.persistent_tables {
let user_table_enabled = if args.experimental && args.persistent_user_tables {
true
} else if args.persistent_user_tables {
bail!("cannot specify --persistent-user-tables without --experimental");
} else {
false
};
let system_table_enabled = if args.experimental && args.persistent_system_tables {
true
} else if args.persistent_tables {
bail!("cannot specify --persistent-tables without --experimental");
} else if args.persistent_system_tables {
bail!("cannot specify --persistent-system-tables without --experimental");
} else {
false
};
Expand All @@ -605,6 +616,7 @@ swap: {swap_total}KB total, {swap_used}KB used{swap_limit}",
buffer_path: data_directory.join("persist").join("buffer"),
blob_path: data_directory.join("persist").join("blob"),
user_table_enabled,
system_table_enabled,
lock_info,
}
};
Expand Down

0 comments on commit ef8152c

Please sign in to comment.