Merge pull request #20475 from mjibson/notice-explain
adapter: teach emit_timestamp_notice to explain
maddyblue committed Jul 11, 2023
2 parents e3a877c + 602d8a2 commit dcdb2bd
Showing 7 changed files with 75 additions and 69 deletions.
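In outline: with the `emit_timestamp_notice` session variable enabled (e.g. `SET emit_timestamp_notice = true`), the coordinator no longer attaches just the bare query timestamp to the notice; it now renders the same `TimestampExplanation` that `EXPLAIN TIMESTAMP` produces and ships it with `AdapterNotice::QueryTimestamp`. A condensed sketch of the new peek path, assembled from the `inner.rs` hunks below (surrounding coordinator plumbing elided, not standalone code):

    // Condensed from sequence_peek in src/adapter/src/coord/sequencer/inner.rs.
    let determination = peek_plan.determination.clone();

    // Execute the peek exactly as before.
    let resp = self
        .implement_peek_plan(peek_plan, finishing, cluster_id, target_replica)
        .await?;

    // New: build the full EXPLAIN TIMESTAMP output and attach it to the notice.
    if session.vars().emit_timestamp_notice() {
        let explanation =
            self.explain_timestamp(session, cluster_id, &id_bundle, determination);
        session.add_notice(AdapterNotice::QueryTimestamp { explanation });
    }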
11 changes: 4 additions & 7 deletions src/adapter/src/coord/peek.rs
@@ -36,8 +36,7 @@ use tokio::sync::oneshot;
use uuid::Uuid;

use crate::client::ConnectionId;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::timestamp_selection::TimestampContext;
use crate::coord::timestamp_selection::TimestampDetermination;
use crate::util::{send_immediate_rows, ResultExt};
use crate::AdapterError;

@@ -138,10 +137,9 @@ where
#[derive(Debug)]
pub struct PlannedPeek {
pub plan: PeekPlan,
pub timestamp_context: TimestampContext<mz_repr::Timestamp>,
pub determination: TimestampDetermination<mz_repr::Timestamp>,
pub conn_id: ConnectionId,
pub source_arity: usize,
pub id_bundle: CollectionIdBundle,
pub source_ids: BTreeSet<GlobalId>,
}

@@ -303,10 +301,9 @@ impl crate::coord::Coordinator {
) -> Result<crate::ExecuteResponse, AdapterError> {
let PlannedPeek {
plan: fast_path,
timestamp_context,
determination,
conn_id,
source_arity,
id_bundle: _,
source_ids,
} = plan;

@@ -345,7 +342,7 @@ impl crate::coord::Coordinator {
};
}

let timestamp = timestamp_context.timestamp_or_default();
let timestamp = determination.timestamp_context.timestamp_or_default();

// The remaining cases are a peek into a maintained arrangement, or building a dataflow.
// In both cases we will want to peek, and the main difference is that we might want to
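Because the diff above is rendered without +/- markers, here is `PlannedPeek` as it reads after the change (reconstructed from the hunk; `timestamp_context` and `id_bundle` are the removed fields). Carrying the whole `TimestampDetermination` is what lets the notice path build a `TimestampExplanation` later:

    #[derive(Debug)]
    pub struct PlannedPeek {
        pub plan: PeekPlan,
        pub determination: TimestampDetermination<mz_repr::Timestamp>,
        pub conn_id: ConnectionId,
        pub source_arity: usize,
        pub source_ids: BTreeSet<GlobalId>,
    }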
107 changes: 57 additions & 50 deletions src/adapter/src/coord/sequencer/inner.rs
@@ -2163,23 +2163,23 @@ impl Coordinator {
index_id,
timeline_context,
source_ids,
id_bundle,
&id_bundle,
real_time_recency_ts,
key,
typ,
)?;

let timestamp = peek_plan.timestamp_context.timestamp().cloned();
let determination = peek_plan.determination.clone();

// Implement the peek, and capture the response.
let resp = self
.implement_peek_plan(peek_plan, finishing, cluster_id, target_replica)
.await?;

if session.vars().emit_timestamp_notice() {
if let Some(timestamp) = timestamp {
session.add_notice(AdapterNotice::QueryTimestamp { timestamp });
}
let explanation =
self.explain_timestamp(session, cluster_id, &id_bundle, determination);
session.add_notice(AdapterNotice::QueryTimestamp { explanation });
}

match copy_to {
@@ -2315,28 +2315,26 @@ impl Coordinator {
index_id: GlobalId,
timeline_context: TimelineContext,
source_ids: BTreeSet<GlobalId>,
id_bundle: CollectionIdBundle,
id_bundle: &CollectionIdBundle,
real_time_recency_ts: Option<Timestamp>,
key: Vec<MirScalarExpr>,
typ: RelationType,
) -> Result<PlannedPeek, AdapterError> {
let conn_id = session.conn_id().clone();
let timestamp_context = self
.sequence_peek_timestamp(
session,
when,
cluster_id,
timeline_context,
&id_bundle,
&source_ids,
real_time_recency_ts,
)?
.timestamp_context;
let determination = self.sequence_peek_timestamp(
session,
when,
cluster_id,
timeline_context,
id_bundle,
&source_ids,
real_time_recency_ts,
)?;

// Now that we have a timestamp, set the as of and resolve calls to mz_now().
dataflow.set_as_of(timestamp_context.antichain());
dataflow.set_as_of(determination.timestamp_context.antichain());
let style = ExprPrepStyle::OneShot {
logical_time: EvalTime::Time(timestamp_context.timestamp_or_default()),
logical_time: EvalTime::Time(determination.timestamp_context.timestamp_or_default()),
session,
};
let state = self.catalog().state();
@@ -2361,10 +2359,9 @@

Ok(PlannedPeek {
plan: peek_plan,
timestamp_context,
determination,
conn_id,
source_arity: typ.arity(),
id_bundle,
source_ids,
})
}
@@ -2899,35 +2896,13 @@ impl Coordinator {
Ok((format, source_ids, optimized_plan, cluster.id(), id_bundle))
}

pub(super) fn sequence_explain_timestamp_finish_inner(
&mut self,
session: &mut Session,
format: ExplainFormat,
pub(crate) fn explain_timestamp(
&self,
session: &Session,
cluster_id: ClusterId,
source: OptimizedMirRelationExpr,
id_bundle: CollectionIdBundle,
real_time_recency_ts: Option<Timestamp>,
) -> Result<ExecuteResponse, AdapterError> {
let is_json = match format {
ExplainFormat::Text => false,
ExplainFormat::Json => true,
ExplainFormat::Dot => {
return Err(AdapterError::Unsupported("EXPLAIN TIMESTAMP AS DOT"));
}
};
let source_ids = source.depends_on();
let timeline_context = self.validate_timeline_context(source_ids.clone())?;

let determination = self.sequence_peek_timestamp(
session,
&QueryWhen::Immediately,
cluster_id,
timeline_context,
&id_bundle,
&source_ids,
real_time_recency_ts,
)?;

id_bundle: &CollectionIdBundle,
determination: TimestampDetermination<mz_repr::Timestamp>,
) -> TimestampExplanation<mz_repr::Timestamp> {
let mut sources = Vec::new();
{
for id in id_bundle.storage_ids.iter() {
@@ -2979,10 +2954,42 @@
}
}
}
let explanation = TimestampExplanation {
TimestampExplanation {
determination,
sources,
}
}

pub(super) fn sequence_explain_timestamp_finish_inner(
&mut self,
session: &mut Session,
format: ExplainFormat,
cluster_id: ClusterId,
source: OptimizedMirRelationExpr,
id_bundle: CollectionIdBundle,
real_time_recency_ts: Option<Timestamp>,
) -> Result<ExecuteResponse, AdapterError> {
let is_json = match format {
ExplainFormat::Text => false,
ExplainFormat::Json => true,
ExplainFormat::Dot => {
return Err(AdapterError::Unsupported("EXPLAIN TIMESTAMP AS DOT"));
}
};
let source_ids = source.depends_on();
let timeline_context = self.validate_timeline_context(source_ids.clone())?;

let determination = self.sequence_peek_timestamp(
session,
&QueryWhen::Immediately,
cluster_id,
timeline_context,
&id_bundle,
&source_ids,
real_time_recency_ts,
)?;
let explanation = self.explain_timestamp(session, cluster_id, &id_bundle, determination);

let s = if is_json {
serde_json::to_string_pretty(&explanation).expect("failed to serialize explanation")
} else {
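Most of the churn in this file is a refactor: the body that assembles the `TimestampExplanation` moves out of `sequence_explain_timestamp_finish_inner` into the new `explain_timestamp` helper, so the notice path and the `EXPLAIN TIMESTAMP` statement path share it. Roughly, per the hunks above (argument plumbing abbreviated, not standalone code):

    // The shared helper.
    pub(crate) fn explain_timestamp(
        &self,
        session: &Session,
        cluster_id: ClusterId,
        id_bundle: &CollectionIdBundle,
        determination: TimestampDetermination<mz_repr::Timestamp>,
    ) -> TimestampExplanation<mz_repr::Timestamp> { /* collect per-source frontiers */ }

    // Call site 1: sequence_peek, gated on emit_timestamp_notice.
    let explanation = self.explain_timestamp(session, cluster_id, &id_bundle, determination);
    session.add_notice(AdapterNotice::QueryTimestamp { explanation });

    // Call site 2: EXPLAIN TIMESTAMP itself, after determining the timestamp.
    let determination = self.sequence_peek_timestamp(/* ... */)?;
    let explanation = self.explain_timestamp(session, cluster_id, &id_bundle, determination);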
4 changes: 2 additions & 2 deletions src/adapter/src/coord/timestamp_selection.rs
@@ -535,15 +535,15 @@ impl<T: TimestampManipulation> TimestampDetermination<T> {
}

/// Information used when determining the timestamp for a query.
#[derive(Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampExplanation<T> {
/// The chosen timestamp from `determine_timestamp`.
pub determination: TimestampDetermination<T>,
/// Details about each source.
pub sources: Vec<TimestampSource<T>>,
}

#[derive(Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampSource<T> {
pub name: String,
pub read_frontier: Vec<T>,
11 changes: 6 additions & 5 deletions src/adapter/src/notice.rs
@@ -21,11 +21,13 @@ use mz_sql::plan::PlanNotice;
use mz_sql::session::vars::IsolationLevel;
use tokio_postgres::error::SqlState;

use crate::TimestampExplanation;

/// Notices that can occur in the adapter layer.
///
/// These are diagnostic warnings or informational messages that are not
/// severe enough to warrant failing a query entirely.
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug)]
pub enum AdapterNotice {
DatabaseAlreadyExists {
name: String,
@@ -67,7 +69,7 @@ pub enum AdapterNotice {
name: String,
},
QueryTimestamp {
timestamp: mz_repr::Timestamp,
explanation: TimestampExplanation<mz_repr::Timestamp>,
},
EqualSubscribeBounds {
bound: mz_repr::Timestamp,
@@ -114,6 +116,7 @@ impl AdapterNotice {
pub fn detail(&self) -> Option<String> {
match self {
AdapterNotice::PlanNotice(notice) => notice.detail(),
AdapterNotice::QueryTimestamp { explanation } => Some(format!("\n{explanation}")),
_ => None,
}
}
@@ -241,9 +244,7 @@ impl fmt::Display for AdapterNotice {
AdapterNotice::DroppedActiveCluster { name } => {
write!(f, "active cluster {} has been dropped", name.quoted())
}
AdapterNotice::QueryTimestamp { timestamp } => {
write!(f, "query timestamp: {}", timestamp)
}
AdapterNotice::QueryTimestamp { .. } => write!(f, "EXPLAIN TIMESTAMP for query"),
AdapterNotice::EqualSubscribeBounds { bound } => {
write!(f, "subscribe as of {bound} (inclusive) up to the same bound {bound} (exclusive) is guaranteed to be empty")
}
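Net effect for clients: the notice's one-line message is now the fixed string "EXPLAIN TIMESTAMP for query", and the explanation travels in the notice's detail field (`detail()` formats it as `\n{explanation}`). The rendered explanation still includes a `query timestamp: <ts>` line, which is why the test below now matches against `msg.detail()` with a digits-only capture instead of against `msg.message()`. A minimal client-side sketch, assuming a notice value with the `message()`/`detail()` accessors used in that test (e.g. `tokio_postgres`'s `DbError`):

    // Sketch only; `msg` is the notice received asynchronously by the client.
    if msg.message() == "EXPLAIN TIMESTAMP for query" {
        // Multi-line EXPLAIN TIMESTAMP output, including "query timestamp: <ts>".
        let explanation = msg.detail().unwrap_or_default();
        println!("{explanation}");
    }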
7 changes: 4 additions & 3 deletions src/environmentd/tests/sql.rs
@@ -2406,14 +2406,14 @@ fn test_emit_timestamp_notice() {
.unwrap();
client.batch_execute("SELECT * FROM t").unwrap();

let timestamp_re = Regex::new("query timestamp: (.*)").unwrap();
let timestamp_re = Regex::new("query timestamp: *([0-9]+)").unwrap();

// Wait until there's a query timestamp notice.
let first_timestamp = Retry::default()
.retry(|_| loop {
match rx.try_next() {
Ok(Some(msg)) => {
if let Some(caps) = timestamp_re.captures(msg.message()) {
if let Some(caps) = timestamp_re.captures(msg.detail().unwrap_or_default()) {
let ts: u64 = caps.get(1).unwrap().as_str().parse().unwrap();
return Ok(mz_repr::Timestamp::from(ts));
}
@@ -2431,7 +2431,8 @@
loop {
match rx.try_next() {
Ok(Some(msg)) => {
if let Some(caps) = timestamp_re.captures(msg.message()) {
if let Some(caps) = timestamp_re.captures(msg.detail().unwrap_or_default())
{
let ts: u64 = caps.get(1).unwrap().as_str().parse().unwrap();
let ts = mz_repr::Timestamp::from(ts);
if ts > first_timestamp {
2 changes: 1 addition & 1 deletion src/sql/src/session/vars.rs
@@ -909,7 +909,7 @@ static EMIT_TIMESTAMP_NOTICE: ServerVar<bool> = ServerVar {
name: UncasedStr::new("emit_timestamp_notice"),
value: &false,
description:
"Boolean flag indicating whether to send a NOTICE specifying query timestamps (Materialize).",
"Boolean flag indicating whether to send a NOTICE with timestamp explanations of queries (Materialize).",
internal: false
};

2 changes: 1 addition & 1 deletion test/testdrive/session.td
@@ -36,7 +36,7 @@ statement_timeout "10 s" "Sets the maximum al
idle_in_transaction_session_timeout "2 min" "Sets the maximum allowed duration that a session can sit idle in a transaction before being terminated. If this value is specified without units, it is taken as milliseconds. A value of zero disables the timeout (PostgreSQL)."
TimeZone UTC "Sets the time zone for displaying and interpreting time stamps (PostgreSQL)."
transaction_isolation "strict serializable" "Sets the current transaction's isolation level (PostgreSQL)."
emit_timestamp_notice off "Boolean flag indicating whether to send a NOTICE specifying query timestamps (Materialize)."
emit_timestamp_notice off "Boolean flag indicating whether to send a NOTICE with timestamp explanations of queries (Materialize)."
emit_trace_id_notice off "Boolean flag indicating whether to send a NOTICE specifying the trace id when available (Materialize)."
auto_route_introspection_queries on "Whether to force queries that depend only on system tables, to run on the mz_introspection cluster (Materialize)."
enable_session_rbac_checks off "User facing session boolean flag indicating whether to apply RBAC checks before executing statements (Materialize)."
