adapter: correct initial timestamp determination in transactions
In #19026 various timestamp functions were refactored to allow `EXPLAIN
TIMESTAMP` to reflect (and affect) the current transaction. This is
tricky code and the refactor incorrectly changed the behavior of
transactions. In a transaction our intention is to find a timestamp
during the first statement that will be valid for any other object a
user might query later in the transaction, then acquire read holds on all
those referenced objects. In that change, the first statement only used
the referenced IDs when determining a timestamp, not all nearby IDs
(objects in the same schema). Read holds were still correctly acquired
on all nearby objects.

This could cause the transaction's timestamp to not be in advance of the
since of other nearby objects. For example, take two objects in the
same schema. At the first query, object A has a since of 10 and object B
has a since of 20. A first query in the transaction `SELECT * FROM A`
would choose 10 as its timestamp, then acquire read holds on A and B at
10. A later query in the transaction `SELECT * FROM B` would choose 10 as
its timestamp (which it fetched from the in-progress transaction state),
and panic because B's since is in advance of 10.
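
The A/B example can be reproduced with a deliberately simplified model.
In this sketch the chosen timestamp is just the maximum since over a set
of IDs. `least_valid_timestamp` and the `sinces` map are hypothetical
stand-ins, not the adapter's API, and the real `determine_timestamp`
takes more inputs than sinces alone.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: the smallest timestamp at which every listed
/// object is readable, modeled here as the maximum of their sinces.
/// IDs missing from the map are ignored.
fn least_valid_timestamp(sinces: &BTreeMap<&str, u64>, ids: &[&str]) -> Option<u64> {
    ids.iter().filter_map(|id| sinces.get(id).copied()).max()
}

fn main() {
    let b_since = 20u64;
    let sinces = BTreeMap::from([("A", 10u64), ("B", b_since)]);

    // Regressed behavior: only the IDs referenced by `SELECT * FROM A`.
    let referenced_only = least_valid_timestamp(&sinces, &["A"]).unwrap();
    // Intended behavior: every object in the timedomain (same schema).
    let timedomain = least_valid_timestamp(&sinces, &["A", "B"]).unwrap();

    // The regressed timestamp is behind B's since, so a later read of B
    // at that timestamp is invalid.
    assert!(referenced_only < b_since);
    // The timedomain timestamp is valid for both A and B.
    assert!(timedomain >= b_since);
    println!("referenced only: {referenced_only}, timedomain: {timedomain}");
}
```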

One of the reasons this happened is the transaction metadata only held
on to the TimestampContext. `EXPLAIN TIMESTAMP` needs a larger struct, a
TimestampDetermination. Teach the session metadata to track this outer
struct so EXPLAIN can use it. This allows us to now only call
determine_timestamp once during transaction start, instead of during
each query in a transaction. Refactor some of the other read handling
code to look more like it did pre-19026.
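
As a rough illustration of the "determine once, then reuse" idea, the
sketch below stores the whole determination in the transaction state and
only computes a new one when none with a timeline timestamp is present.
All types here are simplified stand-ins: the `details` field, the `Txn`
struct, and `determination_or_else` are hypothetical names, and the real
structs carry more information.

```rust
/// Simplified stand-in for the adapter's timestamp context.
#[derive(Clone, Debug, PartialEq)]
enum TimestampContext {
    TimelineTimestamp(u64),
    NoTimestamp,
}

/// Simplified stand-in for the outer struct that EXPLAIN needs.
/// `details` is a hypothetical placeholder for the extra information.
#[derive(Clone, Debug, PartialEq)]
struct TimestampDetermination {
    timestamp_context: TimestampContext,
    details: String,
}

/// Hypothetical transaction state that remembers the determination.
#[derive(Default)]
struct Txn {
    determination: Option<TimestampDetermination>,
}

impl Txn {
    /// Reuse a stored determination that carries a timeline timestamp;
    /// otherwise run `determine` and remember its result.
    fn determination_or_else(
        &mut self,
        determine: impl FnOnce() -> TimestampDetermination,
    ) -> TimestampDetermination {
        if let Some(d) = &self.determination {
            if matches!(d.timestamp_context, TimestampContext::TimelineTimestamp(_)) {
                return d.clone();
            }
        }
        let d = determine();
        self.determination = Some(d.clone());
        d
    }
}

fn main() {
    let mut txn = Txn::default();
    let mut calls = 0;
    let make = |ts: u64| TimestampDetermination {
        timestamp_context: TimestampContext::TimelineTimestamp(ts),
        details: format!("chosen at {ts}"),
    };

    // First statement: nothing stored yet, so a timestamp is determined.
    let first = txn.determination_or_else(|| {
        calls += 1;
        make(20)
    });
    // Later statement: the stored determination is reused as-is.
    let second = txn.determination_or_else(|| {
        calls += 1;
        make(99)
    });

    assert_eq!(first, second);
    assert_eq!(calls, 1);
}
```

Keeping the outer struct rather than only the context means a later
`EXPLAIN TIMESTAMP` in the same transaction can report the same details
the first statement computed.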
maddyblue committed Jul 5, 2023
1 parent 4b920ea commit bf093bf
Showing 4 changed files with 205 additions and 213 deletions.
6 changes: 4 additions & 2 deletions src/adapter/src/coord/id_bundle.rs
@@ -84,7 +84,7 @@ impl Coordinator {
session: &Session,
id_bundle: &CollectionIdBundle,
) -> Vec<String> {
id_bundle
let mut names: Vec<_> = id_bundle
.iter()
// This could filter out an entry that has been replaced in another transaction.
.filter_map(|id| self.catalog().try_get_entry(&id))
@@ -93,6 +93,8 @@ impl Coordinator {
.resolve_full_name(item.name(), Some(session.conn_id()))
.to_string()
})
.collect()
.collect();
names.sort();
names
}
}
180 changes: 91 additions & 89 deletions src/adapter/src/coord/sequencer/inner.rs
@@ -1659,7 +1659,7 @@ impl Coordinator {
});
return;
}
Ok((Some(TransactionOps::Peeks(timestamp_context)), _))
Ok((Some(TransactionOps::Peeks(determination)), _))
if ctx.session().vars().transaction_isolation()
== &IsolationLevel::StrictSerializable =>
{
@@ -1670,7 +1670,7 @@ impl Coordinator {
response,
action,
},
timestamp_context,
timestamp_context: determination.timestamp_context,
})
.expect("sending to strict_serializable_reads_tx cannot fail");
return;
@@ -2117,67 +2117,109 @@ impl Coordinator {
when: &QueryWhen,
cluster_id: ClusterId,
timeline_context: TimelineContext,
id_bundle: &CollectionIdBundle,
source_bundle: &CollectionIdBundle,
source_ids: &BTreeSet<GlobalId>,
real_time_recency_ts: Option<Timestamp>,
) -> Result<TimestampDetermination<Timestamp>, AdapterError> {
let determination = self.determine_timestamp(
session,
id_bundle,
when,
cluster_id,
timeline_context.clone(),
real_time_recency_ts,
)?;

// We only track the peeks in the session if the query doesn't use AS
// OF or we're inside an explicit transaction. The latter case is
// necessary to support PG's `BEGIN` semantics, whose behavior can
// depend on whether or not reads have occurred in the txn.
if when.is_transactional() {
session.add_transaction_ops(TransactionOps::Peeks(
determination.timestamp_context.clone(),
))?;
} else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
// If the query uses AS OF, then ignore the timestamp.
session.add_transaction_ops(TransactionOps::Peeks(TimestampContext::NoTimestamp))?;
}

let in_immediate_multi_stmt_txn = session.transaction().is_in_multi_statement_transaction()
&& when == &QueryWhen::Immediately;
let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when);
let timedomain_bundle;

// Fetch or generate a timestamp for this query and what the read holds would be if we need to set
// them.
let (determination, potential_read_holds) =
match session.get_transaction_timestamp_determination() {
// If the session already has a timestamp, use that.
Some(
determination @ TimestampDetermination {
timestamp_context: TimestampContext::TimelineTimestamp(_, _),
..
},
) => (determination, None),
_ => {
let determine_bundle = if in_immediate_multi_stmt_txn {
// In a transaction, determine a timestamp that will be valid for anything in
// any schema referenced by the first query.
timedomain_bundle = self.timedomain_for(
source_ids,
&timeline_context,
session.conn_id(),
cluster_id,
)?;
&timedomain_bundle
} else {
// If not in a transaction, use the source.
source_bundle
};
let determination = self.determine_timestamp(
session,
determine_bundle,
when,
cluster_id,
timeline_context,
real_time_recency_ts,
)?;
// We only need read holds if the read depends on a timestamp. We don't set the
// read holds here because it makes the code a bit more clear to handle the two
// cases for "is this the first statement in a transaction?" in an if/else block
// below.
let read_holds = determination
.timestamp_context
.timestamp()
.map(|timestamp| (timestamp.clone(), determine_bundle));
(determination, read_holds)
}
};

// If we are in a single statement transaction, there is no need to
// acquire read holds to prevent compaction as they will be released
// immediately following the completion of the transaction.
//
// If we're in a multi-statement transaction and the query does not use `AS OF`,
// acquire read holds on any sources in the current time-domain if they have not
// already been acquired. If the query does use `AS OF`, it is not necessary to
// acquire read holds.
if in_immediate_multi_stmt_txn {
self.check_txn_timedomain_conflicts(
session,
&determination.timestamp_context,
id_bundle,
)?;

// If we've already acquired read holds for the txn, we can skip doing so again
if !self.txn_reads.contains_key(session.conn_id()) {
if let Some(timestamp) = determination.timestamp_context.timestamp() {
let timedomain_id_bundle = self.timedomain_for(
source_ids,
&timeline_context,
session.conn_id(),
cluster_id,
)?;

let read_holds =
self.acquire_read_holds(timestamp.clone(), &timedomain_id_bundle);
// Either set the valid read ids for this transaction (if it's the first statement in a
// transaction) otherwise verify the ids referenced in this query are in the timedomain.
if let Some(txn_reads) = self.txn_reads.get(session.conn_id()) {
// Find referenced ids not in the read hold. A reference could be caused by a
// user specifying an object in a different schema than the first query. An
// index could be caused by a CREATE INDEX after the transaction started.
let allowed_id_bundle = txn_reads.id_bundle();
let outside = source_bundle.difference(&allowed_id_bundle);
// Queries without a timestamp and timeline can belong to any existing timedomain.
if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
let valid_names =
self.resolve_collection_id_bundle_names(session, &allowed_id_bundle);
let invalid_names = self.resolve_collection_id_bundle_names(session, &outside);
return Err(AdapterError::RelationOutsideTimeDomain {
relations: invalid_names,
names: valid_names,
});
}
} else {
if let Some((timestamp, bundle)) = potential_read_holds {
let read_holds = self.acquire_read_holds(timestamp, bundle);
self.txn_reads.insert(session.conn_id().clone(), read_holds);
}
}
}

// TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
// arbitrary and we don't recall why we did it (possibly an error!). Change this to always
// set the transaction ops. Decide and document what our policy should be on AS OF queries.
// Maybe they shouldn't be allowed in transactions at all because it's hard to explain
// what's going on there. This should probably get a small design document.

// We only track the peeks in the session if the query doesn't use AS
// OF or we're inside an explicit transaction. The latter case is
// necessary to support PG's `BEGIN` semantics, whose behavior can
// depend on whether or not reads have occurred in the txn.
let mut transaction_determination = determination.clone();
if when.is_transactional() {
session.add_transaction_ops(TransactionOps::Peeks(transaction_determination))?;
} else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
// If the query uses AS OF, then ignore the timestamp.
transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
session.add_transaction_ops(TransactionOps::Peeks(transaction_determination))?;
};

Ok(determination)
}

@@ -2245,46 +2287,6 @@ impl Coordinator {
})
}

/// Verifies that no timedomain conflicts exist between the transaction's current read
/// holds and the specified sets of incoming collections
fn check_txn_timedomain_conflicts(
&self,
session: &Session,
timestamp_context: &TimestampContext<Timestamp>,
incoming_id_bundle: &CollectionIdBundle,
) -> Result<(), AdapterError> {
// If there are no `txn_reads`, then this must be the first query in the transaction
// and we can skip timedomain validations.
if let Some(txn_reads) = self.txn_reads.get(session.conn_id()) {
// Queries without a timestamp and timeline can belong to any existing timedomain.
if let TimestampContext::TimelineTimestamp(_, _) = timestamp_context {
// Verify that the references and indexes for this query are in the
// current read transaction.
let allowed_id_bundle = txn_reads.id_bundle();
// Find the first reference or index (if any) that is not in the transaction. A
// reference could be caused by a user specifying an object in a different
// schema than the first query. An index could be caused by a CREATE INDEX
// after the transaction started.
let outside = incoming_id_bundle.difference(&allowed_id_bundle);
if !outside.is_empty() {
let mut valid_names =
self.resolve_collection_id_bundle_names(session, &allowed_id_bundle);
let mut invalid_names =
self.resolve_collection_id_bundle_names(session, &outside);
// Sort so error messages are deterministic.
valid_names.sort();
invalid_names.sort();
return Err(AdapterError::RelationOutsideTimeDomain {
relations: invalid_names,
names: valid_names,
});
}
}
}

Ok(())
}

/// Checks to see if the session needs a real time recency timestamp and if so returns
/// a future that will return the timestamp.
fn recent_timestamp(